"""Industrial AVM — Rent estimation service for industrial parks. Preference order: park-level ridge baseline (v1, TEC-2768) → XGBoost → heuristic. Heuristic fallback remains when no trained artifact is on disk. """ import logging import math import os import pickle from typing import Any import numpy as np from app.models.avm_industrial import ( FeatureImportance, IndustrialAVMRequest, IndustrialAVMResponse, IndustrialComparable, ) logger = logging.getLogger(__name__) RIDGE_ARTIFACT_NAME = "avm_industrial_park_ridge_v1.pkl" # Map API property types to the rent head trained in the ridge baseline. # Land rent is stored as USD/m²/year; others as USD/m²/month — convert where # needed so the response stays in USD/m²/month. _PROPERTY_TO_HEAD: dict[str, str] = { "warehouse": "rbw", "ready_built_warehouse": "rbw", "factory": "rbf", "ready_built_factory": "rbf", "office_in_park": "rbf", "open_yard": "land", "industrial_land": "land", } # ── Feature ordering for model input ──────────────────────────── INDUSTRIAL_FEATURE_NAMES = [ "region_encoded", "park_occupancy_rate", "park_area_ha", "park_age_years", "distance_to_port_km", "distance_to_airport_km", "distance_to_highway_km", "property_type_encoded", "area_m2", "ceiling_height_m", "floor_load_ton_m2", "power_capacity_kva", "building_coverage", "loading_docks", "zoning_encoded", "industry_demand_index", "fdi_province_musd", "labor_cost_province_vnd", "logistics_connectivity_score", ] REGION_MAP = { "south": 0, "north": 1, "central": 2, "mekong_delta": 3, } PROPERTY_TYPE_MAP = { "warehouse": 0, "factory": 1, "ready_built_factory": 2, "ready_built_warehouse": 3, "open_yard": 4, "office_in_park": 5, } ZONING_MAP = { "general_industrial": 0, "heavy_industrial": 1, "light_industrial": 2, "logistics": 3, "free_trade_zone": 4, "high_tech": 5, } # ── Province-level rent baselines (USD/m²/month) ──────────────── # Based on Vietnamese industrial real estate market data PROVINCE_BASELINE: dict[str, float] = { # Southern Economic Zone "hồ chí minh": 6.5, "bình dương": 5.0, "đồng nai": 4.5, "long an": 3.5, "bà rịa - vũng tàu": 4.0, "tây ninh": 3.0, # Northern Industrial Corridor "hà nội": 5.5, "bắc ninh": 5.0, "hải phòng": 4.8, "hải dương": 4.0, "hưng yên": 3.8, "vĩnh phúc": 3.5, "thái nguyên": 3.2, "bắc giang": 4.2, # Central "đà nẵng": 4.0, "quảng nam": 3.0, # Mekong Delta "cần thơ": 3.0, "tiền giang": 2.8, } DEFAULT_RENT_BASELINE = 3.5 # ── Comparable industrial parks (synthetic for heuristic) ──────── SYNTHETIC_COMPARABLES: list[dict] = [ {"park_name": "VSIP I", "province": "Bình Dương", "type": "factory", "area": 5000, "rent": 5.2}, {"park_name": "Amata", "province": "Đồng Nai", "type": "factory", "area": 8000, "rent": 4.8}, {"park_name": "Long Hậu", "province": "Long An", "type": "warehouse", "area": 3000, "rent": 3.8}, {"park_name": "Đình Vũ", "province": "Hải Phòng", "type": "warehouse", "area": 6000, "rent": 4.5}, {"park_name": "Yên Phong", "province": "Bắc Ninh", "type": "ready_built_factory", "area": 4000, "rent": 5.0}, {"park_name": "Thăng Long", "province": "Hà Nội", "type": "factory", "area": 10000, "rent": 5.8}, {"park_name": "VSIP Quảng Ngãi", "province": "Quảng Ngãi", "type": "factory", "area": 5000, "rent": 3.2}, {"park_name": "Châu Đức", "province": "Bà Rịa - Vũng Tàu", "type": "warehouse", "area": 4000, "rent": 4.0}, ] def _encode_features(req: IndustrialAVMRequest) -> np.ndarray: """Encode an industrial prediction request into a feature vector.""" return np.array( [[ REGION_MAP.get(req.region.lower(), 0), req.park_occupancy_rate, req.park_area_ha, req.park_age_years, req.distance_to_port_km, req.distance_to_airport_km, req.distance_to_highway_km, PROPERTY_TYPE_MAP.get(req.property_type.lower(), 1), req.area_m2, req.ceiling_height_m, req.floor_load_ton_m2, req.power_capacity_kva, req.building_coverage, req.loading_docks, ZONING_MAP.get(req.zoning.lower(), 0), req.industry_demand_index, req.fdi_province_musd, req.labor_cost_province_vnd, req.logistics_connectivity_score, ]], dtype=np.float64, ) def _find_comparables(req: IndustrialAVMRequest) -> list[IndustrialComparable]: """Find synthetic comparable properties based on similarity.""" comparables: list[IndustrialComparable] = [] for comp in SYNTHETIC_COMPARABLES: # Simple similarity: province match (0.4) + type match (0.3) + area proximity (0.3) province_score = 0.4 if comp["province"].lower() == req.province.lower() else 0.0 type_score = 0.3 if comp["type"] == req.property_type.lower() else 0.0 area_ratio = min(req.area_m2, comp["area"]) / max(req.area_m2, comp["area"]) area_score = area_ratio * 0.3 similarity = province_score + type_score + area_score if similarity >= 0.15: comparables.append( IndustrialComparable( park_name=comp["park_name"], province=comp["province"], property_type=comp["type"], area_m2=comp["area"], rent_usd_m2=comp["rent"], similarity_score=round(similarity, 4), ) ) comparables.sort(key=lambda c: c.similarity_score, reverse=True) return comparables[:5] class IndustrialAVMService: """Industrial property rent estimation service. Preference order when a trained artifact is available: 1. Ridge v1 (park-level baseline with conformal CIs, TEC-2768) 2. XGBoost (legacy, listing-level) 3. Multi-factor heuristic (always available) """ def __init__(self) -> None: self._model: Any = None self._model_version = "heuristic-v1" self._backend: str = "heuristic" self._load_model() def _load_model(self) -> None: """Attempt to load trained industrial AVM artifacts (ridge first).""" try: from app.config import settings model_path = settings.model_path except Exception: logger.info("Industrial AVM: config unavailable — using heuristic") return ridge_path = os.path.join(model_path, RIDGE_ARTIFACT_NAME) if os.path.exists(ridge_path): try: with open(ridge_path, "rb") as f: artifact = pickle.load(f) if not isinstance(artifact, dict) or artifact.get("version") != "ridge-industrial-v1": raise ValueError(f"Unexpected artifact version in {ridge_path}") self._model = artifact self._model_version = "ridge-industrial-v1" self._backend = "ridge" logger.info("Loaded industrial AVM ridge artifact from %s", ridge_path) return except Exception as exc: # keep service alive on artifact corruption logger.warning("Failed to load ridge artifact (%s); falling back", exc) try: import xgboost as xgb xgb_path = os.path.join(model_path, "avm_industrial_xgb.json") if os.path.exists(xgb_path): booster = xgb.Booster() booster.load_model(xgb_path) self._model = booster self._model_version = "xgb-industrial-v1" self._backend = "xgb" logger.info("Loaded industrial AVM xgb model from %s", xgb_path) return except Exception: pass logger.info("No trained industrial AVM model — using heuristic") def predict(self, req: IndustrialAVMRequest) -> IndustrialAVMResponse: """Predict industrial property rent.""" if self._backend == "ridge": return self._predict_ridge(req) if self._backend == "xgb" and self._model is not None: return self._predict_model(req) return self._predict_heuristic(req) def _featureize_ridge(self, req: IndustrialAVMRequest, spec: dict) -> np.ndarray: """Build the exact feature vector used during ridge training. Feature ordering must match `spec["feature_cols"]` which is the canonical order emitted by the trainer. Sources: - numeric fields come straight from the request - province FDI comes from the artifact lookup (fallback to default) - target-industry flags approximate one-hots against top-6 list """ province = (req.province or "").strip().lower() fdi = spec["province_fdi"].get(province, spec["default_fdi"]) occupancy = float(req.park_occupancy_rate) if occupancy > 1.5: occupancy = occupancy / 100.0 occupancy = min(max(occupancy, 0.0), 1.0) feats: dict[str, float] = { "occupancy": occupancy, "log_area_ha": math.log1p(max(0.0, float(req.park_area_ha))), "park_age_years": float(max(0, int(req.park_age_years))), "log_dist_port_km": math.log1p(max(0.0, float(req.distance_to_port_km))), "log_dist_airport_km": math.log1p(max(0.0, float(req.distance_to_airport_km))), "log_dist_highway_km": math.log1p(max(0.0, float(req.distance_to_highway_km))), "logistics_connectivity_score": float(req.logistics_connectivity_score), "log_fdi_province": math.log1p( max(0.0, float(req.fdi_province_musd) or fdi) ), "has_special_zone": float( req.zoning.lower() in {"free_trade_zone", "high_tech"} ), } # Property type flags can proxy certain target-industry signals but the # trainer's industry one-hots are park-level. At inference we don't know # the park's industry mix, so default to 0 and let the province/region # fixed effects carry the signal. for ind in spec["top_industries"]: feats[f"ind_{ind}"] = 0.0 region = (req.region or "south").lower() for r in spec["region_order"][1:]: feats[f"region_{r}"] = float(region == r) vec = np.array([feats[c] for c in spec["feature_cols"]], dtype=np.float64) return vec def _predict_ridge(self, req: IndustrialAVMRequest) -> IndustrialAVMResponse: """Predict using the ridge v1 park-level baseline (conformal CIs).""" artifact = self._model spec = artifact["feature_spec"] x = self._featureize_ridge(req, spec) head_name = _PROPERTY_TO_HEAD.get(req.property_type.lower(), "rbf") head = artifact["heads"][head_name] x_std = (x - head["scaler_mean"]) / np.where( head["scaler_scale"] == 0, 1.0, head["scaler_scale"] ) log_pred = float(x_std @ head["coefficients"] + head["intercept"]) q80 = float(head["q80_log"]) # Ridge head is trained in natural units (USD/m²/month for rbf/rbw, # USD/m²/year for land). Convert to the response contract which always # reports monthly USD/m² for the primary estimate. rent_native = math.expm1(log_pred) low_native = math.expm1(log_pred - q80) high_native = math.expm1(log_pred + q80) if head_name == "land": rent = rent_native / 12.0 low = low_native / 12.0 high = high_native / 12.0 else: rent = rent_native low = low_native high = high_native comparables = _find_comparables(req) # Drivers: top coefficients by absolute standardized contribution. contrib = head["coefficients"] * x_std order = np.argsort(-np.abs(contrib))[:8] total = float(np.sum(np.abs(contrib))) or 1.0 drivers = [ FeatureImportance( feature=head["feature_cols"][i], importance=round(float(abs(contrib[i]) / total), 4), ) for i in order if abs(contrib[i]) > 1e-6 ] return IndustrialAVMResponse( estimated_rent_usd_m2=round(max(0.0, rent), 2), confidence=round(float(head.get("coverage_80_loo", 0.80)), 2), rent_range_low_usd_m2=round(max(0.0, low), 2), rent_range_high_usd_m2=round(max(0.0, high), 2), annual_rent_usd_m2=round(max(0.0, rent) * 12, 2), total_monthly_rent_usd=round(max(0.0, rent) * req.area_m2, 2), comparables=comparables, drivers=drivers, model_version=self._model_version, ) def _predict_model(self, req: IndustrialAVMRequest) -> IndustrialAVMResponse: """Predict using trained gradient boosting model.""" import xgboost as xgb features = _encode_features(req) dmatrix = xgb.DMatrix(features, feature_names=INDUSTRIAL_FEATURE_NAMES) pred_log = self._model.predict(dmatrix)[0] rent = float(np.exp(pred_log)) comparables = _find_comparables(req) # Feature importance try: scores = self._model.get_score(importance_type="gain") total = sum(scores.values()) or 1.0 drivers = [ FeatureImportance(feature=f, importance=round(s / total, 4)) for f, s in sorted(scores.items(), key=lambda x: x[1], reverse=True) ][:8] except Exception: drivers = [] return IndustrialAVMResponse( estimated_rent_usd_m2=round(rent, 2), confidence=0.80, rent_range_low_usd_m2=round(rent * 0.88, 2), rent_range_high_usd_m2=round(rent * 1.12, 2), annual_rent_usd_m2=round(rent * 12, 2), total_monthly_rent_usd=round(rent * req.area_m2, 2), comparables=comparables, drivers=drivers, model_version=self._model_version, ) def _predict_heuristic(self, req: IndustrialAVMRequest) -> IndustrialAVMResponse: """Multi-factor heuristic for industrial rent estimation.""" province_key = req.province.lower().strip() base = PROVINCE_BASELINE.get(province_key, DEFAULT_RENT_BASELINE) # Property type multiplier type_mult = { "warehouse": 0.85, "factory": 1.00, "ready_built_factory": 1.30, "ready_built_warehouse": 1.15, "open_yard": 0.50, "office_in_park": 1.50, }.get(req.property_type.lower(), 1.0) # Park quality adjustments occupancy_adj = 1.0 + (req.park_occupancy_rate - 0.7) * 0.3 age_adj = max(0.85, 1.0 - req.park_age_years * 0.005) size_adj = 1.0 + min(0.15, req.park_area_ha / 5000 * 0.15) # Logistics / infrastructure port_adj = max(0.85, 1.0 - req.distance_to_port_km * 0.002) airport_adj = max(0.90, 1.0 - req.distance_to_airport_km * 0.001) highway_adj = max(0.90, 1.0 - req.distance_to_highway_km * 0.005) logistics_adj = 1.0 + (req.logistics_connectivity_score - 0.5) * 0.20 # Building specs premium ceiling_adj = 1.0 + max(0.0, (req.ceiling_height_m - 8.0) * 0.02) floor_load_adj = 1.0 + max(0.0, (req.floor_load_ton_m2 - 2.0) * 0.03) power_adj = 1.0 + min(0.10, req.power_capacity_kva / 5000 * 0.10) # Building coverage — higher coverage = more usable space = premium coverage_adj = 1.0 + max(0.0, (req.building_coverage - 0.4) * 0.15) # Loading docks — each dock adds a small premium, diminishing returns docks_adj = 1.0 + min(0.12, req.loading_docks * 0.02) # Zoning premium — specialized zones command higher rents zoning_mult = { "general_industrial": 1.00, "heavy_industrial": 0.95, "light_industrial": 1.05, "logistics": 1.10, "free_trade_zone": 1.20, "high_tech": 1.25, }.get(req.zoning.lower(), 1.0) # Economic indicators demand_adj = 1.0 + (req.industry_demand_index - 0.5) * 0.25 fdi_adj = 1.0 + min(0.15, req.fdi_province_musd / 5000 * 0.15) labor_adj = max(0.90, 1.0 - req.labor_cost_province_vnd / 20_000_000 * 0.10) # Area discount (larger areas get lower per-m² rent) area_discount = 1.0 if req.area_m2 > 10_000: area_discount = 0.92 elif req.area_m2 > 5_000: area_discount = 0.95 elif req.area_m2 > 2_000: area_discount = 0.98 rent = ( base * type_mult * occupancy_adj * age_adj * size_adj * port_adj * airport_adj * highway_adj * logistics_adj * ceiling_adj * floor_load_adj * power_adj * coverage_adj * docks_adj * zoning_mult * demand_adj * fdi_adj * labor_adj * area_discount ) confidence = 0.65 comparables = _find_comparables(req) # Heuristic feature importance drivers = [ FeatureImportance(feature="province_baseline", importance=0.16), FeatureImportance(feature="property_type", importance=0.12), FeatureImportance(feature="zoning", importance=0.11), FeatureImportance(feature="park_occupancy_rate", importance=0.10), FeatureImportance(feature="logistics_connectivity_score", importance=0.09), FeatureImportance(feature="industry_demand_index", importance=0.09), FeatureImportance(feature="building_coverage", importance=0.07), FeatureImportance(feature="loading_docks", importance=0.06), FeatureImportance(feature="fdi_province_musd", importance=0.06), FeatureImportance(feature="distance_to_port_km", importance=0.05), FeatureImportance(feature="area_m2", importance=0.05), ] return IndustrialAVMResponse( estimated_rent_usd_m2=round(rent, 2), confidence=confidence, rent_range_low_usd_m2=round(rent * 0.80, 2), rent_range_high_usd_m2=round(rent * 1.20, 2), annual_rent_usd_m2=round(rent * 12, 2), total_monthly_rent_usd=round(rent * req.area_m2, 2), comparables=comparables, drivers=drivers, model_version=self._model_version, ) # Module-level singleton industrial_avm_service = IndustrialAVMService()