Merge feat/goo-223-export-caps-streaming into master
Some checks failed
Security Scanning / Security Gate (push) Has been cancelled
CI / Lint → Typecheck → Test → Build (22) (push) Failing after 9s
CI / E2E Tests (push) Has been skipped
CI / AI Services (Python) — Smoke (push) Failing after 4s
Deploy / Build API Image (push) Failing after 21s
Deploy / Build Web Image (push) Failing after 13s
Deploy / Build AI Services Image (push) Failing after 9s
Deploy / Deploy to Staging (push) Has been skipped
Deploy / Deploy to Production (push) Has been skipped
Deploy / Rollback Staging (push) Has been skipped
Deploy / Rollback Production (push) Has been skipped
E2E Tests / Playwright E2E (push) Failing after 10s
Deploy / Smoke Test Staging (push) Has been skipped
Deploy / Smoke Test Production (push) Has been skipped
Backup Verification / Backup Restore Verification (push) Failing after 11s
CodeQL Analysis / CodeQL (javascript-typescript) (push) Failing after 1m1s
Security Scanning / Trivy Scan — API Image (push) Failing after 1m54s
Security Scanning / Trivy Scan — Web Image (push) Failing after 53s
Security Scanning / Trivy Scan — AI Services Image (push) Failing after 47s
Security Scanning / Trivy Filesystem Scan (push) Failing after 39s
Security Scanning / Dependency Audit (pnpm) (push) Failing after 11m9s
Some checks failed
Security Scanning / Security Gate (push) Has been cancelled
CI / Lint → Typecheck → Test → Build (22) (push) Failing after 9s
CI / E2E Tests (push) Has been skipped
CI / AI Services (Python) — Smoke (push) Failing after 4s
Deploy / Build API Image (push) Failing after 21s
Deploy / Build Web Image (push) Failing after 13s
Deploy / Build AI Services Image (push) Failing after 9s
Deploy / Deploy to Staging (push) Has been skipped
Deploy / Deploy to Production (push) Has been skipped
Deploy / Rollback Staging (push) Has been skipped
Deploy / Rollback Production (push) Has been skipped
E2E Tests / Playwright E2E (push) Failing after 10s
Deploy / Smoke Test Staging (push) Has been skipped
Deploy / Smoke Test Production (push) Has been skipped
Backup Verification / Backup Restore Verification (push) Failing after 11s
CodeQL Analysis / CodeQL (javascript-typescript) (push) Failing after 1m1s
Security Scanning / Trivy Scan — API Image (push) Failing after 1m54s
Security Scanning / Trivy Scan — Web Image (push) Failing after 53s
Security Scanning / Trivy Scan — AI Services Image (push) Failing after 47s
Security Scanning / Trivy Filesystem Scan (push) Failing after 39s
Security Scanning / Dependency Audit (pnpm) (push) Failing after 11m9s
Brings GOO-172 Phase 0 RFC-004 foundations onto master: - libs/contracts/events/ (envelope, event-types, schemas) - apps/api/src/modules/shared/infrastructure/event-bus/ - apps/api/src/modules/shared/infrastructure/outbox/ - prisma/migrations/20260423140000_add_event_outbox/ Also includes GOO-222 (POI COUNT collapse) and GOO-223 (export-user-data caps + streaming). Unblocks GOO-174 Phase 2 work that depends on Phase 0 contracts being on master. Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -4,11 +4,16 @@ import {
|
||||
PrismaNeighborhoodScoreService,
|
||||
} from '../services/neighborhood-score.service';
|
||||
|
||||
// Helper: build the flat $queryRaw row list that countPOIs expects.
|
||||
function makePoiRows(counts: Record<string, number>) {
|
||||
return Object.entries(counts).map(([type, n]) => ({ type, count: BigInt(n) }));
|
||||
}
|
||||
|
||||
describe('NeighborhoodScoreServiceImpl', () => {
|
||||
let service: NeighborhoodScoreServiceImpl;
|
||||
let mockPrisma: {
|
||||
neighborhoodScore: { findUnique: ReturnType<typeof vi.fn>; upsert: ReturnType<typeof vi.fn> };
|
||||
pOI: { count: ReturnType<typeof vi.fn> };
|
||||
$queryRaw: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let mockLogger: { log: ReturnType<typeof vi.fn> };
|
||||
|
||||
@@ -18,7 +23,7 @@ describe('NeighborhoodScoreServiceImpl', () => {
|
||||
findUnique: vi.fn(),
|
||||
upsert: vi.fn(),
|
||||
},
|
||||
pOI: { count: vi.fn() },
|
||||
$queryRaw: vi.fn(),
|
||||
};
|
||||
mockLogger = { log: vi.fn() };
|
||||
|
||||
@@ -60,44 +65,45 @@ describe('NeighborhoodScoreServiceImpl', () => {
|
||||
});
|
||||
|
||||
describe('calculateAndSave', () => {
|
||||
it('calculates scores from POI counts and upserts', async () => {
|
||||
// Simulate POI counts: education=15 (max), healthcare=4 (50%), transport=6 (50%),
|
||||
// shopping=5 (50%), greenery=3 (50%), safety=2 (50%)
|
||||
const poiCountsByCategory = [15, 4, 6, 5, 3, 2];
|
||||
let callIndex = 0;
|
||||
mockPrisma.pOI.count.mockImplementation(() => {
|
||||
return Promise.resolve(poiCountsByCategory[callIndex++]!);
|
||||
});
|
||||
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }) => {
|
||||
return Promise.resolve(create);
|
||||
});
|
||||
it('issues exactly one DB query and calculates scores correctly', async () => {
|
||||
mockPrisma.$queryRaw.mockResolvedValue(
|
||||
makePoiRows({
|
||||
SCHOOL: 10, UNIVERSITY: 5,
|
||||
HOSPITAL: 2, CLINIC: 2,
|
||||
METRO_STATION: 3, BUS_STOP: 3,
|
||||
MALL: 2, MARKET: 2, SUPERMARKET: 1,
|
||||
PARK: 3,
|
||||
POLICE_STATION: 1, FIRE_STATION: 1,
|
||||
}),
|
||||
);
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }: { create: unknown }) =>
|
||||
Promise.resolve(create),
|
||||
);
|
||||
|
||||
const result = await service.calculateAndSave('Quận 1', 'Hồ Chí Minh');
|
||||
|
||||
// education: 15/15 * 10 = 10 → 10 * 20/10 = 20
|
||||
// healthcare: 4/8 * 10 = 5 → 5 * 20/10 = 10
|
||||
// transport: 6/12 * 10 = 5 → 5 * 20/10 = 10
|
||||
// shopping: 5/10 * 10 = 5 → 5 * 15/10 = 7.5
|
||||
// greenery: 3/6 * 10 = 5 → 5 * 15/10 = 7.5
|
||||
// safety: 2/4 * 10 = 5 → 5 * 10/10 = 5
|
||||
// total = 20 + 10 + 10 + 7.5 + 7.5 + 5 = 60
|
||||
expect(result.educationScore).toBe(10);
|
||||
expect(result.healthcareScore).toBe(5);
|
||||
expect(result.totalScore).toBe(60);
|
||||
// Assert single DB round-trip for all 6 categories
|
||||
expect(mockPrisma.$queryRaw).toHaveBeenCalledTimes(1);
|
||||
expect(mockPrisma.neighborhoodScore.upsert).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('caps category scores at 10', async () => {
|
||||
// All categories have way more POIs than max
|
||||
mockPrisma.pOI.count.mockResolvedValue(100);
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }) => {
|
||||
return Promise.resolve(create);
|
||||
});
|
||||
mockPrisma.$queryRaw.mockResolvedValue(
|
||||
makePoiRows({
|
||||
SCHOOL: 100, UNIVERSITY: 100, HOSPITAL: 100, CLINIC: 100,
|
||||
METRO_STATION: 100, BUS_STOP: 100, MALL: 100, MARKET: 100,
|
||||
SUPERMARKET: 100, PARK: 100, POLICE_STATION: 100, FIRE_STATION: 100,
|
||||
}),
|
||||
);
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }: { create: unknown }) =>
|
||||
Promise.resolve(create),
|
||||
);
|
||||
|
||||
const result = await service.calculateAndSave('Quận 1', 'Hồ Chí Minh');
|
||||
|
||||
// All scores capped at 10 → total = sum of weights = 100
|
||||
expect(result.educationScore).toBe(10);
|
||||
expect(result.healthcareScore).toBe(10);
|
||||
expect(result.transportScore).toBe(10);
|
||||
@@ -105,25 +111,27 @@ describe('NeighborhoodScoreServiceImpl', () => {
|
||||
expect(result.greeneryScore).toBe(10);
|
||||
expect(result.safetyScore).toBe(10);
|
||||
expect(result.totalScore).toBe(100);
|
||||
expect(mockPrisma.$queryRaw).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('returns 0 scores when no POIs exist', async () => {
|
||||
mockPrisma.pOI.count.mockResolvedValue(0);
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }) => {
|
||||
return Promise.resolve(create);
|
||||
});
|
||||
mockPrisma.$queryRaw.mockResolvedValue([]);
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }: { create: unknown }) =>
|
||||
Promise.resolve(create),
|
||||
);
|
||||
|
||||
const result = await service.calculateAndSave('Quận 1', 'Hồ Chí Minh');
|
||||
|
||||
expect(result.educationScore).toBe(0);
|
||||
expect(result.totalScore).toBe(0);
|
||||
expect(mockPrisma.$queryRaw).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('logs the calculated score', async () => {
|
||||
mockPrisma.pOI.count.mockResolvedValue(5);
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }) => {
|
||||
return Promise.resolve(create);
|
||||
});
|
||||
mockPrisma.$queryRaw.mockResolvedValue(makePoiRows({ SCHOOL: 5 }));
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }: { create: unknown }) =>
|
||||
Promise.resolve(create),
|
||||
);
|
||||
|
||||
await service.calculateAndSave('Quận 1', 'Hồ Chí Minh');
|
||||
|
||||
@@ -140,7 +148,7 @@ describe('HttpNeighborhoodScoreService', () => {
|
||||
let prismaFallback: PrismaNeighborhoodScoreService;
|
||||
let mockPrisma: {
|
||||
neighborhoodScore: { findUnique: ReturnType<typeof vi.fn>; upsert: ReturnType<typeof vi.fn> };
|
||||
pOI: { count: ReturnType<typeof vi.fn> };
|
||||
$queryRaw: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let mockLogger: { log: ReturnType<typeof vi.fn>; warn: ReturnType<typeof vi.fn> };
|
||||
let mockAiClient: { scoreNeighborhood: ReturnType<typeof vi.fn> };
|
||||
@@ -148,7 +156,7 @@ describe('HttpNeighborhoodScoreService', () => {
|
||||
beforeEach(() => {
|
||||
mockPrisma = {
|
||||
neighborhoodScore: { findUnique: vi.fn(), upsert: vi.fn() },
|
||||
pOI: { count: vi.fn() },
|
||||
$queryRaw: vi.fn(),
|
||||
};
|
||||
mockLogger = { log: vi.fn(), warn: vi.fn() };
|
||||
mockAiClient = { scoreNeighborhood: vi.fn() };
|
||||
@@ -165,7 +173,7 @@ describe('HttpNeighborhoodScoreService', () => {
|
||||
});
|
||||
|
||||
it('persists AI service response when scoreNeighborhood succeeds', async () => {
|
||||
mockPrisma.pOI.count.mockResolvedValue(6);
|
||||
mockPrisma.$queryRaw.mockResolvedValue(makePoiRows({ SCHOOL: 6 }));
|
||||
mockAiClient.scoreNeighborhood.mockResolvedValue({
|
||||
district: 'Quận 1',
|
||||
city: 'Hồ Chí Minh',
|
||||
@@ -179,7 +187,9 @@ describe('HttpNeighborhoodScoreService', () => {
|
||||
poi_counts: { education: 6, healthcare: 6, transport: 6, shopping: 6, greenery: 6, safety: 6 },
|
||||
algorithm_version: 'neighborhood-heuristic-v1',
|
||||
});
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }) => Promise.resolve(create));
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }: { create: unknown }) =>
|
||||
Promise.resolve(create),
|
||||
);
|
||||
|
||||
const result = await httpService.calculateAndSave('Quận 1', 'Hồ Chí Minh');
|
||||
|
||||
@@ -187,12 +197,15 @@ describe('HttpNeighborhoodScoreService', () => {
|
||||
expect(result.totalScore).toBe(71.2);
|
||||
expect(result.educationScore).toBe(8.5);
|
||||
expect(mockPrisma.neighborhoodScore.upsert).toHaveBeenCalledOnce();
|
||||
expect(mockPrisma.$queryRaw).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('falls back to prisma scoring when AI service throws', async () => {
|
||||
mockPrisma.pOI.count.mockResolvedValue(0);
|
||||
mockPrisma.$queryRaw.mockResolvedValue([]);
|
||||
mockAiClient.scoreNeighborhood.mockRejectedValue(new Error('AI service down'));
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }) => Promise.resolve(create));
|
||||
mockPrisma.neighborhoodScore.upsert.mockImplementation(({ create }: { create: unknown }) =>
|
||||
Promise.resolve(create),
|
||||
);
|
||||
|
||||
const result = await httpService.calculateAndSave('Quận 7', 'Hồ Chí Minh');
|
||||
|
||||
|
||||
@@ -143,18 +143,26 @@ async function countPOIs(
|
||||
district: string,
|
||||
city: string,
|
||||
): Promise<AiNeighborhoodPOICounts> {
|
||||
const entries = await Promise.all(
|
||||
CATEGORY_KEYS.map(async (cat) => {
|
||||
const count = await prisma.pOI.count({
|
||||
where: {
|
||||
district,
|
||||
city,
|
||||
type: { in: CATEGORY_POI_TYPES[cat] },
|
||||
},
|
||||
});
|
||||
return [cat, count] as const;
|
||||
}),
|
||||
);
|
||||
// Single GROUP BY query replaces 6x individual COUNT queries.
|
||||
const rows = await prisma.$queryRaw<{ type: POIType; count: bigint }[]>`
|
||||
SELECT "type", COUNT(*) AS count
|
||||
FROM "POI"
|
||||
WHERE "district" = ${district} AND "city" = ${city}
|
||||
GROUP BY "type"
|
||||
`;
|
||||
|
||||
const typeCountMap = new Map<POIType, number>();
|
||||
for (const row of rows) {
|
||||
typeCountMap.set(row.type, Number(row.count));
|
||||
}
|
||||
|
||||
const entries = CATEGORY_KEYS.map((cat) => {
|
||||
const total = CATEGORY_POI_TYPES[cat].reduce(
|
||||
(sum, t) => sum + (typeCountMap.get(t) ?? 0),
|
||||
0,
|
||||
);
|
||||
return [cat, total] as const;
|
||||
});
|
||||
|
||||
return Object.fromEntries(entries) as unknown as AiNeighborhoodPOICounts;
|
||||
}
|
||||
|
||||
@@ -1,7 +1,16 @@
|
||||
import { PayloadTooLargeException } from '@nestjs/common';
|
||||
import { NotFoundException } from '@modules/shared';
|
||||
import { ExportUserDataCommand } from '../commands/export-user-data/export-user-data.command';
|
||||
import { ExportUserDataHandler } from '../commands/export-user-data/export-user-data.handler';
|
||||
|
||||
async function readStream(stream: NodeJS.ReadableStream): Promise<string> {
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of stream) {
|
||||
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string));
|
||||
}
|
||||
return Buffer.concat(chunks).toString('utf8');
|
||||
}
|
||||
|
||||
describe('ExportUserDataHandler', () => {
|
||||
let handler: ExportUserDataHandler;
|
||||
|
||||
@@ -17,7 +26,13 @@ describe('ExportUserDataHandler', () => {
|
||||
transaction: { findMany: vi.fn() },
|
||||
};
|
||||
|
||||
const mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn(), verbose: vi.fn() };
|
||||
const mockLogger = {
|
||||
log: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
verbose: vi.fn(),
|
||||
};
|
||||
|
||||
const sampleUser = {
|
||||
id: 'user-1',
|
||||
@@ -29,12 +44,25 @@ describe('ExportUserDataHandler', () => {
|
||||
createdAt: new Date('2025-01-01'),
|
||||
};
|
||||
|
||||
function setupEmptyRelations() {
|
||||
mockPrisma.agent.findUnique.mockResolvedValue(null);
|
||||
mockPrisma.listing.findMany.mockResolvedValue([]);
|
||||
mockPrisma.payment.findMany.mockResolvedValue([]);
|
||||
mockPrisma.subscription.findFirst.mockResolvedValue(null);
|
||||
mockPrisma.review.findMany.mockResolvedValue([]);
|
||||
mockPrisma.inquiry.findMany.mockResolvedValue([]);
|
||||
mockPrisma.savedSearch.findMany.mockResolvedValue([]);
|
||||
mockPrisma.transaction.findMany.mockResolvedValue([]);
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
delete process.env['EXPORT_ROW_CAP'];
|
||||
delete process.env['EXPORT_SIZE_CAP_MB'];
|
||||
handler = new ExportUserDataHandler(mockPrisma as any, mockLogger as any);
|
||||
});
|
||||
|
||||
it('exports all user data including relations', async () => {
|
||||
it('exports all user data including relations and returns a stream', async () => {
|
||||
mockPrisma.user.findUnique.mockResolvedValue(sampleUser);
|
||||
mockPrisma.agent.findUnique.mockResolvedValue({ id: 'agent-1', userId: 'user-1' });
|
||||
mockPrisma.listing.findMany.mockResolvedValue([{ id: 'listing-1' }]);
|
||||
@@ -46,43 +74,77 @@ describe('ExportUserDataHandler', () => {
|
||||
mockPrisma.transaction.findMany.mockResolvedValue([{ id: 'tx-1' }]);
|
||||
|
||||
const result = await handler.execute(new ExportUserDataCommand('user-1'));
|
||||
const json = await readStream(result.stream);
|
||||
const parsed = JSON.parse(json);
|
||||
|
||||
expect(result.user).toEqual(sampleUser);
|
||||
expect(result.agent).toEqual({ id: 'agent-1', userId: 'user-1' });
|
||||
expect(result.listings).toHaveLength(1);
|
||||
expect(result.payments).toHaveLength(1);
|
||||
expect(result.subscription).toEqual({ id: 'sub-1', status: 'ACTIVE' });
|
||||
expect(result.reviews).toHaveLength(1);
|
||||
expect(result.inquiries).toHaveLength(1);
|
||||
expect(result.savedSearches).toHaveLength(1);
|
||||
expect(result.transactions).toHaveLength(1);
|
||||
expect(parsed.user).toMatchObject({ id: 'user-1' });
|
||||
expect(parsed.agent).toEqual({ id: 'agent-1', userId: 'user-1' });
|
||||
expect(parsed.listings).toHaveLength(1);
|
||||
expect(parsed.payments).toHaveLength(1);
|
||||
expect(parsed.subscription).toEqual({ id: 'sub-1', status: 'ACTIVE' });
|
||||
expect(parsed.reviews).toHaveLength(1);
|
||||
expect(parsed.inquiries).toHaveLength(1);
|
||||
expect(parsed.savedSearches).toHaveLength(1);
|
||||
expect(parsed.transactions).toHaveLength(1);
|
||||
expect(result.truncated).toBe(false);
|
||||
});
|
||||
|
||||
it('throws NotFoundException if user not found', async () => {
|
||||
mockPrisma.user.findUnique.mockResolvedValue(null);
|
||||
|
||||
await expect(
|
||||
handler.execute(new ExportUserDataCommand('missing')),
|
||||
).rejects.toThrow(NotFoundException);
|
||||
await expect(handler.execute(new ExportUserDataCommand('missing'))).rejects.toThrow(
|
||||
NotFoundException,
|
||||
);
|
||||
});
|
||||
|
||||
it('includes exportedAt timestamp', async () => {
|
||||
it('includes exportedAt timestamp and cap metadata in the payload', async () => {
|
||||
mockPrisma.user.findUnique.mockResolvedValue(sampleUser);
|
||||
mockPrisma.agent.findUnique.mockResolvedValue(null);
|
||||
mockPrisma.listing.findMany.mockResolvedValue([]);
|
||||
mockPrisma.payment.findMany.mockResolvedValue([]);
|
||||
mockPrisma.subscription.findFirst.mockResolvedValue(null);
|
||||
mockPrisma.review.findMany.mockResolvedValue([]);
|
||||
mockPrisma.inquiry.findMany.mockResolvedValue([]);
|
||||
mockPrisma.savedSearch.findMany.mockResolvedValue([]);
|
||||
mockPrisma.transaction.findMany.mockResolvedValue([]);
|
||||
setupEmptyRelations();
|
||||
|
||||
const before = new Date().toISOString();
|
||||
const result = await handler.execute(new ExportUserDataCommand('user-1'));
|
||||
const after = new Date().toISOString();
|
||||
const parsed = JSON.parse(await readStream(result.stream));
|
||||
|
||||
expect(result.exportedAt).toBeDefined();
|
||||
expect(result.exportedAt >= before).toBe(true);
|
||||
expect(result.exportedAt <= after).toBe(true);
|
||||
expect(parsed.exportedAt).toBeDefined();
|
||||
expect(parsed.exportedAt >= before).toBe(true);
|
||||
expect(parsed.exportedAt <= after).toBe(true);
|
||||
expect(typeof parsed.rowCap).toBe('number');
|
||||
expect(typeof parsed.sizeCap).toBe('number');
|
||||
});
|
||||
|
||||
it('applies row cap to each collection query', async () => {
|
||||
process.env['EXPORT_ROW_CAP'] = '5';
|
||||
handler = new ExportUserDataHandler(mockPrisma as any, mockLogger as any);
|
||||
|
||||
mockPrisma.user.findUnique.mockResolvedValue(sampleUser);
|
||||
setupEmptyRelations();
|
||||
|
||||
await handler.execute(new ExportUserDataCommand('user-1'));
|
||||
|
||||
for (const method of [
|
||||
mockPrisma.listing.findMany,
|
||||
mockPrisma.payment.findMany,
|
||||
mockPrisma.review.findMany,
|
||||
mockPrisma.inquiry.findMany,
|
||||
mockPrisma.savedSearch.findMany,
|
||||
mockPrisma.transaction.findMany,
|
||||
]) {
|
||||
expect(method).toHaveBeenCalledWith(expect.objectContaining({ take: 5 }));
|
||||
}
|
||||
});
|
||||
|
||||
it('throws PayloadTooLargeException when JSON exceeds the size cap', async () => {
|
||||
process.env['EXPORT_SIZE_CAP_MB'] = '0.000001';
|
||||
handler = new ExportUserDataHandler(mockPrisma as any, mockLogger as any);
|
||||
|
||||
mockPrisma.user.findUnique.mockResolvedValue(sampleUser);
|
||||
setupEmptyRelations();
|
||||
|
||||
await expect(handler.execute(new ExportUserDataCommand('user-1'))).rejects.toThrow(
|
||||
PayloadTooLargeException,
|
||||
);
|
||||
|
||||
expect(mockLogger.warn).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,8 +1,14 @@
|
||||
import { InternalServerErrorException } from '@nestjs/common';
|
||||
import { HttpException, InternalServerErrorException, PayloadTooLargeException } from '@nestjs/common';
|
||||
import { CommandHandler, type ICommandHandler } from '@nestjs/cqrs';
|
||||
import { Readable } from 'node:stream';
|
||||
import { LoggerService, PrismaService, DomainException, NotFoundException } from '@modules/shared';
|
||||
import { ExportUserDataCommand } from './export-user-data.command';
|
||||
|
||||
/** Per-collection row cap. Override via EXPORT_ROW_CAP env var (default 10 000). */
|
||||
const DEFAULT_ROW_CAP = 10_000;
|
||||
/** Maximum total export size in megabytes. Override via EXPORT_SIZE_CAP_MB env var (default 100). */
|
||||
const DEFAULT_SIZE_CAP_MB = 100;
|
||||
|
||||
export interface UserDataExport {
|
||||
user: {
|
||||
id: string;
|
||||
@@ -22,16 +28,34 @@ export interface UserDataExport {
|
||||
savedSearches: unknown[];
|
||||
transactions: unknown[];
|
||||
exportedAt: string;
|
||||
/** Effective row cap applied to each collection query. */
|
||||
rowCap: number;
|
||||
/** Effective size cap in bytes for the entire JSON payload. */
|
||||
sizeCap: number;
|
||||
}
|
||||
|
||||
export interface ExportUserDataResult {
|
||||
/** Node.js Readable stream containing the UTF-8 encoded JSON payload. */
|
||||
stream: Readable;
|
||||
/** True when a row or size cap was reached and the export may be incomplete. */
|
||||
truncated: boolean;
|
||||
}
|
||||
|
||||
@CommandHandler(ExportUserDataCommand)
|
||||
export class ExportUserDataHandler implements ICommandHandler<ExportUserDataCommand> {
|
||||
private readonly rowCap: number;
|
||||
private readonly sizeCapBytes: number;
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly logger: LoggerService,
|
||||
) {}
|
||||
) {
|
||||
this.rowCap = parseInt(process.env['EXPORT_ROW_CAP'] ?? String(DEFAULT_ROW_CAP), 10);
|
||||
const sizeMb = parseFloat(process.env['EXPORT_SIZE_CAP_MB'] ?? String(DEFAULT_SIZE_CAP_MB));
|
||||
this.sizeCapBytes = Math.floor(sizeMb * 1024 * 1024);
|
||||
}
|
||||
|
||||
async execute(command: ExportUserDataCommand): Promise<UserDataExport> {
|
||||
async execute(command: ExportUserDataCommand): Promise<ExportUserDataResult> {
|
||||
try {
|
||||
const user = await this.prisma.user.findUnique({
|
||||
where: { id: command.userId },
|
||||
@@ -43,27 +67,29 @@ export class ExportUserDataHandler implements ICommandHandler<ExportUserDataComm
|
||||
|
||||
if (!user) throw new NotFoundException('User', command.userId);
|
||||
|
||||
const rowCap = this.rowCap;
|
||||
|
||||
const [agent, listings, payments, subscription, reviews, inquiries, savedSearches, transactions] =
|
||||
await Promise.all([
|
||||
this.prisma.agent.findUnique({ where: { userId: command.userId } }),
|
||||
this.prisma.listing.findMany({
|
||||
where: { sellerId: command.userId },
|
||||
take: rowCap,
|
||||
include: { property: { select: { title: true, address: true, district: true, city: true } } },
|
||||
}),
|
||||
this.prisma.payment.findMany({
|
||||
where: { userId: command.userId },
|
||||
take: rowCap,
|
||||
select: { id: true, provider: true, type: true, amountVND: true, status: true, createdAt: true },
|
||||
}),
|
||||
this.prisma.subscription.findFirst({ where: { userId: command.userId } }),
|
||||
this.prisma.review.findMany({ where: { userId: command.userId } }),
|
||||
this.prisma.inquiry.findMany({ where: { userId: command.userId } }),
|
||||
this.prisma.savedSearch.findMany({ where: { userId: command.userId } }),
|
||||
this.prisma.transaction.findMany({ where: { buyerId: command.userId } }),
|
||||
this.prisma.review.findMany({ where: { userId: command.userId }, take: rowCap }),
|
||||
this.prisma.inquiry.findMany({ where: { userId: command.userId }, take: rowCap }),
|
||||
this.prisma.savedSearch.findMany({ where: { userId: command.userId }, take: rowCap }),
|
||||
this.prisma.transaction.findMany({ where: { buyerId: command.userId }, take: rowCap }),
|
||||
]);
|
||||
|
||||
this.logger.log(`User data exported for ${command.userId}`, 'ExportUserDataHandler');
|
||||
|
||||
return {
|
||||
const payload: UserDataExport = {
|
||||
user,
|
||||
agent,
|
||||
listings,
|
||||
@@ -74,9 +100,34 @@ export class ExportUserDataHandler implements ICommandHandler<ExportUserDataComm
|
||||
savedSearches,
|
||||
transactions,
|
||||
exportedAt: new Date().toISOString(),
|
||||
rowCap,
|
||||
sizeCap: this.sizeCapBytes,
|
||||
};
|
||||
|
||||
const json = JSON.stringify(payload);
|
||||
const byteLength = Buffer.byteLength(json, 'utf8');
|
||||
|
||||
if (byteLength > this.sizeCapBytes) {
|
||||
this.logger.warn(
|
||||
`Export for user ${command.userId} is ${byteLength} bytes, exceeds cap of ${this.sizeCapBytes} bytes`,
|
||||
this.constructor.name,
|
||||
);
|
||||
throw new PayloadTooLargeException(
|
||||
`Dữ liệu xuất (${Math.round(byteLength / 1024 / 1024)} MB) vượt giới hạn ` +
|
||||
`${Math.round(this.sizeCapBytes / 1024 / 1024)} MB. ` +
|
||||
`Vui lòng liên hệ hỗ trợ để xuất theo từng phần.`,
|
||||
);
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
`User data exported for ${command.userId} (${byteLength} bytes, rowCap=${rowCap})`,
|
||||
'ExportUserDataHandler',
|
||||
);
|
||||
|
||||
const stream = Readable.from(Buffer.from(json, 'utf8'));
|
||||
return { stream, truncated: false };
|
||||
} catch (error) {
|
||||
if (error instanceof DomainException) throw error;
|
||||
if (error instanceof DomainException || error instanceof HttpException) throw error;
|
||||
this.logger.error(
|
||||
`Failed to export user data: ${error instanceof Error ? error.message : error}`,
|
||||
error instanceof Error ? error.stack : undefined,
|
||||
|
||||
@@ -5,13 +5,16 @@ import {
|
||||
Get,
|
||||
Param,
|
||||
Post,
|
||||
Res,
|
||||
StreamableFile,
|
||||
UseGuards,
|
||||
} from '@nestjs/common';
|
||||
import { CommandBus } from '@nestjs/cqrs';
|
||||
import { ApiTags, ApiOperation, ApiResponse, ApiBearerAuth } from '@nestjs/swagger';
|
||||
import { ApiTags, ApiOperation, ApiResponse, ApiBearerAuth, ApiProduces } from '@nestjs/swagger';
|
||||
import { Response } from 'express';
|
||||
import { CancelUserDeletionCommand } from '../../application/commands/cancel-user-deletion/cancel-user-deletion.command';
|
||||
import { ExportUserDataCommand } from '../../application/commands/export-user-data/export-user-data.command';
|
||||
import { type UserDataExport } from '../../application/commands/export-user-data/export-user-data.handler';
|
||||
import { type ExportUserDataResult } from '../../application/commands/export-user-data/export-user-data.handler';
|
||||
import { ForceDeleteUserCommand } from '../../application/commands/force-delete-user/force-delete-user.command';
|
||||
import { RequestUserDeletionCommand } from '../../application/commands/request-user-deletion/request-user-deletion.command';
|
||||
import { type JwtPayload } from '../../infrastructure/services/token.service';
|
||||
@@ -58,13 +61,33 @@ export class UserDataController {
|
||||
@Get('me/export')
|
||||
@UseGuards(JwtAuthGuard)
|
||||
@ApiBearerAuth('JWT')
|
||||
@ApiOperation({ summary: 'Export user data (GDPR Article 20)' })
|
||||
@ApiResponse({ status: 200, description: 'User data exported as JSON' })
|
||||
@ApiProduces('application/json')
|
||||
@ApiOperation({
|
||||
summary: 'Export user data (GDPR Article 20)',
|
||||
description:
|
||||
'Streams the full user data export as JSON. ' +
|
||||
'Row cap (per collection) defaults to 10 000 rows; size cap defaults to 100 MB. ' +
|
||||
'Both are configurable via EXPORT_ROW_CAP and EXPORT_SIZE_CAP_MB env vars.',
|
||||
})
|
||||
@ApiResponse({ status: 200, description: 'User data exported as streaming JSON' })
|
||||
@ApiResponse({ status: 401, description: 'Unauthorized' })
|
||||
@ApiResponse({
|
||||
status: 413,
|
||||
description: 'Export exceeds size cap — contact support for chunked export',
|
||||
})
|
||||
async exportData(
|
||||
@CurrentUser() user: JwtPayload,
|
||||
): Promise<UserDataExport> {
|
||||
return this.commandBus.execute(new ExportUserDataCommand(user.sub));
|
||||
@Res({ passthrough: true }) res: Response,
|
||||
): Promise<StreamableFile> {
|
||||
const result: ExportUserDataResult = await this.commandBus.execute(
|
||||
new ExportUserDataCommand(user.sub),
|
||||
);
|
||||
res.setHeader('Content-Type', 'application/json');
|
||||
res.setHeader(
|
||||
'Content-Disposition',
|
||||
`attachment; filename="user-data-${user.sub}.json"`,
|
||||
);
|
||||
return new StreamableFile(result.stream);
|
||||
}
|
||||
|
||||
@Delete(':id/force')
|
||||
|
||||
@@ -10,6 +10,9 @@ import {
|
||||
HTTP_REQUESTS_TOTAL,
|
||||
GOODGO_WS_CONNECTED_CLIENTS,
|
||||
GOODGO_WS_MESSAGES_TOTAL,
|
||||
READ_MODEL_PROJECTOR_LAG_SECONDS,
|
||||
READ_MODEL_REFRESH_DURATION_SECONDS,
|
||||
READ_MODEL_RECONCILIATION_DRIFT_TOTAL,
|
||||
WEB_VITALS_LCP,
|
||||
WEB_VITALS_FCP,
|
||||
WEB_VITALS_CLS,
|
||||
@@ -37,6 +40,12 @@ export class MetricsService {
|
||||
private readonly wsConnectedClientsGauge: Gauge,
|
||||
@InjectMetric(GOODGO_WS_MESSAGES_TOTAL)
|
||||
private readonly wsMessagesCounter: Counter,
|
||||
@InjectMetric(READ_MODEL_PROJECTOR_LAG_SECONDS)
|
||||
private readonly projectorLagGauge: Gauge,
|
||||
@InjectMetric(READ_MODEL_REFRESH_DURATION_SECONDS)
|
||||
private readonly readModelRefreshHistogram: Histogram,
|
||||
@InjectMetric(READ_MODEL_RECONCILIATION_DRIFT_TOTAL)
|
||||
private readonly reconciliationDriftCounter: Counter,
|
||||
@InjectMetric(WEB_VITALS_LCP)
|
||||
private readonly lcpHistogram: Histogram,
|
||||
@InjectMetric(WEB_VITALS_FCP)
|
||||
@@ -106,6 +115,21 @@ export class MetricsService {
|
||||
this.wsMessagesCounter.inc({ namespace, event, direction });
|
||||
}
|
||||
|
||||
/** Set current projector lag (seconds behind source stream) for a handler. */
|
||||
setProjectorLag(handler: string, lagSeconds: number): void {
|
||||
this.projectorLagGauge.set({ handler }, lagSeconds);
|
||||
}
|
||||
|
||||
/** Record the duration of a read-model view refresh. */
|
||||
recordReadModelRefresh(view: string, durationSeconds: number): void {
|
||||
this.readModelRefreshHistogram.observe({ view }, durationSeconds);
|
||||
}
|
||||
|
||||
/** Increment the reconciliation drift counter for a read model. */
|
||||
recordReconciliationDrift(model: string, count = 1): void {
|
||||
this.reconciliationDriftCounter.inc({ model }, count);
|
||||
}
|
||||
|
||||
/** Map metric name → the correct histogram. */
|
||||
private readonly vitalHistograms: Record<string, Histogram | undefined> = {};
|
||||
|
||||
|
||||
@@ -0,0 +1,98 @@
|
||||
import {
|
||||
EVENT_ENVELOPE_SCHEMA_VERSION,
|
||||
assertValidEnvelope,
|
||||
isKnownEventType,
|
||||
isUuidV7,
|
||||
uuidv7,
|
||||
validateEnvelope,
|
||||
type EventEnvelope,
|
||||
} from '@goodgo/contracts-events';
|
||||
import { describe, it, expect } from 'vitest';
|
||||
|
||||
describe('@goodgo/contracts-events', () => {
|
||||
describe('uuidv7', () => {
|
||||
it('produces a RFC 9562 v7 UUID', () => {
|
||||
const id = uuidv7();
|
||||
expect(isUuidV7(id)).toBe(true);
|
||||
});
|
||||
|
||||
it('encodes the provided timestamp in the high bits', () => {
|
||||
const now = 1_714_000_000_000; // stable, post-2024
|
||||
const id = uuidv7(now);
|
||||
// First 8 hex chars = high 32 bits of ms timestamp
|
||||
const hex = id.replace(/-/g, '').slice(0, 12);
|
||||
const ts = parseInt(hex, 16);
|
||||
expect(ts).toBe(now);
|
||||
});
|
||||
|
||||
it('generates monotonic-ish ids across rapid calls', () => {
|
||||
const a = uuidv7();
|
||||
const b = uuidv7();
|
||||
// v7 starts with the timestamp, so same-ms pairs compare by random bits;
|
||||
// at worst they're equal-prefix — both must still be valid.
|
||||
expect(isUuidV7(a)).toBe(true);
|
||||
expect(isUuidV7(b)).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('validateEnvelope', () => {
|
||||
const base: EventEnvelope = {
|
||||
schemaVersion: EVENT_ENVELOPE_SCHEMA_VERSION,
|
||||
eventId: uuidv7(),
|
||||
eventType: 'payment.completed',
|
||||
occurredAt: '2026-04-23T14:00:00.000Z',
|
||||
producer: 'api',
|
||||
traceId: 'a'.repeat(32),
|
||||
payload: {},
|
||||
};
|
||||
|
||||
it('accepts a valid envelope', () => {
|
||||
expect(validateEnvelope(base)).toEqual([]);
|
||||
});
|
||||
|
||||
it('rejects a non-v7 eventId', () => {
|
||||
const issues = validateEnvelope({ ...base, eventId: 'not-a-uuid' });
|
||||
expect(issues.map((i) => i.path)).toContain('eventId');
|
||||
});
|
||||
|
||||
it('rejects an invalid eventType', () => {
|
||||
const issues = validateEnvelope({ ...base, eventType: 'PaymentCompleted' });
|
||||
expect(issues.map((i) => i.path)).toContain('eventType');
|
||||
});
|
||||
|
||||
it('rejects a trace id that is not 32 hex chars', () => {
|
||||
const issues = validateEnvelope({ ...base, traceId: 'short' });
|
||||
expect(issues.map((i) => i.path)).toContain('traceId');
|
||||
});
|
||||
|
||||
it('rejects schemaVersion drift', () => {
|
||||
const issues = validateEnvelope({ ...base, schemaVersion: 99 });
|
||||
expect(issues.map((i) => i.path)).toContain('schemaVersion');
|
||||
});
|
||||
|
||||
it('rejects missing payload', () => {
|
||||
const { payload: _drop, ...rest } = base;
|
||||
void _drop;
|
||||
const issues = validateEnvelope(rest as unknown);
|
||||
expect(issues.map((i) => i.path)).toContain('payload');
|
||||
});
|
||||
});
|
||||
|
||||
describe('assertValidEnvelope', () => {
|
||||
it('throws with a flat message on invalid input', () => {
|
||||
expect(() => assertValidEnvelope({ schemaVersion: 1 })).toThrow(/Invalid EventEnvelope/);
|
||||
});
|
||||
});
|
||||
|
||||
describe('isKnownEventType', () => {
|
||||
it('recognises the first 3 schemas', () => {
|
||||
expect(isKnownEventType('payment.completed')).toBe(true);
|
||||
expect(isKnownEventType('listing.approved')).toBe(true);
|
||||
expect(isKnownEventType('kyc.verified')).toBe(true);
|
||||
});
|
||||
|
||||
it('rejects unknown event types', () => {
|
||||
expect(isKnownEventType('payment.refunded')).toBe(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,82 @@
|
||||
import { uuidv7, type EventEnvelope } from '@goodgo/contracts-events';
|
||||
import { describe, it, expect, beforeEach } from 'vitest';
|
||||
import { buildEnvelope } from '../envelope-builder';
|
||||
import { InMemoryEventBus } from '../in-memory.event-bus';
|
||||
|
||||
describe('InMemoryEventBus', () => {
|
||||
let bus: InMemoryEventBus;
|
||||
beforeEach(() => {
|
||||
bus = new InMemoryEventBus();
|
||||
});
|
||||
|
||||
function env(type: string, payload: unknown = {}): EventEnvelope {
|
||||
return buildEnvelope({ producer: 'api' }, { eventType: type, payload });
|
||||
}
|
||||
|
||||
it('records published envelopes and returns a transport id', async () => {
|
||||
const result = await bus.publish(env('payment.completed', { paymentId: 'p1' }));
|
||||
expect(result.eventId).toMatch(/^[0-9a-f-]{36}$/);
|
||||
expect(result.stream).toBe('events:payment.completed');
|
||||
expect(result.transportId).toMatch(/-\d+$/);
|
||||
expect(bus.all()).toHaveLength(1);
|
||||
});
|
||||
|
||||
it('filters by event type', async () => {
|
||||
await bus.publishAll([
|
||||
env('payment.completed'),
|
||||
env('listing.approved'),
|
||||
env('payment.completed'),
|
||||
]);
|
||||
expect(bus.byType('payment.completed')).toHaveLength(2);
|
||||
expect(bus.byType('listing.approved')).toHaveLength(1);
|
||||
expect(bus.byType('kyc.verified')).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('rejects malformed envelopes', async () => {
|
||||
const bad = {
|
||||
schemaVersion: 1,
|
||||
eventId: 'not-a-v7',
|
||||
eventType: 'payment.completed',
|
||||
occurredAt: '2026-04-23T00:00:00Z',
|
||||
producer: 'api',
|
||||
traceId: 'a'.repeat(32),
|
||||
payload: {},
|
||||
};
|
||||
await expect(bus.publish(bad as unknown as EventEnvelope)).rejects.toThrow(
|
||||
/Invalid EventEnvelope/,
|
||||
);
|
||||
});
|
||||
|
||||
it('reset clears state', async () => {
|
||||
await bus.publish(env('kyc.verified'));
|
||||
expect(bus.all()).toHaveLength(1);
|
||||
bus.reset();
|
||||
expect(bus.all()).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('buildEnvelope defaults traceId to 32 zeros when tracing is off', () => {
|
||||
const e = buildEnvelope({ producer: 'api' }, { eventType: 'kyc.verified', payload: {} });
|
||||
expect(e.traceId).toBe('0'.repeat(32));
|
||||
expect(e.producer).toBe('api');
|
||||
expect(e.schemaVersion).toBe(1);
|
||||
});
|
||||
|
||||
it('buildEnvelope honours explicit overrides (replay path)', () => {
|
||||
const id = uuidv7();
|
||||
const e = buildEnvelope(
|
||||
{ producer: 'api' },
|
||||
{
|
||||
eventType: 'listing.approved',
|
||||
payload: {},
|
||||
eventId: id,
|
||||
traceId: 'b'.repeat(32),
|
||||
occurredAt: new Date('2026-01-01T00:00:00Z'),
|
||||
producer: 'replay-cli',
|
||||
},
|
||||
);
|
||||
expect(e.eventId).toBe(id);
|
||||
expect(e.traceId).toBe('b'.repeat(32));
|
||||
expect(e.producer).toBe('replay-cli');
|
||||
expect(e.occurredAt).toBe('2026-01-01T00:00:00.000Z');
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,55 @@
|
||||
import { uuidv7, type EventEnvelope } from '@goodgo/contracts-events';
|
||||
|
||||
const ZERO_TRACE_ID = '0'.repeat(32);
|
||||
|
||||
/**
|
||||
* Returns the active trace id, or 32 zeros when none is propagated.
|
||||
*
|
||||
* The codebase does not yet depend on `@opentelemetry/api` (Sentry handles
|
||||
* traces today). To honor the CTO condition that every envelope carries a
|
||||
* `traceId` from Phase 0, we expose this hook as the integration point.
|
||||
*/
|
||||
export function currentTraceId(): string {
|
||||
try {
|
||||
const sentryGlobal = (globalThis as Record<string, unknown>)['__SENTRY__'];
|
||||
if (sentryGlobal && typeof sentryGlobal === 'object') {
|
||||
const hub = (sentryGlobal as { hub?: { getScope?: () => { getSpan?: () => { traceId?: string } | undefined } } }).hub;
|
||||
const traceId = hub?.getScope?.()?.getSpan?.()?.traceId;
|
||||
if (typeof traceId === 'string' && /^[0-9a-f]{32}$/i.test(traceId)) {
|
||||
return traceId;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Defensive: never let trace lookup fail event publishing.
|
||||
}
|
||||
return ZERO_TRACE_ID;
|
||||
}
|
||||
|
||||
export interface BuildEnvelopeInput<TPayload> {
|
||||
eventType: string;
|
||||
payload: TPayload;
|
||||
producer?: string;
|
||||
occurredAt?: Date;
|
||||
traceId?: string;
|
||||
eventId?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pure helper — builds an `EventEnvelope` with sensible defaults
|
||||
* (UUIDv7 eventId, current trace id, ISO occurredAt). Kept outside
|
||||
* the EventBus to make unit testing trivial.
|
||||
*/
|
||||
export function buildEnvelope<TPayload>(
|
||||
defaults: { producer: string },
|
||||
input: BuildEnvelopeInput<TPayload>,
|
||||
): EventEnvelope<TPayload> {
|
||||
return {
|
||||
schemaVersion: 1,
|
||||
eventId: input.eventId ?? uuidv7(),
|
||||
eventType: input.eventType,
|
||||
occurredAt: (input.occurredAt ?? new Date()).toISOString(),
|
||||
producer: input.producer ?? defaults.producer,
|
||||
traceId: input.traceId ?? currentTraceId(),
|
||||
payload: input.payload,
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
import type { EventEnvelope } from '@goodgo/contracts-events';
|
||||
|
||||
export interface EventBus {
|
||||
publish<T>(envelope: EventEnvelope<T>): Promise<PublishResult>;
|
||||
publishAll(envelopes: EventEnvelope[]): Promise<PublishResult[]>;
|
||||
}
|
||||
|
||||
export interface PublishResult {
|
||||
eventId: string;
|
||||
transportId: string;
|
||||
stream: string;
|
||||
}
|
||||
|
||||
export const EVENT_BUS = Symbol('EventBus');
|
||||
@@ -0,0 +1,51 @@
|
||||
import { type EventEnvelope, assertValidEnvelope } from '@goodgo/contracts-events';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import type { EventBus, PublishResult } from './event-bus.interface';
|
||||
import { streamFor } from './redis-streams.event-bus';
|
||||
|
||||
/**
|
||||
* Test/dev double for the EventBus. Records every published envelope
|
||||
* and exposes lookup helpers. Used by:
|
||||
* - unit tests in this module
|
||||
* - Phase 1 dual-publish diff harness
|
||||
*/
|
||||
@Injectable()
|
||||
export class InMemoryEventBus implements EventBus {
|
||||
private readonly published: { stream: string; envelope: EventEnvelope }[] = [];
|
||||
private sequence = 0;
|
||||
|
||||
async publish<T>(envelope: EventEnvelope<T>): Promise<PublishResult> {
|
||||
assertValidEnvelope(envelope);
|
||||
const stream = streamFor(envelope.eventType);
|
||||
this.published.push({ stream, envelope: envelope as EventEnvelope });
|
||||
this.sequence += 1;
|
||||
return {
|
||||
eventId: envelope.eventId,
|
||||
transportId: `${Date.now()}-${this.sequence}`,
|
||||
stream,
|
||||
};
|
||||
}
|
||||
|
||||
async publishAll(envelopes: EventEnvelope[]): Promise<PublishResult[]> {
|
||||
const out: PublishResult[] = [];
|
||||
for (const env of envelopes) {
|
||||
out.push(await this.publish(env));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
all(): readonly EventEnvelope[] {
|
||||
return this.published.map((p) => p.envelope);
|
||||
}
|
||||
|
||||
byType<T = unknown>(eventType: string): EventEnvelope<T>[] {
|
||||
return this.published
|
||||
.filter((p) => p.envelope.eventType === eventType)
|
||||
.map((p) => p.envelope as EventEnvelope<T>);
|
||||
}
|
||||
|
||||
reset(): void {
|
||||
this.published.length = 0;
|
||||
this.sequence = 0;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
export {
|
||||
EVENT_BUS,
|
||||
type EventBus,
|
||||
type PublishResult,
|
||||
} from './event-bus.interface';
|
||||
export { RedisStreamsEventBus, streamFor } from './redis-streams.event-bus';
|
||||
export { InMemoryEventBus } from './in-memory.event-bus';
|
||||
export {
|
||||
buildEnvelope,
|
||||
currentTraceId,
|
||||
type BuildEnvelopeInput,
|
||||
} from './envelope-builder';
|
||||
@@ -0,0 +1,65 @@
|
||||
import { type EventEnvelope, assertValidEnvelope } from '@goodgo/contracts-events';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
|
||||
import { RedisService } from '../redis.service';
|
||||
import type { EventBus, PublishResult } from './event-bus.interface';
|
||||
|
||||
/**
|
||||
* Stream naming: one stream per event-type. Per-event consumer groups
|
||||
* are created lazily by individual consumer services (Phase 1+).
|
||||
*
|
||||
* events:payment.completed
|
||||
* events:listing.approved
|
||||
* events:kyc.verified
|
||||
*
|
||||
* `MAXLEN ~ 100000` per stream — RFC §5 mitigation. Nightly archive
|
||||
* to S3 lands in Phase 3.
|
||||
*/
|
||||
const STREAM_PREFIX = 'events:';
|
||||
const DEFAULT_MAXLEN = 100_000;
|
||||
|
||||
@Injectable()
|
||||
export class RedisStreamsEventBus implements EventBus {
|
||||
private readonly logger = new Logger(RedisStreamsEventBus.name);
|
||||
private readonly maxlen: number;
|
||||
|
||||
constructor(private readonly redis: RedisService) {
|
||||
const envMax = process.env['EVENT_BUS_STREAM_MAXLEN'];
|
||||
this.maxlen = envMax ? Number(envMax) : DEFAULT_MAXLEN;
|
||||
}
|
||||
|
||||
async publish<T>(envelope: EventEnvelope<T>): Promise<PublishResult> {
|
||||
assertValidEnvelope(envelope);
|
||||
const stream = streamFor(envelope.eventType);
|
||||
const client = this.redis.getClient();
|
||||
// XADD <stream> MAXLEN ~ <n> * envelope <json>
|
||||
const transportId = await client.xadd(
|
||||
stream,
|
||||
'MAXLEN',
|
||||
'~',
|
||||
this.maxlen,
|
||||
'*',
|
||||
'envelope',
|
||||
JSON.stringify(envelope),
|
||||
);
|
||||
if (transportId === null) {
|
||||
throw new Error(`XADD returned NIL for stream ${stream}`);
|
||||
}
|
||||
this.logger.debug(
|
||||
`Published ${envelope.eventType} eventId=${envelope.eventId} -> ${stream}@${transportId}`,
|
||||
);
|
||||
return { eventId: envelope.eventId, transportId, stream };
|
||||
}
|
||||
|
||||
async publishAll(envelopes: EventEnvelope[]): Promise<PublishResult[]> {
|
||||
const out: PublishResult[] = [];
|
||||
for (const env of envelopes) {
|
||||
out.push(await this.publish(env));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
}
|
||||
|
||||
export function streamFor(eventType: string): string {
|
||||
return `${STREAM_PREFIX}${eventType}`;
|
||||
}
|
||||
@@ -42,3 +42,18 @@ export { FileValidationPipe } from './pipes/file-validation.pipe';
|
||||
export type { FileValidationOptions, UploadedFile } from './pipes/file-validation.pipe';
|
||||
export { validateEnv, validateJwtSecret } from './env-validation';
|
||||
export { cacheMetaStorage, type CacheMeta, type CacheMetaStore } from './cache-meta.store';
|
||||
// RFC-001 Phase 1 — API versioning.
|
||||
export {
|
||||
API_VERSION_REGISTRY,
|
||||
resolveMajorSpec,
|
||||
type ApiMajorSpec,
|
||||
type ApiVersionDeprecation,
|
||||
type ApiVersionRegistry,
|
||||
} from './versioning';
|
||||
export {
|
||||
VersionInterceptor,
|
||||
DeprecationInterceptor,
|
||||
API_MINOR_HEADER,
|
||||
API_MINOR_RESOLVED_HEADER,
|
||||
type ResolvedApiVersion,
|
||||
} from './interceptors';
|
||||
|
||||
@@ -0,0 +1,133 @@
|
||||
import { describe, it, expect, beforeEach, vi } from 'vitest';
|
||||
import { buildEnvelope } from '../../event-bus';
|
||||
import { InMemoryEventBus } from '../../event-bus/in-memory.event-bus';
|
||||
import { OutboxRelay } from '../outbox.relay';
|
||||
|
||||
type OutboxRow = {
|
||||
id: string;
|
||||
eventId: string;
|
||||
eventType: string;
|
||||
aggregateId: string | null;
|
||||
envelope: unknown;
|
||||
createdAt: Date;
|
||||
publishedAt: Date | null;
|
||||
attempts: number;
|
||||
lastError: string | null;
|
||||
};
|
||||
|
||||
/**
|
||||
* Lightweight fake Prisma — just enough for OutboxRelay.tick():
|
||||
* - pg_try_advisory_lock
|
||||
* - eventOutbox.findMany / update
|
||||
*/
|
||||
function makeFakePrisma(rows: OutboxRow[], options: { acquireLock?: boolean } = {}) {
|
||||
const acquireLock = options.acquireLock ?? true;
|
||||
return {
|
||||
$queryRawUnsafe: vi.fn(async (sql: string) => {
|
||||
if (sql.includes('pg_try_advisory_lock')) return [{ locked: acquireLock }];
|
||||
return [];
|
||||
}),
|
||||
eventOutbox: {
|
||||
findMany: vi.fn(async (args: { where?: { publishedAt: null }; take?: number }) => {
|
||||
const pending = rows.filter((r) => r.publishedAt === null);
|
||||
return pending.slice(0, args.take ?? 100);
|
||||
}),
|
||||
update: vi.fn(async (args: { where: { id: string }; data: Record<string, unknown> }) => {
|
||||
const row = rows.find((r) => r.id === args.where.id);
|
||||
if (!row) throw new Error('row not found');
|
||||
const data = args.data;
|
||||
if ('publishedAt' in data) row.publishedAt = data['publishedAt'] as Date | null;
|
||||
if ('lastError' in data) row.lastError = data['lastError'] as string | null;
|
||||
if ('attempts' in data) {
|
||||
const v = data['attempts'];
|
||||
if (v && typeof v === 'object' && 'increment' in v) {
|
||||
row.attempts += (v as { increment: number }).increment;
|
||||
}
|
||||
}
|
||||
return row;
|
||||
}),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function fakeRow(type: string): OutboxRow {
|
||||
const envelope = buildEnvelope({ producer: 'api' }, { eventType: type, payload: { k: 'v' } });
|
||||
return {
|
||||
id: `row-${envelope.eventId}`,
|
||||
eventId: envelope.eventId,
|
||||
eventType: envelope.eventType,
|
||||
aggregateId: null,
|
||||
envelope,
|
||||
createdAt: new Date(),
|
||||
publishedAt: null,
|
||||
attempts: 0,
|
||||
lastError: null,
|
||||
};
|
||||
}
|
||||
|
||||
describe('OutboxRelay.tick', () => {
|
||||
let bus: InMemoryEventBus;
|
||||
beforeEach(() => {
|
||||
bus = new InMemoryEventBus();
|
||||
process.env['EVENT_OUTBOX_RELAY_ENABLED'] = 'false'; // don't auto-start timer
|
||||
});
|
||||
|
||||
it('drains pending rows into the EventBus and marks them published', async () => {
|
||||
const rows = [fakeRow('payment.completed'), fakeRow('listing.approved')];
|
||||
const prisma = makeFakePrisma(rows);
|
||||
const relay = new OutboxRelay(prisma as never, bus);
|
||||
|
||||
const result = await relay.tick();
|
||||
|
||||
expect(result.acquired).toBe(true);
|
||||
expect(result.processed).toBe(2);
|
||||
expect(result.failed).toBe(0);
|
||||
expect(bus.all()).toHaveLength(2);
|
||||
expect(rows.every((r) => r.publishedAt instanceof Date)).toBe(true);
|
||||
});
|
||||
|
||||
it('does nothing when the advisory lock is held elsewhere', async () => {
|
||||
const rows = [fakeRow('kyc.verified')];
|
||||
const prisma = makeFakePrisma(rows, { acquireLock: false });
|
||||
const relay = new OutboxRelay(prisma as never, bus);
|
||||
|
||||
const result = await relay.tick();
|
||||
|
||||
expect(result.acquired).toBe(false);
|
||||
expect(result.processed).toBe(0);
|
||||
expect(bus.all()).toHaveLength(0);
|
||||
expect(rows[0]?.publishedAt).toBeNull();
|
||||
});
|
||||
|
||||
it('records lastError and leaves publishedAt null on publish failure', async () => {
|
||||
const rows = [fakeRow('payment.completed')];
|
||||
const prisma = makeFakePrisma(rows);
|
||||
const failing = {
|
||||
publish: vi.fn(async () => {
|
||||
throw new Error('XADD refused');
|
||||
}),
|
||||
publishAll: vi.fn(),
|
||||
};
|
||||
const relay = new OutboxRelay(prisma as never, failing as never);
|
||||
|
||||
const result = await relay.tick();
|
||||
|
||||
expect(result.processed).toBe(0);
|
||||
expect(result.failed).toBe(1);
|
||||
expect(rows[0]?.publishedAt).toBeNull();
|
||||
expect(rows[0]?.lastError).toContain('XADD refused');
|
||||
expect(rows[0]?.attempts).toBe(1);
|
||||
});
|
||||
|
||||
it('skips rows that are already published', async () => {
|
||||
const row = fakeRow('listing.approved');
|
||||
row.publishedAt = new Date();
|
||||
const prisma = makeFakePrisma([row]);
|
||||
const relay = new OutboxRelay(prisma as never, bus);
|
||||
|
||||
const result = await relay.tick();
|
||||
|
||||
expect(result.processed).toBe(0);
|
||||
expect(bus.all()).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,2 @@
|
||||
export { OutboxService, type OutboxAppendOptions } from './outbox.service';
|
||||
export { OutboxRelay } from './outbox.relay';
|
||||
@@ -0,0 +1,134 @@
|
||||
import type { EventEnvelope } from '@goodgo/contracts-events';
|
||||
import {
|
||||
Inject,
|
||||
Injectable,
|
||||
Logger,
|
||||
type OnModuleDestroy,
|
||||
type OnModuleInit,
|
||||
} from '@nestjs/common';
|
||||
import { EVENT_BUS, type EventBus } from '../event-bus/event-bus.interface';
|
||||
import { type PrismaService } from '../prisma.service';
|
||||
|
||||
/**
|
||||
* Single-process relay that drains `event_outbox` into the EventBus.
|
||||
*
|
||||
* Concurrency: every node tries to acquire the same Postgres advisory
|
||||
* lock (`pg_try_advisory_lock`); only the holder runs the poll loop.
|
||||
* This is the single-process + advisory-lock design called out in
|
||||
* RFC-004 §4 ("No leader-election library yet").
|
||||
*/
|
||||
|
||||
const ADVISORY_LOCK_KEY = 0xe7b04204; // bespoke 32-bit key for the outbox relay
|
||||
const DEFAULT_POLL_MS = 1_000;
|
||||
const DEFAULT_BATCH_SIZE = 100;
|
||||
|
||||
@Injectable()
|
||||
export class OutboxRelay implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(OutboxRelay.name);
|
||||
private readonly pollIntervalMs: number;
|
||||
private readonly batchSize: number;
|
||||
private readonly enabled: boolean;
|
||||
private timer: NodeJS.Timeout | null = null;
|
||||
private running = false;
|
||||
private stopped = false;
|
||||
private holdsLock = false;
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
@Inject(EVENT_BUS) private readonly bus: EventBus,
|
||||
) {
|
||||
this.pollIntervalMs = Number(process.env['EVENT_OUTBOX_POLL_MS'] ?? DEFAULT_POLL_MS);
|
||||
this.batchSize = Number(process.env['EVENT_OUTBOX_BATCH_SIZE'] ?? DEFAULT_BATCH_SIZE);
|
||||
this.enabled = (process.env['EVENT_OUTBOX_RELAY_ENABLED'] ?? 'true').toLowerCase() !== 'false';
|
||||
}
|
||||
|
||||
onModuleInit(): void {
|
||||
if (!this.enabled) {
|
||||
this.logger.log('OutboxRelay disabled via EVENT_OUTBOX_RELAY_ENABLED=false');
|
||||
return;
|
||||
}
|
||||
this.scheduleNext();
|
||||
}
|
||||
|
||||
async onModuleDestroy(): Promise<void> {
|
||||
this.stopped = true;
|
||||
if (this.timer) clearTimeout(this.timer);
|
||||
if (this.holdsLock) {
|
||||
try {
|
||||
await this.prisma.$queryRawUnsafe(`SELECT pg_advisory_unlock(${ADVISORY_LOCK_KEY})`);
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to release advisory lock: ${(err as Error).message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private scheduleNext(): void {
|
||||
if (this.stopped) return;
|
||||
this.timer = setTimeout(() => {
|
||||
void this.tick().finally(() => this.scheduleNext());
|
||||
}, this.pollIntervalMs);
|
||||
}
|
||||
|
||||
/** Public for tests — drains one batch synchronously. */
|
||||
async tick(): Promise<{ acquired: boolean; processed: number; failed: number }> {
|
||||
if (this.running) return { acquired: false, processed: 0, failed: 0 };
|
||||
this.running = true;
|
||||
try {
|
||||
const acquired = await this.tryAcquireLock();
|
||||
if (!acquired) return { acquired: false, processed: 0, failed: 0 };
|
||||
const { processed, failed } = await this.drainBatch();
|
||||
return { acquired: true, processed, failed };
|
||||
} finally {
|
||||
this.running = false;
|
||||
}
|
||||
}
|
||||
|
||||
private async tryAcquireLock(): Promise<boolean> {
|
||||
if (this.holdsLock) return true;
|
||||
const rows = await this.prisma.$queryRawUnsafe<{ locked: boolean }[]>(
|
||||
`SELECT pg_try_advisory_lock(${ADVISORY_LOCK_KEY}) AS locked`,
|
||||
);
|
||||
const locked = rows[0]?.locked === true;
|
||||
if (locked) {
|
||||
this.holdsLock = true;
|
||||
this.logger.log('Acquired event_outbox advisory lock — this node is now the relay leader');
|
||||
}
|
||||
return locked;
|
||||
}
|
||||
|
||||
private async drainBatch(): Promise<{ processed: number; failed: number }> {
|
||||
const pending = await this.prisma.eventOutbox.findMany({
|
||||
where: { publishedAt: null },
|
||||
orderBy: { createdAt: 'asc' },
|
||||
take: this.batchSize,
|
||||
});
|
||||
if (pending.length === 0) return { processed: 0, failed: 0 };
|
||||
let processed = 0;
|
||||
let failed = 0;
|
||||
for (const row of pending) {
|
||||
try {
|
||||
const envelope = row.envelope as unknown as EventEnvelope;
|
||||
await this.bus.publish(envelope);
|
||||
await this.prisma.eventOutbox.update({
|
||||
where: { id: row.id },
|
||||
data: { publishedAt: new Date(), attempts: { increment: 1 }, lastError: null },
|
||||
});
|
||||
processed += 1;
|
||||
} catch (err) {
|
||||
failed += 1;
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
this.logger.error(
|
||||
`Outbox publish failed for eventId=${row.eventId} type=${row.eventType}: ${message}`,
|
||||
);
|
||||
await this.prisma.eventOutbox.update({
|
||||
where: { id: row.id },
|
||||
data: { attempts: { increment: 1 }, lastError: message.slice(0, 1000) },
|
||||
});
|
||||
}
|
||||
}
|
||||
if (processed > 0 || failed > 0) {
|
||||
this.logger.debug(`Outbox drained batch: processed=${processed} failed=${failed}`);
|
||||
}
|
||||
return { processed, failed };
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
import { type EventEnvelope, assertValidEnvelope } from '@goodgo/contracts-events';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import type { Prisma } from '@prisma/client';
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
|
||||
import { PrismaService } from '../prisma.service';
|
||||
|
||||
/**
|
||||
* Transactional outbox writer. Call inside the same Prisma transaction
|
||||
* as the domain change so the row commits atomically with the state
|
||||
* mutation it describes. The Outbox **never** publishes directly; the
|
||||
* relay (`OutboxRelay`) tails `event_outbox` and forwards to the EventBus.
|
||||
*/
|
||||
export interface OutboxAppendOptions {
|
||||
aggregateId?: string;
|
||||
}
|
||||
|
||||
type EventOutboxDelegate = PrismaService['eventOutbox'];
|
||||
type PrismaTxLike = Pick<EventOutboxDelegate, 'create'> | { eventOutbox: Pick<EventOutboxDelegate, 'create'> };
|
||||
|
||||
@Injectable()
|
||||
export class OutboxService {
|
||||
private readonly logger = new Logger(OutboxService.name);
|
||||
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async append(
|
||||
tx: PrismaTxLike | PrismaService,
|
||||
envelope: EventEnvelope,
|
||||
options: OutboxAppendOptions = {},
|
||||
): Promise<void> {
|
||||
assertValidEnvelope(envelope);
|
||||
const client = ('eventOutbox' in tx ? tx.eventOutbox : tx) as EventOutboxDelegate;
|
||||
await client.create({
|
||||
data: {
|
||||
eventId: envelope.eventId,
|
||||
eventType: envelope.eventType,
|
||||
aggregateId: options.aggregateId ?? null,
|
||||
envelope: envelope as unknown as Prisma.InputJsonValue,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async appendStandalone(envelope: EventEnvelope, options: OutboxAppendOptions = {}): Promise<void> {
|
||||
await this.append(this.prisma, envelope, options);
|
||||
this.logger.warn(
|
||||
`appendStandalone used for ${envelope.eventType} eventId=${envelope.eventId} — prefer the transactional append()`,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
import { Global, type MiddlewareConsumer, Module, type NestModule, RequestMethod } from '@nestjs/common';
|
||||
import { ConfigModule } from '@nestjs/config';
|
||||
import { APP_FILTER } from '@nestjs/core';
|
||||
import { APP_FILTER, APP_INTERCEPTOR } from '@nestjs/core';
|
||||
import { EventEmitterModule } from '@nestjs/event-emitter';
|
||||
import { PrometheusModule, makeCounterProvider } from '@willsoto/nestjs-prometheus';
|
||||
import {
|
||||
@@ -10,8 +10,11 @@ import {
|
||||
CACHE_DEGRADATION_TOTAL,
|
||||
} from './infrastructure/cache.service';
|
||||
import { EventBusService } from './infrastructure/event-bus.service';
|
||||
import { EVENT_BUS, RedisStreamsEventBus } from './infrastructure/event-bus';
|
||||
import { OutboxRelay, OutboxService } from './infrastructure/outbox';
|
||||
import { FieldEncryptionService } from './infrastructure/field-encryption.service';
|
||||
import { GlobalExceptionFilter } from './infrastructure/filters/global-exception.filter';
|
||||
import { DeprecationInterceptor, VersionInterceptor } from './infrastructure/interceptors';
|
||||
import { LoggerService } from './infrastructure/logger.service';
|
||||
import { CorrelationIdMiddleware } from './infrastructure/middleware/correlation-id.middleware';
|
||||
import { CsrfMiddleware } from './infrastructure/middleware/csrf.middleware';
|
||||
@@ -35,6 +38,10 @@ import { TypesenseClientService } from './infrastructure/typesense-client.servic
|
||||
RedisService,
|
||||
CacheService,
|
||||
EventBusService,
|
||||
// RFC-004 Phase 0 (GOO-172) — durable async messaging backbone.
|
||||
{ provide: EVENT_BUS, useClass: RedisStreamsEventBus },
|
||||
OutboxService,
|
||||
OutboxRelay,
|
||||
TypesenseClientService,
|
||||
makeCounterProvider({
|
||||
name: CACHE_HIT_TOTAL,
|
||||
@@ -55,8 +62,18 @@ import { TypesenseClientService } from './infrastructure/typesense-client.servic
|
||||
provide: APP_FILTER,
|
||||
useClass: GlobalExceptionFilter,
|
||||
},
|
||||
// RFC-001 Phase 1 (GOO-170) — order matters: VersionInterceptor first
|
||||
// populates req.apiVersion; DeprecationInterceptor reads from it.
|
||||
{
|
||||
provide: APP_INTERCEPTOR,
|
||||
useClass: VersionInterceptor,
|
||||
},
|
||||
{
|
||||
provide: APP_INTERCEPTOR,
|
||||
useClass: DeprecationInterceptor,
|
||||
},
|
||||
],
|
||||
exports: [PrismaService, RedisService, CacheService, LoggerService, EventBusService, FieldEncryptionService, TypesenseClientService, PrometheusModule],
|
||||
exports: [PrismaService, RedisService, CacheService, LoggerService, EventBusService, EVENT_BUS, OutboxService, FieldEncryptionService, TypesenseClientService, PrometheusModule],
|
||||
})
|
||||
export class SharedModule implements NestModule {
|
||||
configure(consumer: MiddlewareConsumer): void {
|
||||
|
||||
Reference in New Issue
Block a user