feat(notifications): pilot listener cutover + consumer skeleton (GOO-173)

Phase 1 steps 4–6 — cutover PaymentCompletedListener to the async
outbox path behind NOTIFICATIONS_ASYNC_ENABLED, and add the Redis
Streams consumer that picks up notification.requested events.

PaymentCompletedListener changes:
- Injects NotificationsAsyncConfig + NotificationsPublisher
- When asyncEnabled: builds NotificationRequestedPayload and calls
  publisher.publishStandalone() instead of commandBus.execute()
- Legacy SendNotificationCommand path retained in else branch
- recipientEmail passed via params so the consumer can resolve it

NotificationsConsumer (new):
- XREADGROUP against `events:notification.requested` stream,
  consumer group `notifications-workers`
- Idempotency via Redis SET NX EX 86400 keyed on
  envelope.payload.dedupeKey ?? envelope.eventId
- Dispatches to existing SendNotificationHandler per channel via
  CommandBus, mapping contract channels (email/sms/fcm/zalo/in_app)
  to domain channels (EMAIL/SMS/PUSH/ZALO_OA)
- DLQ: after 3 failed deliveries, XADD to
  events:notification.requested:dlq with original envelope + reason
- Consumer group created lazily with MKSTREAM; poll loop gated by
  NOTIFICATIONS_ASYNC_ENABLED
- Registered in NotificationsModule providers

Tests (28 specs, all green):
- PaymentCompletedListener: legacy path, async path, skip-when-no-email
  (4 specs, updated from 3 to match new 5-arg constructor)
- NotificationsConsumer: process message, dedupe skip, missing envelope
  skip, DLQ after max retries, multi-channel dispatch, empty stream
  (6 specs)
- NotificationsPublisher: 4 specs (unchanged)
- NotificationsAsyncConfig: 14 specs (unchanged)

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-25 18:28:29 +07:00
parent b3836d8d0f
commit f70d7e3deb
5 changed files with 389 additions and 1 deletions

View File

@@ -7,6 +7,8 @@ describe('PaymentCompletedListener', () => {
let mockCommandBus: { execute: ReturnType<typeof vi.fn> };
let mockPrisma: { user: { findUnique: ReturnType<typeof vi.fn> } };
let mockLogger: { log: ReturnType<typeof vi.fn>; warn: ReturnType<typeof vi.fn>; error: ReturnType<typeof vi.fn> };
let mockAsyncConfig: { asyncEnabled: boolean };
let mockPublisher: { publishStandalone: ReturnType<typeof vi.fn> };
beforeEach(() => {
mockCommandBus = { execute: vi.fn().mockResolvedValue(undefined) };
@@ -14,15 +16,19 @@ describe('PaymentCompletedListener', () => {
user: { findUnique: vi.fn() },
};
mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn() };
mockAsyncConfig = { asyncEnabled: false };
mockPublisher = { publishStandalone: vi.fn().mockResolvedValue({ eventId: 'eid' }) };
listener = new PaymentCompletedListener(
mockCommandBus as any,
mockPrisma as any,
mockLogger as any,
mockAsyncConfig as any,
mockPublisher as any,
);
});
it('sends payment confirmation email', async () => {
it('sends payment confirmation email via legacy path when async disabled', async () => {
mockPrisma.user.findUnique.mockResolvedValue({ email: 'buyer@test.com' });
const event = new PaymentCompletedEvent('pay-1', 'user-1', 'VNPAY', BigInt(5000000));
@@ -35,6 +41,24 @@ describe('PaymentCompletedListener', () => {
expect(cmd.templateData.paymentId).toBe('pay-1');
expect(cmd.templateData.provider).toBe('VNPAY');
expect(cmd.recipientAddress).toBe('buyer@test.com');
expect(mockPublisher.publishStandalone).not.toHaveBeenCalled();
});
it('publishes via async outbox when NOTIFICATIONS_ASYNC_ENABLED is on', async () => {
mockAsyncConfig.asyncEnabled = true;
mockPrisma.user.findUnique.mockResolvedValue({ email: 'buyer@test.com' });
const event = new PaymentCompletedEvent('pay-1', 'user-1', 'VNPAY', BigInt(5000000));
await listener.handle(event);
expect(mockPublisher.publishStandalone).toHaveBeenCalledTimes(1);
const input = mockPublisher.publishStandalone.mock.calls[0][0];
expect(input.category).toBe('payment');
expect(input.template).toBe('payment.confirmed');
expect(input.channels).toEqual(['email']);
expect(input.params.recipientEmail).toBe('buyer@test.com');
expect(input.params.paymentId).toBe('pay-1');
expect(mockCommandBus.execute).not.toHaveBeenCalled();
});
it('skips notification when user not found', async () => {
@@ -44,6 +68,7 @@ describe('PaymentCompletedListener', () => {
await listener.handle(event);
expect(mockCommandBus.execute).not.toHaveBeenCalled();
expect(mockPublisher.publishStandalone).not.toHaveBeenCalled();
});
it('skips notification when user has no email', async () => {
@@ -53,5 +78,6 @@ describe('PaymentCompletedListener', () => {
await listener.handle(event);
expect(mockCommandBus.execute).not.toHaveBeenCalled();
expect(mockPublisher.publishStandalone).not.toHaveBeenCalled();
});
});

View File

@@ -1,8 +1,11 @@
import { Injectable } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { OnEvent } from '@nestjs/event-emitter';
import { uuidv7 } from '@goodgo/contracts-events';
import { type PaymentCompletedEvent } from '@modules/payments';
import { LoggerService, PrismaService } from '@modules/shared';
import { NotificationsAsyncConfig } from '../../infrastructure/services/notifications-async.config';
import { NotificationsPublisher } from '../../infrastructure/services/notifications-publisher.service';
import { SendNotificationCommand } from '../commands/send-notification/send-notification.command';
@Injectable()
@@ -11,6 +14,8 @@ export class PaymentCompletedListener {
private readonly commandBus: CommandBus,
private readonly prisma: PrismaService,
private readonly logger: LoggerService,
private readonly asyncConfig: NotificationsAsyncConfig,
private readonly publisher: NotificationsPublisher,
) {}
@OnEvent('payment.completed', { async: true })
@@ -26,6 +31,24 @@ export class PaymentCompletedListener {
const amountFormatted = new Intl.NumberFormat('vi-VN').format(event.amountVND);
if (this.asyncConfig.asyncEnabled) {
await this.publisher.publishStandalone({
notificationId: uuidv7(),
userId: event.userId,
category: 'payment',
template: 'payment.confirmed',
params: {
paymentId: event.aggregateId,
amountVND: amountFormatted,
provider: event.provider,
recipientEmail: user.email,
},
channels: ['email'],
priority: 'normal',
});
return;
}
await this.commandBus.execute(
new SendNotificationCommand(
event.userId,

View File

@@ -0,0 +1,158 @@
import { type EventEnvelope, type NotificationRequestedPayload } from '@goodgo/contracts-events';
import { describe, expect, it, vi, beforeEach } from 'vitest';
import { NotificationsConsumer } from '../consumers/notifications.consumer';
function makeEnvelope(
overrides: Partial<NotificationRequestedPayload> = {},
): EventEnvelope<NotificationRequestedPayload> {
return {
schemaVersion: 1,
eventId: overrides.notificationId ?? 'evt-1',
eventType: 'notification.requested',
occurredAt: new Date().toISOString(),
producer: 'test',
traceId: '0'.repeat(32),
payload: {
notificationId: 'notif-1',
userId: 'user-1',
category: 'payment',
template: 'payment.confirmed',
params: { paymentId: 'pay-1', recipientEmail: 'a@b.com' },
channels: ['email'],
requestedAt: new Date().toISOString(),
...overrides,
},
};
}
describe('NotificationsConsumer', () => {
let consumer: NotificationsConsumer;
let mockCommandBus: { execute: ReturnType<typeof vi.fn> };
let mockXreadgroup: ReturnType<typeof vi.fn>;
let mockXack: ReturnType<typeof vi.fn>;
let mockXadd: ReturnType<typeof vi.fn>;
let mockXpending: ReturnType<typeof vi.fn>;
let mockSetNX: ReturnType<typeof vi.fn>;
let mockXgroup: ReturnType<typeof vi.fn>;
beforeEach(() => {
mockCommandBus = { execute: vi.fn().mockResolvedValue(undefined) };
mockXreadgroup = vi.fn().mockResolvedValue(null);
mockXack = vi.fn().mockResolvedValue(1);
mockXadd = vi.fn().mockResolvedValue('dlq-1');
mockXpending = vi.fn().mockResolvedValue([]);
mockSetNX = vi.fn().mockResolvedValue('OK');
mockXgroup = vi.fn().mockResolvedValue('OK');
const mockRedis = {
getClient: () => ({
xreadgroup: mockXreadgroup,
xack: mockXack,
xadd: mockXadd,
xpending: mockXpending,
set: mockSetNX,
xgroup: mockXgroup,
}),
};
const mockAsyncConfig = { asyncEnabled: true };
consumer = new NotificationsConsumer(
mockRedis as any,
mockCommandBus as any,
mockAsyncConfig as any,
);
});
it('processes a message and dispatches via CommandBus', async () => {
const envelope = makeEnvelope();
mockXreadgroup.mockResolvedValueOnce([
['events:notification.requested', [
['1-0', ['envelope', JSON.stringify(envelope)]],
]],
]);
const result = await consumer.tick();
expect(result.processed).toBe(1);
expect(result.skipped).toBe(0);
expect(mockCommandBus.execute).toHaveBeenCalledTimes(1);
expect(mockXack).toHaveBeenCalledWith('events:notification.requested', 'notifications-workers', '1-0');
});
it('skips duplicate messages via SET NX dedupe', async () => {
const envelope = makeEnvelope();
mockXreadgroup.mockResolvedValueOnce([
['events:notification.requested', [
['1-0', ['envelope', JSON.stringify(envelope)]],
]],
]);
mockSetNX.mockResolvedValueOnce(null);
const result = await consumer.tick();
expect(result.skipped).toBe(1);
expect(result.processed).toBe(0);
expect(mockCommandBus.execute).not.toHaveBeenCalled();
expect(mockXack).toHaveBeenCalledTimes(1);
});
it('skips messages missing the envelope field', async () => {
mockXreadgroup.mockResolvedValueOnce([
['events:notification.requested', [
['1-0', ['other_field', 'value']],
]],
]);
const result = await consumer.tick();
expect(result.skipped).toBe(1);
expect(mockXack).toHaveBeenCalledTimes(1);
expect(mockCommandBus.execute).not.toHaveBeenCalled();
});
it('moves to DLQ after exceeding max retries', async () => {
const envelope = makeEnvelope();
mockXreadgroup.mockResolvedValueOnce([
['events:notification.requested', [
['1-0', ['envelope', JSON.stringify(envelope)]],
]],
]);
mockSetNX.mockResolvedValueOnce('OK');
mockCommandBus.execute.mockRejectedValueOnce(new Error('dispatch failed'));
mockXpending.mockResolvedValueOnce([['1-0', 'worker-1', 0, 3]]);
const result = await consumer.tick();
expect(result.failed).toBe(1);
expect(mockXadd).toHaveBeenCalledWith(
'events:notification.requested:dlq',
'*',
'envelope', JSON.stringify(envelope),
'originalMessageId', '1-0',
'reason', 'max_retries_exceeded',
);
expect(mockXack).toHaveBeenCalledWith('events:notification.requested', 'notifications-workers', '1-0');
});
it('dispatches to all channels in the notification payload', async () => {
const envelope = makeEnvelope({ channels: ['email', 'sms'] });
mockXreadgroup.mockResolvedValueOnce([
['events:notification.requested', [
['1-0', ['envelope', JSON.stringify(envelope)]],
]],
]);
await consumer.tick();
expect(mockCommandBus.execute).toHaveBeenCalledTimes(2);
});
it('returns zero counts when stream is empty', async () => {
mockXreadgroup.mockResolvedValueOnce(null);
const result = await consumer.tick();
expect(result).toEqual({ processed: 0, skipped: 0, failed: 0 });
});
});

View File

@@ -0,0 +1,179 @@
import { type EventEnvelope, type NotificationRequestedPayload } from '@goodgo/contracts-events';
import { Injectable, Logger, type OnModuleDestroy, type OnModuleInit } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { RedisService } from '@modules/shared/infrastructure/redis.service';
import { SendNotificationCommand } from '../../application/commands/send-notification/send-notification.command';
import { NotificationsAsyncConfig } from '../services/notifications-async.config';
const STREAM = 'events:notification.requested';
const GROUP = 'notifications-workers';
const CONSUMER_PREFIX = 'worker';
const BLOCK_MS = 5_000;
const BATCH_SIZE = 10;
const DEDUPE_TTL_S = 86_400;
const MAX_RETRIES = 3;
const CHANNEL_MAP: Record<string, string> = {
email: 'EMAIL',
sms: 'SMS',
fcm: 'PUSH',
zalo: 'ZALO_OA',
in_app: 'PUSH',
};
@Injectable()
export class NotificationsConsumer implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(NotificationsConsumer.name);
private readonly consumerName: string;
private stopped = false;
private running = false;
constructor(
private readonly redis: RedisService,
private readonly commandBus: CommandBus,
private readonly asyncConfig: NotificationsAsyncConfig,
) {
this.consumerName = `${CONSUMER_PREFIX}-${process.pid}`;
}
async onModuleInit(): Promise<void> {
if (!this.asyncConfig.asyncEnabled) {
this.logger.log('NotificationsConsumer disabled (NOTIFICATIONS_ASYNC_ENABLED is off)');
return;
}
await this.ensureConsumerGroup();
this.poll();
}
async onModuleDestroy(): Promise<void> {
this.stopped = true;
}
private async ensureConsumerGroup(): Promise<void> {
const client = this.redis.getClient();
try {
await client.xgroup('CREATE', STREAM, GROUP, '0', 'MKSTREAM');
this.logger.log(`Created consumer group ${GROUP} on ${STREAM}`);
} catch (err: unknown) {
if (err instanceof Error && err.message.includes('BUSYGROUP')) {
this.logger.debug(`Consumer group ${GROUP} already exists`);
} else {
throw err;
}
}
}
private poll(): void {
if (this.stopped) return;
void this.tick()
.catch((err) => this.logger.error(`Consumer tick failed: ${(err as Error).message}`))
.finally(() => {
if (!this.stopped) setTimeout(() => this.poll(), 100);
});
}
async tick(): Promise<{ processed: number; skipped: number; failed: number }> {
if (this.running) return { processed: 0, skipped: 0, failed: 0 };
this.running = true;
let processed = 0;
let skipped = 0;
let failed = 0;
try {
const client = this.redis.getClient();
const results = await client.xreadgroup(
'GROUP', GROUP, this.consumerName,
'COUNT', BATCH_SIZE,
'BLOCK', BLOCK_MS,
'STREAMS', STREAM, '>',
);
if (!results) return { processed, skipped, failed };
for (const [, messages] of results) {
for (const [messageId, fields] of messages) {
const envelopeIdx = fields.indexOf('envelope');
const envelopeJson = envelopeIdx >= 0 ? fields[envelopeIdx + 1] : undefined;
if (!envelopeJson) {
this.logger.warn(`Message ${messageId} missing envelope field, ACKing`);
await client.xack(STREAM, GROUP, messageId);
skipped++;
continue;
}
const envelope: EventEnvelope<NotificationRequestedPayload> = JSON.parse(envelopeJson);
const dedupeKey = `notif:dedupe:${envelope.payload.dedupeKey ?? envelope.eventId}`;
const isNew = await client.set(dedupeKey, '1', 'EX', DEDUPE_TTL_S, 'NX');
if (!isNew) {
this.logger.debug(`Duplicate ${envelope.eventId}, skipping`);
await client.xack(STREAM, GROUP, messageId);
skipped++;
continue;
}
try {
await this.dispatch(envelope.payload);
await client.xack(STREAM, GROUP, messageId);
processed++;
} catch (err) {
failed++;
this.logger.error(
`Failed to dispatch ${envelope.eventId}: ${(err as Error).message}`,
);
await this.handleRetryOrDlq(client, messageId, envelope);
}
}
}
} finally {
this.running = false;
}
return { processed, skipped, failed };
}
private async dispatch(payload: NotificationRequestedPayload): Promise<void> {
for (const ch of payload.channels) {
const mappedChannel = CHANNEL_MAP[ch];
if (!mappedChannel) {
this.logger.warn(`Unknown channel '${ch}' in notification ${payload.notificationId}`);
continue;
}
await this.commandBus.execute(
new SendNotificationCommand(
payload.userId,
mappedChannel as 'EMAIL' | 'SMS' | 'PUSH' | 'ZALO_OA',
payload.template,
payload.params,
(payload.params['recipientEmail'] as string) ??
(payload.params['recipientPhone'] as string) ??
payload.userId,
),
);
}
}
private async handleRetryOrDlq(
client: ReturnType<RedisService['getClient']>,
messageId: string,
envelope: EventEnvelope<NotificationRequestedPayload>,
): Promise<void> {
const infoRaw = await client.xpending(STREAM, GROUP, messageId, messageId, 1);
const deliveryCount = Array.isArray(infoRaw?.[0]) ? Number(infoRaw[0][3]) : 1;
if (deliveryCount >= MAX_RETRIES) {
this.logger.warn(
`DLQ: ${envelope.eventId} exceeded ${MAX_RETRIES} retries, moving to DLQ stream`,
);
await client.xadd(
`${STREAM}:dlq`,
'*',
'envelope', JSON.stringify(envelope),
'originalMessageId', messageId,
'reason', 'max_retries_exceeded',
);
await client.xack(STREAM, GROUP, messageId);
}
}
}

View File

@@ -34,6 +34,7 @@ import { PrismaNotificationPreferenceRepository } from './infrastructure/reposit
import { PrismaNotificationRepository } from './infrastructure/repositories/prisma-notification.repository';
import { EmailService } from './infrastructure/services/email.service';
import { FcmService } from './infrastructure/services/fcm.service';
import { NotificationsConsumer } from './infrastructure/consumers/notifications.consumer';
import { NotificationsAsyncConfig } from './infrastructure/services/notifications-async.config';
import { NotificationsPublisher } from './infrastructure/services/notifications-publisher.service';
import { SmsRateLimiterService } from './infrastructure/services/sms-rate-limiter.service';
@@ -92,6 +93,7 @@ const EventListeners = [
// RFC-004 Phase 1 — async notifications (GOO-173)
NotificationsAsyncConfig,
NotificationsPublisher,
NotificationsConsumer,
// WebSocket Gateway
NotificationsGateway,