Compare commits

...

6 Commits

Author SHA1 Message Date
Ho Ngoc Hai
0aa4fec615 feat(infra): scope pre-commit test hook to staged packages (GOO-228)
Replace blanket `npm test` with lint-staged for linting/formatting and
a turbo --filter script that runs tests only for workspace packages
that have staged .ts/.tsx files.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-25 18:34:38 +07:00
Ho Ngoc Hai
f70d7e3deb feat(notifications): pilot listener cutover + consumer skeleton (GOO-173)
Phase 1 steps 4–6 — cutover PaymentCompletedListener to the async
outbox path behind NOTIFICATIONS_ASYNC_ENABLED, and add the Redis
Streams consumer that picks up notification.requested events.

PaymentCompletedListener changes:
- Injects NotificationsAsyncConfig + NotificationsPublisher
- When asyncEnabled: builds NotificationRequestedPayload and calls
  publisher.publishStandalone() instead of commandBus.execute()
- Legacy SendNotificationCommand path retained in else branch
- recipientEmail passed via params so the consumer can resolve it

NotificationsConsumer (new):
- XREADGROUP against `events:notification.requested` stream,
  consumer group `notifications-workers`
- Idempotency via Redis SET NX EX 86400 keyed on
  envelope.payload.dedupeKey ?? envelope.eventId
- Dispatches to existing SendNotificationHandler per channel via
  CommandBus, mapping contract channels (email/sms/fcm/zalo/in_app)
  to domain channels (EMAIL/SMS/PUSH/ZALO_OA)
- DLQ: after 3 failed deliveries, XADD to
  events:notification.requested:dlq with original envelope + reason
- Consumer group created lazily with MKSTREAM; poll loop gated by
  NOTIFICATIONS_ASYNC_ENABLED
- Registered in NotificationsModule providers

Tests (28 specs, all green):
- PaymentCompletedListener: legacy path, async path, skip-when-no-email
  (4 specs, updated from 3 to match new 5-arg constructor)
- NotificationsConsumer: process message, dedupe skip, missing envelope
  skip, DLQ after max retries, multi-channel dispatch, empty stream
  (6 specs)
- NotificationsPublisher: 4 specs (unchanged)
- NotificationsAsyncConfig: 14 specs (unchanged)

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-25 18:28:29 +07:00
Ho Ngoc Hai
b3836d8d0f feat(notifications): wire publisher + async feature flag (GOO-173)
Phase 1 step 3 — make NotificationsPublisher available via DI and
introduce the NOTIFICATIONS_ASYNC_ENABLED flag that will gate the
listener cutover.

- NotificationsAsyncConfig: tiny injectable that reads
  NOTIFICATIONS_ASYNC_ENABLED (truthy: 1/true/yes/on, default disabled).
  Callers don't touch process.env directly; per-category rollout can be
  added later without churn in listeners.
- NotificationsModule providers/exports now include NotificationsPublisher
  and NotificationsAsyncConfig so listeners/handlers can inject them.
- Tests (14 specs on the flag + 4 on the publisher = 18 total green):
  * default disabled when unset
  * truthy parsing (1, true, TRUE, yes, on, padded)
  * falsy parsing (false, 0, no, off, empty, unknown)
  * describe() reports human-readable state

No call sites updated yet — next commit migrates the first listener
(PaymentCompletedListener pilot) onto the publisher with the flag check.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 14:26:10 +07:00
Ho Ngoc Hai
cf1dee5491 feat(notifications): add NotificationsPublisher (outbox-backed) — GOO-173
Phase 1 step 2 — introduce the publisher primitive that emits
notification.requested envelopes through the RFC-004 transactional
outbox. Listeners and command handlers will migrate onto this in a
follow-up commit (flag-gated cutover).

- NotificationsPublisher exposes:
  * publishWithin(tx, input) — preferred; appends to outbox inside an
    existing Prisma transaction so the row commits atomically with the
    domain mutation that triggered the notification
  * publishStandalone(input) — opens a single-row tx; convenience for
    callers that don't already own one
- Builds EventEnvelope<NotificationRequestedPayload> via the Phase 0
  envelope builder (UUIDv7 eventId, current trace id, ISO occurredAt)
- Producer string: "goodgo-api/notifications"; eventType:
  "notification.requested"
- aggregateId on outbox row = notificationId for downstream tracing
- Optional fields (locale, priority, dedupeKey) only included when set,
  matching the JSON Schema's additionalProperties=false contract

Tests (4 specs, all green):
- publishWithin builds a valid envelope (assertValidEnvelope) and writes
  to the supplied tx with aggregateId
- publishStandalone opens its own transaction
- Optional fields are omitted when undefined and preserved when provided
- UUIDv7 eventIds are strictly time-ordered between successive calls

Not yet wired in NotificationsModule providers — that lands with the
listener cutover in the next commit so we don't ship dead DI nodes.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 14:12:55 +07:00
Ho Ngoc Hai
c68883bd69 feat(contracts): add notification.requested event schema (GOO-173)
Phase 1 RFC-004 prep — define the event contract that the notifications
module will publish through the Phase 0 outbox + Redis Streams backbone.

- Add notification.requested.schema.json (JSON Schema 2020-12) covering
  notificationId, userId, category, template, params, channels, locale,
  priority, dedupeKey, requestedAt
- Add NotificationRequestedPayload + supporting union types
  (NotificationCategory, NotificationChannel, NotificationLocale,
  NotificationPriority) to @goodgo/contracts-events barrel
- Register 'notification.requested' in KNOWN_EVENT_TYPES

Pure contract addition; no producer/consumer wiring yet (follow-up commits
in this branch will introduce the publisher refactor + BullMQ worker).

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 12:40:33 +07:00
Ho Ngoc Hai
dd9d5261ad fix(a11y): fix color contrast for foreground-dim tokens (GOO-109)
Light --foreground-dim: 215 12% 60% → 215 14% 45% (~4.6:1 on #f7f7f8 bg)
Dark  --foreground-dim: 215 12% 35% → 215 12% 70% (~5.2:1 on #090c12 bg)
--muted-foreground verified passing WCAG AA in both themes, no change needed.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-24 12:38:56 +07:00
25 changed files with 842 additions and 16 deletions

View File

@@ -1 +1,2 @@
npm test
npx lint-staged
bash scripts/pre-commit-tests.sh

View File

@@ -7,6 +7,8 @@ describe('PaymentCompletedListener', () => {
let mockCommandBus: { execute: ReturnType<typeof vi.fn> };
let mockPrisma: { user: { findUnique: ReturnType<typeof vi.fn> } };
let mockLogger: { log: ReturnType<typeof vi.fn>; warn: ReturnType<typeof vi.fn>; error: ReturnType<typeof vi.fn> };
let mockAsyncConfig: { asyncEnabled: boolean };
let mockPublisher: { publishStandalone: ReturnType<typeof vi.fn> };
beforeEach(() => {
mockCommandBus = { execute: vi.fn().mockResolvedValue(undefined) };
@@ -14,15 +16,19 @@ describe('PaymentCompletedListener', () => {
user: { findUnique: vi.fn() },
};
mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn() };
mockAsyncConfig = { asyncEnabled: false };
mockPublisher = { publishStandalone: vi.fn().mockResolvedValue({ eventId: 'eid' }) };
listener = new PaymentCompletedListener(
mockCommandBus as any,
mockPrisma as any,
mockLogger as any,
mockAsyncConfig as any,
mockPublisher as any,
);
});
it('sends payment confirmation email', async () => {
it('sends payment confirmation email via legacy path when async disabled', async () => {
mockPrisma.user.findUnique.mockResolvedValue({ email: 'buyer@test.com' });
const event = new PaymentCompletedEvent('pay-1', 'user-1', 'VNPAY', BigInt(5000000));
@@ -35,6 +41,24 @@ describe('PaymentCompletedListener', () => {
expect(cmd.templateData.paymentId).toBe('pay-1');
expect(cmd.templateData.provider).toBe('VNPAY');
expect(cmd.recipientAddress).toBe('buyer@test.com');
expect(mockPublisher.publishStandalone).not.toHaveBeenCalled();
});
it('publishes via async outbox when NOTIFICATIONS_ASYNC_ENABLED is on', async () => {
mockAsyncConfig.asyncEnabled = true;
mockPrisma.user.findUnique.mockResolvedValue({ email: 'buyer@test.com' });
const event = new PaymentCompletedEvent('pay-1', 'user-1', 'VNPAY', BigInt(5000000));
await listener.handle(event);
expect(mockPublisher.publishStandalone).toHaveBeenCalledTimes(1);
const input = mockPublisher.publishStandalone.mock.calls[0][0];
expect(input.category).toBe('payment');
expect(input.template).toBe('payment.confirmed');
expect(input.channels).toEqual(['email']);
expect(input.params.recipientEmail).toBe('buyer@test.com');
expect(input.params.paymentId).toBe('pay-1');
expect(mockCommandBus.execute).not.toHaveBeenCalled();
});
it('skips notification when user not found', async () => {
@@ -44,6 +68,7 @@ describe('PaymentCompletedListener', () => {
await listener.handle(event);
expect(mockCommandBus.execute).not.toHaveBeenCalled();
expect(mockPublisher.publishStandalone).not.toHaveBeenCalled();
});
it('skips notification when user has no email', async () => {
@@ -53,5 +78,6 @@ describe('PaymentCompletedListener', () => {
await listener.handle(event);
expect(mockCommandBus.execute).not.toHaveBeenCalled();
expect(mockPublisher.publishStandalone).not.toHaveBeenCalled();
});
});

View File

@@ -1,8 +1,11 @@
import { Injectable } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { OnEvent } from '@nestjs/event-emitter';
import { uuidv7 } from '@goodgo/contracts-events';
import { type PaymentCompletedEvent } from '@modules/payments';
import { LoggerService, PrismaService } from '@modules/shared';
import { NotificationsAsyncConfig } from '../../infrastructure/services/notifications-async.config';
import { NotificationsPublisher } from '../../infrastructure/services/notifications-publisher.service';
import { SendNotificationCommand } from '../commands/send-notification/send-notification.command';
@Injectable()
@@ -11,6 +14,8 @@ export class PaymentCompletedListener {
private readonly commandBus: CommandBus,
private readonly prisma: PrismaService,
private readonly logger: LoggerService,
private readonly asyncConfig: NotificationsAsyncConfig,
private readonly publisher: NotificationsPublisher,
) {}
@OnEvent('payment.completed', { async: true })
@@ -26,6 +31,24 @@ export class PaymentCompletedListener {
const amountFormatted = new Intl.NumberFormat('vi-VN').format(event.amountVND);
if (this.asyncConfig.asyncEnabled) {
await this.publisher.publishStandalone({
notificationId: uuidv7(),
userId: event.userId,
category: 'payment',
template: 'payment.confirmed',
params: {
paymentId: event.aggregateId,
amountVND: amountFormatted,
provider: event.provider,
recipientEmail: user.email,
},
channels: ['email'],
priority: 'normal',
});
return;
}
await this.commandBus.execute(
new SendNotificationCommand(
event.userId,

View File

@@ -0,0 +1,48 @@
import { afterEach, describe, expect, it } from 'vitest';
import { NotificationsAsyncConfig } from '../services/notifications-async.config';
const FLAG = 'NOTIFICATIONS_ASYNC_ENABLED';
describe('NotificationsAsyncConfig', () => {
const originalValue = process.env[FLAG];
afterEach(() => {
if (originalValue === undefined) {
delete process.env[FLAG];
} else {
process.env[FLAG] = originalValue;
}
});
it('defaults to disabled when the env var is unset', () => {
delete process.env[FLAG];
expect(new NotificationsAsyncConfig().asyncEnabled).toBe(false);
});
it.each(['true', 'TRUE', '1', 'yes', 'on', ' true '])(
'treats %j as enabled',
(value) => {
process.env[FLAG] = value;
expect(new NotificationsAsyncConfig().asyncEnabled).toBe(true);
},
);
it.each(['false', '0', 'no', 'off', '', 'maybe'])(
'treats %j as disabled',
(value) => {
process.env[FLAG] = value;
expect(new NotificationsAsyncConfig().asyncEnabled).toBe(false);
},
);
it('describe() reports the human-readable state', () => {
process.env[FLAG] = 'true';
expect(new NotificationsAsyncConfig().describe()).toBe(
'NOTIFICATIONS_ASYNC_ENABLED=enabled',
);
process.env[FLAG] = 'false';
expect(new NotificationsAsyncConfig().describe()).toBe(
'NOTIFICATIONS_ASYNC_ENABLED=disabled',
);
});
});

View File

@@ -0,0 +1,158 @@
import { type EventEnvelope, type NotificationRequestedPayload } from '@goodgo/contracts-events';
import { describe, expect, it, vi, beforeEach } from 'vitest';
import { NotificationsConsumer } from '../consumers/notifications.consumer';
function makeEnvelope(
overrides: Partial<NotificationRequestedPayload> = {},
): EventEnvelope<NotificationRequestedPayload> {
return {
schemaVersion: 1,
eventId: overrides.notificationId ?? 'evt-1',
eventType: 'notification.requested',
occurredAt: new Date().toISOString(),
producer: 'test',
traceId: '0'.repeat(32),
payload: {
notificationId: 'notif-1',
userId: 'user-1',
category: 'payment',
template: 'payment.confirmed',
params: { paymentId: 'pay-1', recipientEmail: 'a@b.com' },
channels: ['email'],
requestedAt: new Date().toISOString(),
...overrides,
},
};
}
describe('NotificationsConsumer', () => {
let consumer: NotificationsConsumer;
let mockCommandBus: { execute: ReturnType<typeof vi.fn> };
let mockXreadgroup: ReturnType<typeof vi.fn>;
let mockXack: ReturnType<typeof vi.fn>;
let mockXadd: ReturnType<typeof vi.fn>;
let mockXpending: ReturnType<typeof vi.fn>;
let mockSetNX: ReturnType<typeof vi.fn>;
let mockXgroup: ReturnType<typeof vi.fn>;
beforeEach(() => {
mockCommandBus = { execute: vi.fn().mockResolvedValue(undefined) };
mockXreadgroup = vi.fn().mockResolvedValue(null);
mockXack = vi.fn().mockResolvedValue(1);
mockXadd = vi.fn().mockResolvedValue('dlq-1');
mockXpending = vi.fn().mockResolvedValue([]);
mockSetNX = vi.fn().mockResolvedValue('OK');
mockXgroup = vi.fn().mockResolvedValue('OK');
const mockRedis = {
getClient: () => ({
xreadgroup: mockXreadgroup,
xack: mockXack,
xadd: mockXadd,
xpending: mockXpending,
set: mockSetNX,
xgroup: mockXgroup,
}),
};
const mockAsyncConfig = { asyncEnabled: true };
consumer = new NotificationsConsumer(
mockRedis as any,
mockCommandBus as any,
mockAsyncConfig as any,
);
});
it('processes a message and dispatches via CommandBus', async () => {
const envelope = makeEnvelope();
mockXreadgroup.mockResolvedValueOnce([
['events:notification.requested', [
['1-0', ['envelope', JSON.stringify(envelope)]],
]],
]);
const result = await consumer.tick();
expect(result.processed).toBe(1);
expect(result.skipped).toBe(0);
expect(mockCommandBus.execute).toHaveBeenCalledTimes(1);
expect(mockXack).toHaveBeenCalledWith('events:notification.requested', 'notifications-workers', '1-0');
});
it('skips duplicate messages via SET NX dedupe', async () => {
const envelope = makeEnvelope();
mockXreadgroup.mockResolvedValueOnce([
['events:notification.requested', [
['1-0', ['envelope', JSON.stringify(envelope)]],
]],
]);
mockSetNX.mockResolvedValueOnce(null);
const result = await consumer.tick();
expect(result.skipped).toBe(1);
expect(result.processed).toBe(0);
expect(mockCommandBus.execute).not.toHaveBeenCalled();
expect(mockXack).toHaveBeenCalledTimes(1);
});
it('skips messages missing the envelope field', async () => {
mockXreadgroup.mockResolvedValueOnce([
['events:notification.requested', [
['1-0', ['other_field', 'value']],
]],
]);
const result = await consumer.tick();
expect(result.skipped).toBe(1);
expect(mockXack).toHaveBeenCalledTimes(1);
expect(mockCommandBus.execute).not.toHaveBeenCalled();
});
it('moves to DLQ after exceeding max retries', async () => {
const envelope = makeEnvelope();
mockXreadgroup.mockResolvedValueOnce([
['events:notification.requested', [
['1-0', ['envelope', JSON.stringify(envelope)]],
]],
]);
mockSetNX.mockResolvedValueOnce('OK');
mockCommandBus.execute.mockRejectedValueOnce(new Error('dispatch failed'));
mockXpending.mockResolvedValueOnce([['1-0', 'worker-1', 0, 3]]);
const result = await consumer.tick();
expect(result.failed).toBe(1);
expect(mockXadd).toHaveBeenCalledWith(
'events:notification.requested:dlq',
'*',
'envelope', JSON.stringify(envelope),
'originalMessageId', '1-0',
'reason', 'max_retries_exceeded',
);
expect(mockXack).toHaveBeenCalledWith('events:notification.requested', 'notifications-workers', '1-0');
});
it('dispatches to all channels in the notification payload', async () => {
const envelope = makeEnvelope({ channels: ['email', 'sms'] });
mockXreadgroup.mockResolvedValueOnce([
['events:notification.requested', [
['1-0', ['envelope', JSON.stringify(envelope)]],
]],
]);
await consumer.tick();
expect(mockCommandBus.execute).toHaveBeenCalledTimes(2);
});
it('returns zero counts when stream is empty', async () => {
mockXreadgroup.mockResolvedValueOnce(null);
const result = await consumer.tick();
expect(result).toEqual({ processed: 0, skipped: 0, failed: 0 });
});
});

View File

@@ -0,0 +1,87 @@
import { assertValidEnvelope } from '@goodgo/contracts-events';
import { describe, expect, it, vi, beforeEach } from 'vitest';
import { NotificationsPublisher } from '../services/notifications-publisher.service';
type Envelope = Parameters<typeof assertValidEnvelope>[0];
describe('NotificationsPublisher', () => {
let outboxAppend: ReturnType<typeof vi.fn>;
let prismaTransaction: ReturnType<typeof vi.fn>;
let publisher: NotificationsPublisher;
beforeEach(() => {
outboxAppend = vi.fn().mockResolvedValue(undefined);
prismaTransaction = vi.fn(async (cb: (tx: unknown) => Promise<unknown>) => cb({}));
const outbox = { append: outboxAppend } as unknown as ConstructorParameters<
typeof NotificationsPublisher
>[1];
const prisma = { $transaction: prismaTransaction } as unknown as ConstructorParameters<
typeof NotificationsPublisher
>[0];
publisher = new NotificationsPublisher(prisma, outbox);
});
const baseInput = {
notificationId: '01920000-0000-7000-8000-000000000001',
userId: 'user-1',
category: 'listing' as const,
template: 'listing.approved.email.vi',
params: { listingTitle: 'Nhà đẹp Q1' },
channels: ['email' as const, 'in_app' as const],
};
it('publishWithin builds a valid envelope and writes to outbox with aggregateId=notificationId', async () => {
const tx = {} as unknown;
const envelope = await publisher.publishWithin(tx as Parameters<typeof publisher.publishWithin>[0], baseInput);
expect(envelope.schemaVersion).toBe(1);
expect(envelope.eventType).toBe('notification.requested');
expect(envelope.producer).toBe('goodgo-api/notifications');
expect(envelope.payload.notificationId).toBe(baseInput.notificationId);
expect(envelope.payload.channels).toEqual(['email', 'in_app']);
expect(envelope.payload.requestedAt).toMatch(/^\d{4}-\d{2}-\d{2}T/);
expect(() => assertValidEnvelope(envelope as Envelope)).not.toThrow();
expect(outboxAppend).toHaveBeenCalledTimes(1);
expect(outboxAppend).toHaveBeenCalledWith(tx, envelope, { aggregateId: baseInput.notificationId });
expect(prismaTransaction).not.toHaveBeenCalled();
});
it('publishStandalone opens a transaction and appends inside it', async () => {
const envelope = await publisher.publishStandalone(baseInput);
expect(() => assertValidEnvelope(envelope as Envelope)).not.toThrow();
expect(prismaTransaction).toHaveBeenCalledTimes(1);
expect(outboxAppend).toHaveBeenCalledTimes(1);
});
it('omits optional fields when undefined but preserves them when provided', async () => {
const tx = {} as Parameters<typeof publisher.publishWithin>[0];
const minimal = await publisher.publishWithin(tx, baseInput);
expect(minimal.payload).not.toHaveProperty('locale');
expect(minimal.payload).not.toHaveProperty('priority');
expect(minimal.payload).not.toHaveProperty('dedupeKey');
const rich = await publisher.publishWithin(tx, {
...baseInput,
locale: 'vi',
priority: 'high',
dedupeKey: 'listing:123:approved',
});
expect(rich.payload.locale).toBe('vi');
expect(rich.payload.priority).toBe('high');
expect(rich.payload.dedupeKey).toBe('listing:123:approved');
});
it('uses UUIDv7 for eventId so envelopes sort by time naturally', async () => {
const tx = {} as Parameters<typeof publisher.publishWithin>[0];
const first = await publisher.publishWithin(tx, baseInput);
await new Promise((r) => setTimeout(r, 2));
const second = await publisher.publishWithin(tx, baseInput);
expect(first.eventId).not.toBe(second.eventId);
// UUIDv7: first 48 bits = unix-ms timestamp; lexical compare matches time
expect(first.eventId < second.eventId).toBe(true);
});
});

View File

@@ -0,0 +1,179 @@
import { type EventEnvelope, type NotificationRequestedPayload } from '@goodgo/contracts-events';
import { Injectable, Logger, type OnModuleDestroy, type OnModuleInit } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { RedisService } from '@modules/shared/infrastructure/redis.service';
import { SendNotificationCommand } from '../../application/commands/send-notification/send-notification.command';
import { NotificationsAsyncConfig } from '../services/notifications-async.config';
const STREAM = 'events:notification.requested';
const GROUP = 'notifications-workers';
const CONSUMER_PREFIX = 'worker';
const BLOCK_MS = 5_000;
const BATCH_SIZE = 10;
const DEDUPE_TTL_S = 86_400;
const MAX_RETRIES = 3;
const CHANNEL_MAP: Record<string, string> = {
email: 'EMAIL',
sms: 'SMS',
fcm: 'PUSH',
zalo: 'ZALO_OA',
in_app: 'PUSH',
};
@Injectable()
export class NotificationsConsumer implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(NotificationsConsumer.name);
private readonly consumerName: string;
private stopped = false;
private running = false;
constructor(
private readonly redis: RedisService,
private readonly commandBus: CommandBus,
private readonly asyncConfig: NotificationsAsyncConfig,
) {
this.consumerName = `${CONSUMER_PREFIX}-${process.pid}`;
}
async onModuleInit(): Promise<void> {
if (!this.asyncConfig.asyncEnabled) {
this.logger.log('NotificationsConsumer disabled (NOTIFICATIONS_ASYNC_ENABLED is off)');
return;
}
await this.ensureConsumerGroup();
this.poll();
}
async onModuleDestroy(): Promise<void> {
this.stopped = true;
}
private async ensureConsumerGroup(): Promise<void> {
const client = this.redis.getClient();
try {
await client.xgroup('CREATE', STREAM, GROUP, '0', 'MKSTREAM');
this.logger.log(`Created consumer group ${GROUP} on ${STREAM}`);
} catch (err: unknown) {
if (err instanceof Error && err.message.includes('BUSYGROUP')) {
this.logger.debug(`Consumer group ${GROUP} already exists`);
} else {
throw err;
}
}
}
private poll(): void {
if (this.stopped) return;
void this.tick()
.catch((err) => this.logger.error(`Consumer tick failed: ${(err as Error).message}`))
.finally(() => {
if (!this.stopped) setTimeout(() => this.poll(), 100);
});
}
async tick(): Promise<{ processed: number; skipped: number; failed: number }> {
if (this.running) return { processed: 0, skipped: 0, failed: 0 };
this.running = true;
let processed = 0;
let skipped = 0;
let failed = 0;
try {
const client = this.redis.getClient();
const results = await client.xreadgroup(
'GROUP', GROUP, this.consumerName,
'COUNT', BATCH_SIZE,
'BLOCK', BLOCK_MS,
'STREAMS', STREAM, '>',
);
if (!results) return { processed, skipped, failed };
for (const [, messages] of results) {
for (const [messageId, fields] of messages) {
const envelopeIdx = fields.indexOf('envelope');
const envelopeJson = envelopeIdx >= 0 ? fields[envelopeIdx + 1] : undefined;
if (!envelopeJson) {
this.logger.warn(`Message ${messageId} missing envelope field, ACKing`);
await client.xack(STREAM, GROUP, messageId);
skipped++;
continue;
}
const envelope: EventEnvelope<NotificationRequestedPayload> = JSON.parse(envelopeJson);
const dedupeKey = `notif:dedupe:${envelope.payload.dedupeKey ?? envelope.eventId}`;
const isNew = await client.set(dedupeKey, '1', 'EX', DEDUPE_TTL_S, 'NX');
if (!isNew) {
this.logger.debug(`Duplicate ${envelope.eventId}, skipping`);
await client.xack(STREAM, GROUP, messageId);
skipped++;
continue;
}
try {
await this.dispatch(envelope.payload);
await client.xack(STREAM, GROUP, messageId);
processed++;
} catch (err) {
failed++;
this.logger.error(
`Failed to dispatch ${envelope.eventId}: ${(err as Error).message}`,
);
await this.handleRetryOrDlq(client, messageId, envelope);
}
}
}
} finally {
this.running = false;
}
return { processed, skipped, failed };
}
private async dispatch(payload: NotificationRequestedPayload): Promise<void> {
for (const ch of payload.channels) {
const mappedChannel = CHANNEL_MAP[ch];
if (!mappedChannel) {
this.logger.warn(`Unknown channel '${ch}' in notification ${payload.notificationId}`);
continue;
}
await this.commandBus.execute(
new SendNotificationCommand(
payload.userId,
mappedChannel as 'EMAIL' | 'SMS' | 'PUSH' | 'ZALO_OA',
payload.template,
payload.params,
(payload.params['recipientEmail'] as string) ??
(payload.params['recipientPhone'] as string) ??
payload.userId,
),
);
}
}
private async handleRetryOrDlq(
client: ReturnType<RedisService['getClient']>,
messageId: string,
envelope: EventEnvelope<NotificationRequestedPayload>,
): Promise<void> {
const infoRaw = await client.xpending(STREAM, GROUP, messageId, messageId, 1);
const deliveryCount = Array.isArray(infoRaw?.[0]) ? Number(infoRaw[0][3]) : 1;
if (deliveryCount >= MAX_RETRIES) {
this.logger.warn(
`DLQ: ${envelope.eventId} exceeded ${MAX_RETRIES} retries, moving to DLQ stream`,
);
await client.xadd(
`${STREAM}:dlq`,
'*',
'envelope', JSON.stringify(envelope),
'originalMessageId', messageId,
'reason', 'max_retries_exceeded',
);
await client.xack(STREAM, GROUP, messageId);
}
}
}

View File

@@ -0,0 +1,33 @@
/**
* Feature-flag config for the Phase 1 async notifications path (RFC-004 / GOO-173).
*
* When NOTIFICATIONS_ASYNC_ENABLED is `true`, producers route through
* `NotificationsPublisher` → outbox → Redis Streams → BullMQ consumer.
* When `false` (default until parity is confirmed in production), producers
* continue to execute `SendNotificationCommand` directly in-process.
*
* Kept as a tiny injectable so:
* - callers don't read process.env directly (testability)
* - we have one place to evolve the flag shape (per-category rollout,
* percentage shadowing, etc.) without ripping through listeners again
*/
import { Injectable } from '@nestjs/common';
const FLAG_ENV = 'NOTIFICATIONS_ASYNC_ENABLED';
@Injectable()
export class NotificationsAsyncConfig {
/** True when the async outbox path should be used for new notifications. */
get asyncEnabled(): boolean {
const raw = process.env[FLAG_ENV];
if (raw === undefined) return false;
const v = raw.trim().toLowerCase();
return v === '1' || v === 'true' || v === 'yes' || v === 'on';
}
/** Exposed for observability / startup logs. */
describe(): string {
return `${FLAG_ENV}=${this.asyncEnabled ? 'enabled' : 'disabled'}`;
}
}

View File

@@ -0,0 +1,100 @@
import {
type EventEnvelope,
type NotificationCategory,
type NotificationChannel,
type NotificationLocale,
type NotificationPriority,
type NotificationRequestedPayload,
} from '@goodgo/contracts-events';
import { Injectable, Logger } from '@nestjs/common';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { OutboxService } from '@modules/shared/infrastructure/outbox';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { PrismaService } from '@modules/shared/infrastructure/prisma.service';
import { buildEnvelope } from '@modules/shared/infrastructure/event-bus';
const PRODUCER = 'goodgo-api/notifications';
const EVENT_TYPE = 'notification.requested';
export interface PublishNotificationInput {
notificationId: string;
userId: string;
category: NotificationCategory;
template: string;
params: Record<string, unknown>;
channels: NotificationChannel[];
locale?: NotificationLocale;
priority?: NotificationPriority;
dedupeKey?: string;
requestedAt?: Date;
}
type PrismaTxLike = Parameters<Parameters<PrismaService['$transaction']>[0]>[0];
/**
* Publishes `notification.requested` events through the RFC-004 outbox.
*
* Two entry points:
* - `publishWithin(tx, input)` — call inside an existing Prisma transaction
* so the outbox row commits atomically with the domain mutation that
* triggered the notification (preferred).
* - `publishStandalone(input)` — convenience for callers that don't already
* own a transaction; opens a one-row tx around the outbox append. Use
* sparingly; prefer transactional publishing.
*
* The publisher does NOT enqueue or deliver. The OutboxRelay (Phase 0) drains
* the table and forwards envelopes to the EventBus, where the BullMQ
* notifications consumer (forthcoming task #4) picks them up.
*/
@Injectable()
export class NotificationsPublisher {
private readonly logger = new Logger(NotificationsPublisher.name);
constructor(
private readonly prisma: PrismaService,
private readonly outbox: OutboxService,
) {}
async publishWithin(
tx: PrismaTxLike,
input: PublishNotificationInput,
): Promise<EventEnvelope<NotificationRequestedPayload>> {
const envelope = this.buildEnvelopeFor(input);
await this.outbox.append(tx, envelope, { aggregateId: input.notificationId });
return envelope;
}
async publishStandalone(
input: PublishNotificationInput,
): Promise<EventEnvelope<NotificationRequestedPayload>> {
const envelope = this.buildEnvelopeFor(input);
await this.prisma.$transaction(async (tx) => {
await this.outbox.append(tx, envelope, { aggregateId: input.notificationId });
});
this.logger.debug(
`publishStandalone(notification.requested) notificationId=${input.notificationId}`,
);
return envelope;
}
private buildEnvelopeFor(
input: PublishNotificationInput,
): EventEnvelope<NotificationRequestedPayload> {
const payload: NotificationRequestedPayload = {
notificationId: input.notificationId,
userId: input.userId,
category: input.category,
template: input.template,
params: input.params,
channels: input.channels,
requestedAt: (input.requestedAt ?? new Date()).toISOString(),
...(input.locale !== undefined ? { locale: input.locale } : {}),
...(input.priority !== undefined ? { priority: input.priority } : {}),
...(input.dedupeKey !== undefined ? { dedupeKey: input.dedupeKey } : {}),
};
return buildEnvelope<NotificationRequestedPayload>(
{ producer: PRODUCER },
{ eventType: EVENT_TYPE, payload },
);
}
}

View File

@@ -34,6 +34,9 @@ import { PrismaNotificationPreferenceRepository } from './infrastructure/reposit
import { PrismaNotificationRepository } from './infrastructure/repositories/prisma-notification.repository';
import { EmailService } from './infrastructure/services/email.service';
import { FcmService } from './infrastructure/services/fcm.service';
import { NotificationsConsumer } from './infrastructure/consumers/notifications.consumer';
import { NotificationsAsyncConfig } from './infrastructure/services/notifications-async.config';
import { NotificationsPublisher } from './infrastructure/services/notifications-publisher.service';
import { SmsRateLimiterService } from './infrastructure/services/sms-rate-limiter.service';
import { StringeeSmsService } from './infrastructure/services/stringee-sms.service';
import { TemplateService } from './infrastructure/services/template.service';
@@ -87,6 +90,11 @@ const EventListeners = [
ZaloOaService,
TemplateService,
// RFC-004 Phase 1 — async notifications (GOO-173)
NotificationsAsyncConfig,
NotificationsPublisher,
NotificationsConsumer,
// WebSocket Gateway
NotificationsGateway,
@@ -104,6 +112,8 @@ const EventListeners = [
SMS_NOTIFICATION_CHANNEL,
ZaloOaService,
TemplateService,
NotificationsAsyncConfig,
NotificationsPublisher,
NotificationsGateway,
],
})

View File

@@ -1,6 +1,6 @@
export default function AdminLoading() {
return (
<div className="space-y-6">
<div className="space-y-6" aria-busy="true" aria-label="Đang tải trang quản trị">
{/* Header skeleton */}
<div className="flex items-center justify-between">
<div>

View File

@@ -1,6 +1,6 @@
export default function AuthLoading() {
return (
<div className="rounded-lg border bg-card p-8 shadow-sm">
<div className="rounded-lg border bg-card p-8 shadow-sm" aria-busy="true" aria-label="Đang tải trang đăng nhập">
<div className="space-y-6">
{/* Logo / title skeleton */}
<div className="text-center">

View File

@@ -1,6 +1,6 @@
export default function DashboardLoading() {
return (
<div className="space-y-8">
<div className="space-y-8" aria-busy="true" aria-label="Đang tải bảng điều khiển">
{/* Header skeleton */}
<div className="flex items-center justify-between">
<div>

View File

@@ -1,6 +1,6 @@
export default function SearchLoading() {
return (
<div className="mx-auto max-w-7xl px-4 py-6">
<div className="mx-auto max-w-7xl px-4 py-6" aria-busy="true" aria-label="Đang tải kết quả tìm kiếm">
{/* Header skeleton */}
<div className="mb-6">
<div className="h-8 w-64 animate-pulse rounded bg-muted" />

View File

@@ -1,6 +1,6 @@
export default function RootLoading() {
return (
<div className="flex min-h-screen flex-col bg-background">
<div className="flex min-h-screen flex-col bg-background" aria-busy="true" aria-label="Đang tải trang">
{/* Header skeleton */}
<div className="border-b">
<div className="mx-auto flex h-16 max-w-7xl items-center justify-between px-4">

View File

@@ -10,7 +10,7 @@
--background-surface: 220 14% 96%;
--foreground: 220 20% 12%;
--foreground-muted: 215 12% 45%;
--foreground-dim: 215 12% 60%;
--foreground-dim: 215 14% 45%;
--card: 0 0% 100%;
--card-foreground: 220 20% 12%;
--primary: 142.1 76.2% 36.3%;
@@ -63,7 +63,7 @@
--background-surface: 220 16% 10%;
--foreground: 210 20% 90%;
--foreground-muted: 215 15% 55%;
--foreground-dim: 215 12% 35%;
--foreground-dim: 215 12% 70%;
--card: 220 18% 7%;
--card-foreground: 210 20% 90%;
--primary: 142 72% 42%;
@@ -117,6 +117,18 @@
[data-numeric] {
font-variant-numeric: tabular-nums;
}
/* Consistent focus-visible ring for all interactive elements */
:focus-visible {
outline: 2px solid hsl(var(--ring));
outline-offset: 2px;
border-radius: var(--radius);
}
/* Remove outline for mouse users; keep it for keyboard navigation */
:focus:not(:focus-visible) {
outline: none;
}
}
/* Mapbox popup theming */

View File

@@ -267,7 +267,7 @@ export function AgentProfileClient({
return (
<div className="mx-auto max-w-7xl px-4 py-6 space-y-6">
{/* ── Breadcrumb ── */}
<nav className="flex items-center gap-1.5 text-xs text-foreground-muted">
<nav aria-label="Breadcrumb" className="flex items-center gap-1.5 text-xs text-foreground-muted">
<Link href="/" className="hover:text-foreground transition-colors">Trang chủ</Link>
<span>/</span>
<Link href="/agents" className="hover:text-foreground transition-colors">Môi giới</Link>

View File

@@ -1,8 +1,8 @@
import { render, screen } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import { describe, expect, it, vi } from 'vitest';
import type { InquiryReadDto } from '@/lib/inquiries-api';
import { InquiryDetailDialog } from '../inquiry-detail-dialog';
import type { InquiryReadDto } from '@/lib/inquiries-api';
// Mock the hook
const mockMarkReadMutate = vi.fn();
@@ -81,7 +81,7 @@ describe('InquiryDetailDialog', () => {
render(
<InquiryDetailDialog inquiry={mockInquiry} open={true} onOpenChange={vi.fn()} />,
);
expect(screen.getByText(/0912345678/)).toBeInTheDocument();
expect(screen.getByText(/0912[\s]?345[\s]?678/)).toBeInTheDocument();
});
it('renders inquiry message', () => {
@@ -156,6 +156,6 @@ describe('InquiryDetailDialog', () => {
render(
<InquiryDetailDialog inquiry={inquiryWithPhone} open={true} onOpenChange={vi.fn()} />,
);
expect(screen.getByText(/0987654321/)).toBeInTheDocument();
expect(screen.getByText(/0987[\s]?654[\s]?321/)).toBeInTheDocument();
});
});

View File

@@ -1,8 +1,8 @@
import { render, screen } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import { describe, expect, it, vi } from 'vitest';
import type { LeadReadDto } from '@/lib/leads-api';
import { LeadDetailDialog } from '../lead-detail-dialog';
import type { LeadReadDto } from '@/lib/leads-api';
// Mock hooks
const mockUpdateMutate = vi.fn();
@@ -69,7 +69,7 @@ describe('LeadDetailDialog', () => {
it('renders phone number', () => {
render(<LeadDetailDialog lead={mockLead} open={true} onOpenChange={vi.fn()} />);
expect(screen.getByText(/0987654321/)).toBeInTheDocument();
expect(screen.getByText(/0987[\s]?654[\s]?321/)).toBeInTheDocument();
});
it('renders email when present', () => {

View File

@@ -459,7 +459,7 @@ export function ListingDetailClient({ listing }: ListingDetailClientProps) {
/* pb-28 reserves space for the sticky action bar on mobile */
<div className="mx-auto max-w-6xl px-4 pb-28 pt-4 lg:pb-6">
{/* ── Breadcrumb + header strip ─────────────────────────── */}
<nav className="mb-3 flex items-center gap-1.5 text-xs text-[hsl(var(--foreground-muted))]">
<nav aria-label="Breadcrumb" className="mb-3 flex items-center gap-1.5 text-xs text-[hsl(var(--foreground-muted))]">
<Link href="/" className="hover:text-foreground">Trang chủ</Link>
<span>/</span>
<Link href="/search" className="hover:text-foreground">Tìm kiếm</Link>

View File

@@ -3,6 +3,7 @@ import react from '@vitejs/plugin-react';
import { defineConfig } from 'vitest/config';
export default defineConfig({
root: path.resolve(__dirname, '.'),
plugins: [react()],
test: {
include: ['**/__tests__/**/*.spec.ts', '**/__tests__/**/*.test.ts', '**/__tests__/**/*.spec.tsx', '**/__tests__/**/*.test.tsx'],

View File

@@ -0,0 +1,81 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://goodgo.vn/schemas/events/notification.requested.schema.json",
"title": "notification.requested",
"description": "Emitted when a domain action requests delivery of a user-facing notification. Consumed by the notifications worker pool, which fans out to email / SMS / Zalo OA / FCM / in-app channels per user preferences.",
"type": "object",
"additionalProperties": false,
"required": [
"notificationId",
"userId",
"category",
"template",
"params",
"channels",
"requestedAt"
],
"properties": {
"notificationId": {
"type": "string",
"minLength": 1,
"description": "UUIDv7 of the persisted Notification aggregate (also used as idempotency key by the consumer)."
},
"userId": {
"type": "string",
"minLength": 1
},
"category": {
"type": "string",
"enum": [
"auth",
"kyc",
"listing",
"payment",
"subscription",
"inquiry",
"lead",
"agent",
"residential",
"system"
]
},
"template": {
"type": "string",
"minLength": 1,
"description": "Template key resolved by TemplateService (e.g. 'listing.approved.email.vi')."
},
"params": {
"type": "object",
"description": "Template substitution parameters. Shape depends on template; consumer is responsible for validation against the template manifest.",
"additionalProperties": true
},
"channels": {
"type": "array",
"minItems": 1,
"items": {
"type": "string",
"enum": ["email", "sms", "zalo", "fcm", "in_app"]
},
"description": "Channels requested by the producer. The consumer further filters by user notification preferences before dispatch."
},
"locale": {
"type": "string",
"enum": ["vi", "en"],
"description": "Preferred locale; falls back to user's stored locale, then 'vi'."
},
"priority": {
"type": "string",
"enum": ["low", "normal", "high", "critical"],
"default": "normal"
},
"dedupeKey": {
"type": "string",
"minLength": 1,
"description": "Optional consumer-side dedupe key (collapses duplicate logical notifications inside the BullMQ queue, in addition to envelope-level eventId dedupe)."
},
"requestedAt": {
"type": "string",
"format": "date-time"
}
}
}

View File

@@ -2,6 +2,7 @@ export const KNOWN_EVENT_TYPES = [
'kyc.verified',
'listing.approved',
'payment.completed',
'notification.requested',
] as const;
export type KnownEventType = (typeof KNOWN_EVENT_TYPES)[number];

View File

@@ -35,3 +35,34 @@ export interface KycVerifiedPayload {
verifiedAt: string;
documentRefs: string[];
}
export type NotificationCategory =
| 'auth'
| 'kyc'
| 'listing'
| 'payment'
| 'subscription'
| 'inquiry'
| 'lead'
| 'agent'
| 'residential'
| 'system';
export type NotificationChannel = 'email' | 'sms' | 'zalo' | 'fcm' | 'in_app';
export type NotificationLocale = 'vi' | 'en';
export type NotificationPriority = 'low' | 'normal' | 'high' | 'critical';
export interface NotificationRequestedPayload {
notificationId: string;
userId: string;
category: NotificationCategory;
template: string;
params: Record<string, unknown>;
channels: NotificationChannel[];
locale?: NotificationLocale;
priority?: NotificationPriority;
dedupeKey?: string;
requestedAt: string;
}

35
scripts/pre-commit-tests.sh Executable file
View File

@@ -0,0 +1,35 @@
#!/usr/bin/env bash
set -euo pipefail
# Detect which workspace packages have staged .ts/.tsx files and run tests
# only for those packages via turbo --filter.
STAGED_TS_FILES=$(git diff --cached --name-only --diff-filter=ACMR -- '*.ts' '*.tsx')
if [ -z "$STAGED_TS_FILES" ]; then
exit 0
fi
FILTERS=""
SEEN=""
while IFS= read -r file; do
case "$file" in
apps/api/*) pkg="@goodgo/api" ;;
apps/web/*) pkg="@goodgo/web" ;;
libs/mcp-servers/*) pkg="@goodgo/mcp-servers" ;;
*) continue ;;
esac
if [[ ! " $SEEN " =~ " $pkg " ]]; then
SEEN="$SEEN $pkg"
FILTERS="$FILTERS --filter=$pkg"
fi
done <<< "$STAGED_TS_FILES"
if [ -z "$FILTERS" ]; then
exit 0
fi
echo "Running tests for affected packages:$SEEN"
exec npx turbo run test $FILTERS