diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts
index e23427f..3e09bf0 100644
--- a/apps/api/src/app.module.ts
+++ b/apps/api/src/app.module.ts
@@ -62,6 +62,7 @@ import { AppController } from './app.controller';
     AdminModule,
     AnalyticsModule,
     MetricsModule,
+    MetricsModule.withQueueMetrics(),
     McpIntegrationModule,
     MessagingModule,
     ReportsModule,
diff --git a/apps/api/src/modules/metrics/infrastructure/__tests__/queue-metrics.collector.spec.ts b/apps/api/src/modules/metrics/infrastructure/__tests__/queue-metrics.collector.spec.ts
new file mode 100644
index 0000000..b2595fd
--- /dev/null
+++ b/apps/api/src/modules/metrics/infrastructure/__tests__/queue-metrics.collector.spec.ts
@@ -0,0 +1,105 @@
+import { Counter, Gauge, Registry } from 'prom-client';
+import { describe, expect, it, vi } from 'vitest';
+import {
+  QueueMetricsCollector,
+  type QueueLike,
+} from '../queue-metrics.collector';
+
+function makeMetrics() {
+  const registry = new Registry();
+  const depth = new Gauge({
+    name: 'goodgo_queue_depth',
+    help: 'depth',
+    labelNames: ['queue', 'state'],
+    registers: [registry],
+  });
+  const outcomes = new Counter({
+    name: 'goodgo_queue_job_outcomes_total',
+    help: 'outcomes',
+    labelNames: ['queue', 'outcome'],
+    registers: [registry],
+  });
+  return { registry, depth, outcomes };
+}
+
+function makeQueue(name: string, counts: Record<string, number>): QueueLike {
+  return {
+    name,
+    async getJobCounts(..._types: string[]): Promise<Record<string, number>> {
+      return counts;
+    },
+  };
+}
+
+describe('QueueMetricsCollector', () => {
+  it('samples each queue and writes labelled gauge values', async () => {
+    const { depth, outcomes } = makeMetrics();
+    const q = makeQueue('report-generation', { waiting: 3, active: 1, completed: 100, failed: 2, delayed: 0 });
+    const collector = new QueueMetricsCollector([q], depth, outcomes, {
+      setInterval: () => 0 as unknown as ReturnType<typeof setInterval>,
+      clearInterval: () => undefined,
+    });
+    await collector.sampleOnce();
+    const v = await depth.get();
+    const byState = Object.fromEntries(v.values.map((s) => [s.labels['state'], s.value]));
+    expect(byState).toMatchObject({ waiting: 3, active: 1, completed: 100, failed: 2, delayed: 0 });
+  });
+
+  it('does not throw when getJobCounts rejects (e.g. Redis down)', async () => {
+    const { depth, outcomes } = makeMetrics();
+    const broken: QueueLike = {
+      name: 'broken',
+      async getJobCounts() {
+        throw new Error('redis down');
+      },
+    };
+    const collector = new QueueMetricsCollector([broken], depth, outcomes, {
+      setInterval: () => 0 as unknown as ReturnType<typeof setInterval>,
+      clearInterval: () => undefined,
+    });
+    await expect(collector.sampleOnce()).resolves.toBeUndefined();
+  });
+
+  it('recordJobOutcome increments the outcome counter', async () => {
+    const { depth, outcomes } = makeMetrics();
+    const collector = new QueueMetricsCollector([], depth, outcomes, {
+      setInterval: () => 0 as unknown as ReturnType<typeof setInterval>,
+      clearInterval: () => undefined,
+    });
+    collector.recordJobOutcome('report-generation', 'completed');
+    collector.recordJobOutcome('report-generation', 'failed');
+    collector.recordJobOutcome('report-generation', 'completed');
+    const v = await outcomes.get();
+    const completed = v.values.find((s) => s.labels['outcome'] === 'completed');
+    const failed = v.values.find((s) => s.labels['outcome'] === 'failed');
+    expect(completed?.value).toBe(2);
+    expect(failed?.value).toBe(1);
+  });
+
+  it('onModuleInit schedules the timer and onModuleDestroy clears it', () => {
+    const { depth, outcomes } = makeMetrics();
+    const setIntervalSpy = vi.fn(() => 'h' as unknown as ReturnType<typeof setInterval>);
+    const clearIntervalSpy = vi.fn();
+    const collector = new QueueMetricsCollector([], depth, outcomes, {
+      intervalMs: 1234,
+      setInterval: setIntervalSpy,
+      clearInterval: clearIntervalSpy,
+    });
+    collector.onModuleInit();
+    expect(setIntervalSpy).toHaveBeenCalledWith(expect.any(Function), 1234);
+    collector.onModuleDestroy();
+    expect(clearIntervalSpy).toHaveBeenCalledWith('h');
+  });
+
+  it('onModuleDestroy clears a falsy timer handle (e.g. 0 from a fake clock)', () => {
+    const { depth, outcomes } = makeMetrics();
+    const clearIntervalSpy = vi.fn();
+    const collector = new QueueMetricsCollector([], depth, outcomes, {
+      setInterval: () => 0 as unknown as ReturnType<typeof setInterval>,
+      clearInterval: clearIntervalSpy,
+    });
+    collector.onModuleInit();
+    collector.onModuleDestroy();
+    expect(clearIntervalSpy).toHaveBeenCalledWith(0);
+  });
+});
diff --git a/apps/api/src/modules/metrics/infrastructure/queue-metrics.collector.ts b/apps/api/src/modules/metrics/infrastructure/queue-metrics.collector.ts
new file mode 100644
index 0000000..e67006c
--- /dev/null
+++ b/apps/api/src/modules/metrics/infrastructure/queue-metrics.collector.ts
@@ -0,0 +1,111 @@
+import { Inject, Injectable, Logger, type OnModuleDestroy, type OnModuleInit } from '@nestjs/common';
+import { InjectMetric } from '@willsoto/nestjs-prometheus';
+import type { Queue } from 'bullmq';
+import { Counter, Gauge } from 'prom-client';
+import { QUEUE_DEPTH_GAUGE, QUEUE_JOB_OUTCOMES_TOTAL } from './queue-metrics.constants';
+
+/**
+ * Minimal subset of BullMQ's Queue surface needed by the collector.
+ * Defined here so unit tests can pass a plain object without a live Redis.
+ */
+export interface QueueLike {
+  readonly name: string;
+  getJobCounts(...types: string[]): Promise<Record<string, number>>;
+}
+
+export interface QueueMetricsCollectorOptions {
+  /** Polling interval in ms. Default 5_000. */
+  intervalMs?: number;
+  /** Inject a clock for tests; defaults to setInterval/clearInterval. */
+  setInterval?: (fn: () => void, ms: number) => ReturnType<typeof setInterval>;
+  clearInterval?: (handle: ReturnType<typeof setInterval>) => void;
+}
+
+export const QUEUE_METRICS_COLLECTOR_QUEUES = 'QUEUE_METRICS_COLLECTOR_QUEUES';
+export const QUEUE_METRICS_COLLECTOR_OPTIONS = 'QUEUE_METRICS_COLLECTOR_OPTIONS';
+
+/**
+ * Samples every registered BullMQ queue on a timer and updates the
+ * `goodgo_queue_depth` gauge. The gauge carries a `state` label so a single
+ * metric name fans out to waiting / active / completed / failed / delayed.
+ *
+ * Job-outcome counters (`goodgo_queue_job_outcomes_total`) are incremented
+ * from processor hooks rather than by polling so they capture every job, not
+ * just the ones alive at tick time.
+ */
+@Injectable()
+export class QueueMetricsCollector implements OnModuleInit, OnModuleDestroy {
+  private readonly logger = new Logger(QueueMetricsCollector.name);
+  private handle: ReturnType<typeof setInterval> | null = null;
+  private readonly intervalMs: number;
+  private readonly setIntervalFn: NonNullable<QueueMetricsCollectorOptions['setInterval']>;
+  private readonly clearIntervalFn: NonNullable<QueueMetricsCollectorOptions['clearInterval']>;
+
+  constructor(
+    @Inject(QUEUE_METRICS_COLLECTOR_QUEUES) private readonly queues: ReadonlyArray<QueueLike>,
+    @InjectMetric(QUEUE_DEPTH_GAUGE) private readonly depthGauge: Gauge,
+    @InjectMetric(QUEUE_JOB_OUTCOMES_TOTAL) private readonly outcomes: Counter,
+    @Inject(QUEUE_METRICS_COLLECTOR_OPTIONS) options: QueueMetricsCollectorOptions = {},
+  ) {
+    this.intervalMs = options.intervalMs ?? 5_000;
+    this.setIntervalFn = options.setInterval ?? ((fn, ms) => setInterval(fn, ms));
+    this.clearIntervalFn = options.clearInterval ?? ((handle) => clearInterval(handle));
+  }
+
+  onModuleInit(): void {
+    // Kick off an immediate sample so gauges are populated before the first
+    // timer tick — useful for /metrics scrapes that land in the first 5 s.
+    void this.sampleOnce();
+    this.handle = this.setIntervalFn(() => {
+      void this.sampleOnce();
+    }, this.intervalMs);
+  }
+
+  onModuleDestroy(): void {
+    // Compare against null rather than truthiness: an injected clock may
+    // return a falsy handle (e.g. 0), which must still be cleared.
+    if (this.handle !== null) {
+      this.clearIntervalFn(this.handle);
+      this.handle = null;
+    }
+  }
+
+  /**
+   * Increment the outcome counter. Call from a processor's `@OnWorkerEvent`
+   * completed/failed hook so every job is accounted for, not just the ones
+   * present during a poll tick.
+   */
+  recordJobOutcome(queueName: string, outcome: 'completed' | 'failed'): void {
+    this.outcomes.labels(queueName, outcome).inc(1);
+  }
+
+  /**
+   * Reads job counts for every configured queue, one queue at a time, and
+   * writes each returned state/count pair to the depth gauge. A failure on
+   * one queue is logged as a warning and does not stop the remaining queues
+   * from being sampled. Public so tests can drive a sample without a timer.
+   */
+  async sampleOnce(): Promise<void> {
+    for (const queue of this.queues) {
+      try {
+        const counts = await queue.getJobCounts('waiting', 'active', 'completed', 'failed', 'delayed');
+        for (const [state, count] of Object.entries(counts)) {
+          this.depthGauge.labels(queue.name, state).set(count);
+        }
+      } catch (err) {
+        // Redis outage or queue not yet ready — log and keep going. The
+        // gauge retains its last value, so the series keeps being exported.
+        // NOTE(review): `absent()` will not fire for a frozen-but-present
+        // series; confirm alerting covers the "stopped updating" case.
+        this.logger.warn(
+          `queue-metrics sample failed for ${queue.name}: ${err instanceof Error ? err.message : String(err)}`,
+        );
+      }
+    }
+  }
+}
+
+/** Casts a real BullMQ Queue to the minimal surface the collector needs. */
+export function asQueueLike(q: Queue): QueueLike {
+  return q;
+}
diff --git a/apps/api/src/modules/metrics/infrastructure/queue-metrics.constants.ts b/apps/api/src/modules/metrics/infrastructure/queue-metrics.constants.ts
new file mode 100644
index 0000000..15c3c45
--- /dev/null
+++ b/apps/api/src/modules/metrics/infrastructure/queue-metrics.constants.ts
@@ -0,0 +1,29 @@
+/**
+ * Queue metrics — RFC-004 Phase 3 workstream 3a.
+ *
+ * Exposes Prometheus gauges for BullMQ queue depth and a counter for job
+ * outcomes. The collector is intentionally polling-based (not subscribing
+ * to bullmq events) so a single collector tick covers every queue without
+ * adding per-queue listener wiring. Polling cadence is small (5 s) — depth
+ * gauges are coarse-grained and cheap to read via `queue.getJobCounts`.
+ *
+ * Adding a new queue means:
+ *   1. Register it via BullModule.registerQueue (existing pattern).
+ *   2. Pass its name into QUEUE_METRICS_QUEUE_NAMES (or extend the
+ *      registration helper) so the collector samples it each tick.
+ */
+
+export const QUEUE_DEPTH_GAUGE = 'goodgo_queue_depth';
+export const QUEUE_JOB_OUTCOMES_TOTAL = 'goodgo_queue_job_outcomes_total';
+
+/**
+ * Queues sampled by the metrics collector. Add new BullMQ queue names here
+ * when they are registered via BullModule.registerQueue.
+ *
+ * Keeping this as a constant (rather than scanning the Nest container) keeps
+ * the collector's set deterministic and trivially testable; the cost is one
+ * extra line of bookkeeping per queue.
+ */
+export const QUEUE_METRICS_QUEUE_NAMES: readonly string[] = [
+  'report-generation',
+];
diff --git a/apps/api/src/modules/metrics/metrics.module.ts b/apps/api/src/modules/metrics/metrics.module.ts
index 4abd71d..6b81601 100644
--- a/apps/api/src/modules/metrics/metrics.module.ts
+++ b/apps/api/src/modules/metrics/metrics.module.ts
@@ -1,10 +1,24 @@
-import { Module } from '@nestjs/common';
+import { BullModule, getQueueToken } from '@nestjs/bullmq';
+import { Module, type DynamicModule } from '@nestjs/common';
 import {
   makeCounterProvider,
   makeHistogramProvider,
   makeGaugeProvider,
 } from '@willsoto/nestjs-prometheus';
+import type { Queue } from 'bullmq';
 import { MetricsService } from './infrastructure/metrics.service';
+import {
+  QueueMetricsCollector,
+  QUEUE_METRICS_COLLECTOR_OPTIONS,
+  QUEUE_METRICS_COLLECTOR_QUEUES,
+  type QueueLike,
+  type QueueMetricsCollectorOptions,
+} from './infrastructure/queue-metrics.collector';
+import {
+  QUEUE_DEPTH_GAUGE,
+  QUEUE_JOB_OUTCOMES_TOTAL,
+  QUEUE_METRICS_QUEUE_NAMES,
+} from './infrastructure/queue-metrics.constants';
 import {
   GOODGO_API_REQUEST_DURATION,
   GOODGO_LISTINGS_CREATED_TOTAL,
@@ -141,4 +155,48 @@ import { HttpMetricsInterceptor } from './presentation/interceptors/http-metrics
   controllers: [WebVitalsController],
   exports: [MetricsService, HttpMetricsInterceptor],
 })
-export class MetricsModule {}
+export class MetricsModule {
+  /**
+   * Register the queue-metrics collector with a fixed list of BullMQ queue
+   * names. Each name must already be registered via BullModule.registerQueue
+   * somewhere in the app (root or feature module).
+   *
+   * RFC-004 Phase 3 — workstream 3a.
+   */
+  static withQueueMetrics(
+    queueNames: readonly string[] = QUEUE_METRICS_QUEUE_NAMES,
+    options: QueueMetricsCollectorOptions = {},
+  ): DynamicModule {
+    const queueTokens = queueNames.map((name) => getQueueToken(name));
+    return {
+      module: MetricsModule,
+      imports: [
+        // Re-register each queue here so the collector can resolve them via
+        // BullMQ's standard token even if MetricsModule is imported before
+        // the feature module that owns the queue. NOTE(review): verify that
+        // this reuses the owner's Queue instance rather than creating another.
+        ...queueNames.map((name) => BullModule.registerQueue({ name })),
+      ],
+      providers: [
+        makeGaugeProvider({
+          name: QUEUE_DEPTH_GAUGE,
+          help: 'BullMQ queue depth by state (waiting, active, completed, failed, delayed)',
+          labelNames: ['queue', 'state'],
+        }),
+        makeCounterProvider({
+          name: QUEUE_JOB_OUTCOMES_TOTAL,
+          help: 'BullMQ job outcomes (completed, failed) by queue',
+          labelNames: ['queue', 'outcome'],
+        }),
+        {
+          provide: QUEUE_METRICS_COLLECTOR_QUEUES,
+          inject: queueTokens,
+          useFactory: (...queues: Queue[]): QueueLike[] => queues,
+        },
+        { provide: QUEUE_METRICS_COLLECTOR_OPTIONS, useValue: options },
+        QueueMetricsCollector,
+      ],
+      exports: [QueueMetricsCollector],
+    };
+  }
+}