import { spawn } from 'node:child_process';
import { createHash } from 'node:crypto';
import * as path from 'node:path';
import { Injectable } from '@nestjs/common';
import { OsmSyncStatus } from '@prisma/client';
import { LoggerService, PrismaService } from '@modules/shared';

/**
 * Catalog of every sync layer / category we know about. The orchestrator
 * uses this to validate trigger requests, populate the admin UI, and
 * decide which scripts to run on the cron schedule.
 */
export interface OsmSyncLayerDef {
  layer: string;
  category?: string;
  /** Path of the tsx script under repo root. */
  scriptPath: string;
  /** Extra CLI args appended after `--category=` etc. */
  extraArgs?: string[];
  /** Approx Overpass cost — used to spread cron schedule. */
  weight: 'light' | 'medium' | 'heavy';
}

/** POI categories synced by `scripts/sync-osm-poi.ts`, one catalog row each. */
const POI_CATEGORIES = [
  'SCHOOL_PRIMARY',
  'SCHOOL_SECONDARY',
  'UNIVERSITY',
  'HOSPITAL',
  'CLINIC',
  'PHARMACY',
  'MARKET',
  'SUPERMARKET',
  'MALL',
  'CONVENIENCE',
  'BANK',
  'ATM',
  'PARK',
  'GAS_STATION',
  'POLICE',
  'POST_OFFICE',
  'METRO_STATION',
  'RAILWAY_STATION',
  'BUS_STATION',
  'AIRPORT',
];

/** POI categories weighted `medium`; every other POI category is `light`. */
const MEDIUM_WEIGHT_POI_CATEGORIES: ReadonlySet<string> = new Set([
  'BANK',
  'PHARMACY',
  'CONVENIENCE',
]);

/**
 * Matches the per-batch stats line a sync script prints on stdout, e.g.
 * "inserted=12 updated=3 locked=1 skipped=0". Hoisted so it is compiled once.
 */
const STATS_LINE_RE = /inserted=(\d+).*updated=(\d+).*locked=(\d+).*skipped=(\d+)/;

export const SYNC_LAYERS: OsmSyncLayerDef[] = [
  {
    layer: 'admin-boundaries',
    category: 'province',
    scriptPath: 'scripts/sync-osm-admin-boundaries.ts',
    extraArgs: ['--level=4'],
    weight: 'light',
  },
  {
    layer: 'admin-boundaries',
    category: 'district',
    scriptPath: 'scripts/sync-osm-admin-boundaries.ts',
    extraArgs: ['--level=6'],
    weight: 'medium',
  },
  {
    layer: 'admin-boundaries',
    category: 'ward',
    scriptPath: 'scripts/sync-osm-admin-boundaries.ts',
    extraArgs: ['--level=8'],
    weight: 'heavy',
  },
  // POI categories — each one its own row so the dashboard shows progress
  // per category and the cron can rotate them across days.
  // The explicit return type keeps `weight` a literal union rather than
  // letting it widen to `string`.
  ...POI_CATEGORIES.map(
    (cat): OsmSyncLayerDef => ({
      layer: 'poi',
      category: cat,
      scriptPath: 'scripts/sync-osm-poi.ts',
      extraArgs: [`--category=${cat}`],
      weight: MEDIUM_WEIGHT_POI_CATEGORIES.has(cat) ? 'medium' : 'light',
    }),
  ),
  {
    layer: 'industrial-parks',
    scriptPath: 'scripts/sync-osm-industrial-parks.ts',
    weight: 'heavy',
  },
];

/** Row counters accumulated from a script's stdout stats lines. */
interface SyncStats {
  added: number;
  updated: number;
  skipped: number;
  locked: number;
}

/**
 * Reassembles complete lines from a stream of arbitrarily-split text chunks.
 *
 * Process output arrives in chunks that do not respect line boundaries, so a
 * line may be split across two `data` events. `push` holds back the trailing
 * partial line until its newline arrives; `flush` emits whatever is left once
 * the stream has ended. Blank lines are dropped and every emitted line is
 * trimmed.
 */
function createLineBuffer(onLine: (line: string) => void): {
  push(chunk: string): void;
  flush(): void;
} {
  let pending = '';
  const emit = (raw: string): void => {
    const line = raw.trim();
    if (line) onLine(line);
  };
  return {
    push(chunk: string): void {
      const parts = (pending + chunk).split('\n');
      // The last element is either '' (chunk ended on a newline) or an
      // incomplete line that must wait for the next chunk.
      pending = parts.pop() ?? '';
      parts.forEach(emit);
    },
    flush(): void {
      const rest = pending;
      pending = '';
      emit(rest);
    },
  };
}

/**
 * Spawns the right tsx script for a given (layer, category, chunk) and
 * tracks the run in `OsmSyncRun`. Used both by the cron service and the
 * admin "Sync now" button.
 */
@Injectable()
export class OsmSyncService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly logger: LoggerService,
  ) {}

  /**
   * Refresh the proximity / density materialized views. Called by the
   * cron and from the admin "Refresh views" button.
   *
   * The two refreshes run one after the other. Only `mv_park_nearest_poi`
   * is refreshed with `CONCURRENTLY`; `mv_poi_density_by_province` uses a
   * plain refresh.
   * NOTE(review): presumably the density view has no unique index, which
   * `CONCURRENTLY` requires — confirm before changing it.
   */
  async refreshMaterializedViews(): Promise<void> {
    this.logger.log('Refreshing materialized views', 'OsmSyncService');
    await this.prisma.$executeRawUnsafe(
      `REFRESH MATERIALIZED VIEW CONCURRENTLY "mv_park_nearest_poi"`,
    );
    await this.prisma.$executeRawUnsafe(
      `REFRESH MATERIALIZED VIEW "mv_poi_density_by_province"`,
    );
    this.logger.log('Materialized views refreshed', 'OsmSyncService');
  }

  /**
   * Look up a sync layer by its (layer, category) tuple. A missing category
   * (`undefined` or `null`) only matches catalog entries without a category.
   */
  findLayer(layer: string, category?: string | null): OsmSyncLayerDef | undefined {
    return SYNC_LAYERS.find(
      (l) => l.layer === layer && (l.category ?? null) === (category ?? null),
    );
  }

  /** Returns the full sync-layer catalog. */
  list(): OsmSyncLayerDef[] {
    return SYNC_LAYERS;
  }

  /**
   * Run a sync layer (script invocation). Inserts a RUNNING `OsmSyncRun`,
   * captures script stdout/stderr line-by-line into the logger, and
   * updates the row to SUCCESS / PARTIAL / FAILED + row counts on exit.
   *
   * Returns the persisted `OsmSyncRun.id` immediately if `wait=false` so
   * the admin UI can poll, or after the script exits when `wait=true`.
   *
   * @throws Error when (layer, category) is not in the catalog, or — with
   *   `wait=true` — when the final status could not be persisted.
   */
  async run(opts: {
    layer: string;
    category?: string | null;
    chunk?: string | null;
    wait?: boolean;
  }): Promise<{ runId: string; status: OsmSyncStatus }> {
    const def = this.findLayer(opts.layer, opts.category);
    if (!def) {
      throw new Error(`Unknown OSM sync layer: ${opts.layer}/${opts.category ?? '-'}`);
    }

    const args = [...(def.extraArgs ?? [])];
    if (opts.chunk) args.push(`--chunk=${opts.chunk}`);

    // Short fingerprint of the exact invocation (script + args).
    const queryHash = createHash('sha256')
      .update(`${def.scriptPath} ${args.join(' ')}`)
      .digest('hex')
      .slice(0, 16);

    const run = await this.prisma.osmSyncRun.create({
      data: {
        layer: opts.layer,
        category: opts.category ?? null,
        chunk: opts.chunk ?? null,
        status: OsmSyncStatus.RUNNING,
        overpassQueryHash: queryHash,
      },
    });

    this.logger.log(
      `OSM sync started run=${run.id} layer=${opts.layer} category=${opts.category ?? '-'} chunk=${opts.chunk ?? '-'}`,
      'OsmSyncService',
    );

    const promise = this.spawnAndTrack(run.id, def, args);

    if (opts.wait) {
      const status = await promise;
      return { runId: run.id, status };
    }

    // Fire-and-forget: the caller polls the run row, so only log failures.
    void promise.catch((err) =>
      this.logger.error(
        `OSM sync ${run.id} background failure: ${err}`,
        err instanceof Error ? err.stack : undefined,
        'OsmSyncService',
      ),
    );
    return { runId: run.id, status: OsmSyncStatus.RUNNING };
  }

  /**
   * Spawns `pnpm tsx <script> <args>` from the repo root, mirrors its output
   * into the logger, accumulates row counters from stdout stats lines, and
   * persists the final status via `complete`.
   *
   * Status mapping: exit code 0 with no stderr lines → SUCCESS; exit code 0
   * with stderr output → PARTIAL; anything else (non-zero exit, signal,
   * spawn failure) → FAILED.
   *
   * @returns the final status once it has been persisted; rejects if
   *   persisting the final status fails.
   */
  private spawnAndTrack(
    runId: string,
    def: OsmSyncLayerDef,
    args: string[],
  ): Promise<OsmSyncStatus> {
    return new Promise<OsmSyncStatus>((resolve, reject) => {
      const repoRoot = path.resolve(__dirname, '../../../../../../..');
      const child = spawn('pnpm', ['tsx', def.scriptPath, ...args], {
        cwd: repoRoot,
        env: {
          ...process.env,
          NODE_OPTIONS: '-r dotenv/config',
          DOTENV_CONFIG_PATH: '.env',
        },
      });

      const stats: SyncStats = { added: 0, updated: 0, skipped: 0, locked: 0 };
      const errors: string[] = [];

      const parseLine = (line: string): void => {
        // Lines like: "inserted=12 updated=3 locked=1 skipped=0"
        const m = STATS_LINE_RE.exec(line);
        if (m) {
          stats.added += Number(m[1]);
          stats.updated += Number(m[2]);
          stats.locked += Number(m[3]);
          stats.skipped += Number(m[4]);
        }
      };

      const stdoutLines = createLineBuffer((line) => {
        this.logger.log(`[${runId}] ${line}`, 'OsmSyncService');
        parseLine(line);
      });
      const stderrLines = createLineBuffer((line) => {
        this.logger.warn(`[${runId}] ${line}`, 'OsmSyncService');
        // Keep a bounded sample of stderr for the run's error message.
        if (errors.length < 20) errors.push(line.slice(0, 500));
      });

      // Decode as UTF-8 in the stream so multi-byte characters split across
      // chunks are reassembled before line splitting.
      child.stdout?.setEncoding('utf8');
      child.stderr?.setEncoding('utf8');
      child.stdout?.on('data', (chunk: string) => stdoutLines.push(chunk));
      child.stderr?.on('data', (chunk: string) => stderrLines.push(chunk));

      // 'error' and 'close' can both fire for one child (e.g. spawn failure),
      // so the run must be finalized exactly once.
      let settled = false;
      const finish = (status: OsmSyncStatus, errorMessage: string | null): void => {
        if (settled) return;
        settled = true;
        // Propagate a failed DB update instead of leaving the promise pending.
        this.complete(runId, status, stats, errorMessage).then(() => resolve(status), reject);
      };

      child.on('error', (err) => {
        finish(OsmSyncStatus.FAILED, err.message);
      });

      // 'close' (not 'exit') fires only after the stdio streams have ended,
      // so every output line has been counted before the row is written.
      child.on('close', (code, signal) => {
        stdoutLines.flush();
        stderrLines.flush();

        let status: OsmSyncStatus;
        if (code !== 0) {
          status = OsmSyncStatus.FAILED;
        } else if (errors.length > 0) {
          status = OsmSyncStatus.PARTIAL;
        } else {
          status = OsmSyncStatus.SUCCESS;
        }

        // `code` is null when the child was terminated by a signal; report
        // the signal name in that case.
        const errorMessage =
          status === OsmSyncStatus.SUCCESS
            ? null
            : `exit=${code ?? signal}; ${errors.slice(0, 5).join(' | ')}`;
        finish(status, errorMessage);
      });
    });
  }

  /**
   * Persists the final state of a run: status, finish timestamp, row
   * counters and (for non-successful runs) an error summary, then logs a
   * one-line summary.
   */
  private async complete(
    runId: string,
    status: OsmSyncStatus,
    stats: { added: number; updated: number; skipped: number; locked: number },
    errorMessage: string | null,
  ): Promise<void> {
    await this.prisma.osmSyncRun.update({
      where: { id: runId },
      data: {
        status,
        finishedAt: new Date(),
        rowsAdded: stats.added,
        rowsUpdated: stats.updated,
        rowsSkipped: stats.skipped,
        rowsLocked: stats.locked,
        errorMessage,
      },
    });
    this.logger.log(
      `OSM sync ${runId} → ${status}: added=${stats.added} updated=${stats.updated} skipped=${stats.skipped} locked=${stats.locked}`,
      'OsmSyncService',
    );
  }
}