Files
goodgo-platform/libs/ai-services/scripts/train_avm_industrial_park.py

459 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Train the v1 park-level industrial AVM baseline (ridge + monotonic priors).
Context (TEC-2768 / R5.2.1):
The IndustrialPark table ships with ~20 seeded rows carrying three rent
heads: land (usd/m²/year), RBF (ready-built factory, usd/m²/month), and
RBW (ready-built warehouse, usd/m²/month). No IndustrialListing rows are
seeded, so tree-boosted models are not viable at n=20. This script fits a
regularized linear baseline on log-rent with sign-constrained coefficients
that encode domain monotonicity priors (occupancy ↑ rent, distance ↑ rent
↓, etc.). Conformal prediction over LOO residuals gives the 80% CI band.
Usage:
python libs/ai-services/scripts/train_avm_industrial_park.py \
--input libs/ai-services/data/industrial/parks.json \
--out libs/ai-services/models
Produces:
<out>/avm_industrial_park_ridge_v1.pkl — fitted artifact
<out>/avm_industrial_park_ridge_v1.model_card.json — metrics + slices
"""
from __future__ import annotations
import argparse
import json
import math
import os
import pickle
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
import numpy as np
from scipy.optimize import nnls
from sklearn.preprocessing import StandardScaler
# ── Constants ──────────────────────────────────────────────────
ARTIFACT_VERSION = "ridge-industrial-v1"
CURRENT_YEAR = 2026
REGION_ORDER = ["south", "north", "central"] # drop-first encoding
TOP_INDUSTRIES = ["electronics", "logistics", "automotive", "food", "garment", "plastics"]
# Province → FDI inflow in million USD (trailing 12m, approximate market data).
PROVINCE_FDI_MUSD: dict[str, float] = {
"tp. hồ chí minh": 5500,
"hà nội": 4200,
"bình dương": 4800,
"đồng nai": 3200,
"bắc ninh": 5800,
"hải phòng": 2800,
"long an": 1500,
"bà rịa - vũng tàu": 1800,
"hải dương": 800,
"hưng yên": 1200,
"bình phước": 400,
"nghệ An": 350,
"nghệ an": 350,
"quảng nam": 500,
"quảng ngãi": 600,
}
DEFAULT_FDI = 500.0
# Feature expected-sign map (+1 rent↑ when feature↑, 1 rent↓ when feature↑).
# Region one-hots stay unsigned (fixed effect).
SIGN_PRIORS: dict[str, int] = {
"occupancy": +1,
"log_area_ha": +1,
"park_age_years": -1,
"log_dist_port_km": -1,
"log_dist_airport_km": -1,
"log_dist_highway_km": -1,
"logistics_connectivity_score": +1,
"log_fdi_province": +1,
"has_special_zone": +1,
"ind_electronics": +1,
"ind_logistics": +1,
"ind_automotive": +1,
"ind_food": 0,
"ind_garment": 0,
"ind_plastics": 0,
}
MONOTONIC_FEATURES = [f for f, s in SIGN_PRIORS.items() if s != 0]
REGION_FEATURES = [f"region_{r}" for r in REGION_ORDER[1:]] # drop south
ALL_FEATURES = list(SIGN_PRIORS.keys()) + REGION_FEATURES
# ── Feature engineering ────────────────────────────────────────
@dataclass
class FeatureSpec:
"""Serializable feature spec so the loader can recreate training features."""
feature_cols: list[str] = field(default_factory=lambda: list(ALL_FEATURES))
region_order: list[str] = field(default_factory=lambda: list(REGION_ORDER))
top_industries: list[str] = field(default_factory=lambda: list(TOP_INDUSTRIES))
province_fdi: dict[str, float] = field(default_factory=lambda: dict(PROVINCE_FDI_MUSD))
default_fdi: float = DEFAULT_FDI
sign_priors: dict[str, int] = field(default_factory=lambda: dict(SIGN_PRIORS))
current_year: int = CURRENT_YEAR
def _connectivity_distance(conn: dict | None, key: str, default: float) -> float:
if not conn or not isinstance(conn, dict):
return default
node = conn.get(key)
if isinstance(node, dict):
dist = node.get("distanceKm") or node.get("km")
if isinstance(dist, (int, float)) and dist >= 0:
return float(dist)
return default
def _logistics_score(dist_port: float, dist_airport: float, dist_highway: float) -> float:
# Inverse-distance composite scaled to [0, 1]. Weights bias toward highway
# proximity which matters most for trucking in VN industrial flows.
def inv(d: float, cap: float) -> float:
return max(0.0, 1.0 - min(d, cap) / cap)
return round(
0.25 * inv(dist_port, 120)
+ 0.20 * inv(dist_airport, 80)
+ 0.55 * inv(dist_highway, 20),
4,
)
def _industry_match(industries: list[str], target: str) -> int:
lowered = [i.lower() for i in industries or []]
return int(any(target in i for i in lowered))
def featureize(row: dict, spec: FeatureSpec) -> dict[str, float]:
"""Turn one park record into the flat feature vector used by the ridge."""
occupancy = row.get("occupancyRate") or 0
if occupancy > 1.5: # seed stores 0-100, plan normalizes to [0,1]
occupancy = occupancy / 100.0
occupancy = min(max(occupancy, 0.0), 1.0)
area_ha = float(row.get("totalAreaHa") or 0.0)
established = row.get("establishedYear") or (spec.current_year - 10)
park_age = max(0, spec.current_year - int(established))
conn = row.get("connectivity") or {}
dist_port = _connectivity_distance(conn, "nearestPort", 60.0)
dist_airport = _connectivity_distance(conn, "airport", 30.0)
dist_highway = _connectivity_distance(conn, "highway", 5.0)
logistics_score = _logistics_score(dist_port, dist_airport, dist_highway)
province = (row.get("province") or "").strip().lower()
fdi = spec.province_fdi.get(province, spec.default_fdi)
incentives = row.get("incentives") or {}
has_special = int(bool(incentives.get("specialZone")))
industries = row.get("targetIndustries") or []
region = str(row.get("region") or "south").lower()
feats = {
"occupancy": occupancy,
"log_area_ha": math.log1p(area_ha),
"park_age_years": float(park_age),
"log_dist_port_km": math.log1p(dist_port),
"log_dist_airport_km": math.log1p(dist_airport),
"log_dist_highway_km": math.log1p(dist_highway),
"logistics_connectivity_score": logistics_score,
"log_fdi_province": math.log1p(fdi),
"has_special_zone": float(has_special),
}
for ind in spec.top_industries:
feats[f"ind_{ind}"] = float(_industry_match(industries, ind))
for r in spec.region_order[1:]:
feats[f"region_{r}"] = float(region == r)
return feats
def build_feature_matrix(rows: list[dict], spec: FeatureSpec) -> tuple[np.ndarray, list[str]]:
mats = [featureize(r, spec) for r in rows]
cols = spec.feature_cols
X = np.array([[m[c] for c in cols] for m in mats], dtype=np.float64)
return X, cols
# ── Sign-constrained ridge ─────────────────────────────────────
def fit_ridge_nn(X: np.ndarray, y: np.ndarray, alpha: float, sign_vec: np.ndarray) -> np.ndarray:
"""Fit `y ≈ X @ β` with ridge penalty α and sign constraints.
sign_vec[i] ∈ {1, 0, +1}. For +1/1 entries, the returned coefficient is
constrained to have that sign. Solved as NNLS on the augmented system:
minimize ‖[X; sqrt(α)*I] β̃ [y; 0]‖² subject to β̃ ≥ 0
with features pre-multiplied by sign_vec (so "1" features become "expect
positive after flipping"). For sign 0 (e.g. neutral industry flags) we keep
the feature unsigned by solving the corresponding coefficient on ±-split
columns.
"""
n, p = X.shape
# Expand each sign==0 column into two columns (positive and negative part)
# so the NNLS solve can recover an unconstrained coefficient as β = β⁺ β⁻.
expand_cols: list[np.ndarray] = []
col_meta: list[tuple[int, int]] = [] # (orig_idx, +1 or -1)
for j in range(p):
if sign_vec[j] == 0:
expand_cols.append(X[:, j])
col_meta.append((j, +1))
expand_cols.append(-X[:, j])
col_meta.append((j, -1))
else:
# Flip so expected sign becomes +, enabling non-negativity constraint.
expand_cols.append(sign_vec[j] * X[:, j])
col_meta.append((j, int(sign_vec[j])))
X_exp = np.stack(expand_cols, axis=1)
# Augment for ridge.
k = X_exp.shape[1]
X_aug = np.vstack([X_exp, math.sqrt(alpha) * np.eye(k)])
y_aug = np.concatenate([y, np.zeros(k)])
beta_exp, _ = nnls(X_aug, y_aug, maxiter=5 * k)
# Collapse expanded coefs back to original column indices.
beta = np.zeros(p)
for col_idx, (orig_j, sgn) in enumerate(col_meta):
if sign_vec[orig_j] == 0:
beta[orig_j] += sgn * beta_exp[col_idx]
else:
# sgn == sign_vec[orig_j]; β was fit on flipped column, so flip back.
beta[orig_j] = sgn * beta_exp[col_idx]
return beta
# ── Model selection + conformal CI ─────────────────────────────
def _pred(X: np.ndarray, beta: np.ndarray, intercept: float) -> np.ndarray:
return X @ beta + intercept
def loo_cv_mape(
X: np.ndarray,
y_log: np.ndarray,
alpha: float,
sign_vec: np.ndarray,
scaler: StandardScaler,
) -> tuple[float, np.ndarray]:
"""Return (MAPE on original rent scale, LOO residual vector in log-space)."""
n = X.shape[0]
residuals_log = np.zeros(n)
preds_rent = np.zeros(n)
for i in range(n):
mask = np.ones(n, dtype=bool)
mask[i] = False
X_train_raw = X[mask]
X_train = scaler.fit_transform(X_train_raw)
y_train = y_log[mask]
intercept = float(np.mean(y_train))
X_cent = X_train
beta = fit_ridge_nn(X_cent, y_train - intercept, alpha, sign_vec)
x_test = scaler.transform(X[i : i + 1])
yhat_log = float(_pred(x_test, beta, intercept)[0])
residuals_log[i] = y_log[i] - yhat_log
preds_rent[i] = math.expm1(yhat_log)
y_true = np.expm1(y_log)
mape = float(np.mean(np.abs(preds_rent - y_true) / np.maximum(y_true, 1e-6)))
return mape, residuals_log
def conformal_coverage(residuals_log: np.ndarray, q: float) -> float:
return float(np.mean(np.abs(residuals_log) <= q))
# ── Training pipeline ──────────────────────────────────────────
def train_head(
rows: list[dict],
target_key: str,
spec: FeatureSpec,
) -> dict[str, Any]:
"""Fit one rent head and return a serializable head dict."""
valid = [r for r in rows if r.get(target_key) is not None]
if len(valid) < 8:
raise ValueError(f"Head '{target_key}': only {len(valid)} non-null rows — too few to train.")
X, cols = build_feature_matrix(valid, spec)
y_raw = np.array([r[target_key] for r in valid], dtype=np.float64)
y_log = np.log1p(y_raw)
sign_vec = np.array([spec.sign_priors.get(c, 0) for c in cols], dtype=np.int8)
# Fit scaler on full (we also refit per-fold in LOO; this one is for final model).
scaler_final = StandardScaler()
scaler_final.fit(X)
alphas = np.logspace(-2, 3, 20)
best = None
for a in alphas:
mape, res = loo_cv_mape(X, y_log, a, sign_vec, StandardScaler())
if best is None or mape < best["mape"]:
best = {"alpha": a, "mape": mape, "residuals_log": res}
assert best is not None
# Refit on full set with chosen alpha.
X_std = scaler_final.transform(X)
intercept = float(np.mean(y_log))
beta = fit_ridge_nn(X_std, y_log - intercept, best["alpha"], sign_vec)
q80 = float(np.quantile(np.abs(best["residuals_log"]), 0.80))
coverage = conformal_coverage(best["residuals_log"], q80)
# Per-region slice metrics.
slices: dict[str, dict[str, float]] = {}
regions = np.array([r.get("region", "south") for r in valid])
preds_rent = np.expm1(X_std @ beta + intercept)
y_rent = np.expm1(y_log)
for region in np.unique(regions):
idx = np.where(regions == region)[0]
if idx.size == 0:
continue
mape_slice = float(
np.mean(np.abs(preds_rent[idx] - y_rent[idx]) / np.maximum(y_rent[idx], 1e-6))
)
slices[region] = {
"n": int(idx.size),
"mape_in_sample": round(mape_slice, 4),
"median_residual_log": round(float(np.median(best["residuals_log"][idx])), 4),
}
return {
"coefficients": beta,
"intercept": intercept,
"scaler": scaler_final,
"alpha": float(best["alpha"]),
"q80_log": q80,
"feature_cols": cols,
"sign_vec": sign_vec,
"n_train": len(valid),
"mape_loo": round(float(best["mape"]), 4),
"coverage_80_loo": round(coverage, 4),
"slices": slices,
}
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser()
parser.add_argument(
"--input",
default=os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"data/industrial/parks.json",
),
)
parser.add_argument(
"--out",
default=os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"models",
),
)
args = parser.parse_args(argv)
with open(args.input, "r", encoding="utf-8") as f:
rows: list[dict] = json.load(f)
spec = FeatureSpec()
head_specs = {
"land": "landRentUsdM2Year",
"rbf": "rbfRentUsdM2Month",
"rbw": "rbwRentUsdM2Month",
}
heads: dict[str, dict[str, Any]] = {}
card_heads: dict[str, dict[str, Any]] = {}
for head_name, target_key in head_specs.items():
print(f"→ Training head '{head_name}' on target '{target_key}'...")
head = train_head(rows, target_key, spec)
heads[head_name] = head
card_heads[head_name] = {
"target_column": target_key,
"n_train": head["n_train"],
"alpha": head["alpha"],
"mape_loo": head["mape_loo"],
"coverage_80_loo": head["coverage_80_loo"],
"q80_log": round(head["q80_log"], 4),
"top_coefficients": _top_coefs(head),
"slices": head["slices"],
}
print(
f" α={head['alpha']:.4g} MAPE_LOO={head['mape_loo']:.3f}"
f" coverage_80={head['coverage_80_loo']:.3f} n={head['n_train']}"
)
os.makedirs(args.out, exist_ok=True)
pkl_path = os.path.join(args.out, "avm_industrial_park_ridge_v1.pkl")
card_path = os.path.join(args.out, "avm_industrial_park_ridge_v1.model_card.json")
# Serialize to a plain-dict artifact — no trainer class references — so the
# API loader can unpickle without importing this training module.
artifact = {
"version": ARTIFACT_VERSION,
"feature_spec": {
"feature_cols": spec.feature_cols,
"region_order": spec.region_order,
"top_industries": spec.top_industries,
"province_fdi": spec.province_fdi,
"default_fdi": spec.default_fdi,
"sign_priors": spec.sign_priors,
"current_year": spec.current_year,
},
"heads": {
name: {
"coefficients": np.asarray(head["coefficients"], dtype=np.float64),
"intercept": float(head["intercept"]),
"scaler_mean": np.asarray(head["scaler"].mean_, dtype=np.float64),
"scaler_scale": np.asarray(head["scaler"].scale_, dtype=np.float64),
"alpha": head["alpha"],
"q80_log": head["q80_log"],
"feature_cols": head["feature_cols"],
"n_train": head["n_train"],
"mape_loo": head["mape_loo"],
"coverage_80_loo": head["coverage_80_loo"],
}
for name, head in heads.items()
},
"trained_at": datetime.now(timezone.utc).isoformat(),
}
with open(pkl_path, "wb") as f:
pickle.dump(artifact, f)
card = {
"version": ARTIFACT_VERSION,
"trained_at": artifact["trained_at"],
"n_parks_in_source": len(rows),
"heads": card_heads,
"warnings": [
"n_train < 30 per head — LOO metrics are noisy; interpret CIs as wide.",
"Targets are log1p-transformed rent; CIs use conformal quantile on log residuals.",
],
}
with open(card_path, "w", encoding="utf-8") as f:
json.dump(card, f, ensure_ascii=False, indent=2)
print(f"\n✓ Wrote artifact → {pkl_path}")
print(f"✓ Wrote model card → {card_path}")
return 0
def _top_coefs(head: dict[str, Any], k: int = 8) -> list[dict[str, float]]:
beta = head["coefficients"]
cols = head["feature_cols"]
order = np.argsort(-np.abs(beta))[:k]
return [
{"feature": cols[i], "coef": round(float(beta[i]), 4)}
for i in order
if abs(beta[i]) > 1e-6
]
if __name__ == "__main__":
sys.exit(main())