- Add RedisIoAdapter (shared/infra) for multi-instance Socket.IO fan-out with graceful fallback to the in-memory IoAdapter when Redis is unreachable. - Pin Socket.IO heartbeat (pingInterval/pingTimeout/connectTimeout) via env-tunable gateway options for reconnect stability. - Expose Prometheus metrics on /notifications: goodgo_ws_connected_clients (Gauge) and goodgo_ws_messages_total (Counter) with namespace/event/direction labels. Wired through MetricsService and tracked across connect/disconnect + emits. - Unit tests: RedisIoAdapter connect/fallback/close, new MetricsService WS helpers, and gateway metric increments/decrements on auth paths. Co-Authored-By: Paperclip <noreply@paperclip.ing>
86 lines
2.5 KiB
TypeScript
86 lines
2.5 KiB
TypeScript
import type { INestApplicationContext } from '@nestjs/common';
|
|
import { IoAdapter } from '@nestjs/platform-socket.io';
|
|
import { createAdapter } from '@socket.io/redis-adapter';
|
|
import Redis from 'ioredis';
|
|
import type { ServerOptions } from 'socket.io';
|
|
import { LoggerService } from './logger.service';
|
|
|
|
// Context label passed as the second argument to every LoggerService call in this file.
const CONTEXT = 'RedisIoAdapter';
|
|
|
|
/**
 * Socket.IO adapter backed by Redis pub/sub so WebSocket broadcasts
 * fan out across every API instance.
 *
 * Falls back to the in-memory IoAdapter when Redis cannot be reached,
 * so local dev without Redis and single-node deployments still work.
 *
 * Usage: construct with the Nest application, `await connectToRedis()`,
 * then register the instance as the application's WebSocket adapter.
 */
export class RedisIoAdapter extends IoAdapter {
  /** Adapter factory from `createAdapter`; `null` means "use the in-memory adapter". */
  private adapterConstructor: ReturnType<typeof createAdapter> | null = null;
  /** Publishing connection; `null` until `connectToRedis` succeeds. */
  private pubClient: Redis | null = null;
  /** Subscribing connection (a duplicate of the publisher); `null` until connected. */
  private subClient: Redis | null = null;
  private readonly logger: LoggerService;

  /**
   * @param app Nest application context; also used to resolve the LoggerService.
   */
  constructor(app: INestApplicationContext) {
    super(app);
    this.logger = app.get(LoggerService);
  }

  /**
   * Opens the pub/sub connections and prepares the Redis adapter factory.
   *
   * Connection settings come from `REDIS_HOST` (default `localhost`),
   * `REDIS_PORT` (default `6379`) and `REDIS_PASSWORD` (optional).
   *
   * Never rejects because Redis is unavailable: on a connection failure it
   * logs a warning, releases both clients and returns, leaving the adapter
   * in in-memory mode.
   */
  async connectToRedis(): Promise<void> {
    const host = process.env['REDIS_HOST'] ?? 'localhost';
    const port = Number(process.env['REDIS_PORT'] ?? 6379);
    const password = process.env['REDIS_PASSWORD'];

    const pub = new Redis({
      host,
      port,
      password,
      // Connect explicitly below so a failure is observable as a rejection.
      lazyConnect: true,
      enableReadyCheck: false,
      maxRetriesPerRequest: 1,
      // Linear back-off capped at 5 seconds between reconnect attempts.
      retryStrategy: (times) => Math.min(times * 1000, 5000),
    });
    const sub = pub.duplicate();

    // Register 'error' listeners before connecting so connection problems
    // (initial failure or a later outage) are reported through the
    // application logger instead of being left as unhandled error events.
    const reportError =
      (role: 'pub' | 'sub') =>
      (error: Error): void => {
        this.logger.warn(`Redis ${role} client error: ${error.message}`, CONTEXT);
      };
    pub.on('error', reportError('pub'));
    sub.on('error', reportError('sub'));

    try {
      await Promise.all([pub.connect(), sub.connect()]);
    } catch (error) {
      this.logger.warn(
        `Redis pub/sub unavailable — falling back to in-memory adapter: ${
          error instanceof Error ? error.message : String(error)
        }`,
        CONTEXT,
      );
      // Best-effort cleanup of both clients; outcomes are intentionally ignored.
      await Promise.allSettled([pub.quit(), sub.quit()]);
      return;
    }

    this.pubClient = pub;
    this.subClient = sub;
    this.adapterConstructor = createAdapter(pub, sub);
    this.logger.log(
      `Redis pub/sub adapter connected (${host}:${port})`,
      CONTEXT,
    );
  }

  /**
   * Creates the Socket.IO server via the base adapter and, when Redis is
   * connected, installs the Redis adapter on it.
   *
   * @param port Port forwarded to the base implementation.
   * @param options Socket.IO server options forwarded to the base implementation.
   * @returns The created Socket.IO server.
   */
  override createIOServer(port: number, options?: ServerOptions): unknown {
    const server = super.createIOServer(port, options) as {
      adapter: (constructor: unknown) => void;
    };
    if (this.adapterConstructor) {
      server.adapter(this.adapterConstructor);
    }
    return server;
  }

  /**
   * Shuts down the given Socket.IO server (when one is supplied) and then
   * releases the Redis connections.
   *
   * The base adapter's `close(server)` is what actually closes the
   * Socket.IO server, so the argument is forwarded to it instead of being
   * dropped. Calling `close()` without an argument only releases Redis.
   * Safe to call more than once: the Redis clients are detached before
   * they are quit, so repeated or overlapping calls do not quit them twice.
   *
   * @param server Server instance to close; optional for manual teardown.
   */
  override async close(
    server?: Parameters<IoAdapter['close']>[0],
  ): Promise<void> {
    try {
      if (server) {
        await super.close(server);
      }
    } finally {
      // Detach first so a concurrent close() call finds nothing left to quit.
      const clients = [this.pubClient, this.subClient].filter(
        (client): client is Redis => client !== null,
      );
      this.pubClient = null;
      this.subClient = null;
      this.adapterConstructor = null;
      await Promise.allSettled(clients.map((client) => client.quit()));
    }
  }
}
|