Files
goodgo-platform/libs/ai-services/app/services/avm_industrial_service.py

503 lines
19 KiB
Python

"""Industrial AVM — Rent estimation service for industrial parks.
Preference order: park-level ridge baseline (v1, TEC-2768) → XGBoost → heuristic.
Heuristic fallback remains when no trained artifact is on disk.
"""
import logging
import math
import os
import pickle
from typing import Any
import numpy as np
from app.models.avm_industrial import (
FeatureImportance,
IndustrialAVMRequest,
IndustrialAVMResponse,
IndustrialComparable,
)
logger = logging.getLogger(__name__)
RIDGE_ARTIFACT_NAME = "avm_industrial_park_ridge_v1.pkl"
# Map API property types to the rent head trained in the ridge baseline.
# Land rent is stored as USD/m²/year; others as USD/m²/month — convert where
# needed so the response stays in USD/m²/month.
_PROPERTY_TO_HEAD: dict[str, str] = {
"warehouse": "rbw",
"ready_built_warehouse": "rbw",
"factory": "rbf",
"ready_built_factory": "rbf",
"office_in_park": "rbf",
"open_yard": "land",
"industrial_land": "land",
}
# ── Feature ordering for model input ────────────────────────────
INDUSTRIAL_FEATURE_NAMES = [
"region_encoded",
"park_occupancy_rate",
"park_area_ha",
"park_age_years",
"distance_to_port_km",
"distance_to_airport_km",
"distance_to_highway_km",
"property_type_encoded",
"area_m2",
"ceiling_height_m",
"floor_load_ton_m2",
"power_capacity_kva",
"building_coverage",
"loading_docks",
"zoning_encoded",
"industry_demand_index",
"fdi_province_musd",
"labor_cost_province_vnd",
"logistics_connectivity_score",
]
REGION_MAP = {
"south": 0,
"north": 1,
"central": 2,
"mekong_delta": 3,
}
PROPERTY_TYPE_MAP = {
"warehouse": 0,
"factory": 1,
"ready_built_factory": 2,
"ready_built_warehouse": 3,
"open_yard": 4,
"office_in_park": 5,
}
ZONING_MAP = {
"general_industrial": 0,
"heavy_industrial": 1,
"light_industrial": 2,
"logistics": 3,
"free_trade_zone": 4,
"high_tech": 5,
}
# ── Province-level rent baselines (USD/m²/month) ────────────────
# Based on Vietnamese industrial real estate market data
PROVINCE_BASELINE: dict[str, float] = {
# Southern Economic Zone
"hồ chí minh": 6.5,
"bình dương": 5.0,
"đồng nai": 4.5,
"long an": 3.5,
"bà rịa - vũng tàu": 4.0,
"tây ninh": 3.0,
# Northern Industrial Corridor
"hà nội": 5.5,
"bắc ninh": 5.0,
"hải phòng": 4.8,
"hải dương": 4.0,
"hưng yên": 3.8,
"vĩnh phúc": 3.5,
"thái nguyên": 3.2,
"bắc giang": 4.2,
# Central
"đà nẵng": 4.0,
"quảng nam": 3.0,
# Mekong Delta
"cần thơ": 3.0,
"tiền giang": 2.8,
}
DEFAULT_RENT_BASELINE = 3.5
# ── Comparable industrial parks (synthetic for heuristic) ────────
SYNTHETIC_COMPARABLES: list[dict] = [
{"park_name": "VSIP I", "province": "Bình Dương", "type": "factory", "area": 5000, "rent": 5.2},
{"park_name": "Amata", "province": "Đồng Nai", "type": "factory", "area": 8000, "rent": 4.8},
{"park_name": "Long Hậu", "province": "Long An", "type": "warehouse", "area": 3000, "rent": 3.8},
{"park_name": "Đình Vũ", "province": "Hải Phòng", "type": "warehouse", "area": 6000, "rent": 4.5},
{"park_name": "Yên Phong", "province": "Bắc Ninh", "type": "ready_built_factory", "area": 4000, "rent": 5.0},
{"park_name": "Thăng Long", "province": "Hà Nội", "type": "factory", "area": 10000, "rent": 5.8},
{"park_name": "VSIP Quảng Ngãi", "province": "Quảng Ngãi", "type": "factory", "area": 5000, "rent": 3.2},
{"park_name": "Châu Đức", "province": "Bà Rịa - Vũng Tàu", "type": "warehouse", "area": 4000, "rent": 4.0},
]
def _encode_features(req: IndustrialAVMRequest) -> np.ndarray:
"""Encode an industrial prediction request into a feature vector."""
return np.array(
[[
REGION_MAP.get(req.region.lower(), 0),
req.park_occupancy_rate,
req.park_area_ha,
req.park_age_years,
req.distance_to_port_km,
req.distance_to_airport_km,
req.distance_to_highway_km,
PROPERTY_TYPE_MAP.get(req.property_type.lower(), 1),
req.area_m2,
req.ceiling_height_m,
req.floor_load_ton_m2,
req.power_capacity_kva,
req.building_coverage,
req.loading_docks,
ZONING_MAP.get(req.zoning.lower(), 0),
req.industry_demand_index,
req.fdi_province_musd,
req.labor_cost_province_vnd,
req.logistics_connectivity_score,
]],
dtype=np.float64,
)
def _find_comparables(req: IndustrialAVMRequest) -> list[IndustrialComparable]:
"""Find synthetic comparable properties based on similarity."""
comparables: list[IndustrialComparable] = []
for comp in SYNTHETIC_COMPARABLES:
# Simple similarity: province match (0.4) + type match (0.3) + area proximity (0.3)
province_score = 0.4 if comp["province"].lower() == req.province.lower() else 0.0
type_score = 0.3 if comp["type"] == req.property_type.lower() else 0.0
area_ratio = min(req.area_m2, comp["area"]) / max(req.area_m2, comp["area"])
area_score = area_ratio * 0.3
similarity = province_score + type_score + area_score
if similarity >= 0.15:
comparables.append(
IndustrialComparable(
park_name=comp["park_name"],
province=comp["province"],
property_type=comp["type"],
area_m2=comp["area"],
rent_usd_m2=comp["rent"],
similarity_score=round(similarity, 4),
)
)
comparables.sort(key=lambda c: c.similarity_score, reverse=True)
return comparables[:5]
class IndustrialAVMService:
"""Industrial property rent estimation service.
Preference order when a trained artifact is available:
1. Ridge v1 (park-level baseline with conformal CIs, TEC-2768)
2. XGBoost (legacy, listing-level)
3. Multi-factor heuristic (always available)
"""
def __init__(self) -> None:
self._model: Any = None
self._model_version = "heuristic-v1"
self._backend: str = "heuristic"
self._load_model()
def _load_model(self) -> None:
"""Attempt to load trained industrial AVM artifacts (ridge first)."""
try:
from app.config import settings
model_path = settings.model_path
except Exception:
logger.info("Industrial AVM: config unavailable — using heuristic")
return
ridge_path = os.path.join(model_path, RIDGE_ARTIFACT_NAME)
if os.path.exists(ridge_path):
try:
with open(ridge_path, "rb") as f:
artifact = pickle.load(f)
if not isinstance(artifact, dict) or artifact.get("version") != "ridge-industrial-v1":
raise ValueError(f"Unexpected artifact version in {ridge_path}")
self._model = artifact
self._model_version = "ridge-industrial-v1"
self._backend = "ridge"
logger.info("Loaded industrial AVM ridge artifact from %s", ridge_path)
return
except Exception as exc: # keep service alive on artifact corruption
logger.warning("Failed to load ridge artifact (%s); falling back", exc)
try:
import xgboost as xgb
xgb_path = os.path.join(model_path, "avm_industrial_xgb.json")
if os.path.exists(xgb_path):
booster = xgb.Booster()
booster.load_model(xgb_path)
self._model = booster
self._model_version = "xgb-industrial-v1"
self._backend = "xgb"
logger.info("Loaded industrial AVM xgb model from %s", xgb_path)
return
except Exception:
pass
logger.info("No trained industrial AVM model — using heuristic")
def predict(self, req: IndustrialAVMRequest) -> IndustrialAVMResponse:
"""Predict industrial property rent."""
if self._backend == "ridge":
return self._predict_ridge(req)
if self._backend == "xgb" and self._model is not None:
return self._predict_model(req)
return self._predict_heuristic(req)
def _featureize_ridge(self, req: IndustrialAVMRequest, spec: dict) -> np.ndarray:
"""Build the exact feature vector used during ridge training.
Feature ordering must match `spec["feature_cols"]` which is the canonical
order emitted by the trainer. Sources:
- numeric fields come straight from the request
- province FDI comes from the artifact lookup (fallback to default)
- target-industry flags approximate one-hots against top-6 list
"""
province = (req.province or "").strip().lower()
fdi = spec["province_fdi"].get(province, spec["default_fdi"])
occupancy = float(req.park_occupancy_rate)
if occupancy > 1.5:
occupancy = occupancy / 100.0
occupancy = min(max(occupancy, 0.0), 1.0)
feats: dict[str, float] = {
"occupancy": occupancy,
"log_area_ha": math.log1p(max(0.0, float(req.park_area_ha))),
"park_age_years": float(max(0, int(req.park_age_years))),
"log_dist_port_km": math.log1p(max(0.0, float(req.distance_to_port_km))),
"log_dist_airport_km": math.log1p(max(0.0, float(req.distance_to_airport_km))),
"log_dist_highway_km": math.log1p(max(0.0, float(req.distance_to_highway_km))),
"logistics_connectivity_score": float(req.logistics_connectivity_score),
"log_fdi_province": math.log1p(
max(0.0, float(req.fdi_province_musd) or fdi)
),
"has_special_zone": float(
req.zoning.lower() in {"free_trade_zone", "high_tech"}
),
}
# Property type flags can proxy certain target-industry signals but the
# trainer's industry one-hots are park-level. At inference we don't know
# the park's industry mix, so default to 0 and let the province/region
# fixed effects carry the signal.
for ind in spec["top_industries"]:
feats[f"ind_{ind}"] = 0.0
region = (req.region or "south").lower()
for r in spec["region_order"][1:]:
feats[f"region_{r}"] = float(region == r)
vec = np.array([feats[c] for c in spec["feature_cols"]], dtype=np.float64)
return vec
def _predict_ridge(self, req: IndustrialAVMRequest) -> IndustrialAVMResponse:
"""Predict using the ridge v1 park-level baseline (conformal CIs)."""
artifact = self._model
spec = artifact["feature_spec"]
x = self._featureize_ridge(req, spec)
head_name = _PROPERTY_TO_HEAD.get(req.property_type.lower(), "rbf")
head = artifact["heads"][head_name]
x_std = (x - head["scaler_mean"]) / np.where(
head["scaler_scale"] == 0, 1.0, head["scaler_scale"]
)
log_pred = float(x_std @ head["coefficients"] + head["intercept"])
q80 = float(head["q80_log"])
# Ridge head is trained in natural units (USD/m²/month for rbf/rbw,
# USD/m²/year for land). Convert to the response contract which always
# reports monthly USD/m² for the primary estimate.
rent_native = math.expm1(log_pred)
low_native = math.expm1(log_pred - q80)
high_native = math.expm1(log_pred + q80)
if head_name == "land":
rent = rent_native / 12.0
low = low_native / 12.0
high = high_native / 12.0
else:
rent = rent_native
low = low_native
high = high_native
comparables = _find_comparables(req)
# Drivers: top coefficients by absolute standardized contribution.
contrib = head["coefficients"] * x_std
order = np.argsort(-np.abs(contrib))[:8]
total = float(np.sum(np.abs(contrib))) or 1.0
drivers = [
FeatureImportance(
feature=head["feature_cols"][i],
importance=round(float(abs(contrib[i]) / total), 4),
)
for i in order
if abs(contrib[i]) > 1e-6
]
return IndustrialAVMResponse(
estimated_rent_usd_m2=round(max(0.0, rent), 2),
confidence=round(float(head.get("coverage_80_loo", 0.80)), 2),
rent_range_low_usd_m2=round(max(0.0, low), 2),
rent_range_high_usd_m2=round(max(0.0, high), 2),
annual_rent_usd_m2=round(max(0.0, rent) * 12, 2),
total_monthly_rent_usd=round(max(0.0, rent) * req.area_m2, 2),
comparables=comparables,
drivers=drivers,
model_version=self._model_version,
)
def _predict_model(self, req: IndustrialAVMRequest) -> IndustrialAVMResponse:
"""Predict using trained gradient boosting model."""
import xgboost as xgb
features = _encode_features(req)
dmatrix = xgb.DMatrix(features, feature_names=INDUSTRIAL_FEATURE_NAMES)
pred_log = self._model.predict(dmatrix)[0]
rent = float(np.exp(pred_log))
comparables = _find_comparables(req)
# Feature importance
try:
scores = self._model.get_score(importance_type="gain")
total = sum(scores.values()) or 1.0
drivers = [
FeatureImportance(feature=f, importance=round(s / total, 4))
for f, s in sorted(scores.items(), key=lambda x: x[1], reverse=True)
][:8]
except Exception:
drivers = []
return IndustrialAVMResponse(
estimated_rent_usd_m2=round(rent, 2),
confidence=0.80,
rent_range_low_usd_m2=round(rent * 0.88, 2),
rent_range_high_usd_m2=round(rent * 1.12, 2),
annual_rent_usd_m2=round(rent * 12, 2),
total_monthly_rent_usd=round(rent * req.area_m2, 2),
comparables=comparables,
drivers=drivers,
model_version=self._model_version,
)
def _predict_heuristic(self, req: IndustrialAVMRequest) -> IndustrialAVMResponse:
"""Multi-factor heuristic for industrial rent estimation."""
province_key = req.province.lower().strip()
base = PROVINCE_BASELINE.get(province_key, DEFAULT_RENT_BASELINE)
# Property type multiplier
type_mult = {
"warehouse": 0.85,
"factory": 1.00,
"ready_built_factory": 1.30,
"ready_built_warehouse": 1.15,
"open_yard": 0.50,
"office_in_park": 1.50,
}.get(req.property_type.lower(), 1.0)
# Park quality adjustments
occupancy_adj = 1.0 + (req.park_occupancy_rate - 0.7) * 0.3
age_adj = max(0.85, 1.0 - req.park_age_years * 0.005)
size_adj = 1.0 + min(0.15, req.park_area_ha / 5000 * 0.15)
# Logistics / infrastructure
port_adj = max(0.85, 1.0 - req.distance_to_port_km * 0.002)
airport_adj = max(0.90, 1.0 - req.distance_to_airport_km * 0.001)
highway_adj = max(0.90, 1.0 - req.distance_to_highway_km * 0.005)
logistics_adj = 1.0 + (req.logistics_connectivity_score - 0.5) * 0.20
# Building specs premium
ceiling_adj = 1.0 + max(0.0, (req.ceiling_height_m - 8.0) * 0.02)
floor_load_adj = 1.0 + max(0.0, (req.floor_load_ton_m2 - 2.0) * 0.03)
power_adj = 1.0 + min(0.10, req.power_capacity_kva / 5000 * 0.10)
# Building coverage — higher coverage = more usable space = premium
coverage_adj = 1.0 + max(0.0, (req.building_coverage - 0.4) * 0.15)
# Loading docks — each dock adds a small premium, diminishing returns
docks_adj = 1.0 + min(0.12, req.loading_docks * 0.02)
# Zoning premium — specialized zones command higher rents
zoning_mult = {
"general_industrial": 1.00,
"heavy_industrial": 0.95,
"light_industrial": 1.05,
"logistics": 1.10,
"free_trade_zone": 1.20,
"high_tech": 1.25,
}.get(req.zoning.lower(), 1.0)
# Economic indicators
demand_adj = 1.0 + (req.industry_demand_index - 0.5) * 0.25
fdi_adj = 1.0 + min(0.15, req.fdi_province_musd / 5000 * 0.15)
labor_adj = max(0.90, 1.0 - req.labor_cost_province_vnd / 20_000_000 * 0.10)
# Area discount (larger areas get lower per-m² rent)
area_discount = 1.0
if req.area_m2 > 10_000:
area_discount = 0.92
elif req.area_m2 > 5_000:
area_discount = 0.95
elif req.area_m2 > 2_000:
area_discount = 0.98
rent = (
base
* type_mult
* occupancy_adj
* age_adj
* size_adj
* port_adj
* airport_adj
* highway_adj
* logistics_adj
* ceiling_adj
* floor_load_adj
* power_adj
* coverage_adj
* docks_adj
* zoning_mult
* demand_adj
* fdi_adj
* labor_adj
* area_discount
)
confidence = 0.65
comparables = _find_comparables(req)
# Heuristic feature importance
drivers = [
FeatureImportance(feature="province_baseline", importance=0.16),
FeatureImportance(feature="property_type", importance=0.12),
FeatureImportance(feature="zoning", importance=0.11),
FeatureImportance(feature="park_occupancy_rate", importance=0.10),
FeatureImportance(feature="logistics_connectivity_score", importance=0.09),
FeatureImportance(feature="industry_demand_index", importance=0.09),
FeatureImportance(feature="building_coverage", importance=0.07),
FeatureImportance(feature="loading_docks", importance=0.06),
FeatureImportance(feature="fdi_province_musd", importance=0.06),
FeatureImportance(feature="distance_to_port_km", importance=0.05),
FeatureImportance(feature="area_m2", importance=0.05),
]
return IndustrialAVMResponse(
estimated_rent_usd_m2=round(rent, 2),
confidence=confidence,
rent_range_low_usd_m2=round(rent * 0.80, 2),
rent_range_high_usd_m2=round(rent * 1.20, 2),
annual_rent_usd_m2=round(rent * 12, 2),
total_monthly_rent_usd=round(rent * req.area_m2, 2),
comparables=comparables,
drivers=drivers,
model_version=self._model_version,
)
# Module-level singleton
industrial_avm_service = IndustrialAVMService()