Compare commits
2 Commits
b4bb05479e
...
865a28009f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
865a28009f | ||
|
|
05a629cf21 |
@@ -1 +1,2 @@
|
||||
export * from './projectors';
|
||||
export * from './reconciliation';
|
||||
|
||||
@@ -0,0 +1,303 @@
|
||||
import { describe, it, expect, beforeEach, vi } from 'vitest';
|
||||
import type {
|
||||
IReadModelReconciler,
|
||||
ReconciliationSampleResult,
|
||||
} from '../../../domain/reconciler';
|
||||
import {
|
||||
AutoPromotionTracker,
|
||||
PROMOTION_THRESHOLD,
|
||||
ReconciliationHarness,
|
||||
DEFAULT_SAMPLED_CONFIG,
|
||||
} from '../index';
|
||||
|
||||
// ── test helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
function makeLoggerStub() {
|
||||
return {
|
||||
log: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
verbose: vi.fn(),
|
||||
setContext: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
/** Deterministic xorshift32 PRNG → values in [0, 1). */
|
||||
function seededRng(seed: number): () => number {
|
||||
let state = seed | 0 || 0xdeadbeef;
|
||||
return () => {
|
||||
state ^= state << 13;
|
||||
state ^= state >>> 17;
|
||||
state ^= state << 5;
|
||||
// Unsigned → [0, 2^32) → [0, 1)
|
||||
return ((state >>> 0) % 10000) / 10000;
|
||||
};
|
||||
}
|
||||
|
||||
class SyntheticReconciler implements IReadModelReconciler {
|
||||
constructor(
|
||||
public readonly readModelName: string,
|
||||
private readonly universe: readonly string[],
|
||||
private readonly driftedKeys: ReadonlySet<string>,
|
||||
) {}
|
||||
|
||||
async listSampleKeys(): Promise<readonly string[]> {
|
||||
return this.universe;
|
||||
}
|
||||
|
||||
async reconcile(sampleKey: string): Promise<ReconciliationSampleResult> {
|
||||
if (this.driftedKeys.has(sampleKey)) {
|
||||
return { sampleKey, inSync: false, reason: 'value_mismatch' };
|
||||
}
|
||||
return { sampleKey, inSync: true };
|
||||
}
|
||||
}
|
||||
|
||||
function harnessWith(
|
||||
reconcilers: readonly IReadModelReconciler[],
|
||||
tracker?: AutoPromotionTracker,
|
||||
): { harness: ReconciliationHarness; tracker: AutoPromotionTracker; logger: ReturnType<typeof makeLoggerStub> } {
|
||||
const logger = makeLoggerStub();
|
||||
const t = tracker ?? new AutoPromotionTracker(logger as never);
|
||||
const harness = new ReconciliationHarness(logger as never, t, reconcilers);
|
||||
return { harness, tracker: t, logger };
|
||||
}
|
||||
|
||||
// ── pickSample ─────────────────────────────────────────────────────────────
|
||||
|
||||
describe('ReconciliationHarness.pickSample', () => {
|
||||
it('returns empty for an empty universe', () => {
|
||||
expect(ReconciliationHarness.pickSample([], 0.01, 20)).toEqual([]);
|
||||
});
|
||||
|
||||
it('returns all items when rate*N + floor >= N', () => {
|
||||
const universe = ['a', 'b', 'c'];
|
||||
const picked = ReconciliationHarness.pickSample(universe, 0.5, 10);
|
||||
expect(picked).toHaveLength(3);
|
||||
expect([...picked].sort()).toEqual(['a', 'b', 'c']);
|
||||
});
|
||||
|
||||
it('respects minSamples floor even when rate*N is tiny', () => {
|
||||
const universe = Array.from({ length: 500 }, (_, i) => `k${i}`);
|
||||
const rng = seededRng(1);
|
||||
const picked = ReconciliationHarness.pickSample(universe, 0.01, 20, rng);
|
||||
// ceil(0.01 * 500) = 5, but minSamples=20 → 20
|
||||
expect(picked).toHaveLength(20);
|
||||
});
|
||||
|
||||
it('respects rate when rate*N beats minSamples floor', () => {
|
||||
const universe = Array.from({ length: 10000 }, (_, i) => `k${i}`);
|
||||
const rng = seededRng(2);
|
||||
const picked = ReconciliationHarness.pickSample(universe, 0.01, 20, rng);
|
||||
// ceil(0.01 * 10000) = 100 > 20
|
||||
expect(picked).toHaveLength(100);
|
||||
});
|
||||
|
||||
it('samples without replacement', () => {
|
||||
const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`);
|
||||
const rng = seededRng(3);
|
||||
const picked = ReconciliationHarness.pickSample(universe, 0.05, 0, rng);
|
||||
expect(new Set(picked).size).toBe(picked.length);
|
||||
});
|
||||
});
|
||||
|
||||
// ── synthetic drift ────────────────────────────────────────────────────────
|
||||
|
||||
describe('ReconciliationHarness — synthetic drift', () => {
|
||||
it('runSampled reports 0 drift when everything is in sync', async () => {
|
||||
const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`);
|
||||
const rec = new SyntheticReconciler('test_rm', universe, new Set());
|
||||
const { harness } = harnessWith([rec]);
|
||||
|
||||
const reports = await harness.runSampled();
|
||||
expect(reports).toHaveLength(1);
|
||||
const r = reports[0]!;
|
||||
expect(r.drifted).toBe(0);
|
||||
expect(r.driftRate).toBe(0);
|
||||
expect(r.breach).toBe(false);
|
||||
expect(r.checked).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it('runSampled flags breach when drift exceeds 0.1%', async () => {
|
||||
// 10000 keys, 50 drifted → 0.5% > 0.1% threshold
|
||||
const universe = Array.from({ length: 10000 }, (_, i) => `k${i}`);
|
||||
const drifted = new Set(universe.slice(0, 500)); // 5% drifted universe-wide
|
||||
const rec = new SyntheticReconciler('hot_rm', universe, drifted);
|
||||
const { harness } = harnessWith([rec]);
|
||||
|
||||
const reports = await harness.runSampled();
|
||||
const r = reports[0]!;
|
||||
expect(r.breach).toBe(true);
|
||||
expect(r.drifted).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it('runFull walks every key in the universe', async () => {
|
||||
const universe = Array.from({ length: 300 }, (_, i) => `k${i}`);
|
||||
const rec = new SyntheticReconciler('full_rm', universe, new Set(['k0', 'k1']));
|
||||
const { harness } = harnessWith([rec]);
|
||||
|
||||
const reports = await harness.runFull();
|
||||
const r = reports[0]!;
|
||||
expect(r.checked).toBe(300);
|
||||
expect(r.drifted).toBe(2);
|
||||
expect(r.mode).toBe('full');
|
||||
});
|
||||
|
||||
it('runs reconcilers in parallel (independent results)', async () => {
|
||||
const universeA = Array.from({ length: 100 }, (_, i) => `a${i}`);
|
||||
const universeB = Array.from({ length: 100 }, (_, i) => `b${i}`);
|
||||
const a = new SyntheticReconciler('rm_a', universeA, new Set());
|
||||
const b = new SyntheticReconciler('rm_b', universeB, new Set(universeB));
|
||||
const { harness } = harnessWith([a, b]);
|
||||
|
||||
const reports = await harness.runSampled();
|
||||
expect(reports).toHaveLength(2);
|
||||
expect(reports[0]!.drifted).toBe(0);
|
||||
expect(reports[1]!.drifted).toBe(reports[1]!.checked);
|
||||
expect(reports[1]!.breach).toBe(true);
|
||||
});
|
||||
|
||||
it('captures reconciler errors without failing the run', async () => {
|
||||
const broken: IReadModelReconciler = {
|
||||
readModelName: 'broken_rm',
|
||||
async listSampleKeys() {
|
||||
throw new Error('db timeout');
|
||||
},
|
||||
async reconcile() {
|
||||
return { sampleKey: '', inSync: true };
|
||||
},
|
||||
};
|
||||
const { harness } = harnessWith([broken]);
|
||||
const reports = await harness.runSampled();
|
||||
expect(reports[0]!.error).toContain('db timeout');
|
||||
expect(reports[0]!.checked).toBe(0);
|
||||
expect(reports[0]!.breach).toBe(false);
|
||||
});
|
||||
|
||||
it('handles empty universe without breach', async () => {
|
||||
const rec = new SyntheticReconciler('empty_rm', [], new Set());
|
||||
const { harness } = harnessWith([rec]);
|
||||
const reports = await harness.runSampled();
|
||||
expect(reports[0]!.checked).toBe(0);
|
||||
expect(reports[0]!.driftRate).toBe(0);
|
||||
expect(reports[0]!.breach).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
// ── auto-promotion ─────────────────────────────────────────────────────────
|
||||
|
||||
describe('AutoPromotionTracker', () => {
|
||||
let tracker: AutoPromotionTracker;
|
||||
let logger: ReturnType<typeof makeLoggerStub>;
|
||||
|
||||
beforeEach(() => {
|
||||
logger = makeLoggerStub();
|
||||
tracker = new AutoPromotionTracker(logger as never);
|
||||
});
|
||||
|
||||
it('starts clean', () => {
|
||||
expect(tracker.getConsecutiveBreaches('rm')).toBe(0);
|
||||
});
|
||||
|
||||
it('recordBreach increments consecutive counter', () => {
|
||||
tracker.recordBreach('rm');
|
||||
expect(tracker.getConsecutiveBreaches('rm')).toBe(1);
|
||||
});
|
||||
|
||||
it('does NOT promote on a single breach', () => {
|
||||
const d = tracker.recordBreach('rm');
|
||||
expect(d.shouldPromote).toBe(false);
|
||||
expect(d.state).toBe('breach');
|
||||
});
|
||||
|
||||
it(`promotes after ${PROMOTION_THRESHOLD} CONSECUTIVE sampled breaches`, () => {
|
||||
tracker.recordBreach('rm');
|
||||
const d = tracker.recordBreach('rm');
|
||||
expect(d.shouldPromote).toBe(true);
|
||||
expect(d.state).toBe('promoted');
|
||||
expect(d.consecutiveBreaches).toBe(PROMOTION_THRESHOLD);
|
||||
expect(logger.error).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('recordClean resets the counter', () => {
|
||||
tracker.recordBreach('rm');
|
||||
tracker.recordClean('rm');
|
||||
expect(tracker.getConsecutiveBreaches('rm')).toBe(0);
|
||||
const d = tracker.recordBreach('rm');
|
||||
expect(d.shouldPromote).toBe(false);
|
||||
});
|
||||
|
||||
it('tracks per-read-model counters independently', () => {
|
||||
tracker.recordBreach('rm_a');
|
||||
tracker.recordBreach('rm_a');
|
||||
tracker.recordBreach('rm_b');
|
||||
expect(tracker.getConsecutiveBreaches('rm_a')).toBe(2);
|
||||
expect(tracker.getConsecutiveBreaches('rm_b')).toBe(1);
|
||||
});
|
||||
|
||||
it('recordError leaves counter unchanged', () => {
|
||||
tracker.recordBreach('rm');
|
||||
tracker.recordError('rm');
|
||||
expect(tracker.getConsecutiveBreaches('rm')).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
// ── integrated: harness + tracker ──────────────────────────────────────────
|
||||
|
||||
describe('ReconciliationHarness ↔ AutoPromotionTracker', () => {
|
||||
it('auto-promotes after 2 consecutive sampled breaches', async () => {
|
||||
const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`);
|
||||
const drifted = new Set(universe); // 100% drift
|
||||
const rec = new SyntheticReconciler('bad_rm', universe, drifted);
|
||||
const { harness, tracker } = harnessWith([rec]);
|
||||
|
||||
const run1 = await harness.runSampled();
|
||||
expect(run1[0]!.breach).toBe(true);
|
||||
expect(run1[0]!.promotion.shouldPromote).toBe(false);
|
||||
|
||||
const run2 = await harness.runSampled();
|
||||
expect(run2[0]!.breach).toBe(true);
|
||||
expect(run2[0]!.promotion.shouldPromote).toBe(true);
|
||||
expect(tracker.getConsecutiveBreaches('bad_rm')).toBe(2);
|
||||
});
|
||||
|
||||
it('a clean run breaks the promotion streak', async () => {
|
||||
const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`);
|
||||
|
||||
// breach run
|
||||
const hotRec = new SyntheticReconciler('rm', universe, new Set(universe));
|
||||
const { harness: h1, tracker } = harnessWith([hotRec]);
|
||||
await h1.runSampled();
|
||||
expect(tracker.getConsecutiveBreaches('rm')).toBe(1);
|
||||
|
||||
// clean run with SAME tracker
|
||||
const coldRec = new SyntheticReconciler('rm', universe, new Set());
|
||||
const { harness: h2 } = harnessWith([coldRec], tracker);
|
||||
const r = await h2.runSampled();
|
||||
expect(r[0]!.breach).toBe(false);
|
||||
expect(tracker.getConsecutiveBreaches('rm')).toBe(0);
|
||||
});
|
||||
|
||||
it('full-mode runs do NOT advance the auto-promotion counter', async () => {
|
||||
const universe = Array.from({ length: 100 }, (_, i) => `k${i}`);
|
||||
const rec = new SyntheticReconciler('rm', universe, new Set(universe));
|
||||
const { harness, tracker } = harnessWith([rec]);
|
||||
|
||||
await harness.runFull();
|
||||
await harness.runFull();
|
||||
expect(tracker.getConsecutiveBreaches('rm')).toBe(0);
|
||||
});
|
||||
|
||||
it('uses DEFAULT_SAMPLED_CONFIG breach threshold of 0.1%', () => {
|
||||
expect(DEFAULT_SAMPLED_CONFIG.breachThreshold).toBeCloseTo(0.001);
|
||||
expect(DEFAULT_SAMPLED_CONFIG.sampleRate).toBeCloseTo(0.01);
|
||||
expect(DEFAULT_SAMPLED_CONFIG.minSamples).toBeGreaterThanOrEqual(20);
|
||||
});
|
||||
|
||||
it('harness with empty registry is a no-op', async () => {
|
||||
const { harness } = harnessWith([]);
|
||||
await expect(harness.runSampled()).resolves.toEqual([]);
|
||||
await expect(harness.runFull()).resolves.toEqual([]);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,103 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import.
|
||||
import { LoggerService } from '@modules/shared';
|
||||
|
||||
/**
|
||||
* Promotion threshold — number of CONSECUTIVE sampled breaches before the
|
||||
* harness pages on-call to flip the config to nightly-full mode.
|
||||
*
|
||||
* RFC-003 §7: "Auto-promotion rule: 2 consecutive sampled breaches →
|
||||
* page on-call to flip config to nightly full".
|
||||
*/
|
||||
export const PROMOTION_THRESHOLD = 2;
|
||||
|
||||
export type PromotionState = 'clean' | 'breach' | 'promoted';
|
||||
|
||||
export interface PromotionDecision {
|
||||
readonly readModelName: string;
|
||||
readonly state: PromotionState;
|
||||
readonly consecutiveBreaches: number;
|
||||
readonly shouldPromote: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* In-memory auto-promotion state machine.
|
||||
*
|
||||
* Keeps a per-read-model counter of consecutive sampled breaches. A clean
|
||||
* run resets the counter; hitting `PROMOTION_THRESHOLD` emits a structured
|
||||
* log line that the alerting stack pages on via Loki, plus flips the
|
||||
* returned `shouldPromote` flag which the cron service exposes as a
|
||||
* Prometheus counter for redundant alerting.
|
||||
*
|
||||
* Multi-instance deployments: counter resets on process restart. That is
|
||||
* deliberate — we rely on Prometheus `increase(...)` over the alerting
|
||||
* window, not on in-process state, for the pager. The in-process state
|
||||
* is there so a single long-lived instance auto-pages without waiting
|
||||
* for the Prometheus scrape cycle.
|
||||
*/
|
||||
@Injectable()
|
||||
export class AutoPromotionTracker {
|
||||
private readonly counters = new Map<string, number>();
|
||||
|
||||
constructor(private readonly logger: LoggerService) {}
|
||||
|
||||
recordBreach(readModelName: string): PromotionDecision {
|
||||
const next = (this.counters.get(readModelName) ?? 0) + 1;
|
||||
this.counters.set(readModelName, next);
|
||||
|
||||
const shouldPromote = next >= PROMOTION_THRESHOLD;
|
||||
if (shouldPromote) {
|
||||
this.logger.error(
|
||||
`auto_promote_required read_model=${readModelName} consecutive_breaches=${next}`,
|
||||
undefined,
|
||||
'ReconciliationAutoPromotion',
|
||||
);
|
||||
} else {
|
||||
this.logger.warn(
|
||||
`reconciliation_breach read_model=${readModelName} consecutive_breaches=${next}`,
|
||||
'ReconciliationAutoPromotion',
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
readModelName,
|
||||
state: shouldPromote ? 'promoted' : 'breach',
|
||||
consecutiveBreaches: next,
|
||||
shouldPromote,
|
||||
};
|
||||
}
|
||||
|
||||
recordClean(readModelName: string): PromotionDecision {
|
||||
this.counters.set(readModelName, 0);
|
||||
return {
|
||||
readModelName,
|
||||
state: 'clean',
|
||||
consecutiveBreaches: 0,
|
||||
shouldPromote: false,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Error during sampling does NOT reset the counter — the run is simply
|
||||
* inconclusive. Returns the unchanged state for telemetry.
|
||||
*/
|
||||
recordError(readModelName: string): PromotionDecision {
|
||||
const current = this.counters.get(readModelName) ?? 0;
|
||||
return {
|
||||
readModelName,
|
||||
state: current >= PROMOTION_THRESHOLD ? 'promoted' : current > 0 ? 'breach' : 'clean',
|
||||
consecutiveBreaches: current,
|
||||
shouldPromote: false,
|
||||
};
|
||||
}
|
||||
|
||||
/** Test-only: inspect internal state. */
|
||||
getConsecutiveBreaches(readModelName: string): number {
|
||||
return this.counters.get(readModelName) ?? 0;
|
||||
}
|
||||
|
||||
/** Test-only: clear all counters. */
|
||||
reset(): void {
|
||||
this.counters.clear();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
export {
|
||||
AutoPromotionTracker,
|
||||
PROMOTION_THRESHOLD,
|
||||
type PromotionDecision,
|
||||
type PromotionState,
|
||||
} from './auto-promotion-tracker';
|
||||
export {
|
||||
ReconciliationHarness,
|
||||
DEFAULT_SAMPLED_CONFIG,
|
||||
type ReconciliationRunReport,
|
||||
} from './reconciliation-harness';
|
||||
@@ -0,0 +1,189 @@
|
||||
import { Inject, Injectable, Optional } from '@nestjs/common';
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import.
|
||||
import { LoggerService } from '@modules/shared';
|
||||
import {
|
||||
RECONCILER_REGISTRY,
|
||||
type IReadModelReconciler,
|
||||
type ReconciliationMode,
|
||||
type ReconciliationSampleResult,
|
||||
} from '../../domain/reconciler';
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import.
|
||||
import { AutoPromotionTracker } from './auto-promotion-tracker';
|
||||
import type { PromotionDecision } from './auto-promotion-tracker';
|
||||
|
||||
/**
|
||||
* Default sampled cadence config — RFC-003 §7:
|
||||
* "1% random sample reconciliation per registered read model"
|
||||
* "alert if drift > 0.1%"
|
||||
*
|
||||
* `minSamples` guarantees we always run at least 20 comparisons even for
|
||||
* tiny read models where 1% is a fractional row count.
|
||||
*/
|
||||
export const DEFAULT_SAMPLED_CONFIG = Object.freeze({
|
||||
sampleRate: 0.01,
|
||||
minSamples: 20,
|
||||
breachThreshold: 0.001,
|
||||
});
|
||||
|
||||
export interface ReconciliationRunReport {
|
||||
readonly readModelName: string;
|
||||
readonly mode: ReconciliationMode;
|
||||
readonly checked: number;
|
||||
readonly drifted: number;
|
||||
readonly driftRate: number;
|
||||
readonly breach: boolean;
|
||||
readonly driftReasons: readonly string[];
|
||||
readonly promotion: PromotionDecision;
|
||||
readonly error?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Orchestrates reconciler runs for both cadences. Stateless — cron service
|
||||
* owns scheduling, locking and metrics. The harness only does:
|
||||
* 1. Ask each reconciler for candidate keys.
|
||||
* 2. Sample (1% + min floor) for sampled runs; take all for full runs.
|
||||
* 3. Invoke `reconcile(sampleKey)` per sample.
|
||||
* 4. Feed the drift outcome into the auto-promotion tracker.
|
||||
* 5. Return a `ReconciliationRunReport[]` per read model.
|
||||
*/
|
||||
@Injectable()
|
||||
export class ReconciliationHarness {
|
||||
constructor(
|
||||
private readonly logger: LoggerService,
|
||||
private readonly promotionTracker: AutoPromotionTracker,
|
||||
@Optional()
|
||||
@Inject(RECONCILER_REGISTRY)
|
||||
private readonly reconcilers: readonly IReadModelReconciler[] | null,
|
||||
) {}
|
||||
|
||||
get registry(): readonly IReadModelReconciler[] {
|
||||
return this.reconcilers ?? [];
|
||||
}
|
||||
|
||||
async runSampled(
|
||||
config = DEFAULT_SAMPLED_CONFIG,
|
||||
): Promise<readonly ReconciliationRunReport[]> {
|
||||
return Promise.all(
|
||||
this.registry.map((r) => this.runOne(r, 'sampled', config)),
|
||||
);
|
||||
}
|
||||
|
||||
async runFull(
|
||||
breachThreshold = DEFAULT_SAMPLED_CONFIG.breachThreshold,
|
||||
): Promise<readonly ReconciliationRunReport[]> {
|
||||
return Promise.all(
|
||||
this.registry.map((r) =>
|
||||
this.runOne(r, 'full', {
|
||||
sampleRate: 1,
|
||||
minSamples: 0,
|
||||
breachThreshold,
|
||||
}),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
private async runOne(
|
||||
reconciler: IReadModelReconciler,
|
||||
mode: ReconciliationMode,
|
||||
config: typeof DEFAULT_SAMPLED_CONFIG,
|
||||
): Promise<ReconciliationRunReport> {
|
||||
const name = reconciler.readModelName;
|
||||
try {
|
||||
const budget =
|
||||
mode === 'full' ? Number.MAX_SAFE_INTEGER : Math.max(config.minSamples, 100);
|
||||
const universe = await reconciler.listSampleKeys(mode, budget);
|
||||
const sample =
|
||||
mode === 'full'
|
||||
? universe
|
||||
: ReconciliationHarness.pickSample(
|
||||
universe,
|
||||
config.sampleRate,
|
||||
config.minSamples,
|
||||
);
|
||||
|
||||
const results: ReconciliationSampleResult[] = [];
|
||||
for (const key of sample) {
|
||||
results.push(await reconciler.reconcile(key));
|
||||
}
|
||||
|
||||
const drifted = results.filter((r) => !r.inSync);
|
||||
const checked = results.length;
|
||||
const driftRate = checked === 0 ? 0 : drifted.length / checked;
|
||||
const breach = driftRate > config.breachThreshold;
|
||||
|
||||
const promotion =
|
||||
mode === 'sampled'
|
||||
? breach
|
||||
? this.promotionTracker.recordBreach(name)
|
||||
: this.promotionTracker.recordClean(name)
|
||||
: {
|
||||
readModelName: name,
|
||||
state: breach ? ('breach' as const) : ('clean' as const),
|
||||
consecutiveBreaches: this.promotionTracker.getConsecutiveBreaches(name),
|
||||
shouldPromote: false,
|
||||
};
|
||||
|
||||
this.logger.log(
|
||||
`reconciliation_run read_model=${name} mode=${mode} checked=${checked} drifted=${drifted.length} drift_rate=${driftRate.toFixed(5)} breach=${breach}`,
|
||||
'ReconciliationHarness',
|
||||
);
|
||||
|
||||
return {
|
||||
readModelName: name,
|
||||
mode,
|
||||
checked,
|
||||
drifted: drifted.length,
|
||||
driftRate,
|
||||
breach,
|
||||
driftReasons: drifted.map((d) => `${d.sampleKey}:${d.reason ?? 'drift'}`),
|
||||
promotion,
|
||||
};
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
this.logger.error(
|
||||
`reconciliation_run_error read_model=${name} mode=${mode} err=${message}`,
|
||||
err instanceof Error ? err.stack : undefined,
|
||||
'ReconciliationHarness',
|
||||
);
|
||||
return {
|
||||
readModelName: name,
|
||||
mode,
|
||||
checked: 0,
|
||||
drifted: 0,
|
||||
driftRate: 0,
|
||||
breach: false,
|
||||
driftReasons: [],
|
||||
promotion: this.promotionTracker.recordError(name),
|
||||
error: message,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fisher–Yates partial shuffle: takes `max(ceil(rate*N), minSamples)`
|
||||
* items without replacement in O(k) time and O(1) extra allocations past
|
||||
* the copy of the input array.
|
||||
*
|
||||
* `rng` lets tests inject a seeded PRNG; production uses `Math.random`.
|
||||
*/
|
||||
static pickSample<T>(
|
||||
universe: readonly T[],
|
||||
rate: number,
|
||||
minSamples: number,
|
||||
rng: () => number = Math.random,
|
||||
): readonly T[] {
|
||||
const n = universe.length;
|
||||
if (n === 0) return [];
|
||||
const target = Math.min(n, Math.max(minSamples, Math.ceil(rate * n)));
|
||||
if (target >= n) return [...universe];
|
||||
|
||||
const pool = [...universe];
|
||||
for (let i = 0; i < target; i += 1) {
|
||||
const j = i + Math.floor(rng() * (n - i));
|
||||
const tmp = pool[i] as T;
|
||||
pool[i] = pool[j] as T;
|
||||
pool[j] = tmp;
|
||||
}
|
||||
return pool.slice(0, target);
|
||||
}
|
||||
}
|
||||
@@ -15,3 +15,9 @@ export {
|
||||
READ_MODEL_KILL_SWITCH,
|
||||
type IReadModelKillSwitch,
|
||||
} from './read-model-kill-switch';
|
||||
export {
|
||||
RECONCILER_REGISTRY,
|
||||
type IReadModelReconciler,
|
||||
type ReconciliationMode,
|
||||
type ReconciliationSampleResult,
|
||||
} from './reconciler';
|
||||
|
||||
49
apps/api/src/modules/read-models/domain/reconciler.ts
Normal file
49
apps/api/src/modules/read-models/domain/reconciler.ts
Normal file
@@ -0,0 +1,49 @@
|
||||
/**
|
||||
* Per-read-model reconciler port — RFC-003 §7.
|
||||
*
|
||||
* Reconcilers compare projected read-model rows against the authoritative
|
||||
* write-model and report whether each sampled key is in sync. The harness
|
||||
* drives them on both cadences (1% sampled nightly + 100% weekly full).
|
||||
*
|
||||
* Phase 0 ships the port only; Phase 2 lands the first concrete reconciler.
|
||||
*/
|
||||
|
||||
export interface ReconciliationSampleResult {
|
||||
/** Opaque per-reconciler sample key (e.g. listing ID). */
|
||||
readonly sampleKey: string;
|
||||
/** `true` when read-model row matches the authoritative source. */
|
||||
readonly inSync: boolean;
|
||||
/** Optional human-readable drift reason (omit when `inSync === true`). */
|
||||
readonly reason?: string;
|
||||
}
|
||||
|
||||
export type ReconciliationMode = 'sampled' | 'full';
|
||||
|
||||
export interface IReadModelReconciler {
|
||||
/** Stable read-model name; matches the `IReadRepository` name. */
|
||||
readonly readModelName: string;
|
||||
|
||||
/**
|
||||
* Returns the candidate universe of sample keys. For `mode === 'full'` the
|
||||
* returned list SHOULD be the complete population; the harness still caps
|
||||
* by `sampleBudget` for memory-safety.
|
||||
*/
|
||||
listSampleKeys(
|
||||
mode: ReconciliationMode,
|
||||
sampleBudget: number,
|
||||
): Promise<readonly string[]>;
|
||||
|
||||
/**
|
||||
* Compare one sample key against the authoritative source. MUST be pure
|
||||
* (read-only) and MUST NOT throw for a single drift — return
|
||||
* `{ inSync: false, reason }` instead.
|
||||
*/
|
||||
reconcile(sampleKey: string): Promise<ReconciliationSampleResult>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Nest DI token for the `IReadModelReconciler[]` registry. Providers
|
||||
* contribute reconcilers via a `useExisting` / `useClass` pattern; the
|
||||
* registry itself is supplied as a plain array so the harness can iterate.
|
||||
*/
|
||||
export const RECONCILER_REGISTRY = Symbol('RECONCILER_REGISTRY');
|
||||
@@ -1,8 +1,20 @@
|
||||
/**
|
||||
* Reconciliation infrastructure (RFC-003 §7).
|
||||
*
|
||||
* Phase 0 ships only the placeholder. Phase 2 lands the sampled nightly
|
||||
* (1%) drift checker; the weekly full reconciliation runs follow once
|
||||
* Phase 2 has soaked in production for one cycle.
|
||||
* Ships the Phase 0 harness:
|
||||
* - `ReconciliationCronService` — sampled nightly + full weekly schedules
|
||||
* guarded by a Redis SET NX EX lock for at-most-once execution.
|
||||
* - Prometheus metric names re-exported so alert rules/dashboards can
|
||||
* reference the constants.
|
||||
*
|
||||
* Concrete per-read-model reconcilers land with Phase 2 via the
|
||||
* `RECONCILER_REGISTRY` DI token.
|
||||
*/
|
||||
export {};
|
||||
export {
|
||||
ReconciliationCronService,
|
||||
RECON_SAMPLES_TOTAL,
|
||||
RECON_DRIFT_TOTAL,
|
||||
RECON_BREACH_TOTAL,
|
||||
RECON_PROMOTION_TOTAL,
|
||||
RECON_DURATION,
|
||||
} from './reconciliation-cron.service';
|
||||
|
||||
@@ -0,0 +1,138 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Cron } from '@nestjs/schedule';
|
||||
import { InjectMetric } from '@willsoto/nestjs-prometheus';
|
||||
import type { Counter, Histogram } from 'prom-client';
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import.
|
||||
import { LoggerService, RedisService } from '@modules/shared';
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import.
|
||||
import { ReconciliationHarness, type ReconciliationRunReport } from '../../application/reconciliation';
|
||||
|
||||
/** Prometheus metric names — exported so dashboards / alerts can reference them. */
|
||||
export const RECON_SAMPLES_TOTAL = 'goodgo_read_model_reconciliation_samples_total';
|
||||
export const RECON_DRIFT_TOTAL = 'goodgo_read_model_reconciliation_drift_total';
|
||||
export const RECON_BREACH_TOTAL = 'goodgo_read_model_reconciliation_breach_total';
|
||||
export const RECON_PROMOTION_TOTAL = 'goodgo_read_model_reconciliation_promotion_total';
|
||||
export const RECON_DURATION = 'goodgo_read_model_reconciliation_duration_seconds';
|
||||
|
||||
const LOCK_PREFIX = 'recon:lock:';
|
||||
const SAMPLED_LOCK_TTL_SEC = 5 * 60; // 5 minutes
|
||||
const FULL_LOCK_TTL_SEC = 60 * 60; // 1 hour
|
||||
|
||||
/**
|
||||
* Cron orchestrator for the reconciliation harness — RFC-003 §7.
|
||||
*
|
||||
* - Sampled nightly: 03:17 UTC (≈ 10:17 ICT). 1% sample per read model,
|
||||
* alerts when drift > 0.1%, two consecutive breaches auto-page on-call.
|
||||
* - Full weekly: 04:30 UTC Sunday (≈ 11:30 ICT). Walks every key.
|
||||
*
|
||||
* A Redis SET NX EX lock (`recon:lock:<mode>`) guarantees at-most-once
|
||||
* execution across horizontally-scaled API instances.
|
||||
*/
|
||||
@Injectable()
|
||||
export class ReconciliationCronService {
|
||||
constructor(
|
||||
private readonly harness: ReconciliationHarness,
|
||||
private readonly redis: RedisService,
|
||||
private readonly logger: LoggerService,
|
||||
@InjectMetric(RECON_SAMPLES_TOTAL)
|
||||
private readonly samplesCounter: Counter<string>,
|
||||
@InjectMetric(RECON_DRIFT_TOTAL)
|
||||
private readonly driftCounter: Counter<string>,
|
||||
@InjectMetric(RECON_BREACH_TOTAL)
|
||||
private readonly breachCounter: Counter<string>,
|
||||
@InjectMetric(RECON_PROMOTION_TOTAL)
|
||||
private readonly promotionCounter: Counter<string>,
|
||||
@InjectMetric(RECON_DURATION)
|
||||
private readonly durationHistogram: Histogram<string>,
|
||||
) {}
|
||||
|
||||
@Cron('17 3 * * *', { name: 'reconciliation-sampled-nightly', timeZone: 'UTC' })
|
||||
async runSampledNightly(): Promise<void> {
|
||||
await this.runWithLock('sampled', () => this.harness.runSampled());
|
||||
}
|
||||
|
||||
@Cron('30 4 * * 0', { name: 'reconciliation-full-weekly', timeZone: 'UTC' })
|
||||
async runFullWeekly(): Promise<void> {
|
||||
await this.runWithLock('full', () => this.harness.runFull());
|
||||
}
|
||||
|
||||
private async runWithLock(
|
||||
mode: 'sampled' | 'full',
|
||||
run: () => Promise<readonly ReconciliationRunReport[]>,
|
||||
): Promise<readonly ReconciliationRunReport[] | null> {
|
||||
const ttl = mode === 'full' ? FULL_LOCK_TTL_SEC : SAMPLED_LOCK_TTL_SEC;
|
||||
const lockKey = `${LOCK_PREFIX}${mode}`;
|
||||
const token = `${process.pid}-${Date.now()}`;
|
||||
let acquired = false;
|
||||
|
||||
try {
|
||||
const client = this.redis.getClient();
|
||||
const result = await client.set(lockKey, token, 'EX', ttl, 'NX');
|
||||
acquired = result === 'OK';
|
||||
} catch (err) {
|
||||
this.logger.error(
|
||||
`reconciliation_lock_error mode=${mode} err=${(err as Error).message}`,
|
||||
(err as Error).stack,
|
||||
'ReconciliationCronService',
|
||||
);
|
||||
// Fail closed on Redis errors — do not run unguarded.
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!acquired) {
|
||||
this.logger.log(
|
||||
`reconciliation_skip mode=${mode} reason=lock_held`,
|
||||
'ReconciliationCronService',
|
||||
);
|
||||
return null;
|
||||
}
|
||||
|
||||
const startedAt = process.hrtime.bigint();
|
||||
try {
|
||||
const reports = await run();
|
||||
this.recordMetrics(mode, reports, startedAt);
|
||||
return reports;
|
||||
} catch (err) {
|
||||
this.logger.error(
|
||||
`reconciliation_run_failed mode=${mode} err=${(err as Error).message}`,
|
||||
(err as Error).stack,
|
||||
'ReconciliationCronService',
|
||||
);
|
||||
return null;
|
||||
} finally {
|
||||
try {
|
||||
// Best-effort lock release: only if we still own it.
|
||||
const client = this.redis.getClient();
|
||||
const current = await client.get(lockKey);
|
||||
if (current === token) {
|
||||
await client.del(lockKey);
|
||||
}
|
||||
} catch {
|
||||
// Ignore — TTL will reap the lock.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private recordMetrics(
|
||||
mode: 'sampled' | 'full',
|
||||
reports: readonly ReconciliationRunReport[],
|
||||
startedAt: bigint,
|
||||
): void {
|
||||
const elapsedSec = Number(process.hrtime.bigint() - startedAt) / 1e9;
|
||||
this.durationHistogram.labels({ mode }).observe(elapsedSec);
|
||||
|
||||
for (const report of reports) {
|
||||
const labels = { read_model: report.readModelName, mode };
|
||||
this.samplesCounter.inc(labels, report.checked);
|
||||
if (report.drifted > 0) {
|
||||
this.driftCounter.inc(labels, report.drifted);
|
||||
}
|
||||
if (report.breach) {
|
||||
this.breachCounter.inc(labels);
|
||||
}
|
||||
if (report.promotion.shouldPromote) {
|
||||
this.promotionCounter.inc({ read_model: report.readModelName });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,34 +1,90 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { CqrsModule } from '@nestjs/cqrs';
|
||||
import {
|
||||
PrometheusModule,
|
||||
makeCounterProvider,
|
||||
makeHistogramProvider,
|
||||
} from '@willsoto/nestjs-prometheus';
|
||||
import { SharedModule } from '@modules/shared';
|
||||
import {
|
||||
AutoPromotionTracker,
|
||||
ReconciliationHarness,
|
||||
} from './application/reconciliation';
|
||||
import { READ_MODEL_KILL_SWITCH } from './domain/read-model-kill-switch';
|
||||
import { RECONCILER_REGISTRY } from './domain/reconciler';
|
||||
import { ConfigReadModelKillSwitch } from './infrastructure/config-read-model-kill-switch';
|
||||
import {
|
||||
ReconciliationCronService,
|
||||
RECON_SAMPLES_TOTAL,
|
||||
RECON_DRIFT_TOTAL,
|
||||
RECON_BREACH_TOTAL,
|
||||
RECON_PROMOTION_TOTAL,
|
||||
RECON_DURATION,
|
||||
} from './infrastructure/reconciliation';
|
||||
|
||||
/**
|
||||
* Read-models module skeleton — RFC-003 Phase 0.
|
||||
* Read-models module — RFC-003 Phase 0.
|
||||
*
|
||||
* Hosts:
|
||||
* - Projector base class (`application/projectors/projector.base.ts`).
|
||||
* - Read-model repository convention (`domain/read-repository.ts`).
|
||||
* - Idempotency port (`domain/projection-offset-store.ts`).
|
||||
* - Per-read-model kill switch (`domain/read-model-kill-switch.ts`).
|
||||
* - Reconciliation harness + cron (sampled nightly + full weekly,
|
||||
* RFC-003 §7 / [GOO-191](/GOO/issues/GOO-191)).
|
||||
*
|
||||
* No projectors, repositories, or `IProjectionOffsetStore` provider are
|
||||
* registered here yet. The Prisma-backed offset store binding lands with
|
||||
* [GOO-187](/GOO/issues/GOO-187); per-read-model projectors land in
|
||||
* Phase 2/3.
|
||||
*
|
||||
* The module is imported by `AppModule` so its DI container is wired up
|
||||
* even while empty — keeps Phase 2/3 PRs strictly additive.
|
||||
* The Prisma-backed offset store binding lands with
|
||||
* [GOO-187](/GOO/issues/GOO-187). `RECONCILER_REGISTRY` is provided as an
|
||||
* empty array in Phase 0 — concrete reconcilers register against the
|
||||
* token starting in Phase 2; the harness uses `@Optional()` so an empty
|
||||
* registry simply results in no-op runs.
|
||||
*/
|
||||
@Module({
|
||||
imports: [CqrsModule, SharedModule],
|
||||
imports: [CqrsModule, SharedModule, PrometheusModule],
|
||||
providers: [
|
||||
{
|
||||
provide: READ_MODEL_KILL_SWITCH,
|
||||
useClass: ConfigReadModelKillSwitch,
|
||||
},
|
||||
{
|
||||
provide: RECONCILER_REGISTRY,
|
||||
useValue: [],
|
||||
},
|
||||
AutoPromotionTracker,
|
||||
ReconciliationHarness,
|
||||
ReconciliationCronService,
|
||||
makeCounterProvider({
|
||||
name: RECON_SAMPLES_TOTAL,
|
||||
help: 'Total samples checked by the read-model reconciliation harness',
|
||||
labelNames: ['read_model', 'mode'],
|
||||
}),
|
||||
makeCounterProvider({
|
||||
name: RECON_DRIFT_TOTAL,
|
||||
help: 'Total drifted samples detected by the read-model reconciliation harness',
|
||||
labelNames: ['read_model', 'mode'],
|
||||
}),
|
||||
makeCounterProvider({
|
||||
name: RECON_BREACH_TOTAL,
|
||||
help: 'Reconciliation runs whose drift rate exceeded the breach threshold (>0.1%)',
|
||||
labelNames: ['read_model', 'mode'],
|
||||
}),
|
||||
makeCounterProvider({
|
||||
name: RECON_PROMOTION_TOTAL,
|
||||
help: 'Auto-promotion events (2 consecutive sampled breaches) — pages on-call',
|
||||
labelNames: ['read_model'],
|
||||
}),
|
||||
makeHistogramProvider({
|
||||
name: RECON_DURATION,
|
||||
help: 'Wall-clock duration of a reconciliation run in seconds',
|
||||
labelNames: ['mode'],
|
||||
buckets: [0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1800],
|
||||
}),
|
||||
],
|
||||
exports: [
|
||||
READ_MODEL_KILL_SWITCH,
|
||||
RECONCILER_REGISTRY,
|
||||
AutoPromotionTracker,
|
||||
ReconciliationHarness,
|
||||
],
|
||||
exports: [READ_MODEL_KILL_SWITCH],
|
||||
})
|
||||
export class ReadModelsModule {}
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
import type { LoggerService } from '@modules/shared';
|
||||
import { PaymentCallbackPurgeService } from '../services/payment-callback-purge.service';
|
||||
import type { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
|
||||
|
||||
describe('PaymentCallbackPurgeService (stub)', () => {
|
||||
let logger: { log: ReturnType<typeof vi.fn>; warn: ReturnType<typeof vi.fn>; error: ReturnType<typeof vi.fn>; debug: ReturnType<typeof vi.fn> };
|
||||
let runLog: { start: ReturnType<typeof vi.fn>; markFinished: ReturnType<typeof vi.fn>; markFailed: ReturnType<typeof vi.fn> };
|
||||
let service: PaymentCallbackPurgeService;
|
||||
|
||||
beforeEach(() => {
|
||||
logger = { log: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() };
|
||||
runLog = {
|
||||
start: vi.fn().mockResolvedValue('run-X'),
|
||||
markFinished: vi.fn().mockResolvedValue(undefined),
|
||||
markFailed: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
service = new PaymentCallbackPurgeService(
|
||||
logger as unknown as LoggerService,
|
||||
runLog as unknown as RetentionRunLogRepository,
|
||||
);
|
||||
});
|
||||
|
||||
it.each([1, 2, 3] as const)('records a SUCCESS run with rowsAffected=0 for phase %s', async (phase) => {
|
||||
const result = await service.run(phase);
|
||||
expect(runLog.start).toHaveBeenCalledWith(expect.objectContaining({ job: 'payment-callback-purge', phase }));
|
||||
expect(runLog.markFinished).toHaveBeenCalledWith('run-X', 0);
|
||||
expect(result).toEqual({ rowsAffected: 0, runId: 'run-X' });
|
||||
});
|
||||
|
||||
it('selects strictly older cutoffs per higher phase (2y < 5y < 10y)', async () => {
|
||||
const fixed = new Date('2030-01-01T00:00:00Z').getTime();
|
||||
vi.spyOn(Date, 'now').mockReturnValue(fixed);
|
||||
|
||||
const calls: string[] = [];
|
||||
logger.warn = vi.fn((msg: string) => calls.push(msg)) as unknown as ReturnType<typeof vi.fn>;
|
||||
|
||||
await service.run(1);
|
||||
await service.run(2);
|
||||
await service.run(3);
|
||||
|
||||
const cutoffs = calls.map((m) => /cutoff=([^)]+)/.exec(m)?.[1] ?? '');
|
||||
expect(cutoffs).toHaveLength(3);
|
||||
expect(cutoffs[0]! > cutoffs[1]!).toBe(true);
|
||||
expect(cutoffs[1]! > cutoffs[2]!).toBe(true);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,49 @@
|
||||
import type { LoggerService, PrismaService } from '@modules/shared';
|
||||
import { RefreshTokenPurgeService } from '../services/refresh-token-purge.service';
|
||||
import type { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
|
||||
|
||||
describe('RefreshTokenPurgeService', () => {
|
||||
let prisma: { $queryRaw: ReturnType<typeof vi.fn>; refreshToken: { count: ReturnType<typeof vi.fn> } };
|
||||
let logger: { log: ReturnType<typeof vi.fn>; error: ReturnType<typeof vi.fn>; debug: ReturnType<typeof vi.fn> };
|
||||
let runLog: { start: ReturnType<typeof vi.fn>; markFinished: ReturnType<typeof vi.fn>; markFailed: ReturnType<typeof vi.fn> };
|
||||
let service: RefreshTokenPurgeService;
|
||||
|
||||
beforeEach(() => {
|
||||
prisma = { $queryRaw: vi.fn(), refreshToken: { count: vi.fn() } };
|
||||
logger = { log: vi.fn(), error: vi.fn(), debug: vi.fn() };
|
||||
runLog = {
|
||||
start: vi.fn().mockResolvedValue('run-1'),
|
||||
markFinished: vi.fn().mockResolvedValue(undefined),
|
||||
markFailed: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
service = new RefreshTokenPurgeService(
|
||||
prisma as unknown as PrismaService,
|
||||
logger as unknown as LoggerService,
|
||||
runLog as unknown as RetentionRunLogRepository,
|
||||
);
|
||||
});
|
||||
|
||||
it('starts a run, deletes batches, and marks finished with total rowsAffected', async () => {
|
||||
prisma.$queryRaw
|
||||
.mockResolvedValueOnce(new Array(1000).fill(0).map((_, i) => ({ id: `r${i}` })))
|
||||
.mockResolvedValueOnce([{ id: 'r1000' }]);
|
||||
|
||||
const result = await service.run();
|
||||
|
||||
expect(runLog.start).toHaveBeenCalledWith(expect.objectContaining({ job: 'refresh-token-purge' }));
|
||||
expect(prisma.$queryRaw).toHaveBeenCalledTimes(2);
|
||||
expect(runLog.markFinished).toHaveBeenCalledWith('run-1', 1001);
|
||||
expect(result).toEqual({ rowsAffected: 1001, runId: 'run-1' });
|
||||
});
|
||||
|
||||
it('marks the run as FAILED when the delete query throws', async () => {
|
||||
const boom = new Error('connection lost');
|
||||
prisma.$queryRaw.mockRejectedValue(boom);
|
||||
|
||||
await expect(service.run()).rejects.toThrow(boom);
|
||||
|
||||
expect(runLog.markFailed).toHaveBeenCalledWith('run-1', boom, 0);
|
||||
expect(runLog.markFinished).not.toHaveBeenCalled();
|
||||
expect(logger.error).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,85 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { PrismaService, LoggerService } from '@modules/shared';
|
||||
import { RETENTION_CONFIG } from '../../domain/retention.config';
|
||||
import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
|
||||
|
||||
/**
|
||||
* Anonymizes rows in AdminAuditLog older than 5 years. Because the schema
|
||||
* requires non-null actor/target IDs, the strategy is tombstone-in-place:
|
||||
* PII columns are replaced with the sentinel `ANONYMIZED`, and a
|
||||
* `metadata.anonymized=true` marker is written so the predicate is
|
||||
* idempotent.
|
||||
*
|
||||
* GOO-196 — Decree 13 compliance.
|
||||
*/
|
||||
@Injectable()
|
||||
export class AuditLogPurgeService {
|
||||
static readonly JOB = 'audit-log-anonymize';
|
||||
private static readonly TOMBSTONE = 'ANONYMIZED';
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly logger: LoggerService,
|
||||
private readonly runLog: RetentionRunLogRepository,
|
||||
) {}
|
||||
|
||||
async run(): Promise<{ rowsAffected: number; runId: string }> {
|
||||
const cutoff = new Date(Date.now() - RETENTION_CONFIG.auditAnonymizeMs);
|
||||
const runId = await this.runLog.start({
|
||||
job: AuditLogPurgeService.JOB,
|
||||
batchSize: RETENTION_CONFIG.batchSize,
|
||||
dryRun: RETENTION_CONFIG.dryRun,
|
||||
});
|
||||
|
||||
let total = 0;
|
||||
try {
|
||||
for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
|
||||
if (RETENTION_CONFIG.dryRun) {
|
||||
total = await this.prisma.adminAuditLog.count({
|
||||
where: {
|
||||
createdAt: { lt: cutoff },
|
||||
},
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
|
||||
UPDATE "AdminAuditLog"
|
||||
SET "actorId" = ${AuditLogPurgeService.TOMBSTONE},
|
||||
"targetId" = ${AuditLogPurgeService.TOMBSTONE},
|
||||
"ipAddress" = NULL,
|
||||
"userAgent" = NULL,
|
||||
"metadata" = COALESCE("metadata", '{}'::jsonb) || '{"anonymized":true}'::jsonb
|
||||
WHERE id IN (
|
||||
SELECT id FROM "AdminAuditLog"
|
||||
WHERE "createdAt" < ${cutoff}
|
||||
AND ("metadata" ->> 'anonymized') IS DISTINCT FROM 'true'
|
||||
ORDER BY "createdAt" ASC
|
||||
LIMIT ${RETENTION_CONFIG.batchSize}
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id
|
||||
`;
|
||||
|
||||
total += rows.length;
|
||||
if (rows.length < RETENTION_CONFIG.batchSize) break;
|
||||
}
|
||||
|
||||
await this.runLog.markFinished(runId, total);
|
||||
this.logger.log(
|
||||
`AuditLogPurgeService anonymized ${total} row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})`,
|
||||
'AuditLogPurgeService',
|
||||
);
|
||||
return { rowsAffected: total, runId };
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
await this.runLog.markFailed(runId, error, total);
|
||||
this.logger.error(
|
||||
`AuditLogPurgeService failed: ${error.message}`,
|
||||
error.stack,
|
||||
'AuditLogPurgeService',
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { PrismaService, LoggerService } from '@modules/shared';
|
||||
import { RETENTION_CONFIG } from '../../domain/retention.config';
|
||||
import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
|
||||
|
||||
/**
|
||||
* Hard-deletes the User.kycData JSON blob 90 days after `User.deletedAt`.
|
||||
* Per CLO guidance, KYC PII must not survive a deleted account beyond the
|
||||
* Decree-13 minimum. The User row itself stays (it carries audit-relevant
|
||||
* metadata via foreign keys), only the kyc payload is nulled.
|
||||
*
|
||||
* Future work (tracked separately): when a dedicated KycDocument table
|
||||
* lands, this service must also call StorageService.deleteObject(key) for
|
||||
* each blob in S3.
|
||||
*
|
||||
* GOO-196 — Decree 13 compliance.
|
||||
*/
|
||||
@Injectable()
|
||||
export class KycPurgeService {
|
||||
static readonly JOB = 'kyc-blob-purge';
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly logger: LoggerService,
|
||||
private readonly runLog: RetentionRunLogRepository,
|
||||
) {}
|
||||
|
||||
async run(): Promise<{ rowsAffected: number; runId: string }> {
|
||||
const cutoff = new Date(Date.now() - RETENTION_CONFIG.kycPurgeMs);
|
||||
const runId = await this.runLog.start({
|
||||
job: KycPurgeService.JOB,
|
||||
batchSize: RETENTION_CONFIG.batchSize,
|
||||
dryRun: RETENTION_CONFIG.dryRun,
|
||||
});
|
||||
|
||||
let total = 0;
|
||||
try {
|
||||
for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
|
||||
if (RETENTION_CONFIG.dryRun) {
|
||||
total = await this.prisma.user.count({
|
||||
where: {
|
||||
deletedAt: { lt: cutoff },
|
||||
NOT: { kycData: { equals: null as unknown as undefined } },
|
||||
},
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
|
||||
UPDATE "User"
|
||||
SET "kycData" = NULL
|
||||
WHERE id IN (
|
||||
SELECT id FROM "User"
|
||||
WHERE "deletedAt" IS NOT NULL
|
||||
AND "deletedAt" < ${cutoff}
|
||||
AND "kycData" IS NOT NULL
|
||||
ORDER BY "deletedAt" ASC
|
||||
LIMIT ${RETENTION_CONFIG.batchSize}
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id
|
||||
`;
|
||||
|
||||
total += rows.length;
|
||||
if (rows.length < RETENTION_CONFIG.batchSize) break;
|
||||
}
|
||||
|
||||
await this.runLog.markFinished(runId, total);
|
||||
this.logger.log(
|
||||
`KycPurgeService nulled kycData on ${total} user row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})`,
|
||||
'KycPurgeService',
|
||||
);
|
||||
return { rowsAffected: total, runId };
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
await this.runLog.markFailed(runId, error, total);
|
||||
this.logger.error(
|
||||
`KycPurgeService failed: ${error.message}`,
|
||||
error.stack,
|
||||
'KycPurgeService',
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { PrismaService, LoggerService } from '@modules/shared';
|
||||
import { RETENTION_CONFIG } from '../../domain/retention.config';
|
||||
import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
|
||||
|
||||
/**
|
||||
* Hard-deletes Message.content for any conversation in CLOSED status whose
|
||||
* last activity is older than 90 days. Metadata (sender, timestamps) is
|
||||
* preserved per CLO guidance — only the message body itself is PII.
|
||||
*
|
||||
* GOO-196 — Decree 13 compliance.
|
||||
*/
|
||||
@Injectable()
|
||||
export class MessagingPurgeService {
|
||||
static readonly JOB = 'messaging-body-purge';
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly logger: LoggerService,
|
||||
private readonly runLog: RetentionRunLogRepository,
|
||||
) {}
|
||||
|
||||
async run(): Promise<{ rowsAffected: number; runId: string }> {
|
||||
const cutoff = new Date(Date.now() - RETENTION_CONFIG.messagingBodyMs);
|
||||
const runId = await this.runLog.start({
|
||||
job: MessagingPurgeService.JOB,
|
||||
batchSize: RETENTION_CONFIG.batchSize,
|
||||
dryRun: RETENTION_CONFIG.dryRun,
|
||||
});
|
||||
|
||||
let total = 0;
|
||||
try {
|
||||
for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
|
||||
if (RETENTION_CONFIG.dryRun) {
|
||||
total = await this.prisma.message.count({
|
||||
where: {
|
||||
deletedAt: null,
|
||||
conversation: {
|
||||
status: 'CLOSED',
|
||||
lastMessageAt: { lt: cutoff },
|
||||
},
|
||||
},
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
|
||||
UPDATE "Message"
|
||||
SET "content" = '',
|
||||
"deletedAt" = NOW()
|
||||
WHERE id IN (
|
||||
SELECT m.id FROM "Message" m
|
||||
JOIN "Conversation" c ON c.id = m."conversationId"
|
||||
WHERE m."deletedAt" IS NULL
|
||||
AND c."status" = 'CLOSED'
|
||||
AND c."lastMessageAt" < ${cutoff}
|
||||
ORDER BY m."createdAt" ASC
|
||||
LIMIT ${RETENTION_CONFIG.batchSize}
|
||||
FOR UPDATE OF m SKIP LOCKED
|
||||
)
|
||||
RETURNING id
|
||||
`;
|
||||
|
||||
total += rows.length;
|
||||
if (rows.length < RETENTION_CONFIG.batchSize) break;
|
||||
}
|
||||
|
||||
await this.runLog.markFinished(runId, total);
|
||||
this.logger.log(
|
||||
`MessagingPurgeService cleared ${total} message body row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})`,
|
||||
'MessagingPurgeService',
|
||||
);
|
||||
return { rowsAffected: total, runId };
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
await this.runLog.markFailed(runId, error, total);
|
||||
this.logger.error(
|
||||
`MessagingPurgeService failed: ${error.message}`,
|
||||
error.stack,
|
||||
'MessagingPurgeService',
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,79 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { LoggerService } from '@modules/shared';
|
||||
import { RETENTION_CONFIG } from '../../domain/retention.config';
|
||||
import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
|
||||
|
||||
/**
|
||||
* Payment callback log purge — three-phase schedule confirmed by CLO in
|
||||
* GOO-201 (MoF Circular 78/2021/TT-BTC, Accounting Law 88/2015 Art. 41,
|
||||
* Tax Admin Law 38/2019 Art. 86):
|
||||
*
|
||||
* Phase 1 @ 2y — scrub operational PII (IP, device fingerprint).
|
||||
* Phase 2 @ 5y — scrub buyer identity (name/phone/email, bank suffix);
|
||||
* preserves `buyerName` for invoice-linked rows.
|
||||
* Phase 3 @ 10y — hard delete (or cold-archive if
|
||||
* RETENTION_PAYMENT_ARCHIVE=true).
|
||||
*
|
||||
* This service is intentionally a **stub** in the initial GOO-196 landing:
|
||||
* the `PaymentCallbackLog` table does not exist in the Prisma schema yet
|
||||
* (tracked under the payments module refactor). Calling `run()` emits a
|
||||
* RetentionRunLog row with status=SUCCESS and rowsAffected=0 so dry-run
|
||||
* telemetry is visible from day one, while the actual UPDATE/DELETE
|
||||
* statements are added when the table lands.
|
||||
*
|
||||
* GOO-196 — Decree 13 compliance.
|
||||
*/
|
||||
@Injectable()
|
||||
export class PaymentCallbackPurgeService {
|
||||
static readonly JOB = 'payment-callback-purge';
|
||||
|
||||
constructor(
|
||||
private readonly logger: LoggerService,
|
||||
private readonly runLog: RetentionRunLogRepository,
|
||||
) {}
|
||||
|
||||
async run(phase: 1 | 2 | 3): Promise<{ rowsAffected: number; runId: string }> {
|
||||
const cutoff = this.cutoffFor(phase);
|
||||
const runId = await this.runLog.start({
|
||||
job: PaymentCallbackPurgeService.JOB,
|
||||
phase,
|
||||
batchSize: RETENTION_CONFIG.batchSize,
|
||||
dryRun: RETENTION_CONFIG.dryRun,
|
||||
});
|
||||
|
||||
try {
|
||||
// TODO(GOO-196 follow-up): implement once PaymentCallbackLog schema lands.
|
||||
// Phase 1: UPDATE … SET ipAddress=NULL, deviceFingerprint=NULL,
|
||||
// anonymizedPhase1At=NOW() WHERE createdAt < ${cutoff}
|
||||
// AND anonymizedPhase1At IS NULL
|
||||
// Phase 2: UPDATE … SET callbackPayload = jsonb_set_lax(..., 'null'),
|
||||
// bankAccountMasked=NULL, cardSuffix=NULL,
|
||||
// anonymizedPhase2At=NOW() (skip buyerName on invoice rows)
|
||||
// Phase 3: DELETE FROM "PaymentCallbackLog" WHERE createdAt < ${cutoff}
|
||||
// — OR — INSERT INTO payment_callback_archive … then DELETE.
|
||||
const rowsAffected = 0;
|
||||
await this.runLog.markFinished(runId, rowsAffected);
|
||||
this.logger.warn(
|
||||
`PaymentCallbackPurgeService phase=${phase} is a no-op — PaymentCallbackLog table not yet in schema (cutoff=${cutoff.toISOString()})`,
|
||||
'PaymentCallbackPurgeService',
|
||||
);
|
||||
return { rowsAffected, runId };
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
await this.runLog.markFailed(runId, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private cutoffFor(phase: 1 | 2 | 3): Date {
|
||||
const now = Date.now();
|
||||
switch (phase) {
|
||||
case 1:
|
||||
return new Date(now - RETENTION_CONFIG.paymentPhase1Ms);
|
||||
case 2:
|
||||
return new Date(now - RETENTION_CONFIG.paymentPhase2Ms);
|
||||
case 3:
|
||||
return new Date(now - RETENTION_CONFIG.paymentPhase3Ms);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { PrismaService, LoggerService } from '@modules/shared';
|
||||
import { RETENTION_CONFIG } from '../../domain/retention.config';
|
||||
import { RetentionRunLogRepository } from '../../infrastructure/repositories/retention-run-log.repository';
|
||||
|
||||
/**
|
||||
* Hard-deletes refresh tokens that are revoked or expired and were created
|
||||
* more than `RETENTION_REFRESH_TOKEN_DAYS` (default 30) days ago.
|
||||
*
|
||||
* Idempotency: a single DELETE … RETURNING id with FOR UPDATE SKIP LOCKED is
|
||||
* race-safe — only one writer wins each row. Loops in capped batches to avoid
|
||||
* statement timeouts on large tables.
|
||||
*
|
||||
* GOO-196 — Decree 13 compliance.
|
||||
*/
|
||||
@Injectable()
|
||||
export class RefreshTokenPurgeService {
|
||||
static readonly JOB = 'refresh-token-purge';
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly logger: LoggerService,
|
||||
private readonly runLog: RetentionRunLogRepository,
|
||||
) {}
|
||||
|
||||
async run(): Promise<{ rowsAffected: number; runId: string }> {
|
||||
const cutoff = new Date(Date.now() - RETENTION_CONFIG.refreshTokenStaleMs);
|
||||
const runId = await this.runLog.start({
|
||||
job: RefreshTokenPurgeService.JOB,
|
||||
batchSize: RETENTION_CONFIG.batchSize,
|
||||
dryRun: RETENTION_CONFIG.dryRun,
|
||||
});
|
||||
|
||||
let total = 0;
|
||||
try {
|
||||
for (let batch = 0; batch < RETENTION_CONFIG.maxBatches; batch += 1) {
|
||||
if (RETENTION_CONFIG.dryRun) {
|
||||
total = await this.prisma.refreshToken.count({
|
||||
where: {
|
||||
createdAt: { lt: cutoff },
|
||||
OR: [{ revokedAt: { not: null } }, { expiresAt: { lt: new Date() } }],
|
||||
},
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
const rows = await this.prisma.$queryRaw<Array<{ id: string }>>`
|
||||
DELETE FROM "RefreshToken"
|
||||
WHERE id IN (
|
||||
SELECT id FROM "RefreshToken"
|
||||
WHERE "createdAt" < ${cutoff}
|
||||
AND ("revokedAt" IS NOT NULL OR "expiresAt" < NOW())
|
||||
ORDER BY "createdAt" ASC
|
||||
LIMIT ${RETENTION_CONFIG.batchSize}
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id
|
||||
`;
|
||||
|
||||
total += rows.length;
|
||||
if (rows.length < RETENTION_CONFIG.batchSize) break;
|
||||
}
|
||||
|
||||
await this.runLog.markFinished(runId, total);
|
||||
this.logger.log(
|
||||
`RefreshTokenPurgeService removed ${total} row(s) (cutoff=${cutoff.toISOString()}, dryRun=${RETENTION_CONFIG.dryRun})`,
|
||||
'RefreshTokenPurgeService',
|
||||
);
|
||||
return { rowsAffected: total, runId };
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
await this.runLog.markFailed(runId, error, total);
|
||||
this.logger.error(
|
||||
`RefreshTokenPurgeService failed: ${error.message}`,
|
||||
error.stack,
|
||||
'RefreshTokenPurgeService',
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
56
apps/api/src/modules/retention/domain/retention.config.ts
Normal file
56
apps/api/src/modules/retention/domain/retention.config.ts
Normal file
@@ -0,0 +1,56 @@
|
||||
/**
|
||||
* Centralised retention configuration. Every window comes from env vars so a
|
||||
* staging environment can dry-run aggressively without hard-coding constants.
|
||||
*
|
||||
* GOO-196 — Decree 13 compliance.
|
||||
*/
|
||||
|
||||
const days = (n: number): number => n * 24 * 60 * 60 * 1000;
|
||||
const years = (n: number): number => Math.floor(n * 365.25 * 24 * 60 * 60 * 1000);
|
||||
|
||||
const numEnv = (name: string, fallback: number): number => {
|
||||
const raw = process.env[name];
|
||||
if (!raw) return fallback;
|
||||
const parsed = Number(raw);
|
||||
return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
|
||||
};
|
||||
|
||||
const boolEnv = (name: string, fallback = false): boolean => {
|
||||
const raw = process.env[name];
|
||||
if (raw === undefined) return fallback;
|
||||
return raw === 'true' || raw === '1';
|
||||
};
|
||||
|
||||
export const RETENTION_CONFIG = {
|
||||
/** Master kill switch — when false, every cron is a no-op. */
|
||||
enabled: boolEnv('RETENTION_ENABLED', false),
|
||||
/** Forces every job into SELECT-only mode for staging review. */
|
||||
dryRun: boolEnv('RETENTION_DRY_RUN', false),
|
||||
|
||||
/** Per-batch row cap. Keeps statement timeouts and replication lag bounded. */
|
||||
batchSize: numEnv('RETENTION_BATCH_SIZE', 1000),
|
||||
/** Hard cap on batch loops per run. */
|
||||
maxBatches: numEnv('RETENTION_MAX_BATCHES', 50),
|
||||
|
||||
/** RefreshToken: revoked OR expired older than this is hard-deleted. */
|
||||
refreshTokenStaleMs: days(numEnv('RETENTION_REFRESH_TOKEN_DAYS', 30)),
|
||||
|
||||
/** Messaging body purge window after Conversation.lastMessageAt. */
|
||||
messagingBodyMs: days(numEnv('RETENTION_MESSAGING_DAYS', 90)),
|
||||
|
||||
/** KYC blob purge window after User.deletedAt. */
|
||||
kycPurgeMs: days(numEnv('RETENTION_KYC_DAYS', 90)),
|
||||
|
||||
/** Audit log anonymization window. */
|
||||
auditAnonymizeMs: years(numEnv('RETENTION_AUDIT_YEARS', 5)),
|
||||
|
||||
/** Payment callback phased schedule (CLO confirmed via GOO-201). */
|
||||
paymentPhase1Ms: years(numEnv('RETENTION_PAYMENT_PHASE1_YEARS', 2)),
|
||||
paymentPhase2Ms: years(numEnv('RETENTION_PAYMENT_PHASE2_YEARS', 5)),
|
||||
paymentPhase3Ms: years(numEnv('RETENTION_PAYMENT_PHASE3_YEARS', 10)),
|
||||
|
||||
/** When true, phase-3 archives to a cold table instead of hard-deleting. */
|
||||
paymentArchive: boolEnv('RETENTION_PAYMENT_ARCHIVE', false),
|
||||
} as const;
|
||||
|
||||
export type RetentionConfig = typeof RETENTION_CONFIG;
|
||||
8
apps/api/src/modules/retention/index.ts
Normal file
8
apps/api/src/modules/retention/index.ts
Normal file
@@ -0,0 +1,8 @@
|
||||
export { RetentionModule } from './retention.module';
|
||||
export { RefreshTokenPurgeService } from './application/services/refresh-token-purge.service';
|
||||
export { MessagingPurgeService } from './application/services/messaging-purge.service';
|
||||
export { KycPurgeService } from './application/services/kyc-purge.service';
|
||||
export { AuditLogPurgeService } from './application/services/audit-log-purge.service';
|
||||
export { PaymentCallbackPurgeService } from './application/services/payment-callback-purge.service';
|
||||
export { RetentionRunLogRepository } from './infrastructure/repositories/retention-run-log.repository';
|
||||
export { RETENTION_CONFIG, type RetentionConfig } from './domain/retention.config';
|
||||
@@ -0,0 +1,91 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Cron } from '@nestjs/schedule';
|
||||
import { LoggerService } from '@modules/shared';
|
||||
import { AuditLogPurgeService } from '../../application/services/audit-log-purge.service';
|
||||
import { KycPurgeService } from '../../application/services/kyc-purge.service';
|
||||
import { MessagingPurgeService } from '../../application/services/messaging-purge.service';
|
||||
import { PaymentCallbackPurgeService } from '../../application/services/payment-callback-purge.service';
|
||||
import { RefreshTokenPurgeService } from '../../application/services/refresh-token-purge.service';
|
||||
import { RETENTION_CONFIG } from '../../domain/retention.config';
|
||||
|
||||
/**
|
||||
* Thin scheduler that delegates to each domain purge service. All windows
|
||||
* run during Vietnam off-peak hours (UTC times below correspond to ~23:00–
|
||||
* 01:00 ICT). Set RETENTION_ENABLED=true to activate; otherwise every job
|
||||
* is a no-op so the module can ship behind a flag.
|
||||
*
|
||||
* GOO-196 — Decree 13 compliance.
|
||||
*/
|
||||
@Injectable()
|
||||
export class RetentionCronOrchestrator {
|
||||
constructor(
|
||||
private readonly logger: LoggerService,
|
||||
private readonly refreshTokens: RefreshTokenPurgeService,
|
||||
private readonly messaging: MessagingPurgeService,
|
||||
private readonly kyc: KycPurgeService,
|
||||
private readonly auditLogs: AuditLogPurgeService,
|
||||
private readonly paymentCallbacks: PaymentCallbackPurgeService,
|
||||
) {}
|
||||
|
||||
@Cron('0 16 * * *', { name: 'retention-refresh-tokens', timeZone: 'UTC' })
|
||||
async runRefreshTokens(): Promise<void> {
|
||||
if (!RETENTION_CONFIG.enabled) return this.skip('refresh-token-purge');
|
||||
await this.safe(() => this.refreshTokens.run(), 'refresh-token-purge');
|
||||
}
|
||||
|
||||
@Cron('30 16 * * *', { name: 'retention-messaging', timeZone: 'UTC' })
|
||||
async runMessaging(): Promise<void> {
|
||||
if (!RETENTION_CONFIG.enabled) return this.skip('messaging-purge');
|
||||
await this.safe(() => this.messaging.run(), 'messaging-purge');
|
||||
}
|
||||
|
||||
@Cron('0 17 * * *', { name: 'retention-kyc', timeZone: 'UTC' })
|
||||
async runKyc(): Promise<void> {
|
||||
if (!RETENTION_CONFIG.enabled) return this.skip('kyc-purge');
|
||||
await this.safe(() => this.kyc.run(), 'kyc-purge');
|
||||
}
|
||||
|
||||
@Cron('30 17 * * *', { name: 'retention-payment-phase1', timeZone: 'UTC' })
|
||||
async runPaymentPhase1(): Promise<void> {
|
||||
if (!RETENTION_CONFIG.enabled) return this.skip('payment-phase-1');
|
||||
await this.safe(() => this.paymentCallbacks.run(1), 'payment-phase-1');
|
||||
}
|
||||
|
||||
@Cron('0 18 * * *', { name: 'retention-payment-phase2', timeZone: 'UTC' })
|
||||
async runPaymentPhase2(): Promise<void> {
|
||||
if (!RETENTION_CONFIG.enabled) return this.skip('payment-phase-2');
|
||||
await this.safe(() => this.paymentCallbacks.run(2), 'payment-phase-2');
|
||||
}
|
||||
|
||||
@Cron('0 17 * * 0', { name: 'retention-audit-anonymize', timeZone: 'UTC' })
|
||||
async runAuditLogs(): Promise<void> {
|
||||
if (!RETENTION_CONFIG.enabled) return this.skip('audit-log-anonymize');
|
||||
await this.safe(() => this.auditLogs.run(), 'audit-log-anonymize');
|
||||
}
|
||||
|
||||
@Cron('0 18 * * 0', { name: 'retention-payment-phase3', timeZone: 'UTC' })
|
||||
async runPaymentPhase3(): Promise<void> {
|
||||
if (!RETENTION_CONFIG.enabled) return this.skip('payment-phase-3');
|
||||
await this.safe(() => this.paymentCallbacks.run(3), 'payment-phase-3');
|
||||
}
|
||||
|
||||
private skip(name: string): void {
|
||||
this.logger.debug(
|
||||
`Retention job ${name} skipped: RETENTION_ENABLED=false`,
|
||||
'RetentionCronOrchestrator',
|
||||
);
|
||||
}
|
||||
|
||||
private async safe(fn: () => Promise<unknown>, name: string): Promise<void> {
|
||||
try {
|
||||
await fn();
|
||||
} catch (err) {
|
||||
this.logger.error(
|
||||
`Retention job ${name} threw: ${(err as Error).message}`,
|
||||
(err as Error).stack,
|
||||
'RetentionCronOrchestrator',
|
||||
);
|
||||
// Swallow — RetentionRunLog already records FAILED. Do not crash the scheduler.
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { PrismaService } from '@modules/shared';
|
||||
|
||||
/**
|
||||
* Thin repository around RetentionRunLog. Centralised so every purge service
|
||||
* uses the exact same shape — start row, mark finished/failed, never invent
|
||||
* variations.
|
||||
*
|
||||
* GOO-196 — Decree 13 compliance.
|
||||
*/
|
||||
@Injectable()
|
||||
export class RetentionRunLogRepository {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async start(input: {
|
||||
job: string;
|
||||
phase?: number | null;
|
||||
batchSize?: number | null;
|
||||
dryRun?: boolean;
|
||||
}): Promise<string> {
|
||||
const row = await this.prisma.retentionRunLog.create({
|
||||
data: {
|
||||
job: input.job,
|
||||
phase: input.phase ?? null,
|
||||
batchSize: input.batchSize ?? null,
|
||||
dryRun: input.dryRun ?? false,
|
||||
status: 'RUNNING',
|
||||
},
|
||||
select: { id: true },
|
||||
});
|
||||
return row.id;
|
||||
}
|
||||
|
||||
async markFinished(id: string, rowsAffected: number, partial = false): Promise<void> {
|
||||
await this.prisma.retentionRunLog.update({
|
||||
where: { id },
|
||||
data: {
|
||||
finishedAt: new Date(),
|
||||
rowsAffected,
|
||||
status: partial ? 'PARTIAL' : 'SUCCESS',
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async markFailed(id: string, error: Error, rowsAffected = 0): Promise<void> {
|
||||
await this.prisma.retentionRunLog.update({
|
||||
where: { id },
|
||||
data: {
|
||||
finishedAt: new Date(),
|
||||
rowsAffected,
|
||||
status: 'FAILED',
|
||||
errorMessage: error.message.slice(0, 2000),
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
36
apps/api/src/modules/retention/retention.module.ts
Normal file
36
apps/api/src/modules/retention/retention.module.ts
Normal file
@@ -0,0 +1,36 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { AuditLogPurgeService } from './application/services/audit-log-purge.service';
|
||||
import { KycPurgeService } from './application/services/kyc-purge.service';
|
||||
import { MessagingPurgeService } from './application/services/messaging-purge.service';
|
||||
import { PaymentCallbackPurgeService } from './application/services/payment-callback-purge.service';
|
||||
import { RefreshTokenPurgeService } from './application/services/refresh-token-purge.service';
|
||||
import { RetentionCronOrchestrator } from './infrastructure/cron/retention-cron.orchestrator';
|
||||
import { RetentionRunLogRepository } from './infrastructure/repositories/retention-run-log.repository';
|
||||
|
||||
/**
|
||||
* GOO-196 — Decree 13 data retention & purge jobs.
|
||||
*
|
||||
* Ships behind RETENTION_ENABLED=false so the module can land without
|
||||
* affecting prod. Flip to true after a 7-day staging dry-run review with
|
||||
* CLO/DPO. See `apps/api/src/modules/retention/domain/retention.config.ts`
|
||||
* for every tunable window.
|
||||
*/
|
||||
@Module({
|
||||
providers: [
|
||||
RetentionRunLogRepository,
|
||||
RefreshTokenPurgeService,
|
||||
MessagingPurgeService,
|
||||
KycPurgeService,
|
||||
AuditLogPurgeService,
|
||||
PaymentCallbackPurgeService,
|
||||
RetentionCronOrchestrator,
|
||||
],
|
||||
exports: [
|
||||
RefreshTokenPurgeService,
|
||||
MessagingPurgeService,
|
||||
KycPurgeService,
|
||||
AuditLogPurgeService,
|
||||
PaymentCallbackPurgeService,
|
||||
],
|
||||
})
|
||||
export class RetentionModule {}
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
useDeleteSavedSearch,
|
||||
useUpdateSavedSearch,
|
||||
} from '@/lib/hooks/use-saved-searches';
|
||||
import { formatCompact } from '@/lib/currency';
|
||||
import { type SavedSearch, type SavedSearchFilters } from '@/lib/saved-search-api';
|
||||
|
||||
const PROPERTY_TYPE_LABELS: Record<string, string> = {
|
||||
@@ -25,12 +26,6 @@ const TRANSACTION_TYPE_LABELS: Record<string, string> = {
|
||||
RENT: 'Cho thuê',
|
||||
};
|
||||
|
||||
function formatPrice(value: string): string {
|
||||
const num = Number(value);
|
||||
if (num >= 1_000_000_000) return `${(num / 1_000_000_000).toFixed(1)} tỷ`;
|
||||
if (num >= 1_000_000) return `${(num / 1_000_000).toFixed(0)} triệu`;
|
||||
return num.toLocaleString('vi-VN');
|
||||
}
|
||||
|
||||
function formatFilters(filters: SavedSearchFilters): string[] {
|
||||
const parts: string[] = [];
|
||||
@@ -44,11 +39,11 @@ function formatFilters(filters: SavedSearchFilters): string[] {
|
||||
if (filters.district) parts.push(filters.district);
|
||||
if (filters.city) parts.push(filters.city);
|
||||
if (filters.priceMin && filters.priceMax) {
|
||||
parts.push(`${formatPrice(filters.priceMin)} - ${formatPrice(filters.priceMax)}`);
|
||||
parts.push(`${formatCompact(filters.priceMin)} - ${formatCompact(filters.priceMax)}`);
|
||||
} else if (filters.priceMin) {
|
||||
parts.push(`Từ ${formatPrice(filters.priceMin)}`);
|
||||
parts.push(`Từ ${formatCompact(filters.priceMin)}`);
|
||||
} else if (filters.priceMax) {
|
||||
parts.push(`Đến ${formatPrice(filters.priceMax)}`);
|
||||
parts.push(`Đến ${formatCompact(filters.priceMax)}`);
|
||||
}
|
||||
if (filters.areaMin || filters.areaMax) {
|
||||
if (filters.areaMin && filters.areaMax) {
|
||||
|
||||
@@ -17,23 +17,13 @@ import {
|
||||
usePriceMovers,
|
||||
useTrendingAreas,
|
||||
} from '@/lib/hooks/use-analytics';
|
||||
import { formatCompact } from '@/lib/currency';
|
||||
import { listingsApi, type ListingDetail } from '@/lib/listings-api';
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Helpers */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
const vndFmt = new Intl.NumberFormat('vi-VN', {
|
||||
style: 'currency',
|
||||
currency: 'VND',
|
||||
maximumFractionDigits: 0,
|
||||
});
|
||||
|
||||
function formatVnd(value: number): string {
|
||||
if (value >= 1_000_000_000) return `${(value / 1_000_000_000).toFixed(1)} tỷ`;
|
||||
if (value >= 1_000_000) return `${(value / 1_000_000).toFixed(0)} tr`;
|
||||
return vndFmt.format(value);
|
||||
}
|
||||
|
||||
function formatPriceM2(value: number): string {
|
||||
if (value >= 1_000_000) return `${(value / 1_000_000).toFixed(1)} tr/m²`;
|
||||
@@ -146,7 +136,7 @@ function KpiStrip({ city }: { city: string }) {
|
||||
/>
|
||||
<KpiCard
|
||||
label="Giá TB"
|
||||
value={data ? formatVnd(data.avgPrice) : '—'}
|
||||
value={data ? formatCompact(data.avgPrice) : '—'}
|
||||
delta={data?.priceChangePct?.d30}
|
||||
footnote="Toàn thành phố"
|
||||
icon={<Building2 className="h-3.5 w-3.5" />}
|
||||
@@ -154,7 +144,7 @@ function KpiStrip({ city }: { city: string }) {
|
||||
/>
|
||||
<KpiCard
|
||||
label="Giá trung vị"
|
||||
value={data ? formatVnd(data.medianPrice) : '—'}
|
||||
value={data ? formatCompact(data.medianPrice) : '—'}
|
||||
footnote="Median price"
|
||||
icon={<Layers className="h-3.5 w-3.5" />}
|
||||
loading={isLoading}
|
||||
@@ -352,7 +342,7 @@ function RecentListings() {
|
||||
const price = Number(r.priceVND);
|
||||
return (
|
||||
<span className="font-mono text-sm font-semibold tabular-nums text-foreground">
|
||||
{formatVnd(price)}
|
||||
{formatCompact(price)}
|
||||
</span>
|
||||
);
|
||||
},
|
||||
|
||||
@@ -39,6 +39,7 @@ import { Button } from '@/components/ui/button';
|
||||
import { Card, CardContent, CardHeader, CardTitle, CardDescription } from '@/components/ui/card';
|
||||
import { Link } from '@/i18n/navigation';
|
||||
import type { AgentPublicProfile, AgentReviewItem } from '@/lib/agents-api';
|
||||
import { formatCompact } from '@/lib/currency';
|
||||
import { shimmerBlurDataURL } from '@/lib/image-blur';
|
||||
import type { ListingDetail } from '@/lib/listings-api';
|
||||
|
||||
@@ -48,12 +49,6 @@ import type { ListingDetail } from '@/lib/listings-api';
|
||||
|
||||
const VND = new Intl.NumberFormat('vi-VN');
|
||||
|
||||
function fmtVND(value: string | number | bigint): string {
|
||||
const n = typeof value === 'bigint' ? Number(value) : Number(value);
|
||||
if (n >= 1_000_000_000) return `${(n / 1_000_000_000).toFixed(1)} tỷ`;
|
||||
if (n >= 1_000_000) return `${(n / 1_000_000).toFixed(0)} tr`;
|
||||
return VND.format(n);
|
||||
}
|
||||
|
||||
function qualityLabel(score: number): string {
|
||||
if (score >= 80) return 'Xuất sắc';
|
||||
@@ -145,7 +140,7 @@ const listingColumns: DataTableColumn<ListingDetail>[] = [
|
||||
sortValue: (row) => Number(row.priceVND),
|
||||
cell: (row) => (
|
||||
<span className="text-xs font-semibold tabular-nums text-primary">
|
||||
{fmtVND(row.priceVND)}
|
||||
{formatCompact(row.priceVND)}
|
||||
</span>
|
||||
),
|
||||
width: '12%',
|
||||
@@ -160,7 +155,7 @@ const listingColumns: DataTableColumn<ListingDetail>[] = [
|
||||
cell: (row) =>
|
||||
row.pricePerM2 != null ? (
|
||||
<span className="text-xs tabular-nums text-foreground-muted">
|
||||
{fmtVND(row.pricePerM2)}
|
||||
{formatCompact(row.pricePerM2)}
|
||||
</span>
|
||||
) : (
|
||||
<span className="text-xs text-foreground-dim">—</span>
|
||||
@@ -384,7 +379,7 @@ export function AgentProfileClient({
|
||||
/>
|
||||
<KpiCard
|
||||
label="Giá TB"
|
||||
value={avgPriceVND != null ? fmtVND(avgPriceVND) : '—'}
|
||||
value={avgPriceVND != null ? formatCompact(avgPriceVND) : '—'}
|
||||
icon={<BarChart2 className="h-4 w-4" />}
|
||||
footnote="Danh mục hiện tại"
|
||||
/>
|
||||
|
||||
@@ -11,21 +11,13 @@ import {
|
||||
} from 'recharts';
|
||||
|
||||
import type { PriceHistoryItem } from '@/lib/listings-api';
|
||||
import { formatCompact } from '@/lib/currency';
|
||||
|
||||
interface PriceHistoryChartProps {
|
||||
data: PriceHistoryItem[];
|
||||
height?: number;
|
||||
}
|
||||
|
||||
function priceToMillions(priceStr: string): number {
|
||||
return Math.round(Number(priceStr) / 1_000_000);
|
||||
}
|
||||
|
||||
function formatMillions(value: number): string {
|
||||
if (value >= 1000) return `${(value / 1000).toFixed(1)} tỷ`;
|
||||
return `${value} tr`;
|
||||
}
|
||||
|
||||
export function PriceHistoryChart({ data, height = 280 }: PriceHistoryChartProps) {
|
||||
if (data.length === 0) return null;
|
||||
|
||||
@@ -37,7 +29,7 @@ export function PriceHistoryChart({ data, height = 280 }: PriceHistoryChartProps
|
||||
month: '2-digit',
|
||||
year: 'numeric',
|
||||
}),
|
||||
price: priceToMillions(item.newPrice),
|
||||
price: Number(item.newPrice),
|
||||
}));
|
||||
|
||||
return (
|
||||
@@ -48,7 +40,7 @@ export function PriceHistoryChart({ data, height = 280 }: PriceHistoryChartProps
|
||||
<YAxis
|
||||
tick={{ fontSize: 11 }}
|
||||
className="fill-muted-foreground"
|
||||
tickFormatter={(v: number) => formatMillions(v)}
|
||||
tickFormatter={(v: number) => formatCompact(v)}
|
||||
/>
|
||||
<Tooltip
|
||||
contentStyle={{
|
||||
@@ -57,7 +49,7 @@ export function PriceHistoryChart({ data, height = 280 }: PriceHistoryChartProps
|
||||
borderRadius: '0.5rem',
|
||||
fontSize: '0.875rem',
|
||||
}}
|
||||
formatter={(value) => [formatMillions(Number(value)), 'Giá']}
|
||||
formatter={(value) => [formatCompact(Number(value)), 'Giá']}
|
||||
/>
|
||||
<Line
|
||||
type="monotone"
|
||||
|
||||
@@ -5,16 +5,10 @@ import mapboxgl from 'mapbox-gl';
|
||||
import * as React from 'react';
|
||||
import 'mapbox-gl/dist/mapbox-gl.css';
|
||||
import { ComponentErrorBoundary } from '@/components/error-boundary';
|
||||
import { formatCompact } from '@/lib/currency';
|
||||
import type { ListingDetail } from '@/lib/listings-api';
|
||||
import { useMapboxStyle } from '@/lib/mapbox-style';
|
||||
|
||||
function formatPrice(priceVND: string): string {
|
||||
const num = Number(priceVND);
|
||||
if (num >= 1_000_000_000) return `${(num / 1_000_000_000).toFixed(1)} tỷ`;
|
||||
if (num >= 1_000_000) return `${(num / 1_000_000).toFixed(0)} tr`;
|
||||
return num.toLocaleString('vi-VN');
|
||||
}
|
||||
|
||||
interface ListingMapProps {
|
||||
listings: ListingDetail[];
|
||||
onMarkerClick?: (listing: ListingDetail) => void;
|
||||
@@ -67,7 +61,7 @@ function buildGeoJSON(
|
||||
geometry: { type: 'Point', coordinates: [lng, lat] },
|
||||
properties: {
|
||||
id: listing.id,
|
||||
price: formatPrice(listing.priceVND),
|
||||
price: formatCompact(listing.priceVND),
|
||||
title: listing.property.title,
|
||||
district: listing.property.district ?? '',
|
||||
city: listing.property.city ?? '',
|
||||
@@ -100,7 +94,7 @@ function buildPopupContent(listing: ListingDetail): HTMLDivElement {
|
||||
const price = document.createElement('p');
|
||||
price.style.cssText =
|
||||
'font-weight:700;color:hsl(var(--primary));font-size:14px;margin:0 0 4px;';
|
||||
price.textContent = `${formatPrice(listing.priceVND)} VND`;
|
||||
price.textContent = `${formatCompact(listing.priceVND)} VND`;
|
||||
container.appendChild(price);
|
||||
|
||||
const title = document.createElement('p');
|
||||
|
||||
@@ -97,6 +97,18 @@ export function formatPricePerM2(price: string | number): string {
|
||||
return `${num.toLocaleString('vi-VN')} \u0111/m\u00b2`;
|
||||
}
|
||||
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Alias: formatCompact
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Alias for {@link formatPrice}.
|
||||
* Use this name when the call-site intent is compact/abbreviated display
|
||||
* rather than a full price string (e.g. chart tick labels, map markers).
|
||||
*/
|
||||
export const formatCompact = formatPrice;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Parser (reverse direction)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
-- GOO-196: Data retention policy & purge jobs (Decree 13 compliance)
|
||||
-- Adds the RetentionRunLog table so every purge / anonymization pass is auditable.
|
||||
|
||||
-- CreateEnum
|
||||
CREATE TYPE "RetentionRunStatus" AS ENUM ('RUNNING', 'SUCCESS', 'PARTIAL', 'FAILED');
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "RetentionRunLog" (
|
||||
"id" TEXT NOT NULL,
|
||||
"job" TEXT NOT NULL,
|
||||
"phase" INTEGER,
|
||||
"startedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"finishedAt" TIMESTAMP(3),
|
||||
"rowsAffected" INTEGER NOT NULL DEFAULT 0,
|
||||
"status" "RetentionRunStatus" NOT NULL DEFAULT 'RUNNING',
|
||||
"errorMessage" TEXT,
|
||||
"batchSize" INTEGER,
|
||||
"dryRun" BOOLEAN NOT NULL DEFAULT false,
|
||||
|
||||
CONSTRAINT "RetentionRunLog_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "RetentionRunLog_job_startedAt_idx" ON "RetentionRunLog"("job", "startedAt");
|
||||
CREATE INDEX "RetentionRunLog_startedAt_idx" ON "RetentionRunLog"("startedAt" DESC);
|
||||
@@ -1567,3 +1567,68 @@ model VnAdministrativeAlias {
|
||||
@@index([newWardCode])
|
||||
@@map("vn_administrative_aliases")
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// READ MODELS — RFC-003 Phase 0
|
||||
// =============================================================================
|
||||
|
||||
/// Idempotency offset table for CQRS projectors.
|
||||
///
|
||||
/// Per RFC-003 §0 (CTO ask): every projector wraps its `apply` call in a
|
||||
/// transaction that inserts `(eventId, handlerName)` here. Re-deliveries
|
||||
/// hit the composite primary key, the transaction rolls back, and the
|
||||
/// projector observes a no-op. Port + Prisma implementation live in
|
||||
/// `apps/api/src/modules/read-models/`.
|
||||
model ProjectionOffset {
|
||||
/// Stable identifier of the projected event. Phase 0 derives this
|
||||
/// from `${aggregateId}:${occurredAt.getTime()}:${eventName}` until
|
||||
/// domain events carry a producer-minted UUID (RFC-003 Option D).
|
||||
eventId String
|
||||
|
||||
/// Stable identifier of the projector that consumed the event.
|
||||
/// Renaming a projector forces a full re-projection — be deliberate.
|
||||
handlerName String
|
||||
|
||||
/// When the offset was first written (i.e. the projection ran).
|
||||
appliedAt DateTime @default(now())
|
||||
|
||||
/// Optional content hash of the projected payload. Reconciliation
|
||||
/// jobs use this to detect drift between the read model and the
|
||||
/// authoritative write model.
|
||||
payloadHash String?
|
||||
|
||||
@@id([eventId, handlerName])
|
||||
@@index([handlerName, appliedAt])
|
||||
@@map("projection_offset")
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// RETENTION (GOO-196 — Decree 13 compliance)
|
||||
// =============================================================================
|
||||
|
||||
enum RetentionRunStatus {
|
||||
RUNNING
|
||||
SUCCESS
|
||||
PARTIAL
|
||||
FAILED
|
||||
}
|
||||
|
||||
/// Every purge / anonymization pass emits a RetentionRunLog row so the
|
||||
/// operator and DPO can audit exactly what was touched and when. Multi-phase
|
||||
/// jobs (e.g. payment callback 2y / 5y / 10y) record `phase` for
|
||||
/// disambiguation.
|
||||
model RetentionRunLog {
|
||||
id String @id @default(cuid())
|
||||
job String
|
||||
phase Int?
|
||||
startedAt DateTime @default(now())
|
||||
finishedAt DateTime?
|
||||
rowsAffected Int @default(0)
|
||||
status RetentionRunStatus @default(RUNNING)
|
||||
errorMessage String?
|
||||
batchSize Int?
|
||||
dryRun Boolean @default(false)
|
||||
|
||||
@@index([job, startedAt])
|
||||
@@index([startedAt(sort: Desc)])
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user