fix(a11y): resolve serious accessibility issues on search page (GOO-110)
- Add aria-hidden="true" to all decorative inline SVGs (bookmark, view-mode, funnel, checkmark) - Convert save-search popover to proper dialog: role="dialog", aria-modal, focus trap, Escape key, focus return to trigger - Add aria-pressed on list/map/split view-mode toggle buttons - Add aria-expanded + aria-controls on mobile filter toggle button - Add role="status" + aria-label="Đang tải..." on Suspense fallback Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
129
apps/api/src/modules/read-models/README.md
Normal file
129
apps/api/src/modules/read-models/README.md
Normal file
@@ -0,0 +1,129 @@
|
||||
# `read-models` module
|
||||
|
||||
Phase 0 skeleton for the CQRS read-model expansion described in
|
||||
[RFC-003](../../../docs/adr/0003-cqrs-read-models.md) (the ADR itself
|
||||
lands with [GOO-193](/GOO/issues/GOO-193); until then RFC-003 lives on
|
||||
[GOO-94](/GOO/issues/GOO-94)).
|
||||
|
||||
## Layout
|
||||
|
||||
```
|
||||
read-models/
|
||||
domain/
|
||||
projection-context.ts # ProjectionContext, ProjectableEvent
|
||||
projection-offset-store.ts # IProjectionOffsetStore port + DI symbol
|
||||
read-repository.ts # IReadRepository convention marker
|
||||
application/
|
||||
projectors/
|
||||
projector.base.ts # Projector<E> base class
|
||||
repositories/ # I<Name>ReadRepository interfaces (Phase 2/3)
|
||||
infrastructure/
|
||||
refresh/ # mat-view refresh cron (Phase 1)
|
||||
reconciliation/ # nightly drift checker (Phase 2+)
|
||||
testing/
|
||||
in-memory-projection-offset-store.ts # unit-test harness
|
||||
read-models.module.ts
|
||||
index.ts
|
||||
```
|
||||
|
||||
This mirrors the layout RFC-003 §5 specifies; intentionally **no
|
||||
`presentation/`** because read models are infrastructure for other
|
||||
modules' query handlers, not their own HTTP surface.
|
||||
|
||||
## The projector contract
|
||||
|
||||
Every read-model projector extends `Projector<E extends DomainEvent>`
|
||||
and implements:
|
||||
|
||||
```ts
|
||||
@EventsHandler(MyDomainEvent)
|
||||
export class MyProjector extends Projector<MyDomainEvent> {
|
||||
readonly handlerName = 'my-projector.v1';
|
||||
|
||||
protected async apply(event: MyDomainEvent, ctx: ProjectionContext) {
|
||||
// write to your read model
|
||||
}
|
||||
}
|
||||
|
||||
// glue (one of):
|
||||
@EventsHandler(MyDomainEvent)
|
||||
export class MyProjectorGlue implements IEventHandler<MyDomainEvent> {
|
||||
constructor(private readonly projector: MyProjector) {}
|
||||
handle(event: MyDomainEvent) { return this.projector.dispatch(event); }
|
||||
}
|
||||
```
|
||||
|
||||
Subclasses MUST:
|
||||
|
||||
- set `handlerName` to a **stable string** (rename = re-projection — be deliberate);
|
||||
- implement `apply(event, ctx)` and treat `ctx.eventId` as the unit of idempotency.
|
||||
|
||||
Subclasses MUST NOT:
|
||||
|
||||
- call `apply` directly — always go through `dispatch(event)`;
|
||||
- write to write-model tables — read models are read-only from the API
|
||||
surface, only mutated by their owning projector or refresh job;
|
||||
- implement their own deduplication — the base class already does it via
|
||||
`IProjectionOffsetStore`.
|
||||
|
||||
## The offset / idempotency contract
|
||||
|
||||
RFC-003 §0 mandates `(eventId, handlerName)` idempotency:
|
||||
|
||||
> The `(eventId, handler)` offset table is non-negotiable. Land it in
|
||||
> Phase 0 with a unit-test harness so every Phase 2/3 projector inherits it.
|
||||
|
||||
This module ships the **port** (`IProjectionOffsetStore`,
|
||||
`PROJECTION_OFFSET_STORE`) and an in-memory implementation for tests.
|
||||
The Prisma-backed implementation — including the
|
||||
`projection_offset(event_id, handler_name, applied_at, payload_hash)`
|
||||
migration and the transactional wrapper — lands with
|
||||
[GOO-187](/GOO/issues/GOO-187).
|
||||
|
||||
The base class enforces the contract by calling `recordIfAbsent` BEFORE
|
||||
`apply`. Re-deliveries observe `applied: false` and are skipped. The
|
||||
offset row is intentionally **not rolled back on `apply` failure** in
|
||||
Phase 0 — this is the conservative choice (RFC-003 §7) and is healed by
|
||||
the nightly reconciliation job that lands in Phase 2.
|
||||
|
||||
`eventId` is currently derived from
|
||||
`${aggregateId}:${occurredAt.getTime()}:${eventName}` because the
|
||||
existing `DomainEvent` interface (`apps/api/src/modules/shared/domain/domain-event.ts`)
|
||||
does not yet carry a stable id. Override `deriveEventId` on your
|
||||
projector if your event type provides one. The id contract itself is
|
||||
finalised in [GOO-187](/GOO/issues/GOO-187); Phase 2/3 projectors should
|
||||
not bake assumptions about its format.
|
||||
|
||||
## The repository convention
|
||||
|
||||
For each read model:
|
||||
|
||||
1. Define `I<Name>ReadRepository` (extending `IReadRepository`) under
|
||||
`application/repositories/`.
|
||||
2. Export a paired injection symbol `<NAME>_READ_REPOSITORY`.
|
||||
3. Implement `Prisma<Name>ReadRepository` under
|
||||
`infrastructure/repositories/` and bind it in `ReadModelsModule`.
|
||||
4. Re-export the symbol from the module's `index.ts` so query handlers in
|
||||
other modules can `@Inject(LISTING_CARD_READ_REPOSITORY)` without
|
||||
reaching into the read-models module's internals.
|
||||
|
||||
Read repositories are READ-ONLY from the perspective of the rest of the
|
||||
API. The only writers are the projector that owns the read model (Option
|
||||
C) or the materialized-view refresh job (Option B).
|
||||
|
||||
## What Phase 0 is NOT
|
||||
|
||||
- No `projection_offset` migration — owned by [GOO-187](/GOO/issues/GOO-187).
|
||||
- No projectors registered.
|
||||
- No materialized views or refresh job — Phase 1.
|
||||
- No reconciliation job — Phase 2.
|
||||
- No `X-Data-Freshness-Seconds` helper — separate Phase 0 ticket.
|
||||
- No kill-switch / chaos test — separate Phase 0 ticket.
|
||||
|
||||
The skeleton exists so the next batch of PRs is purely additive.
|
||||
|
||||
## Coordination
|
||||
|
||||
- Parent: [GOO-94](/GOO/issues/GOO-94)
|
||||
- Sibling (offset table + idempotency harness): [GOO-187](/GOO/issues/GOO-187)
|
||||
- ADR (write-up): [GOO-193](/GOO/issues/GOO-193)
|
||||
1
apps/api/src/modules/read-models/application/index.ts
Normal file
1
apps/api/src/modules/read-models/application/index.ts
Normal file
@@ -0,0 +1 @@
|
||||
export * from './projectors';
|
||||
@@ -0,0 +1 @@
|
||||
export { Projector } from './projector.base';
|
||||
@@ -0,0 +1,112 @@
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { LoggerService } from '@modules/shared';
|
||||
import {
|
||||
PROJECTION_OFFSET_STORE,
|
||||
type IProjectionOffsetStore,
|
||||
type ProjectableEvent,
|
||||
type ProjectionContext,
|
||||
} from '../../domain';
|
||||
|
||||
/**
|
||||
* Base class every read-model projector inherits from.
|
||||
*
|
||||
* Responsibilities:
|
||||
* - Owns the typed `apply(event, ctx)` hook subclasses implement.
|
||||
* - Delegates the `(eventId, handlerName)` idempotency check to
|
||||
* {@link IProjectionOffsetStore} (port from `domain/`,
|
||||
* Prisma implementation from [GOO-187](/GOO/issues/GOO-187)).
|
||||
* - Emits a structured log line with projector lag for observability
|
||||
* (`X-Data-Freshness-Seconds` SLO, RFC-003 §0).
|
||||
*
|
||||
* Subclasses do NOT call `apply` directly — they invoke {@link dispatch}
|
||||
* from their `@EventsHandler` / `@OnEvent` glue. `dispatch` enforces the
|
||||
* "at-least-once → effectively-once" contract.
|
||||
*
|
||||
* Subclasses MUST:
|
||||
* - Set `handlerName` (stable identifier — used as the offset key half).
|
||||
* - Implement `apply(event, ctx)`.
|
||||
*
|
||||
* Subclasses MAY:
|
||||
* - Override `deriveEventId(event)` if their event type carries a
|
||||
* stable id field. Default derivation is
|
||||
* `${aggregateId}:${occurredAt.getTime()}:${eventName}` — sufficient
|
||||
* for current domain events but NOT for events fanned out via
|
||||
* external transports (revisit when CDC lands, RFC-003 Option D).
|
||||
*/
|
||||
@Injectable()
|
||||
export abstract class Projector<E extends ProjectableEvent> {
|
||||
/** Stable handler identifier — second half of the offset key. */
|
||||
abstract readonly handlerName: string;
|
||||
|
||||
constructor(
|
||||
@Inject(PROJECTION_OFFSET_STORE)
|
||||
protected readonly offsets: IProjectionOffsetStore,
|
||||
protected readonly logger: LoggerService,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Implement the actual projection. MUST be deterministic given
|
||||
* `(event, ctx)` and MUST be safe to short-circuit if `ctx` indicates
|
||||
* a re-delivery (the base class already enforces this — subclasses
|
||||
* should not re-check).
|
||||
*/
|
||||
protected abstract apply(event: E, ctx: ProjectionContext): Promise<void>;
|
||||
|
||||
/**
|
||||
* Optional hook so subclasses can override how `eventId` is derived
|
||||
* from a domain event. Override this if your event type carries a
|
||||
* stable id (e.g. UUID minted by the producer).
|
||||
*/
|
||||
protected deriveEventId(event: E): string {
|
||||
return `${event.aggregateId}:${event.occurredAt.getTime()}:${event.eventName}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point invoked by the projector's framework glue
|
||||
* (`@EventsHandler` / `@OnEvent`). Wraps `apply` with the offset
|
||||
* idempotency check and emits a lag log line on success.
|
||||
*/
|
||||
async dispatch(event: E): Promise<void> {
|
||||
const observedAt = new Date();
|
||||
const ctx: ProjectionContext = {
|
||||
eventId: this.deriveEventId(event),
|
||||
handlerName: this.handlerName,
|
||||
observedAt,
|
||||
};
|
||||
|
||||
const { applied } = await this.offsets.recordIfAbsent({
|
||||
eventId: ctx.eventId,
|
||||
handlerName: ctx.handlerName,
|
||||
appliedAt: observedAt,
|
||||
});
|
||||
|
||||
if (!applied) {
|
||||
// Re-delivery — already projected. No-op by contract.
|
||||
this.logger.debug(
|
||||
`Projector ${this.handlerName} skipped duplicate event ${ctx.eventId}`,
|
||||
'Projector',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await this.apply(event, ctx);
|
||||
const lagMs = observedAt.getTime() - event.occurredAt.getTime();
|
||||
this.logger.debug(
|
||||
`Projector ${this.handlerName} applied event ${ctx.eventId} (lag=${lagMs}ms)`,
|
||||
'Projector',
|
||||
);
|
||||
} catch (err) {
|
||||
// Surface the failure with the offset key so reconciliation can
|
||||
// reason about partially-applied state. Note that the offset row
|
||||
// is already inserted — Phase 0 deliberately does NOT roll it back.
|
||||
// RFC-003 §7 covers this with the nightly reconciliation job.
|
||||
this.logger.error(
|
||||
`Projector ${this.handlerName} failed for event ${ctx.eventId}: ${(err as Error).message}`,
|
||||
(err as Error).stack,
|
||||
'Projector',
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
/**
|
||||
* Per-read-model repository interfaces live in this folder once Phase 2/3
|
||||
* begin landing concrete read models. Phase 0 ships only the convention
|
||||
* (see `domain/read-repository.ts`):
|
||||
*
|
||||
* - One interface per read model: `I<Name>ReadRepository`.
|
||||
* - Paired injection symbol: `<NAME>_READ_REPOSITORY`.
|
||||
* - Read-only — writes go through projectors or refresh jobs.
|
||||
*/
|
||||
export {};
|
||||
17
apps/api/src/modules/read-models/domain/index.ts
Normal file
17
apps/api/src/modules/read-models/domain/index.ts
Normal file
@@ -0,0 +1,17 @@
|
||||
export {
|
||||
type ProjectionContext,
|
||||
type ProjectableEvent,
|
||||
} from './projection-context';
|
||||
export {
|
||||
PROJECTION_OFFSET_STORE,
|
||||
type IProjectionOffsetStore,
|
||||
type ProjectionOffsetKey,
|
||||
type ProjectionOffsetRecord,
|
||||
type RecordOffsetInput,
|
||||
type RecordOffsetResult,
|
||||
} from './projection-offset-store';
|
||||
export { type IReadRepository } from './read-repository';
|
||||
export {
|
||||
READ_MODEL_KILL_SWITCH,
|
||||
type IReadModelKillSwitch,
|
||||
} from './read-model-kill-switch';
|
||||
@@ -0,0 +1,43 @@
|
||||
import type { DomainEvent } from '@modules/shared';
|
||||
|
||||
/**
|
||||
* Per-event context handed to a projector's `apply` hook.
|
||||
*
|
||||
* Phase 0 keeps this intentionally minimal. Later phases may attach
|
||||
* tracing spans, current offset metadata, or a transaction handle here
|
||||
* — additions MUST stay backward-compatible (additive properties only).
|
||||
*/
|
||||
export interface ProjectionContext {
|
||||
/**
|
||||
* Stable identifier for the event being projected. Used as the primary
|
||||
* key half of `(eventId, handlerName)` in the offset store so re-delivery
|
||||
* is a no-op.
|
||||
*
|
||||
* NOTE: domain events do not yet carry a stable id; until they do
|
||||
* the wrapper that invokes a projector is responsible for deriving one
|
||||
* (typically `${aggregateId}:${occurredAt.getTime()}:${eventName}`).
|
||||
* This contract is fixed in [GOO-187](/GOO/issues/GOO-187).
|
||||
*/
|
||||
readonly eventId: string;
|
||||
|
||||
/**
|
||||
* The projector handler invoking `apply`. Used as the second half of
|
||||
* the `(eventId, handlerName)` offset key — the same event projected
|
||||
* by two different handlers must record two separate offsets.
|
||||
*/
|
||||
readonly handlerName: string;
|
||||
|
||||
/**
|
||||
* When the event was observed by the projector dispatcher (NOT when
|
||||
* the event itself occurred — see `event.occurredAt`). Useful for
|
||||
* lag metrics: `observedAt - event.occurredAt`.
|
||||
*/
|
||||
readonly observedAt: Date;
|
||||
}
|
||||
|
||||
/**
|
||||
* Projector-facing view of a domain event. Re-exported here so projector
|
||||
* code does not have to reach across to `@modules/shared` for the base
|
||||
* type — keeps the read-models module's public surface self-contained.
|
||||
*/
|
||||
export type ProjectableEvent = DomainEvent;
|
||||
@@ -0,0 +1,63 @@
|
||||
/**
|
||||
* Idempotency contract for projector dispatch.
|
||||
*
|
||||
* RFC-003 §0 (CTO ask): the `(eventId, handlerName)` offset table is
|
||||
* non-negotiable. Phase 0 lands this *port* so that:
|
||||
*
|
||||
* 1. The projector base class can express the contract in code today
|
||||
* (without taking a Prisma dependency at this layer).
|
||||
* 2. [GOO-187](/GOO/issues/GOO-187) lands the Prisma migration
|
||||
* (`projection_offset(event_id, handler_name, applied_at, payload_hash)`)
|
||||
* and the concrete implementation against this same interface.
|
||||
* 3. The unit-test harness in `read-models/testing` can ship an
|
||||
* in-memory implementation without coupling to infra.
|
||||
*
|
||||
* Implementations MUST be safe under concurrent dispatch: the
|
||||
* `recordIfAbsent` call is the linearisation point — exactly one caller
|
||||
* for a given `(eventId, handlerName)` should observe `applied: true`.
|
||||
*/
|
||||
|
||||
export interface ProjectionOffsetKey {
|
||||
readonly eventId: string;
|
||||
readonly handlerName: string;
|
||||
}
|
||||
|
||||
export interface ProjectionOffsetRecord extends ProjectionOffsetKey {
|
||||
/** When this offset was first recorded (i.e. the projection ran). */
|
||||
readonly appliedAt: Date;
|
||||
/**
|
||||
* Optional content-hash of the projected payload. Reconciliation jobs
|
||||
* use this to spot drift between what was projected and what the
|
||||
* write-side now holds.
|
||||
*/
|
||||
readonly payloadHash?: string;
|
||||
}
|
||||
|
||||
export interface RecordOffsetInput extends ProjectionOffsetKey {
|
||||
readonly appliedAt?: Date;
|
||||
readonly payloadHash?: string;
|
||||
}
|
||||
|
||||
export interface RecordOffsetResult {
|
||||
/**
|
||||
* `true` if this call inserted a fresh offset row (the projection
|
||||
* should run); `false` if the offset already existed (re-delivery,
|
||||
* the projection MUST be skipped).
|
||||
*/
|
||||
readonly applied: boolean;
|
||||
}
|
||||
|
||||
export interface IProjectionOffsetStore {
|
||||
/**
|
||||
* Atomically insert an offset row if and only if no row exists for
|
||||
* `(eventId, handlerName)`. Implementations typically use
|
||||
* `INSERT ... ON CONFLICT DO NOTHING` or an equivalent unique-constraint
|
||||
* insert and report whether a row was actually written.
|
||||
*/
|
||||
recordIfAbsent(input: RecordOffsetInput): Promise<RecordOffsetResult>;
|
||||
|
||||
/** Lookup helper for reconciliation tooling and tests. */
|
||||
find(key: ProjectionOffsetKey): Promise<ProjectionOffsetRecord | null>;
|
||||
}
|
||||
|
||||
export const PROJECTION_OFFSET_STORE = Symbol('PROJECTION_OFFSET_STORE');
|
||||
@@ -0,0 +1,24 @@
|
||||
/**
|
||||
* Per-read-model kill switch — RFC-003 §0.
|
||||
*
|
||||
* Contract:
|
||||
* - `isEnabled(name)` returns whether the named read model should
|
||||
* serve queries. When `false`, callers MUST fail open to the
|
||||
* write-model path.
|
||||
* - Implementations MUST be hot-readable (no restart required).
|
||||
* - The check is evaluated per-call so that a flag toggled mid-request
|
||||
* is honoured on the NEXT repository call within the same request.
|
||||
* In-flight calls complete against whichever source they already
|
||||
* started on.
|
||||
*/
|
||||
|
||||
export const READ_MODEL_KILL_SWITCH = Symbol('READ_MODEL_KILL_SWITCH');
|
||||
|
||||
export interface IReadModelKillSwitch {
|
||||
/**
|
||||
* Returns `true` when the named read model is active and safe to query.
|
||||
* Returns `true` (fail-open) for unknown / un-configured names so that
|
||||
* an absent config key never blocks the write-model fallback path.
|
||||
*/
|
||||
isEnabled(readModelName: string): boolean;
|
||||
}
|
||||
28
apps/api/src/modules/read-models/domain/read-repository.ts
Normal file
28
apps/api/src/modules/read-models/domain/read-repository.ts
Normal file
@@ -0,0 +1,28 @@
|
||||
/**
|
||||
* Marker interface for read-model repositories.
|
||||
*
|
||||
* Convention (Phase 0):
|
||||
* - One interface per read model: `I<Name>ReadRepository`.
|
||||
* - Paired injection symbol: `<NAME>_READ_REPOSITORY` (Symbol).
|
||||
* - Concrete Prisma-backed class lives under
|
||||
* `infrastructure/repositories/prisma-<name>-read.repository.ts`.
|
||||
* - Read repositories are READ-ONLY. Writes happen exclusively via
|
||||
* projectors (Option C) or scheduled refresh jobs (Option B).
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* ```ts
|
||||
* export const LISTING_CARD_READ_REPOSITORY = Symbol('LISTING_CARD_READ_REPOSITORY');
|
||||
*
|
||||
* export interface IListingCardReadRepository extends IReadRepository {
|
||||
* findById(id: string): Promise<ListingCardReadView | null>;
|
||||
* search(params: ListingCardSearchParams): Promise<PaginatedResult<ListingCardReadView>>;
|
||||
* }
|
||||
* ```
|
||||
*
|
||||
* Keeping this as an empty marker (rather than forcing a `findById`
|
||||
* shape) lets Phase 2/3 read repositories pick the access pattern that
|
||||
* fits the read model — point lookup, search, range scan, etc.
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/no-empty-interface
|
||||
export interface IReadRepository {}
|
||||
4
apps/api/src/modules/read-models/index.ts
Normal file
4
apps/api/src/modules/read-models/index.ts
Normal file
@@ -0,0 +1,4 @@
|
||||
export { ReadModelsModule } from './read-models.module';
|
||||
export * from './domain';
|
||||
export * from './application';
|
||||
export * from './infrastructure';
|
||||
@@ -0,0 +1,238 @@
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
import { type IReadModelKillSwitch } from '../../domain/read-model-kill-switch';
|
||||
import { ReadModelRepositoryWrapper } from '../read-model-repository-wrapper';
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Shared test doubles */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
interface IFakeRepository {
|
||||
findById(id: string): Promise<{ id: string; source: string }>;
|
||||
search(query: string): Promise<{ results: string[]; source: string }>;
|
||||
}
|
||||
|
||||
function createReadRepo(): IFakeRepository {
|
||||
return {
|
||||
findById: vi.fn(async (id: string) => ({ id, source: 'read-model' })),
|
||||
search: vi.fn(async (q: string) => ({ results: [q], source: 'read-model' })),
|
||||
};
|
||||
}
|
||||
|
||||
function createWriteRepo(): IFakeRepository {
|
||||
return {
|
||||
findById: vi.fn(async (id: string) => ({ id, source: 'write-model' })),
|
||||
search: vi.fn(async (q: string) => ({ results: [q], source: 'write-model' })),
|
||||
};
|
||||
}
|
||||
|
||||
const silentLogger = {
|
||||
debug: vi.fn(),
|
||||
log: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
verbose: vi.fn(),
|
||||
} as any;
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Mutable kill switch for chaos testing */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
class MutableKillSwitch implements IReadModelKillSwitch {
|
||||
private flags = new Map<string, boolean>();
|
||||
|
||||
setEnabled(name: string, enabled: boolean): void {
|
||||
this.flags.set(name, enabled);
|
||||
}
|
||||
|
||||
isEnabled(name: string): boolean {
|
||||
return this.flags.get(name) ?? true; // fail-open default
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Tests */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
describe('ReadModelRepositoryWrapper — kill switch', () => {
|
||||
let killSwitch: MutableKillSwitch;
|
||||
let readRepo: IFakeRepository;
|
||||
let writeRepo: IFakeRepository;
|
||||
let proxy: IFakeRepository;
|
||||
|
||||
beforeEach(() => {
|
||||
killSwitch = new MutableKillSwitch();
|
||||
readRepo = createReadRepo();
|
||||
writeRepo = createWriteRepo();
|
||||
const wrapper = new ReadModelRepositoryWrapper<IFakeRepository>(
|
||||
readRepo,
|
||||
writeRepo,
|
||||
killSwitch,
|
||||
'listing_card',
|
||||
silentLogger,
|
||||
);
|
||||
proxy = wrapper.getProxy();
|
||||
});
|
||||
|
||||
it('routes to read-model when kill switch is ON (enabled)', async () => {
|
||||
killSwitch.setEnabled('listing_card', true);
|
||||
const result = await proxy.findById('abc');
|
||||
expect(result.source).toBe('read-model');
|
||||
expect(readRepo.findById).toHaveBeenCalledWith('abc');
|
||||
expect(writeRepo.findById).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('routes to write-model when kill switch is OFF (disabled)', async () => {
|
||||
killSwitch.setEnabled('listing_card', false);
|
||||
const result = await proxy.findById('abc');
|
||||
expect(result.source).toBe('write-model');
|
||||
expect(writeRepo.findById).toHaveBeenCalledWith('abc');
|
||||
expect(readRepo.findById).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('defaults to enabled (fail-open) for unknown read model names', async () => {
|
||||
// 'listing_card' was never set → defaults to true
|
||||
const freshKillSwitch = new MutableKillSwitch();
|
||||
const wrapper = new ReadModelRepositoryWrapper<IFakeRepository>(
|
||||
readRepo,
|
||||
writeRepo,
|
||||
freshKillSwitch,
|
||||
'unknown_model',
|
||||
silentLogger,
|
||||
);
|
||||
const result = await wrapper.getProxy().findById('xyz');
|
||||
expect(result.source).toBe('read-model');
|
||||
});
|
||||
|
||||
/* -------------------------------------------------------------- */
|
||||
/* CHAOS TEST: flag toggle mid-request → fail-open */
|
||||
/* -------------------------------------------------------------- */
|
||||
|
||||
it('chaos: flag toggle mid-request fails open to write-model on NEXT call', async () => {
|
||||
// Start enabled — first call goes to read-model.
|
||||
killSwitch.setEnabled('listing_card', true);
|
||||
|
||||
const call1 = proxy.findById('first');
|
||||
|
||||
// Toggle the flag WHILE call1 is in-flight.
|
||||
killSwitch.setEnabled('listing_card', false);
|
||||
|
||||
const result1 = await call1;
|
||||
// call1 already started on read-model — it completes there.
|
||||
expect(result1.source).toBe('read-model');
|
||||
|
||||
// NEXT call should route to write-model (the switch was flipped).
|
||||
const result2 = await proxy.findById('second');
|
||||
expect(result2.source).toBe('write-model');
|
||||
});
|
||||
|
||||
it('chaos: rapid toggle during sequential calls routes correctly', async () => {
|
||||
// Simulate a chaotic sequence of toggles interleaved with calls.
|
||||
killSwitch.setEnabled('listing_card', true);
|
||||
const r1 = await proxy.search('q1');
|
||||
expect(r1.source).toBe('read-model');
|
||||
|
||||
killSwitch.setEnabled('listing_card', false);
|
||||
const r2 = await proxy.search('q2');
|
||||
expect(r2.source).toBe('write-model');
|
||||
|
||||
killSwitch.setEnabled('listing_card', true);
|
||||
const r3 = await proxy.search('q3');
|
||||
expect(r3.source).toBe('read-model');
|
||||
|
||||
killSwitch.setEnabled('listing_card', false);
|
||||
killSwitch.setEnabled('listing_card', true);
|
||||
killSwitch.setEnabled('listing_card', false);
|
||||
const r4 = await proxy.search('q4');
|
||||
expect(r4.source).toBe('write-model');
|
||||
});
|
||||
|
||||
it('chaos: concurrent calls with mid-flight toggle each route independently', async () => {
|
||||
killSwitch.setEnabled('listing_card', true);
|
||||
|
||||
// Slow read-model that takes time to resolve.
|
||||
const slowReadRepo: IFakeRepository = {
|
||||
findById: vi.fn(
|
||||
(id: string) =>
|
||||
new Promise((resolve) =>
|
||||
setTimeout(() => resolve({ id, source: 'read-model' }), 50),
|
||||
),
|
||||
),
|
||||
search: vi.fn(async (q: string) => ({ results: [q], source: 'read-model' })),
|
||||
};
|
||||
|
||||
const wrapper = new ReadModelRepositoryWrapper<IFakeRepository>(
|
||||
slowReadRepo,
|
||||
writeRepo,
|
||||
killSwitch,
|
||||
'listing_card',
|
||||
silentLogger,
|
||||
);
|
||||
const slowProxy = wrapper.getProxy();
|
||||
|
||||
// Launch first call (will use read-model, takes 50ms).
|
||||
const p1 = slowProxy.findById('slow');
|
||||
|
||||
// Toggle off before call1 resolves.
|
||||
killSwitch.setEnabled('listing_card', false);
|
||||
|
||||
// Second call should immediately route to write-model.
|
||||
const p2 = slowProxy.findById('fast');
|
||||
|
||||
const [result1, result2] = await Promise.all([p1, p2]);
|
||||
|
||||
// call1 was already dispatched to read-model — completes there.
|
||||
expect(result1.source).toBe('read-model');
|
||||
// call2 was dispatched after toggle — goes to write-model.
|
||||
expect(result2.source).toBe('write-model');
|
||||
});
|
||||
|
||||
it('chaos: zero error bubble to caller when switch is off', async () => {
|
||||
killSwitch.setEnabled('listing_card', false);
|
||||
|
||||
// Both methods should work without throwing.
|
||||
await expect(proxy.findById('x')).resolves.toBeDefined();
|
||||
await expect(proxy.search('y')).resolves.toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('ConfigReadModelKillSwitch', () => {
|
||||
// Unit test the config-backed implementation separately.
|
||||
it('reads env var per call (hot-readable)', async () => {
|
||||
const { ConfigReadModelKillSwitch } = await import(
|
||||
'../config-read-model-kill-switch'
|
||||
);
|
||||
|
||||
const mockConfig = {
|
||||
get: vi.fn((key: string) => {
|
||||
if (key === 'READ_MODEL_LISTING_CARD_ENABLED') return 'false';
|
||||
return undefined;
|
||||
}),
|
||||
};
|
||||
|
||||
const ks = new ConfigReadModelKillSwitch(mockConfig as any, silentLogger);
|
||||
|
||||
expect(ks.isEnabled('listing_card')).toBe(false);
|
||||
expect(ks.isEnabled('unknown')).toBe(true); // fail-open
|
||||
|
||||
// Simulate hot-reload by changing the mock return.
|
||||
mockConfig.get.mockImplementation((key: string) => {
|
||||
if (key === 'READ_MODEL_LISTING_CARD_ENABLED') return 'true';
|
||||
return undefined;
|
||||
});
|
||||
|
||||
expect(ks.isEnabled('listing_card')).toBe(true);
|
||||
});
|
||||
|
||||
it('treats "0" as disabled', async () => {
|
||||
const { ConfigReadModelKillSwitch } = await import(
|
||||
'../config-read-model-kill-switch'
|
||||
);
|
||||
|
||||
const mockConfig = {
|
||||
get: vi.fn(() => '0'),
|
||||
};
|
||||
|
||||
const ks = new ConfigReadModelKillSwitch(mockConfig as any, silentLogger);
|
||||
expect(ks.isEnabled('any_model')).toBe(false);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,46 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { LoggerService } from '@modules/shared';
|
||||
import { type IReadModelKillSwitch } from '../domain/read-model-kill-switch';
|
||||
|
||||
/**
|
||||
* Config-driven per-read-model kill switch.
|
||||
*
|
||||
* Reads `READ_MODEL_<UPPER_SNAKE_NAME>_ENABLED` from process.env via
|
||||
* NestJS ConfigService on every call (hot-readable — no restart needed).
|
||||
*
|
||||
* Missing keys default to `true` (fail-open: the read model is presumed
|
||||
* healthy unless explicitly killed).
|
||||
*
|
||||
* Example:
|
||||
* `READ_MODEL_LISTING_CARD_ENABLED=false` → listing_card read model disabled
|
||||
* (env var absent) → read model enabled (fail-open)
|
||||
*/
|
||||
@Injectable()
|
||||
export class ConfigReadModelKillSwitch implements IReadModelKillSwitch {
|
||||
constructor(
|
||||
private readonly config: ConfigService,
|
||||
private readonly logger: LoggerService,
|
||||
) {}
|
||||
|
||||
isEnabled(readModelName: string): boolean {
|
||||
const envKey = `READ_MODEL_${readModelName.replace(/-/g, '_').toUpperCase()}_ENABLED`;
|
||||
const raw = this.config.get<string>(envKey);
|
||||
|
||||
// Missing config → fail open (enabled).
|
||||
if (raw === undefined || raw === null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const enabled = raw !== 'false' && raw !== '0';
|
||||
|
||||
if (!enabled) {
|
||||
this.logger.debug(
|
||||
`Kill switch OFF for read model "${readModelName}" (${envKey}=${raw})`,
|
||||
'ReadModelKillSwitch',
|
||||
);
|
||||
}
|
||||
|
||||
return enabled;
|
||||
}
|
||||
}
|
||||
4
apps/api/src/modules/read-models/infrastructure/index.ts
Normal file
4
apps/api/src/modules/read-models/infrastructure/index.ts
Normal file
@@ -0,0 +1,4 @@
|
||||
export * from './refresh';
|
||||
export * from './reconciliation';
|
||||
export { ConfigReadModelKillSwitch } from './config-read-model-kill-switch';
|
||||
export { ReadModelRepositoryWrapper } from './read-model-repository-wrapper';
|
||||
@@ -0,0 +1,79 @@
|
||||
import { LoggerService } from '@modules/shared';
|
||||
import { type IReadModelKillSwitch } from '../domain/read-model-kill-switch';
|
||||
|
||||
/**
|
||||
* Generic wrapper that sits in front of a read-model repository and
|
||||
* transparently fails open to the write-model repository when the
|
||||
* per-read-model kill switch is OFF.
|
||||
*
|
||||
* Every public method call checks the kill switch. Because the check
|
||||
* happens per-call (not per-request), a flag toggled mid-request is
|
||||
* honoured on the NEXT call — the in-flight call completes against
|
||||
* whichever source it already started on.
|
||||
*
|
||||
* Usage (at module wiring time):
|
||||
*
|
||||
* ```ts
|
||||
* const wrapper = new ReadModelRepositoryWrapper(
|
||||
* readRepo,
|
||||
* writeRepo,
|
||||
* killSwitch,
|
||||
* 'listing_card',
|
||||
* logger,
|
||||
* );
|
||||
* ```
|
||||
*
|
||||
* `T` is the repository interface both implementations share (the
|
||||
* intersection of methods callable by consumers).
|
||||
*/
|
||||
export class ReadModelRepositoryWrapper<T extends object> {
|
||||
private readonly proxy: T;
|
||||
|
||||
constructor(
|
||||
private readonly readImpl: T,
|
||||
private readonly writeImpl: T,
|
||||
private readonly killSwitch: IReadModelKillSwitch,
|
||||
private readonly readModelName: string,
|
||||
private readonly logger: LoggerService,
|
||||
) {
|
||||
// Build a Proxy that intercepts every method call and routes it
|
||||
// through the kill switch.
|
||||
this.proxy = new Proxy(readImpl, {
|
||||
get: (_target, prop, _receiver) => {
|
||||
const readVal = (readImpl as Record<string | symbol, unknown>)[prop];
|
||||
const writeVal = (writeImpl as Record<string | symbol, unknown>)[prop];
|
||||
|
||||
// Non-function properties: always return from the active source.
|
||||
if (typeof readVal !== 'function') {
|
||||
return this.killSwitch.isEnabled(this.readModelName) ? readVal : writeVal;
|
||||
}
|
||||
|
||||
// Function: return a wrapper that checks the switch at call time.
|
||||
return (...args: unknown[]) => {
|
||||
if (this.killSwitch.isEnabled(this.readModelName)) {
|
||||
return (readVal as Function).apply(readImpl, args);
|
||||
}
|
||||
|
||||
this.logger.debug(
|
||||
`Kill switch routed ${this.readModelName}.${String(prop)} → write-model`,
|
||||
'ReadModelRepositoryWrapper',
|
||||
);
|
||||
if (typeof writeVal !== 'function') {
|
||||
throw new Error(
|
||||
`Write-model fallback for ${this.readModelName} does not implement ${String(prop)}`,
|
||||
);
|
||||
}
|
||||
return (writeVal as Function).apply(writeImpl, args);
|
||||
};
|
||||
},
|
||||
}) as T;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the proxied repository that consumers should depend on.
|
||||
* Inject this as the repository token value.
|
||||
*/
|
||||
getProxy(): T {
|
||||
return this.proxy;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
/**
|
||||
* Reconciliation infrastructure (RFC-003 §7).
|
||||
*
|
||||
* Phase 0 ships only the placeholder. Phase 2 lands the sampled nightly
|
||||
* (1%) drift checker; the weekly full reconciliation runs follow once
|
||||
* Phase 2 has soaked in production for one cycle.
|
||||
*/
|
||||
export {};
|
||||
@@ -0,0 +1,9 @@
|
||||
/**
|
||||
* Materialized-view refresh infrastructure (RFC-003 Option B).
|
||||
*
|
||||
* Phase 0 ships only the placeholder. Phase 1 lands
|
||||
* `RefreshMaterializedViewJob` and the cron registrations for
|
||||
* `mv_heatmap_district`, `mv_heatmap_ward`, `mv_market_snapshot`,
|
||||
* `mv_district_stats`.
|
||||
*/
|
||||
export {};
|
||||
34
apps/api/src/modules/read-models/read-models.module.ts
Normal file
34
apps/api/src/modules/read-models/read-models.module.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { CqrsModule } from '@nestjs/cqrs';
|
||||
import { SharedModule } from '@modules/shared';
|
||||
import { READ_MODEL_KILL_SWITCH } from './domain/read-model-kill-switch';
|
||||
import { ConfigReadModelKillSwitch } from './infrastructure/config-read-model-kill-switch';
|
||||
|
||||
/**
|
||||
* Read-models module skeleton — RFC-003 Phase 0.
|
||||
*
|
||||
* Hosts:
|
||||
* - Projector base class (`application/projectors/projector.base.ts`).
|
||||
* - Read-model repository convention (`domain/read-repository.ts`).
|
||||
* - Idempotency port (`domain/projection-offset-store.ts`).
|
||||
* - Per-read-model kill switch (`domain/read-model-kill-switch.ts`).
|
||||
*
|
||||
* No projectors, repositories, or `IProjectionOffsetStore` provider are
|
||||
* registered here yet. The Prisma-backed offset store binding lands with
|
||||
* [GOO-187](/GOO/issues/GOO-187); per-read-model projectors land in
|
||||
* Phase 2/3.
|
||||
*
|
||||
* The module is imported by `AppModule` so its DI container is wired up
|
||||
* even while empty — keeps Phase 2/3 PRs strictly additive.
|
||||
*/
|
||||
@Module({
|
||||
imports: [CqrsModule, SharedModule],
|
||||
providers: [
|
||||
{
|
||||
provide: READ_MODEL_KILL_SWITCH,
|
||||
useClass: ConfigReadModelKillSwitch,
|
||||
},
|
||||
],
|
||||
exports: [READ_MODEL_KILL_SWITCH],
|
||||
})
|
||||
export class ReadModelsModule {}
|
||||
@@ -0,0 +1,51 @@
|
||||
import {
|
||||
type IProjectionOffsetStore,
|
||||
type ProjectionOffsetKey,
|
||||
type ProjectionOffsetRecord,
|
||||
type RecordOffsetInput,
|
||||
type RecordOffsetResult,
|
||||
} from '../domain';
|
||||
|
||||
/**
|
||||
* In-memory {@link IProjectionOffsetStore} for unit tests.
|
||||
*
|
||||
* Phase 2/3 projector tests reuse this so they can exercise the
|
||||
* "replay same event N times → single state mutation" contract from
|
||||
* RFC-003 §0 without spinning up Postgres. The Prisma-backed
|
||||
* implementation lives in [GOO-187](/GOO/issues/GOO-187).
|
||||
*/
|
||||
export class InMemoryProjectionOffsetStore implements IProjectionOffsetStore {
|
||||
private readonly rows = new Map<string, ProjectionOffsetRecord>();
|
||||
|
||||
private static key(k: ProjectionOffsetKey): string {
|
||||
return `${k.handlerName}::${k.eventId}`;
|
||||
}
|
||||
|
||||
async recordIfAbsent(input: RecordOffsetInput): Promise<RecordOffsetResult> {
|
||||
const k = InMemoryProjectionOffsetStore.key(input);
|
||||
if (this.rows.has(k)) {
|
||||
return { applied: false };
|
||||
}
|
||||
this.rows.set(k, {
|
||||
eventId: input.eventId,
|
||||
handlerName: input.handlerName,
|
||||
appliedAt: input.appliedAt ?? new Date(),
|
||||
...(input.payloadHash !== undefined ? { payloadHash: input.payloadHash } : {}),
|
||||
});
|
||||
return { applied: true };
|
||||
}
|
||||
|
||||
async find(key: ProjectionOffsetKey): Promise<ProjectionOffsetRecord | null> {
|
||||
return this.rows.get(InMemoryProjectionOffsetStore.key(key)) ?? null;
|
||||
}
|
||||
|
||||
/** Test helper. */
|
||||
size(): number {
|
||||
return this.rows.size;
|
||||
}
|
||||
|
||||
/** Test helper. */
|
||||
clear(): void {
|
||||
this.rows.clear();
|
||||
}
|
||||
}
|
||||
1
apps/api/src/modules/read-models/testing/index.ts
Normal file
1
apps/api/src/modules/read-models/testing/index.ts
Normal file
@@ -0,0 +1 @@
|
||||
export { InMemoryProjectionOffsetStore } from './in-memory-projection-offset-store';
|
||||
Reference in New Issue
Block a user