feat(read-models): add projection_offset table + idempotency harness (GOO-187)
RFC-003 §0 Phase 0 — the (eventId, handlerName) offset contract.
- prisma: ProjectionOffset model + 20260424100000_add_projection_offset
migration (composite PK, handlerName/appliedAt index for reconciliation)
- infra: PrismaProjectionOffsetStore (createMany skipDuplicates +
applyWithOffset transactional helper that rolls offset+mutation back
together on failure)
- module: bind PROJECTION_OFFSET_STORE → PrismaProjectionOffsetStore
- testing: assertProjectorIdempotent harness ("replay N times → single
state mutation") for Phase 2/3 projector specs to reuse
- tests: 12 specs lock the contract — replay 5x → 1 mutation, broken
projectors fail loudly, two projectors keep independent offsets
Note: prisma format normalised existing column alignment when the new
model was added; the meaningful diff is the appended ProjectionOffset
block at the bottom of schema.prisma.
Acceptance criteria from issue:
- migration applies cleanly (validated via prisma format/parse)
- harness exported from read-models/testing and used by example
projector spec at __tests__/idempotency-harness.spec.ts
Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -0,0 +1,282 @@
|
||||
import { describe, it, expect, beforeEach, vi } from 'vitest';
|
||||
import { Projector } from '../application/projectors/projector.base';
|
||||
import type {
|
||||
IProjectionOffsetStore,
|
||||
ProjectableEvent,
|
||||
ProjectionContext,
|
||||
} from '../domain';
|
||||
import {
|
||||
InMemoryProjectionOffsetStore,
|
||||
assertProjectorIdempotent,
|
||||
} from '../testing';
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Fixture event + projector */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
interface ListingCreatedEvent extends ProjectableEvent {
|
||||
readonly eventName: 'listing.created';
|
||||
readonly aggregateId: string;
|
||||
readonly occurredAt: Date;
|
||||
readonly payload: { title: string };
|
||||
}
|
||||
|
||||
class InMemoryListingCardRepo {
|
||||
private readonly rows = new Map<string, string>();
|
||||
|
||||
upsert(id: string, title: string): void {
|
||||
this.rows.set(id, title);
|
||||
}
|
||||
|
||||
size(): number {
|
||||
return this.rows.size;
|
||||
}
|
||||
|
||||
get(id: string): string | undefined {
|
||||
return this.rows.get(id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Example projector demonstrating Phase 2/3 usage of the Phase 0
|
||||
* scaffolding ([GOO-187](/GOO/issues/GOO-187)). Real projectors land
|
||||
* later — this one exists purely to lock the harness contract.
|
||||
*/
|
||||
class ListingCardProjector extends Projector<ListingCreatedEvent> {
|
||||
readonly handlerName = 'listing-card.v1';
|
||||
|
||||
public applyCalls = 0;
|
||||
|
||||
constructor(
|
||||
store: IProjectionOffsetStore,
|
||||
logger: any,
|
||||
private readonly repo: InMemoryListingCardRepo,
|
||||
) {
|
||||
super(store, logger);
|
||||
}
|
||||
|
||||
protected override async apply(
|
||||
event: ListingCreatedEvent,
|
||||
_ctx: ProjectionContext,
|
||||
): Promise<void> {
|
||||
this.applyCalls += 1;
|
||||
this.repo.upsert(event.aggregateId, event.payload.title);
|
||||
}
|
||||
}
|
||||
|
||||
const silentLogger = {
|
||||
debug: vi.fn(),
|
||||
log: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
verbose: vi.fn(),
|
||||
};
|
||||
|
||||
function makeEvent(overrides: Partial<ListingCreatedEvent> = {}): ListingCreatedEvent {
|
||||
return {
|
||||
eventName: 'listing.created',
|
||||
aggregateId: overrides.aggregateId ?? 'listing-1',
|
||||
occurredAt: overrides.occurredAt ?? new Date('2026-04-24T00:00:00.000Z'),
|
||||
payload: overrides.payload ?? { title: 'A rooftop flat in D2' },
|
||||
};
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Tests — RFC-003 §0 idempotency harness */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
describe('RFC-003 Phase 0 — projector idempotency harness', () => {
|
||||
let store: InMemoryProjectionOffsetStore;
|
||||
let repo: InMemoryListingCardRepo;
|
||||
let projector: ListingCardProjector;
|
||||
|
||||
beforeEach(() => {
|
||||
store = new InMemoryProjectionOffsetStore();
|
||||
repo = new InMemoryListingCardRepo();
|
||||
projector = new ListingCardProjector(store, silentLogger as any, repo);
|
||||
});
|
||||
|
||||
it('replay same event 5 times → single state mutation', async () => {
|
||||
const event = makeEvent();
|
||||
|
||||
await assertProjectorIdempotent({
|
||||
projector,
|
||||
event,
|
||||
store,
|
||||
countMutations: () => repo.size(),
|
||||
replays: 5,
|
||||
});
|
||||
|
||||
expect(projector.applyCalls).toBe(1);
|
||||
expect(repo.get('listing-1')).toBe('A rooftop flat in D2');
|
||||
});
|
||||
|
||||
it('default replays = 3 still enforces the contract', async () => {
|
||||
const event = makeEvent();
|
||||
|
||||
await assertProjectorIdempotent({
|
||||
projector,
|
||||
event,
|
||||
store,
|
||||
countMutations: () => repo.size(),
|
||||
});
|
||||
|
||||
expect(projector.applyCalls).toBe(1);
|
||||
});
|
||||
|
||||
it('rejects replays < 2 (contract requires at least one retry)', async () => {
|
||||
const event = makeEvent();
|
||||
|
||||
await expect(
|
||||
assertProjectorIdempotent({
|
||||
projector,
|
||||
event,
|
||||
store,
|
||||
countMutations: () => repo.size(),
|
||||
replays: 1,
|
||||
}),
|
||||
).rejects.toThrow(/replays >= 2/);
|
||||
});
|
||||
|
||||
it('FAILS LOUDLY when a projector violates the contract', async () => {
|
||||
// Pathological projector that mutates state on EVERY dispatch,
|
||||
// bypassing the offset store. This simulates a future projector
|
||||
// author who forgets to route through the base class.
|
||||
class BrokenProjector extends Projector<ListingCreatedEvent> {
|
||||
readonly handlerName = 'broken.v1';
|
||||
constructor(
|
||||
offsetStore: IProjectionOffsetStore,
|
||||
logger: any,
|
||||
private readonly r: InMemoryListingCardRepo,
|
||||
) {
|
||||
super(offsetStore, logger);
|
||||
}
|
||||
override async dispatch(event: ListingCreatedEvent): Promise<void> {
|
||||
this.r.upsert(`${event.aggregateId}-${this.r.size()}`, event.payload.title);
|
||||
}
|
||||
protected async apply(): Promise<void> {
|
||||
/* unused — dispatch is overridden */
|
||||
}
|
||||
}
|
||||
|
||||
const broken = new BrokenProjector(store, silentLogger as any, repo);
|
||||
|
||||
await expect(
|
||||
assertProjectorIdempotent({
|
||||
projector: broken as unknown as Projector<ListingCreatedEvent>,
|
||||
event: makeEvent(),
|
||||
store,
|
||||
countMutations: () => repo.size(),
|
||||
replays: 3,
|
||||
}),
|
||||
).rejects.toThrow();
|
||||
});
|
||||
|
||||
it('different events from the same aggregate each project exactly once', async () => {
|
||||
const e1 = makeEvent({ occurredAt: new Date('2026-04-24T00:00:00.000Z') });
|
||||
const e2 = makeEvent({ occurredAt: new Date('2026-04-24T00:00:01.000Z') });
|
||||
|
||||
await projector.dispatch(e1);
|
||||
await projector.dispatch(e1); // replay
|
||||
await projector.dispatch(e2);
|
||||
await projector.dispatch(e2); // replay
|
||||
|
||||
expect(projector.applyCalls).toBe(2);
|
||||
expect(store.size()).toBe(2);
|
||||
});
|
||||
|
||||
it('same event for TWO projectors records two independent offsets', async () => {
|
||||
// Two distinct handlers consuming the same event must both run —
|
||||
// the offset key is `(eventId, handlerName)`, not just `eventId`.
|
||||
class SecondProjector extends Projector<ListingCreatedEvent> {
|
||||
readonly handlerName = 'listing-search-index.v1';
|
||||
public applyCalls = 0;
|
||||
protected async apply(): Promise<void> {
|
||||
this.applyCalls += 1;
|
||||
}
|
||||
}
|
||||
|
||||
const p2 = new SecondProjector(store, silentLogger as any);
|
||||
const event = makeEvent();
|
||||
|
||||
await projector.dispatch(event);
|
||||
await projector.dispatch(event);
|
||||
await p2.dispatch(event);
|
||||
await p2.dispatch(event);
|
||||
|
||||
expect(projector.applyCalls).toBe(1);
|
||||
expect(p2.applyCalls).toBe(1);
|
||||
expect(store.size()).toBe(2);
|
||||
});
|
||||
|
||||
it('offset record is retained with appliedAt after projection', async () => {
|
||||
const event = makeEvent();
|
||||
await projector.dispatch(event);
|
||||
|
||||
const eventId = `${event.aggregateId}:${event.occurredAt.getTime()}:${event.eventName}`;
|
||||
const record = await store.find({ eventId, handlerName: 'listing-card.v1' });
|
||||
|
||||
expect(record).not.toBeNull();
|
||||
expect(record!.eventId).toBe(eventId);
|
||||
expect(record!.handlerName).toBe('listing-card.v1');
|
||||
expect(record!.appliedAt).toBeInstanceOf(Date);
|
||||
});
|
||||
});
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* InMemoryProjectionOffsetStore — direct port tests */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
describe('InMemoryProjectionOffsetStore', () => {
|
||||
let store: InMemoryProjectionOffsetStore;
|
||||
|
||||
beforeEach(() => {
|
||||
store = new InMemoryProjectionOffsetStore();
|
||||
});
|
||||
|
||||
it('recordIfAbsent: first insert returns applied=true', async () => {
|
||||
const r = await store.recordIfAbsent({ eventId: 'e1', handlerName: 'h1' });
|
||||
expect(r.applied).toBe(true);
|
||||
expect(store.size()).toBe(1);
|
||||
});
|
||||
|
||||
it('recordIfAbsent: duplicate returns applied=false and keeps first row', async () => {
|
||||
const now = new Date('2026-04-24T00:00:00.000Z');
|
||||
await store.recordIfAbsent({ eventId: 'e1', handlerName: 'h1', appliedAt: now });
|
||||
const r2 = await store.recordIfAbsent({
|
||||
eventId: 'e1',
|
||||
handlerName: 'h1',
|
||||
appliedAt: new Date('2026-04-25T00:00:00.000Z'),
|
||||
});
|
||||
|
||||
expect(r2.applied).toBe(false);
|
||||
const record = await store.find({ eventId: 'e1', handlerName: 'h1' });
|
||||
expect(record!.appliedAt.toISOString()).toBe(now.toISOString());
|
||||
});
|
||||
|
||||
it('find: returns null for unknown key', async () => {
|
||||
const record = await store.find({ eventId: 'missing', handlerName: 'h1' });
|
||||
expect(record).toBeNull();
|
||||
});
|
||||
|
||||
it('find: returns the full record including payloadHash when provided', async () => {
|
||||
await store.recordIfAbsent({
|
||||
eventId: 'e1',
|
||||
handlerName: 'h1',
|
||||
payloadHash: 'sha256:abc',
|
||||
});
|
||||
|
||||
const r = await store.find({ eventId: 'e1', handlerName: 'h1' });
|
||||
expect(r).toMatchObject({
|
||||
eventId: 'e1',
|
||||
handlerName: 'h1',
|
||||
payloadHash: 'sha256:abc',
|
||||
});
|
||||
});
|
||||
|
||||
it('clear(): test helper wipes all rows', async () => {
|
||||
await store.recordIfAbsent({ eventId: 'e1', handlerName: 'h1' });
|
||||
store.clear();
|
||||
expect(store.size()).toBe(0);
|
||||
});
|
||||
});
|
||||
@@ -2,3 +2,4 @@ export * from './refresh';
|
||||
export * from './reconciliation';
|
||||
export { ConfigReadModelKillSwitch } from './config-read-model-kill-switch';
|
||||
export { ReadModelRepositoryWrapper } from './read-model-repository-wrapper';
|
||||
export { PrismaProjectionOffsetStore } from './prisma-projection-offset-store';
|
||||
|
||||
@@ -0,0 +1,105 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { PrismaService } from '@modules/shared';
|
||||
import {
|
||||
type IProjectionOffsetStore,
|
||||
type ProjectionOffsetKey,
|
||||
type ProjectionOffsetRecord,
|
||||
type RecordOffsetInput,
|
||||
type RecordOffsetResult,
|
||||
} from '../domain';
|
||||
|
||||
/**
|
||||
* Postgres-backed implementation of {@link IProjectionOffsetStore} for
|
||||
* the `projection_offset` table (RFC-003 §0, [GOO-187](/GOO/issues/GOO-187)).
|
||||
*
|
||||
* `recordIfAbsent` uses `INSERT ... ON CONFLICT DO NOTHING` against the
|
||||
* composite PK `(eventId, handlerName)` and reports whether a row was
|
||||
* actually inserted. The operation is idempotent and safe under
|
||||
* concurrent dispatch — exactly one caller observes `applied: true`.
|
||||
*
|
||||
* For the transactional "write read-model state + record offset
|
||||
* atomically" workflow that projectors need, use {@link applyWithOffset}.
|
||||
*/
|
||||
@Injectable()
|
||||
export class PrismaProjectionOffsetStore implements IProjectionOffsetStore {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async recordIfAbsent(input: RecordOffsetInput): Promise<RecordOffsetResult> {
|
||||
const result = await this.prisma.projectionOffset.createMany({
|
||||
data: {
|
||||
eventId: input.eventId,
|
||||
handlerName: input.handlerName,
|
||||
appliedAt: input.appliedAt ?? new Date(),
|
||||
...(input.payloadHash !== undefined
|
||||
? { payloadHash: input.payloadHash }
|
||||
: {}),
|
||||
},
|
||||
skipDuplicates: true,
|
||||
});
|
||||
|
||||
return { applied: result.count === 1 };
|
||||
}
|
||||
|
||||
async find(key: ProjectionOffsetKey): Promise<ProjectionOffsetRecord | null> {
|
||||
const row = await this.prisma.projectionOffset.findUnique({
|
||||
where: {
|
||||
eventId_handlerName: {
|
||||
eventId: key.eventId,
|
||||
handlerName: key.handlerName,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
if (!row) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
eventId: row.eventId,
|
||||
handlerName: row.handlerName,
|
||||
appliedAt: row.appliedAt,
|
||||
...(row.payloadHash !== null ? { payloadHash: row.payloadHash } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Transactional helper that wraps a projector's read-model mutation
|
||||
* + offset insert into a single Postgres transaction.
|
||||
*
|
||||
* 1. `INSERT INTO projection_offset ... ON CONFLICT DO NOTHING`.
|
||||
* 2. If the insert was a no-op, the event was already projected —
|
||||
* we short-circuit without invoking `mutate`.
|
||||
* 3. Otherwise, invoke `mutate(tx)` so the caller can write to
|
||||
* read-model tables with the same transaction handle.
|
||||
* 4. Any throw inside `mutate` rolls BOTH the offset row and the
|
||||
* mutation back — the next delivery re-runs cleanly.
|
||||
*
|
||||
* Returns `{ applied: true }` when the mutation ran, `{ applied: false }`
|
||||
* when the event was a re-delivery.
|
||||
*/
|
||||
async applyWithOffset(
|
||||
input: RecordOffsetInput,
|
||||
mutate: (tx: unknown) => Promise<void>,
|
||||
): Promise<RecordOffsetResult> {
|
||||
return this.prisma.$transaction(async (tx) => {
|
||||
const result = await tx.projectionOffset.createMany({
|
||||
data: {
|
||||
eventId: input.eventId,
|
||||
handlerName: input.handlerName,
|
||||
appliedAt: input.appliedAt ?? new Date(),
|
||||
...(input.payloadHash !== undefined
|
||||
? { payloadHash: input.payloadHash }
|
||||
: {}),
|
||||
},
|
||||
skipDuplicates: true,
|
||||
});
|
||||
|
||||
if (result.count !== 1) {
|
||||
return { applied: false };
|
||||
}
|
||||
|
||||
await mutate(tx);
|
||||
return { applied: true };
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,10 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { CqrsModule } from '@nestjs/cqrs';
|
||||
import { SharedModule } from '@modules/shared';
|
||||
import { PROJECTION_OFFSET_STORE } from './domain/projection-offset-store';
|
||||
import { READ_MODEL_KILL_SWITCH } from './domain/read-model-kill-switch';
|
||||
import { ConfigReadModelKillSwitch } from './infrastructure/config-read-model-kill-switch';
|
||||
import { PrismaProjectionOffsetStore } from './infrastructure/prisma-projection-offset-store';
|
||||
|
||||
/**
|
||||
* Read-models module skeleton — RFC-003 Phase 0.
|
||||
@@ -10,16 +12,15 @@ import { ConfigReadModelKillSwitch } from './infrastructure/config-read-model-ki
|
||||
* Hosts:
|
||||
* - Projector base class (`application/projectors/projector.base.ts`).
|
||||
* - Read-model repository convention (`domain/read-repository.ts`).
|
||||
* - Idempotency port (`domain/projection-offset-store.ts`).
|
||||
* - Idempotency port (`domain/projection-offset-store.ts`) +
|
||||
* Prisma-backed implementation bound to the `projection_offset`
|
||||
* table ([GOO-187](/GOO/issues/GOO-187)).
|
||||
* - Per-read-model kill switch (`domain/read-model-kill-switch.ts`).
|
||||
*
|
||||
* No projectors, repositories, or `IProjectionOffsetStore` provider are
|
||||
* registered here yet. The Prisma-backed offset store binding lands with
|
||||
* [GOO-187](/GOO/issues/GOO-187); per-read-model projectors land in
|
||||
* Phase 2/3.
|
||||
*
|
||||
* The module is imported by `AppModule` so its DI container is wired up
|
||||
* even while empty — keeps Phase 2/3 PRs strictly additive.
|
||||
* No projectors or repositories are registered here yet — those land in
|
||||
* Phase 2/3. The module is imported by `AppModule` so its DI container
|
||||
* is wired up even while otherwise empty, keeping Phase 2/3 PRs
|
||||
* strictly additive.
|
||||
*/
|
||||
@Module({
|
||||
imports: [CqrsModule, SharedModule],
|
||||
@@ -28,7 +29,16 @@ import { ConfigReadModelKillSwitch } from './infrastructure/config-read-model-ki
|
||||
provide: READ_MODEL_KILL_SWITCH,
|
||||
useClass: ConfigReadModelKillSwitch,
|
||||
},
|
||||
{
|
||||
provide: PROJECTION_OFFSET_STORE,
|
||||
useClass: PrismaProjectionOffsetStore,
|
||||
},
|
||||
PrismaProjectionOffsetStore,
|
||||
],
|
||||
exports: [
|
||||
READ_MODEL_KILL_SWITCH,
|
||||
PROJECTION_OFFSET_STORE,
|
||||
PrismaProjectionOffsetStore,
|
||||
],
|
||||
exports: [READ_MODEL_KILL_SWITCH],
|
||||
})
|
||||
export class ReadModelsModule {}
|
||||
|
||||
@@ -0,0 +1,85 @@
|
||||
import { expect } from 'vitest';
|
||||
import type { Projector } from '../application/projectors/projector.base';
|
||||
import type { ProjectableEvent } from '../domain';
|
||||
import { InMemoryProjectionOffsetStore } from './in-memory-projection-offset-store';
|
||||
|
||||
/**
|
||||
* Reusable idempotency harness for Phase 2/3 projector tests.
|
||||
*
|
||||
* RFC-003 §0 (CTO ask) requires every projector to reduce "replay same
|
||||
* event N times" to a single state mutation. Writing that assertion by
|
||||
* hand in every projector test is repetitive and easy to get subtly
|
||||
* wrong. This helper centralises the replay + count-mutations contract
|
||||
* so a new projector's test can opt in with one call.
|
||||
*
|
||||
* Contract verified on the projector under test:
|
||||
* - Exactly one offset row is recorded after N dispatches.
|
||||
* - The caller-supplied `countMutations()` returns `1` regardless of
|
||||
* how many times `dispatch` ran. Projectors that bypass the base
|
||||
* class idempotency (e.g. write to their read model outside of
|
||||
* `apply`) will fail this check.
|
||||
*
|
||||
* Example:
|
||||
* ```ts
|
||||
* const store = new InMemoryProjectionOffsetStore();
|
||||
* const repo = new InMemoryListingCardRepo();
|
||||
* const projector = new ListingCardProjector(store, silentLogger, repo);
|
||||
* const event = makeListingCreatedEvent();
|
||||
*
|
||||
* await assertProjectorIdempotent({
|
||||
* projector,
|
||||
* event,
|
||||
* store,
|
||||
* countMutations: () => repo.size(),
|
||||
* replays: 5,
|
||||
* });
|
||||
* ```
|
||||
*/
|
||||
export interface AssertProjectorIdempotentOptions<E extends ProjectableEvent> {
|
||||
/** Projector under test. */
|
||||
readonly projector: Projector<E>;
|
||||
/** Domain event to replay. */
|
||||
readonly event: E;
|
||||
/** Offset store the projector was constructed with. */
|
||||
readonly store: InMemoryProjectionOffsetStore;
|
||||
/**
|
||||
* Caller-supplied reader that returns the number of state mutations
|
||||
* the projector has emitted against its read model. Typically the
|
||||
* size of an in-memory repo, or a row count from a test database.
|
||||
*/
|
||||
readonly countMutations: () => number | Promise<number>;
|
||||
/** Number of replays to send. Defaults to 3; must be >= 2. */
|
||||
readonly replays?: number;
|
||||
}
|
||||
|
||||
export async function assertProjectorIdempotent<E extends ProjectableEvent>(
|
||||
options: AssertProjectorIdempotentOptions<E>,
|
||||
): Promise<void> {
|
||||
const replays = options.replays ?? 3;
|
||||
|
||||
if (replays < 2) {
|
||||
throw new Error(
|
||||
`assertProjectorIdempotent requires replays >= 2 (got ${replays}); ` +
|
||||
'the contract is about "replay N times → single mutation".',
|
||||
);
|
||||
}
|
||||
|
||||
const startingOffsets = options.store.size();
|
||||
|
||||
// Replay N times. Each dispatch either applies (first time) or is a
|
||||
// no-op (subsequent times). We never call `apply` directly.
|
||||
for (let i = 0; i < replays; i += 1) {
|
||||
await options.projector.dispatch(options.event);
|
||||
}
|
||||
|
||||
const mutations = await options.countMutations();
|
||||
expect(
|
||||
mutations,
|
||||
`expected exactly 1 state mutation after ${replays} replays, got ${mutations}`,
|
||||
).toBe(1);
|
||||
|
||||
expect(
|
||||
options.store.size() - startingOffsets,
|
||||
'exactly one offset row should be recorded for the replayed event',
|
||||
).toBe(1);
|
||||
}
|
||||
@@ -1 +1,5 @@
|
||||
export { InMemoryProjectionOffsetStore } from './in-memory-projection-offset-store';
|
||||
export {
|
||||
assertProjectorIdempotent,
|
||||
type AssertProjectorIdempotentOptions,
|
||||
} from './assert-projector-idempotent';
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
-- RFC-003 Phase 0 (GOO-187): projection_offset table.
|
||||
--
|
||||
-- Idempotency contract for CQRS projectors. Every projector dispatch
|
||||
-- wraps `apply()` in a transaction that inserts (event_id, handler_name)
|
||||
-- here. Re-deliveries hit the composite primary key, roll back, and the
|
||||
-- projector observes a no-op.
|
||||
--
|
||||
-- Port: apps/api/src/modules/read-models/domain/projection-offset-store.ts
|
||||
-- Prisma adapter: apps/api/src/modules/read-models/infrastructure/prisma-projection-offset-store.ts
|
||||
-- Test harness: apps/api/src/modules/read-models/testing/
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "projection_offset" (
|
||||
"eventId" TEXT NOT NULL,
|
||||
"handlerName" TEXT NOT NULL,
|
||||
"appliedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"payloadHash" TEXT,
|
||||
|
||||
CONSTRAINT "projection_offset_pkey" PRIMARY KEY ("eventId", "handlerName")
|
||||
);
|
||||
|
||||
-- CreateIndex (handler-scoped scans for reconciliation tooling)
|
||||
CREATE INDEX "projection_offset_handlerName_appliedAt_idx" ON "projection_offset"("handlerName", "appliedAt" DESC);
|
||||
@@ -61,30 +61,30 @@ model User {
|
||||
totpBackupCodes String[] // Bcrypt-hashed backup codes
|
||||
totpEnabledAt DateTime?
|
||||
|
||||
agent Agent?
|
||||
listings Listing[]
|
||||
savedSearches SavedSearch[]
|
||||
subscription Subscription?
|
||||
payments Payment[]
|
||||
reviews Review[]
|
||||
inquiriesSent Inquiry[]
|
||||
refreshTokens RefreshToken[]
|
||||
oauthAccounts OAuthAccount[]
|
||||
buyerTransactions Transaction[] @relation("BuyerTransactions")
|
||||
buyerOrders Order[] @relation("BuyerOrders")
|
||||
sellerOrders Order[] @relation("SellerOrders")
|
||||
mfaChallenges MfaChallenge[]
|
||||
transferListings TransferListing[]
|
||||
reports Report[]
|
||||
savedListings SavedListing[]
|
||||
agent Agent?
|
||||
listings Listing[]
|
||||
savedSearches SavedSearch[]
|
||||
subscription Subscription?
|
||||
payments Payment[]
|
||||
reviews Review[]
|
||||
inquiriesSent Inquiry[]
|
||||
refreshTokens RefreshToken[]
|
||||
oauthAccounts OAuthAccount[]
|
||||
buyerTransactions Transaction[] @relation("BuyerTransactions")
|
||||
buyerOrders Order[] @relation("BuyerOrders")
|
||||
sellerOrders Order[] @relation("SellerOrders")
|
||||
mfaChallenges MfaChallenge[]
|
||||
transferListings TransferListing[]
|
||||
reports Report[]
|
||||
savedListings SavedListing[]
|
||||
/// Dự án BĐS do user này làm chủ đầu tư (role=DEVELOPER).
|
||||
ownedProjects ProjectDevelopment[] @relation("ProjectOwner")
|
||||
ownedProjects ProjectDevelopment[] @relation("ProjectOwner")
|
||||
/// KCN do user này vận hành (role=PARK_OPERATOR).
|
||||
ownedIndustrialParks IndustrialPark[] @relation("IndustrialParkOwner")
|
||||
zaloAccountLink ZaloAccountLink?
|
||||
notificationLogs NotificationLog[]
|
||||
industrialListingsSelling IndustrialListing[] @relation("IndustrialListingSeller")
|
||||
listingFlagsReported ListingFlag[] @relation("listingFlagsReported")
|
||||
ownedIndustrialParks IndustrialPark[] @relation("IndustrialParkOwner")
|
||||
zaloAccountLink ZaloAccountLink?
|
||||
notificationLogs NotificationLog[]
|
||||
industrialListingsSelling IndustrialListing[] @relation("IndustrialListingSeller")
|
||||
listingFlagsReported ListingFlag[] @relation("listingFlagsReported")
|
||||
|
||||
@@index([role])
|
||||
@@index([kycStatus])
|
||||
@@ -153,20 +153,20 @@ model OAuthAccount {
|
||||
/// template messages to a linked user via ZNS.
|
||||
/// Token fields are AES-256-GCM encrypted at the application layer.
|
||||
model ZaloAccountLink {
|
||||
id String @id @default(cuid())
|
||||
userId String @unique
|
||||
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
|
||||
id String @id @default(cuid())
|
||||
userId String @unique
|
||||
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
|
||||
/// Zalo user ID scoped to the Official Account (OA UID, not Social Graph UID)
|
||||
zaloUserId String @unique
|
||||
zaloUserId String @unique
|
||||
/// AES-256-GCM encrypted access token (base64url: iv.tag.ciphertext)
|
||||
accessToken String
|
||||
accessToken String
|
||||
/// AES-256-GCM encrypted refresh token (base64url: iv.tag.ciphertext)
|
||||
refreshToken String
|
||||
expiresAt DateTime
|
||||
refreshToken String
|
||||
expiresAt DateTime
|
||||
/// Unix epoch (seconds) of the last user→OA interaction; used for 24-hour window check
|
||||
lastInteractAt DateTime?
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
|
||||
@@index([zaloUserId])
|
||||
@@index([expiresAt])
|
||||
@@ -188,8 +188,8 @@ model Agent {
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
|
||||
listings Listing[]
|
||||
leads Lead[]
|
||||
listings Listing[]
|
||||
leads Lead[]
|
||||
industrialListings IndustrialListing[] @relation("IndustrialListingAgent")
|
||||
|
||||
@@index([qualityScore])
|
||||
@@ -424,10 +424,10 @@ model Listing {
|
||||
saveCount Int @default(0)
|
||||
inquiryCount Int @default(0)
|
||||
featuredUntil DateTime?
|
||||
featuredPackage String? /// "3_days" | "7_days" | "30_days"
|
||||
expiresAt DateTime?
|
||||
expiryNotifiedAt DateTime?
|
||||
publishedAt DateTime?
|
||||
featuredPackage String? /// "3_days" | "7_days" | "30_days"
|
||||
expiresAt DateTime?
|
||||
expiryNotifiedAt DateTime?
|
||||
publishedAt DateTime?
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
|
||||
@@ -498,7 +498,7 @@ model ListingFlag {
|
||||
reporterId String
|
||||
reporter User @relation("listingFlagsReported", fields: [reporterId], references: [id], onDelete: Restrict)
|
||||
reason FlagReason
|
||||
description String? /// Mô tả chi tiết (tuỳ chọn)
|
||||
description String? /// Mô tả chi tiết (tuỳ chọn)
|
||||
status FlagStatus @default(PENDING)
|
||||
reviewedBy String?
|
||||
reviewedAt DateTime?
|
||||
@@ -1508,13 +1508,13 @@ model SystemSetting {
|
||||
// [GOO-21]
|
||||
|
||||
model VnProvince {
|
||||
code String @id // GSO province code, zero-padded (e.g. "01", "79")
|
||||
name String // Canonical Vietnamese name, e.g. "Thành phố Hồ Chí Minh"
|
||||
nameEn String?
|
||||
type String // "Thành phố Trung ương" | "Tỉnh"
|
||||
codename String // slug, e.g. "thanh_pho_ho_chi_minh"
|
||||
phoneCode Int?
|
||||
districts VnDistrict[]
|
||||
code String @id // GSO province code, zero-padded (e.g. "01", "79")
|
||||
name String // Canonical Vietnamese name, e.g. "Thành phố Hồ Chí Minh"
|
||||
nameEn String?
|
||||
type String // "Thành phố Trung ương" | "Tỉnh"
|
||||
codename String // slug, e.g. "thanh_pho_ho_chi_minh"
|
||||
phoneCode Int?
|
||||
districts VnDistrict[]
|
||||
|
||||
@@index([codename])
|
||||
@@map("vn_provinces")
|
||||
|
||||
Reference in New Issue
Block a user