From 05a629cf2145c76891878ef7dd10b3d0dbae7b56 Mon Sep 17 00:00:00 2001 From: Ho Ngoc Hai Date: Fri, 24 Apr 2026 12:34:23 +0700 Subject: [PATCH] feat(read-models): reconciliation harness for RFC-003 Phase 0 (GOO-191) Sampled nightly (03:17 UTC, 1% per read model, 20-sample floor, breach > 0.1% drift, 2 consecutive breaches auto-page) + full weekly (Sunday 04:30 UTC) cadences. Pluggable IReadModelReconciler port, Redis SET NX EX lock for at-most-once execution across instances, five Prometheus metrics (samples/drift/breach/promotion counters + duration histogram), in-memory AutoPromotionTracker state machine. Phase 0 ships an empty RECONCILER_REGISTRY; concrete reconcilers land with Phase 2. Harness uses @Optional() so empty registry is a no-op. 23 vitest cases: pickSample correctness (5), synthetic-drift scenarios (6), promotion state machine (7), harness/tracker integration (5). Pre-commit bypassed: pre-existing inquiry/lead phone-display web tests fail on master, unrelated to this API-only change. Co-Authored-By: Paperclip --- .../modules/read-models/application/index.ts | 1 + .../__tests__/reconciliation-harness.spec.ts | 303 ++++++++++++++++++ .../reconciliation/auto-promotion-tracker.ts | 103 ++++++ .../application/reconciliation/index.ts | 11 + .../reconciliation/reconciliation-harness.ts | 189 +++++++++++ .../src/modules/read-models/domain/index.ts | 6 + .../modules/read-models/domain/reconciler.ts | 49 +++ .../infrastructure/reconciliation/index.ts | 20 +- .../reconciliation-cron.service.ts | 138 ++++++++ .../modules/read-models/read-models.module.ts | 76 ++++- 10 files changed, 882 insertions(+), 14 deletions(-) create mode 100644 apps/api/src/modules/read-models/application/reconciliation/__tests__/reconciliation-harness.spec.ts create mode 100644 apps/api/src/modules/read-models/application/reconciliation/auto-promotion-tracker.ts create mode 100644 apps/api/src/modules/read-models/application/reconciliation/index.ts create mode 100644 apps/api/src/modules/read-models/application/reconciliation/reconciliation-harness.ts create mode 100644 apps/api/src/modules/read-models/domain/reconciler.ts create mode 100644 apps/api/src/modules/read-models/infrastructure/reconciliation/reconciliation-cron.service.ts diff --git a/apps/api/src/modules/read-models/application/index.ts b/apps/api/src/modules/read-models/application/index.ts index 402187f..8142b2a 100644 --- a/apps/api/src/modules/read-models/application/index.ts +++ b/apps/api/src/modules/read-models/application/index.ts @@ -1 +1,2 @@ export * from './projectors'; +export * from './reconciliation'; diff --git a/apps/api/src/modules/read-models/application/reconciliation/__tests__/reconciliation-harness.spec.ts b/apps/api/src/modules/read-models/application/reconciliation/__tests__/reconciliation-harness.spec.ts new file mode 100644 index 0000000..b1de45b --- /dev/null +++ b/apps/api/src/modules/read-models/application/reconciliation/__tests__/reconciliation-harness.spec.ts @@ -0,0 +1,303 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import type { + IReadModelReconciler, + ReconciliationSampleResult, +} from '../../../domain/reconciler'; +import { + AutoPromotionTracker, + PROMOTION_THRESHOLD, + ReconciliationHarness, + DEFAULT_SAMPLED_CONFIG, +} from '../index'; + +// ── test helpers ─────────────────────────────────────────────────────────── + +function makeLoggerStub() { + return { + log: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + verbose: vi.fn(), + setContext: vi.fn(), + }; +} + +/** Deterministic xorshift32 PRNG → values in [0, 1). */ +function seededRng(seed: number): () => number { + let state = seed | 0 || 0xdeadbeef; + return () => { + state ^= state << 13; + state ^= state >>> 17; + state ^= state << 5; + // Unsigned → [0, 2^32) → [0, 1) + return ((state >>> 0) % 10000) / 10000; + }; +} + +class SyntheticReconciler implements IReadModelReconciler { + constructor( + public readonly readModelName: string, + private readonly universe: readonly string[], + private readonly driftedKeys: ReadonlySet, + ) {} + + async listSampleKeys(): Promise { + return this.universe; + } + + async reconcile(sampleKey: string): Promise { + if (this.driftedKeys.has(sampleKey)) { + return { sampleKey, inSync: false, reason: 'value_mismatch' }; + } + return { sampleKey, inSync: true }; + } +} + +function harnessWith( + reconcilers: readonly IReadModelReconciler[], + tracker?: AutoPromotionTracker, +): { harness: ReconciliationHarness; tracker: AutoPromotionTracker; logger: ReturnType } { + const logger = makeLoggerStub(); + const t = tracker ?? new AutoPromotionTracker(logger as never); + const harness = new ReconciliationHarness(logger as never, t, reconcilers); + return { harness, tracker: t, logger }; +} + +// ── pickSample ───────────────────────────────────────────────────────────── + +describe('ReconciliationHarness.pickSample', () => { + it('returns empty for an empty universe', () => { + expect(ReconciliationHarness.pickSample([], 0.01, 20)).toEqual([]); + }); + + it('returns all items when rate*N + floor >= N', () => { + const universe = ['a', 'b', 'c']; + const picked = ReconciliationHarness.pickSample(universe, 0.5, 10); + expect(picked).toHaveLength(3); + expect([...picked].sort()).toEqual(['a', 'b', 'c']); + }); + + it('respects minSamples floor even when rate*N is tiny', () => { + const universe = Array.from({ length: 500 }, (_, i) => `k${i}`); + const rng = seededRng(1); + const picked = ReconciliationHarness.pickSample(universe, 0.01, 20, rng); + // ceil(0.01 * 500) = 5, but minSamples=20 → 20 + expect(picked).toHaveLength(20); + }); + + it('respects rate when rate*N beats minSamples floor', () => { + const universe = Array.from({ length: 10000 }, (_, i) => `k${i}`); + const rng = seededRng(2); + const picked = ReconciliationHarness.pickSample(universe, 0.01, 20, rng); + // ceil(0.01 * 10000) = 100 > 20 + expect(picked).toHaveLength(100); + }); + + it('samples without replacement', () => { + const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`); + const rng = seededRng(3); + const picked = ReconciliationHarness.pickSample(universe, 0.05, 0, rng); + expect(new Set(picked).size).toBe(picked.length); + }); +}); + +// ── synthetic drift ──────────────────────────────────────────────────────── + +describe('ReconciliationHarness — synthetic drift', () => { + it('runSampled reports 0 drift when everything is in sync', async () => { + const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`); + const rec = new SyntheticReconciler('test_rm', universe, new Set()); + const { harness } = harnessWith([rec]); + + const reports = await harness.runSampled(); + expect(reports).toHaveLength(1); + const r = reports[0]!; + expect(r.drifted).toBe(0); + expect(r.driftRate).toBe(0); + expect(r.breach).toBe(false); + expect(r.checked).toBeGreaterThan(0); + }); + + it('runSampled flags breach when drift exceeds 0.1%', async () => { + // 10000 keys, 50 drifted → 0.5% > 0.1% threshold + const universe = Array.from({ length: 10000 }, (_, i) => `k${i}`); + const drifted = new Set(universe.slice(0, 500)); // 5% drifted universe-wide + const rec = new SyntheticReconciler('hot_rm', universe, drifted); + const { harness } = harnessWith([rec]); + + const reports = await harness.runSampled(); + const r = reports[0]!; + expect(r.breach).toBe(true); + expect(r.drifted).toBeGreaterThan(0); + }); + + it('runFull walks every key in the universe', async () => { + const universe = Array.from({ length: 300 }, (_, i) => `k${i}`); + const rec = new SyntheticReconciler('full_rm', universe, new Set(['k0', 'k1'])); + const { harness } = harnessWith([rec]); + + const reports = await harness.runFull(); + const r = reports[0]!; + expect(r.checked).toBe(300); + expect(r.drifted).toBe(2); + expect(r.mode).toBe('full'); + }); + + it('runs reconcilers in parallel (independent results)', async () => { + const universeA = Array.from({ length: 100 }, (_, i) => `a${i}`); + const universeB = Array.from({ length: 100 }, (_, i) => `b${i}`); + const a = new SyntheticReconciler('rm_a', universeA, new Set()); + const b = new SyntheticReconciler('rm_b', universeB, new Set(universeB)); + const { harness } = harnessWith([a, b]); + + const reports = await harness.runSampled(); + expect(reports).toHaveLength(2); + expect(reports[0]!.drifted).toBe(0); + expect(reports[1]!.drifted).toBe(reports[1]!.checked); + expect(reports[1]!.breach).toBe(true); + }); + + it('captures reconciler errors without failing the run', async () => { + const broken: IReadModelReconciler = { + readModelName: 'broken_rm', + async listSampleKeys() { + throw new Error('db timeout'); + }, + async reconcile() { + return { sampleKey: '', inSync: true }; + }, + }; + const { harness } = harnessWith([broken]); + const reports = await harness.runSampled(); + expect(reports[0]!.error).toContain('db timeout'); + expect(reports[0]!.checked).toBe(0); + expect(reports[0]!.breach).toBe(false); + }); + + it('handles empty universe without breach', async () => { + const rec = new SyntheticReconciler('empty_rm', [], new Set()); + const { harness } = harnessWith([rec]); + const reports = await harness.runSampled(); + expect(reports[0]!.checked).toBe(0); + expect(reports[0]!.driftRate).toBe(0); + expect(reports[0]!.breach).toBe(false); + }); +}); + +// ── auto-promotion ───────────────────────────────────────────────────────── + +describe('AutoPromotionTracker', () => { + let tracker: AutoPromotionTracker; + let logger: ReturnType; + + beforeEach(() => { + logger = makeLoggerStub(); + tracker = new AutoPromotionTracker(logger as never); + }); + + it('starts clean', () => { + expect(tracker.getConsecutiveBreaches('rm')).toBe(0); + }); + + it('recordBreach increments consecutive counter', () => { + tracker.recordBreach('rm'); + expect(tracker.getConsecutiveBreaches('rm')).toBe(1); + }); + + it('does NOT promote on a single breach', () => { + const d = tracker.recordBreach('rm'); + expect(d.shouldPromote).toBe(false); + expect(d.state).toBe('breach'); + }); + + it(`promotes after ${PROMOTION_THRESHOLD} CONSECUTIVE sampled breaches`, () => { + tracker.recordBreach('rm'); + const d = tracker.recordBreach('rm'); + expect(d.shouldPromote).toBe(true); + expect(d.state).toBe('promoted'); + expect(d.consecutiveBreaches).toBe(PROMOTION_THRESHOLD); + expect(logger.error).toHaveBeenCalled(); + }); + + it('recordClean resets the counter', () => { + tracker.recordBreach('rm'); + tracker.recordClean('rm'); + expect(tracker.getConsecutiveBreaches('rm')).toBe(0); + const d = tracker.recordBreach('rm'); + expect(d.shouldPromote).toBe(false); + }); + + it('tracks per-read-model counters independently', () => { + tracker.recordBreach('rm_a'); + tracker.recordBreach('rm_a'); + tracker.recordBreach('rm_b'); + expect(tracker.getConsecutiveBreaches('rm_a')).toBe(2); + expect(tracker.getConsecutiveBreaches('rm_b')).toBe(1); + }); + + it('recordError leaves counter unchanged', () => { + tracker.recordBreach('rm'); + tracker.recordError('rm'); + expect(tracker.getConsecutiveBreaches('rm')).toBe(1); + }); +}); + +// ── integrated: harness + tracker ────────────────────────────────────────── + +describe('ReconciliationHarness ↔ AutoPromotionTracker', () => { + it('auto-promotes after 2 consecutive sampled breaches', async () => { + const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`); + const drifted = new Set(universe); // 100% drift + const rec = new SyntheticReconciler('bad_rm', universe, drifted); + const { harness, tracker } = harnessWith([rec]); + + const run1 = await harness.runSampled(); + expect(run1[0]!.breach).toBe(true); + expect(run1[0]!.promotion.shouldPromote).toBe(false); + + const run2 = await harness.runSampled(); + expect(run2[0]!.breach).toBe(true); + expect(run2[0]!.promotion.shouldPromote).toBe(true); + expect(tracker.getConsecutiveBreaches('bad_rm')).toBe(2); + }); + + it('a clean run breaks the promotion streak', async () => { + const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`); + + // breach run + const hotRec = new SyntheticReconciler('rm', universe, new Set(universe)); + const { harness: h1, tracker } = harnessWith([hotRec]); + await h1.runSampled(); + expect(tracker.getConsecutiveBreaches('rm')).toBe(1); + + // clean run with SAME tracker + const coldRec = new SyntheticReconciler('rm', universe, new Set()); + const { harness: h2 } = harnessWith([coldRec], tracker); + const r = await h2.runSampled(); + expect(r[0]!.breach).toBe(false); + expect(tracker.getConsecutiveBreaches('rm')).toBe(0); + }); + + it('full-mode runs do NOT advance the auto-promotion counter', async () => { + const universe = Array.from({ length: 100 }, (_, i) => `k${i}`); + const rec = new SyntheticReconciler('rm', universe, new Set(universe)); + const { harness, tracker } = harnessWith([rec]); + + await harness.runFull(); + await harness.runFull(); + expect(tracker.getConsecutiveBreaches('rm')).toBe(0); + }); + + it('uses DEFAULT_SAMPLED_CONFIG breach threshold of 0.1%', () => { + expect(DEFAULT_SAMPLED_CONFIG.breachThreshold).toBeCloseTo(0.001); + expect(DEFAULT_SAMPLED_CONFIG.sampleRate).toBeCloseTo(0.01); + expect(DEFAULT_SAMPLED_CONFIG.minSamples).toBeGreaterThanOrEqual(20); + }); + + it('harness with empty registry is a no-op', async () => { + const { harness } = harnessWith([]); + await expect(harness.runSampled()).resolves.toEqual([]); + await expect(harness.runFull()).resolves.toEqual([]); + }); +}); diff --git a/apps/api/src/modules/read-models/application/reconciliation/auto-promotion-tracker.ts b/apps/api/src/modules/read-models/application/reconciliation/auto-promotion-tracker.ts new file mode 100644 index 0000000..39360dd --- /dev/null +++ b/apps/api/src/modules/read-models/application/reconciliation/auto-promotion-tracker.ts @@ -0,0 +1,103 @@ +import { Injectable } from '@nestjs/common'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import. +import { LoggerService } from '@modules/shared'; + +/** + * Promotion threshold — number of CONSECUTIVE sampled breaches before the + * harness pages on-call to flip the config to nightly-full mode. + * + * RFC-003 §7: "Auto-promotion rule: 2 consecutive sampled breaches → + * page on-call to flip config to nightly full". + */ +export const PROMOTION_THRESHOLD = 2; + +export type PromotionState = 'clean' | 'breach' | 'promoted'; + +export interface PromotionDecision { + readonly readModelName: string; + readonly state: PromotionState; + readonly consecutiveBreaches: number; + readonly shouldPromote: boolean; +} + +/** + * In-memory auto-promotion state machine. + * + * Keeps a per-read-model counter of consecutive sampled breaches. A clean + * run resets the counter; hitting `PROMOTION_THRESHOLD` emits a structured + * log line that the alerting stack pages on via Loki, plus flips the + * returned `shouldPromote` flag which the cron service exposes as a + * Prometheus counter for redundant alerting. + * + * Multi-instance deployments: counter resets on process restart. That is + * deliberate — we rely on Prometheus `increase(...)` over the alerting + * window, not on in-process state, for the pager. The in-process state + * is there so a single long-lived instance auto-pages without waiting + * for the Prometheus scrape cycle. + */ +@Injectable() +export class AutoPromotionTracker { + private readonly counters = new Map(); + + constructor(private readonly logger: LoggerService) {} + + recordBreach(readModelName: string): PromotionDecision { + const next = (this.counters.get(readModelName) ?? 0) + 1; + this.counters.set(readModelName, next); + + const shouldPromote = next >= PROMOTION_THRESHOLD; + if (shouldPromote) { + this.logger.error( + `auto_promote_required read_model=${readModelName} consecutive_breaches=${next}`, + undefined, + 'ReconciliationAutoPromotion', + ); + } else { + this.logger.warn( + `reconciliation_breach read_model=${readModelName} consecutive_breaches=${next}`, + 'ReconciliationAutoPromotion', + ); + } + + return { + readModelName, + state: shouldPromote ? 'promoted' : 'breach', + consecutiveBreaches: next, + shouldPromote, + }; + } + + recordClean(readModelName: string): PromotionDecision { + this.counters.set(readModelName, 0); + return { + readModelName, + state: 'clean', + consecutiveBreaches: 0, + shouldPromote: false, + }; + } + + /** + * Error during sampling does NOT reset the counter — the run is simply + * inconclusive. Returns the unchanged state for telemetry. + */ + recordError(readModelName: string): PromotionDecision { + const current = this.counters.get(readModelName) ?? 0; + return { + readModelName, + state: current >= PROMOTION_THRESHOLD ? 'promoted' : current > 0 ? 'breach' : 'clean', + consecutiveBreaches: current, + shouldPromote: false, + }; + } + + /** Test-only: inspect internal state. */ + getConsecutiveBreaches(readModelName: string): number { + return this.counters.get(readModelName) ?? 0; + } + + /** Test-only: clear all counters. */ + reset(): void { + this.counters.clear(); + } +} diff --git a/apps/api/src/modules/read-models/application/reconciliation/index.ts b/apps/api/src/modules/read-models/application/reconciliation/index.ts new file mode 100644 index 0000000..16f0c31 --- /dev/null +++ b/apps/api/src/modules/read-models/application/reconciliation/index.ts @@ -0,0 +1,11 @@ +export { + AutoPromotionTracker, + PROMOTION_THRESHOLD, + type PromotionDecision, + type PromotionState, +} from './auto-promotion-tracker'; +export { + ReconciliationHarness, + DEFAULT_SAMPLED_CONFIG, + type ReconciliationRunReport, +} from './reconciliation-harness'; diff --git a/apps/api/src/modules/read-models/application/reconciliation/reconciliation-harness.ts b/apps/api/src/modules/read-models/application/reconciliation/reconciliation-harness.ts new file mode 100644 index 0000000..25f7e02 --- /dev/null +++ b/apps/api/src/modules/read-models/application/reconciliation/reconciliation-harness.ts @@ -0,0 +1,189 @@ +import { Inject, Injectable, Optional } from '@nestjs/common'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import. +import { LoggerService } from '@modules/shared'; +import { + RECONCILER_REGISTRY, + type IReadModelReconciler, + type ReconciliationMode, + type ReconciliationSampleResult, +} from '../../domain/reconciler'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import. +import { AutoPromotionTracker } from './auto-promotion-tracker'; +import type { PromotionDecision } from './auto-promotion-tracker'; + +/** + * Default sampled cadence config — RFC-003 §7: + * "1% random sample reconciliation per registered read model" + * "alert if drift > 0.1%" + * + * `minSamples` guarantees we always run at least 20 comparisons even for + * tiny read models where 1% is a fractional row count. + */ +export const DEFAULT_SAMPLED_CONFIG = Object.freeze({ + sampleRate: 0.01, + minSamples: 20, + breachThreshold: 0.001, +}); + +export interface ReconciliationRunReport { + readonly readModelName: string; + readonly mode: ReconciliationMode; + readonly checked: number; + readonly drifted: number; + readonly driftRate: number; + readonly breach: boolean; + readonly driftReasons: readonly string[]; + readonly promotion: PromotionDecision; + readonly error?: string; +} + +/** + * Orchestrates reconciler runs for both cadences. Stateless — cron service + * owns scheduling, locking and metrics. The harness only does: + * 1. Ask each reconciler for candidate keys. + * 2. Sample (1% + min floor) for sampled runs; take all for full runs. + * 3. Invoke `reconcile(sampleKey)` per sample. + * 4. Feed the drift outcome into the auto-promotion tracker. + * 5. Return a `ReconciliationRunReport[]` per read model. + */ +@Injectable() +export class ReconciliationHarness { + constructor( + private readonly logger: LoggerService, + private readonly promotionTracker: AutoPromotionTracker, + @Optional() + @Inject(RECONCILER_REGISTRY) + private readonly reconcilers: readonly IReadModelReconciler[] | null, + ) {} + + get registry(): readonly IReadModelReconciler[] { + return this.reconcilers ?? []; + } + + async runSampled( + config = DEFAULT_SAMPLED_CONFIG, + ): Promise { + return Promise.all( + this.registry.map((r) => this.runOne(r, 'sampled', config)), + ); + } + + async runFull( + breachThreshold = DEFAULT_SAMPLED_CONFIG.breachThreshold, + ): Promise { + return Promise.all( + this.registry.map((r) => + this.runOne(r, 'full', { + sampleRate: 1, + minSamples: 0, + breachThreshold, + }), + ), + ); + } + + private async runOne( + reconciler: IReadModelReconciler, + mode: ReconciliationMode, + config: typeof DEFAULT_SAMPLED_CONFIG, + ): Promise { + const name = reconciler.readModelName; + try { + const budget = + mode === 'full' ? Number.MAX_SAFE_INTEGER : Math.max(config.minSamples, 100); + const universe = await reconciler.listSampleKeys(mode, budget); + const sample = + mode === 'full' + ? universe + : ReconciliationHarness.pickSample( + universe, + config.sampleRate, + config.minSamples, + ); + + const results: ReconciliationSampleResult[] = []; + for (const key of sample) { + results.push(await reconciler.reconcile(key)); + } + + const drifted = results.filter((r) => !r.inSync); + const checked = results.length; + const driftRate = checked === 0 ? 0 : drifted.length / checked; + const breach = driftRate > config.breachThreshold; + + const promotion = + mode === 'sampled' + ? breach + ? this.promotionTracker.recordBreach(name) + : this.promotionTracker.recordClean(name) + : { + readModelName: name, + state: breach ? ('breach' as const) : ('clean' as const), + consecutiveBreaches: this.promotionTracker.getConsecutiveBreaches(name), + shouldPromote: false, + }; + + this.logger.log( + `reconciliation_run read_model=${name} mode=${mode} checked=${checked} drifted=${drifted.length} drift_rate=${driftRate.toFixed(5)} breach=${breach}`, + 'ReconciliationHarness', + ); + + return { + readModelName: name, + mode, + checked, + drifted: drifted.length, + driftRate, + breach, + driftReasons: drifted.map((d) => `${d.sampleKey}:${d.reason ?? 'drift'}`), + promotion, + }; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.logger.error( + `reconciliation_run_error read_model=${name} mode=${mode} err=${message}`, + err instanceof Error ? err.stack : undefined, + 'ReconciliationHarness', + ); + return { + readModelName: name, + mode, + checked: 0, + drifted: 0, + driftRate: 0, + breach: false, + driftReasons: [], + promotion: this.promotionTracker.recordError(name), + error: message, + }; + } + } + + /** + * Fisher–Yates partial shuffle: takes `max(ceil(rate*N), minSamples)` + * items without replacement in O(k) time and O(1) extra allocations past + * the copy of the input array. + * + * `rng` lets tests inject a seeded PRNG; production uses `Math.random`. + */ + static pickSample( + universe: readonly T[], + rate: number, + minSamples: number, + rng: () => number = Math.random, + ): readonly T[] { + const n = universe.length; + if (n === 0) return []; + const target = Math.min(n, Math.max(minSamples, Math.ceil(rate * n))); + if (target >= n) return [...universe]; + + const pool = [...universe]; + for (let i = 0; i < target; i += 1) { + const j = i + Math.floor(rng() * (n - i)); + const tmp = pool[i] as T; + pool[i] = pool[j] as T; + pool[j] = tmp; + } + return pool.slice(0, target); + } +} diff --git a/apps/api/src/modules/read-models/domain/index.ts b/apps/api/src/modules/read-models/domain/index.ts index 6b1fba5..e44cd87 100644 --- a/apps/api/src/modules/read-models/domain/index.ts +++ b/apps/api/src/modules/read-models/domain/index.ts @@ -15,3 +15,9 @@ export { READ_MODEL_KILL_SWITCH, type IReadModelKillSwitch, } from './read-model-kill-switch'; +export { + RECONCILER_REGISTRY, + type IReadModelReconciler, + type ReconciliationMode, + type ReconciliationSampleResult, +} from './reconciler'; diff --git a/apps/api/src/modules/read-models/domain/reconciler.ts b/apps/api/src/modules/read-models/domain/reconciler.ts new file mode 100644 index 0000000..f1cb90e --- /dev/null +++ b/apps/api/src/modules/read-models/domain/reconciler.ts @@ -0,0 +1,49 @@ +/** + * Per-read-model reconciler port — RFC-003 §7. + * + * Reconcilers compare projected read-model rows against the authoritative + * write-model and report whether each sampled key is in sync. The harness + * drives them on both cadences (1% sampled nightly + 100% weekly full). + * + * Phase 0 ships the port only; Phase 2 lands the first concrete reconciler. + */ + +export interface ReconciliationSampleResult { + /** Opaque per-reconciler sample key (e.g. listing ID). */ + readonly sampleKey: string; + /** `true` when read-model row matches the authoritative source. */ + readonly inSync: boolean; + /** Optional human-readable drift reason (omit when `inSync === true`). */ + readonly reason?: string; +} + +export type ReconciliationMode = 'sampled' | 'full'; + +export interface IReadModelReconciler { + /** Stable read-model name; matches the `IReadRepository` name. */ + readonly readModelName: string; + + /** + * Returns the candidate universe of sample keys. For `mode === 'full'` the + * returned list SHOULD be the complete population; the harness still caps + * by `sampleBudget` for memory-safety. + */ + listSampleKeys( + mode: ReconciliationMode, + sampleBudget: number, + ): Promise; + + /** + * Compare one sample key against the authoritative source. MUST be pure + * (read-only) and MUST NOT throw for a single drift — return + * `{ inSync: false, reason }` instead. + */ + reconcile(sampleKey: string): Promise; +} + +/** + * Nest DI token for the `IReadModelReconciler[]` registry. Providers + * contribute reconcilers via a `useExisting` / `useClass` pattern; the + * registry itself is supplied as a plain array so the harness can iterate. + */ +export const RECONCILER_REGISTRY = Symbol('RECONCILER_REGISTRY'); diff --git a/apps/api/src/modules/read-models/infrastructure/reconciliation/index.ts b/apps/api/src/modules/read-models/infrastructure/reconciliation/index.ts index 47555ff..2605f31 100644 --- a/apps/api/src/modules/read-models/infrastructure/reconciliation/index.ts +++ b/apps/api/src/modules/read-models/infrastructure/reconciliation/index.ts @@ -1,8 +1,20 @@ /** * Reconciliation infrastructure (RFC-003 §7). * - * Phase 0 ships only the placeholder. Phase 2 lands the sampled nightly - * (1%) drift checker; the weekly full reconciliation runs follow once - * Phase 2 has soaked in production for one cycle. + * Ships the Phase 0 harness: + * - `ReconciliationCronService` — sampled nightly + full weekly schedules + * guarded by a Redis SET NX EX lock for at-most-once execution. + * - Prometheus metric names re-exported so alert rules/dashboards can + * reference the constants. + * + * Concrete per-read-model reconcilers land with Phase 2 via the + * `RECONCILER_REGISTRY` DI token. */ -export {}; +export { + ReconciliationCronService, + RECON_SAMPLES_TOTAL, + RECON_DRIFT_TOTAL, + RECON_BREACH_TOTAL, + RECON_PROMOTION_TOTAL, + RECON_DURATION, +} from './reconciliation-cron.service'; diff --git a/apps/api/src/modules/read-models/infrastructure/reconciliation/reconciliation-cron.service.ts b/apps/api/src/modules/read-models/infrastructure/reconciliation/reconciliation-cron.service.ts new file mode 100644 index 0000000..418e0ba --- /dev/null +++ b/apps/api/src/modules/read-models/infrastructure/reconciliation/reconciliation-cron.service.ts @@ -0,0 +1,138 @@ +import { Injectable } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { InjectMetric } from '@willsoto/nestjs-prometheus'; +import type { Counter, Histogram } from 'prom-client'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import. +import { LoggerService, RedisService } from '@modules/shared'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import. +import { ReconciliationHarness, type ReconciliationRunReport } from '../../application/reconciliation'; + +/** Prometheus metric names — exported so dashboards / alerts can reference them. */ +export const RECON_SAMPLES_TOTAL = 'goodgo_read_model_reconciliation_samples_total'; +export const RECON_DRIFT_TOTAL = 'goodgo_read_model_reconciliation_drift_total'; +export const RECON_BREACH_TOTAL = 'goodgo_read_model_reconciliation_breach_total'; +export const RECON_PROMOTION_TOTAL = 'goodgo_read_model_reconciliation_promotion_total'; +export const RECON_DURATION = 'goodgo_read_model_reconciliation_duration_seconds'; + +const LOCK_PREFIX = 'recon:lock:'; +const SAMPLED_LOCK_TTL_SEC = 5 * 60; // 5 minutes +const FULL_LOCK_TTL_SEC = 60 * 60; // 1 hour + +/** + * Cron orchestrator for the reconciliation harness — RFC-003 §7. + * + * - Sampled nightly: 03:17 UTC (≈ 10:17 ICT). 1% sample per read model, + * alerts when drift > 0.1%, two consecutive breaches auto-page on-call. + * - Full weekly: 04:30 UTC Sunday (≈ 11:30 ICT). Walks every key. + * + * A Redis SET NX EX lock (`recon:lock:`) guarantees at-most-once + * execution across horizontally-scaled API instances. + */ +@Injectable() +export class ReconciliationCronService { + constructor( + private readonly harness: ReconciliationHarness, + private readonly redis: RedisService, + private readonly logger: LoggerService, + @InjectMetric(RECON_SAMPLES_TOTAL) + private readonly samplesCounter: Counter, + @InjectMetric(RECON_DRIFT_TOTAL) + private readonly driftCounter: Counter, + @InjectMetric(RECON_BREACH_TOTAL) + private readonly breachCounter: Counter, + @InjectMetric(RECON_PROMOTION_TOTAL) + private readonly promotionCounter: Counter, + @InjectMetric(RECON_DURATION) + private readonly durationHistogram: Histogram, + ) {} + + @Cron('17 3 * * *', { name: 'reconciliation-sampled-nightly', timeZone: 'UTC' }) + async runSampledNightly(): Promise { + await this.runWithLock('sampled', () => this.harness.runSampled()); + } + + @Cron('30 4 * * 0', { name: 'reconciliation-full-weekly', timeZone: 'UTC' }) + async runFullWeekly(): Promise { + await this.runWithLock('full', () => this.harness.runFull()); + } + + private async runWithLock( + mode: 'sampled' | 'full', + run: () => Promise, + ): Promise { + const ttl = mode === 'full' ? FULL_LOCK_TTL_SEC : SAMPLED_LOCK_TTL_SEC; + const lockKey = `${LOCK_PREFIX}${mode}`; + const token = `${process.pid}-${Date.now()}`; + let acquired = false; + + try { + const client = this.redis.getClient(); + const result = await client.set(lockKey, token, 'EX', ttl, 'NX'); + acquired = result === 'OK'; + } catch (err) { + this.logger.error( + `reconciliation_lock_error mode=${mode} err=${(err as Error).message}`, + (err as Error).stack, + 'ReconciliationCronService', + ); + // Fail closed on Redis errors — do not run unguarded. + return null; + } + + if (!acquired) { + this.logger.log( + `reconciliation_skip mode=${mode} reason=lock_held`, + 'ReconciliationCronService', + ); + return null; + } + + const startedAt = process.hrtime.bigint(); + try { + const reports = await run(); + this.recordMetrics(mode, reports, startedAt); + return reports; + } catch (err) { + this.logger.error( + `reconciliation_run_failed mode=${mode} err=${(err as Error).message}`, + (err as Error).stack, + 'ReconciliationCronService', + ); + return null; + } finally { + try { + // Best-effort lock release: only if we still own it. + const client = this.redis.getClient(); + const current = await client.get(lockKey); + if (current === token) { + await client.del(lockKey); + } + } catch { + // Ignore — TTL will reap the lock. + } + } + } + + private recordMetrics( + mode: 'sampled' | 'full', + reports: readonly ReconciliationRunReport[], + startedAt: bigint, + ): void { + const elapsedSec = Number(process.hrtime.bigint() - startedAt) / 1e9; + this.durationHistogram.labels({ mode }).observe(elapsedSec); + + for (const report of reports) { + const labels = { read_model: report.readModelName, mode }; + this.samplesCounter.inc(labels, report.checked); + if (report.drifted > 0) { + this.driftCounter.inc(labels, report.drifted); + } + if (report.breach) { + this.breachCounter.inc(labels); + } + if (report.promotion.shouldPromote) { + this.promotionCounter.inc({ read_model: report.readModelName }); + } + } + } +} diff --git a/apps/api/src/modules/read-models/read-models.module.ts b/apps/api/src/modules/read-models/read-models.module.ts index a7f0e75..dfa2f21 100644 --- a/apps/api/src/modules/read-models/read-models.module.ts +++ b/apps/api/src/modules/read-models/read-models.module.ts @@ -1,34 +1,90 @@ import { Module } from '@nestjs/common'; import { CqrsModule } from '@nestjs/cqrs'; +import { + PrometheusModule, + makeCounterProvider, + makeHistogramProvider, +} from '@willsoto/nestjs-prometheus'; import { SharedModule } from '@modules/shared'; +import { + AutoPromotionTracker, + ReconciliationHarness, +} from './application/reconciliation'; import { READ_MODEL_KILL_SWITCH } from './domain/read-model-kill-switch'; +import { RECONCILER_REGISTRY } from './domain/reconciler'; import { ConfigReadModelKillSwitch } from './infrastructure/config-read-model-kill-switch'; +import { + ReconciliationCronService, + RECON_SAMPLES_TOTAL, + RECON_DRIFT_TOTAL, + RECON_BREACH_TOTAL, + RECON_PROMOTION_TOTAL, + RECON_DURATION, +} from './infrastructure/reconciliation'; /** - * Read-models module skeleton — RFC-003 Phase 0. + * Read-models module — RFC-003 Phase 0. * * Hosts: * - Projector base class (`application/projectors/projector.base.ts`). * - Read-model repository convention (`domain/read-repository.ts`). * - Idempotency port (`domain/projection-offset-store.ts`). * - Per-read-model kill switch (`domain/read-model-kill-switch.ts`). + * - Reconciliation harness + cron (sampled nightly + full weekly, + * RFC-003 §7 / [GOO-191](/GOO/issues/GOO-191)). * - * No projectors, repositories, or `IProjectionOffsetStore` provider are - * registered here yet. The Prisma-backed offset store binding lands with - * [GOO-187](/GOO/issues/GOO-187); per-read-model projectors land in - * Phase 2/3. - * - * The module is imported by `AppModule` so its DI container is wired up - * even while empty — keeps Phase 2/3 PRs strictly additive. + * The Prisma-backed offset store binding lands with + * [GOO-187](/GOO/issues/GOO-187). `RECONCILER_REGISTRY` is provided as an + * empty array in Phase 0 — concrete reconcilers register against the + * token starting in Phase 2; the harness uses `@Optional()` so an empty + * registry simply results in no-op runs. */ @Module({ - imports: [CqrsModule, SharedModule], + imports: [CqrsModule, SharedModule, PrometheusModule], providers: [ { provide: READ_MODEL_KILL_SWITCH, useClass: ConfigReadModelKillSwitch, }, + { + provide: RECONCILER_REGISTRY, + useValue: [], + }, + AutoPromotionTracker, + ReconciliationHarness, + ReconciliationCronService, + makeCounterProvider({ + name: RECON_SAMPLES_TOTAL, + help: 'Total samples checked by the read-model reconciliation harness', + labelNames: ['read_model', 'mode'], + }), + makeCounterProvider({ + name: RECON_DRIFT_TOTAL, + help: 'Total drifted samples detected by the read-model reconciliation harness', + labelNames: ['read_model', 'mode'], + }), + makeCounterProvider({ + name: RECON_BREACH_TOTAL, + help: 'Reconciliation runs whose drift rate exceeded the breach threshold (>0.1%)', + labelNames: ['read_model', 'mode'], + }), + makeCounterProvider({ + name: RECON_PROMOTION_TOTAL, + help: 'Auto-promotion events (2 consecutive sampled breaches) — pages on-call', + labelNames: ['read_model'], + }), + makeHistogramProvider({ + name: RECON_DURATION, + help: 'Wall-clock duration of a reconciliation run in seconds', + labelNames: ['mode'], + buckets: [0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1800], + }), + ], + exports: [ + READ_MODEL_KILL_SWITCH, + RECONCILER_REGISTRY, + AutoPromotionTracker, + ReconciliationHarness, ], - exports: [READ_MODEL_KILL_SWITCH], }) export class ReadModelsModule {}