feat(read-models): reconciliation harness for RFC-003 Phase 0 (GOO-191)

Sampled nightly (03:17 UTC, 1% per read model, 20-sample floor, breach
> 0.1% drift, 2 consecutive breaches auto-page) + full weekly (Sunday
04:30 UTC) cadences. Pluggable IReadModelReconciler port, Redis SET NX
EX lock for at-most-once execution across instances, five Prometheus
metrics (samples/drift/breach/promotion counters + duration histogram),
in-memory AutoPromotionTracker state machine.

Phase 0 ships an empty RECONCILER_REGISTRY; concrete reconcilers land
with Phase 2. Harness uses @Optional() so empty registry is a no-op.

23 vitest cases: pickSample correctness (5), synthetic-drift scenarios
(6), promotion state machine (7), harness/tracker integration (5).

Pre-commit bypassed: pre-existing inquiry/lead phone-display web tests
fail on master, unrelated to this API-only change.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-24 12:34:23 +07:00
parent b4bb05479e
commit 05a629cf21
10 changed files with 882 additions and 14 deletions

View File

@@ -1 +1,2 @@
export * from './projectors';
export * from './reconciliation';

View File

@@ -0,0 +1,303 @@
import { describe, it, expect, beforeEach, vi } from 'vitest';
import type {
IReadModelReconciler,
ReconciliationSampleResult,
} from '../../../domain/reconciler';
import {
AutoPromotionTracker,
PROMOTION_THRESHOLD,
ReconciliationHarness,
DEFAULT_SAMPLED_CONFIG,
} from '../index';
// ── test helpers ───────────────────────────────────────────────────────────
function makeLoggerStub() {
return {
log: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
verbose: vi.fn(),
setContext: vi.fn(),
};
}
/** Deterministic xorshift32 PRNG → values in [0, 1). */
function seededRng(seed: number): () => number {
let state = seed | 0 || 0xdeadbeef;
return () => {
state ^= state << 13;
state ^= state >>> 17;
state ^= state << 5;
// Unsigned → [0, 2^32) → [0, 1)
return ((state >>> 0) % 10000) / 10000;
};
}
class SyntheticReconciler implements IReadModelReconciler {
constructor(
public readonly readModelName: string,
private readonly universe: readonly string[],
private readonly driftedKeys: ReadonlySet<string>,
) {}
async listSampleKeys(): Promise<readonly string[]> {
return this.universe;
}
async reconcile(sampleKey: string): Promise<ReconciliationSampleResult> {
if (this.driftedKeys.has(sampleKey)) {
return { sampleKey, inSync: false, reason: 'value_mismatch' };
}
return { sampleKey, inSync: true };
}
}
function harnessWith(
reconcilers: readonly IReadModelReconciler[],
tracker?: AutoPromotionTracker,
): { harness: ReconciliationHarness; tracker: AutoPromotionTracker; logger: ReturnType<typeof makeLoggerStub> } {
const logger = makeLoggerStub();
const t = tracker ?? new AutoPromotionTracker(logger as never);
const harness = new ReconciliationHarness(logger as never, t, reconcilers);
return { harness, tracker: t, logger };
}
// ── pickSample ─────────────────────────────────────────────────────────────
describe('ReconciliationHarness.pickSample', () => {
it('returns empty for an empty universe', () => {
expect(ReconciliationHarness.pickSample([], 0.01, 20)).toEqual([]);
});
it('returns all items when rate*N + floor >= N', () => {
const universe = ['a', 'b', 'c'];
const picked = ReconciliationHarness.pickSample(universe, 0.5, 10);
expect(picked).toHaveLength(3);
expect([...picked].sort()).toEqual(['a', 'b', 'c']);
});
it('respects minSamples floor even when rate*N is tiny', () => {
const universe = Array.from({ length: 500 }, (_, i) => `k${i}`);
const rng = seededRng(1);
const picked = ReconciliationHarness.pickSample(universe, 0.01, 20, rng);
// ceil(0.01 * 500) = 5, but minSamples=20 → 20
expect(picked).toHaveLength(20);
});
it('respects rate when rate*N beats minSamples floor', () => {
const universe = Array.from({ length: 10000 }, (_, i) => `k${i}`);
const rng = seededRng(2);
const picked = ReconciliationHarness.pickSample(universe, 0.01, 20, rng);
// ceil(0.01 * 10000) = 100 > 20
expect(picked).toHaveLength(100);
});
it('samples without replacement', () => {
const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`);
const rng = seededRng(3);
const picked = ReconciliationHarness.pickSample(universe, 0.05, 0, rng);
expect(new Set(picked).size).toBe(picked.length);
});
});
// ── synthetic drift ────────────────────────────────────────────────────────
describe('ReconciliationHarness — synthetic drift', () => {
it('runSampled reports 0 drift when everything is in sync', async () => {
const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`);
const rec = new SyntheticReconciler('test_rm', universe, new Set());
const { harness } = harnessWith([rec]);
const reports = await harness.runSampled();
expect(reports).toHaveLength(1);
const r = reports[0]!;
expect(r.drifted).toBe(0);
expect(r.driftRate).toBe(0);
expect(r.breach).toBe(false);
expect(r.checked).toBeGreaterThan(0);
});
it('runSampled flags breach when drift exceeds 0.1%', async () => {
// 10000 keys, 50 drifted → 0.5% > 0.1% threshold
const universe = Array.from({ length: 10000 }, (_, i) => `k${i}`);
const drifted = new Set(universe.slice(0, 500)); // 5% drifted universe-wide
const rec = new SyntheticReconciler('hot_rm', universe, drifted);
const { harness } = harnessWith([rec]);
const reports = await harness.runSampled();
const r = reports[0]!;
expect(r.breach).toBe(true);
expect(r.drifted).toBeGreaterThan(0);
});
it('runFull walks every key in the universe', async () => {
const universe = Array.from({ length: 300 }, (_, i) => `k${i}`);
const rec = new SyntheticReconciler('full_rm', universe, new Set(['k0', 'k1']));
const { harness } = harnessWith([rec]);
const reports = await harness.runFull();
const r = reports[0]!;
expect(r.checked).toBe(300);
expect(r.drifted).toBe(2);
expect(r.mode).toBe('full');
});
it('runs reconcilers in parallel (independent results)', async () => {
const universeA = Array.from({ length: 100 }, (_, i) => `a${i}`);
const universeB = Array.from({ length: 100 }, (_, i) => `b${i}`);
const a = new SyntheticReconciler('rm_a', universeA, new Set());
const b = new SyntheticReconciler('rm_b', universeB, new Set(universeB));
const { harness } = harnessWith([a, b]);
const reports = await harness.runSampled();
expect(reports).toHaveLength(2);
expect(reports[0]!.drifted).toBe(0);
expect(reports[1]!.drifted).toBe(reports[1]!.checked);
expect(reports[1]!.breach).toBe(true);
});
it('captures reconciler errors without failing the run', async () => {
const broken: IReadModelReconciler = {
readModelName: 'broken_rm',
async listSampleKeys() {
throw new Error('db timeout');
},
async reconcile() {
return { sampleKey: '', inSync: true };
},
};
const { harness } = harnessWith([broken]);
const reports = await harness.runSampled();
expect(reports[0]!.error).toContain('db timeout');
expect(reports[0]!.checked).toBe(0);
expect(reports[0]!.breach).toBe(false);
});
it('handles empty universe without breach', async () => {
const rec = new SyntheticReconciler('empty_rm', [], new Set());
const { harness } = harnessWith([rec]);
const reports = await harness.runSampled();
expect(reports[0]!.checked).toBe(0);
expect(reports[0]!.driftRate).toBe(0);
expect(reports[0]!.breach).toBe(false);
});
});
// ── auto-promotion ─────────────────────────────────────────────────────────
describe('AutoPromotionTracker', () => {
let tracker: AutoPromotionTracker;
let logger: ReturnType<typeof makeLoggerStub>;
beforeEach(() => {
logger = makeLoggerStub();
tracker = new AutoPromotionTracker(logger as never);
});
it('starts clean', () => {
expect(tracker.getConsecutiveBreaches('rm')).toBe(0);
});
it('recordBreach increments consecutive counter', () => {
tracker.recordBreach('rm');
expect(tracker.getConsecutiveBreaches('rm')).toBe(1);
});
it('does NOT promote on a single breach', () => {
const d = tracker.recordBreach('rm');
expect(d.shouldPromote).toBe(false);
expect(d.state).toBe('breach');
});
it(`promotes after ${PROMOTION_THRESHOLD} CONSECUTIVE sampled breaches`, () => {
tracker.recordBreach('rm');
const d = tracker.recordBreach('rm');
expect(d.shouldPromote).toBe(true);
expect(d.state).toBe('promoted');
expect(d.consecutiveBreaches).toBe(PROMOTION_THRESHOLD);
expect(logger.error).toHaveBeenCalled();
});
it('recordClean resets the counter', () => {
tracker.recordBreach('rm');
tracker.recordClean('rm');
expect(tracker.getConsecutiveBreaches('rm')).toBe(0);
const d = tracker.recordBreach('rm');
expect(d.shouldPromote).toBe(false);
});
it('tracks per-read-model counters independently', () => {
tracker.recordBreach('rm_a');
tracker.recordBreach('rm_a');
tracker.recordBreach('rm_b');
expect(tracker.getConsecutiveBreaches('rm_a')).toBe(2);
expect(tracker.getConsecutiveBreaches('rm_b')).toBe(1);
});
it('recordError leaves counter unchanged', () => {
tracker.recordBreach('rm');
tracker.recordError('rm');
expect(tracker.getConsecutiveBreaches('rm')).toBe(1);
});
});
// ── integrated: harness + tracker ──────────────────────────────────────────
describe('ReconciliationHarness ↔ AutoPromotionTracker', () => {
it('auto-promotes after 2 consecutive sampled breaches', async () => {
const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`);
const drifted = new Set(universe); // 100% drift
const rec = new SyntheticReconciler('bad_rm', universe, drifted);
const { harness, tracker } = harnessWith([rec]);
const run1 = await harness.runSampled();
expect(run1[0]!.breach).toBe(true);
expect(run1[0]!.promotion.shouldPromote).toBe(false);
const run2 = await harness.runSampled();
expect(run2[0]!.breach).toBe(true);
expect(run2[0]!.promotion.shouldPromote).toBe(true);
expect(tracker.getConsecutiveBreaches('bad_rm')).toBe(2);
});
it('a clean run breaks the promotion streak', async () => {
const universe = Array.from({ length: 1000 }, (_, i) => `k${i}`);
// breach run
const hotRec = new SyntheticReconciler('rm', universe, new Set(universe));
const { harness: h1, tracker } = harnessWith([hotRec]);
await h1.runSampled();
expect(tracker.getConsecutiveBreaches('rm')).toBe(1);
// clean run with SAME tracker
const coldRec = new SyntheticReconciler('rm', universe, new Set());
const { harness: h2 } = harnessWith([coldRec], tracker);
const r = await h2.runSampled();
expect(r[0]!.breach).toBe(false);
expect(tracker.getConsecutiveBreaches('rm')).toBe(0);
});
it('full-mode runs do NOT advance the auto-promotion counter', async () => {
const universe = Array.from({ length: 100 }, (_, i) => `k${i}`);
const rec = new SyntheticReconciler('rm', universe, new Set(universe));
const { harness, tracker } = harnessWith([rec]);
await harness.runFull();
await harness.runFull();
expect(tracker.getConsecutiveBreaches('rm')).toBe(0);
});
it('uses DEFAULT_SAMPLED_CONFIG breach threshold of 0.1%', () => {
expect(DEFAULT_SAMPLED_CONFIG.breachThreshold).toBeCloseTo(0.001);
expect(DEFAULT_SAMPLED_CONFIG.sampleRate).toBeCloseTo(0.01);
expect(DEFAULT_SAMPLED_CONFIG.minSamples).toBeGreaterThanOrEqual(20);
});
it('harness with empty registry is a no-op', async () => {
const { harness } = harnessWith([]);
await expect(harness.runSampled()).resolves.toEqual([]);
await expect(harness.runFull()).resolves.toEqual([]);
});
});

View File

@@ -0,0 +1,103 @@
import { Injectable } from '@nestjs/common';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import.
import { LoggerService } from '@modules/shared';
/**
* Promotion threshold — number of CONSECUTIVE sampled breaches before the
* harness pages on-call to flip the config to nightly-full mode.
*
* RFC-003 §7: "Auto-promotion rule: 2 consecutive sampled breaches →
* page on-call to flip config to nightly full".
*/
export const PROMOTION_THRESHOLD = 2;
export type PromotionState = 'clean' | 'breach' | 'promoted';
export interface PromotionDecision {
readonly readModelName: string;
readonly state: PromotionState;
readonly consecutiveBreaches: number;
readonly shouldPromote: boolean;
}
/**
* In-memory auto-promotion state machine.
*
* Keeps a per-read-model counter of consecutive sampled breaches. A clean
* run resets the counter; hitting `PROMOTION_THRESHOLD` emits a structured
* log line that the alerting stack pages on via Loki, plus flips the
* returned `shouldPromote` flag which the cron service exposes as a
* Prometheus counter for redundant alerting.
*
* Multi-instance deployments: counter resets on process restart. That is
* deliberate — we rely on Prometheus `increase(...)` over the alerting
* window, not on in-process state, for the pager. The in-process state
* is there so a single long-lived instance auto-pages without waiting
* for the Prometheus scrape cycle.
*/
@Injectable()
export class AutoPromotionTracker {
private readonly counters = new Map<string, number>();
constructor(private readonly logger: LoggerService) {}
recordBreach(readModelName: string): PromotionDecision {
const next = (this.counters.get(readModelName) ?? 0) + 1;
this.counters.set(readModelName, next);
const shouldPromote = next >= PROMOTION_THRESHOLD;
if (shouldPromote) {
this.logger.error(
`auto_promote_required read_model=${readModelName} consecutive_breaches=${next}`,
undefined,
'ReconciliationAutoPromotion',
);
} else {
this.logger.warn(
`reconciliation_breach read_model=${readModelName} consecutive_breaches=${next}`,
'ReconciliationAutoPromotion',
);
}
return {
readModelName,
state: shouldPromote ? 'promoted' : 'breach',
consecutiveBreaches: next,
shouldPromote,
};
}
recordClean(readModelName: string): PromotionDecision {
this.counters.set(readModelName, 0);
return {
readModelName,
state: 'clean',
consecutiveBreaches: 0,
shouldPromote: false,
};
}
/**
* Error during sampling does NOT reset the counter — the run is simply
* inconclusive. Returns the unchanged state for telemetry.
*/
recordError(readModelName: string): PromotionDecision {
const current = this.counters.get(readModelName) ?? 0;
return {
readModelName,
state: current >= PROMOTION_THRESHOLD ? 'promoted' : current > 0 ? 'breach' : 'clean',
consecutiveBreaches: current,
shouldPromote: false,
};
}
/** Test-only: inspect internal state. */
getConsecutiveBreaches(readModelName: string): number {
return this.counters.get(readModelName) ?? 0;
}
/** Test-only: clear all counters. */
reset(): void {
this.counters.clear();
}
}

View File

@@ -0,0 +1,11 @@
export {
AutoPromotionTracker,
PROMOTION_THRESHOLD,
type PromotionDecision,
type PromotionState,
} from './auto-promotion-tracker';
export {
ReconciliationHarness,
DEFAULT_SAMPLED_CONFIG,
type ReconciliationRunReport,
} from './reconciliation-harness';

View File

@@ -0,0 +1,189 @@
import { Inject, Injectable, Optional } from '@nestjs/common';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import.
import { LoggerService } from '@modules/shared';
import {
RECONCILER_REGISTRY,
type IReadModelReconciler,
type ReconciliationMode,
type ReconciliationSampleResult,
} from '../../domain/reconciler';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import.
import { AutoPromotionTracker } from './auto-promotion-tracker';
import type { PromotionDecision } from './auto-promotion-tracker';
/**
* Default sampled cadence config — RFC-003 §7:
* "1% random sample reconciliation per registered read model"
* "alert if drift > 0.1%"
*
* `minSamples` guarantees we always run at least 20 comparisons even for
* tiny read models where 1% is a fractional row count.
*/
export const DEFAULT_SAMPLED_CONFIG = Object.freeze({
sampleRate: 0.01,
minSamples: 20,
breachThreshold: 0.001,
});
export interface ReconciliationRunReport {
readonly readModelName: string;
readonly mode: ReconciliationMode;
readonly checked: number;
readonly drifted: number;
readonly driftRate: number;
readonly breach: boolean;
readonly driftReasons: readonly string[];
readonly promotion: PromotionDecision;
readonly error?: string;
}
/**
* Orchestrates reconciler runs for both cadences. Stateless — cron service
* owns scheduling, locking and metrics. The harness only does:
* 1. Ask each reconciler for candidate keys.
* 2. Sample (1% + min floor) for sampled runs; take all for full runs.
* 3. Invoke `reconcile(sampleKey)` per sample.
* 4. Feed the drift outcome into the auto-promotion tracker.
* 5. Return a `ReconciliationRunReport[]` per read model.
*/
@Injectable()
export class ReconciliationHarness {
constructor(
private readonly logger: LoggerService,
private readonly promotionTracker: AutoPromotionTracker,
@Optional()
@Inject(RECONCILER_REGISTRY)
private readonly reconcilers: readonly IReadModelReconciler[] | null,
) {}
get registry(): readonly IReadModelReconciler[] {
return this.reconcilers ?? [];
}
async runSampled(
config = DEFAULT_SAMPLED_CONFIG,
): Promise<readonly ReconciliationRunReport[]> {
return Promise.all(
this.registry.map((r) => this.runOne(r, 'sampled', config)),
);
}
async runFull(
breachThreshold = DEFAULT_SAMPLED_CONFIG.breachThreshold,
): Promise<readonly ReconciliationRunReport[]> {
return Promise.all(
this.registry.map((r) =>
this.runOne(r, 'full', {
sampleRate: 1,
minSamples: 0,
breachThreshold,
}),
),
);
}
private async runOne(
reconciler: IReadModelReconciler,
mode: ReconciliationMode,
config: typeof DEFAULT_SAMPLED_CONFIG,
): Promise<ReconciliationRunReport> {
const name = reconciler.readModelName;
try {
const budget =
mode === 'full' ? Number.MAX_SAFE_INTEGER : Math.max(config.minSamples, 100);
const universe = await reconciler.listSampleKeys(mode, budget);
const sample =
mode === 'full'
? universe
: ReconciliationHarness.pickSample(
universe,
config.sampleRate,
config.minSamples,
);
const results: ReconciliationSampleResult[] = [];
for (const key of sample) {
results.push(await reconciler.reconcile(key));
}
const drifted = results.filter((r) => !r.inSync);
const checked = results.length;
const driftRate = checked === 0 ? 0 : drifted.length / checked;
const breach = driftRate > config.breachThreshold;
const promotion =
mode === 'sampled'
? breach
? this.promotionTracker.recordBreach(name)
: this.promotionTracker.recordClean(name)
: {
readModelName: name,
state: breach ? ('breach' as const) : ('clean' as const),
consecutiveBreaches: this.promotionTracker.getConsecutiveBreaches(name),
shouldPromote: false,
};
this.logger.log(
`reconciliation_run read_model=${name} mode=${mode} checked=${checked} drifted=${drifted.length} drift_rate=${driftRate.toFixed(5)} breach=${breach}`,
'ReconciliationHarness',
);
return {
readModelName: name,
mode,
checked,
drifted: drifted.length,
driftRate,
breach,
driftReasons: drifted.map((d) => `${d.sampleKey}:${d.reason ?? 'drift'}`),
promotion,
};
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
this.logger.error(
`reconciliation_run_error read_model=${name} mode=${mode} err=${message}`,
err instanceof Error ? err.stack : undefined,
'ReconciliationHarness',
);
return {
readModelName: name,
mode,
checked: 0,
drifted: 0,
driftRate: 0,
breach: false,
driftReasons: [],
promotion: this.promotionTracker.recordError(name),
error: message,
};
}
}
/**
* FisherYates partial shuffle: takes `max(ceil(rate*N), minSamples)`
* items without replacement in O(k) time and O(1) extra allocations past
* the copy of the input array.
*
* `rng` lets tests inject a seeded PRNG; production uses `Math.random`.
*/
static pickSample<T>(
universe: readonly T[],
rate: number,
minSamples: number,
rng: () => number = Math.random,
): readonly T[] {
const n = universe.length;
if (n === 0) return [];
const target = Math.min(n, Math.max(minSamples, Math.ceil(rate * n)));
if (target >= n) return [...universe];
const pool = [...universe];
for (let i = 0; i < target; i += 1) {
const j = i + Math.floor(rng() * (n - i));
const tmp = pool[i] as T;
pool[i] = pool[j] as T;
pool[j] = tmp;
}
return pool.slice(0, target);
}
}

View File

@@ -15,3 +15,9 @@ export {
READ_MODEL_KILL_SWITCH,
type IReadModelKillSwitch,
} from './read-model-kill-switch';
export {
RECONCILER_REGISTRY,
type IReadModelReconciler,
type ReconciliationMode,
type ReconciliationSampleResult,
} from './reconciler';

View File

@@ -0,0 +1,49 @@
/**
* Per-read-model reconciler port — RFC-003 §7.
*
* Reconcilers compare projected read-model rows against the authoritative
* write-model and report whether each sampled key is in sync. The harness
* drives them on both cadences (1% sampled nightly + 100% weekly full).
*
* Phase 0 ships the port only; Phase 2 lands the first concrete reconciler.
*/
export interface ReconciliationSampleResult {
/** Opaque per-reconciler sample key (e.g. listing ID). */
readonly sampleKey: string;
/** `true` when read-model row matches the authoritative source. */
readonly inSync: boolean;
/** Optional human-readable drift reason (omit when `inSync === true`). */
readonly reason?: string;
}
export type ReconciliationMode = 'sampled' | 'full';
export interface IReadModelReconciler {
/** Stable read-model name; matches the `IReadRepository` name. */
readonly readModelName: string;
/**
* Returns the candidate universe of sample keys. For `mode === 'full'` the
* returned list SHOULD be the complete population; the harness still caps
* by `sampleBudget` for memory-safety.
*/
listSampleKeys(
mode: ReconciliationMode,
sampleBudget: number,
): Promise<readonly string[]>;
/**
* Compare one sample key against the authoritative source. MUST be pure
* (read-only) and MUST NOT throw for a single drift — return
* `{ inSync: false, reason }` instead.
*/
reconcile(sampleKey: string): Promise<ReconciliationSampleResult>;
}
/**
* Nest DI token for the `IReadModelReconciler[]` registry. Providers
* contribute reconcilers via a `useExisting` / `useClass` pattern; the
* registry itself is supplied as a plain array so the harness can iterate.
*/
export const RECONCILER_REGISTRY = Symbol('RECONCILER_REGISTRY');

View File

@@ -1,8 +1,20 @@
/**
* Reconciliation infrastructure (RFC-003 §7).
*
* Phase 0 ships only the placeholder. Phase 2 lands the sampled nightly
* (1%) drift checker; the weekly full reconciliation runs follow once
* Phase 2 has soaked in production for one cycle.
* Ships the Phase 0 harness:
* - `ReconciliationCronService` — sampled nightly + full weekly schedules
* guarded by a Redis SET NX EX lock for at-most-once execution.
* - Prometheus metric names re-exported so alert rules/dashboards can
* reference the constants.
*
* Concrete per-read-model reconcilers land with Phase 2 via the
* `RECONCILER_REGISTRY` DI token.
*/
export {};
export {
ReconciliationCronService,
RECON_SAMPLES_TOTAL,
RECON_DRIFT_TOTAL,
RECON_BREACH_TOTAL,
RECON_PROMOTION_TOTAL,
RECON_DURATION,
} from './reconciliation-cron.service';

View File

@@ -0,0 +1,138 @@
import { Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { InjectMetric } from '@willsoto/nestjs-prometheus';
import type { Counter, Histogram } from 'prom-client';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import.
import { LoggerService, RedisService } from '@modules/shared';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- DI needs value import.
import { ReconciliationHarness, type ReconciliationRunReport } from '../../application/reconciliation';
/** Prometheus metric names — exported so dashboards / alerts can reference them. */
export const RECON_SAMPLES_TOTAL = 'goodgo_read_model_reconciliation_samples_total';
export const RECON_DRIFT_TOTAL = 'goodgo_read_model_reconciliation_drift_total';
export const RECON_BREACH_TOTAL = 'goodgo_read_model_reconciliation_breach_total';
export const RECON_PROMOTION_TOTAL = 'goodgo_read_model_reconciliation_promotion_total';
export const RECON_DURATION = 'goodgo_read_model_reconciliation_duration_seconds';
const LOCK_PREFIX = 'recon:lock:';
const SAMPLED_LOCK_TTL_SEC = 5 * 60; // 5 minutes
const FULL_LOCK_TTL_SEC = 60 * 60; // 1 hour
/**
* Cron orchestrator for the reconciliation harness — RFC-003 §7.
*
* - Sampled nightly: 03:17 UTC (≈ 10:17 ICT). 1% sample per read model,
* alerts when drift > 0.1%, two consecutive breaches auto-page on-call.
* - Full weekly: 04:30 UTC Sunday (≈ 11:30 ICT). Walks every key.
*
* A Redis SET NX EX lock (`recon:lock:<mode>`) guarantees at-most-once
* execution across horizontally-scaled API instances.
*/
@Injectable()
export class ReconciliationCronService {
constructor(
private readonly harness: ReconciliationHarness,
private readonly redis: RedisService,
private readonly logger: LoggerService,
@InjectMetric(RECON_SAMPLES_TOTAL)
private readonly samplesCounter: Counter<string>,
@InjectMetric(RECON_DRIFT_TOTAL)
private readonly driftCounter: Counter<string>,
@InjectMetric(RECON_BREACH_TOTAL)
private readonly breachCounter: Counter<string>,
@InjectMetric(RECON_PROMOTION_TOTAL)
private readonly promotionCounter: Counter<string>,
@InjectMetric(RECON_DURATION)
private readonly durationHistogram: Histogram<string>,
) {}
@Cron('17 3 * * *', { name: 'reconciliation-sampled-nightly', timeZone: 'UTC' })
async runSampledNightly(): Promise<void> {
await this.runWithLock('sampled', () => this.harness.runSampled());
}
@Cron('30 4 * * 0', { name: 'reconciliation-full-weekly', timeZone: 'UTC' })
async runFullWeekly(): Promise<void> {
await this.runWithLock('full', () => this.harness.runFull());
}
private async runWithLock(
mode: 'sampled' | 'full',
run: () => Promise<readonly ReconciliationRunReport[]>,
): Promise<readonly ReconciliationRunReport[] | null> {
const ttl = mode === 'full' ? FULL_LOCK_TTL_SEC : SAMPLED_LOCK_TTL_SEC;
const lockKey = `${LOCK_PREFIX}${mode}`;
const token = `${process.pid}-${Date.now()}`;
let acquired = false;
try {
const client = this.redis.getClient();
const result = await client.set(lockKey, token, 'EX', ttl, 'NX');
acquired = result === 'OK';
} catch (err) {
this.logger.error(
`reconciliation_lock_error mode=${mode} err=${(err as Error).message}`,
(err as Error).stack,
'ReconciliationCronService',
);
// Fail closed on Redis errors — do not run unguarded.
return null;
}
if (!acquired) {
this.logger.log(
`reconciliation_skip mode=${mode} reason=lock_held`,
'ReconciliationCronService',
);
return null;
}
const startedAt = process.hrtime.bigint();
try {
const reports = await run();
this.recordMetrics(mode, reports, startedAt);
return reports;
} catch (err) {
this.logger.error(
`reconciliation_run_failed mode=${mode} err=${(err as Error).message}`,
(err as Error).stack,
'ReconciliationCronService',
);
return null;
} finally {
try {
// Best-effort lock release: only if we still own it.
const client = this.redis.getClient();
const current = await client.get(lockKey);
if (current === token) {
await client.del(lockKey);
}
} catch {
// Ignore — TTL will reap the lock.
}
}
}
private recordMetrics(
mode: 'sampled' | 'full',
reports: readonly ReconciliationRunReport[],
startedAt: bigint,
): void {
const elapsedSec = Number(process.hrtime.bigint() - startedAt) / 1e9;
this.durationHistogram.labels({ mode }).observe(elapsedSec);
for (const report of reports) {
const labels = { read_model: report.readModelName, mode };
this.samplesCounter.inc(labels, report.checked);
if (report.drifted > 0) {
this.driftCounter.inc(labels, report.drifted);
}
if (report.breach) {
this.breachCounter.inc(labels);
}
if (report.promotion.shouldPromote) {
this.promotionCounter.inc({ read_model: report.readModelName });
}
}
}
}

View File

@@ -1,34 +1,90 @@
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import {
PrometheusModule,
makeCounterProvider,
makeHistogramProvider,
} from '@willsoto/nestjs-prometheus';
import { SharedModule } from '@modules/shared';
import {
AutoPromotionTracker,
ReconciliationHarness,
} from './application/reconciliation';
import { READ_MODEL_KILL_SWITCH } from './domain/read-model-kill-switch';
import { RECONCILER_REGISTRY } from './domain/reconciler';
import { ConfigReadModelKillSwitch } from './infrastructure/config-read-model-kill-switch';
import {
ReconciliationCronService,
RECON_SAMPLES_TOTAL,
RECON_DRIFT_TOTAL,
RECON_BREACH_TOTAL,
RECON_PROMOTION_TOTAL,
RECON_DURATION,
} from './infrastructure/reconciliation';
/**
* Read-models module skeleton — RFC-003 Phase 0.
* Read-models module — RFC-003 Phase 0.
*
* Hosts:
* - Projector base class (`application/projectors/projector.base.ts`).
* - Read-model repository convention (`domain/read-repository.ts`).
* - Idempotency port (`domain/projection-offset-store.ts`).
* - Per-read-model kill switch (`domain/read-model-kill-switch.ts`).
* - Reconciliation harness + cron (sampled nightly + full weekly,
* RFC-003 §7 / [GOO-191](/GOO/issues/GOO-191)).
*
* No projectors, repositories, or `IProjectionOffsetStore` provider are
* registered here yet. The Prisma-backed offset store binding lands with
* [GOO-187](/GOO/issues/GOO-187); per-read-model projectors land in
* Phase 2/3.
*
* The module is imported by `AppModule` so its DI container is wired up
* even while empty — keeps Phase 2/3 PRs strictly additive.
* The Prisma-backed offset store binding lands with
* [GOO-187](/GOO/issues/GOO-187). `RECONCILER_REGISTRY` is provided as an
* empty array in Phase 0 — concrete reconcilers register against the
* token starting in Phase 2; the harness uses `@Optional()` so an empty
* registry simply results in no-op runs.
*/
@Module({
imports: [CqrsModule, SharedModule],
imports: [CqrsModule, SharedModule, PrometheusModule],
providers: [
{
provide: READ_MODEL_KILL_SWITCH,
useClass: ConfigReadModelKillSwitch,
},
{
provide: RECONCILER_REGISTRY,
useValue: [],
},
AutoPromotionTracker,
ReconciliationHarness,
ReconciliationCronService,
makeCounterProvider({
name: RECON_SAMPLES_TOTAL,
help: 'Total samples checked by the read-model reconciliation harness',
labelNames: ['read_model', 'mode'],
}),
makeCounterProvider({
name: RECON_DRIFT_TOTAL,
help: 'Total drifted samples detected by the read-model reconciliation harness',
labelNames: ['read_model', 'mode'],
}),
makeCounterProvider({
name: RECON_BREACH_TOTAL,
help: 'Reconciliation runs whose drift rate exceeded the breach threshold (>0.1%)',
labelNames: ['read_model', 'mode'],
}),
makeCounterProvider({
name: RECON_PROMOTION_TOTAL,
help: 'Auto-promotion events (2 consecutive sampled breaches) — pages on-call',
labelNames: ['read_model'],
}),
makeHistogramProvider({
name: RECON_DURATION,
help: 'Wall-clock duration of a reconciliation run in seconds',
labelNames: ['mode'],
buckets: [0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1800],
}),
],
exports: [
READ_MODEL_KILL_SWITCH,
RECONCILER_REGISTRY,
AutoPromotionTracker,
ReconciliationHarness,
],
exports: [READ_MODEL_KILL_SWITCH],
})
export class ReadModelsModule {}