459 lines
17 KiB
Python
459 lines
17 KiB
Python
"""Train the v1 park-level industrial AVM baseline (ridge + monotonic priors).
|
||
|
||
Context (TEC-2768 / R5.2.1):
|
||
The IndustrialPark table ships with ~20 seeded rows carrying three rent
|
||
heads: land (usd/m²/year), RBF (ready-built factory, usd/m²/month), and
|
||
RBW (ready-built warehouse, usd/m²/month). No IndustrialListing rows are
|
||
seeded, so tree-boosted models are not viable at n=20. This script fits a
|
||
regularized linear baseline on log-rent with sign-constrained coefficients
|
||
that encode domain monotonicity priors (occupancy ↑ rent, distance ↑ rent
|
||
↓, etc.). Conformal prediction over LOO residuals gives the 80% CI band.
|
||
|
||
Usage:
|
||
python libs/ai-services/scripts/train_avm_industrial_park.py \
|
||
--input libs/ai-services/data/industrial/parks.json \
|
||
--out libs/ai-services/models
|
||
|
||
Produces:
|
||
<out>/avm_industrial_park_ridge_v1.pkl — fitted artifact
|
||
<out>/avm_industrial_park_ridge_v1.model_card.json — metrics + slices
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import math
|
||
import os
|
||
import pickle
|
||
import sys
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timezone
|
||
from typing import Any
|
||
|
||
import numpy as np
|
||
from scipy.optimize import nnls
|
||
from sklearn.preprocessing import StandardScaler
|
||
|
||
# ── Constants ──────────────────────────────────────────────────
|
||
ARTIFACT_VERSION = "ridge-industrial-v1"
|
||
CURRENT_YEAR = 2026
|
||
|
||
REGION_ORDER = ["south", "north", "central"] # drop-first encoding
|
||
TOP_INDUSTRIES = ["electronics", "logistics", "automotive", "food", "garment", "plastics"]
|
||
|
||
# Province → FDI inflow in million USD (trailing 12m, approximate market data).
|
||
PROVINCE_FDI_MUSD: dict[str, float] = {
|
||
"tp. hồ chí minh": 5500,
|
||
"hà nội": 4200,
|
||
"bình dương": 4800,
|
||
"đồng nai": 3200,
|
||
"bắc ninh": 5800,
|
||
"hải phòng": 2800,
|
||
"long an": 1500,
|
||
"bà rịa - vũng tàu": 1800,
|
||
"hải dương": 800,
|
||
"hưng yên": 1200,
|
||
"bình phước": 400,
|
||
"nghệ An": 350,
|
||
"nghệ an": 350,
|
||
"quảng nam": 500,
|
||
"quảng ngãi": 600,
|
||
}
|
||
DEFAULT_FDI = 500.0
|
||
|
||
# Feature expected-sign map (+1 rent↑ when feature↑, −1 rent↓ when feature↑).
|
||
# Region one-hots stay unsigned (fixed effect).
|
||
SIGN_PRIORS: dict[str, int] = {
|
||
"occupancy": +1,
|
||
"log_area_ha": +1,
|
||
"park_age_years": -1,
|
||
"log_dist_port_km": -1,
|
||
"log_dist_airport_km": -1,
|
||
"log_dist_highway_km": -1,
|
||
"logistics_connectivity_score": +1,
|
||
"log_fdi_province": +1,
|
||
"has_special_zone": +1,
|
||
"ind_electronics": +1,
|
||
"ind_logistics": +1,
|
||
"ind_automotive": +1,
|
||
"ind_food": 0,
|
||
"ind_garment": 0,
|
||
"ind_plastics": 0,
|
||
}
|
||
MONOTONIC_FEATURES = [f for f, s in SIGN_PRIORS.items() if s != 0]
|
||
REGION_FEATURES = [f"region_{r}" for r in REGION_ORDER[1:]] # drop south
|
||
ALL_FEATURES = list(SIGN_PRIORS.keys()) + REGION_FEATURES
|
||
|
||
|
||
# ── Feature engineering ────────────────────────────────────────
|
||
@dataclass
|
||
class FeatureSpec:
|
||
"""Serializable feature spec so the loader can recreate training features."""
|
||
|
||
feature_cols: list[str] = field(default_factory=lambda: list(ALL_FEATURES))
|
||
region_order: list[str] = field(default_factory=lambda: list(REGION_ORDER))
|
||
top_industries: list[str] = field(default_factory=lambda: list(TOP_INDUSTRIES))
|
||
province_fdi: dict[str, float] = field(default_factory=lambda: dict(PROVINCE_FDI_MUSD))
|
||
default_fdi: float = DEFAULT_FDI
|
||
sign_priors: dict[str, int] = field(default_factory=lambda: dict(SIGN_PRIORS))
|
||
current_year: int = CURRENT_YEAR
|
||
|
||
|
||
def _connectivity_distance(conn: dict | None, key: str, default: float) -> float:
|
||
if not conn or not isinstance(conn, dict):
|
||
return default
|
||
node = conn.get(key)
|
||
if isinstance(node, dict):
|
||
dist = node.get("distanceKm") or node.get("km")
|
||
if isinstance(dist, (int, float)) and dist >= 0:
|
||
return float(dist)
|
||
return default
|
||
|
||
|
||
def _logistics_score(dist_port: float, dist_airport: float, dist_highway: float) -> float:
|
||
# Inverse-distance composite scaled to [0, 1]. Weights bias toward highway
|
||
# proximity which matters most for trucking in VN industrial flows.
|
||
def inv(d: float, cap: float) -> float:
|
||
return max(0.0, 1.0 - min(d, cap) / cap)
|
||
return round(
|
||
0.25 * inv(dist_port, 120)
|
||
+ 0.20 * inv(dist_airport, 80)
|
||
+ 0.55 * inv(dist_highway, 20),
|
||
4,
|
||
)
|
||
|
||
|
||
def _industry_match(industries: list[str], target: str) -> int:
|
||
lowered = [i.lower() for i in industries or []]
|
||
return int(any(target in i for i in lowered))
|
||
|
||
|
||
def featureize(row: dict, spec: FeatureSpec) -> dict[str, float]:
|
||
"""Turn one park record into the flat feature vector used by the ridge."""
|
||
occupancy = row.get("occupancyRate") or 0
|
||
if occupancy > 1.5: # seed stores 0-100, plan normalizes to [0,1]
|
||
occupancy = occupancy / 100.0
|
||
occupancy = min(max(occupancy, 0.0), 1.0)
|
||
|
||
area_ha = float(row.get("totalAreaHa") or 0.0)
|
||
established = row.get("establishedYear") or (spec.current_year - 10)
|
||
park_age = max(0, spec.current_year - int(established))
|
||
|
||
conn = row.get("connectivity") or {}
|
||
dist_port = _connectivity_distance(conn, "nearestPort", 60.0)
|
||
dist_airport = _connectivity_distance(conn, "airport", 30.0)
|
||
dist_highway = _connectivity_distance(conn, "highway", 5.0)
|
||
|
||
logistics_score = _logistics_score(dist_port, dist_airport, dist_highway)
|
||
|
||
province = (row.get("province") or "").strip().lower()
|
||
fdi = spec.province_fdi.get(province, spec.default_fdi)
|
||
|
||
incentives = row.get("incentives") or {}
|
||
has_special = int(bool(incentives.get("specialZone")))
|
||
|
||
industries = row.get("targetIndustries") or []
|
||
region = str(row.get("region") or "south").lower()
|
||
|
||
feats = {
|
||
"occupancy": occupancy,
|
||
"log_area_ha": math.log1p(area_ha),
|
||
"park_age_years": float(park_age),
|
||
"log_dist_port_km": math.log1p(dist_port),
|
||
"log_dist_airport_km": math.log1p(dist_airport),
|
||
"log_dist_highway_km": math.log1p(dist_highway),
|
||
"logistics_connectivity_score": logistics_score,
|
||
"log_fdi_province": math.log1p(fdi),
|
||
"has_special_zone": float(has_special),
|
||
}
|
||
for ind in spec.top_industries:
|
||
feats[f"ind_{ind}"] = float(_industry_match(industries, ind))
|
||
for r in spec.region_order[1:]:
|
||
feats[f"region_{r}"] = float(region == r)
|
||
return feats
|
||
|
||
|
||
def build_feature_matrix(rows: list[dict], spec: FeatureSpec) -> tuple[np.ndarray, list[str]]:
|
||
mats = [featureize(r, spec) for r in rows]
|
||
cols = spec.feature_cols
|
||
X = np.array([[m[c] for c in cols] for m in mats], dtype=np.float64)
|
||
return X, cols
|
||
|
||
|
||
# ── Sign-constrained ridge ─────────────────────────────────────
|
||
def fit_ridge_nn(X: np.ndarray, y: np.ndarray, alpha: float, sign_vec: np.ndarray) -> np.ndarray:
|
||
"""Fit `y ≈ X @ β` with ridge penalty α and sign constraints.
|
||
|
||
sign_vec[i] ∈ {−1, 0, +1}. For +1/−1 entries, the returned coefficient is
|
||
constrained to have that sign. Solved as NNLS on the augmented system:
|
||
minimize ‖[X; sqrt(α)*I] β̃ − [y; 0]‖² subject to β̃ ≥ 0
|
||
with features pre-multiplied by sign_vec (so "−1" features become "expect
|
||
positive after flipping"). For sign 0 (e.g. neutral industry flags) we keep
|
||
the feature unsigned by solving the corresponding coefficient on ±-split
|
||
columns.
|
||
"""
|
||
n, p = X.shape
|
||
|
||
# Expand each sign==0 column into two columns (positive and negative part)
|
||
# so the NNLS solve can recover an unconstrained coefficient as β = β⁺ − β⁻.
|
||
expand_cols: list[np.ndarray] = []
|
||
col_meta: list[tuple[int, int]] = [] # (orig_idx, +1 or -1)
|
||
for j in range(p):
|
||
if sign_vec[j] == 0:
|
||
expand_cols.append(X[:, j])
|
||
col_meta.append((j, +1))
|
||
expand_cols.append(-X[:, j])
|
||
col_meta.append((j, -1))
|
||
else:
|
||
# Flip so expected sign becomes +, enabling non-negativity constraint.
|
||
expand_cols.append(sign_vec[j] * X[:, j])
|
||
col_meta.append((j, int(sign_vec[j])))
|
||
X_exp = np.stack(expand_cols, axis=1)
|
||
|
||
# Augment for ridge.
|
||
k = X_exp.shape[1]
|
||
X_aug = np.vstack([X_exp, math.sqrt(alpha) * np.eye(k)])
|
||
y_aug = np.concatenate([y, np.zeros(k)])
|
||
|
||
beta_exp, _ = nnls(X_aug, y_aug, maxiter=5 * k)
|
||
|
||
# Collapse expanded coefs back to original column indices.
|
||
beta = np.zeros(p)
|
||
for col_idx, (orig_j, sgn) in enumerate(col_meta):
|
||
if sign_vec[orig_j] == 0:
|
||
beta[orig_j] += sgn * beta_exp[col_idx]
|
||
else:
|
||
# sgn == sign_vec[orig_j]; β was fit on flipped column, so flip back.
|
||
beta[orig_j] = sgn * beta_exp[col_idx]
|
||
return beta
|
||
|
||
|
||
# ── Model selection + conformal CI ─────────────────────────────
|
||
def _pred(X: np.ndarray, beta: np.ndarray, intercept: float) -> np.ndarray:
|
||
return X @ beta + intercept
|
||
|
||
|
||
def loo_cv_mape(
|
||
X: np.ndarray,
|
||
y_log: np.ndarray,
|
||
alpha: float,
|
||
sign_vec: np.ndarray,
|
||
scaler: StandardScaler,
|
||
) -> tuple[float, np.ndarray]:
|
||
"""Return (MAPE on original rent scale, LOO residual vector in log-space)."""
|
||
n = X.shape[0]
|
||
residuals_log = np.zeros(n)
|
||
preds_rent = np.zeros(n)
|
||
for i in range(n):
|
||
mask = np.ones(n, dtype=bool)
|
||
mask[i] = False
|
||
X_train_raw = X[mask]
|
||
X_train = scaler.fit_transform(X_train_raw)
|
||
y_train = y_log[mask]
|
||
intercept = float(np.mean(y_train))
|
||
X_cent = X_train
|
||
beta = fit_ridge_nn(X_cent, y_train - intercept, alpha, sign_vec)
|
||
|
||
x_test = scaler.transform(X[i : i + 1])
|
||
yhat_log = float(_pred(x_test, beta, intercept)[0])
|
||
residuals_log[i] = y_log[i] - yhat_log
|
||
preds_rent[i] = math.expm1(yhat_log)
|
||
|
||
y_true = np.expm1(y_log)
|
||
mape = float(np.mean(np.abs(preds_rent - y_true) / np.maximum(y_true, 1e-6)))
|
||
return mape, residuals_log
|
||
|
||
|
||
def conformal_coverage(residuals_log: np.ndarray, q: float) -> float:
|
||
return float(np.mean(np.abs(residuals_log) <= q))
|
||
|
||
|
||
# ── Training pipeline ──────────────────────────────────────────
|
||
def train_head(
|
||
rows: list[dict],
|
||
target_key: str,
|
||
spec: FeatureSpec,
|
||
) -> dict[str, Any]:
|
||
"""Fit one rent head and return a serializable head dict."""
|
||
valid = [r for r in rows if r.get(target_key) is not None]
|
||
if len(valid) < 8:
|
||
raise ValueError(f"Head '{target_key}': only {len(valid)} non-null rows — too few to train.")
|
||
|
||
X, cols = build_feature_matrix(valid, spec)
|
||
y_raw = np.array([r[target_key] for r in valid], dtype=np.float64)
|
||
y_log = np.log1p(y_raw)
|
||
|
||
sign_vec = np.array([spec.sign_priors.get(c, 0) for c in cols], dtype=np.int8)
|
||
|
||
# Fit scaler on full (we also refit per-fold in LOO; this one is for final model).
|
||
scaler_final = StandardScaler()
|
||
scaler_final.fit(X)
|
||
|
||
alphas = np.logspace(-2, 3, 20)
|
||
best = None
|
||
for a in alphas:
|
||
mape, res = loo_cv_mape(X, y_log, a, sign_vec, StandardScaler())
|
||
if best is None or mape < best["mape"]:
|
||
best = {"alpha": a, "mape": mape, "residuals_log": res}
|
||
assert best is not None
|
||
|
||
# Refit on full set with chosen alpha.
|
||
X_std = scaler_final.transform(X)
|
||
intercept = float(np.mean(y_log))
|
||
beta = fit_ridge_nn(X_std, y_log - intercept, best["alpha"], sign_vec)
|
||
|
||
q80 = float(np.quantile(np.abs(best["residuals_log"]), 0.80))
|
||
coverage = conformal_coverage(best["residuals_log"], q80)
|
||
|
||
# Per-region slice metrics.
|
||
slices: dict[str, dict[str, float]] = {}
|
||
regions = np.array([r.get("region", "south") for r in valid])
|
||
preds_rent = np.expm1(X_std @ beta + intercept)
|
||
y_rent = np.expm1(y_log)
|
||
for region in np.unique(regions):
|
||
idx = np.where(regions == region)[0]
|
||
if idx.size == 0:
|
||
continue
|
||
mape_slice = float(
|
||
np.mean(np.abs(preds_rent[idx] - y_rent[idx]) / np.maximum(y_rent[idx], 1e-6))
|
||
)
|
||
slices[region] = {
|
||
"n": int(idx.size),
|
||
"mape_in_sample": round(mape_slice, 4),
|
||
"median_residual_log": round(float(np.median(best["residuals_log"][idx])), 4),
|
||
}
|
||
|
||
return {
|
||
"coefficients": beta,
|
||
"intercept": intercept,
|
||
"scaler": scaler_final,
|
||
"alpha": float(best["alpha"]),
|
||
"q80_log": q80,
|
||
"feature_cols": cols,
|
||
"sign_vec": sign_vec,
|
||
"n_train": len(valid),
|
||
"mape_loo": round(float(best["mape"]), 4),
|
||
"coverage_80_loo": round(coverage, 4),
|
||
"slices": slices,
|
||
}
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument(
|
||
"--input",
|
||
default=os.path.join(
|
||
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
|
||
"data/industrial/parks.json",
|
||
),
|
||
)
|
||
parser.add_argument(
|
||
"--out",
|
||
default=os.path.join(
|
||
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
|
||
"models",
|
||
),
|
||
)
|
||
args = parser.parse_args(argv)
|
||
|
||
with open(args.input, "r", encoding="utf-8") as f:
|
||
rows: list[dict] = json.load(f)
|
||
|
||
spec = FeatureSpec()
|
||
|
||
head_specs = {
|
||
"land": "landRentUsdM2Year",
|
||
"rbf": "rbfRentUsdM2Month",
|
||
"rbw": "rbwRentUsdM2Month",
|
||
}
|
||
heads: dict[str, dict[str, Any]] = {}
|
||
card_heads: dict[str, dict[str, Any]] = {}
|
||
for head_name, target_key in head_specs.items():
|
||
print(f"→ Training head '{head_name}' on target '{target_key}'...")
|
||
head = train_head(rows, target_key, spec)
|
||
heads[head_name] = head
|
||
card_heads[head_name] = {
|
||
"target_column": target_key,
|
||
"n_train": head["n_train"],
|
||
"alpha": head["alpha"],
|
||
"mape_loo": head["mape_loo"],
|
||
"coverage_80_loo": head["coverage_80_loo"],
|
||
"q80_log": round(head["q80_log"], 4),
|
||
"top_coefficients": _top_coefs(head),
|
||
"slices": head["slices"],
|
||
}
|
||
print(
|
||
f" α={head['alpha']:.4g} MAPE_LOO={head['mape_loo']:.3f}"
|
||
f" coverage_80={head['coverage_80_loo']:.3f} n={head['n_train']}"
|
||
)
|
||
|
||
os.makedirs(args.out, exist_ok=True)
|
||
pkl_path = os.path.join(args.out, "avm_industrial_park_ridge_v1.pkl")
|
||
card_path = os.path.join(args.out, "avm_industrial_park_ridge_v1.model_card.json")
|
||
|
||
# Serialize to a plain-dict artifact — no trainer class references — so the
|
||
# API loader can unpickle without importing this training module.
|
||
artifact = {
|
||
"version": ARTIFACT_VERSION,
|
||
"feature_spec": {
|
||
"feature_cols": spec.feature_cols,
|
||
"region_order": spec.region_order,
|
||
"top_industries": spec.top_industries,
|
||
"province_fdi": spec.province_fdi,
|
||
"default_fdi": spec.default_fdi,
|
||
"sign_priors": spec.sign_priors,
|
||
"current_year": spec.current_year,
|
||
},
|
||
"heads": {
|
||
name: {
|
||
"coefficients": np.asarray(head["coefficients"], dtype=np.float64),
|
||
"intercept": float(head["intercept"]),
|
||
"scaler_mean": np.asarray(head["scaler"].mean_, dtype=np.float64),
|
||
"scaler_scale": np.asarray(head["scaler"].scale_, dtype=np.float64),
|
||
"alpha": head["alpha"],
|
||
"q80_log": head["q80_log"],
|
||
"feature_cols": head["feature_cols"],
|
||
"n_train": head["n_train"],
|
||
"mape_loo": head["mape_loo"],
|
||
"coverage_80_loo": head["coverage_80_loo"],
|
||
}
|
||
for name, head in heads.items()
|
||
},
|
||
"trained_at": datetime.now(timezone.utc).isoformat(),
|
||
}
|
||
with open(pkl_path, "wb") as f:
|
||
pickle.dump(artifact, f)
|
||
|
||
card = {
|
||
"version": ARTIFACT_VERSION,
|
||
"trained_at": artifact["trained_at"],
|
||
"n_parks_in_source": len(rows),
|
||
"heads": card_heads,
|
||
"warnings": [
|
||
"n_train < 30 per head — LOO metrics are noisy; interpret CIs as wide.",
|
||
"Targets are log1p-transformed rent; CIs use conformal quantile on log residuals.",
|
||
],
|
||
}
|
||
with open(card_path, "w", encoding="utf-8") as f:
|
||
json.dump(card, f, ensure_ascii=False, indent=2)
|
||
|
||
print(f"\n✓ Wrote artifact → {pkl_path}")
|
||
print(f"✓ Wrote model card → {card_path}")
|
||
return 0
|
||
|
||
|
||
def _top_coefs(head: dict[str, Any], k: int = 8) -> list[dict[str, float]]:
|
||
beta = head["coefficients"]
|
||
cols = head["feature_cols"]
|
||
order = np.argsort(-np.abs(beta))[:k]
|
||
return [
|
||
{"feature": cols[i], "coef": round(float(beta[i]), 4)}
|
||
for i in order
|
||
if abs(beta[i]) > 1e-6
|
||
]
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|