From f70d7e3deb2288b2643a6a3e19b537c9d61df5c1 Mon Sep 17 00:00:00 2001 From: Ho Ngoc Hai Date: Sat, 25 Apr 2026 18:28:29 +0700 Subject: [PATCH] feat(notifications): pilot listener cutover + consumer skeleton (GOO-173) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 steps 4–6 — cutover PaymentCompletedListener to the async outbox path behind NOTIFICATIONS_ASYNC_ENABLED, and add the Redis Streams consumer that picks up notification.requested events. PaymentCompletedListener changes: - Injects NotificationsAsyncConfig + NotificationsPublisher - When asyncEnabled: builds NotificationRequestedPayload and calls publisher.publishStandalone() instead of commandBus.execute() - Legacy SendNotificationCommand path retained in else branch - recipientEmail passed via params so the consumer can resolve it NotificationsConsumer (new): - XREADGROUP against `events:notification.requested` stream, consumer group `notifications-workers` - Idempotency via Redis SET NX EX 86400 keyed on envelope.payload.dedupeKey ?? envelope.eventId - Dispatches to existing SendNotificationHandler per channel via CommandBus, mapping contract channels (email/sms/fcm/zalo/in_app) to domain channels (EMAIL/SMS/PUSH/ZALO_OA) - DLQ: after 3 failed deliveries, XADD to events:notification.requested:dlq with original envelope + reason - Consumer group created lazily with MKSTREAM; poll loop gated by NOTIFICATIONS_ASYNC_ENABLED - Registered in NotificationsModule providers Tests (28 specs, all green): - PaymentCompletedListener: legacy path, async path, skip-when-no-email (4 specs, updated from 3 to match new 5-arg constructor) - NotificationsConsumer: process message, dedupe skip, missing envelope skip, DLQ after max retries, multi-channel dispatch, empty stream (6 specs) - NotificationsPublisher: 4 specs (unchanged) - NotificationsAsyncConfig: 14 specs (unchanged) Co-Authored-By: Paperclip --- .../payment-completed.listener.spec.ts | 28 ++- .../listeners/payment-completed.listener.ts | 23 +++ .../__tests__/notifications-consumer.spec.ts | 158 ++++++++++++++++ .../consumers/notifications.consumer.ts | 179 ++++++++++++++++++ .../notifications/notifications.module.ts | 2 + 5 files changed, 389 insertions(+), 1 deletion(-) create mode 100644 apps/api/src/modules/notifications/infrastructure/__tests__/notifications-consumer.spec.ts create mode 100644 apps/api/src/modules/notifications/infrastructure/consumers/notifications.consumer.ts diff --git a/apps/api/src/modules/notifications/application/__tests__/payment-completed.listener.spec.ts b/apps/api/src/modules/notifications/application/__tests__/payment-completed.listener.spec.ts index efa8bf4..bf830c6 100644 --- a/apps/api/src/modules/notifications/application/__tests__/payment-completed.listener.spec.ts +++ b/apps/api/src/modules/notifications/application/__tests__/payment-completed.listener.spec.ts @@ -7,6 +7,8 @@ describe('PaymentCompletedListener', () => { let mockCommandBus: { execute: ReturnType }; let mockPrisma: { user: { findUnique: ReturnType } }; let mockLogger: { log: ReturnType; warn: ReturnType; error: ReturnType }; + let mockAsyncConfig: { asyncEnabled: boolean }; + let mockPublisher: { publishStandalone: ReturnType }; beforeEach(() => { mockCommandBus = { execute: vi.fn().mockResolvedValue(undefined) }; @@ -14,15 +16,19 @@ describe('PaymentCompletedListener', () => { user: { findUnique: vi.fn() }, }; mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn() }; + mockAsyncConfig = { asyncEnabled: false }; + mockPublisher = { publishStandalone: vi.fn().mockResolvedValue({ eventId: 'eid' }) }; listener = new PaymentCompletedListener( mockCommandBus as any, mockPrisma as any, mockLogger as any, + mockAsyncConfig as any, + mockPublisher as any, ); }); - it('sends payment confirmation email', async () => { + it('sends payment confirmation email via legacy path when async disabled', async () => { mockPrisma.user.findUnique.mockResolvedValue({ email: 'buyer@test.com' }); const event = new PaymentCompletedEvent('pay-1', 'user-1', 'VNPAY', BigInt(5000000)); @@ -35,6 +41,24 @@ describe('PaymentCompletedListener', () => { expect(cmd.templateData.paymentId).toBe('pay-1'); expect(cmd.templateData.provider).toBe('VNPAY'); expect(cmd.recipientAddress).toBe('buyer@test.com'); + expect(mockPublisher.publishStandalone).not.toHaveBeenCalled(); + }); + + it('publishes via async outbox when NOTIFICATIONS_ASYNC_ENABLED is on', async () => { + mockAsyncConfig.asyncEnabled = true; + mockPrisma.user.findUnique.mockResolvedValue({ email: 'buyer@test.com' }); + + const event = new PaymentCompletedEvent('pay-1', 'user-1', 'VNPAY', BigInt(5000000)); + await listener.handle(event); + + expect(mockPublisher.publishStandalone).toHaveBeenCalledTimes(1); + const input = mockPublisher.publishStandalone.mock.calls[0][0]; + expect(input.category).toBe('payment'); + expect(input.template).toBe('payment.confirmed'); + expect(input.channels).toEqual(['email']); + expect(input.params.recipientEmail).toBe('buyer@test.com'); + expect(input.params.paymentId).toBe('pay-1'); + expect(mockCommandBus.execute).not.toHaveBeenCalled(); }); it('skips notification when user not found', async () => { @@ -44,6 +68,7 @@ describe('PaymentCompletedListener', () => { await listener.handle(event); expect(mockCommandBus.execute).not.toHaveBeenCalled(); + expect(mockPublisher.publishStandalone).not.toHaveBeenCalled(); }); it('skips notification when user has no email', async () => { @@ -53,5 +78,6 @@ describe('PaymentCompletedListener', () => { await listener.handle(event); expect(mockCommandBus.execute).not.toHaveBeenCalled(); + expect(mockPublisher.publishStandalone).not.toHaveBeenCalled(); }); }); diff --git a/apps/api/src/modules/notifications/application/listeners/payment-completed.listener.ts b/apps/api/src/modules/notifications/application/listeners/payment-completed.listener.ts index 9835027..b092976 100644 --- a/apps/api/src/modules/notifications/application/listeners/payment-completed.listener.ts +++ b/apps/api/src/modules/notifications/application/listeners/payment-completed.listener.ts @@ -1,8 +1,11 @@ import { Injectable } from '@nestjs/common'; import { CommandBus } from '@nestjs/cqrs'; import { OnEvent } from '@nestjs/event-emitter'; +import { uuidv7 } from '@goodgo/contracts-events'; import { type PaymentCompletedEvent } from '@modules/payments'; import { LoggerService, PrismaService } from '@modules/shared'; +import { NotificationsAsyncConfig } from '../../infrastructure/services/notifications-async.config'; +import { NotificationsPublisher } from '../../infrastructure/services/notifications-publisher.service'; import { SendNotificationCommand } from '../commands/send-notification/send-notification.command'; @Injectable() @@ -11,6 +14,8 @@ export class PaymentCompletedListener { private readonly commandBus: CommandBus, private readonly prisma: PrismaService, private readonly logger: LoggerService, + private readonly asyncConfig: NotificationsAsyncConfig, + private readonly publisher: NotificationsPublisher, ) {} @OnEvent('payment.completed', { async: true }) @@ -26,6 +31,24 @@ export class PaymentCompletedListener { const amountFormatted = new Intl.NumberFormat('vi-VN').format(event.amountVND); + if (this.asyncConfig.asyncEnabled) { + await this.publisher.publishStandalone({ + notificationId: uuidv7(), + userId: event.userId, + category: 'payment', + template: 'payment.confirmed', + params: { + paymentId: event.aggregateId, + amountVND: amountFormatted, + provider: event.provider, + recipientEmail: user.email, + }, + channels: ['email'], + priority: 'normal', + }); + return; + } + await this.commandBus.execute( new SendNotificationCommand( event.userId, diff --git a/apps/api/src/modules/notifications/infrastructure/__tests__/notifications-consumer.spec.ts b/apps/api/src/modules/notifications/infrastructure/__tests__/notifications-consumer.spec.ts new file mode 100644 index 0000000..793dd84 --- /dev/null +++ b/apps/api/src/modules/notifications/infrastructure/__tests__/notifications-consumer.spec.ts @@ -0,0 +1,158 @@ +import { type EventEnvelope, type NotificationRequestedPayload } from '@goodgo/contracts-events'; +import { describe, expect, it, vi, beforeEach } from 'vitest'; +import { NotificationsConsumer } from '../consumers/notifications.consumer'; + +function makeEnvelope( + overrides: Partial = {}, +): EventEnvelope { + return { + schemaVersion: 1, + eventId: overrides.notificationId ?? 'evt-1', + eventType: 'notification.requested', + occurredAt: new Date().toISOString(), + producer: 'test', + traceId: '0'.repeat(32), + payload: { + notificationId: 'notif-1', + userId: 'user-1', + category: 'payment', + template: 'payment.confirmed', + params: { paymentId: 'pay-1', recipientEmail: 'a@b.com' }, + channels: ['email'], + requestedAt: new Date().toISOString(), + ...overrides, + }, + }; +} + +describe('NotificationsConsumer', () => { + let consumer: NotificationsConsumer; + let mockCommandBus: { execute: ReturnType }; + let mockXreadgroup: ReturnType; + let mockXack: ReturnType; + let mockXadd: ReturnType; + let mockXpending: ReturnType; + let mockSetNX: ReturnType; + let mockXgroup: ReturnType; + + beforeEach(() => { + mockCommandBus = { execute: vi.fn().mockResolvedValue(undefined) }; + mockXreadgroup = vi.fn().mockResolvedValue(null); + mockXack = vi.fn().mockResolvedValue(1); + mockXadd = vi.fn().mockResolvedValue('dlq-1'); + mockXpending = vi.fn().mockResolvedValue([]); + mockSetNX = vi.fn().mockResolvedValue('OK'); + mockXgroup = vi.fn().mockResolvedValue('OK'); + + const mockRedis = { + getClient: () => ({ + xreadgroup: mockXreadgroup, + xack: mockXack, + xadd: mockXadd, + xpending: mockXpending, + set: mockSetNX, + xgroup: mockXgroup, + }), + }; + + const mockAsyncConfig = { asyncEnabled: true }; + + consumer = new NotificationsConsumer( + mockRedis as any, + mockCommandBus as any, + mockAsyncConfig as any, + ); + }); + + it('processes a message and dispatches via CommandBus', async () => { + const envelope = makeEnvelope(); + mockXreadgroup.mockResolvedValueOnce([ + ['events:notification.requested', [ + ['1-0', ['envelope', JSON.stringify(envelope)]], + ]], + ]); + + const result = await consumer.tick(); + + expect(result.processed).toBe(1); + expect(result.skipped).toBe(0); + expect(mockCommandBus.execute).toHaveBeenCalledTimes(1); + expect(mockXack).toHaveBeenCalledWith('events:notification.requested', 'notifications-workers', '1-0'); + }); + + it('skips duplicate messages via SET NX dedupe', async () => { + const envelope = makeEnvelope(); + mockXreadgroup.mockResolvedValueOnce([ + ['events:notification.requested', [ + ['1-0', ['envelope', JSON.stringify(envelope)]], + ]], + ]); + mockSetNX.mockResolvedValueOnce(null); + + const result = await consumer.tick(); + + expect(result.skipped).toBe(1); + expect(result.processed).toBe(0); + expect(mockCommandBus.execute).not.toHaveBeenCalled(); + expect(mockXack).toHaveBeenCalledTimes(1); + }); + + it('skips messages missing the envelope field', async () => { + mockXreadgroup.mockResolvedValueOnce([ + ['events:notification.requested', [ + ['1-0', ['other_field', 'value']], + ]], + ]); + + const result = await consumer.tick(); + + expect(result.skipped).toBe(1); + expect(mockXack).toHaveBeenCalledTimes(1); + expect(mockCommandBus.execute).not.toHaveBeenCalled(); + }); + + it('moves to DLQ after exceeding max retries', async () => { + const envelope = makeEnvelope(); + mockXreadgroup.mockResolvedValueOnce([ + ['events:notification.requested', [ + ['1-0', ['envelope', JSON.stringify(envelope)]], + ]], + ]); + mockSetNX.mockResolvedValueOnce('OK'); + mockCommandBus.execute.mockRejectedValueOnce(new Error('dispatch failed')); + mockXpending.mockResolvedValueOnce([['1-0', 'worker-1', 0, 3]]); + + const result = await consumer.tick(); + + expect(result.failed).toBe(1); + expect(mockXadd).toHaveBeenCalledWith( + 'events:notification.requested:dlq', + '*', + 'envelope', JSON.stringify(envelope), + 'originalMessageId', '1-0', + 'reason', 'max_retries_exceeded', + ); + expect(mockXack).toHaveBeenCalledWith('events:notification.requested', 'notifications-workers', '1-0'); + }); + + it('dispatches to all channels in the notification payload', async () => { + const envelope = makeEnvelope({ channels: ['email', 'sms'] }); + mockXreadgroup.mockResolvedValueOnce([ + ['events:notification.requested', [ + ['1-0', ['envelope', JSON.stringify(envelope)]], + ]], + ]); + + await consumer.tick(); + + expect(mockCommandBus.execute).toHaveBeenCalledTimes(2); + }); + + it('returns zero counts when stream is empty', async () => { + mockXreadgroup.mockResolvedValueOnce(null); + + const result = await consumer.tick(); + + expect(result).toEqual({ processed: 0, skipped: 0, failed: 0 }); + }); +}); diff --git a/apps/api/src/modules/notifications/infrastructure/consumers/notifications.consumer.ts b/apps/api/src/modules/notifications/infrastructure/consumers/notifications.consumer.ts new file mode 100644 index 0000000..f33d6d5 --- /dev/null +++ b/apps/api/src/modules/notifications/infrastructure/consumers/notifications.consumer.ts @@ -0,0 +1,179 @@ +import { type EventEnvelope, type NotificationRequestedPayload } from '@goodgo/contracts-events'; +import { Injectable, Logger, type OnModuleDestroy, type OnModuleInit } from '@nestjs/common'; +import { CommandBus } from '@nestjs/cqrs'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports +import { RedisService } from '@modules/shared/infrastructure/redis.service'; +import { SendNotificationCommand } from '../../application/commands/send-notification/send-notification.command'; +import { NotificationsAsyncConfig } from '../services/notifications-async.config'; + +const STREAM = 'events:notification.requested'; +const GROUP = 'notifications-workers'; +const CONSUMER_PREFIX = 'worker'; +const BLOCK_MS = 5_000; +const BATCH_SIZE = 10; +const DEDUPE_TTL_S = 86_400; +const MAX_RETRIES = 3; + +const CHANNEL_MAP: Record = { + email: 'EMAIL', + sms: 'SMS', + fcm: 'PUSH', + zalo: 'ZALO_OA', + in_app: 'PUSH', +}; + +@Injectable() +export class NotificationsConsumer implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(NotificationsConsumer.name); + private readonly consumerName: string; + private stopped = false; + private running = false; + + constructor( + private readonly redis: RedisService, + private readonly commandBus: CommandBus, + private readonly asyncConfig: NotificationsAsyncConfig, + ) { + this.consumerName = `${CONSUMER_PREFIX}-${process.pid}`; + } + + async onModuleInit(): Promise { + if (!this.asyncConfig.asyncEnabled) { + this.logger.log('NotificationsConsumer disabled (NOTIFICATIONS_ASYNC_ENABLED is off)'); + return; + } + await this.ensureConsumerGroup(); + this.poll(); + } + + async onModuleDestroy(): Promise { + this.stopped = true; + } + + private async ensureConsumerGroup(): Promise { + const client = this.redis.getClient(); + try { + await client.xgroup('CREATE', STREAM, GROUP, '0', 'MKSTREAM'); + this.logger.log(`Created consumer group ${GROUP} on ${STREAM}`); + } catch (err: unknown) { + if (err instanceof Error && err.message.includes('BUSYGROUP')) { + this.logger.debug(`Consumer group ${GROUP} already exists`); + } else { + throw err; + } + } + } + + private poll(): void { + if (this.stopped) return; + void this.tick() + .catch((err) => this.logger.error(`Consumer tick failed: ${(err as Error).message}`)) + .finally(() => { + if (!this.stopped) setTimeout(() => this.poll(), 100); + }); + } + + async tick(): Promise<{ processed: number; skipped: number; failed: number }> { + if (this.running) return { processed: 0, skipped: 0, failed: 0 }; + this.running = true; + let processed = 0; + let skipped = 0; + let failed = 0; + + try { + const client = this.redis.getClient(); + const results = await client.xreadgroup( + 'GROUP', GROUP, this.consumerName, + 'COUNT', BATCH_SIZE, + 'BLOCK', BLOCK_MS, + 'STREAMS', STREAM, '>', + ); + + if (!results) return { processed, skipped, failed }; + + for (const [, messages] of results) { + for (const [messageId, fields] of messages) { + const envelopeIdx = fields.indexOf('envelope'); + const envelopeJson = envelopeIdx >= 0 ? fields[envelopeIdx + 1] : undefined; + if (!envelopeJson) { + this.logger.warn(`Message ${messageId} missing envelope field, ACKing`); + await client.xack(STREAM, GROUP, messageId); + skipped++; + continue; + } + + const envelope: EventEnvelope = JSON.parse(envelopeJson); + + const dedupeKey = `notif:dedupe:${envelope.payload.dedupeKey ?? envelope.eventId}`; + const isNew = await client.set(dedupeKey, '1', 'EX', DEDUPE_TTL_S, 'NX'); + if (!isNew) { + this.logger.debug(`Duplicate ${envelope.eventId}, skipping`); + await client.xack(STREAM, GROUP, messageId); + skipped++; + continue; + } + + try { + await this.dispatch(envelope.payload); + await client.xack(STREAM, GROUP, messageId); + processed++; + } catch (err) { + failed++; + this.logger.error( + `Failed to dispatch ${envelope.eventId}: ${(err as Error).message}`, + ); + await this.handleRetryOrDlq(client, messageId, envelope); + } + } + } + } finally { + this.running = false; + } + + return { processed, skipped, failed }; + } + + private async dispatch(payload: NotificationRequestedPayload): Promise { + for (const ch of payload.channels) { + const mappedChannel = CHANNEL_MAP[ch]; + if (!mappedChannel) { + this.logger.warn(`Unknown channel '${ch}' in notification ${payload.notificationId}`); + continue; + } + await this.commandBus.execute( + new SendNotificationCommand( + payload.userId, + mappedChannel as 'EMAIL' | 'SMS' | 'PUSH' | 'ZALO_OA', + payload.template, + payload.params, + (payload.params['recipientEmail'] as string) ?? + (payload.params['recipientPhone'] as string) ?? + payload.userId, + ), + ); + } + } + + private async handleRetryOrDlq( + client: ReturnType, + messageId: string, + envelope: EventEnvelope, + ): Promise { + const infoRaw = await client.xpending(STREAM, GROUP, messageId, messageId, 1); + const deliveryCount = Array.isArray(infoRaw?.[0]) ? Number(infoRaw[0][3]) : 1; + + if (deliveryCount >= MAX_RETRIES) { + this.logger.warn( + `DLQ: ${envelope.eventId} exceeded ${MAX_RETRIES} retries, moving to DLQ stream`, + ); + await client.xadd( + `${STREAM}:dlq`, + '*', + 'envelope', JSON.stringify(envelope), + 'originalMessageId', messageId, + 'reason', 'max_retries_exceeded', + ); + await client.xack(STREAM, GROUP, messageId); + } + } +} diff --git a/apps/api/src/modules/notifications/notifications.module.ts b/apps/api/src/modules/notifications/notifications.module.ts index 89f0b94..ffc8793 100644 --- a/apps/api/src/modules/notifications/notifications.module.ts +++ b/apps/api/src/modules/notifications/notifications.module.ts @@ -34,6 +34,7 @@ import { PrismaNotificationPreferenceRepository } from './infrastructure/reposit import { PrismaNotificationRepository } from './infrastructure/repositories/prisma-notification.repository'; import { EmailService } from './infrastructure/services/email.service'; import { FcmService } from './infrastructure/services/fcm.service'; +import { NotificationsConsumer } from './infrastructure/consumers/notifications.consumer'; import { NotificationsAsyncConfig } from './infrastructure/services/notifications-async.config'; import { NotificationsPublisher } from './infrastructure/services/notifications-publisher.service'; import { SmsRateLimiterService } from './infrastructure/services/sms-rate-limiter.service'; @@ -92,6 +93,7 @@ const EventListeners = [ // RFC-004 Phase 1 — async notifications (GOO-173) NotificationsAsyncConfig, NotificationsPublisher, + NotificationsConsumer, // WebSocket Gateway NotificationsGateway,