feat(notifications): production-ready WebSocket gateway (TEC-2766)

- Add RedisIoAdapter (shared/infra) for multi-instance Socket.IO fan-out
  with graceful fallback to the in-memory IoAdapter when Redis is
  unreachable.
- Pin Socket.IO heartbeat (pingInterval/pingTimeout/connectTimeout)
  via env-tunable gateway options for reconnect stability.
- Expose Prometheus metrics on /notifications: goodgo_ws_connected_clients
  (Gauge) and goodgo_ws_messages_total (Counter) with namespace/event/
  direction labels. Wired through MetricsService and tracked across
  connect/disconnect + emits.
- Unit tests: RedisIoAdapter connect/fallback/close, new MetricsService
  WS helpers, and gateway metric increments/decrements on auth paths.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-18 15:06:25 +07:00
parent 5d4ecdeb2f
commit 329a821b4a
13 changed files with 410 additions and 5 deletions

View File

@@ -37,6 +37,7 @@
"@prisma/client": "^7.7.0", "@prisma/client": "^7.7.0",
"@sentry/nestjs": "^10.47.0", "@sentry/nestjs": "^10.47.0",
"@sentry/profiling-node": "^10.47.0", "@sentry/profiling-node": "^10.47.0",
"@socket.io/redis-adapter": "^8.3.0",
"@willsoto/nestjs-prometheus": "^6.1.0", "@willsoto/nestjs-prometheus": "^6.1.0",
"bcrypt": "^6.0.0", "bcrypt": "^6.0.0",
"bullmq": "^5.74.1", "bullmq": "^5.74.1",

View File

@@ -8,11 +8,10 @@ import './instrument';
import { RequestMethod, ValidationPipe } from '@nestjs/common'; import { RequestMethod, ValidationPipe } from '@nestjs/common';
import { NestFactory } from '@nestjs/core'; import { NestFactory } from '@nestjs/core';
import { IoAdapter } from '@nestjs/platform-socket.io';
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger'; import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
import cookieParser from 'cookie-parser'; import cookieParser from 'cookie-parser';
import helmet from 'helmet'; import helmet from 'helmet';
import { LoggerService, validateEnv } from '@modules/shared'; import { LoggerService, RedisIoAdapter, validateEnv } from '@modules/shared';
import { AppModule } from './app.module'; import { AppModule } from './app.module';
async function bootstrap() { async function bootstrap() {
@@ -60,7 +59,11 @@ async function bootstrap() {
}); });
// ── WebSocket Adapter (Socket.IO) ── // ── WebSocket Adapter (Socket.IO) ──
app.useWebSocketAdapter(new IoAdapter(app)); // Redis pub/sub fan-out for multi-instance broadcasts; falls back to the
// in-memory IoAdapter when Redis is unreachable (single-node / local dev).
const wsAdapter = new RedisIoAdapter(app);
await wsAdapter.connectToRedis();
app.useWebSocketAdapter(wsAdapter);
// ── Security Headers (Helmet) ── // ── Security Headers (Helmet) ──
app.use( app.use(

View File

@@ -9,6 +9,11 @@ describe('MetricsService', () => {
let mockSearchQueriesCounter: { inc: ReturnType<typeof vi.fn> }; let mockSearchQueriesCounter: { inc: ReturnType<typeof vi.fn> };
let mockRequestDurationHistogram: { observe: ReturnType<typeof vi.fn> }; let mockRequestDurationHistogram: { observe: ReturnType<typeof vi.fn> };
let mockHttpRequestsCounter: { inc: ReturnType<typeof vi.fn> }; let mockHttpRequestsCounter: { inc: ReturnType<typeof vi.fn> };
let mockWsConnectedClientsGauge: {
inc: ReturnType<typeof vi.fn>;
set: ReturnType<typeof vi.fn>;
};
let mockWsMessagesCounter: { inc: ReturnType<typeof vi.fn> };
beforeEach(() => { beforeEach(() => {
mockListingsCreatedCounter = { inc: vi.fn() }; mockListingsCreatedCounter = { inc: vi.fn() };
@@ -17,6 +22,8 @@ describe('MetricsService', () => {
mockSearchQueriesCounter = { inc: vi.fn() }; mockSearchQueriesCounter = { inc: vi.fn() };
mockRequestDurationHistogram = { observe: vi.fn() }; mockRequestDurationHistogram = { observe: vi.fn() };
mockHttpRequestsCounter = { inc: vi.fn() }; mockHttpRequestsCounter = { inc: vi.fn() };
mockWsConnectedClientsGauge = { inc: vi.fn(), set: vi.fn() };
mockWsMessagesCounter = { inc: vi.fn() };
service = new MetricsService( service = new MetricsService(
mockListingsCreatedCounter as unknown as Counter, mockListingsCreatedCounter as unknown as Counter,
@@ -25,6 +32,8 @@ describe('MetricsService', () => {
mockSearchQueriesCounter as unknown as Counter, mockSearchQueriesCounter as unknown as Counter,
mockRequestDurationHistogram as unknown as Histogram, mockRequestDurationHistogram as unknown as Histogram,
mockHttpRequestsCounter as unknown as Counter, mockHttpRequestsCounter as unknown as Counter,
mockWsConnectedClientsGauge as unknown as Gauge,
mockWsMessagesCounter as unknown as Counter,
); );
}); });
@@ -102,4 +111,41 @@ describe('MetricsService', () => {
expect.objectContaining({ status_code: '503' }), expect.objectContaining({ status_code: '503' }),
); );
}); });
it('recordWsConnection increments the connected-clients gauge with +1 on connect', () => {
service.recordWsConnection('/notifications', 1);
expect(mockWsConnectedClientsGauge.inc).toHaveBeenCalledWith(
{ namespace: '/notifications' },
1,
);
});
it('recordWsConnection decrements the connected-clients gauge with -1 on disconnect', () => {
service.recordWsConnection('/notifications', -1);
expect(mockWsConnectedClientsGauge.inc).toHaveBeenCalledWith(
{ namespace: '/notifications' },
-1,
);
});
it('setWsConnectedClients sets the gauge for a namespace', () => {
service.setWsConnectedClients('/notifications', 0);
expect(mockWsConnectedClientsGauge.set).toHaveBeenCalledWith(
{ namespace: '/notifications' },
0,
);
});
it('recordWsMessage increments the messages counter with namespace/event/direction', () => {
service.recordWsMessage('/notifications', 'notification:new', 'out');
expect(mockWsMessagesCounter.inc).toHaveBeenCalledWith({
namespace: '/notifications',
event: 'notification:new',
direction: 'out',
});
});
}); });

View File

@@ -8,6 +8,8 @@ import {
GOODGO_SEARCH_QUERIES_TOTAL, GOODGO_SEARCH_QUERIES_TOTAL,
GOODGO_API_REQUEST_DURATION, GOODGO_API_REQUEST_DURATION,
HTTP_REQUESTS_TOTAL, HTTP_REQUESTS_TOTAL,
GOODGO_WS_CONNECTED_CLIENTS,
GOODGO_WS_MESSAGES_TOTAL,
WEB_VITALS_LCP, WEB_VITALS_LCP,
WEB_VITALS_FCP, WEB_VITALS_FCP,
WEB_VITALS_CLS, WEB_VITALS_CLS,
@@ -31,6 +33,10 @@ export class MetricsService {
private readonly requestDurationHistogram: Histogram, private readonly requestDurationHistogram: Histogram,
@InjectMetric(HTTP_REQUESTS_TOTAL) @InjectMetric(HTTP_REQUESTS_TOTAL)
private readonly httpRequestsCounter: Counter, private readonly httpRequestsCounter: Counter,
@InjectMetric(GOODGO_WS_CONNECTED_CLIENTS)
private readonly wsConnectedClientsGauge: Gauge,
@InjectMetric(GOODGO_WS_MESSAGES_TOTAL)
private readonly wsMessagesCounter: Counter,
@InjectMetric(WEB_VITALS_LCP) @InjectMetric(WEB_VITALS_LCP)
private readonly lcpHistogram: Histogram, private readonly lcpHistogram: Histogram,
@InjectMetric(WEB_VITALS_FCP) @InjectMetric(WEB_VITALS_FCP)
@@ -81,6 +87,25 @@ export class MetricsService {
this.httpRequestsCounter.inc(labels); this.httpRequestsCounter.inc(labels);
} }
/** Track a WebSocket client connection (++) or disconnection (--). */
recordWsConnection(namespace: string, delta: 1 | -1): void {
this.wsConnectedClientsGauge.inc({ namespace }, delta);
}
/** Reset the connected-clients gauge for a namespace (e.g. on shutdown). */
setWsConnectedClients(namespace: string, count: number): void {
this.wsConnectedClientsGauge.set({ namespace }, count);
}
/** Record a WebSocket message emitted/received on a given event. */
recordWsMessage(
namespace: string,
event: string,
direction: 'in' | 'out',
): void {
this.wsMessagesCounter.inc({ namespace, event, direction });
}
/** Map metric name → the correct histogram. */ /** Map metric name → the correct histogram. */
private readonly vitalHistograms: Record<string, Histogram | undefined> = {}; private readonly vitalHistograms: Record<string, Histogram | undefined> = {};

View File

@@ -11,6 +11,10 @@ export const DB_QUERY_DURATION = 'db_query_duration_seconds';
export const DB_POOL_ACTIVE_CONNECTIONS = 'db_pool_active_connections'; export const DB_POOL_ACTIVE_CONNECTIONS = 'db_pool_active_connections';
export const SEARCH_QUERY_DURATION = 'search_query_duration_seconds'; export const SEARCH_QUERY_DURATION = 'search_query_duration_seconds';
// ── WebSocket Metrics ──
export const GOODGO_WS_CONNECTED_CLIENTS = 'goodgo_ws_connected_clients';
export const GOODGO_WS_MESSAGES_TOTAL = 'goodgo_ws_messages_total';
// ── Web Vitals / RUM Metrics ── // ── Web Vitals / RUM Metrics ──
export const WEB_VITALS_LCP = 'goodgo_web_vitals_lcp_seconds'; export const WEB_VITALS_LCP = 'goodgo_web_vitals_lcp_seconds';
export const WEB_VITALS_FCP = 'goodgo_web_vitals_fcp_seconds'; export const WEB_VITALS_FCP = 'goodgo_web_vitals_fcp_seconds';

View File

@@ -15,6 +15,8 @@ import {
DB_QUERY_DURATION, DB_QUERY_DURATION,
DB_POOL_ACTIVE_CONNECTIONS, DB_POOL_ACTIVE_CONNECTIONS,
SEARCH_QUERY_DURATION, SEARCH_QUERY_DURATION,
GOODGO_WS_CONNECTED_CLIENTS,
GOODGO_WS_MESSAGES_TOTAL,
WEB_VITALS_LCP, WEB_VITALS_LCP,
WEB_VITALS_FCP, WEB_VITALS_FCP,
WEB_VITALS_CLS, WEB_VITALS_CLS,
@@ -83,6 +85,18 @@ import { HttpMetricsInterceptor } from './presentation/interceptors/http-metrics
labelNames: ['plan'], labelNames: ['plan'],
}), }),
// ── WebSocket Metrics ──
makeGaugeProvider({
name: GOODGO_WS_CONNECTED_CLIENTS,
help: 'Number of active WebSocket clients',
labelNames: ['namespace'],
}),
makeCounterProvider({
name: GOODGO_WS_MESSAGES_TOTAL,
help: 'Total number of WebSocket messages emitted/received',
labelNames: ['namespace', 'event', 'direction'],
}),
// ── Services & Interceptors ── // ── Services & Interceptors ──
MetricsService, MetricsService,
HttpMetricsInterceptor, HttpMetricsInterceptor,

View File

@@ -1,6 +1,7 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs'; import { CqrsModule } from '@nestjs/cqrs';
import { AuthModule } from '@modules/auth'; import { AuthModule } from '@modules/auth';
import { MetricsModule } from '@modules/metrics';
import { SendNotificationHandler } from './application/commands/send-notification/send-notification.handler'; import { SendNotificationHandler } from './application/commands/send-notification/send-notification.handler';
import { AgentVerifiedListener } from './application/listeners/agent-verified.listener'; import { AgentVerifiedListener } from './application/listeners/agent-verified.listener';
import { EmailChangeRequestedListener } from './application/listeners/email-change-requested.listener'; import { EmailChangeRequestedListener } from './application/listeners/email-change-requested.listener';
@@ -53,7 +54,7 @@ const EventListeners = [
]; ];
@Module({ @Module({
imports: [CqrsModule, AuthModule], imports: [CqrsModule, AuthModule, MetricsModule],
controllers: [NotificationsController, ZaloOaWebhookController], controllers: [NotificationsController, ZaloOaWebhookController],
providers: [ providers: [
// Repositories // Repositories

View File

@@ -36,6 +36,11 @@ describe('NotificationsGateway', () => {
getClient: ReturnType<typeof vi.fn>; getClient: ReturnType<typeof vi.fn>;
}; };
let mockNotificationRepo: { countUnreadByUserId: ReturnType<typeof vi.fn> }; let mockNotificationRepo: { countUnreadByUserId: ReturnType<typeof vi.fn> };
let mockMetrics: {
recordWsConnection: ReturnType<typeof vi.fn>;
setWsConnectedClients: ReturnType<typeof vi.fn>;
recordWsMessage: ReturnType<typeof vi.fn>;
};
let mockServer: { let mockServer: {
to: ReturnType<typeof vi.fn>; to: ReturnType<typeof vi.fn>;
}; };
@@ -53,11 +58,17 @@ describe('NotificationsGateway', () => {
getClient: vi.fn().mockReturnValue({ exists: vi.fn().mockResolvedValue(0), incr: vi.fn() }), getClient: vi.fn().mockReturnValue({ exists: vi.fn().mockResolvedValue(0), incr: vi.fn() }),
}; };
mockNotificationRepo = { countUnreadByUserId: vi.fn().mockResolvedValue(3) }; mockNotificationRepo = { countUnreadByUserId: vi.fn().mockResolvedValue(3) };
mockMetrics = {
recordWsConnection: vi.fn(),
setWsConnectedClients: vi.fn(),
recordWsMessage: vi.fn(),
};
gateway = new NotificationsGateway( gateway = new NotificationsGateway(
mockTokenService as any, mockTokenService as any,
mockLogger as any, mockLogger as any,
mockRedisService as any, mockRedisService as any,
mockMetrics as any,
mockNotificationRepo as any, mockNotificationRepo as any,
); );
@@ -74,6 +85,14 @@ describe('NotificationsGateway', () => {
'NotificationsGateway', 'NotificationsGateway',
); );
}); });
it('resets the WS connected-clients gauge to 0', () => {
gateway.afterInit();
expect(mockMetrics.setWsConnectedClients).toHaveBeenCalledWith(
'/notifications',
0,
);
});
}); });
describe('handleConnection', () => { describe('handleConnection', () => {
@@ -152,6 +171,28 @@ describe('NotificationsGateway', () => {
expect(mockNotificationRepo.countUnreadByUserId).toHaveBeenCalledWith('user-1'); expect(mockNotificationRepo.countUnreadByUserId).toHaveBeenCalledWith('user-1');
expect(socket.emit).toHaveBeenCalledWith('notification:unread-count', { unreadCount: 3 }); expect(socket.emit).toHaveBeenCalledWith('notification:unread-count', { unreadCount: 3 });
}); });
it('increments WS connection metric and records the initial unread-count emit', async () => {
const socket = createMockSocket();
await gateway.handleConnection(socket);
expect(mockMetrics.recordWsConnection).toHaveBeenCalledWith('/notifications', 1);
expect(mockMetrics.recordWsMessage).toHaveBeenCalledWith(
'/notifications',
'notification:unread-count',
'out',
);
});
it('does not increment metrics when auth fails', async () => {
mockTokenService.verifyAccessToken.mockReturnValue(null);
const socket = createMockSocket();
await gateway.handleConnection(socket);
expect(mockMetrics.recordWsConnection).not.toHaveBeenCalled();
});
}); });
describe('handleDisconnect', () => { describe('handleDisconnect', () => {
@@ -183,6 +224,24 @@ describe('NotificationsGateway', () => {
// No prior connection — should not throw // No prior connection — should not throw
expect(() => gateway.handleDisconnect(socket)).not.toThrow(); expect(() => gateway.handleDisconnect(socket)).not.toThrow();
}); });
it('decrements the WS connection metric when a tracked socket disconnects', async () => {
const socket = createMockSocket({ id: 'sock-1' });
await gateway.handleConnection(socket);
mockMetrics.recordWsConnection.mockClear();
gateway.handleDisconnect(socket);
expect(mockMetrics.recordWsConnection).toHaveBeenCalledWith('/notifications', -1);
});
it('does not decrement the gauge for untracked sockets', () => {
const socket = createMockSocket();
gateway.handleDisconnect(socket);
expect(mockMetrics.recordWsConnection).not.toHaveBeenCalled();
});
}); });
describe('handleNotificationSent', () => { describe('handleNotificationSent', () => {

View File

@@ -11,6 +11,8 @@ import type { Server, Socket } from 'socket.io';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports // eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { TokenService, type JwtPayload } from '@modules/auth'; import { TokenService, type JwtPayload } from '@modules/auth';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports // eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { MetricsService } from '@modules/metrics';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { LoggerService, RedisService } from '@modules/shared'; import { LoggerService, RedisService } from '@modules/shared';
import type { NotificationSentEvent } from '../../domain/events/notification-sent.event'; import type { NotificationSentEvent } from '../../domain/events/notification-sent.event';
import { import {
@@ -24,6 +26,20 @@ const UNREAD_COUNT_KEY = (userId: string) => `notifications:unread:${userId}`;
/** TTL for the cached unread count (1 hour). */ /** TTL for the cached unread count (1 hour). */
const UNREAD_COUNT_TTL = 3600; const UNREAD_COUNT_TTL = 3600;
/** Namespace label used for Prometheus metrics. */
const NAMESPACE_LABEL = '/notifications';
/**
* Server → client heartbeat every 25 s and 20 s wait for the pong
* before declaring the connection dead. Matches socket.io defaults but
* pinned explicitly so operations teams can tune via env without code
* changes. Clients must reconnect with exponential backoff on their side.
*/
const WS_PING_INTERVAL_MS = Number(process.env['WS_PING_INTERVAL_MS'] ?? 25_000);
const WS_PING_TIMEOUT_MS = Number(process.env['WS_PING_TIMEOUT_MS'] ?? 20_000);
/** Allow large upgrade windows so poor networks don't churn handshakes. */
const WS_CONNECT_TIMEOUT_MS = Number(process.env['WS_CONNECT_TIMEOUT_MS'] ?? 45_000);
@WebSocketGateway({ @WebSocketGateway({
namespace: '/notifications', namespace: '/notifications',
cors: { cors: {
@@ -32,6 +48,10 @@ const UNREAD_COUNT_TTL = 3600;
.map((o) => o.trim()), .map((o) => o.trim()),
credentials: true, credentials: true,
}, },
pingInterval: WS_PING_INTERVAL_MS,
pingTimeout: WS_PING_TIMEOUT_MS,
connectTimeout: WS_CONNECT_TIMEOUT_MS,
transports: ['websocket', 'polling'],
}) })
export class NotificationsGateway export class NotificationsGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
@@ -46,12 +66,17 @@ export class NotificationsGateway
private readonly tokenService: TokenService, private readonly tokenService: TokenService,
private readonly logger: LoggerService, private readonly logger: LoggerService,
private readonly redisService: RedisService, private readonly redisService: RedisService,
private readonly metrics: MetricsService,
@Inject(NOTIFICATION_REPOSITORY) @Inject(NOTIFICATION_REPOSITORY)
private readonly notificationRepo: INotificationRepository, private readonly notificationRepo: INotificationRepository,
) {} ) {}
afterInit(): void { afterInit(): void {
this.logger.log('NotificationsGateway initialized', 'NotificationsGateway'); this.metrics.setWsConnectedClients(NAMESPACE_LABEL, 0);
this.logger.log(
`NotificationsGateway initialized (pingInterval=${WS_PING_INTERVAL_MS}ms, pingTimeout=${WS_PING_TIMEOUT_MS}ms)`,
'NotificationsGateway',
);
} }
/* ──────────────────────────────────────────── /* ────────────────────────────────────────────
@@ -83,6 +108,13 @@ export class NotificationsGateway
const unreadCount = await this.getUnreadCount(payload.sub); const unreadCount = await this.getUnreadCount(payload.sub);
client.emit('notification:unread-count', { unreadCount }); client.emit('notification:unread-count', { unreadCount });
this.metrics.recordWsConnection(NAMESPACE_LABEL, 1);
this.metrics.recordWsMessage(
NAMESPACE_LABEL,
'notification:unread-count',
'out',
);
this.logger.debug( this.logger.debug(
`WS connected: user=${payload.sub} socket=${client.id}`, `WS connected: user=${payload.sub} socket=${client.id}`,
'NotificationsGateway', 'NotificationsGateway',
@@ -107,6 +139,8 @@ export class NotificationsGateway
this.userSockets.delete(userId); this.userSockets.delete(userId);
} }
} }
// Only decrement if the socket completed auth (we tracked it).
this.metrics.recordWsConnection(NAMESPACE_LABEL, -1);
} }
this.logger.debug( this.logger.debug(
`WS disconnected: user=${userId ?? 'unknown'} socket=${client.id}`, `WS disconnected: user=${userId ?? 'unknown'} socket=${client.id}`,

View File

@@ -0,0 +1,90 @@
const hoisted = vi.hoisted(() => ({
redisConnect: vi.fn(),
redisQuit: vi.fn(),
createAdapterMock: vi.fn(() => Symbol('adapter')),
}));
vi.mock('ioredis', () => {
class FakeRedis {
connect = hoisted.redisConnect;
quit = hoisted.redisQuit;
duplicate() {
return new FakeRedis();
}
}
return { default: FakeRedis };
});
vi.mock('@socket.io/redis-adapter', () => ({
createAdapter: hoisted.createAdapterMock,
}));
import { RedisIoAdapter } from '../redis-io.adapter';
function createApp(): unknown {
return {
get: () => ({
log: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
}),
getHttpServer: () => undefined,
};
}
describe('RedisIoAdapter', () => {
beforeEach(() => {
hoisted.redisConnect.mockReset();
hoisted.redisQuit.mockReset();
hoisted.createAdapterMock.mockClear();
});
it('connects pub/sub clients and registers the adapter on the server', async () => {
hoisted.redisConnect.mockResolvedValue(undefined);
const adapter = new RedisIoAdapter(createApp() as any);
await adapter.connectToRedis();
expect(hoisted.redisConnect).toHaveBeenCalledTimes(2);
expect(hoisted.createAdapterMock).toHaveBeenCalledTimes(1);
const adapterFn = vi.fn();
const fakeServer = { adapter: adapterFn };
const superProto = Object.getPrototypeOf(Object.getPrototypeOf(adapter)) as object;
vi.spyOn(superProto, 'createIOServer').mockReturnValue(fakeServer);
const result = adapter.createIOServer(3001);
expect(adapterFn).toHaveBeenCalledTimes(1);
expect(result).toBe(fakeServer);
});
it('falls back silently when Redis pub/sub connect fails', async () => {
hoisted.redisConnect.mockRejectedValue(new Error('connection refused'));
const adapter = new RedisIoAdapter(createApp() as any);
await adapter.connectToRedis();
expect(hoisted.createAdapterMock).not.toHaveBeenCalled();
const fakeServer = { adapter: vi.fn() };
const superProto = Object.getPrototypeOf(Object.getPrototypeOf(adapter)) as object;
vi.spyOn(superProto, 'createIOServer').mockReturnValue(fakeServer);
adapter.createIOServer(3001);
expect(fakeServer.adapter).not.toHaveBeenCalled();
});
it('close() quits pub/sub clients', async () => {
hoisted.redisConnect.mockResolvedValue(undefined);
hoisted.redisQuit.mockResolvedValue(undefined);
const adapter = new RedisIoAdapter(createApp() as any);
await adapter.connectToRedis();
await adapter.close();
expect(hoisted.redisQuit).toHaveBeenCalledTimes(2);
});
});

View File

@@ -11,6 +11,7 @@ export {
export { createEncryptionExtension } from './encryption-middleware'; export { createEncryptionExtension } from './encryption-middleware';
export { PrismaService } from './prisma.service'; export { PrismaService } from './prisma.service';
export { RedisService } from './redis.service'; export { RedisService } from './redis.service';
export { RedisIoAdapter } from './redis-io.adapter';
export { CacheService, CachePrefix, CacheTTL } from './cache.service'; export { CacheService, CachePrefix, CacheTTL } from './cache.service';
export { LoggerService } from './logger.service'; export { LoggerService } from './logger.service';
export { EventBusService } from './event-bus.service'; export { EventBusService } from './event-bus.service';

View File

@@ -0,0 +1,85 @@
import type { INestApplicationContext } from '@nestjs/common';
import { IoAdapter } from '@nestjs/platform-socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import Redis from 'ioredis';
import type { ServerOptions } from 'socket.io';
import { LoggerService } from './logger.service';
const CONTEXT = 'RedisIoAdapter';
/**
* Socket.IO adapter backed by Redis pub/sub so WebSocket broadcasts
* fan out across every API instance.
*
* Falls back to the in-memory IoAdapter when Redis cannot be reached,
* so local dev without Redis and single-node deployments still work.
*/
export class RedisIoAdapter extends IoAdapter {
private adapterConstructor: ReturnType<typeof createAdapter> | null = null;
private pubClient: Redis | null = null;
private subClient: Redis | null = null;
private readonly logger: LoggerService;
constructor(app: INestApplicationContext) {
super(app);
this.logger = app.get(LoggerService);
}
async connectToRedis(): Promise<void> {
const host = process.env['REDIS_HOST'] ?? 'localhost';
const port = Number(process.env['REDIS_PORT'] ?? 6379);
const password = process.env['REDIS_PASSWORD'] ?? undefined;
const pub = new Redis({
host,
port,
password,
lazyConnect: true,
enableReadyCheck: false,
maxRetriesPerRequest: 1,
retryStrategy: (times) => Math.min(times * 1000, 5000),
});
const sub = pub.duplicate();
try {
await Promise.all([pub.connect(), sub.connect()]);
} catch (error) {
this.logger.warn(
`Redis pub/sub unavailable — falling back to in-memory adapter: ${
error instanceof Error ? error.message : String(error)
}`,
CONTEXT,
);
await Promise.allSettled([pub.quit(), sub.quit()]);
return;
}
this.pubClient = pub;
this.subClient = sub;
this.adapterConstructor = createAdapter(pub, sub);
this.logger.log(
`Redis pub/sub adapter connected (${host}:${port})`,
CONTEXT,
);
}
override createIOServer(port: number, options?: ServerOptions): unknown {
const server = super.createIOServer(port, options) as {
adapter: (constructor: unknown) => void;
};
if (this.adapterConstructor) {
server.adapter(this.adapterConstructor);
}
return server;
}
override async close(): Promise<void> {
await Promise.allSettled([
this.pubClient?.quit(),
this.subClient?.quit(),
]);
this.pubClient = null;
this.subClient = null;
this.adapterConstructor = null;
}
}

42
pnpm-lock.yaml generated
View File

@@ -147,6 +147,9 @@ importers:
'@sentry/profiling-node': '@sentry/profiling-node':
specifier: ^10.47.0 specifier: ^10.47.0
version: 10.47.0 version: 10.47.0
'@socket.io/redis-adapter':
specifier: ^8.3.0
version: 8.3.0(socket.io-adapter@2.5.6)
'@willsoto/nestjs-prometheus': '@willsoto/nestjs-prometheus':
specifier: ^6.1.0 specifier: ^6.1.0
version: 6.1.0(@nestjs/common@11.1.18(class-transformer@0.5.1)(class-validator@0.15.1)(reflect-metadata@0.2.2)(rxjs@7.8.2))(prom-client@15.1.3) version: 6.1.0(@nestjs/common@11.1.18(class-transformer@0.5.1)(class-validator@0.15.1)(reflect-metadata@0.2.2)(rxjs@7.8.2))(prom-client@15.1.3)
@@ -2869,6 +2872,12 @@ packages:
'@socket.io/component-emitter@3.1.2': '@socket.io/component-emitter@3.1.2':
resolution: {integrity: sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==} resolution: {integrity: sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==}
'@socket.io/redis-adapter@8.3.0':
resolution: {integrity: sha512-ly0cra+48hDmChxmIpnESKrc94LjRL80TEmZVscuQ/WWkRP81nNj8W8cCGMqbI4L6NCuAaPRSzZF1a9GlAxxnA==}
engines: {node: '>=10.0.0'}
peerDependencies:
socket.io-adapter: ^2.5.4
'@standard-schema/spec@1.1.0': '@standard-schema/spec@1.1.0':
resolution: {integrity: sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w==} resolution: {integrity: sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w==}
@@ -4222,6 +4231,15 @@ packages:
dateformat@4.6.3: dateformat@4.6.3:
resolution: {integrity: sha512-2P0p0pFGzHS5EMnhdxQi7aJN+iMheud0UhG4dlE1DLAlvL8JHjJJTX/CSm4JXwV0Ka5nGk3zC5mcb5bUQUxxMA==} resolution: {integrity: sha512-2P0p0pFGzHS5EMnhdxQi7aJN+iMheud0UhG4dlE1DLAlvL8JHjJJTX/CSm4JXwV0Ka5nGk3zC5mcb5bUQUxxMA==}
debug@4.3.7:
resolution: {integrity: sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==}
engines: {node: '>=6.0'}
peerDependencies:
supports-color: '*'
peerDependenciesMeta:
supports-color:
optional: true
debug@4.4.3: debug@4.4.3:
resolution: {integrity: sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==} resolution: {integrity: sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==}
engines: {node: '>=6.0'} engines: {node: '>=6.0'}
@@ -5691,6 +5709,9 @@ packages:
resolution: {integrity: sha512-6eZs5Ls3WtCisHWp9S2GUy8dqkpGi4BVSz3GaqiE6ezub0512ESztXUwUB6C6IKbQkY2Pnb/mD4WYojCRwcwLA==} resolution: {integrity: sha512-6eZs5Ls3WtCisHWp9S2GUy8dqkpGi4BVSz3GaqiE6ezub0512ESztXUwUB6C6IKbQkY2Pnb/mD4WYojCRwcwLA==}
engines: {node: '>=0.10.0'} engines: {node: '>=0.10.0'}
notepack.io@3.0.1:
resolution: {integrity: sha512-TKC/8zH5pXIAMVQio2TvVDTtPRX+DJPHDqjRbxogtFiByHyzKmy96RA0JtCQJ+WouyyL4A10xomQzgbUT+1jCg==}
nypm@0.6.5: nypm@0.6.5:
resolution: {integrity: sha512-K6AJy1GMVyfyMXRVB88700BJqNUkByijGJM8kEHpLdcAt+vSQAVfkWWHYzuRXHSY6xA2sNc5RjTj0p9rE2izVQ==} resolution: {integrity: sha512-K6AJy1GMVyfyMXRVB88700BJqNUkByijGJM8kEHpLdcAt+vSQAVfkWWHYzuRXHSY6xA2sNc5RjTj0p9rE2izVQ==}
engines: {node: '>=18'} engines: {node: '>=18'}
@@ -6909,6 +6930,10 @@ packages:
uid2@0.0.4: uid2@0.0.4:
resolution: {integrity: sha512-IevTus0SbGwQzYh3+fRsAMTVVPOoIVufzacXcHPmdlle1jUpq7BRL+mw3dgeLanvGZdwwbWhRV6XrcFNdBmjWA==} resolution: {integrity: sha512-IevTus0SbGwQzYh3+fRsAMTVVPOoIVufzacXcHPmdlle1jUpq7BRL+mw3dgeLanvGZdwwbWhRV6XrcFNdBmjWA==}
uid2@1.0.0:
resolution: {integrity: sha512-+I6aJUv63YAcY9n4mQreLUt0d4lvwkkopDNmpomkAUz0fAkEMV9pRWxN0EjhW1YfRhcuyHg2v3mwddCDW1+LFQ==}
engines: {node: '>= 4.0.0'}
uid@2.0.2: uid@2.0.2:
resolution: {integrity: sha512-u3xV3X7uzvi5b1MncmZo3i2Aw222Zk1keqLA1YkHldREkAhAqi65wuPfe7lHx8H/Wzy+8CE7S7uS3jekIM5s8g==} resolution: {integrity: sha512-u3xV3X7uzvi5b1MncmZo3i2Aw222Zk1keqLA1YkHldREkAhAqi65wuPfe7lHx8H/Wzy+8CE7S7uS3jekIM5s8g==}
engines: {node: '>=8'} engines: {node: '>=8'}
@@ -10174,6 +10199,15 @@ snapshots:
'@socket.io/component-emitter@3.1.2': {} '@socket.io/component-emitter@3.1.2': {}
'@socket.io/redis-adapter@8.3.0(socket.io-adapter@2.5.6)':
dependencies:
debug: 4.3.7
notepack.io: 3.0.1
socket.io-adapter: 2.5.6
uid2: 1.0.0
transitivePeerDependencies:
- supports-color
'@standard-schema/spec@1.1.0': {} '@standard-schema/spec@1.1.0': {}
'@standard-schema/utils@0.3.0': {} '@standard-schema/utils@0.3.0': {}
@@ -11548,6 +11582,10 @@ snapshots:
dateformat@4.6.3: {} dateformat@4.6.3: {}
debug@4.3.7:
dependencies:
ms: 2.1.3
debug@4.4.3: debug@4.4.3:
dependencies: dependencies:
ms: 2.1.3 ms: 2.1.3
@@ -13190,6 +13228,8 @@ snapshots:
normalize-path@3.0.0: {} normalize-path@3.0.0: {}
notepack.io@3.0.1: {}
nypm@0.6.5: nypm@0.6.5:
dependencies: dependencies:
citty: 0.2.2 citty: 0.2.2
@@ -14609,6 +14649,8 @@ snapshots:
uid2@0.0.4: {} uid2@0.0.4: {}
uid2@1.0.0: {}
uid@2.0.2: uid@2.0.2:
dependencies: dependencies:
'@lukeed/csprng': 1.1.0 '@lukeed/csprng': 1.1.0