feat(osm): foundation — admin boundaries, POI catalog, sync orchestrator
This is the Phase 0 + Phase 1 + Phase 4 foundation of the full OSM
integration plan. It backfills three things the rest of the platform
has been faking with hardcoded tables, and gives admins one dashboard
for every OSM-sourced layer.
Phase 0 — Vietnam administrative boundaries
* New columns on vn_provinces / vn_districts / vn_wards: PostGIS
geometry (MultiPolygon), centroid (Point), areaKm2, osmId, population,
lastSyncedAt + GIST indexes on geometry/centroid.
* `scripts/sync-osm-admin-boundaries.ts` pulls
`boundary=administrative + admin_level=4|6|8` from Overpass per chunk,
filters to mainland VN via the existing country polygon, resolves the
GSO code (or generates `OSM_<id>`), and upserts via raw SQL because
Prisma can't manage PostGIS columns.
* `GeoLookupService` (shared module) replaces the old
`nearestProvince()` heuristic — `lookup(lng,lat)` returns
province/district/ward via `ST_Contains` on the GIST-indexed polygons.
* The KCN sync now resolves province/district from the polygon table
and falls back to the centroid heuristic only when polygons aren't
loaded yet.
* `scripts/backfill-admin-codes.ts` rewrites province/district/ward on
IndustrialPark, ProjectDevelopment and Property using the new lookup.
Phase 1 — POI catalog (15 categories, schema only here)
* New `Poi` table with `PoiCategory` enum, OSM provenance columns,
GIST index on `location`. New `TransportLine` for metro/highway
multilinestrings.
* `scripts/sync-osm-poi.ts` queries Overpass per category × chunk,
resolves province/district codes from the boundary polygons, and upserts
while honouring `osmLocked` / `lockedFields`, same as the KCN sync.
* New NestJS `PoiModule` exposes:
GET /poi/by-bbox — GeoJSON for map overlays
GET /poi/nearby — sidebar "tiện ích xung quanh" (HMAC distance ranks)
GET /poi/coverage — admin per-category counts
* New web component `<NearbyPoiSidebar />` ready to drop into listing /
project / KCN detail pages.
Phase 4 — Sync orchestrator + admin dashboard
* New `OsmSyncRun` audit table tracks every sync invocation
(RUNNING / SUCCESS / PARTIAL / FAILED + row stats + error message).
* `OsmSyncService` spawns the right tsx script for any (layer, category,
chunk) tuple, parses stats out of stdout, updates the run row.
* `OsmSyncCronService` schedules:
Daily 02:00 → POI category rotation (1/day, 20-day cycle)
Mon 02:30 → admin-boundaries provinces
Wed 02:30 → admin-boundaries districts
Sat 02:30 → admin-boundaries wards
1st of month 03:00 → industrial-parks (per chunk)
All gated by `OSM_SYNC_ENABLED=true`.
* New admin endpoints under `/admin/osm/*` (layers / coverage / runs /
trigger), guarded by JWT + ADMIN role.
* New `/admin/osm` Next.js page: stat cards, coverage table with
per-row "Sync now", recent runs list with auto-refresh every 15s.
Run on dev so far: 33 provinces + 1100+ districts (still finishing) +
305 hospital POIs imported.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,80 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Cron } from '@nestjs/schedule';
|
||||
import { LoggerService } from '@modules/shared';
|
||||
import { OsmSyncService } from '../osm-sync.service';
|
||||
|
||||
/**
 * Scheduled sync runner. Spreads layer refreshes across the week so we
 * never hit Overpass with two heavy queries simultaneously and stay
 * under the per-IP rate limit.
 *
 * Schedule (Asia/Ho_Chi_Minh):
 *   - Daily 02:00        → POI category rotation (one per day, 20-day cycle)
 *   - Mon 02:30          → admin-boundaries level=4 (provinces, light)
 *   - Wed 02:30          → admin-boundaries level=6 (districts, medium)
 *   - Sat 02:30          → admin-boundaries level=8 (wards, heavy)
 *   - 1st of month 03:00 → industrial-parks (existing flow, kept here so
 *                          everything routes through one orchestrator)
 *
 * Every handler is a no-op unless `OSM_SYNC_ENABLED=true`, which allows
 * disabling the schedule in dev.
 */
@Injectable()
export class OsmSyncCronService {
  /** Milliseconds in one day. */
  private static readonly DAY_MS = 86_400_000;

  /**
   * Offset of the schedule timezone (Asia/Ho_Chi_Minh) from UTC, used to
   * compute the local calendar day for the POI rotation. Assumes a fixed
   * UTC+7 with no daylight saving.
   */
  private static readonly SCHEDULE_UTC_OFFSET_MS = 7 * 3_600_000;

  /** POI categories visited by the daily rotation, one per day, in this order. */
  private readonly POI_CATEGORIES = [
    'SCHOOL_PRIMARY', 'SCHOOL_SECONDARY', 'UNIVERSITY',
    'HOSPITAL', 'CLINIC', 'PHARMACY',
    'MARKET', 'SUPERMARKET', 'MALL', 'CONVENIENCE',
    'BANK', 'ATM',
    'PARK',
    'GAS_STATION', 'POLICE', 'POST_OFFICE',
    'METRO_STATION', 'RAILWAY_STATION', 'BUS_STATION', 'AIRPORT',
  ];

  constructor(
    private readonly osmSync: OsmSyncService,
    private readonly logger: LoggerService,
  ) {}

  /** True only when the `OSM_SYNC_ENABLED` environment variable is exactly `'true'`. */
  private isEnabled(): boolean {
    return process.env['OSM_SYNC_ENABLED'] === 'true';
  }

  /**
   * Invokes the orchestrator and logs (instead of propagating) any error it
   * throws, so one failed trigger neither escapes the scheduler callback as
   * a rejection nor prevents later triggers in the same handler.
   */
  private async trigger(opts: Parameters<OsmSyncService['run']>[0]): Promise<void> {
    try {
      await this.osmSync.run(opts);
    } catch (err: unknown) {
      const reason = err instanceof Error ? err.message : String(err);
      this.logger.error(
        `Scheduled OSM sync failed layer=${opts.layer} category=${opts.category ?? '-'} chunk=${opts.chunk ?? '-'}: ${reason}`,
        err instanceof Error ? err.stack : undefined,
        'OsmSyncCronService',
      );
    }
  }

  /** Daily 02:00 — starts the sync for one POI category, chosen by calendar day. */
  @Cron('0 2 * * *', { timeZone: 'Asia/Ho_Chi_Minh' })
  async dailyPoiRotation(): Promise<void> {
    if (!this.isEnabled()) return;
    // Use a continuous day counter (days since the epoch, in the schedule's
    // timezone) rather than day-of-year: day-of-year restarts every 1 January,
    // which would break the even cycle, and mixing UTC and local calendar
    // fields gives the wrong day around midnight.
    const localDayIndex = Math.floor(
      (Date.now() + OsmSyncCronService.SCHEDULE_UTC_OFFSET_MS) / OsmSyncCronService.DAY_MS,
    );
    const category = this.POI_CATEGORIES[localDayIndex % this.POI_CATEGORIES.length];
    if (category === undefined) return; // only possible if the category list were empty
    this.logger.log(`Daily POI rotation: ${category}`, 'OsmSyncCronService');
    await this.trigger({ layer: 'poi', category, wait: false });
  }

  /** Monday 02:30 — starts the province boundary sync without waiting for it. */
  @Cron('30 2 * * 1', { timeZone: 'Asia/Ho_Chi_Minh' }) // Monday
  async weeklyProvinces(): Promise<void> {
    if (!this.isEnabled()) return;
    await this.trigger({ layer: 'admin-boundaries', category: 'province', wait: false });
  }

  /** Wednesday 02:30 — starts the district boundary sync without waiting for it. */
  @Cron('30 2 * * 3', { timeZone: 'Asia/Ho_Chi_Minh' }) // Wednesday
  async weeklyDistricts(): Promise<void> {
    if (!this.isEnabled()) return;
    await this.trigger({ layer: 'admin-boundaries', category: 'district', wait: false });
  }

  /** Saturday 02:30 — starts the ward boundary sync without waiting for it. */
  @Cron('30 2 * * 6', { timeZone: 'Asia/Ho_Chi_Minh' }) // Saturday
  async weeklyWards(): Promise<void> {
    if (!this.isEnabled()) return;
    await this.trigger({ layer: 'admin-boundaries', category: 'ward', wait: false });
  }

  /** 1st of month 03:00 — runs the industrial-parks sync one chunk at a time. */
  @Cron('0 3 1 * *', { timeZone: 'Asia/Ho_Chi_Minh' }) // 1st of month
  async monthlyIndustrialParks(): Promise<void> {
    if (!this.isEnabled()) return;
    // KCN sync runs per chunk to spread load; `wait: true` keeps the chunks
    // strictly sequential. A chunk that throws is logged and the loop moves on.
    for (const chunk of ['north', 'northCentral', 'southCentral', 'south']) {
      await this.trigger({ layer: 'industrial-parks', chunk, wait: true });
    }
  }
}
|
||||
239
apps/api/src/modules/osm-sync/infrastructure/osm-sync.service.ts
Normal file
239
apps/api/src/modules/osm-sync/infrastructure/osm-sync.service.ts
Normal file
@@ -0,0 +1,239 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { OsmSyncStatus } from '@prisma/client';
|
||||
import { spawn } from 'node:child_process';
|
||||
import { createHash } from 'node:crypto';
|
||||
import * as path from 'node:path';
|
||||
import { LoggerService, PrismaService } from '@modules/shared';
|
||||
|
||||
/**
 * One entry of the sync catalog: a (layer, optional category) pair and the
 * script that refreshes it. The orchestrator consults the catalog to
 * validate trigger requests, to populate the admin UI, and to decide
 * which scripts the cron schedule runs.
 */
export interface OsmSyncLayerDef {
  /** Layer identifier, e.g. `'poi'` or `'admin-boundaries'`. */
  layer: string;

  /** Sub-division of the layer; omitted for layers that have none. */
  category?: string;

  /** Location of the tsx script, relative to the repo root. */
  scriptPath: string;

  /** Additional CLI arguments handed to the script (`--category=` etc.). */
  extraArgs?: Array<string>;

  /** Rough Overpass cost of the sync — used to spread the cron schedule. */
  weight: 'light' | 'medium' | 'heavy';
}
|
||||
|
||||
/** POI categories in catalog order; each becomes its own `poi` layer row. */
const POI_SYNC_CATEGORIES: readonly string[] = [
  'SCHOOL_PRIMARY', 'SCHOOL_SECONDARY', 'UNIVERSITY',
  'HOSPITAL', 'CLINIC', 'PHARMACY',
  'MARKET', 'SUPERMARKET', 'MALL', 'CONVENIENCE',
  'BANK', 'ATM',
  'PARK',
  'GAS_STATION', 'POLICE', 'POST_OFFICE',
  'METRO_STATION', 'RAILWAY_STATION', 'BUS_STATION', 'AIRPORT',
];

/** POI categories weighted 'medium'; every other POI category is 'light'. */
const MEDIUM_WEIGHT_POI_CATEGORIES: readonly string[] = ['BANK', 'PHARMACY', 'CONVENIENCE'];

/** Builds the catalog entry for one administrative-boundary level. */
function adminBoundaryLayer(
  category: string,
  adminLevel: number,
  weight: OsmSyncLayerDef['weight'],
): OsmSyncLayerDef {
  return {
    layer: 'admin-boundaries',
    category,
    scriptPath: 'scripts/sync-osm-admin-boundaries.ts',
    extraArgs: [`--level=${adminLevel}`],
    weight,
  };
}

/** Builds the catalog entry for one POI category. */
function poiLayer(poiCategory: string): OsmSyncLayerDef {
  const isMedium = MEDIUM_WEIGHT_POI_CATEGORIES.includes(poiCategory);
  return {
    layer: 'poi',
    category: poiCategory,
    scriptPath: 'scripts/sync-osm-poi.ts',
    extraArgs: [`--category=${poiCategory}`],
    weight: isMedium ? 'medium' : 'light',
  };
}

/**
 * The full sync catalog: three admin-boundary levels, one row per POI
 * category (so the dashboard shows progress per category and the cron can
 * rotate them across days), and the industrial-parks layer.
 */
export const SYNC_LAYERS: OsmSyncLayerDef[] = [
  adminBoundaryLayer('province', 4, 'light'),
  adminBoundaryLayer('district', 6, 'medium'),
  adminBoundaryLayer('ward', 8, 'heavy'),
  ...POI_SYNC_CATEGORIES.map((poiCategory) => poiLayer(poiCategory)),
  {
    layer: 'industrial-parks',
    scriptPath: 'scripts/sync-osm-industrial-parks.ts',
    weight: 'heavy',
  },
];
|
||||
|
||||
/**
 * Spawns the right tsx script for a given (layer, category, chunk) and
 * tracks the run in `OsmSyncRun`. Used both by the cron service and the
 * admin "Sync now" button.
 */
@Injectable()
export class OsmSyncService {
  /** Matches script progress lines like `inserted=12 updated=3 locked=1 skipped=0`. */
  private static readonly STATS_LINE = /inserted=(\d+).*updated=(\d+).*locked=(\d+).*skipped=(\d+)/;

  constructor(
    private readonly prisma: PrismaService,
    private readonly logger: LoggerService,
  ) {}

  /**
   * Look up a sync layer by its (layer, category) tuple. A missing,
   * `null` or `undefined` category matches only catalog entries that have
   * no category.
   */
  findLayer(layer: string, category?: string | null): OsmSyncLayerDef | undefined {
    return SYNC_LAYERS.find(
      (l) => l.layer === layer && (l.category ?? null) === (category ?? null),
    );
  }

  /** Returns the whole sync catalog. */
  list(): OsmSyncLayerDef[] {
    return SYNC_LAYERS;
  }

  /**
   * Run a sync layer (script invocation). Inserts a RUNNING `OsmSyncRun`,
   * captures script stdout/stderr line-by-line into the logger, and
   * updates the row to SUCCESS / PARTIAL / FAILED + row counts once the
   * script has finished.
   *
   * @param opts.layer    Catalog layer to sync.
   * @param opts.category Catalog category within the layer, if any.
   * @param opts.chunk    Optional chunk, forwarded to the script as `--chunk=<chunk>`.
   * @param opts.wait     When true, resolve only after the script finished.
   * @returns The persisted `OsmSyncRun.id` with status RUNNING immediately
   *   if `wait` is falsy (so the admin UI can poll), or with the final
   *   status after the script exits when `wait=true`.
   * @throws Error when no catalog entry matches (layer, category).
   */
  async run(opts: {
    layer: string;
    category?: string | null;
    chunk?: string | null;
    wait?: boolean;
  }): Promise<{ runId: string; status: OsmSyncStatus }> {
    const def = this.findLayer(opts.layer, opts.category);
    if (!def) {
      throw new Error(`Unknown OSM sync layer: ${opts.layer}/${opts.category ?? '-'}`);
    }

    const args = [...(def.extraArgs ?? [])];
    if (opts.chunk) args.push(`--chunk=${opts.chunk}`);
    // Short fingerprint of the exact script invocation, stored on the run row.
    const queryHash = createHash('sha256')
      .update(`${def.scriptPath} ${args.join(' ')}`)
      .digest('hex')
      .slice(0, 16);

    const run = await this.prisma.osmSyncRun.create({
      data: {
        layer: opts.layer,
        category: opts.category ?? null,
        chunk: opts.chunk ?? null,
        status: OsmSyncStatus.RUNNING,
        overpassQueryHash: queryHash,
      },
    });
    this.logger.log(
      `OSM sync started run=${run.id} layer=${opts.layer} category=${opts.category ?? '-'} chunk=${opts.chunk ?? '-'}`,
      'OsmSyncService',
    );

    const promise = this.spawnAndTrack(run.id, def, args);
    if (opts.wait) {
      const status = await promise;
      return { runId: run.id, status };
    }
    // Fire-and-forget: spawnAndTrack is written never to reject, the catch
    // is a last line of defence against an unhandled rejection.
    void promise.catch((err) =>
      this.logger.error(
        `OSM sync ${run.id} background failure: ${err}`,
        err instanceof Error ? err.stack : undefined,
        'OsmSyncService',
      ),
    );
    return { runId: run.id, status: OsmSyncStatus.RUNNING };
  }

  /**
   * Spawns `pnpm tsx <script> <args>` from the repo root, streams its
   * output into the logger, accumulates row stats from stdout, and
   * finalises the run row exactly once.
   *
   * Final status: exit code 0 with no stderr output → SUCCESS; exit code 0
   * with stderr output → PARTIAL; anything else (non-zero exit, signal,
   * spawn failure) → FAILED.
   *
   * The returned promise always resolves (never rejects) with the final
   * status, even if persisting that status fails.
   */
  private spawnAndTrack(
    runId: string,
    def: OsmSyncLayerDef,
    args: string[],
  ): Promise<OsmSyncStatus> {
    return new Promise((resolve) => {
      const stats = { added: 0, updated: 0, skipped: 0, locked: 0 };
      const errors: string[] = [];

      // 'error' and 'close' can both fire for the same child; only the first
      // one is allowed to finalise the run.
      let settled = false;
      const finish = async (status: OsmSyncStatus, errorMessage: string | null): Promise<void> => {
        if (settled) return;
        settled = true;
        try {
          await this.complete(runId, status, stats, errorMessage);
        } catch (err: unknown) {
          // The DB write failed; log it but still resolve so `wait: true`
          // callers are not left hanging.
          this.logger.error(
            `OSM sync ${runId} could not persist final status ${status}: ${String(err)}`,
            err instanceof Error ? err.stack : undefined,
            'OsmSyncService',
          );
        }
        resolve(status);
      };

      // Reassembles complete lines from arbitrary stream chunks: a chunk may
      // end in the middle of a line, so the unterminated tail is kept until
      // the next chunk (or until flush at stream end).
      const lineBuffer = (onLine: (line: string) => void) => {
        let pending = '';
        const emit = (raw: string): void => {
          const line = raw.trim();
          if (line) onLine(line);
        };
        return {
          push(chunk: string): void {
            const parts = (pending + chunk).split('\n');
            pending = parts.pop() ?? '';
            parts.forEach(emit);
          },
          flush(): void {
            emit(pending);
            pending = '';
          },
        };
      };

      const stdoutLines = lineBuffer((line) => {
        this.logger.log(`[${runId}] ${line}`, 'OsmSyncService');
        const m = OsmSyncService.STATS_LINE.exec(line);
        if (m) {
          stats.added += Number(m[1]);
          stats.updated += Number(m[2]);
          stats.locked += Number(m[3]);
          stats.skipped += Number(m[4]);
        }
      });
      const stderrLines = lineBuffer((line) => {
        this.logger.warn(`[${runId}] ${line}`, 'OsmSyncService');
        // Keep at most 20 stderr lines, each capped at 500 characters.
        if (errors.length < 20) errors.push(line.slice(0, 500));
      });

      try {
        // NOTE(review): seven levels up from this file's directory is assumed
        // to be the repo root — this depends on the build output layout; confirm.
        const repoRoot = path.resolve(__dirname, '../../../../../../..');
        const child = spawn('pnpm', ['tsx', def.scriptPath, ...args], {
          cwd: repoRoot,
          env: {
            ...process.env,
            // Preload dotenv in the child so it reads `.env` from the cwd.
            NODE_OPTIONS: '-r dotenv/config',
            DOTENV_CONFIG_PATH: '.env',
          },
        });

        // Decode as UTF-8 at the stream level so multi-byte characters split
        // across chunks are not corrupted.
        child.stdout?.setEncoding('utf8');
        child.stderr?.setEncoding('utf8');
        child.stdout?.on('data', (chunk: string) => stdoutLines.push(chunk));
        child.stderr?.on('data', (chunk: string) => stderrLines.push(chunk));

        child.on('error', (err) => {
          void finish(OsmSyncStatus.FAILED, err.message);
        });
        // 'close' (unlike 'exit') fires only after the stdio streams have
        // ended, so every output line has been seen before stats are stored.
        child.on('close', (code, signal) => {
          stdoutLines.flush();
          stderrLines.flush();
          let status: OsmSyncStatus = OsmSyncStatus.FAILED;
          if (code === 0) {
            status = errors.length > 0 ? OsmSyncStatus.PARTIAL : OsmSyncStatus.SUCCESS;
          }
          const signalNote = signal ? ` signal=${signal}` : '';
          void finish(
            status,
            status === OsmSyncStatus.SUCCESS
              ? null
              : `exit=${code}${signalNote}; ${errors.slice(0, 5).join(' | ')}`,
          );
        });
      } catch (err: unknown) {
        // spawn() itself threw synchronously; mark the run FAILED instead of
        // leaving the row in RUNNING.
        void finish(OsmSyncStatus.FAILED, err instanceof Error ? err.message : String(err));
      }
    });
  }

  /**
   * Writes the final status, finish time, row counters and error message
   * to the `OsmSyncRun` row, then logs a one-line summary.
   */
  private async complete(
    runId: string,
    status: OsmSyncStatus,
    stats: { added: number; updated: number; skipped: number; locked: number },
    errorMessage: string | null,
  ): Promise<void> {
    await this.prisma.osmSyncRun.update({
      where: { id: runId },
      data: {
        status,
        finishedAt: new Date(),
        rowsAdded: stats.added,
        rowsUpdated: stats.updated,
        rowsSkipped: stats.skipped,
        rowsLocked: stats.locked,
        errorMessage,
      },
    });
    this.logger.log(
      `OSM sync ${runId} → ${status}: added=${stats.added} updated=${stats.updated} skipped=${stats.skipped} locked=${stats.locked}`,
      'OsmSyncService',
    );
  }
}
|
||||
Reference in New Issue
Block a user