feat: implement project development module, transfer management features, and industrial AVM model integration

This commit is contained in:
Ho Ngoc Hai
2026-04-18 20:34:35 +07:00
parent 0f3b4d7b0d
commit 38b9def99a
66 changed files with 9051 additions and 17 deletions

View File

@@ -1,12 +1,13 @@
"""Industrial AVM — Rent estimation service for industrial parks.
Heuristic fallback when trained models are not available.
Uses gradient boosting approach similar to residential AVM v2.
Preference order: park-level ridge baseline (v1, TEC-2768) → XGBoost → heuristic.
Heuristic fallback remains when no trained artifact is on disk.
"""
import logging
import math
import os
from datetime import datetime, timezone
import pickle
from typing import Any
import numpy as np
@@ -20,6 +21,21 @@ from app.models.avm_industrial import (
logger = logging.getLogger(__name__)
# File name of the pickled ridge artifact; the loader joins it onto
# `settings.model_path` and only attempts to read it when the file exists.
RIDGE_ARTIFACT_NAME = "avm_industrial_park_ridge_v1.pkl"
# Map API property types to the rent head trained in the ridge baseline.
# Land rent is stored as USD/m²/year; others as USD/m²/month — convert where
# needed so the response stays in USD/m²/month.
# Keys are lower-case; the ridge predictor lower-cases the request's property
# type before the lookup and defaults to "rbf" for anything not listed here.
# NOTE(review): "office_in_park" reuses the "rbf" head and "open_yard" reuses
# the "land" head — presumably because no dedicated heads exist; confirm.
_PROPERTY_TO_HEAD: dict[str, str] = {
"warehouse": "rbw",
"ready_built_warehouse": "rbw",
"factory": "rbf",
"ready_built_factory": "rbf",
"office_in_park": "rbf",
"open_yard": "land",
"industrial_land": "land",
}
# ── Feature ordering for model input ────────────────────────────
INDUSTRIAL_FEATURE_NAMES = [
"region_encoded",
@@ -169,40 +185,171 @@ def _find_comparables(req: IndustrialAVMRequest) -> list[IndustrialComparable]:
class IndustrialAVMService:
"""Industrial property rent estimation service.
Uses gradient boosting when a trained model is available,
falls back to heuristic pricing for development/demo.
Preference order when a trained artifact is available:
1. Ridge v1 (park-level baseline with conformal CIs, TEC-2768)
2. XGBoost (legacy, listing-level)
3. Multi-factor heuristic (always available)
"""
def __init__(self) -> None:
    """Start in heuristic mode, then try to upgrade to a trained backend."""
    # These defaults describe the heuristic path; `_load_model` overwrites
    # all three when it finds a usable artifact on disk.
    self._backend: str = "heuristic"
    self._model_version = "heuristic-v1"
    self._model: Any = None
    self._load_model()
def _load_model(self) -> None:
    """Attempt to load trained industrial AVM artifacts (ridge first).

    Load order:
      1. Ridge v1 pickle (``RIDGE_ARTIFACT_NAME``) — must be a dict whose
         ``"version"`` is ``"ridge-industrial-v1"``.
      2. XGBoost booster (``avm_industrial_xgb.json``).
      3. Nothing loaded — the instance keeps its heuristic defaults.

    On success ``_model``, ``_model_version`` and ``_backend`` are set
    together. Every failure is logged and swallowed so the service always
    comes up; this method never raises for a missing or corrupt artifact.
    """
    try:
        from app.config import settings

        model_path = settings.model_path
    except Exception:
        logger.info("Industrial AVM: config unavailable — using heuristic")
        return

    ridge_path = os.path.join(model_path, RIDGE_ARTIFACT_NAME)
    if os.path.exists(ridge_path):
        try:
            # NOTE(security): pickle.load can execute arbitrary code. This is
            # only acceptable while the model directory is writable solely by
            # trusted deploy tooling — never point model_path at user uploads.
            with open(ridge_path, "rb") as f:
                artifact = pickle.load(f)
            if not isinstance(artifact, dict) or artifact.get("version") != "ridge-industrial-v1":
                raise ValueError(f"Unexpected artifact version in {ridge_path}")
        except Exception as exc:  # keep service alive on artifact corruption
            logger.warning("Failed to load ridge artifact (%s); falling back", exc)
        else:
            self._model = artifact
            self._model_version = "ridge-industrial-v1"
            self._backend = "ridge"
            logger.info("Loaded industrial AVM ridge artifact from %s", ridge_path)
            return

    xgb_path = os.path.join(model_path, "avm_industrial_xgb.json")
    if os.path.exists(xgb_path):
        try:
            # Imported lazily so xgboost stays an optional dependency.
            import xgboost as xgb

            booster = xgb.Booster()
            booster.load_model(xgb_path)
        except Exception as exc:
            # Previously swallowed silently; surface why the artifact on disk
            # was not used so a broken deployment is visible in the logs.
            logger.warning("Failed to load xgb artifact (%s); falling back", exc)
        else:
            self._model = booster
            self._model_version = "xgb-industrial-v1"
            self._backend = "xgb"
            logger.info("Loaded industrial AVM xgb model from %s", xgb_path)
            return

    logger.info("No trained industrial AVM model — using heuristic")
def predict(self, req: IndustrialAVMRequest) -> IndustrialAVMResponse:
    """Predict industrial property rent with the best available backend.

    Dispatches on ``self._backend``: ``"ridge"`` goes to ``_predict_ridge``
    and ``"xgb"`` goes to ``_predict_model``. Anything else — or a trained
    backend whose ``_model`` is unexpectedly ``None`` — falls back to
    ``_predict_heuristic``.

    Args:
        req: The rent-estimation request, forwarded unchanged.

    Returns:
        The response produced by the selected backend.
    """
    # Both trained backends dereference self._model, so guard them with the
    # same check instead of only protecting the xgb path.
    if self._model is not None:
        if self._backend == "ridge":
            return self._predict_ridge(req)
        if self._backend == "xgb":
            return self._predict_model(req)
    return self._predict_heuristic(req)
def _featureize_ridge(self, req: IndustrialAVMRequest, spec: dict) -> np.ndarray:
    """Build the exact feature vector used during ridge training.

    Feature ordering must match ``spec["feature_cols"]``, which is the
    canonical order emitted by the trainer. Sources:
      - numeric fields come straight from the request
      - province FDI comes from the request when it is non-zero, otherwise
        from the artifact lookup (``spec["province_fdi"]``), otherwise
        from ``spec["default_fdi"]``
      - target-industry flags are always 0 at inference (see below)
      - region one-hots skip the first entry of ``spec["region_order"]``,
        which acts as the baseline

    Args:
        req: Request carrying the park / property attributes read below.
        spec: The artifact's ``feature_spec`` dict; must provide
            ``province_fdi``, ``default_fdi``, ``top_industries``,
            ``region_order`` and ``feature_cols``.

    Returns:
        1-D float64 array with one entry per name in ``spec["feature_cols"]``.

    Raises:
        KeyError: If ``spec["feature_cols"]`` names a feature that is not
            produced here.
    """
    province = (req.province or "").strip().lower()
    lookup_fdi = spec["province_fdi"].get(province, spec["default_fdi"])
    # `or 0.0` tolerates a missing (None) request value; a zero then falls
    # through to the province lookup exactly as an explicit 0 does.
    fdi_musd = float(req.fdi_province_musd or 0.0) or lookup_fdi

    # Accept occupancy either as a fraction (0-1) or a percentage (0-100):
    # anything above 1.5 is treated as a percentage.
    occupancy = float(req.park_occupancy_rate)
    if occupancy > 1.5:
        occupancy /= 100.0
    occupancy = min(max(occupancy, 0.0), 1.0)

    # Normalized like `province` so a missing or padded value cannot crash
    # or silently miss the special-zone match.
    zoning = (req.zoning or "").strip().lower()

    feats: dict[str, float] = {
        "occupancy": occupancy,
        "log_area_ha": math.log1p(max(0.0, float(req.park_area_ha))),
        "park_age_years": float(max(0, int(req.park_age_years))),
        "log_dist_port_km": math.log1p(max(0.0, float(req.distance_to_port_km))),
        "log_dist_airport_km": math.log1p(max(0.0, float(req.distance_to_airport_km))),
        "log_dist_highway_km": math.log1p(max(0.0, float(req.distance_to_highway_km))),
        "logistics_connectivity_score": float(req.logistics_connectivity_score),
        "log_fdi_province": math.log1p(max(0.0, fdi_musd)),
        "has_special_zone": float(zoning in {"free_trade_zone", "high_tech"}),
    }

    # Property type flags can proxy certain target-industry signals but the
    # trainer's industry one-hots are park-level. At inference we don't know
    # the park's industry mix, so default to 0 and let the province/region
    # fixed effects carry the signal.
    for ind in spec["top_industries"]:
        feats[f"ind_{ind}"] = 0.0

    region = (req.region or "south").strip().lower()
    for r in spec["region_order"][1:]:
        feats[f"region_{r}"] = float(region == r)

    return np.array([feats[c] for c in spec["feature_cols"]], dtype=np.float64)
def _predict_ridge(self, req: IndustrialAVMRequest) -> IndustrialAVMResponse:
    """Predict using the ridge v1 park-level baseline (conformal CIs).

    Steps: build the feature vector, pick the rent head for the request's
    property type (unknown types use ``"rbf"``), standardize with the
    head's scaler, apply the linear model in log1p space, and widen the
    point estimate by ``±q80_log`` for the interval.

    Args:
        req: The rent-estimation request.

    Returns:
        A response whose rents are always monthly USD/m²; the ``"land"``
        head's yearly figures are divided by 12.

    Raises:
        KeyError: If the loaded artifact lacks an expected key.
    """
    artifact = self._model
    spec = artifact["feature_spec"]
    x = self._featureize_ridge(req, spec)

    head_name = _PROPERTY_TO_HEAD.get((req.property_type or "").lower(), "rbf")
    head = artifact["heads"][head_name]

    # Coerce to arrays so the zero-scale guard works even when the artifact
    # stored plain lists (`list == 0` is a single False, not element-wise).
    mean = np.asarray(head["scaler_mean"], dtype=np.float64)
    scale = np.asarray(head["scaler_scale"], dtype=np.float64)
    coef = np.asarray(head["coefficients"], dtype=np.float64)
    x_std = (x - mean) / np.where(scale == 0, 1.0, scale)

    log_pred = float(x_std @ coef + head["intercept"])
    q80 = float(head["q80_log"])

    # The head predicts log1p(rent); expm1 maps back to the head's native
    # unit (USD/m²/month for rbf/rbw, USD/m²/year for land). The response
    # contract always reports monthly USD/m², so land is divided by 12.
    per_month = 1.0 / 12.0 if head_name == "land" else 1.0
    rent = max(0.0, math.expm1(log_pred) * per_month)
    low = max(0.0, math.expm1(log_pred - q80) * per_month)
    high = max(0.0, math.expm1(log_pred + q80) * per_month)

    comparables = _find_comparables(req)

    # Drivers: top 8 features by absolute standardized contribution. Labels
    # follow the order the vector was built in (spec["feature_cols"]); a
    # per-head list is still honoured when the artifact provides one.
    feature_cols = head.get("feature_cols", spec["feature_cols"])
    contrib = coef * x_std
    order = np.argsort(-np.abs(contrib))[:8]
    total = float(np.sum(np.abs(contrib))) or 1.0
    drivers = [
        FeatureImportance(
            feature=feature_cols[i],
            importance=round(float(abs(contrib[i]) / total), 4),
        )
        for i in order
        if abs(contrib[i]) > 1e-6
    ]

    return IndustrialAVMResponse(
        estimated_rent_usd_m2=round(rent, 2),
        confidence=round(float(head.get("coverage_80_loo", 0.80)), 2),
        rent_range_low_usd_m2=round(low, 2),
        rent_range_high_usd_m2=round(high, 2),
        annual_rent_usd_m2=round(rent * 12, 2),
        total_monthly_rent_usd=round(rent * req.area_m2, 2),
        comparables=comparables,
        drivers=drivers,
        model_version=self._model_version,
    )
def _predict_model(self, req: IndustrialAVMRequest) -> IndustrialAVMResponse:
"""Predict using trained gradient boosting model."""
import xgboost as xgb