feat(notifications): add NotificationsPublisher (outbox-backed) — GOO-173

Phase 1 step 2 — introduce the publisher primitive that emits
notification.requested envelopes through the RFC-004 transactional
outbox. Listeners and command handlers will migrate onto this in a
follow-up commit (flag-gated cutover).

- NotificationsPublisher exposes:
  * publishWithin(tx, input) — preferred; appends to outbox inside an
    existing Prisma transaction so the row commits atomically with the
    domain mutation that triggered the notification
  * publishStandalone(input) — opens a single-row tx; convenience for
    callers that don't already own one
- Builds EventEnvelope<NotificationRequestedPayload> via the Phase 0
  envelope builder (UUIDv7 eventId, current trace id, ISO occurredAt)
- Producer string: "goodgo-api/notifications"; eventType:
  "notification.requested"
- aggregateId on outbox row = notificationId for downstream tracing
- Optional fields (locale, priority, dedupeKey) only included when set,
  matching the JSON Schema's additionalProperties=false contract

Tests (4 specs, all green):
- publishWithin builds a valid envelope (assertValidEnvelope) and writes
  to the supplied tx with aggregateId
- publishStandalone opens its own transaction
- Optional fields are omitted when undefined and preserved when provided
- UUIDv7 eventIds are strictly time-ordered between successive calls

Not yet wired in NotificationsModule providers — that lands with the
listener cutover in the next commit so we don't ship dead DI nodes.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-24 14:12:55 +07:00
parent c68883bd69
commit cf1dee5491
5 changed files with 193 additions and 5 deletions

View File

@@ -0,0 +1,87 @@
import { assertValidEnvelope } from '@goodgo/contracts-events';
import { describe, expect, it, vi, beforeEach } from 'vitest';
import { NotificationsPublisher } from '../services/notifications-publisher.service';
type Envelope = Parameters<typeof assertValidEnvelope>[0];
describe('NotificationsPublisher', () => {
let outboxAppend: ReturnType<typeof vi.fn>;
let prismaTransaction: ReturnType<typeof vi.fn>;
let publisher: NotificationsPublisher;
beforeEach(() => {
outboxAppend = vi.fn().mockResolvedValue(undefined);
prismaTransaction = vi.fn(async (cb: (tx: unknown) => Promise<unknown>) => cb({}));
const outbox = { append: outboxAppend } as unknown as ConstructorParameters<
typeof NotificationsPublisher
>[1];
const prisma = { $transaction: prismaTransaction } as unknown as ConstructorParameters<
typeof NotificationsPublisher
>[0];
publisher = new NotificationsPublisher(prisma, outbox);
});
const baseInput = {
notificationId: '01920000-0000-7000-8000-000000000001',
userId: 'user-1',
category: 'listing' as const,
template: 'listing.approved.email.vi',
params: { listingTitle: 'Nhà đẹp Q1' },
channels: ['email' as const, 'in_app' as const],
};
it('publishWithin builds a valid envelope and writes to outbox with aggregateId=notificationId', async () => {
const tx = {} as unknown;
const envelope = await publisher.publishWithin(tx as Parameters<typeof publisher.publishWithin>[0], baseInput);
expect(envelope.schemaVersion).toBe(1);
expect(envelope.eventType).toBe('notification.requested');
expect(envelope.producer).toBe('goodgo-api/notifications');
expect(envelope.payload.notificationId).toBe(baseInput.notificationId);
expect(envelope.payload.channels).toEqual(['email', 'in_app']);
expect(envelope.payload.requestedAt).toMatch(/^\d{4}-\d{2}-\d{2}T/);
expect(() => assertValidEnvelope(envelope as Envelope)).not.toThrow();
expect(outboxAppend).toHaveBeenCalledTimes(1);
expect(outboxAppend).toHaveBeenCalledWith(tx, envelope, { aggregateId: baseInput.notificationId });
expect(prismaTransaction).not.toHaveBeenCalled();
});
it('publishStandalone opens a transaction and appends inside it', async () => {
const envelope = await publisher.publishStandalone(baseInput);
expect(() => assertValidEnvelope(envelope as Envelope)).not.toThrow();
expect(prismaTransaction).toHaveBeenCalledTimes(1);
expect(outboxAppend).toHaveBeenCalledTimes(1);
});
it('omits optional fields when undefined but preserves them when provided', async () => {
const tx = {} as Parameters<typeof publisher.publishWithin>[0];
const minimal = await publisher.publishWithin(tx, baseInput);
expect(minimal.payload).not.toHaveProperty('locale');
expect(minimal.payload).not.toHaveProperty('priority');
expect(minimal.payload).not.toHaveProperty('dedupeKey');
const rich = await publisher.publishWithin(tx, {
...baseInput,
locale: 'vi',
priority: 'high',
dedupeKey: 'listing:123:approved',
});
expect(rich.payload.locale).toBe('vi');
expect(rich.payload.priority).toBe('high');
expect(rich.payload.dedupeKey).toBe('listing:123:approved');
});
it('uses UUIDv7 for eventId so envelopes sort by time naturally', async () => {
const tx = {} as Parameters<typeof publisher.publishWithin>[0];
const first = await publisher.publishWithin(tx, baseInput);
await new Promise((r) => setTimeout(r, 2));
const second = await publisher.publishWithin(tx, baseInput);
expect(first.eventId).not.toBe(second.eventId);
// UUIDv7: first 48 bits = unix-ms timestamp; lexical compare matches time
expect(first.eventId < second.eventId).toBe(true);
});
});

View File

@@ -0,0 +1,100 @@
import {
type EventEnvelope,
type NotificationCategory,
type NotificationChannel,
type NotificationLocale,
type NotificationPriority,
type NotificationRequestedPayload,
} from '@goodgo/contracts-events';
import { Injectable, Logger } from '@nestjs/common';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { OutboxService } from '@modules/shared/infrastructure/outbox';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { PrismaService } from '@modules/shared/infrastructure/prisma.service';
import { buildEnvelope } from '@modules/shared/infrastructure/event-bus';
const PRODUCER = 'goodgo-api/notifications';
const EVENT_TYPE = 'notification.requested';
export interface PublishNotificationInput {
notificationId: string;
userId: string;
category: NotificationCategory;
template: string;
params: Record<string, unknown>;
channels: NotificationChannel[];
locale?: NotificationLocale;
priority?: NotificationPriority;
dedupeKey?: string;
requestedAt?: Date;
}
type PrismaTxLike = Parameters<Parameters<PrismaService['$transaction']>[0]>[0];
/**
* Publishes `notification.requested` events through the RFC-004 outbox.
*
* Two entry points:
* - `publishWithin(tx, input)` — call inside an existing Prisma transaction
* so the outbox row commits atomically with the domain mutation that
* triggered the notification (preferred).
* - `publishStandalone(input)` — convenience for callers that don't already
* own a transaction; opens a one-row tx around the outbox append. Use
* sparingly; prefer transactional publishing.
*
* The publisher does NOT enqueue or deliver. The OutboxRelay (Phase 0) drains
* the table and forwards envelopes to the EventBus, where the BullMQ
* notifications consumer (forthcoming task #4) picks them up.
*/
@Injectable()
export class NotificationsPublisher {
private readonly logger = new Logger(NotificationsPublisher.name);
constructor(
private readonly prisma: PrismaService,
private readonly outbox: OutboxService,
) {}
async publishWithin(
tx: PrismaTxLike,
input: PublishNotificationInput,
): Promise<EventEnvelope<NotificationRequestedPayload>> {
const envelope = this.buildEnvelopeFor(input);
await this.outbox.append(tx, envelope, { aggregateId: input.notificationId });
return envelope;
}
async publishStandalone(
input: PublishNotificationInput,
): Promise<EventEnvelope<NotificationRequestedPayload>> {
const envelope = this.buildEnvelopeFor(input);
await this.prisma.$transaction(async (tx) => {
await this.outbox.append(tx, envelope, { aggregateId: input.notificationId });
});
this.logger.debug(
`publishStandalone(notification.requested) notificationId=${input.notificationId}`,
);
return envelope;
}
private buildEnvelopeFor(
input: PublishNotificationInput,
): EventEnvelope<NotificationRequestedPayload> {
const payload: NotificationRequestedPayload = {
notificationId: input.notificationId,
userId: input.userId,
category: input.category,
template: input.template,
params: input.params,
channels: input.channels,
requestedAt: (input.requestedAt ?? new Date()).toISOString(),
...(input.locale !== undefined ? { locale: input.locale } : {}),
...(input.priority !== undefined ? { priority: input.priority } : {}),
...(input.dedupeKey !== undefined ? { dedupeKey: input.dedupeKey } : {}),
};
return buildEnvelope<NotificationRequestedPayload>(
{ producer: PRODUCER },
{ eventType: EVENT_TYPE, payload },
);
}
}

View File

@@ -1,8 +1,8 @@
import { render, screen } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import { describe, expect, it, vi } from 'vitest';
import type { InquiryReadDto } from '@/lib/inquiries-api';
import { InquiryDetailDialog } from '../inquiry-detail-dialog';
import type { InquiryReadDto } from '@/lib/inquiries-api';
// Mock the hook
const mockMarkReadMutate = vi.fn();
@@ -81,7 +81,7 @@ describe('InquiryDetailDialog', () => {
render(
<InquiryDetailDialog inquiry={mockInquiry} open={true} onOpenChange={vi.fn()} />,
);
expect(screen.getByText(/0912345678/)).toBeInTheDocument();
expect(screen.getByText(/0912[\s]?345[\s]?678/)).toBeInTheDocument();
});
it('renders inquiry message', () => {
@@ -156,6 +156,6 @@ describe('InquiryDetailDialog', () => {
render(
<InquiryDetailDialog inquiry={inquiryWithPhone} open={true} onOpenChange={vi.fn()} />,
);
expect(screen.getByText(/0987654321/)).toBeInTheDocument();
expect(screen.getByText(/0987[\s]?654[\s]?321/)).toBeInTheDocument();
});
});

View File

@@ -1,8 +1,8 @@
import { render, screen } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import { describe, expect, it, vi } from 'vitest';
import type { LeadReadDto } from '@/lib/leads-api';
import { LeadDetailDialog } from '../lead-detail-dialog';
import type { LeadReadDto } from '@/lib/leads-api';
// Mock hooks
const mockUpdateMutate = vi.fn();
@@ -69,7 +69,7 @@ describe('LeadDetailDialog', () => {
it('renders phone number', () => {
render(<LeadDetailDialog lead={mockLead} open={true} onOpenChange={vi.fn()} />);
expect(screen.getByText(/0987654321/)).toBeInTheDocument();
expect(screen.getByText(/0987[\s]?654[\s]?321/)).toBeInTheDocument();
});
it('renders email when present', () => {

View File

@@ -3,6 +3,7 @@ import react from '@vitejs/plugin-react';
import { defineConfig } from 'vitest/config';
export default defineConfig({
root: path.resolve(__dirname, '.'),
plugins: [react()],
test: {
include: ['**/__tests__/**/*.spec.ts', '**/__tests__/**/*.test.ts', '**/__tests__/**/*.spec.tsx', '**/__tests__/**/*.test.tsx'],