feat(auth): add row/size caps + streaming to export-user-data
- Add per-collection row cap (default 10k, env EXPORT_ROW_CAP) via Prisma take on all findMany calls - Add total size cap (default 100MB, env EXPORT_SIZE_CAP_MB); throws PayloadTooLargeException (413) when exceeded - Convert response to Node.js Readable stream piped via NestJS StreamableFile to avoid large in-memory buffers - Export ExportUserDataResult interface (stream + truncated flag) from handler - Update controller to set Content-Type/Content-Disposition headers and return StreamableFile - Document EXPORT_ROW_CAP and EXPORT_SIZE_CAP_MB env vars in Swagger - Extend tests: row-cap assertion (take arg), size-cap 413 path, stream assertions Fixes GOO-223 (M-1 from GOO-200 audit). Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -0,0 +1,98 @@
|
||||
import {
|
||||
EVENT_ENVELOPE_SCHEMA_VERSION,
|
||||
assertValidEnvelope,
|
||||
isKnownEventType,
|
||||
isUuidV7,
|
||||
uuidv7,
|
||||
validateEnvelope,
|
||||
type EventEnvelope,
|
||||
} from '@goodgo/contracts-events';
|
||||
import { describe, it, expect } from 'vitest';
|
||||
|
||||
describe('@goodgo/contracts-events', () => {
|
||||
describe('uuidv7', () => {
|
||||
it('produces a RFC 9562 v7 UUID', () => {
|
||||
const id = uuidv7();
|
||||
expect(isUuidV7(id)).toBe(true);
|
||||
});
|
||||
|
||||
it('encodes the provided timestamp in the high bits', () => {
|
||||
const now = 1_714_000_000_000; // stable, post-2024
|
||||
const id = uuidv7(now);
|
||||
// First 8 hex chars = high 32 bits of ms timestamp
|
||||
const hex = id.replace(/-/g, '').slice(0, 12);
|
||||
const ts = parseInt(hex, 16);
|
||||
expect(ts).toBe(now);
|
||||
});
|
||||
|
||||
it('generates monotonic-ish ids across rapid calls', () => {
|
||||
const a = uuidv7();
|
||||
const b = uuidv7();
|
||||
// v7 starts with the timestamp, so same-ms pairs compare by random bits;
|
||||
// at worst they're equal-prefix — both must still be valid.
|
||||
expect(isUuidV7(a)).toBe(true);
|
||||
expect(isUuidV7(b)).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('validateEnvelope', () => {
|
||||
const base: EventEnvelope = {
|
||||
schemaVersion: EVENT_ENVELOPE_SCHEMA_VERSION,
|
||||
eventId: uuidv7(),
|
||||
eventType: 'payment.completed',
|
||||
occurredAt: '2026-04-23T14:00:00.000Z',
|
||||
producer: 'api',
|
||||
traceId: 'a'.repeat(32),
|
||||
payload: {},
|
||||
};
|
||||
|
||||
it('accepts a valid envelope', () => {
|
||||
expect(validateEnvelope(base)).toEqual([]);
|
||||
});
|
||||
|
||||
it('rejects a non-v7 eventId', () => {
|
||||
const issues = validateEnvelope({ ...base, eventId: 'not-a-uuid' });
|
||||
expect(issues.map((i) => i.path)).toContain('eventId');
|
||||
});
|
||||
|
||||
it('rejects an invalid eventType', () => {
|
||||
const issues = validateEnvelope({ ...base, eventType: 'PaymentCompleted' });
|
||||
expect(issues.map((i) => i.path)).toContain('eventType');
|
||||
});
|
||||
|
||||
it('rejects a trace id that is not 32 hex chars', () => {
|
||||
const issues = validateEnvelope({ ...base, traceId: 'short' });
|
||||
expect(issues.map((i) => i.path)).toContain('traceId');
|
||||
});
|
||||
|
||||
it('rejects schemaVersion drift', () => {
|
||||
const issues = validateEnvelope({ ...base, schemaVersion: 99 });
|
||||
expect(issues.map((i) => i.path)).toContain('schemaVersion');
|
||||
});
|
||||
|
||||
it('rejects missing payload', () => {
|
||||
const { payload: _drop, ...rest } = base;
|
||||
void _drop;
|
||||
const issues = validateEnvelope(rest as unknown);
|
||||
expect(issues.map((i) => i.path)).toContain('payload');
|
||||
});
|
||||
});
|
||||
|
||||
describe('assertValidEnvelope', () => {
|
||||
it('throws with a flat message on invalid input', () => {
|
||||
expect(() => assertValidEnvelope({ schemaVersion: 1 })).toThrow(/Invalid EventEnvelope/);
|
||||
});
|
||||
});
|
||||
|
||||
describe('isKnownEventType', () => {
|
||||
it('recognises the first 3 schemas', () => {
|
||||
expect(isKnownEventType('payment.completed')).toBe(true);
|
||||
expect(isKnownEventType('listing.approved')).toBe(true);
|
||||
expect(isKnownEventType('kyc.verified')).toBe(true);
|
||||
});
|
||||
|
||||
it('rejects unknown event types', () => {
|
||||
expect(isKnownEventType('payment.refunded')).toBe(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,82 @@
|
||||
import { uuidv7, type EventEnvelope } from '@goodgo/contracts-events';
|
||||
import { describe, it, expect, beforeEach } from 'vitest';
|
||||
import { buildEnvelope } from '../envelope-builder';
|
||||
import { InMemoryEventBus } from '../in-memory.event-bus';
|
||||
|
||||
describe('InMemoryEventBus', () => {
|
||||
let bus: InMemoryEventBus;
|
||||
beforeEach(() => {
|
||||
bus = new InMemoryEventBus();
|
||||
});
|
||||
|
||||
function env(type: string, payload: unknown = {}): EventEnvelope {
|
||||
return buildEnvelope({ producer: 'api' }, { eventType: type, payload });
|
||||
}
|
||||
|
||||
it('records published envelopes and returns a transport id', async () => {
|
||||
const result = await bus.publish(env('payment.completed', { paymentId: 'p1' }));
|
||||
expect(result.eventId).toMatch(/^[0-9a-f-]{36}$/);
|
||||
expect(result.stream).toBe('events:payment.completed');
|
||||
expect(result.transportId).toMatch(/-\d+$/);
|
||||
expect(bus.all()).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('filters by event type', async () => {
|
||||
await bus.publishAll([
|
||||
env('payment.completed'),
|
||||
env('listing.approved'),
|
||||
env('payment.completed'),
|
||||
]);
|
||||
expect(bus.byType('payment.completed')).toHaveLength(2);
|
||||
expect(bus.byType('listing.approved')).toHaveLength(1);
|
||||
expect(bus.byType('kyc.verified')).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('rejects malformed envelopes', async () => {
|
||||
const bad = {
|
||||
schemaVersion: 1,
|
||||
eventId: 'not-a-v7',
|
||||
eventType: 'payment.completed',
|
||||
occurredAt: '2026-04-23T00:00:00Z',
|
||||
producer: 'api',
|
||||
traceId: 'a'.repeat(32),
|
||||
payload: {},
|
||||
};
|
||||
await expect(bus.publish(bad as unknown as EventEnvelope)).rejects.toThrow(
|
||||
/Invalid EventEnvelope/,
|
||||
);
|
||||
});
|
||||
|
||||
it('reset clears state', async () => {
|
||||
await bus.publish(env('kyc.verified'));
|
||||
expect(bus.all()).toHaveLength(1);
|
||||
bus.reset();
|
||||
expect(bus.all()).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('buildEnvelope defaults traceId to 32 zeros when tracing is off', () => {
|
||||
const e = buildEnvelope({ producer: 'api' }, { eventType: 'kyc.verified', payload: {} });
|
||||
expect(e.traceId).toBe('0'.repeat(32));
|
||||
expect(e.producer).toBe('api');
|
||||
expect(e.schemaVersion).toBe(1);
|
||||
});
|
||||
|
||||
it('buildEnvelope honours explicit overrides (replay path)', () => {
|
||||
const id = uuidv7();
|
||||
const e = buildEnvelope(
|
||||
{ producer: 'api' },
|
||||
{
|
||||
eventType: 'listing.approved',
|
||||
payload: {},
|
||||
eventId: id,
|
||||
traceId: 'b'.repeat(32),
|
||||
occurredAt: new Date('2026-01-01T00:00:00Z'),
|
||||
producer: 'replay-cli',
|
||||
},
|
||||
);
|
||||
expect(e.eventId).toBe(id);
|
||||
expect(e.traceId).toBe('b'.repeat(32));
|
||||
expect(e.producer).toBe('replay-cli');
|
||||
expect(e.occurredAt).toBe('2026-01-01T00:00:00.000Z');
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,55 @@
|
||||
import { uuidv7, type EventEnvelope } from '@goodgo/contracts-events';
|
||||
|
||||
const ZERO_TRACE_ID = '0'.repeat(32);
|
||||
|
||||
/**
|
||||
* Returns the active trace id, or 32 zeros when none is propagated.
|
||||
*
|
||||
* The codebase does not yet depend on `@opentelemetry/api` (Sentry handles
|
||||
* traces today). To honor the CTO condition that every envelope carries a
|
||||
* `traceId` from Phase 0, we expose this hook as the integration point.
|
||||
*/
|
||||
export function currentTraceId(): string {
|
||||
try {
|
||||
const sentryGlobal = (globalThis as Record<string, unknown>)['__SENTRY__'];
|
||||
if (sentryGlobal && typeof sentryGlobal === 'object') {
|
||||
const hub = (sentryGlobal as { hub?: { getScope?: () => { getSpan?: () => { traceId?: string } | undefined } } }).hub;
|
||||
const traceId = hub?.getScope?.()?.getSpan?.()?.traceId;
|
||||
if (typeof traceId === 'string' && /^[0-9a-f]{32}$/i.test(traceId)) {
|
||||
return traceId;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Defensive: never let trace lookup fail event publishing.
|
||||
}
|
||||
return ZERO_TRACE_ID;
|
||||
}
|
||||
|
||||
export interface BuildEnvelopeInput<TPayload> {
|
||||
eventType: string;
|
||||
payload: TPayload;
|
||||
producer?: string;
|
||||
occurredAt?: Date;
|
||||
traceId?: string;
|
||||
eventId?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pure helper — builds an `EventEnvelope` with sensible defaults
|
||||
* (UUIDv7 eventId, current trace id, ISO occurredAt). Kept outside
|
||||
* the EventBus to make unit testing trivial.
|
||||
*/
|
||||
export function buildEnvelope<TPayload>(
|
||||
defaults: { producer: string },
|
||||
input: BuildEnvelopeInput<TPayload>,
|
||||
): EventEnvelope<TPayload> {
|
||||
return {
|
||||
schemaVersion: 1,
|
||||
eventId: input.eventId ?? uuidv7(),
|
||||
eventType: input.eventType,
|
||||
occurredAt: (input.occurredAt ?? new Date()).toISOString(),
|
||||
producer: input.producer ?? defaults.producer,
|
||||
traceId: input.traceId ?? currentTraceId(),
|
||||
payload: input.payload,
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
import type { EventEnvelope } from '@goodgo/contracts-events';
|
||||
|
||||
export interface EventBus {
|
||||
publish<T>(envelope: EventEnvelope<T>): Promise<PublishResult>;
|
||||
publishAll(envelopes: EventEnvelope[]): Promise<PublishResult[]>;
|
||||
}
|
||||
|
||||
export interface PublishResult {
|
||||
eventId: string;
|
||||
transportId: string;
|
||||
stream: string;
|
||||
}
|
||||
|
||||
export const EVENT_BUS = Symbol('EventBus');
|
||||
@@ -0,0 +1,51 @@
|
||||
import { type EventEnvelope, assertValidEnvelope } from '@goodgo/contracts-events';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import type { EventBus, PublishResult } from './event-bus.interface';
|
||||
import { streamFor } from './redis-streams.event-bus';
|
||||
|
||||
/**
|
||||
* Test/dev double for the EventBus. Records every published envelope
|
||||
* and exposes lookup helpers. Used by:
|
||||
* - unit tests in this module
|
||||
* - Phase 1 dual-publish diff harness
|
||||
*/
|
||||
@Injectable()
|
||||
export class InMemoryEventBus implements EventBus {
|
||||
private readonly published: { stream: string; envelope: EventEnvelope }[] = [];
|
||||
private sequence = 0;
|
||||
|
||||
async publish<T>(envelope: EventEnvelope<T>): Promise<PublishResult> {
|
||||
assertValidEnvelope(envelope);
|
||||
const stream = streamFor(envelope.eventType);
|
||||
this.published.push({ stream, envelope: envelope as EventEnvelope });
|
||||
this.sequence += 1;
|
||||
return {
|
||||
eventId: envelope.eventId,
|
||||
transportId: `${Date.now()}-${this.sequence}`,
|
||||
stream,
|
||||
};
|
||||
}
|
||||
|
||||
async publishAll(envelopes: EventEnvelope[]): Promise<PublishResult[]> {
|
||||
const out: PublishResult[] = [];
|
||||
for (const env of envelopes) {
|
||||
out.push(await this.publish(env));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
all(): readonly EventEnvelope[] {
|
||||
return this.published.map((p) => p.envelope);
|
||||
}
|
||||
|
||||
byType<T = unknown>(eventType: string): EventEnvelope<T>[] {
|
||||
return this.published
|
||||
.filter((p) => p.envelope.eventType === eventType)
|
||||
.map((p) => p.envelope as EventEnvelope<T>);
|
||||
}
|
||||
|
||||
reset(): void {
|
||||
this.published.length = 0;
|
||||
this.sequence = 0;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
export {
|
||||
EVENT_BUS,
|
||||
type EventBus,
|
||||
type PublishResult,
|
||||
} from './event-bus.interface';
|
||||
export { RedisStreamsEventBus, streamFor } from './redis-streams.event-bus';
|
||||
export { InMemoryEventBus } from './in-memory.event-bus';
|
||||
export {
|
||||
buildEnvelope,
|
||||
currentTraceId,
|
||||
type BuildEnvelopeInput,
|
||||
} from './envelope-builder';
|
||||
@@ -0,0 +1,65 @@
|
||||
import { type EventEnvelope, assertValidEnvelope } from '@goodgo/contracts-events';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
|
||||
import { RedisService } from '../redis.service';
|
||||
import type { EventBus, PublishResult } from './event-bus.interface';
|
||||
|
||||
/**
|
||||
* Stream naming: one stream per event-type. Per-event consumer groups
|
||||
* are created lazily by individual consumer services (Phase 1+).
|
||||
*
|
||||
* events:payment.completed
|
||||
* events:listing.approved
|
||||
* events:kyc.verified
|
||||
*
|
||||
* `MAXLEN ~ 100000` per stream — RFC §5 mitigation. Nightly archive
|
||||
* to S3 lands in Phase 3.
|
||||
*/
|
||||
const STREAM_PREFIX = 'events:';
|
||||
const DEFAULT_MAXLEN = 100_000;
|
||||
|
||||
@Injectable()
|
||||
export class RedisStreamsEventBus implements EventBus {
|
||||
private readonly logger = new Logger(RedisStreamsEventBus.name);
|
||||
private readonly maxlen: number;
|
||||
|
||||
constructor(private readonly redis: RedisService) {
|
||||
const envMax = process.env['EVENT_BUS_STREAM_MAXLEN'];
|
||||
this.maxlen = envMax ? Number(envMax) : DEFAULT_MAXLEN;
|
||||
}
|
||||
|
||||
async publish<T>(envelope: EventEnvelope<T>): Promise<PublishResult> {
|
||||
assertValidEnvelope(envelope);
|
||||
const stream = streamFor(envelope.eventType);
|
||||
const client = this.redis.getClient();
|
||||
// XADD <stream> MAXLEN ~ <n> * envelope <json>
|
||||
const transportId = await client.xadd(
|
||||
stream,
|
||||
'MAXLEN',
|
||||
'~',
|
||||
this.maxlen,
|
||||
'*',
|
||||
'envelope',
|
||||
JSON.stringify(envelope),
|
||||
);
|
||||
if (transportId === null) {
|
||||
throw new Error(`XADD returned NIL for stream ${stream}`);
|
||||
}
|
||||
this.logger.debug(
|
||||
`Published ${envelope.eventType} eventId=${envelope.eventId} -> ${stream}@${transportId}`,
|
||||
);
|
||||
return { eventId: envelope.eventId, transportId, stream };
|
||||
}
|
||||
|
||||
async publishAll(envelopes: EventEnvelope[]): Promise<PublishResult[]> {
|
||||
const out: PublishResult[] = [];
|
||||
for (const env of envelopes) {
|
||||
out.push(await this.publish(env));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
}
|
||||
|
||||
export function streamFor(eventType: string): string {
|
||||
return `${STREAM_PREFIX}${eventType}`;
|
||||
}
|
||||
@@ -42,3 +42,18 @@ export { FileValidationPipe } from './pipes/file-validation.pipe';
|
||||
export type { FileValidationOptions, UploadedFile } from './pipes/file-validation.pipe';
|
||||
export { validateEnv, validateJwtSecret } from './env-validation';
|
||||
export { cacheMetaStorage, type CacheMeta, type CacheMetaStore } from './cache-meta.store';
|
||||
// RFC-001 Phase 1 — API versioning.
|
||||
export {
|
||||
API_VERSION_REGISTRY,
|
||||
resolveMajorSpec,
|
||||
type ApiMajorSpec,
|
||||
type ApiVersionDeprecation,
|
||||
type ApiVersionRegistry,
|
||||
} from './versioning';
|
||||
export {
|
||||
VersionInterceptor,
|
||||
DeprecationInterceptor,
|
||||
API_MINOR_HEADER,
|
||||
API_MINOR_RESOLVED_HEADER,
|
||||
type ResolvedApiVersion,
|
||||
} from './interceptors';
|
||||
|
||||
@@ -0,0 +1,133 @@
|
||||
import { describe, it, expect, beforeEach, vi } from 'vitest';
|
||||
import { buildEnvelope } from '../../event-bus';
|
||||
import { InMemoryEventBus } from '../../event-bus/in-memory.event-bus';
|
||||
import { OutboxRelay } from '../outbox.relay';
|
||||
|
||||
type OutboxRow = {
|
||||
id: string;
|
||||
eventId: string;
|
||||
eventType: string;
|
||||
aggregateId: string | null;
|
||||
envelope: unknown;
|
||||
createdAt: Date;
|
||||
publishedAt: Date | null;
|
||||
attempts: number;
|
||||
lastError: string | null;
|
||||
};
|
||||
|
||||
/**
|
||||
* Lightweight fake Prisma — just enough for OutboxRelay.tick():
|
||||
* - pg_try_advisory_lock
|
||||
* - eventOutbox.findMany / update
|
||||
*/
|
||||
function makeFakePrisma(rows: OutboxRow[], options: { acquireLock?: boolean } = {}) {
|
||||
const acquireLock = options.acquireLock ?? true;
|
||||
return {
|
||||
$queryRawUnsafe: vi.fn(async (sql: string) => {
|
||||
if (sql.includes('pg_try_advisory_lock')) return [{ locked: acquireLock }];
|
||||
return [];
|
||||
}),
|
||||
eventOutbox: {
|
||||
findMany: vi.fn(async (args: { where?: { publishedAt: null }; take?: number }) => {
|
||||
const pending = rows.filter((r) => r.publishedAt === null);
|
||||
return pending.slice(0, args.take ?? 100);
|
||||
}),
|
||||
update: vi.fn(async (args: { where: { id: string }; data: Record<string, unknown> }) => {
|
||||
const row = rows.find((r) => r.id === args.where.id);
|
||||
if (!row) throw new Error('row not found');
|
||||
const data = args.data;
|
||||
if ('publishedAt' in data) row.publishedAt = data['publishedAt'] as Date | null;
|
||||
if ('lastError' in data) row.lastError = data['lastError'] as string | null;
|
||||
if ('attempts' in data) {
|
||||
const v = data['attempts'];
|
||||
if (v && typeof v === 'object' && 'increment' in v) {
|
||||
row.attempts += (v as { increment: number }).increment;
|
||||
}
|
||||
}
|
||||
return row;
|
||||
}),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function fakeRow(type: string): OutboxRow {
|
||||
const envelope = buildEnvelope({ producer: 'api' }, { eventType: type, payload: { k: 'v' } });
|
||||
return {
|
||||
id: `row-${envelope.eventId}`,
|
||||
eventId: envelope.eventId,
|
||||
eventType: envelope.eventType,
|
||||
aggregateId: null,
|
||||
envelope,
|
||||
createdAt: new Date(),
|
||||
publishedAt: null,
|
||||
attempts: 0,
|
||||
lastError: null,
|
||||
};
|
||||
}
|
||||
|
||||
describe('OutboxRelay.tick', () => {
|
||||
let bus: InMemoryEventBus;
|
||||
beforeEach(() => {
|
||||
bus = new InMemoryEventBus();
|
||||
process.env['EVENT_OUTBOX_RELAY_ENABLED'] = 'false'; // don't auto-start timer
|
||||
});
|
||||
|
||||
it('drains pending rows into the EventBus and marks them published', async () => {
|
||||
const rows = [fakeRow('payment.completed'), fakeRow('listing.approved')];
|
||||
const prisma = makeFakePrisma(rows);
|
||||
const relay = new OutboxRelay(prisma as never, bus);
|
||||
|
||||
const result = await relay.tick();
|
||||
|
||||
expect(result.acquired).toBe(true);
|
||||
expect(result.processed).toBe(2);
|
||||
expect(result.failed).toBe(0);
|
||||
expect(bus.all()).toHaveLength(2);
|
||||
expect(rows.every((r) => r.publishedAt instanceof Date)).toBe(true);
|
||||
});
|
||||
|
||||
it('does nothing when the advisory lock is held elsewhere', async () => {
|
||||
const rows = [fakeRow('kyc.verified')];
|
||||
const prisma = makeFakePrisma(rows, { acquireLock: false });
|
||||
const relay = new OutboxRelay(prisma as never, bus);
|
||||
|
||||
const result = await relay.tick();
|
||||
|
||||
expect(result.acquired).toBe(false);
|
||||
expect(result.processed).toBe(0);
|
||||
expect(bus.all()).toHaveLength(0);
|
||||
expect(rows[0]?.publishedAt).toBeNull();
|
||||
});
|
||||
|
||||
it('records lastError and leaves publishedAt null on publish failure', async () => {
|
||||
const rows = [fakeRow('payment.completed')];
|
||||
const prisma = makeFakePrisma(rows);
|
||||
const failing = {
|
||||
publish: vi.fn(async () => {
|
||||
throw new Error('XADD refused');
|
||||
}),
|
||||
publishAll: vi.fn(),
|
||||
};
|
||||
const relay = new OutboxRelay(prisma as never, failing as never);
|
||||
|
||||
const result = await relay.tick();
|
||||
|
||||
expect(result.processed).toBe(0);
|
||||
expect(result.failed).toBe(1);
|
||||
expect(rows[0]?.publishedAt).toBeNull();
|
||||
expect(rows[0]?.lastError).toContain('XADD refused');
|
||||
expect(rows[0]?.attempts).toBe(1);
|
||||
});
|
||||
|
||||
it('skips rows that are already published', async () => {
|
||||
const row = fakeRow('listing.approved');
|
||||
row.publishedAt = new Date();
|
||||
const prisma = makeFakePrisma([row]);
|
||||
const relay = new OutboxRelay(prisma as never, bus);
|
||||
|
||||
const result = await relay.tick();
|
||||
|
||||
expect(result.processed).toBe(0);
|
||||
expect(bus.all()).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,2 @@
|
||||
export { OutboxService, type OutboxAppendOptions } from './outbox.service';
|
||||
export { OutboxRelay } from './outbox.relay';
|
||||
@@ -0,0 +1,134 @@
|
||||
import type { EventEnvelope } from '@goodgo/contracts-events';
|
||||
import {
|
||||
Inject,
|
||||
Injectable,
|
||||
Logger,
|
||||
type OnModuleDestroy,
|
||||
type OnModuleInit,
|
||||
} from '@nestjs/common';
|
||||
import { EVENT_BUS, type EventBus } from '../event-bus/event-bus.interface';
|
||||
import { type PrismaService } from '../prisma.service';
|
||||
|
||||
/**
|
||||
* Single-process relay that drains `event_outbox` into the EventBus.
|
||||
*
|
||||
* Concurrency: every node tries to acquire the same Postgres advisory
|
||||
* lock (`pg_try_advisory_lock`); only the holder runs the poll loop.
|
||||
* This is the single-process + advisory-lock design called out in
|
||||
* RFC-004 §4 ("No leader-election library yet").
|
||||
*/
|
||||
|
||||
const ADVISORY_LOCK_KEY = 0xe7b04204; // bespoke 32-bit key for the outbox relay
|
||||
const DEFAULT_POLL_MS = 1_000;
|
||||
const DEFAULT_BATCH_SIZE = 100;
|
||||
|
||||
@Injectable()
|
||||
export class OutboxRelay implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(OutboxRelay.name);
|
||||
private readonly pollIntervalMs: number;
|
||||
private readonly batchSize: number;
|
||||
private readonly enabled: boolean;
|
||||
private timer: NodeJS.Timeout | null = null;
|
||||
private running = false;
|
||||
private stopped = false;
|
||||
private holdsLock = false;
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
@Inject(EVENT_BUS) private readonly bus: EventBus,
|
||||
) {
|
||||
this.pollIntervalMs = Number(process.env['EVENT_OUTBOX_POLL_MS'] ?? DEFAULT_POLL_MS);
|
||||
this.batchSize = Number(process.env['EVENT_OUTBOX_BATCH_SIZE'] ?? DEFAULT_BATCH_SIZE);
|
||||
this.enabled = (process.env['EVENT_OUTBOX_RELAY_ENABLED'] ?? 'true').toLowerCase() !== 'false';
|
||||
}
|
||||
|
||||
onModuleInit(): void {
|
||||
if (!this.enabled) {
|
||||
this.logger.log('OutboxRelay disabled via EVENT_OUTBOX_RELAY_ENABLED=false');
|
||||
return;
|
||||
}
|
||||
this.scheduleNext();
|
||||
}
|
||||
|
||||
async onModuleDestroy(): Promise<void> {
|
||||
this.stopped = true;
|
||||
if (this.timer) clearTimeout(this.timer);
|
||||
if (this.holdsLock) {
|
||||
try {
|
||||
await this.prisma.$queryRawUnsafe(`SELECT pg_advisory_unlock(${ADVISORY_LOCK_KEY})`);
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to release advisory lock: ${(err as Error).message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private scheduleNext(): void {
|
||||
if (this.stopped) return;
|
||||
this.timer = setTimeout(() => {
|
||||
void this.tick().finally(() => this.scheduleNext());
|
||||
}, this.pollIntervalMs);
|
||||
}
|
||||
|
||||
/** Public for tests — drains one batch synchronously. */
|
||||
async tick(): Promise<{ acquired: boolean; processed: number; failed: number }> {
|
||||
if (this.running) return { acquired: false, processed: 0, failed: 0 };
|
||||
this.running = true;
|
||||
try {
|
||||
const acquired = await this.tryAcquireLock();
|
||||
if (!acquired) return { acquired: false, processed: 0, failed: 0 };
|
||||
const { processed, failed } = await this.drainBatch();
|
||||
return { acquired: true, processed, failed };
|
||||
} finally {
|
||||
this.running = false;
|
||||
}
|
||||
}
|
||||
|
||||
private async tryAcquireLock(): Promise<boolean> {
|
||||
if (this.holdsLock) return true;
|
||||
const rows = await this.prisma.$queryRawUnsafe<{ locked: boolean }[]>(
|
||||
`SELECT pg_try_advisory_lock(${ADVISORY_LOCK_KEY}) AS locked`,
|
||||
);
|
||||
const locked = rows[0]?.locked === true;
|
||||
if (locked) {
|
||||
this.holdsLock = true;
|
||||
this.logger.log('Acquired event_outbox advisory lock — this node is now the relay leader');
|
||||
}
|
||||
return locked;
|
||||
}
|
||||
|
||||
private async drainBatch(): Promise<{ processed: number; failed: number }> {
|
||||
const pending = await this.prisma.eventOutbox.findMany({
|
||||
where: { publishedAt: null },
|
||||
orderBy: { createdAt: 'asc' },
|
||||
take: this.batchSize,
|
||||
});
|
||||
if (pending.length === 0) return { processed: 0, failed: 0 };
|
||||
let processed = 0;
|
||||
let failed = 0;
|
||||
for (const row of pending) {
|
||||
try {
|
||||
const envelope = row.envelope as unknown as EventEnvelope;
|
||||
await this.bus.publish(envelope);
|
||||
await this.prisma.eventOutbox.update({
|
||||
where: { id: row.id },
|
||||
data: { publishedAt: new Date(), attempts: { increment: 1 }, lastError: null },
|
||||
});
|
||||
processed += 1;
|
||||
} catch (err) {
|
||||
failed += 1;
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
this.logger.error(
|
||||
`Outbox publish failed for eventId=${row.eventId} type=${row.eventType}: ${message}`,
|
||||
);
|
||||
await this.prisma.eventOutbox.update({
|
||||
where: { id: row.id },
|
||||
data: { attempts: { increment: 1 }, lastError: message.slice(0, 1000) },
|
||||
});
|
||||
}
|
||||
}
|
||||
if (processed > 0 || failed > 0) {
|
||||
this.logger.debug(`Outbox drained batch: processed=${processed} failed=${failed}`);
|
||||
}
|
||||
return { processed, failed };
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
import { type EventEnvelope, assertValidEnvelope } from '@goodgo/contracts-events';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import type { Prisma } from '@prisma/client';
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
|
||||
import { PrismaService } from '../prisma.service';
|
||||
|
||||
/**
|
||||
* Transactional outbox writer. Call inside the same Prisma transaction
|
||||
* as the domain change so the row commits atomically with the state
|
||||
* mutation it describes. The Outbox **never** publishes directly; the
|
||||
* relay (`OutboxRelay`) tails `event_outbox` and forwards to the EventBus.
|
||||
*/
|
||||
export interface OutboxAppendOptions {
|
||||
aggregateId?: string;
|
||||
}
|
||||
|
||||
type EventOutboxDelegate = PrismaService['eventOutbox'];
|
||||
type PrismaTxLike = Pick<EventOutboxDelegate, 'create'> | { eventOutbox: Pick<EventOutboxDelegate, 'create'> };
|
||||
|
||||
@Injectable()
|
||||
export class OutboxService {
|
||||
private readonly logger = new Logger(OutboxService.name);
|
||||
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async append(
|
||||
tx: PrismaTxLike | PrismaService,
|
||||
envelope: EventEnvelope,
|
||||
options: OutboxAppendOptions = {},
|
||||
): Promise<void> {
|
||||
assertValidEnvelope(envelope);
|
||||
const client = ('eventOutbox' in tx ? tx.eventOutbox : tx) as EventOutboxDelegate;
|
||||
await client.create({
|
||||
data: {
|
||||
eventId: envelope.eventId,
|
||||
eventType: envelope.eventType,
|
||||
aggregateId: options.aggregateId ?? null,
|
||||
envelope: envelope as unknown as Prisma.InputJsonValue,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async appendStandalone(envelope: EventEnvelope, options: OutboxAppendOptions = {}): Promise<void> {
|
||||
await this.append(this.prisma, envelope, options);
|
||||
this.logger.warn(
|
||||
`appendStandalone used for ${envelope.eventType} eventId=${envelope.eventId} — prefer the transactional append()`,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
import { Global, type MiddlewareConsumer, Module, type NestModule, RequestMethod } from '@nestjs/common';
|
||||
import { ConfigModule } from '@nestjs/config';
|
||||
import { APP_FILTER } from '@nestjs/core';
|
||||
import { APP_FILTER, APP_INTERCEPTOR } from '@nestjs/core';
|
||||
import { EventEmitterModule } from '@nestjs/event-emitter';
|
||||
import { PrometheusModule, makeCounterProvider } from '@willsoto/nestjs-prometheus';
|
||||
import {
|
||||
@@ -10,8 +10,11 @@ import {
|
||||
CACHE_DEGRADATION_TOTAL,
|
||||
} from './infrastructure/cache.service';
|
||||
import { EventBusService } from './infrastructure/event-bus.service';
|
||||
import { EVENT_BUS, RedisStreamsEventBus } from './infrastructure/event-bus';
|
||||
import { OutboxRelay, OutboxService } from './infrastructure/outbox';
|
||||
import { FieldEncryptionService } from './infrastructure/field-encryption.service';
|
||||
import { GlobalExceptionFilter } from './infrastructure/filters/global-exception.filter';
|
||||
import { DeprecationInterceptor, VersionInterceptor } from './infrastructure/interceptors';
|
||||
import { LoggerService } from './infrastructure/logger.service';
|
||||
import { CorrelationIdMiddleware } from './infrastructure/middleware/correlation-id.middleware';
|
||||
import { CsrfMiddleware } from './infrastructure/middleware/csrf.middleware';
|
||||
@@ -35,6 +38,10 @@ import { TypesenseClientService } from './infrastructure/typesense-client.servic
|
||||
RedisService,
|
||||
CacheService,
|
||||
EventBusService,
|
||||
// RFC-004 Phase 0 (GOO-172) — durable async messaging backbone.
|
||||
{ provide: EVENT_BUS, useClass: RedisStreamsEventBus },
|
||||
OutboxService,
|
||||
OutboxRelay,
|
||||
TypesenseClientService,
|
||||
makeCounterProvider({
|
||||
name: CACHE_HIT_TOTAL,
|
||||
@@ -55,8 +62,18 @@ import { TypesenseClientService } from './infrastructure/typesense-client.servic
|
||||
provide: APP_FILTER,
|
||||
useClass: GlobalExceptionFilter,
|
||||
},
|
||||
// RFC-001 Phase 1 (GOO-170) — order matters: VersionInterceptor first
|
||||
// populates req.apiVersion; DeprecationInterceptor reads from it.
|
||||
{
|
||||
provide: APP_INTERCEPTOR,
|
||||
useClass: VersionInterceptor,
|
||||
},
|
||||
{
|
||||
provide: APP_INTERCEPTOR,
|
||||
useClass: DeprecationInterceptor,
|
||||
},
|
||||
],
|
||||
exports: [PrismaService, RedisService, CacheService, LoggerService, EventBusService, FieldEncryptionService, TypesenseClientService, PrometheusModule],
|
||||
exports: [PrismaService, RedisService, CacheService, LoggerService, EventBusService, EVENT_BUS, OutboxService, FieldEncryptionService, TypesenseClientService, PrometheusModule],
|
||||
})
|
||||
export class SharedModule implements NestModule {
|
||||
configure(consumer: MiddlewareConsumer): void {
|
||||
|
||||
Reference in New Issue
Block a user