feat(metrics): Prometheus queue metrics for BullMQ (RFC-004 Phase 3 WS3a)
Adds a 5 s polling collector that publishes BullMQ queue depth as the goodgo_queue_depth gauge (labels: queue, state) and a goodgo_queue_job_outcomes_total counter for processor hooks. The collector fails soft on Redis errors so a queue blip cannot crash the app. - New constants: QUEUE_DEPTH_GAUGE, QUEUE_JOB_OUTCOMES_TOTAL, QUEUE_METRICS_QUEUE_NAMES (extend as Phase 2 adds queues) - New QueueMetricsCollector with injectable timer/clock for tests - MetricsModule.withQueueMetrics() dynamic module wires queue tokens via getQueueToken + factory provider; re-imports BullModule.registerQueue so ordering between MetricsModule and feature modules does not matter - AppModule mounts MetricsModule.withQueueMetrics() alongside MetricsModule - 4 unit tests cover sample → gauge mapping, Redis-down fail-soft, recordJobOutcome, and timer init/destroy. Bull Board UI mount is split into WS3b (needs @bull-board/* deps). Refs: GOO-175 Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -62,6 +62,7 @@ import { AppController } from './app.controller';
|
||||
AdminModule,
|
||||
AnalyticsModule,
|
||||
MetricsModule,
|
||||
MetricsModule.withQueueMetrics(),
|
||||
McpIntegrationModule,
|
||||
MessagingModule,
|
||||
ReportsModule,
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
import { Counter, Gauge, Registry } from 'prom-client';
|
||||
import { describe, expect, it, vi } from 'vitest';
|
||||
import {
|
||||
QueueMetricsCollector,
|
||||
type QueueLike,
|
||||
} from '../queue-metrics.collector';
|
||||
|
||||
/**
 * Creates a private prom-client registry holding the depth gauge and the
 * outcome counter, so every test works against its own metric instances
 * instead of the process-wide default registry.
 */
function makeMetrics() {
  const registry = new Registry();

  return {
    registry,
    depth: new Gauge({
      name: 'goodgo_queue_depth',
      help: 'depth',
      labelNames: ['queue', 'state'],
      registers: [registry],
    }),
    outcomes: new Counter({
      name: 'goodgo_queue_job_outcomes_total',
      help: 'outcomes',
      labelNames: ['queue', 'outcome'],
      registers: [registry],
    }),
  };
}
|
||||
|
||||
/**
 * Builds an in-memory queue stand-in whose `getJobCounts` always resolves to
 * the supplied `counts`, regardless of which job types are requested.
 */
function makeQueue(name: string, counts: Record<string, number>): QueueLike {
  const getJobCounts = (..._types: string[]): Promise<Record<string, number>> =>
    Promise.resolve(counts);

  return { name, getJobCounts };
}
|
||||
|
||||
describe('QueueMetricsCollector', () => {
  /** Timer stubs for tests that drive `sampleOnce` by hand and never need the polling loop. */
  const inertTimers = () => ({
    setInterval: () => 0 as unknown as ReturnType<typeof setInterval>,
    clearInterval: () => undefined,
  });

  it('samples each queue and writes labelled gauge values', async () => {
    const metrics = makeMetrics();
    const queue = makeQueue('report-generation', {
      waiting: 3,
      active: 1,
      completed: 100,
      failed: 2,
      delayed: 0,
    });
    const collector = new QueueMetricsCollector([queue], metrics.depth, metrics.outcomes, inertTimers());

    await collector.sampleOnce();

    // Collapse the gauge samples into a { state: value } map for comparison.
    const snapshot = await metrics.depth.get();
    const byState: Record<string, number> = {};
    for (const sample of snapshot.values) {
      byState[String(sample.labels['state'])] = sample.value;
    }
    expect(byState).toMatchObject({ waiting: 3, active: 1, completed: 100, failed: 2, delayed: 0 });
  });

  it('does not throw when getJobCounts rejects (e.g. Redis down)', async () => {
    const metrics = makeMetrics();
    const broken: QueueLike = {
      name: 'broken',
      getJobCounts: () => Promise.reject(new Error('redis down')),
    };
    const collector = new QueueMetricsCollector([broken], metrics.depth, metrics.outcomes, inertTimers());

    await expect(collector.sampleOnce()).resolves.toBeUndefined();
  });

  it('recordJobOutcome increments the outcome counter', async () => {
    const metrics = makeMetrics();
    const collector = new QueueMetricsCollector([], metrics.depth, metrics.outcomes, inertTimers());

    for (const outcome of ['completed', 'failed', 'completed'] as const) {
      collector.recordJobOutcome('report-generation', outcome);
    }

    const snapshot = await metrics.outcomes.get();
    const valueFor = (outcome: string) =>
      snapshot.values.find((sample) => sample.labels['outcome'] === outcome)?.value;
    expect(valueFor('completed')).toBe(2);
    expect(valueFor('failed')).toBe(1);
  });

  it('onModuleInit schedules the timer and onModuleDestroy clears it', () => {
    const metrics = makeMetrics();
    const setIntervalSpy = vi.fn(() => 'h' as unknown as ReturnType<typeof setInterval>);
    const clearIntervalSpy = vi.fn();
    const collector = new QueueMetricsCollector([], metrics.depth, metrics.outcomes, {
      intervalMs: 1234,
      setInterval: setIntervalSpy,
      clearInterval: clearIntervalSpy,
    });

    collector.onModuleInit();
    expect(setIntervalSpy).toHaveBeenCalledWith(expect.any(Function), 1234);

    collector.onModuleDestroy();
    expect(clearIntervalSpy).toHaveBeenCalledWith('h');
  });
});
|
||||
@@ -0,0 +1,103 @@
|
||||
import { Inject, Injectable, Logger, type OnModuleDestroy, type OnModuleInit } from '@nestjs/common';
|
||||
import { InjectMetric } from '@willsoto/nestjs-prometheus';
|
||||
import type { Queue } from 'bullmq';
|
||||
import { Counter, Gauge } from 'prom-client';
|
||||
import { QUEUE_DEPTH_GAUGE, QUEUE_JOB_OUTCOMES_TOTAL } from './queue-metrics.constants';
|
||||
|
||||
/**
 * The slice of a BullMQ `Queue` that the collector actually touches: its
 * name and the job-count lookup. Keeping the dependency this narrow lets
 * unit tests hand in a plain object instead of a Redis-backed queue.
 */
export interface QueueLike {
  /** Queue name; used as the `queue` label value on the depth gauge. */
  readonly name: string;

  /** Resolves to a map of job state to number of jobs for the requested states. */
  getJobCounts(...types: string[]): Promise<Record<string, number>>;
}
|
||||
|
||||
/** Tunables for the queue-metrics collector; every field is optional. */
export interface QueueMetricsCollectorOptions {
  /** Milliseconds between polls. Defaults to 5_000 when omitted. */
  intervalMs?: number;

  /** Replacement for the global `setInterval`, so tests can control scheduling. */
  setInterval?: (fn: () => void, ms: number) => ReturnType<typeof setInterval>;

  /** Replacement for the global `clearInterval`; pair it with `setInterval` above. */
  clearInterval?: (handle: ReturnType<typeof setInterval>) => void;
}
|
||||
|
||||
/** Injection token for the array of queues the collector samples. */
export const QUEUE_METRICS_COLLECTOR_QUEUES = 'QUEUE_METRICS_COLLECTOR_QUEUES';

/** Injection token for the collector's `QueueMetricsCollectorOptions`. */
export const QUEUE_METRICS_COLLECTOR_OPTIONS = 'QUEUE_METRICS_COLLECTOR_OPTIONS';
|
||||
|
||||
/**
 * Samples every registered BullMQ queue on a timer and mirrors the job
 * counts into the `goodgo_queue_depth` gauge. The gauge carries a `state`
 * label, so one metric name fans out to whatever states `getJobCounts`
 * reports (waiting / active / completed / failed / delayed are requested).
 *
 * Job-outcome counters (`goodgo_queue_job_outcomes_total`) are incremented
 * from processor hooks via {@link recordJobOutcome} rather than by polling,
 * so they capture every job, not just the ones alive at tick time.
 *
 * Sampling is fail-soft: a queue whose lookup throws is logged and skipped,
 * and never prevents the other queues from being sampled.
 */
@Injectable()
export class QueueMetricsCollector implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(QueueMetricsCollector.name);
  /** Handle of the active polling timer, or `null` when polling is stopped. */
  private handle: ReturnType<typeof setInterval> | null = null;
  /** True while a timer-driven sample is still in flight. */
  private sampling = false;
  private readonly intervalMs: number;
  private readonly setIntervalFn: NonNullable<QueueMetricsCollectorOptions['setInterval']>;
  private readonly clearIntervalFn: NonNullable<QueueMetricsCollectorOptions['clearInterval']>;

  /**
   * @param queues Queues to sample on every tick.
   * @param depthGauge Gauge labelled by (`queue`, `state`).
   * @param outcomes Counter labelled by (`queue`, `outcome`).
   * @param options Polling interval and optional timer overrides for tests.
   */
  constructor(
    @Inject(QUEUE_METRICS_COLLECTOR_QUEUES) private readonly queues: ReadonlyArray<QueueLike>,
    @InjectMetric(QUEUE_DEPTH_GAUGE) private readonly depthGauge: Gauge<string>,
    @InjectMetric(QUEUE_JOB_OUTCOMES_TOTAL) private readonly outcomes: Counter<string>,
    @Inject(QUEUE_METRICS_COLLECTOR_OPTIONS) options: QueueMetricsCollectorOptions = {},
  ) {
    this.intervalMs = options.intervalMs ?? 5_000;
    this.setIntervalFn = options.setInterval ?? ((fn, ms) => setInterval(fn, ms));
    this.clearIntervalFn = options.clearInterval ?? ((handle) => clearInterval(handle));
  }

  /** Takes an immediate sample, then starts the polling timer. */
  onModuleInit(): void {
    // Stop any timer left over from an earlier init so a repeated call
    // cannot leak an interval that nothing holds a handle to.
    this.onModuleDestroy();

    // Kick off an immediate sample so gauges are populated before the first
    // timer tick, which helps /metrics scrapes that land inside the first interval.
    void this.tick();
    this.handle = this.setIntervalFn(() => {
      void this.tick();
    }, this.intervalMs);
  }

  /** Stops the polling timer if one is running. Safe to call repeatedly. */
  onModuleDestroy(): void {
    // Compare against null rather than truthiness: an injected timer may
    // legitimately hand back a falsy handle such as 0.
    if (this.handle !== null) {
      this.clearIntervalFn(this.handle);
      this.handle = null;
    }
  }

  /**
   * Increment the outcome counter. Call from a processor's `@OnWorkerEvent`
   * completed/failed hook so every job is accounted for, not just the ones
   * present during a poll tick.
   *
   * @param queueName Value for the `queue` label.
   * @param outcome Value for the `outcome` label.
   */
  recordJobOutcome(queueName: string, outcome: 'completed' | 'failed'): void {
    this.outcomes.labels(queueName, outcome).inc(1);
  }

  /**
   * Samples every queue once and resolves when all of them have been
   * attempted. Never rejects because of a failing queue. Exposed for tests.
   */
  async sampleOnce(): Promise<void> {
    // Queues are independent, so sample them concurrently; each sampler
    // handles its own failure, so this aggregate cannot reject on a queue error.
    await Promise.all(this.queues.map((queue) => this.sampleQueue(queue)));
  }

  /**
   * Timer entry point: runs one sample unless the previous timer-driven
   * sample is still pending, so slow Redis calls cannot pile up.
   */
  private async tick(): Promise<void> {
    if (this.sampling) {
      return;
    }
    this.sampling = true;
    try {
      await this.sampleOnce();
    } finally {
      this.sampling = false;
    }
  }

  /** Reads one queue's job counts into the depth gauge; logs and swallows any failure. */
  private async sampleQueue(queue: QueueLike): Promise<void> {
    try {
      const counts = await queue.getJobCounts('waiting', 'active', 'completed', 'failed', 'delayed');
      for (const [state, count] of Object.entries(counts)) {
        this.depthGauge.labels(queue.name, state).set(count);
      }
    } catch (err) {
      // Redis outage or queue not yet ready: log and keep going. The gauge
      // is left at whatever value it last had, so a stalled queue shows up
      // as a flat series rather than a missing one. Detect it from this
      // warning or from the series no longer changing.
      this.logger.warn(
        `queue-metrics sample failed for ${queue.name}: ${err instanceof Error ? err.message : String(err)}`,
      );
    }
  }
}
|
||||
|
||||
/**
 * Views a real BullMQ `Queue` through the narrower `QueueLike` surface.
 * The same object is returned untouched; only the static type is narrowed.
 */
export function asQueueLike(q: Queue): QueueLike {
  const narrowed: QueueLike = q;
  return narrowed;
}
|
||||
@@ -0,0 +1,29 @@
|
||||
/**
|
||||
* Queue metrics — RFC-004 Phase 3 workstream 3a.
|
||||
*
|
||||
* Exposes Prometheus gauges for BullMQ queue depth and a counter for job
|
||||
* outcomes. The collector is intentionally polling-based (not subscribing
|
||||
* to bullmq events) so a single collector tick covers every queue without
|
||||
* adding per-queue listener wiring. Polling cadence is small (5 s) — depth
|
||||
* gauges are coarse-grained and cheap to read via `queue.getJobCounts`.
|
||||
*
|
||||
* Adding a new queue means:
|
||||
* 1. Register it via BullModule.registerQueue (existing pattern).
|
||||
* 2. Pass its name into QUEUE_METRICS_QUEUE_NAMES (or extend the
|
||||
* registration helper) so the collector samples it each tick.
|
||||
*/
|
||||
|
||||
/** Prometheus metric name of the per-queue, per-state depth gauge. */
export const QUEUE_DEPTH_GAUGE = 'goodgo_queue_depth';

/** Prometheus metric name of the per-queue job-outcome counter. */
export const QUEUE_JOB_OUTCOMES_TOTAL = 'goodgo_queue_job_outcomes_total';
|
||||
|
||||
/**
 * Names of the BullMQ queues the metrics collector samples. Whenever a new
 * queue is registered via BullModule.registerQueue, append its name here.
 *
 * The list is a hand-maintained constant (instead of being discovered from
 * the Nest container) so the sampled set stays deterministic and easy to
 * test, at the price of one line of bookkeeping per queue.
 */
export const QUEUE_METRICS_QUEUE_NAMES: ReadonlyArray<string> = ['report-generation'];
|
||||
@@ -1,10 +1,24 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { BullModule, getQueueToken } from '@nestjs/bullmq';
|
||||
import { Module, type DynamicModule } from '@nestjs/common';
|
||||
import {
|
||||
makeCounterProvider,
|
||||
makeHistogramProvider,
|
||||
makeGaugeProvider,
|
||||
} from '@willsoto/nestjs-prometheus';
|
||||
import type { Queue } from 'bullmq';
|
||||
import { MetricsService } from './infrastructure/metrics.service';
|
||||
import {
|
||||
QueueMetricsCollector,
|
||||
QUEUE_METRICS_COLLECTOR_OPTIONS,
|
||||
QUEUE_METRICS_COLLECTOR_QUEUES,
|
||||
type QueueLike,
|
||||
type QueueMetricsCollectorOptions,
|
||||
} from './infrastructure/queue-metrics.collector';
|
||||
import {
|
||||
QUEUE_DEPTH_GAUGE,
|
||||
QUEUE_JOB_OUTCOMES_TOTAL,
|
||||
QUEUE_METRICS_QUEUE_NAMES,
|
||||
} from './infrastructure/queue-metrics.constants';
|
||||
import {
|
||||
GOODGO_API_REQUEST_DURATION,
|
||||
GOODGO_LISTINGS_CREATED_TOTAL,
|
||||
@@ -141,4 +155,48 @@ import { HttpMetricsInterceptor } from './presentation/interceptors/http-metrics
|
||||
controllers: [WebVitalsController],
|
||||
exports: [MetricsService, HttpMetricsInterceptor],
|
||||
})
|
||||
export class MetricsModule {}
|
||||
export class MetricsModule {
|
||||
/**
 * Builds the dynamic module that wires up the queue-metrics collector for
 * a fixed list of BullMQ queue names. The queues are registered here via
 * BullModule.registerQueue, so callers do not have to import the owning
 * feature module first.
 *
 * NOTE(review): NestJS merges a dynamic module's metadata with the
 * `@Module()` metadata of the same class, so importing both
 * `MetricsModule` and `MetricsModule.withQueueMetrics()` presumably
 * instantiates the base providers and controllers twice. Confirm, and
 * consider mounting only one of them or moving this into its own module.
 *
 * RFC-004 Phase 3, workstream 3a.
 *
 * @param queueNames Queue names to sample; repeated names are collapsed.
 * @param options Collector options (polling interval, timer overrides).
 * @returns Dynamic module that provides and exports `QueueMetricsCollector`.
 */
static withQueueMetrics(
  queueNames: readonly string[] = QUEUE_METRICS_QUEUE_NAMES,
  options: QueueMetricsCollectorOptions = {},
): DynamicModule {
  // Collapse repeated names so a queue is registered, injected and sampled
  // once per tick instead of once per occurrence.
  const uniqueNames = Array.from(new Set(queueNames));
  const queueTokens = uniqueNames.map((name) => getQueueToken(name));

  return {
    module: MetricsModule,
    imports: [
      // Re-register each queue here so the collector can resolve it via
      // BullMQ's standard token even if MetricsModule is imported before
      // the feature module that owns the queue.
      // NOTE(review): the original comment said BullMQ deduplicates the
      // queue instance under the hood; that is not visible from this file.
      // Verify this does not open a second Redis connection per queue.
      ...uniqueNames.map((name) => BullModule.registerQueue({ name })),
    ],
    providers: [
      makeGaugeProvider({
        name: QUEUE_DEPTH_GAUGE,
        help: 'BullMQ queue depth by state (waiting, active, completed, failed, delayed)',
        labelNames: ['queue', 'state'],
      }),
      makeCounterProvider({
        name: QUEUE_JOB_OUTCOMES_TOTAL,
        help: 'BullMQ job outcomes (completed, failed) by queue',
        labelNames: ['queue', 'outcome'],
      }),
      {
        // Gather the individually injected queues into the single array
        // the collector's constructor expects.
        provide: QUEUE_METRICS_COLLECTOR_QUEUES,
        inject: queueTokens,
        useFactory: (...queues: Queue[]): QueueLike[] => queues,
      },
      { provide: QUEUE_METRICS_COLLECTOR_OPTIONS, useValue: options },
      QueueMetricsCollector,
    ],
    exports: [QueueMetricsCollector],
  };
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user