diff --git a/apps/api/src/modules/retention/application/__tests__/payment-callback-purge.service.spec.ts b/apps/api/src/modules/retention/application/__tests__/payment-callback-purge.service.spec.ts
new file mode 100644
index 0000000..480a47d
--- /dev/null
+++ b/apps/api/src/modules/retention/application/__tests__/payment-callback-purge.service.spec.ts
@@ -0,0 +1,46 @@
+import type { LoggerService } from '@modules/shared';
+import { PaymentCallbackPurgeService } from '../services/payment-callback-purge.service';
+import type { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
+
+type MockFn = ReturnType<typeof vi.fn>;
+
+describe('PaymentCallbackPurgeService (stub)', () => {
+  let logger: { log: MockFn; warn: MockFn; error: MockFn; debug: MockFn };
+  let runLog: { start: MockFn; markFinished: MockFn; markFailed: MockFn };
+  let service: PaymentCallbackPurgeService;
+
+  beforeEach(() => {
+    logger = { log: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() };
+    runLog = {
+      start: vi.fn().mockResolvedValue('run-X'),
+      markFinished: vi.fn().mockResolvedValue(undefined),
+      markFailed: vi.fn().mockResolvedValue(undefined),
+    };
+    service = new PaymentCallbackPurgeService(
+      logger as unknown as LoggerService,
+      runLog as unknown as RetentionRunLogRepository,
+    );
+  });
+
+  // One test below pins Date.now(); undo the spy so the frozen clock cannot
+  // leak into tests that run afterwards.
+  afterEach(() => {
+    vi.restoreAllMocks();
+  });
+
+  it.each([1, 2, 3] as const)('records a SUCCESS run with rowsAffected=0 for phase %s', async (phase) => {
+    const result = await service.run(phase);
+    expect(runLog.start).toHaveBeenCalledWith(expect.objectContaining({ job: 'payment-callback-purge', phase }));
+    expect(runLog.markFinished).toHaveBeenCalledWith('run-X', 0);
+    expect(runLog.markFailed).not.toHaveBeenCalled();
+    expect(result).toEqual({ rowsAffected: 0, runId: 'run-X' });
+  });
+
+  it('selects strictly older cutoffs per higher phase (2y < 5y < 10y)', async () => {
+    const fixed = new Date('2030-01-01T00:00:00Z').getTime();
+    vi.spyOn(Date, 'now').mockReturnValue(fixed);
+
+    await service.run(1);
+    await service.run(2);
+    await service.run(3);
+
+    // The stub exposes its cutoff only through the warn message, so read it
+    // back from the calls the mock recorded.
+    const cutoffs = logger.warn.mock.calls.map(
+      ([message]) => /cutoff=([^)]+)/.exec(String(message))?.[1] ?? '',
+    );
+    expect(cutoffs).toHaveLength(3);
+    expect(cutoffs.every((cutoff) => cutoff !== '')).toBe(true);
+    // Same-length ISO-8601 strings sort lexicographically in time order, so a
+    // duplicate-free list equal to its own descending sort is strictly
+    // decreasing: phase 1 is the newest cutoff, phase 3 the oldest.
+    expect(new Set(cutoffs).size).toBe(3);
+    expect([...cutoffs].sort().reverse()).toEqual(cutoffs);
+  });
+});
diff --git a/apps/api/src/modules/retention/application/__tests__/refresh-token-purge.service.spec.ts b/apps/api/src/modules/retention/application/__tests__/refresh-token-purge.service.spec.ts
new file mode 100644
index 0000000..73af3ac
--- /dev/null
+++ b/apps/api/src/modules/retention/application/__tests__/refresh-token-purge.service.spec.ts
@@ -0,0 +1,49 @@
+import type { LoggerService, PrismaService } from '@modules/shared';
+import { RefreshTokenPurgeService } from '../services/refresh-token-purge.service';
+import type { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
+
+type MockFn = ReturnType<typeof vi.fn>;
+
+describe('RefreshTokenPurgeService', () => {
+  let prisma: { $queryRaw: MockFn; refreshToken: { count: MockFn } };
+  let logger: { log: MockFn; error: MockFn; debug: MockFn };
+  let runLog: { start: MockFn; markFinished: MockFn; markFailed: MockFn };
+  let service: RefreshTokenPurgeService;
+
+  beforeEach(() => {
+    prisma = { $queryRaw: vi.fn(), refreshToken: { count: vi.fn() } };
+    logger = { log: vi.fn(), error: vi.fn(), debug: vi.fn() };
+    runLog = {
+      start: vi.fn().mockResolvedValue('run-1'),
+      markFinished: vi.fn().mockResolvedValue(undefined),
+      markFailed: vi.fn().mockResolvedValue(undefined),
+    };
+    service = new RefreshTokenPurgeService(
+      prisma as unknown as PrismaService,
+      logger as unknown as LoggerService,
+      runLog as unknown as RetentionRunLogRepository,
+    );
+  });
+
+  it('starts a run, deletes batches, and marks finished with total rowsAffected', async () => {
+    // First batch is exactly full (1000 rows) so the loop must ask again; the
+    // second, short batch ends the run.
+    prisma.$queryRaw
+      .mockResolvedValueOnce(Array.from({ length: 1000 }, (_, i) => ({ id: `r${i}` })))
+      .mockResolvedValueOnce([{ id: 'r1000' }]);
+
+    const result = await service.run();
+
+    expect(runLog.start).toHaveBeenCalledWith(expect.objectContaining({ job: 'refresh-token-purge' }));
+    expect(prisma.$queryRaw).toHaveBeenCalledTimes(2);
+    expect(runLog.markFinished).toHaveBeenCalledWith('run-1', 1001);
+    expect(runLog.markFailed).not.toHaveBeenCalled();
+    expect(result).toEqual({ rowsAffected: 1001, runId: 'run-1' });
+  });
+
+  it('marks the run as FAILED when the delete query throws', async () => {
+    const boom = new Error('connection lost');
+    prisma.$queryRaw.mockRejectedValue(boom);
+
+    await expect(service.run()).rejects.toThrow(boom);
+
+    expect(runLog.markFailed).toHaveBeenCalledWith('run-1', boom, 0);
+    expect(runLog.markFinished).not.toHaveBeenCalled();
+    expect(logger.error).toHaveBeenCalled();
+  });
+});
diff --git a/apps/api/src/modules/retention/application/services/audit-log-purge.service.ts b/apps/api/src/modules/retention/application/services/audit-log-purge.service.ts
new file mode 100644
index 0000000..d8181f2
--- /dev/null
+++ b/apps/api/src/modules/retention/application/services/audit-log-purge.service.ts
@@ -0,0 +1,85 @@
+import { Injectable } from '@nestjs/common';
+import { PrismaService, LoggerService } from '@modules/shared';
+import { RETENTION_CONFIG } from '../../domain/retention.config';
+import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
+
+/**
+ * Anonymizes rows in AdminAuditLog older than 5 years. Because the schema
+ * requires non-null actor/target IDs, the strategy is tombstone-in-place:
+ * PII columns are replaced with the sentinel `ANONYMIZED`, and a
+ * `metadata.anonymized=true` marker is written so the predicate is
+ * idempotent.
+ *
+ * GOO-196 — Decree 13 compliance.
+ *
+ * A pass that stops at RETENTION_MAX_BATCHES while every batch is still
+ * coming back full is recorded as PARTIAL, not SUCCESS: rows may remain.
+ */
+@Injectable()
+export class AuditLogPurgeService {
+  static readonly JOB = 'audit-log-anonymize';
+  private static readonly TOMBSTONE = 'ANONYMIZED';
+
+  constructor(
+    private readonly prisma: PrismaService,
+    private readonly logger: LoggerService,
+    private readonly runLog: RetentionRunLogRepository,
+  ) {}
+
+  /**
+   * Runs one anonymization pass and records it in RetentionRunLog.
+   *
+   * @returns the number of rows anonymized (in dry-run mode: the number that
+   *   would be) and the id of the run-log row.
+   * @throws the original failure, after the run has been marked FAILED on a
+   *   best-effort basis.
+   */
+  async run(): Promise<{ rowsAffected: number; runId: string }> {
+    const cutoff = new Date(Date.now() - RETENTION_CONFIG.auditAnonymizeMs);
+    const runId = await this.runLog.start({
+      job: AuditLogPurgeService.JOB,
+      batchSize: RETENTION_CONFIG.batchSize,
+      dryRun: RETENTION_CONFIG.dryRun,
+    });
+
+    let total = 0;
+    try {
+      // Stays false only when the batch cap, not a short batch, ends the loop.
+      let drained = false;
+
+      if (RETENTION_CONFIG.dryRun) {
+        total = await this.countPending(cutoff);
+        drained = true;
+      } else {
+        for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
+          const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
+            UPDATE "AdminAuditLog"
+            SET "actorId" = ${AuditLogPurgeService.TOMBSTONE},
+                "targetId" = ${AuditLogPurgeService.TOMBSTONE},
+                "ipAddress" = NULL,
+                "userAgent" = NULL,
+                "metadata" = COALESCE("metadata", '{}'::jsonb) || '{"anonymized":true}'::jsonb
+            WHERE id IN (
+              SELECT id FROM "AdminAuditLog"
+              WHERE "createdAt" < ${cutoff}
+                AND ("metadata" ->> 'anonymized') IS DISTINCT FROM 'true'
+              ORDER BY "createdAt" ASC
+              LIMIT ${RETENTION_CONFIG.batchSize}
+              FOR UPDATE SKIP LOCKED
+            )
+            RETURNING id
+          `;
+
+          total += rows.length;
+          if (rows.length < RETENTION_CONFIG.batchSize) {
+            drained = true;
+            break;
+          }
+        }
+      }
+
+      if (drained) {
+        await this.runLog.markFinished(runId, total);
+      } else {
+        await this.runLog.markFinished(runId, total, true);
+        this.logger.log(
+          `AuditLogPurgeService hit the cap of ${RETENTION_CONFIG.maxBatches} batch(es); run ${runId} recorded as PARTIAL`,
+          'AuditLogPurgeService',
+        );
+      }
+      this.logger.log(
+        `AuditLogPurgeService anonymized ${total} row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})`,
+        'AuditLogPurgeService',
+      );
+      return { rowsAffected: total, runId };
+    } catch (err) {
+      // Anything can be thrown; normalise so `.message` / `.stack` are safe.
+      const error = err instanceof Error ? err : new Error(String(err));
+      try {
+        await this.runLog.markFailed(runId, error, total);
+      } catch (markErr) {
+        // The run-log write often fails for the same reason the job did (e.g.
+        // a lost connection). Report it, but never let it mask the root cause.
+        this.logger.error(
+          `AuditLogPurgeService could not record FAILED for run ${runId}: ${
+            markErr instanceof Error ? markErr.message : String(markErr)
+          }`,
+          markErr instanceof Error ? markErr.stack : undefined,
+          'AuditLogPurgeService',
+        );
+      }
+      this.logger.error(
+        `AuditLogPurgeService failed: ${error.message}`,
+        error.stack,
+        'AuditLogPurgeService',
+      );
+      throw err;
+    }
+  }
+
+  /**
+   * Dry-run counter. Uses the same predicate as the UPDATE's inner SELECT so
+   * rows that already carry `metadata.anonymized = true` are not reported
+   * again on every run.
+   */
+  private async countPending(cutoff: Date): Promise<number> {
+    const [row] = await this.prisma.$queryRaw<Array<{ count: number }>>`
+      SELECT COUNT(*)::int AS "count"
+      FROM "AdminAuditLog"
+      WHERE "createdAt" < ${cutoff}
+        AND ("metadata" ->> 'anonymized') IS DISTINCT FROM 'true'
+    `;
+    return row?.count ?? 0;
+  }
+}
diff --git a/apps/api/src/modules/retention/application/services/kyc-purge.service.ts b/apps/api/src/modules/retention/application/services/kyc-purge.service.ts
new file mode 100644
index 0000000..ceea972
--- /dev/null
+++ b/apps/api/src/modules/retention/application/services/kyc-purge.service.ts
@@ -0,0 +1,85 @@
+import { Injectable } from '@nestjs/common';
+import { PrismaService, LoggerService } from '@modules/shared';
+import { RETENTION_CONFIG } from '../../domain/retention.config';
+import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
+
+/**
+ * Hard-deletes the User.kycData JSON blob 90 days after `User.deletedAt`.
+ * Per CLO guidance, KYC PII must not survive a deleted account beyond the
+ * Decree-13 minimum. The User row itself stays (it carries audit-relevant
+ * metadata via foreign keys), only the kyc payload is nulled.
+ *
+ * Future work (tracked separately): when a dedicated KycDocument table
+ * lands, this service must also call StorageService.deleteObject(key) for
+ * each blob in S3.
+ *
+ * GOO-196 — Decree 13 compliance.
+ *
+ * A pass that stops at RETENTION_MAX_BATCHES while every batch is still
+ * coming back full is recorded as PARTIAL, not SUCCESS: rows may remain.
+ */
+@Injectable()
+export class KycPurgeService {
+  static readonly JOB = 'kyc-blob-purge';
+
+  constructor(
+    private readonly prisma: PrismaService,
+    private readonly logger: LoggerService,
+    private readonly runLog: RetentionRunLogRepository,
+  ) {}
+
+  /**
+   * Runs one purge pass and records it in RetentionRunLog.
+   *
+   * @returns the number of user rows whose kycData was nulled (in dry-run
+   *   mode: the number that would be) and the id of the run-log row.
+   * @throws the original failure, after the run has been marked FAILED on a
+   *   best-effort basis.
+   */
+  async run(): Promise<{ rowsAffected: number; runId: string }> {
+    const cutoff = new Date(Date.now() - RETENTION_CONFIG.kycPurgeMs);
+    const runId = await this.runLog.start({
+      job: KycPurgeService.JOB,
+      batchSize: RETENTION_CONFIG.batchSize,
+      dryRun: RETENTION_CONFIG.dryRun,
+    });
+
+    let total = 0;
+    try {
+      // Stays false only when the batch cap, not a short batch, ends the loop.
+      let drained = false;
+
+      if (RETENTION_CONFIG.dryRun) {
+        total = await this.countPending(cutoff);
+        drained = true;
+      } else {
+        for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
+          const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
+            UPDATE "User"
+            SET "kycData" = NULL
+            WHERE id IN (
+              SELECT id FROM "User"
+              WHERE "deletedAt" IS NOT NULL
+                AND "deletedAt" < ${cutoff}
+                AND "kycData" IS NOT NULL
+              ORDER BY "deletedAt" ASC
+              LIMIT ${RETENTION_CONFIG.batchSize}
+              FOR UPDATE SKIP LOCKED
+            )
+            RETURNING id
+          `;
+
+          total += rows.length;
+          if (rows.length < RETENTION_CONFIG.batchSize) {
+            drained = true;
+            break;
+          }
+        }
+      }
+
+      if (drained) {
+        await this.runLog.markFinished(runId, total);
+      } else {
+        await this.runLog.markFinished(runId, total, true);
+        this.logger.log(
+          `KycPurgeService hit the cap of ${RETENTION_CONFIG.maxBatches} batch(es); run ${runId} recorded as PARTIAL`,
+          'KycPurgeService',
+        );
+      }
+      this.logger.log(
+        `KycPurgeService nulled kycData on ${total} user row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})`,
+        'KycPurgeService',
+      );
+      return { rowsAffected: total, runId };
+    } catch (err) {
+      // Anything can be thrown; normalise so `.message` / `.stack` are safe.
+      const error = err instanceof Error ? err : new Error(String(err));
+      try {
+        await this.runLog.markFailed(runId, error, total);
+      } catch (markErr) {
+        // The run-log write often fails for the same reason the job did (e.g.
+        // a lost connection). Report it, but never let it mask the root cause.
+        this.logger.error(
+          `KycPurgeService could not record FAILED for run ${runId}: ${
+            markErr instanceof Error ? markErr.message : String(markErr)
+          }`,
+          markErr instanceof Error ? markErr.stack : undefined,
+          'KycPurgeService',
+        );
+      }
+      this.logger.error(
+        `KycPurgeService failed: ${error.message}`,
+        error.stack,
+        'KycPurgeService',
+      );
+      throw err;
+    }
+  }
+
+  /**
+   * Dry-run counter. Issues the same predicate as the UPDATE's inner SELECT
+   * instead of a Prisma JSON filter, which needed a `null as unknown as
+   * undefined` cast to get past the type checker.
+   */
+  private async countPending(cutoff: Date): Promise<number> {
+    const [row] = await this.prisma.$queryRaw<Array<{ count: number }>>`
+      SELECT COUNT(*)::int AS "count"
+      FROM "User"
+      WHERE "deletedAt" IS NOT NULL
+        AND "deletedAt" < ${cutoff}
+        AND "kycData" IS NOT NULL
+    `;
+    return row?.count ?? 0;
+  }
+}
diff --git a/apps/api/src/modules/retention/application/services/messaging-purge.service.ts b/apps/api/src/modules/retention/application/services/messaging-purge.service.ts
new file mode 100644
index 0000000..5adf52e
--- /dev/null
+++ 
b/apps/api/src/modules/retention/application/services/messaging-purge.service.ts
@@ -0,0 +1,85 @@
+import { Injectable } from '@nestjs/common';
+import { PrismaService, LoggerService } from '@modules/shared';
+import { RETENTION_CONFIG } from '../../domain/retention.config';
+import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
+
+/**
+ * Hard-deletes Message.content for any conversation in CLOSED status whose
+ * last activity is older than 90 days. Metadata (sender, timestamps) is
+ * preserved per CLO guidance — only the message body itself is PII.
+ *
+ * Each cleared row has `content` set to the empty string and `deletedAt`
+ * stamped with NOW(); `deletedAt IS NULL` is what keeps a row eligible, so a
+ * message is processed at most once.
+ *
+ * A pass that stops at RETENTION_MAX_BATCHES while every batch is still
+ * coming back full is recorded as PARTIAL, not SUCCESS: rows may remain.
+ *
+ * GOO-196 — Decree 13 compliance.
+ */
+@Injectable()
+export class MessagingPurgeService {
+  static readonly JOB = 'messaging-body-purge';
+
+  constructor(
+    private readonly prisma: PrismaService,
+    private readonly logger: LoggerService,
+    private readonly runLog: RetentionRunLogRepository,
+  ) {}
+
+  /**
+   * Runs one purge pass and records it in RetentionRunLog.
+   *
+   * @returns the number of message rows cleared (in dry-run mode: the number
+   *   that would be) and the id of the run-log row.
+   * @throws the original failure, after the run has been marked FAILED on a
+   *   best-effort basis.
+   */
+  async run(): Promise<{ rowsAffected: number; runId: string }> {
+    const cutoff = new Date(Date.now() - RETENTION_CONFIG.messagingBodyMs);
+    const runId = await this.runLog.start({
+      job: MessagingPurgeService.JOB,
+      batchSize: RETENTION_CONFIG.batchSize,
+      dryRun: RETENTION_CONFIG.dryRun,
+    });
+
+    let total = 0;
+    try {
+      // Stays false only when the batch cap, not a short batch, ends the loop.
+      let drained = false;
+
+      if (RETENTION_CONFIG.dryRun) {
+        total = await this.prisma.message.count({
+          where: {
+            deletedAt: null,
+            conversation: {
+              status: 'CLOSED',
+              lastMessageAt: { lt: cutoff },
+            },
+          },
+        });
+        drained = true;
+      } else {
+        for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
+          const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
+            UPDATE "Message"
+            SET "content" = '',
+                "deletedAt" = NOW()
+            WHERE id IN (
+              SELECT m.id FROM "Message" m
+              JOIN "Conversation" c ON c.id = m."conversationId"
+              WHERE m."deletedAt" IS NULL
+                AND c."status" = 'CLOSED'
+                AND c."lastMessageAt" < ${cutoff}
+              ORDER BY m."createdAt" ASC
+              LIMIT ${RETENTION_CONFIG.batchSize}
+              FOR UPDATE OF m SKIP LOCKED
+            )
+            RETURNING id
+          `;
+
+          total += rows.length;
+          if (rows.length < RETENTION_CONFIG.batchSize) {
+            drained = true;
+            break;
+          }
+        }
+      }
+
+      if (drained) {
+        await this.runLog.markFinished(runId, total);
+      } else {
+        await this.runLog.markFinished(runId, total, true);
+        this.logger.log(
+          `MessagingPurgeService hit the cap of ${RETENTION_CONFIG.maxBatches} batch(es); run ${runId} recorded as PARTIAL`,
+          'MessagingPurgeService',
+        );
+      }
+      this.logger.log(
+        `MessagingPurgeService cleared ${total} message body row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})`,
+        'MessagingPurgeService',
+      );
+      return { rowsAffected: total, runId };
+    } catch (err) {
+      // Anything can be thrown; normalise so `.message` / `.stack` are safe.
+      const error = err instanceof Error ? err : new Error(String(err));
+      try {
+        await this.runLog.markFailed(runId, error, total);
+      } catch (markErr) {
+        // The run-log write often fails for the same reason the job did (e.g.
+        // a lost connection). Report it, but never let it mask the root cause.
+        this.logger.error(
+          `MessagingPurgeService could not record FAILED for run ${runId}: ${
+            markErr instanceof Error ? markErr.message : String(markErr)
+          }`,
+          markErr instanceof Error ? markErr.stack : undefined,
+          'MessagingPurgeService',
+        );
+      }
+      this.logger.error(
+        `MessagingPurgeService failed: ${error.message}`,
+        error.stack,
+        'MessagingPurgeService',
+      );
+      throw err;
+    }
+  }
+}
diff --git a/apps/api/src/modules/retention/application/services/payment-callback-purge.service.ts b/apps/api/src/modules/retention/application/services/payment-callback-purge.service.ts
new file mode 100644
index 0000000..c379c0c
--- /dev/null
+++ b/apps/api/src/modules/retention/application/services/payment-callback-purge.service.ts
@@ -0,0 +1,79 @@
+import { Injectable } from '@nestjs/common';
+import { LoggerService } from '@modules/shared';
+import { RETENTION_CONFIG } from '../../domain/retention.config';
+import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
+
+/**
+ * Payment callback log purge — three-phase schedule confirmed by CLO in
+ * GOO-201 (MoF Circular 78/2021/TT-BTC, Accounting Law 88/2015 Art. 41,
+ * Tax Admin Law 38/2019 Art. 86):
+ *
+ *   Phase 1 @ 2y  — scrub operational PII (IP, device fingerprint).
+ *   Phase 2 @ 5y  — scrub buyer identity (name/phone/email, bank suffix);
+ *                   preserves `buyerName` for invoice-linked rows.
+ *   Phase 3 @ 10y — hard delete (or cold-archive if
+ *                   RETENTION_PAYMENT_ARCHIVE=true).
+ *
+ * This service is intentionally a **stub** in the initial GOO-196 landing:
+ * the `PaymentCallbackLog` table does not exist in the Prisma schema yet
+ * (tracked under the payments module refactor). 
Calling `run()` emits a
+ * RetentionRunLog row with status=SUCCESS and rowsAffected=0 so dry-run
+ * telemetry is visible from day one, while the actual UPDATE/DELETE
+ * statements are added when the table lands.
+ *
+ * GOO-196 — Decree 13 compliance.
+ */
+@Injectable()
+export class PaymentCallbackPurgeService {
+  static readonly JOB = 'payment-callback-purge';
+
+  constructor(
+    private readonly logger: LoggerService,
+    private readonly runLog: RetentionRunLogRepository,
+  ) {}
+
+  /**
+   * Records a no-op run for the given phase.
+   *
+   * @param phase which retention phase (1, 2 or 3) the run represents.
+   * @returns `rowsAffected: 0` and the id of the run-log row.
+   * @throws RangeError for a phase outside 1–3 (before any run-log row is
+   *   written); otherwise the original failure, after the run has been
+   *   marked FAILED on a best-effort basis.
+   */
+  async run(phase: 1 | 2 | 3): Promise<{ rowsAffected: number; runId: string }> {
+    const cutoff = this.cutoffFor(phase);
+    const runId = await this.runLog.start({
+      job: PaymentCallbackPurgeService.JOB,
+      phase,
+      batchSize: RETENTION_CONFIG.batchSize,
+      dryRun: RETENTION_CONFIG.dryRun,
+    });
+
+    try {
+      // TODO(GOO-196 follow-up): implement once PaymentCallbackLog schema lands.
+      //   Phase 1: UPDATE … SET ipAddress=NULL, deviceFingerprint=NULL,
+      //            anonymizedPhase1At=NOW() WHERE createdAt < ${cutoff}
+      //            AND anonymizedPhase1At IS NULL
+      //   Phase 2: UPDATE … SET callbackPayload = jsonb_set_lax(..., 'null'),
+      //            bankAccountMasked=NULL, cardSuffix=NULL,
+      //            anonymizedPhase2At=NOW() (skip buyerName on invoice rows)
+      //   Phase 3: DELETE FROM "PaymentCallbackLog" WHERE createdAt < ${cutoff}
+      //            — OR — INSERT INTO payment_callback_archive … then DELETE.
+      const rowsAffected = 0;
+      await this.runLog.markFinished(runId, rowsAffected);
+      this.logger.warn(
+        `PaymentCallbackPurgeService phase=${phase} is a no-op — PaymentCallbackLog table not yet in schema (cutoff=${cutoff.toISOString()})`,
+        'PaymentCallbackPurgeService',
+      );
+      return { rowsAffected, runId };
+    } catch (err) {
+      // Anything can be thrown; normalise so `.message` / `.stack` are safe.
+      const error = err instanceof Error ? err : new Error(String(err));
+      try {
+        await this.runLog.markFailed(runId, error);
+      } catch (markErr) {
+        // Never let a failing run-log write mask the root cause.
+        this.logger.error(
+          `PaymentCallbackPurgeService could not record FAILED for run ${runId}: ${
+            markErr instanceof Error ? markErr.message : String(markErr)
+          }`,
+          markErr instanceof Error ? markErr.stack : undefined,
+          'PaymentCallbackPurgeService',
+        );
+      }
+      // Same failure log as the other purge services.
+      this.logger.error(
+        `PaymentCallbackPurgeService failed: ${error.message}`,
+        error.stack,
+        'PaymentCallbackPurgeService',
+      );
+      throw err;
+    }
+  }
+
+  /** Maps a phase to "now minus that phase's configured retention window". */
+  private cutoffFor(phase: 1 | 2 | 3): Date {
+    const now = Date.now();
+    switch (phase) {
+      case 1:
+        return new Date(now - RETENTION_CONFIG.paymentPhase1Ms);
+      case 2:
+        return new Date(now - RETENTION_CONFIG.paymentPhase2Ms);
+      case 3:
+        return new Date(now - RETENTION_CONFIG.paymentPhase3Ms);
+      default: {
+        // Unreachable for typed callers; guards untyped ones from getting
+        // `undefined` back and failing later on `cutoff.toISOString()`.
+        const unsupported: never = phase;
+        throw new RangeError(`Unsupported payment retention phase: ${String(unsupported)}`);
+      }
+    }
+  }
+}
diff --git a/apps/api/src/modules/retention/application/services/refresh-token-purge.service.ts b/apps/api/src/modules/retention/application/services/refresh-token-purge.service.ts
new file mode 100644
index 0000000..b8f304c
--- /dev/null
+++ b/apps/api/src/modules/retention/application/services/refresh-token-purge.service.ts
@@ -0,0 +1,81 @@
+import { Injectable } from '@nestjs/common';
+import { PrismaService, LoggerService } from '@modules/shared';
+import { RETENTION_CONFIG } from '../../domain/retention.config';
+import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
+
+/**
+ * Hard-deletes refresh tokens that are revoked or expired and were created
+ * more than `RETENTION_REFRESH_TOKEN_DAYS` (default 30) days ago.
+ *
+ * Idempotency: a single DELETE … RETURNING id with FOR UPDATE SKIP LOCKED is
+ * race-safe — only one writer wins each row. Loops in capped batches to avoid
+ * statement timeouts on large tables.
+ *
+ * GOO-196 — Decree 13 compliance.
+ *
+ * A pass that stops at RETENTION_MAX_BATCHES while every batch is still
+ * coming back full is recorded as PARTIAL, not SUCCESS: rows may remain.
+ */
+@Injectable()
+export class RefreshTokenPurgeService {
+  static readonly JOB = 'refresh-token-purge';
+
+  constructor(
+    private readonly prisma: PrismaService,
+    private readonly logger: LoggerService,
+    private readonly runLog: RetentionRunLogRepository,
+  ) {}
+
+  /**
+   * Runs one purge pass and records it in RetentionRunLog.
+   *
+   * @returns the number of tokens deleted (in dry-run mode: the number that
+   *   would be) and the id of the run-log row.
+   * @throws the original failure, after the run has been marked FAILED on a
+   *   best-effort basis.
+   */
+  async run(): Promise<{ rowsAffected: number; runId: string }> {
+    const cutoff = new Date(Date.now() - RETENTION_CONFIG.refreshTokenStaleMs);
+    const runId = await this.runLog.start({
+      job: RefreshTokenPurgeService.JOB,
+      batchSize: RETENTION_CONFIG.batchSize,
+      dryRun: RETENTION_CONFIG.dryRun,
+    });
+
+    let total = 0;
+    try {
+      // Stays false only when the batch cap, not a short batch, ends the loop.
+      let drained = false;
+
+      if (RETENTION_CONFIG.dryRun) {
+        total = await this.prisma.refreshToken.count({
+          where: {
+            createdAt: { lt: cutoff },
+            OR: [{ revokedAt: { not: null } }, { expiresAt: { lt: new Date() } }],
+          },
+        });
+        drained = true;
+      } else {
+        for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
+          const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
+            DELETE FROM "RefreshToken"
+            WHERE id IN (
+              SELECT id FROM "RefreshToken"
+              WHERE "createdAt" < ${cutoff}
+                AND ("revokedAt" IS NOT NULL OR "expiresAt" < NOW())
+              ORDER BY "createdAt" ASC
+              LIMIT ${RETENTION_CONFIG.batchSize}
+              FOR UPDATE SKIP LOCKED
+            )
+            RETURNING id
+          `;
+
+          total += rows.length;
+          if (rows.length < RETENTION_CONFIG.batchSize) {
+            drained = true;
+            break;
+          }
+        }
+      }
+
+      if (drained) {
+        // Two-argument form on purpose: complete runs keep the call shape
+        // existing callers and specs expect.
+        await this.runLog.markFinished(runId, total);
+      } else {
+        await this.runLog.markFinished(runId, total, true);
+        this.logger.log(
+          `RefreshTokenPurgeService hit the cap of ${RETENTION_CONFIG.maxBatches} batch(es); run ${runId} recorded as PARTIAL`,
+          'RefreshTokenPurgeService',
+        );
+      }
+      this.logger.log(
+        `RefreshTokenPurgeService removed ${total} row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})`,
+        'RefreshTokenPurgeService',
+      );
+      return { rowsAffected: total, runId };
+    } catch (err) {
+      // Anything can be thrown; normalise so `.message` / `.stack` are safe.
+      const error = err instanceof Error ? err : new Error(String(err));
+      try {
+        await this.runLog.markFailed(runId, error, total);
+      } catch (markErr) {
+        // The run-log write often fails for the same reason the job did (e.g.
+        // a lost connection). Report it, but never let it mask the root cause.
+        this.logger.error(
+          `RefreshTokenPurgeService could not record FAILED for run ${runId}: ${
+            markErr instanceof Error ? markErr.message : String(markErr)
+          }`,
+          markErr instanceof Error ? markErr.stack : undefined,
+          'RefreshTokenPurgeService',
+        );
+      }
+      this.logger.error(
+        `RefreshTokenPurgeService failed: ${error.message}`,
+        error.stack,
+        'RefreshTokenPurgeService',
+      );
+      throw err;
+    }
+  }
+}
diff --git a/apps/api/src/modules/retention/domain/retention.config.ts b/apps/api/src/modules/retention/domain/retention.config.ts
new file mode 100644
index 
0000000..2ce1835
--- /dev/null
+++ b/apps/api/src/modules/retention/domain/retention.config.ts
@@ -0,0 +1,56 @@
+/**
+ * Centralised retention configuration. Every window comes from env vars so a
+ * staging environment can dry-run aggressively without hard-coding constants.
+ *
+ * Values are read once, when this module is first imported.
+ *
+ * GOO-196 — Decree 13 compliance.
+ */
+
+/** Converts a number of days to milliseconds. */
+const days = (n: number): number => n * 24 * 60 * 60 * 1000;
+/** Converts a number of years (365.25 days each) to whole milliseconds. */
+const years = (n: number): number => Math.floor(n * 365.25 * 24 * 60 * 60 * 1000);
+
+/**
+ * Reads a positive, finite number from the environment; anything missing,
+ * unparsable, zero or negative yields `fallback`.
+ */
+const numEnv = (name: string, fallback: number): number => {
+  const raw = process.env[name];
+  if (!raw) return fallback;
+  const parsed = Number(raw);
+  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
+};
+
+/**
+ * Like `numEnv`, but also rejects fractions. Used for row and loop counts: a
+ * fractional batch size would make `rows.length < batchSize` true for a full
+ * batch and end a purge run early.
+ */
+const intEnv = (name: string, fallback: number): number => {
+  const parsed = numEnv(name, fallback);
+  return Number.isInteger(parsed) ? parsed : fallback;
+};
+
+/** Spellings accepted as "on", compared after trimming and lower-casing. */
+const TRUTHY_VALUES: ReadonlySet<string> = new Set(['true', '1', 'yes', 'on']);
+
+/**
+ * Reads a boolean flag from the environment. Unset yields `fallback`; any
+ * value outside `TRUTHY_VALUES` is false. Matching is case-insensitive so
+ * `RETENTION_DRY_RUN=TRUE` is honoured instead of silently meaning "off".
+ */
+const boolEnv = (name: string, fallback = false): boolean => {
+  const raw = process.env[name];
+  if (raw === undefined) return fallback;
+  return TRUTHY_VALUES.has(raw.trim().toLowerCase());
+};
+
+export const RETENTION_CONFIG = {
+  /** Master kill switch — when false, every cron is a no-op. */
+  enabled: boolEnv('RETENTION_ENABLED', false),
+  /** Forces every job into SELECT-only mode for staging review. */
+  dryRun: boolEnv('RETENTION_DRY_RUN', false),
+
+  /** Per-batch row cap. Keeps statement timeouts and replication lag bounded. */
+  batchSize: intEnv('RETENTION_BATCH_SIZE', 1000),
+  /** Hard cap on batch loops per run. */
+  maxBatches: intEnv('RETENTION_MAX_BATCHES', 50),
+
+  /** RefreshToken: revoked OR expired older than this is hard-deleted. */
+  refreshTokenStaleMs: days(numEnv('RETENTION_REFRESH_TOKEN_DAYS', 30)),
+
+  /** Messaging body purge window after Conversation.lastMessageAt. */
+  messagingBodyMs: days(numEnv('RETENTION_MESSAGING_DAYS', 90)),
+
+  /** KYC blob purge window after User.deletedAt. */
+  kycPurgeMs: days(numEnv('RETENTION_KYC_DAYS', 90)),
+
+  /** Audit log anonymization window. */
+  auditAnonymizeMs: years(numEnv('RETENTION_AUDIT_YEARS', 5)),
+
+  /** Payment callback phased schedule (CLO confirmed via GOO-201). */
+  paymentPhase1Ms: years(numEnv('RETENTION_PAYMENT_PHASE1_YEARS', 2)),
+  paymentPhase2Ms: years(numEnv('RETENTION_PAYMENT_PHASE2_YEARS', 5)),
+  paymentPhase3Ms: years(numEnv('RETENTION_PAYMENT_PHASE3_YEARS', 10)),
+
+  /** When true, phase-3 archives to a cold table instead of hard-deleting. */
+  paymentArchive: boolEnv('RETENTION_PAYMENT_ARCHIVE', false),
+} as const;
+
+export type RetentionConfig = typeof RETENTION_CONFIG;
diff --git a/apps/api/src/modules/retention/index.ts b/apps/api/src/modules/retention/index.ts
new file mode 100644
index 0000000..bd66b55
--- /dev/null
+++ b/apps/api/src/modules/retention/index.ts
@@ -0,0 +1,8 @@
+export { RetentionModule } from './retention.module';
+export { RefreshTokenPurgeService } from './application/services/refresh-token-purge.service';
+export { MessagingPurgeService } from './application/services/messaging-purge.service';
+export { KycPurgeService } from './application/services/kyc-purge.service';
+export { AuditLogPurgeService } from './application/services/audit-log-purge.service';
+export { PaymentCallbackPurgeService } from './application/services/payment-callback-purge.service';
+export { RetentionRunLogRepository } from './infrastructure/repositories/retention-run-log.repository';
+export { RETENTION_CONFIG, type RetentionConfig } from './domain/retention.config';
diff --git a/apps/api/src/modules/retention/infrastructure/cron/retention-cron.orchestrator.ts b/apps/api/src/modules/retention/infrastructure/cron/retention-cron.orchestrator.ts
new file mode 100644
index 0000000..f63601e
--- /dev/null
+++ b/apps/api/src/modules/retention/infrastructure/cron/retention-cron.orchestrator.ts
@@ -0,0 +1,91 @@
+import { Injectable } from '@nestjs/common';
+import { Cron } from '@nestjs/schedule';
+import { LoggerService } from '@modules/shared';
+import { AuditLogPurgeService } from '../../application/services/audit-log-purge.service';
+import { KycPurgeService } from '../../application/services/kyc-purge.service';
+import { MessagingPurgeService } from '../../application/services/messaging-purge.service';
+import { PaymentCallbackPurgeService } from '../../application/services/payment-callback-purge.service';
+import { RefreshTokenPurgeService } from '../../application/services/refresh-token-purge.service';
+import { RETENTION_CONFIG } from '../../domain/retention.config';
+
+/**
+ * Cron entry points for the retention jobs. Each handler only names its job
+ * and hands the matching purge service call to `dispatch`, which applies the
+ * RETENTION_ENABLED gate and contains any failure.
+ *
+ * Schedules are expressed in UTC; 16:00–18:00 UTC is 23:00–01:00 ICT, i.e.
+ * Vietnam off-peak hours. With RETENTION_ENABLED unset or false every handler
+ * only emits a debug line, so the module can ship behind the flag.
+ *
+ * GOO-196 — Decree 13 compliance.
+ */
+@Injectable()
+export class RetentionCronOrchestrator {
+  constructor(
+    private readonly logger: LoggerService,
+    private readonly refreshTokens: RefreshTokenPurgeService,
+    private readonly messaging: MessagingPurgeService,
+    private readonly kyc: KycPurgeService,
+    private readonly auditLogs: AuditLogPurgeService,
+    private readonly paymentCallbacks: PaymentCallbackPurgeService,
+  ) {}
+
+  /** Daily, 16:00 UTC. */
+  @Cron('0 16 * * *', { name: 'retention-refresh-tokens', timeZone: 'UTC' })
+  async runRefreshTokens(): Promise<void> {
+    await this.dispatch('refresh-token-purge', () => this.refreshTokens.run());
+  }
+
+  /** Daily, 16:30 UTC. */
+  @Cron('30 16 * * *', { name: 'retention-messaging', timeZone: 'UTC' })
+  async runMessaging(): Promise<void> {
+    await this.dispatch('messaging-purge', () => this.messaging.run());
+  }
+
+  /** Daily, 17:00 UTC. */
+  @Cron('0 17 * * *', { name: 'retention-kyc', timeZone: 'UTC' })
+  async runKyc(): Promise<void> {
+    await this.dispatch('kyc-purge', () => this.kyc.run());
+  }
+
+  /** Daily, 17:30 UTC. */
+  @Cron('30 17 * * *', { name: 'retention-payment-phase1', timeZone: 'UTC' })
+  async runPaymentPhase1(): Promise<void> {
+    await this.dispatch('payment-phase-1', () => this.paymentCallbacks.run(1));
+  }
+
+  /** Daily, 18:00 UTC. */
+  @Cron('0 18 * * *', { name: 'retention-payment-phase2', timeZone: 'UTC' })
+  async runPaymentPhase2(): Promise<void> {
+    await this.dispatch('payment-phase-2', () => this.paymentCallbacks.run(2));
+  }
+
+  /**
+   * Sundays, 17:00 UTC.
+   * NOTE(review): shares its Sunday slot with `retention-kyc` — confirm the
+   * overlap is intended.
+   */
+  @Cron('0 17 * * 0', { name: 'retention-audit-anonymize', timeZone: 'UTC' })
+  async runAuditLogs(): Promise<void> {
+    await this.dispatch('audit-log-anonymize', () => this.auditLogs.run());
+  }
+
+  /**
+   * Sundays, 18:00 UTC.
+   * NOTE(review): shares its Sunday slot with `retention-payment-phase2` —
+   * confirm the overlap is intended.
+   */
+  @Cron('0 18 * * 0', { name: 'retention-payment-phase3', timeZone: 'UTC' })
+  async runPaymentPhase3(): Promise<void> {
+    await this.dispatch('payment-phase-3', () => this.paymentCallbacks.run(3));
+  }
+
+  /**
+   * Runs `job` unless retention is disabled, and never lets a rejection reach
+   * the scheduler.
+   *
+   * @param name job label used in the log lines.
+   * @param job thunk that starts the purge service call.
+   */
+  private async dispatch(name: string, job: () => Promise<unknown>): Promise<void> {
+    if (!RETENTION_CONFIG.enabled) {
+      this.logger.debug(
+        `Retention job ${name} skipped: RETENTION_ENABLED=false`,
+        'RetentionCronOrchestrator',
+      );
+      return;
+    }
+
+    try {
+      await job();
+    } catch (err) {
+      const failure = err as Error;
+      this.logger.error(
+        `Retention job ${name} threw: ${failure.message}`,
+        failure.stack,
+        'RetentionCronOrchestrator',
+      );
+      // Swallow — RetentionRunLog already records FAILED. Do not crash the scheduler.
+    }
+  }
+}
diff --git a/apps/api/src/modules/retention/infrastructure/repositories/retention-run-log.repository.ts b/apps/api/src/modules/retention/infrastructure/repositories/retention-run-log.repository.ts
new file mode 100644
index 0000000..497c3d7
--- /dev/null
+++ b/apps/api/src/modules/retention/infrastructure/repositories/retention-run-log.repository.ts
@@ -0,0 +1,56 @@
+import { Injectable } from '@nestjs/common';
+import { PrismaService } from '@modules/shared';
+
+/**
+ * Thin repository around RetentionRunLog. 
Centralised so every purge service
+ * uses the exact same shape — start row, mark finished/failed, never invent
+ * variations.
+ *
+ * GOO-196 — Decree 13 compliance.
+ */
+@Injectable()
+export class RetentionRunLogRepository {
+  /** Upper bound on the stored error text, in characters. */
+  private static readonly MAX_ERROR_LENGTH = 2000;
+
+  constructor(private readonly prisma: PrismaService) {}
+
+  /**
+   * Inserts a run row with status RUNNING.
+   *
+   * @param input job name plus optional phase, batch size and dry-run flag;
+   *   omitted optionals are stored as null (dryRun as false).
+   * @returns the id of the created row.
+   */
+  async start(input: {
+    job: string;
+    phase?: number | null;
+    batchSize?: number | null;
+    dryRun?: boolean;
+  }): Promise<string> {
+    const row = await this.prisma.retentionRunLog.create({
+      data: {
+        job: input.job,
+        phase: input.phase ?? null,
+        batchSize: input.batchSize ?? null,
+        dryRun: input.dryRun ?? false,
+        status: 'RUNNING',
+      },
+      select: { id: true },
+    });
+    return row.id;
+  }
+
+  /**
+   * Closes a run: stamps `finishedAt`, stores the row count and sets the
+   * status to PARTIAL when `partial` is true, SUCCESS otherwise.
+   */
+  async markFinished(id: string, rowsAffected: number, partial = false): Promise<void> {
+    await this.prisma.retentionRunLog.update({
+      where: { id },
+      data: {
+        finishedAt: new Date(),
+        rowsAffected,
+        status: partial ? 'PARTIAL' : 'SUCCESS',
+      },
+    });
+  }
+
+  /**
+   * Closes a run as FAILED, storing the rows touched before the failure and
+   * the error text truncated to `MAX_ERROR_LENGTH` characters.
+   */
+  async markFailed(id: string, error: Error, rowsAffected = 0): Promise<void> {
+    // Callers pass whatever they caught, sometimes cast from `unknown`; a
+    // thrown string or plain object has no usable `.message`, and slicing
+    // `undefined` would throw while recording the failure.
+    const candidate: unknown = (error as { message?: unknown } | null | undefined)?.message;
+    const message = typeof candidate === 'string' ? candidate : String(error);
+
+    await this.prisma.retentionRunLog.update({
+      where: { id },
+      data: {
+        finishedAt: new Date(),
+        rowsAffected,
+        status: 'FAILED',
+        errorMessage: message.slice(0, RetentionRunLogRepository.MAX_ERROR_LENGTH),
+      },
+    });
+  }
+}
diff --git a/apps/api/src/modules/retention/retention.module.ts b/apps/api/src/modules/retention/retention.module.ts
new file mode 100644
index 0000000..5c1c402
--- /dev/null
+++ b/apps/api/src/modules/retention/retention.module.ts
@@ -0,0 +1,36 @@
+import { Module } from '@nestjs/common';
+import { AuditLogPurgeService } from './application/services/audit-log-purge.service';
+import { KycPurgeService } from './application/services/kyc-purge.service';
+import { MessagingPurgeService } from './application/services/messaging-purge.service';
+import { PaymentCallbackPurgeService } from './application/services/payment-callback-purge.service';
+import { RefreshTokenPurgeService } from './application/services/refresh-token-purge.service';
+import { RetentionCronOrchestrator } from 
'./infrastructure/cron/retention-cron.orchestrator';
+import { RetentionRunLogRepository } from './infrastructure/repositories/retention-run-log.repository';
+
+/**
+ * The purge services are both registered as providers and re-exported, so
+ * the list is declared once and reused in both places.
+ */
+const PURGE_SERVICES = [
+  RefreshTokenPurgeService,
+  MessagingPurgeService,
+  KycPurgeService,
+  AuditLogPurgeService,
+  PaymentCallbackPurgeService,
+];
+
+/**
+ * GOO-196 — Decree 13 data retention & purge jobs.
+ *
+ * Registers the run-log repository, the purge services and the cron
+ * orchestrator; only the purge services are exported.
+ *
+ * Ships behind RETENTION_ENABLED=false so the module can land without
+ * affecting prod. Flip to true after a 7-day staging dry-run review with
+ * CLO/DPO. See `apps/api/src/modules/retention/domain/retention.config.ts`
+ * for every tunable window.
+ */
+@Module({
+  providers: [RetentionRunLogRepository, ...PURGE_SERVICES, RetentionCronOrchestrator],
+  exports: [...PURGE_SERVICES],
+})
+export class RetentionModule {}
diff --git a/prisma/migrations/20260424050000_goo196_add_retention_run_log/migration.sql b/prisma/migrations/20260424050000_goo196_add_retention_run_log/migration.sql
new file mode 100644
index 0000000..67647ad
--- /dev/null
+++ b/prisma/migrations/20260424050000_goo196_add_retention_run_log/migration.sql
@@ -0,0 +1,25 @@
+-- GOO-196: Data retention policy & purge jobs (Decree 13 compliance)
+-- Adds the RetentionRunLog table so every purge / anonymization pass is auditable.
+
+-- CreateEnum: lifecycle states of one retention run (mirrors enum RetentionRunStatus in schema.prisma)
+CREATE TYPE "RetentionRunStatus" AS ENUM ('RUNNING', 'SUCCESS', 'PARTIAL', 'FAILED');
+
+-- CreateTable: one row per purge / anonymization pass
+CREATE TABLE "RetentionRunLog" (
+    "id" TEXT NOT NULL, -- no database default; schema.prisma declares @default(cuid())
+    "job" TEXT NOT NULL, -- job name, e.g. 'refresh-token-purge'
+    "phase" INTEGER, -- set only by multi-phase jobs (payment callback phases)
+    "startedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "finishedAt" TIMESTAMP(3), -- NULL until the run is closed
+    "rowsAffected" INTEGER NOT NULL DEFAULT 0,
+    "status" "RetentionRunStatus" NOT NULL DEFAULT 'RUNNING',
+    "errorMessage" TEXT, -- NULL unless the run was marked FAILED
+    "batchSize" INTEGER,
+    "dryRun" BOOLEAN NOT NULL DEFAULT false,
+
+    CONSTRAINT "RetentionRunLog_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateIndex: per-job history by start time, plus a newest-first index across all jobs
+CREATE INDEX "RetentionRunLog_job_startedAt_idx" ON "RetentionRunLog"("job", "startedAt");
+CREATE INDEX "RetentionRunLog_startedAt_idx" ON "RetentionRunLog"("startedAt" DESC);
diff --git a/prisma/schema.prisma b/prisma/schema.prisma
index 64a1c0b..f35a80e 100644
--- a/prisma/schema.prisma
+++ b/prisma/schema.prisma
@@ -1567,3 +1567,34 @@ model VnAdministrativeAlias {
   @@index([newWardCode])
   @@map("vn_administrative_aliases")
 }
+
+// =============================================================================
+// RETENTION (GOO-196 — Decree 13 compliance)
+// =============================================================================
+
+enum RetentionRunStatus {
+  RUNNING // initial state; column default
+  SUCCESS
+  PARTIAL
+  FAILED
+}
+
+/// Every purge / anonymization pass emits a RetentionRunLog row so the
+/// operator and DPO can audit exactly what was touched and when. Multi-phase
+/// jobs (e.g. payment callback 2y / 5y / 10y) record `phase` for
+/// disambiguation.
+model RetentionRunLog {
+  id           String             @id @default(cuid())
+  job          String // job name, e.g. "refresh-token-purge"
+  phase        Int? // null for single-phase jobs
+  startedAt    DateTime           @default(now())
+  finishedAt   DateTime? // null while the run is still RUNNING
+  rowsAffected Int                @default(0)
+  status       RetentionRunStatus @default(RUNNING)
+  errorMessage String? // populated only for FAILED runs
+  batchSize    Int? // batch size in effect for the run
+  dryRun       Boolean            @default(false)
+
+  @@index([job, startedAt]) // per-job history
+  @@index([startedAt(sort: Desc)]) // newest-first listing across jobs
+}