"""Train the v1 park-level industrial AVM baseline (ridge + monotonic priors). Context (TEC-2768 / R5.2.1): The IndustrialPark table ships with ~20 seeded rows carrying three rent heads: land (usd/m²/year), RBF (ready-built factory, usd/m²/month), and RBW (ready-built warehouse, usd/m²/month). No IndustrialListing rows are seeded, so tree-boosted models are not viable at n=20. This script fits a regularized linear baseline on log-rent with sign-constrained coefficients that encode domain monotonicity priors (occupancy ↑ rent, distance ↑ rent ↓, etc.). Conformal prediction over LOO residuals gives the 80% CI band. Usage: python libs/ai-services/scripts/train_avm_industrial_park.py \ --input libs/ai-services/data/industrial/parks.json \ --out libs/ai-services/models Produces: /avm_industrial_park_ridge_v1.pkl — fitted artifact /avm_industrial_park_ridge_v1.model_card.json — metrics + slices """ from __future__ import annotations import argparse import json import math import os import pickle import sys from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any import numpy as np from scipy.optimize import nnls from sklearn.preprocessing import StandardScaler # ── Constants ────────────────────────────────────────────────── ARTIFACT_VERSION = "ridge-industrial-v1" CURRENT_YEAR = 2026 REGION_ORDER = ["south", "north", "central"] # drop-first encoding TOP_INDUSTRIES = ["electronics", "logistics", "automotive", "food", "garment", "plastics"] # Province → FDI inflow in million USD (trailing 12m, approximate market data). PROVINCE_FDI_MUSD: dict[str, float] = { "tp. hồ chí minh": 5500, "hà nội": 4200, "bình dương": 4800, "đồng nai": 3200, "bắc ninh": 5800, "hải phòng": 2800, "long an": 1500, "bà rịa - vũng tàu": 1800, "hải dương": 800, "hưng yên": 1200, "bình phước": 400, "nghệ An": 350, "nghệ an": 350, "quảng nam": 500, "quảng ngãi": 600, } DEFAULT_FDI = 500.0 # Feature expected-sign map (+1 rent↑ when feature↑, −1 rent↓ when feature↑). # Region one-hots stay unsigned (fixed effect). SIGN_PRIORS: dict[str, int] = { "occupancy": +1, "log_area_ha": +1, "park_age_years": -1, "log_dist_port_km": -1, "log_dist_airport_km": -1, "log_dist_highway_km": -1, "logistics_connectivity_score": +1, "log_fdi_province": +1, "has_special_zone": +1, "ind_electronics": +1, "ind_logistics": +1, "ind_automotive": +1, "ind_food": 0, "ind_garment": 0, "ind_plastics": 0, } MONOTONIC_FEATURES = [f for f, s in SIGN_PRIORS.items() if s != 0] REGION_FEATURES = [f"region_{r}" for r in REGION_ORDER[1:]] # drop south ALL_FEATURES = list(SIGN_PRIORS.keys()) + REGION_FEATURES # ── Feature engineering ──────────────────────────────────────── @dataclass class FeatureSpec: """Serializable feature spec so the loader can recreate training features.""" feature_cols: list[str] = field(default_factory=lambda: list(ALL_FEATURES)) region_order: list[str] = field(default_factory=lambda: list(REGION_ORDER)) top_industries: list[str] = field(default_factory=lambda: list(TOP_INDUSTRIES)) province_fdi: dict[str, float] = field(default_factory=lambda: dict(PROVINCE_FDI_MUSD)) default_fdi: float = DEFAULT_FDI sign_priors: dict[str, int] = field(default_factory=lambda: dict(SIGN_PRIORS)) current_year: int = CURRENT_YEAR def _connectivity_distance(conn: dict | None, key: str, default: float) -> float: if not conn or not isinstance(conn, dict): return default node = conn.get(key) if isinstance(node, dict): dist = node.get("distanceKm") or node.get("km") if isinstance(dist, (int, float)) and dist >= 0: return float(dist) return default def _logistics_score(dist_port: float, dist_airport: float, dist_highway: float) -> float: # Inverse-distance composite scaled to [0, 1]. Weights bias toward highway # proximity which matters most for trucking in VN industrial flows. def inv(d: float, cap: float) -> float: return max(0.0, 1.0 - min(d, cap) / cap) return round( 0.25 * inv(dist_port, 120) + 0.20 * inv(dist_airport, 80) + 0.55 * inv(dist_highway, 20), 4, ) def _industry_match(industries: list[str], target: str) -> int: lowered = [i.lower() for i in industries or []] return int(any(target in i for i in lowered)) def featureize(row: dict, spec: FeatureSpec) -> dict[str, float]: """Turn one park record into the flat feature vector used by the ridge.""" occupancy = row.get("occupancyRate") or 0 if occupancy > 1.5: # seed stores 0-100, plan normalizes to [0,1] occupancy = occupancy / 100.0 occupancy = min(max(occupancy, 0.0), 1.0) area_ha = float(row.get("totalAreaHa") or 0.0) established = row.get("establishedYear") or (spec.current_year - 10) park_age = max(0, spec.current_year - int(established)) conn = row.get("connectivity") or {} dist_port = _connectivity_distance(conn, "nearestPort", 60.0) dist_airport = _connectivity_distance(conn, "airport", 30.0) dist_highway = _connectivity_distance(conn, "highway", 5.0) logistics_score = _logistics_score(dist_port, dist_airport, dist_highway) province = (row.get("province") or "").strip().lower() fdi = spec.province_fdi.get(province, spec.default_fdi) incentives = row.get("incentives") or {} has_special = int(bool(incentives.get("specialZone"))) industries = row.get("targetIndustries") or [] region = str(row.get("region") or "south").lower() feats = { "occupancy": occupancy, "log_area_ha": math.log1p(area_ha), "park_age_years": float(park_age), "log_dist_port_km": math.log1p(dist_port), "log_dist_airport_km": math.log1p(dist_airport), "log_dist_highway_km": math.log1p(dist_highway), "logistics_connectivity_score": logistics_score, "log_fdi_province": math.log1p(fdi), "has_special_zone": float(has_special), } for ind in spec.top_industries: feats[f"ind_{ind}"] = float(_industry_match(industries, ind)) for r in spec.region_order[1:]: feats[f"region_{r}"] = float(region == r) return feats def build_feature_matrix(rows: list[dict], spec: FeatureSpec) -> tuple[np.ndarray, list[str]]: mats = [featureize(r, spec) for r in rows] cols = spec.feature_cols X = np.array([[m[c] for c in cols] for m in mats], dtype=np.float64) return X, cols # ── Sign-constrained ridge ───────────────────────────────────── def fit_ridge_nn(X: np.ndarray, y: np.ndarray, alpha: float, sign_vec: np.ndarray) -> np.ndarray: """Fit `y ≈ X @ β` with ridge penalty α and sign constraints. sign_vec[i] ∈ {−1, 0, +1}. For +1/−1 entries, the returned coefficient is constrained to have that sign. Solved as NNLS on the augmented system: minimize ‖[X; sqrt(α)*I] β̃ − [y; 0]‖² subject to β̃ ≥ 0 with features pre-multiplied by sign_vec (so "−1" features become "expect positive after flipping"). For sign 0 (e.g. neutral industry flags) we keep the feature unsigned by solving the corresponding coefficient on ±-split columns. """ n, p = X.shape # Expand each sign==0 column into two columns (positive and negative part) # so the NNLS solve can recover an unconstrained coefficient as β = β⁺ − β⁻. expand_cols: list[np.ndarray] = [] col_meta: list[tuple[int, int]] = [] # (orig_idx, +1 or -1) for j in range(p): if sign_vec[j] == 0: expand_cols.append(X[:, j]) col_meta.append((j, +1)) expand_cols.append(-X[:, j]) col_meta.append((j, -1)) else: # Flip so expected sign becomes +, enabling non-negativity constraint. expand_cols.append(sign_vec[j] * X[:, j]) col_meta.append((j, int(sign_vec[j]))) X_exp = np.stack(expand_cols, axis=1) # Augment for ridge. k = X_exp.shape[1] X_aug = np.vstack([X_exp, math.sqrt(alpha) * np.eye(k)]) y_aug = np.concatenate([y, np.zeros(k)]) beta_exp, _ = nnls(X_aug, y_aug, maxiter=5 * k) # Collapse expanded coefs back to original column indices. beta = np.zeros(p) for col_idx, (orig_j, sgn) in enumerate(col_meta): if sign_vec[orig_j] == 0: beta[orig_j] += sgn * beta_exp[col_idx] else: # sgn == sign_vec[orig_j]; β was fit on flipped column, so flip back. beta[orig_j] = sgn * beta_exp[col_idx] return beta # ── Model selection + conformal CI ───────────────────────────── def _pred(X: np.ndarray, beta: np.ndarray, intercept: float) -> np.ndarray: return X @ beta + intercept def loo_cv_mape( X: np.ndarray, y_log: np.ndarray, alpha: float, sign_vec: np.ndarray, scaler: StandardScaler, ) -> tuple[float, np.ndarray]: """Return (MAPE on original rent scale, LOO residual vector in log-space).""" n = X.shape[0] residuals_log = np.zeros(n) preds_rent = np.zeros(n) for i in range(n): mask = np.ones(n, dtype=bool) mask[i] = False X_train_raw = X[mask] X_train = scaler.fit_transform(X_train_raw) y_train = y_log[mask] intercept = float(np.mean(y_train)) X_cent = X_train beta = fit_ridge_nn(X_cent, y_train - intercept, alpha, sign_vec) x_test = scaler.transform(X[i : i + 1]) yhat_log = float(_pred(x_test, beta, intercept)[0]) residuals_log[i] = y_log[i] - yhat_log preds_rent[i] = math.expm1(yhat_log) y_true = np.expm1(y_log) mape = float(np.mean(np.abs(preds_rent - y_true) / np.maximum(y_true, 1e-6))) return mape, residuals_log def conformal_coverage(residuals_log: np.ndarray, q: float) -> float: return float(np.mean(np.abs(residuals_log) <= q)) # ── Training pipeline ────────────────────────────────────────── def train_head( rows: list[dict], target_key: str, spec: FeatureSpec, ) -> dict[str, Any]: """Fit one rent head and return a serializable head dict.""" valid = [r for r in rows if r.get(target_key) is not None] if len(valid) < 8: raise ValueError(f"Head '{target_key}': only {len(valid)} non-null rows — too few to train.") X, cols = build_feature_matrix(valid, spec) y_raw = np.array([r[target_key] for r in valid], dtype=np.float64) y_log = np.log1p(y_raw) sign_vec = np.array([spec.sign_priors.get(c, 0) for c in cols], dtype=np.int8) # Fit scaler on full (we also refit per-fold in LOO; this one is for final model). scaler_final = StandardScaler() scaler_final.fit(X) alphas = np.logspace(-2, 3, 20) best = None for a in alphas: mape, res = loo_cv_mape(X, y_log, a, sign_vec, StandardScaler()) if best is None or mape < best["mape"]: best = {"alpha": a, "mape": mape, "residuals_log": res} assert best is not None # Refit on full set with chosen alpha. X_std = scaler_final.transform(X) intercept = float(np.mean(y_log)) beta = fit_ridge_nn(X_std, y_log - intercept, best["alpha"], sign_vec) q80 = float(np.quantile(np.abs(best["residuals_log"]), 0.80)) coverage = conformal_coverage(best["residuals_log"], q80) # Per-region slice metrics. slices: dict[str, dict[str, float]] = {} regions = np.array([r.get("region", "south") for r in valid]) preds_rent = np.expm1(X_std @ beta + intercept) y_rent = np.expm1(y_log) for region in np.unique(regions): idx = np.where(regions == region)[0] if idx.size == 0: continue mape_slice = float( np.mean(np.abs(preds_rent[idx] - y_rent[idx]) / np.maximum(y_rent[idx], 1e-6)) ) slices[region] = { "n": int(idx.size), "mape_in_sample": round(mape_slice, 4), "median_residual_log": round(float(np.median(best["residuals_log"][idx])), 4), } return { "coefficients": beta, "intercept": intercept, "scaler": scaler_final, "alpha": float(best["alpha"]), "q80_log": q80, "feature_cols": cols, "sign_vec": sign_vec, "n_train": len(valid), "mape_loo": round(float(best["mape"]), 4), "coverage_80_loo": round(coverage, 4), "slices": slices, } def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser() parser.add_argument( "--input", default=os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data/industrial/parks.json", ), ) parser.add_argument( "--out", default=os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "models", ), ) args = parser.parse_args(argv) with open(args.input, "r", encoding="utf-8") as f: rows: list[dict] = json.load(f) spec = FeatureSpec() head_specs = { "land": "landRentUsdM2Year", "rbf": "rbfRentUsdM2Month", "rbw": "rbwRentUsdM2Month", } heads: dict[str, dict[str, Any]] = {} card_heads: dict[str, dict[str, Any]] = {} for head_name, target_key in head_specs.items(): print(f"→ Training head '{head_name}' on target '{target_key}'...") head = train_head(rows, target_key, spec) heads[head_name] = head card_heads[head_name] = { "target_column": target_key, "n_train": head["n_train"], "alpha": head["alpha"], "mape_loo": head["mape_loo"], "coverage_80_loo": head["coverage_80_loo"], "q80_log": round(head["q80_log"], 4), "top_coefficients": _top_coefs(head), "slices": head["slices"], } print( f" α={head['alpha']:.4g} MAPE_LOO={head['mape_loo']:.3f}" f" coverage_80={head['coverage_80_loo']:.3f} n={head['n_train']}" ) os.makedirs(args.out, exist_ok=True) pkl_path = os.path.join(args.out, "avm_industrial_park_ridge_v1.pkl") card_path = os.path.join(args.out, "avm_industrial_park_ridge_v1.model_card.json") # Serialize to a plain-dict artifact — no trainer class references — so the # API loader can unpickle without importing this training module. artifact = { "version": ARTIFACT_VERSION, "feature_spec": { "feature_cols": spec.feature_cols, "region_order": spec.region_order, "top_industries": spec.top_industries, "province_fdi": spec.province_fdi, "default_fdi": spec.default_fdi, "sign_priors": spec.sign_priors, "current_year": spec.current_year, }, "heads": { name: { "coefficients": np.asarray(head["coefficients"], dtype=np.float64), "intercept": float(head["intercept"]), "scaler_mean": np.asarray(head["scaler"].mean_, dtype=np.float64), "scaler_scale": np.asarray(head["scaler"].scale_, dtype=np.float64), "alpha": head["alpha"], "q80_log": head["q80_log"], "feature_cols": head["feature_cols"], "n_train": head["n_train"], "mape_loo": head["mape_loo"], "coverage_80_loo": head["coverage_80_loo"], } for name, head in heads.items() }, "trained_at": datetime.now(timezone.utc).isoformat(), } with open(pkl_path, "wb") as f: pickle.dump(artifact, f) card = { "version": ARTIFACT_VERSION, "trained_at": artifact["trained_at"], "n_parks_in_source": len(rows), "heads": card_heads, "warnings": [ "n_train < 30 per head — LOO metrics are noisy; interpret CIs as wide.", "Targets are log1p-transformed rent; CIs use conformal quantile on log residuals.", ], } with open(card_path, "w", encoding="utf-8") as f: json.dump(card, f, ensure_ascii=False, indent=2) print(f"\n✓ Wrote artifact → {pkl_path}") print(f"✓ Wrote model card → {card_path}") return 0 def _top_coefs(head: dict[str, Any], k: int = 8) -> list[dict[str, float]]: beta = head["coefficients"] cols = head["feature_cols"] order = np.argsort(-np.abs(beta))[:k] return [ {"feature": cols[i], "coef": round(float(beta[i]), 4)} for i in order if abs(beta[i]) > 1e-6 ] if __name__ == "__main__": sys.exit(main())