diff --git a/apps/api/docs/observability/README.md b/apps/api/docs/observability/README.md new file mode 100644 index 0000000..aa4afbe --- /dev/null +++ b/apps/api/docs/observability/README.md @@ -0,0 +1,50 @@ +# Observability — Read-Model / Projector (RFC-003 Phase 0) + +Grafana dashboards and wiring notes for the read-model observability stack +introduced in [GOO-192](/GOO/issues/GOO-192) under [GOO-94](/GOO/issues/GOO-94) §6 Phase 0. + +## Metrics + +All metrics live in the existing NestJS `metrics/` module +(`apps/api/src/modules/metrics/`) and are scraped via the standard `/metrics` +endpoint. + +| Metric | Type | Labels | Purpose | +| --------------------------------------- | --------- | --------- | --------------------------------------------------------- | +| `read_model_projector_lag_seconds` | Gauge | `handler` | Seconds between latest source event and projector cursor. | +| `read_model_refresh_duration_seconds` | Histogram | `view` | Duration of read-model / materialised view refreshes. | +| `read_model_reconciliation_drift_total` | Counter | `model` | Count of drift discrepancies found during reconciliation. | + +### Emit points + +Inject `MetricsService` and call: + +```ts +metrics.setProjectorLag(handler, lagSeconds); +metrics.recordReadModelRefresh(view, durationSeconds); +metrics.recordReconciliationDrift(model, count?); +``` + +## Dashboard + +- File: `read-models-dashboard.json` (Grafana schema v38). +- Import into Grafana (`Dashboards → Import → Upload JSON`), pick the Prometheus + data source. +- Variables: `handler`, `view`, `model` — derived from Prometheus label values. +- Panels: + 1. Projector lag by handler (time series + thresholded) + 2. Max projector lag (stat, RAG 30s / 120s) + 3. Refresh duration p50/p95 by view + 4. Refresh throughput (refreshes/sec) by view + 5. Reconciliation drift rate by model (15m rate) + 6. Total drift events in last 24h (stat, RAG 1 / 10) + +## Local verification + +```bash +pnpm --filter @goodgo/api dev +curl -s http://localhost:3001/metrics | grep read_model_ +``` + +All three metric families should appear with `# HELP` / `# TYPE` headers even +before any samples are recorded. diff --git a/apps/api/docs/observability/read-models-dashboard.json b/apps/api/docs/observability/read-models-dashboard.json new file mode 100644 index 0000000..1049d19 --- /dev/null +++ b/apps/api/docs/observability/read-models-dashboard.json @@ -0,0 +1,77 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "graphTooltip": 1, + "id": null, + "uid": "goodgo-read-models", + "title": "GoodGo · Read-Model Observability (RFC-003 Phase 0)", + "tags": ["goodgo", "rfc-003", "read-models", "observability"], + "timezone": "browser", + "schemaVersion": 38, + "version": 1, + "refresh": "30s", + "time": { "from": "now-6h", "to": "now" }, + "templating": { + "list": [ + { "name": "datasource", "type": "datasource", "query": "prometheus", "current": { "text": "Prometheus", "value": "Prometheus" } }, + { "name": "handler", "type": "query", "datasource": "${datasource}", "query": "label_values(read_model_projector_lag_seconds, handler)", "includeAll": true, "multi": true, "refresh": 2 }, + { "name": "view", "type": "query", "datasource": "${datasource}", "query": "label_values(read_model_refresh_duration_seconds_bucket, view)", "includeAll": true, "multi": true, "refresh": 2 }, + { "name": "model", "type": "query", "datasource": "${datasource}", "query": "label_values(read_model_reconciliation_drift_total, model)", "includeAll": true, "multi": true, "refresh": 2 } + ] + }, + "panels": [ + { + "id": 1, "type": "timeseries", "title": "Projector lag (seconds) — by handler", + "datasource": "${datasource}", "gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 }, + "fieldConfig": { "defaults": { "unit": "s", "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }, { "color": "yellow", "value": 30 }, { "color": "red", "value": 120 }] } } }, + "targets": [{ "expr": "read_model_projector_lag_seconds{handler=~\"$handler\"}", "legendFormat": "{{handler}}", "refId": "A" }] + }, + { + "id": 2, "type": "stat", "title": "Max projector lag (current)", + "datasource": "${datasource}", "gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 }, + "fieldConfig": { "defaults": { "unit": "s", "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }, { "color": "yellow", "value": 30 }, { "color": "red", "value": 120 }] } } }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] } }, + "targets": [{ "expr": "max(read_model_projector_lag_seconds{handler=~\"$handler\"})", "refId": "A" }] + }, + { + "id": 3, "type": "timeseries", "title": "Refresh duration p50/p95 — by view", + "datasource": "${datasource}", "gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 }, + "fieldConfig": { "defaults": { "unit": "s" } }, + "targets": [ + { "expr": "histogram_quantile(0.95, sum by (view, le) (rate(read_model_refresh_duration_seconds_bucket{view=~\"$view\"}[5m])))", "legendFormat": "p95 · {{view}}", "refId": "A" }, + { "expr": "histogram_quantile(0.50, sum by (view, le) (rate(read_model_refresh_duration_seconds_bucket{view=~\"$view\"}[5m])))", "legendFormat": "p50 · {{view}}", "refId": "B" } + ] + }, + { + "id": 4, "type": "timeseries", "title": "Refresh throughput (refreshes/sec) — by view", + "datasource": "${datasource}", "gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 }, + "fieldConfig": { "defaults": { "unit": "ops" } }, + "targets": [{ "expr": "sum by (view) (rate(read_model_refresh_duration_seconds_count{view=~\"$view\"}[5m]))", "legendFormat": "{{view}}", "refId": "A" }] + }, + { + "id": 5, "type": "timeseries", "title": "Reconciliation drift rate — by model", + "datasource": "${datasource}", "gridPos": { "h": 8, "w": 12, "x": 0, "y": 16 }, + "fieldConfig": { "defaults": { "unit": "ops" } }, + "targets": [{ "expr": "sum by (model) (rate(read_model_reconciliation_drift_total{model=~\"$model\"}[15m]))", "legendFormat": "{{model}}", "refId": "A" }] + }, + { + "id": 6, "type": "stat", "title": "Total drift events (last 24h)", + "datasource": "${datasource}", "gridPos": { "h": 8, "w": 12, "x": 12, "y": 16 }, + "fieldConfig": { "defaults": { "unit": "short", "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": null }, { "color": "yellow", "value": 1 }, { "color": "red", "value": 10 }] } } }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] } }, + "targets": [{ "expr": "sum by (model) (increase(read_model_reconciliation_drift_total{model=~\"$model\"}[24h]))", "legendFormat": "{{model}}", "refId": "A" }] + } + ] +} diff --git a/apps/api/package.json b/apps/api/package.json index b644ef4..9839f9e 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -17,6 +17,7 @@ "@aws-sdk/client-s3": "^3.1026.0", "@aws-sdk/s3-request-presigner": "^3.1026.0", "@goodgo/mcp-servers": "workspace:*", + "@goodgo/contracts-events": "workspace:*", "@nest-lab/throttler-storage-redis": "^1.2.0", "@nestjs/bullmq": "^11.0.4", "@nestjs/common": "^11.0.0", diff --git a/apps/api/src/modules/auth/application/__tests__/export-user-data.handler.spec.ts b/apps/api/src/modules/auth/application/__tests__/export-user-data.handler.spec.ts index 3a53042..60754cd 100644 --- a/apps/api/src/modules/auth/application/__tests__/export-user-data.handler.spec.ts +++ b/apps/api/src/modules/auth/application/__tests__/export-user-data.handler.spec.ts @@ -1,7 +1,16 @@ +import { PayloadTooLargeException } from '@nestjs/common'; import { NotFoundException } from '@modules/shared'; import { ExportUserDataCommand } from '../commands/export-user-data/export-user-data.command'; import { ExportUserDataHandler } from '../commands/export-user-data/export-user-data.handler'; +async function readStream(stream: NodeJS.ReadableStream): Promise { + const chunks: Buffer[] = []; + for await (const chunk of stream) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string)); + } + return Buffer.concat(chunks).toString('utf8'); +} + describe('ExportUserDataHandler', () => { let handler: ExportUserDataHandler; @@ -17,7 +26,13 @@ describe('ExportUserDataHandler', () => { transaction: { findMany: vi.fn() }, }; - const mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn(), verbose: vi.fn() }; + const mockLogger = { + log: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + verbose: vi.fn(), + }; const sampleUser = { id: 'user-1', @@ -29,12 +44,25 @@ describe('ExportUserDataHandler', () => { createdAt: new Date('2025-01-01'), }; + function setupEmptyRelations() { + mockPrisma.agent.findUnique.mockResolvedValue(null); + mockPrisma.listing.findMany.mockResolvedValue([]); + mockPrisma.payment.findMany.mockResolvedValue([]); + mockPrisma.subscription.findFirst.mockResolvedValue(null); + mockPrisma.review.findMany.mockResolvedValue([]); + mockPrisma.inquiry.findMany.mockResolvedValue([]); + mockPrisma.savedSearch.findMany.mockResolvedValue([]); + mockPrisma.transaction.findMany.mockResolvedValue([]); + } + beforeEach(() => { vi.clearAllMocks(); + delete process.env['EXPORT_ROW_CAP']; + delete process.env['EXPORT_SIZE_CAP_MB']; handler = new ExportUserDataHandler(mockPrisma as any, mockLogger as any); }); - it('exports all user data including relations', async () => { + it('exports all user data including relations and returns a stream', async () => { mockPrisma.user.findUnique.mockResolvedValue(sampleUser); mockPrisma.agent.findUnique.mockResolvedValue({ id: 'agent-1', userId: 'user-1' }); mockPrisma.listing.findMany.mockResolvedValue([{ id: 'listing-1' }]); @@ -46,43 +74,77 @@ describe('ExportUserDataHandler', () => { mockPrisma.transaction.findMany.mockResolvedValue([{ id: 'tx-1' }]); const result = await handler.execute(new ExportUserDataCommand('user-1')); + const json = await readStream(result.stream); + const parsed = JSON.parse(json); - expect(result.user).toEqual(sampleUser); - expect(result.agent).toEqual({ id: 'agent-1', userId: 'user-1' }); - expect(result.listings).toHaveLength(1); - expect(result.payments).toHaveLength(1); - expect(result.subscription).toEqual({ id: 'sub-1', status: 'ACTIVE' }); - expect(result.reviews).toHaveLength(1); - expect(result.inquiries).toHaveLength(1); - expect(result.savedSearches).toHaveLength(1); - expect(result.transactions).toHaveLength(1); + expect(parsed.user).toMatchObject({ id: 'user-1' }); + expect(parsed.agent).toEqual({ id: 'agent-1', userId: 'user-1' }); + expect(parsed.listings).toHaveLength(1); + expect(parsed.payments).toHaveLength(1); + expect(parsed.subscription).toEqual({ id: 'sub-1', status: 'ACTIVE' }); + expect(parsed.reviews).toHaveLength(1); + expect(parsed.inquiries).toHaveLength(1); + expect(parsed.savedSearches).toHaveLength(1); + expect(parsed.transactions).toHaveLength(1); + expect(result.truncated).toBe(false); }); it('throws NotFoundException if user not found', async () => { mockPrisma.user.findUnique.mockResolvedValue(null); - await expect( - handler.execute(new ExportUserDataCommand('missing')), - ).rejects.toThrow(NotFoundException); + await expect(handler.execute(new ExportUserDataCommand('missing'))).rejects.toThrow( + NotFoundException, + ); }); - it('includes exportedAt timestamp', async () => { + it('includes exportedAt timestamp and cap metadata in the payload', async () => { mockPrisma.user.findUnique.mockResolvedValue(sampleUser); - mockPrisma.agent.findUnique.mockResolvedValue(null); - mockPrisma.listing.findMany.mockResolvedValue([]); - mockPrisma.payment.findMany.mockResolvedValue([]); - mockPrisma.subscription.findFirst.mockResolvedValue(null); - mockPrisma.review.findMany.mockResolvedValue([]); - mockPrisma.inquiry.findMany.mockResolvedValue([]); - mockPrisma.savedSearch.findMany.mockResolvedValue([]); - mockPrisma.transaction.findMany.mockResolvedValue([]); + setupEmptyRelations(); const before = new Date().toISOString(); const result = await handler.execute(new ExportUserDataCommand('user-1')); const after = new Date().toISOString(); + const parsed = JSON.parse(await readStream(result.stream)); - expect(result.exportedAt).toBeDefined(); - expect(result.exportedAt >= before).toBe(true); - expect(result.exportedAt <= after).toBe(true); + expect(parsed.exportedAt).toBeDefined(); + expect(parsed.exportedAt >= before).toBe(true); + expect(parsed.exportedAt <= after).toBe(true); + expect(typeof parsed.rowCap).toBe('number'); + expect(typeof parsed.sizeCap).toBe('number'); + }); + + it('applies row cap to each collection query', async () => { + process.env['EXPORT_ROW_CAP'] = '5'; + handler = new ExportUserDataHandler(mockPrisma as any, mockLogger as any); + + mockPrisma.user.findUnique.mockResolvedValue(sampleUser); + setupEmptyRelations(); + + await handler.execute(new ExportUserDataCommand('user-1')); + + for (const method of [ + mockPrisma.listing.findMany, + mockPrisma.payment.findMany, + mockPrisma.review.findMany, + mockPrisma.inquiry.findMany, + mockPrisma.savedSearch.findMany, + mockPrisma.transaction.findMany, + ]) { + expect(method).toHaveBeenCalledWith(expect.objectContaining({ take: 5 })); + } + }); + + it('throws PayloadTooLargeException when JSON exceeds the size cap', async () => { + process.env['EXPORT_SIZE_CAP_MB'] = '0.000001'; + handler = new ExportUserDataHandler(mockPrisma as any, mockLogger as any); + + mockPrisma.user.findUnique.mockResolvedValue(sampleUser); + setupEmptyRelations(); + + await expect(handler.execute(new ExportUserDataCommand('user-1'))).rejects.toThrow( + PayloadTooLargeException, + ); + + expect(mockLogger.warn).toHaveBeenCalled(); }); }); diff --git a/apps/api/src/modules/auth/application/commands/export-user-data/export-user-data.handler.ts b/apps/api/src/modules/auth/application/commands/export-user-data/export-user-data.handler.ts index c178822..98257f8 100644 --- a/apps/api/src/modules/auth/application/commands/export-user-data/export-user-data.handler.ts +++ b/apps/api/src/modules/auth/application/commands/export-user-data/export-user-data.handler.ts @@ -1,8 +1,14 @@ -import { InternalServerErrorException } from '@nestjs/common'; +import { HttpException, InternalServerErrorException, PayloadTooLargeException } from '@nestjs/common'; import { CommandHandler, type ICommandHandler } from '@nestjs/cqrs'; +import { Readable } from 'node:stream'; import { LoggerService, PrismaService, DomainException, NotFoundException } from '@modules/shared'; import { ExportUserDataCommand } from './export-user-data.command'; +/** Per-collection row cap. Override via EXPORT_ROW_CAP env var (default 10 000). */ +const DEFAULT_ROW_CAP = 10_000; +/** Maximum total export size in megabytes. Override via EXPORT_SIZE_CAP_MB env var (default 100). */ +const DEFAULT_SIZE_CAP_MB = 100; + export interface UserDataExport { user: { id: string; @@ -22,16 +28,34 @@ export interface UserDataExport { savedSearches: unknown[]; transactions: unknown[]; exportedAt: string; + /** Effective row cap applied to each collection query. */ + rowCap: number; + /** Effective size cap in bytes for the entire JSON payload. */ + sizeCap: number; +} + +export interface ExportUserDataResult { + /** Node.js Readable stream containing the UTF-8 encoded JSON payload. */ + stream: Readable; + /** True when a row or size cap was reached and the export may be incomplete. */ + truncated: boolean; } @CommandHandler(ExportUserDataCommand) export class ExportUserDataHandler implements ICommandHandler { + private readonly rowCap: number; + private readonly sizeCapBytes: number; + constructor( private readonly prisma: PrismaService, private readonly logger: LoggerService, - ) {} + ) { + this.rowCap = parseInt(process.env['EXPORT_ROW_CAP'] ?? String(DEFAULT_ROW_CAP), 10); + const sizeMb = parseFloat(process.env['EXPORT_SIZE_CAP_MB'] ?? String(DEFAULT_SIZE_CAP_MB)); + this.sizeCapBytes = Math.floor(sizeMb * 1024 * 1024); + } - async execute(command: ExportUserDataCommand): Promise { + async execute(command: ExportUserDataCommand): Promise { try { const user = await this.prisma.user.findUnique({ where: { id: command.userId }, @@ -43,27 +67,29 @@ export class ExportUserDataHandler implements ICommandHandler this.sizeCapBytes) { + this.logger.warn( + `Export for user ${command.userId} is ${byteLength} bytes, exceeds cap of ${this.sizeCapBytes} bytes`, + this.constructor.name, + ); + throw new PayloadTooLargeException( + `Dữ liệu xuất (${Math.round(byteLength / 1024 / 1024)} MB) vượt giới hạn ` + + `${Math.round(this.sizeCapBytes / 1024 / 1024)} MB. ` + + `Vui lòng liên hệ hỗ trợ để xuất theo từng phần.`, + ); + } + + this.logger.log( + `User data exported for ${command.userId} (${byteLength} bytes, rowCap=${rowCap})`, + 'ExportUserDataHandler', + ); + + const stream = Readable.from(Buffer.from(json, 'utf8')); + return { stream, truncated: false }; } catch (error) { - if (error instanceof DomainException) throw error; + if (error instanceof DomainException || error instanceof HttpException) throw error; this.logger.error( `Failed to export user data: ${error instanceof Error ? error.message : error}`, error instanceof Error ? error.stack : undefined, diff --git a/apps/api/src/modules/auth/presentation/controllers/user-data.controller.ts b/apps/api/src/modules/auth/presentation/controllers/user-data.controller.ts index 1cb24ac..514f8e0 100644 --- a/apps/api/src/modules/auth/presentation/controllers/user-data.controller.ts +++ b/apps/api/src/modules/auth/presentation/controllers/user-data.controller.ts @@ -5,13 +5,16 @@ import { Get, Param, Post, + Res, + StreamableFile, UseGuards, } from '@nestjs/common'; import { CommandBus } from '@nestjs/cqrs'; -import { ApiTags, ApiOperation, ApiResponse, ApiBearerAuth } from '@nestjs/swagger'; +import { ApiTags, ApiOperation, ApiResponse, ApiBearerAuth, ApiProduces } from '@nestjs/swagger'; +import { Response } from 'express'; import { CancelUserDeletionCommand } from '../../application/commands/cancel-user-deletion/cancel-user-deletion.command'; import { ExportUserDataCommand } from '../../application/commands/export-user-data/export-user-data.command'; -import { type UserDataExport } from '../../application/commands/export-user-data/export-user-data.handler'; +import { type ExportUserDataResult } from '../../application/commands/export-user-data/export-user-data.handler'; import { ForceDeleteUserCommand } from '../../application/commands/force-delete-user/force-delete-user.command'; import { RequestUserDeletionCommand } from '../../application/commands/request-user-deletion/request-user-deletion.command'; import { type JwtPayload } from '../../infrastructure/services/token.service'; @@ -58,13 +61,33 @@ export class UserDataController { @Get('me/export') @UseGuards(JwtAuthGuard) @ApiBearerAuth('JWT') - @ApiOperation({ summary: 'Export user data (GDPR Article 20)' }) - @ApiResponse({ status: 200, description: 'User data exported as JSON' }) + @ApiProduces('application/json') + @ApiOperation({ + summary: 'Export user data (GDPR Article 20)', + description: + 'Streams the full user data export as JSON. ' + + 'Row cap (per collection) defaults to 10 000 rows; size cap defaults to 100 MB. ' + + 'Both are configurable via EXPORT_ROW_CAP and EXPORT_SIZE_CAP_MB env vars.', + }) + @ApiResponse({ status: 200, description: 'User data exported as streaming JSON' }) @ApiResponse({ status: 401, description: 'Unauthorized' }) + @ApiResponse({ + status: 413, + description: 'Export exceeds size cap — contact support for chunked export', + }) async exportData( @CurrentUser() user: JwtPayload, - ): Promise { - return this.commandBus.execute(new ExportUserDataCommand(user.sub)); + @Res({ passthrough: true }) res: Response, + ): Promise { + const result: ExportUserDataResult = await this.commandBus.execute( + new ExportUserDataCommand(user.sub), + ); + res.setHeader('Content-Type', 'application/json'); + res.setHeader( + 'Content-Disposition', + `attachment; filename="user-data-${user.sub}.json"`, + ); + return new StreamableFile(result.stream); } @Delete(':id/force') diff --git a/apps/api/src/modules/metrics/infrastructure/metrics.service.ts b/apps/api/src/modules/metrics/infrastructure/metrics.service.ts index 93a440e..28d6223 100644 --- a/apps/api/src/modules/metrics/infrastructure/metrics.service.ts +++ b/apps/api/src/modules/metrics/infrastructure/metrics.service.ts @@ -10,6 +10,9 @@ import { HTTP_REQUESTS_TOTAL, GOODGO_WS_CONNECTED_CLIENTS, GOODGO_WS_MESSAGES_TOTAL, + READ_MODEL_PROJECTOR_LAG_SECONDS, + READ_MODEL_REFRESH_DURATION_SECONDS, + READ_MODEL_RECONCILIATION_DRIFT_TOTAL, WEB_VITALS_LCP, WEB_VITALS_FCP, WEB_VITALS_CLS, @@ -37,6 +40,12 @@ export class MetricsService { private readonly wsConnectedClientsGauge: Gauge, @InjectMetric(GOODGO_WS_MESSAGES_TOTAL) private readonly wsMessagesCounter: Counter, + @InjectMetric(READ_MODEL_PROJECTOR_LAG_SECONDS) + private readonly projectorLagGauge: Gauge, + @InjectMetric(READ_MODEL_REFRESH_DURATION_SECONDS) + private readonly readModelRefreshHistogram: Histogram, + @InjectMetric(READ_MODEL_RECONCILIATION_DRIFT_TOTAL) + private readonly reconciliationDriftCounter: Counter, @InjectMetric(WEB_VITALS_LCP) private readonly lcpHistogram: Histogram, @InjectMetric(WEB_VITALS_FCP) @@ -106,6 +115,21 @@ export class MetricsService { this.wsMessagesCounter.inc({ namespace, event, direction }); } + /** Set current projector lag (seconds behind source stream) for a handler. */ + setProjectorLag(handler: string, lagSeconds: number): void { + this.projectorLagGauge.set({ handler }, lagSeconds); + } + + /** Record the duration of a read-model view refresh. */ + recordReadModelRefresh(view: string, durationSeconds: number): void { + this.readModelRefreshHistogram.observe({ view }, durationSeconds); + } + + /** Increment the reconciliation drift counter for a read model. */ + recordReconciliationDrift(model: string, count = 1): void { + this.reconciliationDriftCounter.inc({ model }, count); + } + /** Map metric name → the correct histogram. */ private readonly vitalHistograms: Record = {}; diff --git a/apps/api/src/modules/shared/infrastructure/event-bus/__tests__/contracts.spec.ts b/apps/api/src/modules/shared/infrastructure/event-bus/__tests__/contracts.spec.ts new file mode 100644 index 0000000..36d0e84 --- /dev/null +++ b/apps/api/src/modules/shared/infrastructure/event-bus/__tests__/contracts.spec.ts @@ -0,0 +1,98 @@ +import { + EVENT_ENVELOPE_SCHEMA_VERSION, + assertValidEnvelope, + isKnownEventType, + isUuidV7, + uuidv7, + validateEnvelope, + type EventEnvelope, +} from '@goodgo/contracts-events'; +import { describe, it, expect } from 'vitest'; + +describe('@goodgo/contracts-events', () => { + describe('uuidv7', () => { + it('produces a RFC 9562 v7 UUID', () => { + const id = uuidv7(); + expect(isUuidV7(id)).toBe(true); + }); + + it('encodes the provided timestamp in the high bits', () => { + const now = 1_714_000_000_000; // stable, post-2024 + const id = uuidv7(now); + // First 8 hex chars = high 32 bits of ms timestamp + const hex = id.replace(/-/g, '').slice(0, 12); + const ts = parseInt(hex, 16); + expect(ts).toBe(now); + }); + + it('generates monotonic-ish ids across rapid calls', () => { + const a = uuidv7(); + const b = uuidv7(); + // v7 starts with the timestamp, so same-ms pairs compare by random bits; + // at worst they're equal-prefix — both must still be valid. + expect(isUuidV7(a)).toBe(true); + expect(isUuidV7(b)).toBe(true); + }); + }); + + describe('validateEnvelope', () => { + const base: EventEnvelope = { + schemaVersion: EVENT_ENVELOPE_SCHEMA_VERSION, + eventId: uuidv7(), + eventType: 'payment.completed', + occurredAt: '2026-04-23T14:00:00.000Z', + producer: 'api', + traceId: 'a'.repeat(32), + payload: {}, + }; + + it('accepts a valid envelope', () => { + expect(validateEnvelope(base)).toEqual([]); + }); + + it('rejects a non-v7 eventId', () => { + const issues = validateEnvelope({ ...base, eventId: 'not-a-uuid' }); + expect(issues.map((i) => i.path)).toContain('eventId'); + }); + + it('rejects an invalid eventType', () => { + const issues = validateEnvelope({ ...base, eventType: 'PaymentCompleted' }); + expect(issues.map((i) => i.path)).toContain('eventType'); + }); + + it('rejects a trace id that is not 32 hex chars', () => { + const issues = validateEnvelope({ ...base, traceId: 'short' }); + expect(issues.map((i) => i.path)).toContain('traceId'); + }); + + it('rejects schemaVersion drift', () => { + const issues = validateEnvelope({ ...base, schemaVersion: 99 }); + expect(issues.map((i) => i.path)).toContain('schemaVersion'); + }); + + it('rejects missing payload', () => { + const { payload: _drop, ...rest } = base; + void _drop; + const issues = validateEnvelope(rest as unknown); + expect(issues.map((i) => i.path)).toContain('payload'); + }); + }); + + describe('assertValidEnvelope', () => { + it('throws with a flat message on invalid input', () => { + expect(() => assertValidEnvelope({ schemaVersion: 1 })).toThrow(/Invalid EventEnvelope/); + }); + }); + + describe('isKnownEventType', () => { + it('recognises the first 3 schemas', () => { + expect(isKnownEventType('payment.completed')).toBe(true); + expect(isKnownEventType('listing.approved')).toBe(true); + expect(isKnownEventType('kyc.verified')).toBe(true); + }); + + it('rejects unknown event types', () => { + expect(isKnownEventType('payment.refunded')).toBe(false); + }); + }); +}); diff --git a/apps/api/src/modules/shared/infrastructure/event-bus/__tests__/in-memory.event-bus.spec.ts b/apps/api/src/modules/shared/infrastructure/event-bus/__tests__/in-memory.event-bus.spec.ts new file mode 100644 index 0000000..ca94cd3 --- /dev/null +++ b/apps/api/src/modules/shared/infrastructure/event-bus/__tests__/in-memory.event-bus.spec.ts @@ -0,0 +1,82 @@ +import { uuidv7, type EventEnvelope } from '@goodgo/contracts-events'; +import { describe, it, expect, beforeEach } from 'vitest'; +import { buildEnvelope } from '../envelope-builder'; +import { InMemoryEventBus } from '../in-memory.event-bus'; + +describe('InMemoryEventBus', () => { + let bus: InMemoryEventBus; + beforeEach(() => { + bus = new InMemoryEventBus(); + }); + + function env(type: string, payload: unknown = {}): EventEnvelope { + return buildEnvelope({ producer: 'api' }, { eventType: type, payload }); + } + + it('records published envelopes and returns a transport id', async () => { + const result = await bus.publish(env('payment.completed', { paymentId: 'p1' })); + expect(result.eventId).toMatch(/^[0-9a-f-]{36}$/); + expect(result.stream).toBe('events:payment.completed'); + expect(result.transportId).toMatch(/-\d+$/); + expect(bus.all()).toHaveLength(1); + }); + + it('filters by event type', async () => { + await bus.publishAll([ + env('payment.completed'), + env('listing.approved'), + env('payment.completed'), + ]); + expect(bus.byType('payment.completed')).toHaveLength(2); + expect(bus.byType('listing.approved')).toHaveLength(1); + expect(bus.byType('kyc.verified')).toHaveLength(0); + }); + + it('rejects malformed envelopes', async () => { + const bad = { + schemaVersion: 1, + eventId: 'not-a-v7', + eventType: 'payment.completed', + occurredAt: '2026-04-23T00:00:00Z', + producer: 'api', + traceId: 'a'.repeat(32), + payload: {}, + }; + await expect(bus.publish(bad as unknown as EventEnvelope)).rejects.toThrow( + /Invalid EventEnvelope/, + ); + }); + + it('reset clears state', async () => { + await bus.publish(env('kyc.verified')); + expect(bus.all()).toHaveLength(1); + bus.reset(); + expect(bus.all()).toHaveLength(0); + }); + + it('buildEnvelope defaults traceId to 32 zeros when tracing is off', () => { + const e = buildEnvelope({ producer: 'api' }, { eventType: 'kyc.verified', payload: {} }); + expect(e.traceId).toBe('0'.repeat(32)); + expect(e.producer).toBe('api'); + expect(e.schemaVersion).toBe(1); + }); + + it('buildEnvelope honours explicit overrides (replay path)', () => { + const id = uuidv7(); + const e = buildEnvelope( + { producer: 'api' }, + { + eventType: 'listing.approved', + payload: {}, + eventId: id, + traceId: 'b'.repeat(32), + occurredAt: new Date('2026-01-01T00:00:00Z'), + producer: 'replay-cli', + }, + ); + expect(e.eventId).toBe(id); + expect(e.traceId).toBe('b'.repeat(32)); + expect(e.producer).toBe('replay-cli'); + expect(e.occurredAt).toBe('2026-01-01T00:00:00.000Z'); + }); +}); diff --git a/apps/api/src/modules/shared/infrastructure/event-bus/envelope-builder.ts b/apps/api/src/modules/shared/infrastructure/event-bus/envelope-builder.ts new file mode 100644 index 0000000..e241ecd --- /dev/null +++ b/apps/api/src/modules/shared/infrastructure/event-bus/envelope-builder.ts @@ -0,0 +1,55 @@ +import { uuidv7, type EventEnvelope } from '@goodgo/contracts-events'; + +const ZERO_TRACE_ID = '0'.repeat(32); + +/** + * Returns the active trace id, or 32 zeros when none is propagated. + * + * The codebase does not yet depend on `@opentelemetry/api` (Sentry handles + * traces today). To honor the CTO condition that every envelope carries a + * `traceId` from Phase 0, we expose this hook as the integration point. + */ +export function currentTraceId(): string { + try { + const sentryGlobal = (globalThis as Record)['__SENTRY__']; + if (sentryGlobal && typeof sentryGlobal === 'object') { + const hub = (sentryGlobal as { hub?: { getScope?: () => { getSpan?: () => { traceId?: string } | undefined } } }).hub; + const traceId = hub?.getScope?.()?.getSpan?.()?.traceId; + if (typeof traceId === 'string' && /^[0-9a-f]{32}$/i.test(traceId)) { + return traceId; + } + } + } catch { + // Defensive: never let trace lookup fail event publishing. + } + return ZERO_TRACE_ID; +} + +export interface BuildEnvelopeInput { + eventType: string; + payload: TPayload; + producer?: string; + occurredAt?: Date; + traceId?: string; + eventId?: string; +} + +/** + * Pure helper — builds an `EventEnvelope` with sensible defaults + * (UUIDv7 eventId, current trace id, ISO occurredAt). Kept outside + * the EventBus to make unit testing trivial. + */ +export function buildEnvelope( + defaults: { producer: string }, + input: BuildEnvelopeInput, +): EventEnvelope { + return { + schemaVersion: 1, + eventId: input.eventId ?? uuidv7(), + eventType: input.eventType, + occurredAt: (input.occurredAt ?? new Date()).toISOString(), + producer: input.producer ?? defaults.producer, + traceId: input.traceId ?? currentTraceId(), + payload: input.payload, + }; +} diff --git a/apps/api/src/modules/shared/infrastructure/event-bus/event-bus.interface.ts b/apps/api/src/modules/shared/infrastructure/event-bus/event-bus.interface.ts new file mode 100644 index 0000000..0e3e824 --- /dev/null +++ b/apps/api/src/modules/shared/infrastructure/event-bus/event-bus.interface.ts @@ -0,0 +1,14 @@ +import type { EventEnvelope } from '@goodgo/contracts-events'; + +export interface EventBus { + publish(envelope: EventEnvelope): Promise; + publishAll(envelopes: EventEnvelope[]): Promise; +} + +export interface PublishResult { + eventId: string; + transportId: string; + stream: string; +} + +export const EVENT_BUS = Symbol('EventBus'); diff --git a/apps/api/src/modules/shared/infrastructure/event-bus/in-memory.event-bus.ts b/apps/api/src/modules/shared/infrastructure/event-bus/in-memory.event-bus.ts new file mode 100644 index 0000000..dc22c91 --- /dev/null +++ b/apps/api/src/modules/shared/infrastructure/event-bus/in-memory.event-bus.ts @@ -0,0 +1,51 @@ +import { type EventEnvelope, assertValidEnvelope } from '@goodgo/contracts-events'; +import { Injectable } from '@nestjs/common'; +import type { EventBus, PublishResult } from './event-bus.interface'; +import { streamFor } from './redis-streams.event-bus'; + +/** + * Test/dev double for the EventBus. Records every published envelope + * and exposes lookup helpers. Used by: + * - unit tests in this module + * - Phase 1 dual-publish diff harness + */ +@Injectable() +export class InMemoryEventBus implements EventBus { + private readonly published: { stream: string; envelope: EventEnvelope }[] = []; + private sequence = 0; + + async publish(envelope: EventEnvelope): Promise { + assertValidEnvelope(envelope); + const stream = streamFor(envelope.eventType); + this.published.push({ stream, envelope: envelope as EventEnvelope }); + this.sequence += 1; + return { + eventId: envelope.eventId, + transportId: `${Date.now()}-${this.sequence}`, + stream, + }; + } + + async publishAll(envelopes: EventEnvelope[]): Promise { + const out: PublishResult[] = []; + for (const env of envelopes) { + out.push(await this.publish(env)); + } + return out; + } + + all(): readonly EventEnvelope[] { + return this.published.map((p) => p.envelope); + } + + byType(eventType: string): EventEnvelope[] { + return this.published + .filter((p) => p.envelope.eventType === eventType) + .map((p) => p.envelope as EventEnvelope); + } + + reset(): void { + this.published.length = 0; + this.sequence = 0; + } +} diff --git a/apps/api/src/modules/shared/infrastructure/event-bus/index.ts b/apps/api/src/modules/shared/infrastructure/event-bus/index.ts new file mode 100644 index 0000000..ed9d14a --- /dev/null +++ b/apps/api/src/modules/shared/infrastructure/event-bus/index.ts @@ -0,0 +1,12 @@ +export { + EVENT_BUS, + type EventBus, + type PublishResult, +} from './event-bus.interface'; +export { RedisStreamsEventBus, streamFor } from './redis-streams.event-bus'; +export { InMemoryEventBus } from './in-memory.event-bus'; +export { + buildEnvelope, + currentTraceId, + type BuildEnvelopeInput, +} from './envelope-builder'; diff --git a/apps/api/src/modules/shared/infrastructure/event-bus/redis-streams.event-bus.ts b/apps/api/src/modules/shared/infrastructure/event-bus/redis-streams.event-bus.ts new file mode 100644 index 0000000..41e4925 --- /dev/null +++ b/apps/api/src/modules/shared/infrastructure/event-bus/redis-streams.event-bus.ts @@ -0,0 +1,65 @@ +import { type EventEnvelope, assertValidEnvelope } from '@goodgo/contracts-events'; +import { Injectable, Logger } from '@nestjs/common'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports +import { RedisService } from '../redis.service'; +import type { EventBus, PublishResult } from './event-bus.interface'; + +/** + * Stream naming: one stream per event-type. Per-event consumer groups + * are created lazily by individual consumer services (Phase 1+). + * + * events:payment.completed + * events:listing.approved + * events:kyc.verified + * + * `MAXLEN ~ 100000` per stream — RFC §5 mitigation. Nightly archive + * to S3 lands in Phase 3. + */ +const STREAM_PREFIX = 'events:'; +const DEFAULT_MAXLEN = 100_000; + +@Injectable() +export class RedisStreamsEventBus implements EventBus { + private readonly logger = new Logger(RedisStreamsEventBus.name); + private readonly maxlen: number; + + constructor(private readonly redis: RedisService) { + const envMax = process.env['EVENT_BUS_STREAM_MAXLEN']; + this.maxlen = envMax ? Number(envMax) : DEFAULT_MAXLEN; + } + + async publish(envelope: EventEnvelope): Promise { + assertValidEnvelope(envelope); + const stream = streamFor(envelope.eventType); + const client = this.redis.getClient(); + // XADD MAXLEN ~ * envelope + const transportId = await client.xadd( + stream, + 'MAXLEN', + '~', + this.maxlen, + '*', + 'envelope', + JSON.stringify(envelope), + ); + if (transportId === null) { + throw new Error(`XADD returned NIL for stream ${stream}`); + } + this.logger.debug( + `Published ${envelope.eventType} eventId=${envelope.eventId} -> ${stream}@${transportId}`, + ); + return { eventId: envelope.eventId, transportId, stream }; + } + + async publishAll(envelopes: EventEnvelope[]): Promise { + const out: PublishResult[] = []; + for (const env of envelopes) { + out.push(await this.publish(env)); + } + return out; + } +} + +export function streamFor(eventType: string): string { + return `${STREAM_PREFIX}${eventType}`; +} diff --git a/apps/api/src/modules/shared/infrastructure/index.ts b/apps/api/src/modules/shared/infrastructure/index.ts index 9d3fb95..ca76fa6 100644 --- a/apps/api/src/modules/shared/infrastructure/index.ts +++ b/apps/api/src/modules/shared/infrastructure/index.ts @@ -42,3 +42,18 @@ export { FileValidationPipe } from './pipes/file-validation.pipe'; export type { FileValidationOptions, UploadedFile } from './pipes/file-validation.pipe'; export { validateEnv, validateJwtSecret } from './env-validation'; export { cacheMetaStorage, type CacheMeta, type CacheMetaStore } from './cache-meta.store'; +// RFC-001 Phase 1 — API versioning. +export { + API_VERSION_REGISTRY, + resolveMajorSpec, + type ApiMajorSpec, + type ApiVersionDeprecation, + type ApiVersionRegistry, +} from './versioning'; +export { + VersionInterceptor, + DeprecationInterceptor, + API_MINOR_HEADER, + API_MINOR_RESOLVED_HEADER, + type ResolvedApiVersion, +} from './interceptors'; diff --git a/apps/api/src/modules/shared/infrastructure/outbox/__tests__/outbox.relay.spec.ts b/apps/api/src/modules/shared/infrastructure/outbox/__tests__/outbox.relay.spec.ts new file mode 100644 index 0000000..06ba88f --- /dev/null +++ b/apps/api/src/modules/shared/infrastructure/outbox/__tests__/outbox.relay.spec.ts @@ -0,0 +1,133 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { buildEnvelope } from '../../event-bus'; +import { InMemoryEventBus } from '../../event-bus/in-memory.event-bus'; +import { OutboxRelay } from '../outbox.relay'; + +type OutboxRow = { + id: string; + eventId: string; + eventType: string; + aggregateId: string | null; + envelope: unknown; + createdAt: Date; + publishedAt: Date | null; + attempts: number; + lastError: string | null; +}; + +/** + * Lightweight fake Prisma — just enough for OutboxRelay.tick(): + * - pg_try_advisory_lock + * - eventOutbox.findMany / update + */ +function makeFakePrisma(rows: OutboxRow[], options: { acquireLock?: boolean } = {}) { + const acquireLock = options.acquireLock ?? true; + return { + $queryRawUnsafe: vi.fn(async (sql: string) => { + if (sql.includes('pg_try_advisory_lock')) return [{ locked: acquireLock }]; + return []; + }), + eventOutbox: { + findMany: vi.fn(async (args: { where?: { publishedAt: null }; take?: number }) => { + const pending = rows.filter((r) => r.publishedAt === null); + return pending.slice(0, args.take ?? 100); + }), + update: vi.fn(async (args: { where: { id: string }; data: Record }) => { + const row = rows.find((r) => r.id === args.where.id); + if (!row) throw new Error('row not found'); + const data = args.data; + if ('publishedAt' in data) row.publishedAt = data['publishedAt'] as Date | null; + if ('lastError' in data) row.lastError = data['lastError'] as string | null; + if ('attempts' in data) { + const v = data['attempts']; + if (v && typeof v === 'object' && 'increment' in v) { + row.attempts += (v as { increment: number }).increment; + } + } + return row; + }), + }, + }; +} + +function fakeRow(type: string): OutboxRow { + const envelope = buildEnvelope({ producer: 'api' }, { eventType: type, payload: { k: 'v' } }); + return { + id: `row-${envelope.eventId}`, + eventId: envelope.eventId, + eventType: envelope.eventType, + aggregateId: null, + envelope, + createdAt: new Date(), + publishedAt: null, + attempts: 0, + lastError: null, + }; +} + +describe('OutboxRelay.tick', () => { + let bus: InMemoryEventBus; + beforeEach(() => { + bus = new InMemoryEventBus(); + process.env['EVENT_OUTBOX_RELAY_ENABLED'] = 'false'; // don't auto-start timer + }); + + it('drains pending rows into the EventBus and marks them published', async () => { + const rows = [fakeRow('payment.completed'), fakeRow('listing.approved')]; + const prisma = makeFakePrisma(rows); + const relay = new OutboxRelay(prisma as never, bus); + + const result = await relay.tick(); + + expect(result.acquired).toBe(true); + expect(result.processed).toBe(2); + expect(result.failed).toBe(0); + expect(bus.all()).toHaveLength(2); + expect(rows.every((r) => r.publishedAt instanceof Date)).toBe(true); + }); + + it('does nothing when the advisory lock is held elsewhere', async () => { + const rows = [fakeRow('kyc.verified')]; + const prisma = makeFakePrisma(rows, { acquireLock: false }); + const relay = new OutboxRelay(prisma as never, bus); + + const result = await relay.tick(); + + expect(result.acquired).toBe(false); + expect(result.processed).toBe(0); + expect(bus.all()).toHaveLength(0); + expect(rows[0]?.publishedAt).toBeNull(); + }); + + it('records lastError and leaves publishedAt null on publish failure', async () => { + const rows = [fakeRow('payment.completed')]; + const prisma = makeFakePrisma(rows); + const failing = { + publish: vi.fn(async () => { + throw new Error('XADD refused'); + }), + publishAll: vi.fn(), + }; + const relay = new OutboxRelay(prisma as never, failing as never); + + const result = await relay.tick(); + + expect(result.processed).toBe(0); + expect(result.failed).toBe(1); + expect(rows[0]?.publishedAt).toBeNull(); + expect(rows[0]?.lastError).toContain('XADD refused'); + expect(rows[0]?.attempts).toBe(1); + }); + + it('skips rows that are already published', async () => { + const row = fakeRow('listing.approved'); + row.publishedAt = new Date(); + const prisma = makeFakePrisma([row]); + const relay = new OutboxRelay(prisma as never, bus); + + const result = await relay.tick(); + + expect(result.processed).toBe(0); + expect(bus.all()).toHaveLength(0); + }); +}); diff --git a/apps/api/src/modules/shared/infrastructure/outbox/index.ts b/apps/api/src/modules/shared/infrastructure/outbox/index.ts new file mode 100644 index 0000000..6e31f6f --- /dev/null +++ b/apps/api/src/modules/shared/infrastructure/outbox/index.ts @@ -0,0 +1,2 @@ +export { OutboxService, type OutboxAppendOptions } from './outbox.service'; +export { OutboxRelay } from './outbox.relay'; diff --git a/apps/api/src/modules/shared/infrastructure/outbox/outbox.relay.ts b/apps/api/src/modules/shared/infrastructure/outbox/outbox.relay.ts new file mode 100644 index 0000000..73b08f6 --- /dev/null +++ b/apps/api/src/modules/shared/infrastructure/outbox/outbox.relay.ts @@ -0,0 +1,134 @@ +import type { EventEnvelope } from '@goodgo/contracts-events'; +import { + Inject, + Injectable, + Logger, + type OnModuleDestroy, + type OnModuleInit, +} from '@nestjs/common'; +import { EVENT_BUS, type EventBus } from '../event-bus/event-bus.interface'; +import { type PrismaService } from '../prisma.service'; + +/** + * Single-process relay that drains `event_outbox` into the EventBus. + * + * Concurrency: every node tries to acquire the same Postgres advisory + * lock (`pg_try_advisory_lock`); only the holder runs the poll loop. + * This is the single-process + advisory-lock design called out in + * RFC-004 §4 ("No leader-election library yet"). + */ + +const ADVISORY_LOCK_KEY = 0xe7b04204; // bespoke 32-bit key for the outbox relay +const DEFAULT_POLL_MS = 1_000; +const DEFAULT_BATCH_SIZE = 100; + +@Injectable() +export class OutboxRelay implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(OutboxRelay.name); + private readonly pollIntervalMs: number; + private readonly batchSize: number; + private readonly enabled: boolean; + private timer: NodeJS.Timeout | null = null; + private running = false; + private stopped = false; + private holdsLock = false; + + constructor( + private readonly prisma: PrismaService, + @Inject(EVENT_BUS) private readonly bus: EventBus, + ) { + this.pollIntervalMs = Number(process.env['EVENT_OUTBOX_POLL_MS'] ?? DEFAULT_POLL_MS); + this.batchSize = Number(process.env['EVENT_OUTBOX_BATCH_SIZE'] ?? DEFAULT_BATCH_SIZE); + this.enabled = (process.env['EVENT_OUTBOX_RELAY_ENABLED'] ?? 'true').toLowerCase() !== 'false'; + } + + onModuleInit(): void { + if (!this.enabled) { + this.logger.log('OutboxRelay disabled via EVENT_OUTBOX_RELAY_ENABLED=false'); + return; + } + this.scheduleNext(); + } + + async onModuleDestroy(): Promise { + this.stopped = true; + if (this.timer) clearTimeout(this.timer); + if (this.holdsLock) { + try { + await this.prisma.$queryRawUnsafe(`SELECT pg_advisory_unlock(${ADVISORY_LOCK_KEY})`); + } catch (err) { + this.logger.warn(`Failed to release advisory lock: ${(err as Error).message}`); + } + } + } + + private scheduleNext(): void { + if (this.stopped) return; + this.timer = setTimeout(() => { + void this.tick().finally(() => this.scheduleNext()); + }, this.pollIntervalMs); + } + + /** Public for tests — drains one batch synchronously. */ + async tick(): Promise<{ acquired: boolean; processed: number; failed: number }> { + if (this.running) return { acquired: false, processed: 0, failed: 0 }; + this.running = true; + try { + const acquired = await this.tryAcquireLock(); + if (!acquired) return { acquired: false, processed: 0, failed: 0 }; + const { processed, failed } = await this.drainBatch(); + return { acquired: true, processed, failed }; + } finally { + this.running = false; + } + } + + private async tryAcquireLock(): Promise { + if (this.holdsLock) return true; + const rows = await this.prisma.$queryRawUnsafe<{ locked: boolean }[]>( + `SELECT pg_try_advisory_lock(${ADVISORY_LOCK_KEY}) AS locked`, + ); + const locked = rows[0]?.locked === true; + if (locked) { + this.holdsLock = true; + this.logger.log('Acquired event_outbox advisory lock — this node is now the relay leader'); + } + return locked; + } + + private async drainBatch(): Promise<{ processed: number; failed: number }> { + const pending = await this.prisma.eventOutbox.findMany({ + where: { publishedAt: null }, + orderBy: { createdAt: 'asc' }, + take: this.batchSize, + }); + if (pending.length === 0) return { processed: 0, failed: 0 }; + let processed = 0; + let failed = 0; + for (const row of pending) { + try { + const envelope = row.envelope as unknown as EventEnvelope; + await this.bus.publish(envelope); + await this.prisma.eventOutbox.update({ + where: { id: row.id }, + data: { publishedAt: new Date(), attempts: { increment: 1 }, lastError: null }, + }); + processed += 1; + } catch (err) { + failed += 1; + const message = err instanceof Error ? err.message : String(err); + this.logger.error( + `Outbox publish failed for eventId=${row.eventId} type=${row.eventType}: ${message}`, + ); + await this.prisma.eventOutbox.update({ + where: { id: row.id }, + data: { attempts: { increment: 1 }, lastError: message.slice(0, 1000) }, + }); + } + } + if (processed > 0 || failed > 0) { + this.logger.debug(`Outbox drained batch: processed=${processed} failed=${failed}`); + } + return { processed, failed }; + } +} diff --git a/apps/api/src/modules/shared/infrastructure/outbox/outbox.service.ts b/apps/api/src/modules/shared/infrastructure/outbox/outbox.service.ts new file mode 100644 index 0000000..86030a3 --- /dev/null +++ b/apps/api/src/modules/shared/infrastructure/outbox/outbox.service.ts @@ -0,0 +1,49 @@ +import { type EventEnvelope, assertValidEnvelope } from '@goodgo/contracts-events'; +import { Injectable, Logger } from '@nestjs/common'; +import type { Prisma } from '@prisma/client'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports +import { PrismaService } from '../prisma.service'; + +/** + * Transactional outbox writer. Call inside the same Prisma transaction + * as the domain change so the row commits atomically with the state + * mutation it describes. The Outbox **never** publishes directly; the + * relay (`OutboxRelay`) tails `event_outbox` and forwards to the EventBus. + */ +export interface OutboxAppendOptions { + aggregateId?: string; +} + +type EventOutboxDelegate = PrismaService['eventOutbox']; +type PrismaTxLike = Pick | { eventOutbox: Pick }; + +@Injectable() +export class OutboxService { + private readonly logger = new Logger(OutboxService.name); + + constructor(private readonly prisma: PrismaService) {} + + async append( + tx: PrismaTxLike | PrismaService, + envelope: EventEnvelope, + options: OutboxAppendOptions = {}, + ): Promise { + assertValidEnvelope(envelope); + const client = ('eventOutbox' in tx ? tx.eventOutbox : tx) as EventOutboxDelegate; + await client.create({ + data: { + eventId: envelope.eventId, + eventType: envelope.eventType, + aggregateId: options.aggregateId ?? null, + envelope: envelope as unknown as Prisma.InputJsonValue, + }, + }); + } + + async appendStandalone(envelope: EventEnvelope, options: OutboxAppendOptions = {}): Promise { + await this.append(this.prisma, envelope, options); + this.logger.warn( + `appendStandalone used for ${envelope.eventType} eventId=${envelope.eventId} — prefer the transactional append()`, + ); + } +} diff --git a/apps/api/src/modules/shared/shared.module.ts b/apps/api/src/modules/shared/shared.module.ts index a0334e8..3e42f75 100644 --- a/apps/api/src/modules/shared/shared.module.ts +++ b/apps/api/src/modules/shared/shared.module.ts @@ -1,6 +1,6 @@ import { Global, type MiddlewareConsumer, Module, type NestModule, RequestMethod } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; -import { APP_FILTER } from '@nestjs/core'; +import { APP_FILTER, APP_INTERCEPTOR } from '@nestjs/core'; import { EventEmitterModule } from '@nestjs/event-emitter'; import { PrometheusModule, makeCounterProvider } from '@willsoto/nestjs-prometheus'; import { @@ -10,8 +10,11 @@ import { CACHE_DEGRADATION_TOTAL, } from './infrastructure/cache.service'; import { EventBusService } from './infrastructure/event-bus.service'; +import { EVENT_BUS, RedisStreamsEventBus } from './infrastructure/event-bus'; +import { OutboxRelay, OutboxService } from './infrastructure/outbox'; import { FieldEncryptionService } from './infrastructure/field-encryption.service'; import { GlobalExceptionFilter } from './infrastructure/filters/global-exception.filter'; +import { DeprecationInterceptor, VersionInterceptor } from './infrastructure/interceptors'; import { LoggerService } from './infrastructure/logger.service'; import { CorrelationIdMiddleware } from './infrastructure/middleware/correlation-id.middleware'; import { CsrfMiddleware } from './infrastructure/middleware/csrf.middleware'; @@ -35,6 +38,10 @@ import { TypesenseClientService } from './infrastructure/typesense-client.servic RedisService, CacheService, EventBusService, + // RFC-004 Phase 0 (GOO-172) — durable async messaging backbone. + { provide: EVENT_BUS, useClass: RedisStreamsEventBus }, + OutboxService, + OutboxRelay, TypesenseClientService, makeCounterProvider({ name: CACHE_HIT_TOTAL, @@ -55,8 +62,18 @@ import { TypesenseClientService } from './infrastructure/typesense-client.servic provide: APP_FILTER, useClass: GlobalExceptionFilter, }, + // RFC-001 Phase 1 (GOO-170) — order matters: VersionInterceptor first + // populates req.apiVersion; DeprecationInterceptor reads from it. + { + provide: APP_INTERCEPTOR, + useClass: VersionInterceptor, + }, + { + provide: APP_INTERCEPTOR, + useClass: DeprecationInterceptor, + }, ], - exports: [PrismaService, RedisService, CacheService, LoggerService, EventBusService, FieldEncryptionService, TypesenseClientService, PrometheusModule], + exports: [PrismaService, RedisService, CacheService, LoggerService, EventBusService, EVENT_BUS, OutboxService, FieldEncryptionService, TypesenseClientService, PrometheusModule], }) export class SharedModule implements NestModule { configure(consumer: MiddlewareConsumer): void { diff --git a/libs/contracts/events/README.md b/libs/contracts/events/README.md new file mode 100644 index 0000000..09b5cad --- /dev/null +++ b/libs/contracts/events/README.md @@ -0,0 +1,53 @@ +# @goodgo/contracts-events + +Cross-runtime (Node + Python) event contracts for RFC-004's async messaging +backbone. See [GOO-95](/GOO/issues/GOO-95) for the RFC and +[GOO-172](/GOO/issues/GOO-172) for Phase 0. + +## What lives here + +- `src/envelope.ts` — `EventEnvelope` TypeScript type + `EVENT_ENVELOPE_SCHEMA_VERSION`. +- `src/uuid-v7.ts` — pure-Node UUIDv7 generator (no runtime deps). +- `src/event-types.ts` — string-literal union of all known event types. +- `schemas/envelope.schema.json` — JSON Schema for the envelope itself. +- `schemas/.schema.json` — JSON Schema for each event payload. + +The `schemas/` directory is consumed by the Python AI services +(`libs/ai-services`) via `redis-py` consumers — JSON Schema is the single +source of truth across runtimes. + +## Envelope shape + +```ts +interface EventEnvelope { + schemaVersion: number; // bump when envelope itself changes + eventId: string; // UUIDv7 — time-ordered, idempotency key + eventType: string; // dotted: "payment.completed" + occurredAt: string; // ISO-8601 UTC + producer: string; // service name, e.g. "api" + traceId: string; // OpenTelemetry-compatible (32 hex chars or "00…") + payload: TPayload; // event-specific, validated by per-type schema +} +``` + +`schemaVersion` starts at `1`. Bump only when the **envelope** changes; +payload changes are versioned per event-type schema independently. + +## First 3 schemas (Phase 0 deliverable) + +| Event type | Trigger | +|----------------------|--------------------------------------------------| +| `payment.completed` | Payment moves to `succeeded` after gateway IPN | +| `listing.approved` | Moderation approves a listing | +| `kyc.verified` | KYC review marks a user verified | + +Phase 1 (notifications cutover, [GOO-173](/GOO/issues/GOO-173)) will add +the rest of the production event surface. + +## Adding a new event type + +1. Pick a stable dotted name (`.`). +2. Add a `schemas/.schema.json` JSON Schema describing the payload. +3. Add the literal to `src/event-types.ts`. +4. (Optional) Re-export a typed payload alias from `src/index.ts`. +5. Land + dual-publish for at least one sprint before any consumer hard-fails on it. diff --git a/libs/contracts/events/package.json b/libs/contracts/events/package.json new file mode 100644 index 0000000..7a68abb --- /dev/null +++ b/libs/contracts/events/package.json @@ -0,0 +1,19 @@ +{ + "name": "@goodgo/contracts-events", + "version": "0.1.0", + "private": true, + "main": "./src/index.ts", + "types": "./src/index.ts", + "exports": { + ".": "./src/index.ts", + "./schemas/*": "./schemas/*" + }, + "scripts": { + "lint": "echo \"(no lint configured)\"", + "typecheck": "tsc --noEmit", + "test": "echo \"(no tests yet)\"" + }, + "devDependencies": { + "typescript": "^5.5.0" + } +} diff --git a/libs/contracts/events/schemas/envelope.schema.json b/libs/contracts/events/schemas/envelope.schema.json new file mode 100644 index 0000000..df6b0ef --- /dev/null +++ b/libs/contracts/events/schemas/envelope.schema.json @@ -0,0 +1,52 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://goodgo.vn/schemas/events/envelope.schema.json", + "title": "EventEnvelope", + "description": "Cross-runtime event envelope for RFC-004 (GOO-95). Source of truth — Node and Python consumers validate against this file.", + "type": "object", + "additionalProperties": false, + "required": [ + "schemaVersion", + "eventId", + "eventType", + "occurredAt", + "producer", + "traceId", + "payload" + ], + "properties": { + "schemaVersion": { + "type": "integer", + "const": 1, + "description": "Envelope wire-format version. Currently 1." + }, + "eventId": { + "type": "string", + "pattern": "^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$", + "description": "UUIDv7 — time-ordered. Used as idempotency key (30-day TTL in idempotency table)." + }, + "eventType": { + "type": "string", + "pattern": "^[a-z][a-z0-9_]*(\\.[a-z][a-z0-9_]*)+$", + "description": "Dotted event type, e.g. payment.completed." + }, + "occurredAt": { + "type": "string", + "format": "date-time", + "description": "ISO-8601 UTC timestamp of the domain event (not publish time)." + }, + "producer": { + "type": "string", + "minLength": 1, + "description": "Producing service identifier, e.g. api, ai-services." + }, + "traceId": { + "type": "string", + "pattern": "^[0-9a-f]{32}$", + "description": "OpenTelemetry-compatible 32-hex trace id. Use 32 zeros when no active span." + }, + "payload": { + "description": "Event-specific payload; validated separately against schemas/.schema.json." + } + } +} diff --git a/libs/contracts/events/schemas/kyc.verified.schema.json b/libs/contracts/events/schemas/kyc.verified.schema.json new file mode 100644 index 0000000..2115abe --- /dev/null +++ b/libs/contracts/events/schemas/kyc.verified.schema.json @@ -0,0 +1,20 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://goodgo.vn/schemas/events/kyc.verified.schema.json", + "title": "kyc.verified", + "description": "Emitted when a user's KYC review transitions to VERIFIED.", + "type": "object", + "additionalProperties": false, + "required": ["userId", "verifiedByUserId", "level", "verifiedAt", "documentRefs"], + "properties": { + "userId": { "type": "string", "minLength": 1 }, + "verifiedByUserId": { "type": "string", "minLength": 1 }, + "level": { "type": "string", "enum": ["basic", "enhanced"] }, + "verifiedAt": { "type": "string", "format": "date-time" }, + "documentRefs": { + "type": "array", + "items": { "type": "string", "minLength": 1 }, + "description": "Opaque references (e.g. S3 keys) to the documents used for verification." + } + } +} diff --git a/libs/contracts/events/schemas/listing.approved.schema.json b/libs/contracts/events/schemas/listing.approved.schema.json new file mode 100644 index 0000000..67acfda --- /dev/null +++ b/libs/contracts/events/schemas/listing.approved.schema.json @@ -0,0 +1,28 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://goodgo.vn/schemas/events/listing.approved.schema.json", + "title": "listing.approved", + "description": "Emitted when a Listing is approved by moderation and goes live.", + "type": "object", + "additionalProperties": false, + "required": [ + "listingId", + "propertyId", + "agentId", + "approvedByUserId", + "approvedAt", + "expiresAt" + ], + "properties": { + "listingId": { "type": "string", "minLength": 1 }, + "propertyId": { "type": "string", "minLength": 1 }, + "agentId": { "type": "string", "minLength": 1 }, + "approvedByUserId": { "type": "string", "minLength": 1 }, + "approvedAt": { "type": "string", "format": "date-time" }, + "expiresAt": { + "type": ["string", "null"], + "format": "date-time", + "description": "Null when the listing has no scheduled expiry." + } + } +} diff --git a/libs/contracts/events/schemas/payment.completed.schema.json b/libs/contracts/events/schemas/payment.completed.schema.json new file mode 100644 index 0000000..1bae6c8 --- /dev/null +++ b/libs/contracts/events/schemas/payment.completed.schema.json @@ -0,0 +1,32 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://goodgo.vn/schemas/events/payment.completed.schema.json", + "title": "payment.completed", + "description": "Emitted when a Payment aggregate transitions to SUCCEEDED after a verified gateway IPN (VNPay / MoMo / ZaloPay).", + "type": "object", + "additionalProperties": false, + "required": [ + "paymentId", + "orderId", + "userId", + "amount", + "currency", + "gateway", + "gatewayTransactionId", + "paidAt" + ], + "properties": { + "paymentId": { "type": "string", "minLength": 1 }, + "orderId": { "type": "string", "minLength": 1 }, + "userId": { "type": "string", "minLength": 1 }, + "amount": { + "type": "string", + "pattern": "^-?\\d+(\\.\\d+)?$", + "description": "Decimal as string to preserve VND precision (no float rounding)." + }, + "currency": { "type": "string", "enum": ["VND", "USD"] }, + "gateway": { "type": "string", "enum": ["vnpay", "momo", "zalopay"] }, + "gatewayTransactionId": { "type": "string", "minLength": 1 }, + "paidAt": { "type": "string", "format": "date-time" } + } +} diff --git a/libs/contracts/events/src/envelope.ts b/libs/contracts/events/src/envelope.ts new file mode 100644 index 0000000..c634c7b --- /dev/null +++ b/libs/contracts/events/src/envelope.ts @@ -0,0 +1,69 @@ +import { isUuidV7 } from './uuid-v7'; + +export const EVENT_ENVELOPE_SCHEMA_VERSION = 1; + +export interface EventEnvelope { + schemaVersion: number; + eventId: string; + eventType: string; + occurredAt: string; + producer: string; + traceId: string; + payload: TPayload; +} + +const TRACE_ID_RE = /^[0-9a-f]{32}$/i; +const ISO_8601_RE = + /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d{1,9})?(?:Z|[+-]\d{2}:\d{2})$/; + +export interface EnvelopeValidationIssue { + path: string; + message: string; +} + +export function validateEnvelope(envelope: unknown): EnvelopeValidationIssue[] { + const issues: EnvelopeValidationIssue[] = []; + if (envelope === null || typeof envelope !== 'object') { + return [{ path: '$', message: 'envelope must be an object' }]; + } + const e = envelope as Record; + if (e['schemaVersion'] !== EVENT_ENVELOPE_SCHEMA_VERSION) { + issues.push({ + path: 'schemaVersion', + message: `expected ${EVENT_ENVELOPE_SCHEMA_VERSION}, got ${String(e['schemaVersion'])}`, + }); + } + if (typeof e['eventId'] !== 'string' || !isUuidV7(e['eventId'])) { + issues.push({ path: 'eventId', message: 'must be a UUIDv7 string' }); + } + if ( + typeof e['eventType'] !== 'string' || + !/^[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)+$/.test(e['eventType']) + ) { + issues.push({ + path: 'eventType', + message: 'must match /^[a-z][a-z0-9_]*(\\.[a-z][a-z0-9_]*)+$/', + }); + } + if (typeof e['occurredAt'] !== 'string' || !ISO_8601_RE.test(e['occurredAt'])) { + issues.push({ path: 'occurredAt', message: 'must be an ISO-8601 timestamp' }); + } + if (typeof e['producer'] !== 'string' || e['producer'].length === 0) { + issues.push({ path: 'producer', message: 'must be a non-empty string' }); + } + if (typeof e['traceId'] !== 'string' || !TRACE_ID_RE.test(e['traceId'])) { + issues.push({ path: 'traceId', message: 'must be 32 hex characters' }); + } + if (!('payload' in e)) { + issues.push({ path: 'payload', message: 'is required (use {} for empty)' }); + } + return issues; +} + +export function assertValidEnvelope(envelope: unknown): asserts envelope is EventEnvelope { + const issues = validateEnvelope(envelope); + if (issues.length > 0) { + const flat = issues.map((i) => `${i.path}: ${i.message}`).join('; '); + throw new Error(`Invalid EventEnvelope — ${flat}`); + } +} diff --git a/libs/contracts/events/src/event-types.ts b/libs/contracts/events/src/event-types.ts new file mode 100644 index 0000000..ff03605 --- /dev/null +++ b/libs/contracts/events/src/event-types.ts @@ -0,0 +1,11 @@ +export const KNOWN_EVENT_TYPES = [ + 'kyc.verified', + 'listing.approved', + 'payment.completed', +] as const; + +export type KnownEventType = (typeof KNOWN_EVENT_TYPES)[number]; + +export function isKnownEventType(value: string): value is KnownEventType { + return (KNOWN_EVENT_TYPES as readonly string[]).includes(value); +} diff --git a/libs/contracts/events/src/index.ts b/libs/contracts/events/src/index.ts new file mode 100644 index 0000000..cd525cc --- /dev/null +++ b/libs/contracts/events/src/index.ts @@ -0,0 +1,37 @@ +export { + EVENT_ENVELOPE_SCHEMA_VERSION, + type EventEnvelope, + type EnvelopeValidationIssue, + validateEnvelope, + assertValidEnvelope, +} from './envelope'; +export { uuidv7, isUuidV7 } from './uuid-v7'; +export { KNOWN_EVENT_TYPES, type KnownEventType, isKnownEventType } from './event-types'; + +export interface PaymentCompletedPayload { + paymentId: string; + orderId: string; + userId: string; + amount: string; + currency: 'VND' | 'USD'; + gateway: 'vnpay' | 'momo' | 'zalopay'; + gatewayTransactionId: string; + paidAt: string; +} + +export interface ListingApprovedPayload { + listingId: string; + propertyId: string; + agentId: string; + approvedByUserId: string; + approvedAt: string; + expiresAt: string | null; +} + +export interface KycVerifiedPayload { + userId: string; + verifiedByUserId: string; + level: 'basic' | 'enhanced'; + verifiedAt: string; + documentRefs: string[]; +} diff --git a/libs/contracts/events/src/uuid-v7.ts b/libs/contracts/events/src/uuid-v7.ts new file mode 100644 index 0000000..fbb2aa3 --- /dev/null +++ b/libs/contracts/events/src/uuid-v7.ts @@ -0,0 +1,44 @@ +import { randomBytes } from 'node:crypto'; + +/** + * UUIDv7 — 48-bit Unix-ms timestamp in the high bits, 74 random bits below. + * + * Time-ordered, monotonic enough for our needs (idempotency keys + Stream IDs). + * No dependency on the `uuid` package — Phase 0 keeps the foundation + * tree-shakeable for the Python side (which uses its own implementation). + * + * Reference: RFC 9562 §5.7. + */ +export function uuidv7(now: number = Date.now()): string { + const ts = BigInt(now); // milliseconds since epoch + const bytes = randomBytes(16); + + // 48-bit timestamp (big-endian) in bytes 0..5 + bytes[0] = Number((ts >> 40n) & 0xffn); + bytes[1] = Number((ts >> 32n) & 0xffn); + bytes[2] = Number((ts >> 24n) & 0xffn); + bytes[3] = Number((ts >> 16n) & 0xffn); + bytes[4] = Number((ts >> 8n) & 0xffn); + bytes[5] = Number(ts & 0xffn); + + // Version 7 in the high nibble of byte 6 + bytes[6] = (bytes[6]! & 0x0f) | 0x70; + // RFC 4122 variant (10xx) in the high bits of byte 8 + bytes[8] = (bytes[8]! & 0x3f) | 0x80; + + const hex = Buffer.from(bytes).toString('hex'); + return [ + hex.slice(0, 8), + hex.slice(8, 12), + hex.slice(12, 16), + hex.slice(16, 20), + hex.slice(20, 32), + ].join('-'); +} + +const UUID_V7_RE = + /^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; + +export function isUuidV7(value: string): boolean { + return UUID_V7_RE.test(value); +} diff --git a/libs/contracts/events/tsconfig.json b/libs/contracts/events/tsconfig.json new file mode 100644 index 0000000..3b05860 --- /dev/null +++ b/libs/contracts/events/tsconfig.json @@ -0,0 +1,13 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "ESNext", + "moduleResolution": "Bundler", + "strict": true, + "esModuleInterop": true, + "resolveJsonModule": true, + "skipLibCheck": true, + "noEmit": true + }, + "include": ["src/**/*.ts", "schemas/**/*.json"] +} diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index 4693470..e1d445a 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -2,3 +2,4 @@ packages: - 'apps/*' - 'packages/*' - 'libs/*' + - 'libs/contracts/*' diff --git a/prisma/migrations/20260423140000_add_event_outbox/migration.sql b/prisma/migrations/20260423140000_add_event_outbox/migration.sql new file mode 100644 index 0000000..fe91d76 --- /dev/null +++ b/prisma/migrations/20260423140000_add_event_outbox/migration.sql @@ -0,0 +1,26 @@ +-- RFC-004 Phase 0 (GOO-172): transactional outbox for async messaging backbone. +-- Producers insert into event_outbox in the same tx as the domain change. +-- A single relay process (Postgres advisory-lock leader) tails pending rows +-- and publishes them to Redis Streams, then flips published_at. + +CREATE TABLE "event_outbox" ( + "id" TEXT NOT NULL, + "eventId" TEXT NOT NULL, + "eventType" TEXT NOT NULL, + "aggregateId" TEXT, + "envelope" JSONB NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "publishedAt" TIMESTAMP(3), + "attempts" INTEGER NOT NULL DEFAULT 0, + "lastError" TEXT, + + CONSTRAINT "event_outbox_pkey" PRIMARY KEY ("id") +); + +CREATE UNIQUE INDEX "event_outbox_eventId_key" ON "event_outbox"("eventId"); + +-- Hot path: relay scans `WHERE publishedAt IS NULL ORDER BY createdAt`. +CREATE INDEX "event_outbox_publishedAt_createdAt_idx" ON "event_outbox"("publishedAt", "createdAt"); + +-- Diagnostics: per-type backlog inspection. +CREATE INDEX "event_outbox_eventType_createdAt_idx" ON "event_outbox"("eventType", "createdAt"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 64a1c0b..355ac84 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -1567,3 +1567,32 @@ model VnAdministrativeAlias { @@index([newWardCode]) @@map("vn_administrative_aliases") } + +/// Transactional outbox for RFC-004 async messaging backbone (GOO-95). +/// Producers write one row per domain event in the same Postgres +/// transaction as the domain state change. A single relay process +/// (Postgres advisory-lock leader) tails pending rows and publishes +/// them to Redis Streams, flipping `publishedAt` on success. +model EventOutbox { + id String @id @default(cuid()) + /// UUIDv7 from the envelope — idempotency key + stable cross-runtime id. + eventId String @unique + /// Dotted event type (`payment.completed`). Used by the relay to route. + eventType String + /// Aggregate identifier (e.g. paymentId, listingId) — for partitioning / debugging. + aggregateId String? + /// Fully-formed `EventEnvelope` JSON ready to XADD. Never mutated after insert. + envelope Json + /// When the row was inserted (inside the domain tx). + createdAt DateTime @default(now()) + /// When the relay confirmed XADD acceptance. Null = still pending. + publishedAt DateTime? + /// Monotonic retry count for the relay (reset on success). + attempts Int @default(0) + /// Last error message on failure — surfaced in admin dashboards / Sentry. + lastError String? + + @@index([publishedAt, createdAt]) + @@index([eventType, createdAt]) + @@map("event_outbox") +}