import { randomUUID } from 'node:crypto';

import { Injectable, type OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Cron } from '@nestjs/schedule';
import { InjectMetric } from '@willsoto/nestjs-prometheus';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports for emitDecoratorMetadata
import { Counter, Histogram } from 'prom-client';

import { PrismaService, RedisService, LoggerService } from '@modules/shared';

/**
 * Metric names exported so modules can wire `makeCounterProvider` / `makeHistogramProvider`.
 */
export const MATVIEW_REFRESH_TOTAL = 'matview_refresh_total';
export const MATVIEW_REFRESH_DURATION = 'matview_refresh_duration_seconds';
export const MATVIEW_REFRESH_ERRORS = 'matview_refresh_errors_total';

/** Configuration for a single materialized-view refresh schedule. */
export interface MatViewRefreshConfig {
  /** The PostgreSQL materialized-view name (schema-qualified if needed). */
  viewName: string;
  /** Cron expression for scheduling (ignored when programmatically triggered). */
  cron: string;
  /** Expected max duration in seconds — the watchdog fires at 2× this value. */
  expectedDurationSeconds: number;
}

/**
 * Default views to refresh — empty in Phase 0 (no Phase 1 views yet).
 * Phase 1 will add entries here or via `MATVIEW_REFRESH_VIEWS` env var.
 */
const DEFAULT_VIEWS: MatViewRefreshConfig[] = [];

const LOCK_PREFIX = 'matview:lock:';
const LOCK_TTL_MULTIPLIER = 2;
const LOG_CONTEXT = 'RefreshMatView';

/**
 * Compare-and-delete: removes the lock key only while it still holds the
 * caller's token, so an expired-and-reacquired lock is never deleted by the
 * previous holder.
 */
const RELEASE_LOCK_SCRIPT =
  'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';

/**
 * Quote a possibly schema-qualified PostgreSQL identifier.
 *
 * Each dot-separated part is wrapped in double quotes and embedded double
 * quotes are doubled, so `analytics.daily` becomes `"analytics"."daily"`.
 */
function quoteQualifiedIdentifier(name: string): string {
  return name
    .split('.')
    .map((part) => `"${part.replace(/"/g, '""')}"`)
    .join('.');
}

/** Extract a printable message and stack from an arbitrary thrown value. */
function describeError(err: unknown): { message: string; stack: string | undefined } {
  if (err instanceof Error) {
    return { message: err.message, stack: err.stack };
  }
  return { message: String(err), stack: undefined };
}

/**
 * Validate one entry of the `MATVIEW_REFRESH_VIEWS` JSON array.
 *
 * @throws Error when `viewName` is not a non-empty string or
 *   `expectedDurationSeconds` is not a finite positive number.
 */
function parseViewConfig(entry: unknown, index: number): MatViewRefreshConfig {
  if (typeof entry !== 'object' || entry === null) {
    throw new Error(`Entry ${index} is not an object`);
  }
  const { viewName, cron, expectedDurationSeconds } = entry as Record<string, unknown>;
  if (typeof viewName !== 'string' || viewName.trim() === '') {
    throw new Error(`Entry ${index}: viewName must be a non-empty string`);
  }
  if (
    typeof expectedDurationSeconds !== 'number' ||
    !Number.isFinite(expectedDurationSeconds) ||
    expectedDurationSeconds <= 0
  ) {
    throw new Error(`Entry ${index}: expectedDurationSeconds must be a positive number`);
  }
  // `cron` is not consulted by this service, so a missing value is tolerated.
  return { viewName, cron: typeof cron === 'string' ? cron : '', expectedDurationSeconds };
}

/** Build a promise that rejects as soon as `signal` is aborted and never resolves. */
function rejectOnAbort(signal: AbortSignal): Promise<never> {
  return new Promise<never>((_resolve, reject) => {
    signal.addEventListener('abort', () => reject(new Error('Refresh aborted')), { once: true });
  });
}

/**
 * Periodically issues `REFRESH MATERIALIZED VIEW CONCURRENTLY` for the
 * configured views, guarded by a per-view Redis lock and a watchdog timer,
 * and records Prometheus metrics for each attempt.
 */
@Injectable()
export class RefreshMaterializedViewCronService implements OnModuleDestroy {
  private readonly views: MatViewRefreshConfig[];

  /** Track in-flight AbortControllers so the watchdog / shutdown can cancel them. */
  private readonly inflight = new Map<string, AbortController>();

  constructor(
    private readonly prisma: PrismaService,
    private readonly redis: RedisService,
    private readonly logger: LoggerService,
    private readonly config: ConfigService,
    @InjectMetric(MATVIEW_REFRESH_TOTAL) private readonly refreshCounter: Counter<string>,
    @InjectMetric(MATVIEW_REFRESH_DURATION) private readonly refreshDuration: Histogram<string>,
    @InjectMetric(MATVIEW_REFRESH_ERRORS) private readonly refreshErrors: Counter<string>,
  ) {
    this.views = this.loadViewConfig();
    if (this.views.length > 0) {
      this.logger.log(
        `Materialized-view refresh configured for: ${this.views.map((v) => v.viewName).join(', ')}`,
        LOG_CONTEXT,
      );
    }
  }

  /** Abort any in-flight refreshes during graceful shutdown. */
  onModuleDestroy(): void {
    for (const [view, ctrl] of this.inflight) {
      ctrl.abort();
      this.logger.warn(`Aborted in-flight refresh for ${view} (shutdown)`, LOG_CONTEXT);
    }
    this.inflight.clear();
  }

  // ─── Cron entry-point ───────────────────────────────────────────────
  // Fires every 5 minutes and attempts every configured view in sequence.
  // NOTE: the per-view `cron` field is not consulted here — every view is
  // attempted on every tick. Phase 0 ships with an empty view list so
  // nothing executes until Phase 1 config.
  @Cron('*/5 * * * *', { name: 'matview-refresh-tick' })
  async tick(): Promise<void> {
    for (const view of this.views) {
      await this.tryRefresh(view);
    }
  }

  /**
   * Public entry for ad-hoc / test invocation. Runs without taking the lock.
   *
   * Refresh failures are logged and counted by `executeRefresh`, not rethrown.
   *
   * @throws Error when `viewName` is not one of the configured views.
   */
  async refreshView(viewName: string): Promise<void> {
    const view = this.views.find((v) => v.viewName === viewName);
    if (!view) {
      throw new Error(`Unknown materialized view: ${viewName}`);
    }
    await this.executeRefresh(view);
  }

  // ─── Core logic ─────────────────────────────────────────────────────

  /**
   * Acquire mutex, refresh, release. No-op when lock is held.
   *
   * @returns `false` when the lock was held elsewhere and the refresh was
   *   skipped; `true` when a refresh was attempted (including attempts that
   *   failed and were logged).
   */
  async tryRefresh(view: MatViewRefreshConfig): Promise<boolean> {
    const lockKey = `${LOCK_PREFIX}${view.viewName}`;
    // Redis EX requires a positive integer number of seconds.
    const lockTtl = Math.max(1, Math.ceil(view.expectedDurationSeconds * LOCK_TTL_MULTIPLIER));
    // Unique per attempt so release can only remove the lock this attempt took.
    const lockToken = randomUUID();

    const acquired = await this.acquireLock(lockKey, lockTtl, lockToken);
    if (!acquired) {
      this.logger.debug(`Skipping ${view.viewName} — lock held`, LOG_CONTEXT);
      return false;
    }

    try {
      await this.executeRefresh(view);
      return true;
    } finally {
      await this.releaseLock(lockKey, lockToken);
    }
  }

  /**
   * Run the refresh statement for one view under a watchdog timer.
   *
   * The statement is raced against the abort signal, so this method returns
   * once the watchdog fires or the module shuts down. The abort only stops
   * waiting; nothing here cancels the statement on the database side.
   * Errors are logged and counted, never rethrown.
   */
  private async executeRefresh(view: MatViewRefreshConfig): Promise<void> {
    const watchdogMs = view.expectedDurationSeconds * LOCK_TTL_MULTIPLIER * 1000;
    const ctrl = new AbortController();
    this.inflight.set(view.viewName, ctrl);

    const watchdog = setTimeout(() => {
      ctrl.abort();
      this.refreshErrors.inc({ view: view.viewName, reason: 'watchdog' });
      this.logger.error(
        `Watchdog killed refresh of ${view.viewName} after ${watchdogMs}ms`,
        undefined,
        LOG_CONTEXT,
      );
    }, watchdogMs);

    const start = Date.now();
    try {
      // REFRESH MATERIALIZED VIEW CONCURRENTLY requires a unique index on the
      // view. Callers are responsible for ensuring that index exists.
      await Promise.race([
        this.prisma.$executeRawUnsafe(
          `REFRESH MATERIALIZED VIEW CONCURRENTLY ${quoteQualifiedIdentifier(view.viewName)}`,
        ),
        rejectOnAbort(ctrl.signal),
      ]);
      const durationSec = (Date.now() - start) / 1000;
      this.refreshCounter.inc({ view: view.viewName, status: 'success' });
      this.refreshDuration.observe({ view: view.viewName }, durationSec);
      this.logger.log(`Refreshed ${view.viewName} in ${durationSec.toFixed(2)}s`, LOG_CONTEXT);
    } catch (err) {
      if (ctrl.signal.aborted) return; // watchdog / shutdown already logged
      const durationSec = (Date.now() - start) / 1000;
      const { message, stack } = describeError(err);
      this.refreshErrors.inc({ view: view.viewName, reason: 'query' });
      this.refreshDuration.observe({ view: view.viewName }, durationSec);
      this.logger.error(`Failed to refresh ${view.viewName}: ${message}`, stack, LOG_CONTEXT);
    } finally {
      clearTimeout(watchdog);
      // Only drop our own controller; a concurrent call for the same view may
      // have registered a newer one under the same key.
      if (this.inflight.get(view.viewName) === ctrl) {
        this.inflight.delete(view.viewName);
      }
    }
  }

  // ─── Redis distributed lock (SET NX EX) ─────────────────────────────

  /**
   * Try to take the lock by storing `token` under `key` with a TTL.
   *
   * @returns `true` when the lock was taken, or when Redis is unavailable or
   *   errors (degrade open); `false` when the key already exists.
   */
  private async acquireLock(key: string, ttlSeconds: number, token: string): Promise<boolean> {
    if (!this.redis.isAvailable()) {
      // Fallback: allow refresh (single-instance safe, no mutex).
      return true;
    }
    try {
      const result = await this.redis.getClient().set(key, token, 'EX', ttlSeconds, 'NX');
      return result === 'OK';
    } catch (err) {
      this.logger.warn(`Lock acquire failed for ${key}: ${describeError(err).message}`, LOG_CONTEXT);
      return true; // degrade open — better to refresh than skip
    }
  }

  /**
   * Release the lock only if it still holds `token`. Failures are logged and
   * otherwise ignored.
   */
  private async releaseLock(key: string, token: string): Promise<void> {
    try {
      await this.redis.getClient().eval(RELEASE_LOCK_SCRIPT, 1, key, token);
    } catch (err) {
      this.logger.warn(`Lock release failed for ${key}: ${describeError(err).message}`, LOG_CONTEXT);
    }
  }

  // ─── Config loading ─────────────────────────────────────────────────

  /**
   * Read and validate `MATVIEW_REFRESH_VIEWS` (a JSON array of view configs).
   *
   * @returns the parsed views, or `DEFAULT_VIEWS` when the variable is unset
   *   or invalid (an invalid value is logged as an error).
   */
  private loadViewConfig(): MatViewRefreshConfig[] {
    const raw = this.config.get<string>('MATVIEW_REFRESH_VIEWS');
    if (!raw) return DEFAULT_VIEWS;
    try {
      const parsed: unknown = JSON.parse(raw);
      if (!Array.isArray(parsed)) throw new Error('Expected JSON array');
      return parsed.map((entry, index) => parseViewConfig(entry, index));
    } catch (err) {
      this.logger.error(
        `Invalid MATVIEW_REFRESH_VIEWS config: ${describeError(err).message}`,
        undefined,
        LOG_CONTEXT,
      );
      return DEFAULT_VIEWS;
    }
  }
}