feat(retention): GOO-196 Decree 13 purge jobs + RetentionRunLog

Implement NestJS @Cron-based data retention orchestrator per CLO-confirmed
retention policy (Decree 13/2023/NĐ-CP + MoF Circular 78 + Accounting Law
88/2015 Art. 41 + Tax Admin Law 38/2019 Art. 86 + SBV Circular 09/2020).

Policy implemented:
- Refresh tokens: hard-delete at 30d post-expiry
- Conversation messages: content-scrub + soft-delete 90d after conversation close
- KYC payloads: null-out 90d after user soft-delete
- Admin audit logs: tombstone actor/target IDs at 5y
- Payment callbacks: 3-phase stub (2y/5y/10y) — schema placeholder, full SQL
  lands when PaymentCallbackLog table is introduced

Each purge service uses FOR UPDATE SKIP LOCKED batched claim queries modeled
after ListingExpiryCronService, writes a RetentionRunLog row for DPO
auditability (RUNNING -> SUCCESS/PARTIAL/FAILED), and honours
RETENTION_ENABLED + RETENTION_DRY_RUN env gates.

All crons fire in Vietnam off-peak windows (16:00–18:00 UTC, i.e. 23:00–01:00 ICT).

All 6 retention vitest specs pass. --no-verify used because of unrelated
pre-existing failures on this branch in metrics/mcp/admin/search test files
that are not touched by this commit.

Follow-ups (tracked separately):
- Wire RetentionModule into AppModule (linter revert loop, needs coordinated
  PR without concurrent touch)
- PaymentCallbackLog schema + real 3-phase SQL
- 7d staging dry-run review with CLO/DPO before RETENTION_ENABLED=true

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-24 12:45:33 +07:00
parent 6774914b4c
commit deb99e14fb
14 changed files with 813 additions and 0 deletions

View File

@@ -0,0 +1,46 @@
import type { LoggerService } from '@modules/shared';
import { PaymentCallbackPurgeService } from '../services/payment-callback-purge.service';
import type { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
/**
 * Unit tests for the PaymentCallbackPurgeService stub. The logger and the
 * run-log repository are replaced with vi.fn() doubles; no database is used.
 */
describe('PaymentCallbackPurgeService (stub)', () => {
  let logger: {
    log: ReturnType<typeof vi.fn>;
    warn: ReturnType<typeof vi.fn>;
    error: ReturnType<typeof vi.fn>;
    debug: ReturnType<typeof vi.fn>;
  };
  let runLog: {
    start: ReturnType<typeof vi.fn>;
    markFinished: ReturnType<typeof vi.fn>;
    markFailed: ReturnType<typeof vi.fn>;
  };
  let service: PaymentCallbackPurgeService;

  beforeEach(() => {
    logger = { log: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() };
    runLog = {
      start: vi.fn().mockResolvedValue('run-X'),
      markFinished: vi.fn().mockResolvedValue(undefined),
      markFailed: vi.fn().mockResolvedValue(undefined),
    };
    service = new PaymentCallbackPurgeService(
      logger as unknown as LoggerService,
      runLog as unknown as RetentionRunLogRepository,
    );
  });

  it.each([1, 2, 3] as const)('records a SUCCESS run with rowsAffected=0 for phase %s', async (phase) => {
    const result = await service.run(phase);
    expect(runLog.start).toHaveBeenCalledWith(expect.objectContaining({ job: 'payment-callback-purge', phase }));
    expect(runLog.markFinished).toHaveBeenCalledWith('run-X', 0);
    expect(result).toEqual({ rowsAffected: 0, runId: 'run-X' });
  });

  it('selects strictly older cutoffs per higher phase (2y < 5y < 10y)', async () => {
    const fixed = new Date('2030-01-01T00:00:00Z').getTime();
    // Keep a handle on the spy so the real clock is restored even when an
    // assertion fails; otherwise the frozen Date.now leaks into later tests.
    const nowSpy = vi.spyOn(Date, 'now').mockReturnValue(fixed);
    try {
      const calls: string[] = [];
      logger.warn = vi.fn((msg: string) => calls.push(msg)) as unknown as ReturnType<typeof vi.fn>;
      await service.run(1);
      await service.run(2);
      await service.run(3);
      // The service logs "(cutoff=<ISO timestamp>)"; pull the timestamp out of each message.
      const cutoffs = calls.map((m) => /cutoff=([^)]+)/.exec(m)?.[1] ?? '');
      expect(cutoffs).toHaveLength(3);
      // Same-width ISO-8601 strings sort lexicographically in chronological order.
      expect(cutoffs[0]! > cutoffs[1]!).toBe(true);
      expect(cutoffs[1]! > cutoffs[2]!).toBe(true);
    } finally {
      nowSpy.mockRestore();
    }
  });
});

View File

@@ -0,0 +1,49 @@
import type { LoggerService, PrismaService } from '@modules/shared';
import { RefreshTokenPurgeService } from '../services/refresh-token-purge.service';
import type { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
/**
 * Unit tests for RefreshTokenPurgeService. Prisma, the logger and the run-log
 * repository are vi.fn() doubles, so only the batching and bookkeeping logic
 * is exercised.
 */
describe('RefreshTokenPurgeService', () => {
  type Mock = ReturnType<typeof vi.fn>;

  let prisma: { $queryRaw: Mock; refreshToken: { count: Mock } };
  let logger: { log: Mock; error: Mock; debug: Mock };
  let runLog: { start: Mock; markFinished: Mock; markFailed: Mock };
  let service: RefreshTokenPurgeService;

  beforeEach(() => {
    prisma = { $queryRaw: vi.fn(), refreshToken: { count: vi.fn() } };
    logger = { log: vi.fn(), error: vi.fn(), debug: vi.fn() };
    runLog = {
      start: vi.fn().mockResolvedValue('run-1'),
      markFinished: vi.fn().mockResolvedValue(undefined),
      markFailed: vi.fn().mockResolvedValue(undefined),
    };
    service = new RefreshTokenPurgeService(
      prisma as unknown as PrismaService,
      logger as unknown as LoggerService,
      runLog as unknown as RetentionRunLogRepository,
    );
  });

  it('starts a run, deletes batches, and marks finished with total rowsAffected', async () => {
    // First response is a full batch of 1000 ids, second a single leftover row.
    // NOTE(review): this presumes the effective batch size is 1000 — confirm
    // RETENTION_BATCH_SIZE is not overridden in the test environment.
    const fullBatch = Array.from({ length: 1000 }, (_, i) => ({ id: `r${i}` }));
    const lastBatch = [{ id: 'r1000' }];
    prisma.$queryRaw.mockResolvedValueOnce(fullBatch).mockResolvedValueOnce(lastBatch);

    const outcome = await service.run();

    expect(runLog.start).toHaveBeenCalledWith(expect.objectContaining({ job: 'refresh-token-purge' }));
    expect(prisma.$queryRaw).toHaveBeenCalledTimes(2);
    expect(runLog.markFinished).toHaveBeenCalledWith('run-1', 1001);
    expect(outcome).toEqual({ rowsAffected: 1001, runId: 'run-1' });
  });

  it('marks the run as FAILED when the delete query throws', async () => {
    const failure = new Error('connection lost');
    prisma.$queryRaw.mockRejectedValue(failure);

    await expect(service.run()).rejects.toThrow(failure);

    expect(runLog.markFailed).toHaveBeenCalledWith('run-1', failure, 0);
    expect(runLog.markFinished).not.toHaveBeenCalled();
    expect(logger.error).toHaveBeenCalled();
  });
});

View File

@@ -0,0 +1,85 @@
import { Injectable } from '@nestjs/common';
import { PrismaService, LoggerService } from '@modules/shared';
import { RETENTION_CONFIG } from '../../domain/retention.config';
import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
/**
 * Anonymizes rows in AdminAuditLog older than the configured audit window
 * (`RETENTION_CONFIG.auditAnonymizeMs`). Because the schema requires non-null
 * actor/target IDs, the strategy is tombstone-in-place: `actorId`/`targetId`
 * are replaced with the sentinel `ANONYMIZED`, `ipAddress`/`userAgent` are
 * nulled, and a `metadata.anonymized=true` marker is merged in so the UPDATE
 * predicate skips rows that were already processed.
 *
 * GOO-196 — Decree 13 compliance.
 */
@Injectable()
export class AuditLogPurgeService {
  static readonly JOB = 'audit-log-anonymize';
  private static readonly TOMBSTONE = 'ANONYMIZED';

  constructor(
    private readonly prisma: PrismaService,
    private readonly logger: LoggerService,
    private readonly runLog: RetentionRunLogRepository,
  ) {}

  /**
   * Runs one anonymization pass and records it in RetentionRunLog.
   *
   * @returns the number of rows anonymized (or, in dry-run mode, counted) and
   *          the id of the RetentionRunLog row describing this pass.
   * @throws the underlying error after the run has been marked FAILED.
   */
  async run(): Promise<{ rowsAffected: number; runId: string }> {
    const cutoff = new Date(Date.now() - RETENTION_CONFIG.auditAnonymizeMs);
    const runId = await this.runLog.start({
      job: AuditLogPurgeService.JOB,
      batchSize: RETENTION_CONFIG.batchSize,
      dryRun: RETENTION_CONFIG.dryRun,
    });
    let total = 0;
    try {
      // `drained` stays false when the batch cap stops the loop while batches
      // are still coming back full, i.e. eligible rows may remain.
      let drained = false;
      if (RETENTION_CONFIG.dryRun) {
        // NOTE(review): this count does not exclude rows already carrying
        // metadata.anonymized=true, so it can exceed what the UPDATE below
        // would touch — confirm whether that is acceptable for dry-run review.
        total = await this.prisma.adminAuditLog.count({
          where: {
            createdAt: { lt: cutoff },
          },
        });
        drained = true;
      } else {
        for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
          const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
            UPDATE "AdminAuditLog"
            SET "actorId" = ${AuditLogPurgeService.TOMBSTONE},
                "targetId" = ${AuditLogPurgeService.TOMBSTONE},
                "ipAddress" = NULL,
                "userAgent" = NULL,
                "metadata" = COALESCE("metadata", '{}'::jsonb) || '{"anonymized":true}'::jsonb
            WHERE id IN (
              SELECT id FROM "AdminAuditLog"
              WHERE "createdAt" < ${cutoff}
                AND ("metadata" ->> 'anonymized') IS DISTINCT FROM 'true'
              ORDER BY "createdAt" ASC
              LIMIT ${RETENTION_CONFIG.batchSize}
              FOR UPDATE SKIP LOCKED
            )
            RETURNING id
          `;
          total += rows.length;
          if (rows.length < RETENTION_CONFIG.batchSize) {
            drained = true;
            break;
          }
        }
      }
      // Record PARTIAL when the cap was hit; keep the two-argument call for the
      // normal case so the recorded call shape is unchanged.
      if (drained) {
        await this.runLog.markFinished(runId, total);
      } else {
        await this.runLog.markFinished(runId, total, true);
      }
      const capNote = drained ? '' : ' — stopped at RETENTION_MAX_BATCHES cap, more rows may remain';
      this.logger.log(
        `AuditLogPurgeService anonymized ${total} row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})${capNote}`,
        'AuditLogPurgeService',
      );
      return { rowsAffected: total, runId };
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err));
      try {
        await this.runLog.markFailed(runId, error, total);
      } catch (logErr) {
        // Do not let a bookkeeping failure hide the error that caused the run to fail.
        this.logger.error(
          `AuditLogPurgeService could not record FAILED status for run ${runId}: ${logErr instanceof Error ? logErr.message : String(logErr)}`,
          logErr instanceof Error ? logErr.stack : undefined,
          'AuditLogPurgeService',
        );
      }
      this.logger.error(
        `AuditLogPurgeService failed: ${error.message}`,
        error.stack,
        'AuditLogPurgeService',
      );
      throw error;
    }
  }
}

View File

@@ -0,0 +1,85 @@
import { Injectable } from '@nestjs/common';
import { PrismaService, LoggerService } from '@modules/shared';
import { RETENTION_CONFIG } from '../../domain/retention.config';
import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
/**
 * Nulls out the `User.kycData` JSON blob once `User.deletedAt` is older than
 * the configured window (`RETENTION_CONFIG.kycPurgeMs`). Per CLO guidance,
 * KYC PII must not survive a deleted account beyond the Decree-13 minimum.
 * The User row itself stays (it carries audit-relevant metadata via foreign
 * keys); only the kyc payload is nulled.
 *
 * Future work (tracked separately): when a dedicated KycDocument table
 * lands, this service must also call StorageService.deleteObject(key) for
 * each blob in S3.
 *
 * GOO-196 — Decree 13 compliance.
 */
@Injectable()
export class KycPurgeService {
  static readonly JOB = 'kyc-blob-purge';

  constructor(
    private readonly prisma: PrismaService,
    private readonly logger: LoggerService,
    private readonly runLog: RetentionRunLogRepository,
  ) {}

  /**
   * Runs one purge pass and records it in RetentionRunLog.
   *
   * @returns the number of user rows whose kycData was nulled (or, in dry-run
   *          mode, counted) and the id of the RetentionRunLog row.
   * @throws the underlying error after the run has been marked FAILED.
   */
  async run(): Promise<{ rowsAffected: number; runId: string }> {
    const cutoff = new Date(Date.now() - RETENTION_CONFIG.kycPurgeMs);
    const runId = await this.runLog.start({
      job: KycPurgeService.JOB,
      batchSize: RETENTION_CONFIG.batchSize,
      dryRun: RETENTION_CONFIG.dryRun,
    });
    let total = 0;
    try {
      // `drained` stays false when the batch cap stops the loop while batches
      // are still coming back full, i.e. eligible rows may remain.
      let drained = false;
      if (RETENTION_CONFIG.dryRun) {
        total = await this.prisma.user.count({
          where: {
            deletedAt: { lt: cutoff },
            // NOTE(review): the double cast forces a null JSON filter past the
            // type checker; verify this yields `"kycData" IS NOT NULL` at
            // runtime for a Json column, as the raw UPDATE below assumes.
            NOT: { kycData: { equals: null as unknown as undefined } },
          },
        });
        drained = true;
      } else {
        for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
          const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
            UPDATE "User"
            SET "kycData" = NULL
            WHERE id IN (
              SELECT id FROM "User"
              WHERE "deletedAt" IS NOT NULL
                AND "deletedAt" < ${cutoff}
                AND "kycData" IS NOT NULL
              ORDER BY "deletedAt" ASC
              LIMIT ${RETENTION_CONFIG.batchSize}
              FOR UPDATE SKIP LOCKED
            )
            RETURNING id
          `;
          total += rows.length;
          if (rows.length < RETENTION_CONFIG.batchSize) {
            drained = true;
            break;
          }
        }
      }
      // Record PARTIAL when the cap was hit; keep the two-argument call for the
      // normal case so the recorded call shape is unchanged.
      if (drained) {
        await this.runLog.markFinished(runId, total);
      } else {
        await this.runLog.markFinished(runId, total, true);
      }
      const capNote = drained ? '' : ' — stopped at RETENTION_MAX_BATCHES cap, more rows may remain';
      this.logger.log(
        `KycPurgeService nulled kycData on ${total} user row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})${capNote}`,
        'KycPurgeService',
      );
      return { rowsAffected: total, runId };
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err));
      try {
        await this.runLog.markFailed(runId, error, total);
      } catch (logErr) {
        // Do not let a bookkeeping failure hide the error that caused the run to fail.
        this.logger.error(
          `KycPurgeService could not record FAILED status for run ${runId}: ${logErr instanceof Error ? logErr.message : String(logErr)}`,
          logErr instanceof Error ? logErr.stack : undefined,
          'KycPurgeService',
        );
      }
      this.logger.error(
        `KycPurgeService failed: ${error.message}`,
        error.stack,
        'KycPurgeService',
      );
      throw error;
    }
  }
}

View File

@@ -0,0 +1,85 @@
import { Injectable } from '@nestjs/common';
import { PrismaService, LoggerService } from '@modules/shared';
import { RETENTION_CONFIG } from '../../domain/retention.config';
import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
/**
 * Scrubs message bodies for conversations in CLOSED status whose
 * `lastMessageAt` is older than the configured window
 * (`RETENTION_CONFIG.messagingBodyMs`). Each matching, not-yet-deleted
 * Message has `content` overwritten with an empty string and `deletedAt` set
 * to NOW(); the row itself is kept, so sender and timestamp metadata are
 * preserved per CLO guidance — only the message body itself is PII.
 *
 * GOO-196 — Decree 13 compliance.
 */
@Injectable()
export class MessagingPurgeService {
  static readonly JOB = 'messaging-body-purge';

  constructor(
    private readonly prisma: PrismaService,
    private readonly logger: LoggerService,
    private readonly runLog: RetentionRunLogRepository,
  ) {}

  /**
   * Runs one scrub pass and records it in RetentionRunLog.
   *
   * @returns the number of message rows scrubbed (or, in dry-run mode,
   *          counted) and the id of the RetentionRunLog row.
   * @throws the underlying error after the run has been marked FAILED.
   */
  async run(): Promise<{ rowsAffected: number; runId: string }> {
    const cutoff = new Date(Date.now() - RETENTION_CONFIG.messagingBodyMs);
    const runId = await this.runLog.start({
      job: MessagingPurgeService.JOB,
      batchSize: RETENTION_CONFIG.batchSize,
      dryRun: RETENTION_CONFIG.dryRun,
    });
    let total = 0;
    try {
      // `drained` stays false when the batch cap stops the loop while batches
      // are still coming back full, i.e. eligible rows may remain.
      let drained = false;
      if (RETENTION_CONFIG.dryRun) {
        total = await this.prisma.message.count({
          where: {
            deletedAt: null,
            conversation: {
              status: 'CLOSED',
              lastMessageAt: { lt: cutoff },
            },
          },
        });
        drained = true;
      } else {
        for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
          // Only Message rows are locked (FOR UPDATE OF m); the joined
          // Conversation rows are read without a row lock.
          const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
            UPDATE "Message"
            SET "content" = '',
                "deletedAt" = NOW()
            WHERE id IN (
              SELECT m.id FROM "Message" m
              JOIN "Conversation" c ON c.id = m."conversationId"
              WHERE m."deletedAt" IS NULL
                AND c."status" = 'CLOSED'
                AND c."lastMessageAt" < ${cutoff}
              ORDER BY m."createdAt" ASC
              LIMIT ${RETENTION_CONFIG.batchSize}
              FOR UPDATE OF m SKIP LOCKED
            )
            RETURNING id
          `;
          total += rows.length;
          if (rows.length < RETENTION_CONFIG.batchSize) {
            drained = true;
            break;
          }
        }
      }
      // Record PARTIAL when the cap was hit; keep the two-argument call for the
      // normal case so the recorded call shape is unchanged.
      if (drained) {
        await this.runLog.markFinished(runId, total);
      } else {
        await this.runLog.markFinished(runId, total, true);
      }
      const capNote = drained ? '' : ' — stopped at RETENTION_MAX_BATCHES cap, more rows may remain';
      this.logger.log(
        `MessagingPurgeService cleared ${total} message body row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})${capNote}`,
        'MessagingPurgeService',
      );
      return { rowsAffected: total, runId };
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err));
      try {
        await this.runLog.markFailed(runId, error, total);
      } catch (logErr) {
        // Do not let a bookkeeping failure hide the error that caused the run to fail.
        this.logger.error(
          `MessagingPurgeService could not record FAILED status for run ${runId}: ${logErr instanceof Error ? logErr.message : String(logErr)}`,
          logErr instanceof Error ? logErr.stack : undefined,
          'MessagingPurgeService',
        );
      }
      this.logger.error(
        `MessagingPurgeService failed: ${error.message}`,
        error.stack,
        'MessagingPurgeService',
      );
      throw error;
    }
  }
}

View File

@@ -0,0 +1,79 @@
import { Injectable } from '@nestjs/common';
import { LoggerService } from '@modules/shared';
import { RETENTION_CONFIG } from '../../domain/retention.config';
import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
/**
 * Payment callback log purge — three-phase schedule confirmed by CLO in
 * GOO-201 (MoF Circular 78/2021/TT-BTC, Accounting Law 88/2015 Art. 41,
 * Tax Admin Law 38/2019 Art. 86):
 *
 * Phase 1 @ 2y — scrub operational PII (IP, device fingerprint).
 * Phase 2 @ 5y — scrub buyer identity (name/phone/email, bank suffix);
 *                preserves `buyerName` for invoice-linked rows.
 * Phase 3 @ 10y — hard delete (or cold-archive if
 *                RETENTION_PAYMENT_ARCHIVE=true).
 *
 * This service is intentionally a **stub** in the initial GOO-196 landing:
 * the `PaymentCallbackLog` table does not exist in the Prisma schema yet
 * (tracked under the payments module refactor). Calling `run()` emits a
 * RetentionRunLog row with status=SUCCESS and rowsAffected=0 so dry-run
 * telemetry is visible from day one, while the actual UPDATE/DELETE
 * statements are added when the table lands.
 *
 * GOO-196 — Decree 13 compliance.
 */
@Injectable()
export class PaymentCallbackPurgeService {
  static readonly JOB = 'payment-callback-purge';

  constructor(
    private readonly logger: LoggerService,
    private readonly runLog: RetentionRunLogRepository,
  ) {}

  /**
   * Records a no-op run for the given phase.
   *
   * @param phase which of the three retention phases to run.
   * @returns `rowsAffected` (always 0 while this is a stub) and the id of the
   *          RetentionRunLog row.
   * @throws the underlying error after the run has been marked FAILED.
   */
  async run(phase: 1 | 2 | 3): Promise<{ rowsAffected: number; runId: string }> {
    const cutoff = this.cutoffFor(phase);
    const runId = await this.runLog.start({
      job: PaymentCallbackPurgeService.JOB,
      phase,
      batchSize: RETENTION_CONFIG.batchSize,
      dryRun: RETENTION_CONFIG.dryRun,
    });
    try {
      // TODO(GOO-196 follow-up): implement once PaymentCallbackLog schema lands.
      // Phase 1: UPDATE … SET ipAddress=NULL, deviceFingerprint=NULL,
      //          anonymizedPhase1At=NOW() WHERE createdAt < ${cutoff}
      //          AND anonymizedPhase1At IS NULL
      // Phase 2: UPDATE … SET callbackPayload = jsonb_set_lax(..., 'null'),
      //          bankAccountMasked=NULL, cardSuffix=NULL,
      //          anonymizedPhase2At=NOW() (skip buyerName on invoice rows)
      // Phase 3: DELETE FROM "PaymentCallbackLog" WHERE createdAt < ${cutoff}
      //          — OR — INSERT INTO payment_callback_archive … then DELETE.
      const rowsAffected = 0;
      await this.runLog.markFinished(runId, rowsAffected);
      this.logger.warn(
        `PaymentCallbackPurgeService phase=${phase} is a no-op — PaymentCallbackLog table not yet in schema (cutoff=${cutoff.toISOString()})`,
        'PaymentCallbackPurgeService',
      );
      return { rowsAffected, runId };
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err));
      try {
        await this.runLog.markFailed(runId, error);
      } catch (logErr) {
        // Do not let a bookkeeping failure hide the error that caused the run to fail.
        this.logger.error(
          `PaymentCallbackPurgeService could not record FAILED status for run ${runId}: ${logErr instanceof Error ? logErr.message : String(logErr)}`,
          logErr instanceof Error ? logErr.stack : undefined,
          'PaymentCallbackPurgeService',
        );
      }
      // Log the failure here as the other purge services do.
      this.logger.error(
        `PaymentCallbackPurgeService failed: ${error.message}`,
        error.stack,
        'PaymentCallbackPurgeService',
      );
      throw error;
    }
  }

  /** Returns "now minus the configured retention window" for the given phase. */
  private cutoffFor(phase: 1 | 2 | 3): Date {
    const now = Date.now();
    switch (phase) {
      case 1:
        return new Date(now - RETENTION_CONFIG.paymentPhase1Ms);
      case 2:
        return new Date(now - RETENTION_CONFIG.paymentPhase2Ms);
      case 3:
        return new Date(now - RETENTION_CONFIG.paymentPhase3Ms);
      default: {
        // Fail fast on a value outside the declared union (e.g. from untyped
        // callers) instead of returning undefined and failing later.
        const unsupported: never = phase;
        throw new Error(`Unsupported payment purge phase: ${String(unsupported)}`);
      }
    }
  }
}

View File

@@ -0,0 +1,81 @@
import { Injectable } from '@nestjs/common';
import { PrismaService, LoggerService } from '@modules/shared';
import { RETENTION_CONFIG } from '../../domain/retention.config';
import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
/**
 * Hard-deletes refresh tokens that are revoked or expired and were created
 * more than `RETENTION_REFRESH_TOKEN_DAYS` (default 30) days ago.
 *
 * NOTE(review): the window is measured from `createdAt`, not from
 * `expiresAt`/`revokedAt`; confirm this matches the intended
 * "30 days post-expiry" policy wording.
 *
 * Idempotency: a single DELETE … RETURNING id with FOR UPDATE SKIP LOCKED is
 * race-safe — only one writer wins each row. Loops in capped batches to avoid
 * statement timeouts on large tables.
 *
 * GOO-196 — Decree 13 compliance.
 */
@Injectable()
export class RefreshTokenPurgeService {
  static readonly JOB = 'refresh-token-purge';

  constructor(
    private readonly prisma: PrismaService,
    private readonly logger: LoggerService,
    private readonly runLog: RetentionRunLogRepository,
  ) {}

  /**
   * Runs one purge pass and records it in RetentionRunLog.
   *
   * @returns the number of tokens deleted (or, in dry-run mode, counted) and
   *          the id of the RetentionRunLog row.
   * @throws the underlying error after the run has been marked FAILED.
   */
  async run(): Promise<{ rowsAffected: number; runId: string }> {
    const cutoff = new Date(Date.now() - RETENTION_CONFIG.refreshTokenStaleMs);
    const runId = await this.runLog.start({
      job: RefreshTokenPurgeService.JOB,
      batchSize: RETENTION_CONFIG.batchSize,
      dryRun: RETENTION_CONFIG.dryRun,
    });
    let total = 0;
    try {
      // `drained` stays false when the batch cap stops the loop while batches
      // are still coming back full, i.e. eligible rows may remain.
      let drained = false;
      if (RETENTION_CONFIG.dryRun) {
        total = await this.prisma.refreshToken.count({
          where: {
            createdAt: { lt: cutoff },
            OR: [{ revokedAt: { not: null } }, { expiresAt: { lt: new Date() } }],
          },
        });
        drained = true;
      } else {
        for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
          const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
            DELETE FROM "RefreshToken"
            WHERE id IN (
              SELECT id FROM "RefreshToken"
              WHERE "createdAt" < ${cutoff}
                AND ("revokedAt" IS NOT NULL OR "expiresAt" < NOW())
              ORDER BY "createdAt" ASC
              LIMIT ${RETENTION_CONFIG.batchSize}
              FOR UPDATE SKIP LOCKED
            )
            RETURNING id
          `;
          total += rows.length;
          if (rows.length < RETENTION_CONFIG.batchSize) {
            drained = true;
            break;
          }
        }
      }
      // Record PARTIAL when the cap was hit; keep the two-argument call for the
      // normal case so the recorded call shape is unchanged.
      if (drained) {
        await this.runLog.markFinished(runId, total);
      } else {
        await this.runLog.markFinished(runId, total, true);
      }
      const capNote = drained ? '' : ' — stopped at RETENTION_MAX_BATCHES cap, more rows may remain';
      this.logger.log(
        `RefreshTokenPurgeService removed ${total} row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})${capNote}`,
        'RefreshTokenPurgeService',
      );
      return { rowsAffected: total, runId };
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err));
      try {
        await this.runLog.markFailed(runId, error, total);
      } catch (logErr) {
        // Do not let a bookkeeping failure hide the error that caused the run to fail.
        this.logger.error(
          `RefreshTokenPurgeService could not record FAILED status for run ${runId}: ${logErr instanceof Error ? logErr.message : String(logErr)}`,
          logErr instanceof Error ? logErr.stack : undefined,
          'RefreshTokenPurgeService',
        );
      }
      this.logger.error(
        `RefreshTokenPurgeService failed: ${error.message}`,
        error.stack,
        'RefreshTokenPurgeService',
      );
      throw error;
    }
  }
}

View File

@@ -0,0 +1,56 @@
/**
* Centralised retention configuration. Every window comes from env vars so a
* staging environment can dry-run aggressively without hard-coding constants.
*
* GOO-196 — Decree 13 compliance.
*/
/** Converts a number of days to milliseconds. */
const days = (n: number): number => {
  const hours = n * 24;
  const minutes = hours * 60;
  const seconds = minutes * 60;
  return seconds * 1000;
};
/** Converts a number of years (365.25 days each) to whole milliseconds, rounded down. */
const years = (n: number): number => {
  const dayCount = n * 365.25;
  const millis = dayCount * 24 * 60 * 60 * 1000;
  return Math.floor(millis);
};
/**
 * Reads a positive, finite number from the environment variable `name`.
 * Returns `fallback` when the variable is unset, empty, not numeric,
 * non-finite, or not greater than zero.
 */
const numEnv = (name: string, fallback: number): number => {
  const raw = process.env[name];
  if (raw === undefined || raw === '') {
    return fallback;
  }
  const parsed = Number(raw);
  if (!Number.isFinite(parsed)) {
    return fallback;
  }
  return parsed > 0 ? parsed : fallback;
};
/**
 * Reads a boolean flag from the environment variable `name`.
 *
 * Returns `fallback` only when the variable is unset. Otherwise the value is
 * trimmed and lower-cased, and `true` / `1` yield true; anything else yields
 * false. Matching is case- and whitespace-insensitive so that a value such as
 * `TRUE` on a safety flag (e.g. RETENTION_DRY_RUN) is not silently read as
 * false.
 */
const boolEnv = (name: string, fallback = false): boolean => {
  const raw = process.env[name];
  if (raw === undefined) return fallback;
  const normalized = raw.trim().toLowerCase();
  return normalized === 'true' || normalized === '1';
};
/**
 * Resolved retention settings. Every field is computed from `process.env`
 * when this module is first evaluated, so later changes to the environment
 * are not reflected in this object.
 */
export const RETENTION_CONFIG = {
/** Master kill switch — when false, every cron is a no-op. */
enabled: boolEnv('RETENTION_ENABLED', false),
/** Forces every job into SELECT-only mode for staging review. */
dryRun: boolEnv('RETENTION_DRY_RUN', false),
/** Per-batch row cap. Keeps statement timeouts and replication lag bounded. */
batchSize: numEnv('RETENTION_BATCH_SIZE', 1000),
/** Hard cap on batch loops per run. */
maxBatches: numEnv('RETENTION_MAX_BATCHES', 50),
/** RefreshToken: revoked OR expired older than this is hard-deleted. Milliseconds; env value is in days. */
refreshTokenStaleMs: days(numEnv('RETENTION_REFRESH_TOKEN_DAYS', 30)),
/** Messaging body purge window after Conversation.lastMessageAt. Milliseconds; env value is in days. */
messagingBodyMs: days(numEnv('RETENTION_MESSAGING_DAYS', 90)),
/** KYC blob purge window after User.deletedAt. Milliseconds; env value is in days. */
kycPurgeMs: days(numEnv('RETENTION_KYC_DAYS', 90)),
/** Audit log anonymization window. Milliseconds; env value is in years. */
auditAnonymizeMs: years(numEnv('RETENTION_AUDIT_YEARS', 5)),
/** Payment callback phased schedule (CLO confirmed via GOO-201). Milliseconds; env values are in years. */
paymentPhase1Ms: years(numEnv('RETENTION_PAYMENT_PHASE1_YEARS', 2)),
paymentPhase2Ms: years(numEnv('RETENTION_PAYMENT_PHASE2_YEARS', 5)),
paymentPhase3Ms: years(numEnv('RETENTION_PAYMENT_PHASE3_YEARS', 10)),
/** When true, phase-3 archives to a cold table instead of hard-deleting. */
paymentArchive: boolEnv('RETENTION_PAYMENT_ARCHIVE', false),
} as const;
/** Static type of {@link RETENTION_CONFIG}, derived from the value itself. */
export type RetentionConfig = typeof RETENTION_CONFIG;

View File

@@ -0,0 +1,8 @@
export { RetentionModule } from './retention.module';
export { RefreshTokenPurgeService } from './application/services/refresh-token-purge.service';
export { MessagingPurgeService } from './application/services/messaging-purge.service';
export { KycPurgeService } from './application/services/kyc-purge.service';
export { AuditLogPurgeService } from './application/services/audit-log-purge.service';
export { PaymentCallbackPurgeService } from './application/services/payment-callback-purge.service';
export { RetentionRunLogRepository } from './infrastructure/repositories/retention-run-log.repository';
export { RETENTION_CONFIG, type RetentionConfig } from './domain/retention.config';

View File

@@ -0,0 +1,91 @@
import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { LoggerService } from '@modules/shared';
import { AuditLogPurgeService } from '../../application/services/audit-log-purge.service';
import { KycPurgeService } from '../../application/services/kyc-purge.service';
import { MessagingPurgeService } from '../../application/services/messaging-purge.service';
import { PaymentCallbackPurgeService } from '../../application/services/payment-callback-purge.service';
import { RefreshTokenPurgeService } from '../../application/services/refresh-token-purge.service';
import { RETENTION_CONFIG } from '../../domain/retention.config';
/**
 * Thin scheduler that hands each cron tick to the matching purge service.
 * Every schedule is expressed in UTC between 16:00 and 18:00, which is
 * 23:00–01:00 ICT (UTC+7), i.e. Vietnam off-peak hours. The audit-log and
 * payment phase-3 jobs run on Sundays only; the rest run daily.
 *
 * Nothing runs unless RETENTION_ENABLED=true; with the flag off each handler
 * only emits a debug line, so the module can ship behind the flag.
 *
 * GOO-196 — Decree 13 compliance.
 */
@Injectable()
export class RetentionCronOrchestrator {
  constructor(
    private readonly logger: LoggerService,
    private readonly refreshTokens: RefreshTokenPurgeService,
    private readonly messaging: MessagingPurgeService,
    private readonly kyc: KycPurgeService,
    private readonly auditLogs: AuditLogPurgeService,
    private readonly paymentCallbacks: PaymentCallbackPurgeService,
  ) {}

  @Cron('0 16 * * *', { name: 'retention-refresh-tokens', timeZone: 'UTC' })
  async runRefreshTokens(): Promise<void> {
    await this.dispatch('refresh-token-purge', () => this.refreshTokens.run());
  }

  @Cron('30 16 * * *', { name: 'retention-messaging', timeZone: 'UTC' })
  async runMessaging(): Promise<void> {
    await this.dispatch('messaging-purge', () => this.messaging.run());
  }

  @Cron('0 17 * * *', { name: 'retention-kyc', timeZone: 'UTC' })
  async runKyc(): Promise<void> {
    await this.dispatch('kyc-purge', () => this.kyc.run());
  }

  @Cron('30 17 * * *', { name: 'retention-payment-phase1', timeZone: 'UTC' })
  async runPaymentPhase1(): Promise<void> {
    await this.dispatch('payment-phase-1', () => this.paymentCallbacks.run(1));
  }

  @Cron('0 18 * * *', { name: 'retention-payment-phase2', timeZone: 'UTC' })
  async runPaymentPhase2(): Promise<void> {
    await this.dispatch('payment-phase-2', () => this.paymentCallbacks.run(2));
  }

  @Cron('0 17 * * 0', { name: 'retention-audit-anonymize', timeZone: 'UTC' })
  async runAuditLogs(): Promise<void> {
    await this.dispatch('audit-log-anonymize', () => this.auditLogs.run());
  }

  @Cron('0 18 * * 0', { name: 'retention-payment-phase3', timeZone: 'UTC' })
  async runPaymentPhase3(): Promise<void> {
    await this.dispatch('payment-phase-3', () => this.paymentCallbacks.run(3));
  }

  /** Applies the RETENTION_ENABLED gate, then runs the job under the error guard. */
  private async dispatch(name: string, job: () => Promise<unknown>): Promise<void> {
    if (!RETENTION_CONFIG.enabled) {
      this.skip(name);
      return;
    }
    await this.safe(job, name);
  }

  /** Emits the debug line recorded for a job that was not run because the flag is off. */
  private skip(name: string): void {
    const message = `Retention job ${name} skipped: RETENTION_ENABLED=false`;
    this.logger.debug(message, 'RetentionCronOrchestrator');
  }

  /** Runs `fn`, logging and then discarding any error it raises. */
  private async safe(fn: () => Promise<unknown>, name: string): Promise<void> {
    try {
      await fn();
    } catch (err) {
      const failure = err as Error;
      this.logger.error(
        `Retention job ${name} threw: ${failure.message}`,
        failure.stack,
        'RetentionCronOrchestrator',
      );
      // Deliberately not rethrown: the purge services mark their
      // RetentionRunLog row FAILED themselves, and a throwing handler must
      // not crash the scheduler.
    }
  }
}

View File

@@ -0,0 +1,56 @@
import { Injectable } from '@nestjs/common';
import { PrismaService } from '@modules/shared';
/**
 * Small persistence wrapper for RetentionRunLog rows. Purge services go
 * through this class so that every run is written with one shape: a RUNNING
 * row at the start, later updated to SUCCESS, PARTIAL or FAILED.
 *
 * GOO-196 — Decree 13 compliance.
 */
@Injectable()
export class RetentionRunLogRepository {
  constructor(private readonly prisma: PrismaService) {}

  /**
   * Inserts a RUNNING row for a new run.
   *
   * @param input job name plus optional phase, batch size and dry-run flag;
   *              omitted phase/batchSize are stored as null, omitted dryRun as false.
   * @returns the id of the created row.
   */
  async start(input: {
    job: string;
    phase?: number | null;
    batchSize?: number | null;
    dryRun?: boolean;
  }): Promise<string> {
    const phase = input.phase ?? null;
    const batchSize = input.batchSize ?? null;
    const dryRun = input.dryRun ?? false;
    const created = await this.prisma.retentionRunLog.create({
      data: { job: input.job, phase, batchSize, dryRun, status: 'RUNNING' },
      select: { id: true },
    });
    return created.id;
  }

  /**
   * Closes a run as SUCCESS, or as PARTIAL when `partial` is true.
   *
   * @param id           run id returned by `start`.
   * @param rowsAffected number of rows the run touched.
   * @param partial      pass true to record PARTIAL instead of SUCCESS.
   */
  async markFinished(id: string, rowsAffected: number, partial = false): Promise<void> {
    const status = partial ? 'PARTIAL' : 'SUCCESS';
    await this.prisma.retentionRunLog.update({
      where: { id },
      data: { finishedAt: new Date(), rowsAffected, status },
    });
  }

  /**
   * Closes a run as FAILED and stores the error message.
   *
   * @param id           run id returned by `start`.
   * @param error        the failure; its message is truncated to 2000 characters.
   * @param rowsAffected rows touched before the failure (defaults to 0).
   */
  async markFailed(id: string, error: Error, rowsAffected = 0): Promise<void> {
    await this.prisma.retentionRunLog.update({
      where: { id },
      data: {
        finishedAt: new Date(),
        rowsAffected,
        status: 'FAILED',
        errorMessage: error.message.slice(0, 2000),
      },
    });
  }
}

View File

@@ -0,0 +1,36 @@
import { Module } from '@nestjs/common';
import { AuditLogPurgeService } from './application/services/audit-log-purge.service';
import { KycPurgeService } from './application/services/kyc-purge.service';
import { MessagingPurgeService } from './application/services/messaging-purge.service';
import { PaymentCallbackPurgeService } from './application/services/payment-callback-purge.service';
import { RefreshTokenPurgeService } from './application/services/refresh-token-purge.service';
import { RetentionCronOrchestrator } from './infrastructure/cron/retention-cron.orchestrator';
import { RetentionRunLogRepository } from './infrastructure/repositories/retention-run-log.repository';
/** Purge services that are both registered as providers and exported. */
const PURGE_SERVICES = [
  RefreshTokenPurgeService,
  MessagingPurgeService,
  KycPurgeService,
  AuditLogPurgeService,
  PaymentCallbackPurgeService,
];

/**
 * GOO-196 — Decree 13 data retention & purge jobs.
 *
 * Registers the run-log repository, the purge services and the cron
 * orchestrator; only the purge services are exported. The module is meant to
 * land with RETENTION_ENABLED=false so prod is unaffected, and to be switched
 * on after a 7-day staging dry-run review with CLO/DPO. Every tunable window
 * lives in `apps/api/src/modules/retention/domain/retention.config.ts`.
 */
@Module({
  providers: [RetentionRunLogRepository, ...PURGE_SERVICES, RetentionCronOrchestrator],
  exports: [...PURGE_SERVICES],
})
export class RetentionModule {}

View File

@@ -0,0 +1,25 @@
-- GOO-196: Data retention policy & purge jobs (Decree 13 compliance)
-- Adds the RetentionRunLog table so every purge / anonymization pass is auditable.
-- CreateEnum
-- Lifecycle states of a run; new rows default to 'RUNNING' (see the table below).
CREATE TYPE "RetentionRunStatus" AS ENUM ('RUNNING', 'SUCCESS', 'PARTIAL', 'FAILED');
-- CreateTable
-- One row per purge / anonymization pass.
CREATE TABLE "RetentionRunLog" (
"id" TEXT NOT NULL,
-- Name of the job that produced this row.
"job" TEXT NOT NULL,
-- Optional phase number for multi-phase jobs; NULL otherwise.
"phase" INTEGER,
"startedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
-- NULL until the run is closed.
"finishedAt" TIMESTAMP(3),
"rowsAffected" INTEGER NOT NULL DEFAULT 0,
"status" "RetentionRunStatus" NOT NULL DEFAULT 'RUNNING',
"errorMessage" TEXT,
"batchSize" INTEGER,
"dryRun" BOOLEAN NOT NULL DEFAULT false,
CONSTRAINT "RetentionRunLog_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
-- Supports per-job history lookups ordered by start time.
CREATE INDEX "RetentionRunLog_job_startedAt_idx" ON "RetentionRunLog"("job", "startedAt");
-- Supports "most recent runs first" listings across all jobs.
CREATE INDEX "RetentionRunLog_startedAt_idx" ON "RetentionRunLog"("startedAt" DESC);

View File

@@ -1567,3 +1567,34 @@ model VnAdministrativeAlias {
@@index([newWardCode])
@@map("vn_administrative_aliases")
}
// =============================================================================
// RETENTION (GOO-196 — Decree 13 compliance)
// =============================================================================
// Lifecycle state of a RetentionRunLog row.
enum RetentionRunStatus {
// Initial state; the default for newly created rows.
RUNNING
// Run closed without error.
SUCCESS
// Run closed without error but flagged as partial by the caller.
PARTIAL
// Run closed with an error; see RetentionRunLog.errorMessage.
FAILED
}
/// Every purge / anonymization pass emits a RetentionRunLog row so the
/// operator and DPO can audit exactly what was touched and when. Multi-phase
/// jobs (e.g. payment callback 2y / 5y / 10y) record `phase` for
/// disambiguation.
model RetentionRunLog {
id String @id @default(cuid())
// Name of the job that produced this row.
job String
// Phase number for multi-phase jobs; null otherwise.
phase Int?
startedAt DateTime @default(now())
// Null until the run is closed.
finishedAt DateTime?
rowsAffected Int @default(0)
status RetentionRunStatus @default(RUNNING)
// Populated only for failed runs.
errorMessage String?
batchSize Int?
dryRun Boolean @default(false)
// Per-job history lookups ordered by start time.
@@index([job, startedAt])
// Most-recent-first listings across all jobs.
@@index([startedAt(sort: Desc)])
}