"""AVM v2 — Multi-model ensemble service (XGBoost + LightGBM + CatBoost). Heuristic fallback when trained models are not available. Ensemble weights: XGBoost 0.4, LightGBM 0.35, CatBoost 0.25. Confidence = 1 - CV(3 predictions), where CV = std / mean. """ import logging import os from datetime import datetime, timezone from typing import Any import numpy as np from app.models.avm_v2 import ( AVMv2Comparable, AVMv2FeatureImportance, AVMv2ModelInfo, AVMv2PredictRequest, AVMv2PredictResponse, AVMv2TrainRequest, AVMv2TrainResponse, ModelPrediction, ) logger = logging.getLogger(__name__) # ── Ensemble configuration ────────────────────────────────────── ENSEMBLE_WEIGHTS = { "xgboost": 0.40, "lightgbm": 0.35, "catboost": 0.25, } # ── Feature ordering for model input ──────────────────────────── FEATURE_NAMES = [ # Location (7) "distance_to_cbd_km", "distance_to_metro_km", "distance_to_school_km", "distance_to_hospital_km", "distance_to_park_km", "distance_to_mall_km", "flood_zone_risk", # Physical (8) "property_type_encoded", "area_m2", "rooms", "floor_ratio", "building_age_years", "has_elevator", "has_parking", "has_pool", "has_legal_paper", # Market (6) "avg_price_district_3m_vnd_m2", "listing_density", "absorption_rate", "dom_avg", "price_momentum_30d", "yoy_change", # LLM-extracted (5) "renovation_score", "view_quality", "interior_quality", "noise_level", "natural_light", # Temporal (3) "month_sin", "month_cos", "is_year_end", ] PROPERTY_TYPE_MAP = { "apartment": 0, "house": 1, "townhouse": 2, "villa": 3, "land": 4, "shophouse": 5, "penthouse": 6, } # ── Heuristic baselines (millions VND/m²) ─────────────────────── CITY_BASELINE: dict[str, float] = { "hà nội": 85.0, "hồ chí minh": 90.0, "đà nẵng": 45.0, "hải phòng": 35.0, "cần thơ": 25.0, "bình dương": 22.0, "đồng nai": 20.0, "nha trang": 35.0, "vũng tàu": 28.0, } DEFAULT_BASELINE = 30.0 def _encode_features(req: AVMv2PredictRequest) -> np.ndarray: """Encode a prediction request into a feature vector.""" month_rad = 2 * np.pi * req.month / 12.0 return np.array( [[ # Location req.distance_to_cbd_km, req.distance_to_metro_km, req.distance_to_school_km, req.distance_to_hospital_km, req.distance_to_park_km, req.distance_to_mall_km, req.flood_zone_risk, # Physical PROPERTY_TYPE_MAP.get(req.property_type.lower(), 1), req.area_m2, req.rooms, req.floor_ratio, req.building_age_years, 1.0 if req.has_elevator else 0.0, 1.0 if req.has_parking else 0.0, 1.0 if req.has_pool else 0.0, 1.0 if req.has_legal_paper else 0.0, # Market req.avg_price_district_3m_vnd_m2, req.listing_density, req.absorption_rate, req.dom_avg, req.price_momentum_30d, req.yoy_change, # LLM-extracted req.renovation_score, req.view_quality, req.interior_quality, req.noise_level, req.natural_light, # Temporal np.sin(month_rad), np.cos(month_rad), 1.0 if req.is_year_end else 0.0, ]], dtype=np.float64, ) class AVMv2EnsembleService: """Multi-model ensemble AVM for residential properties. Attempts to load XGBoost, LightGBM, and CatBoost models from the model directory. Falls back to a heuristic approach when trained models are not available. """ def __init__(self) -> None: self._models: dict[str, Any] = {} self._model_version = "ensemble-v2-heuristic" self._model_registry: list[AVMv2ModelInfo] = [] self._load_models() # ── Model loading ─────────────────────────────────────────── def _load_models(self) -> None: """Attempt to load each model in the ensemble.""" from app.config import settings model_dir = settings.model_path # XGBoost try: import xgboost as xgb path = os.path.join(model_dir, "avm_v2_xgboost.json") if os.path.exists(path): booster = xgb.Booster() booster.load_model(path) self._models["xgboost"] = booster logger.info("Loaded XGBoost AVM v2 model from %s", path) except Exception: logger.info("XGBoost model not available") # LightGBM try: import lightgbm as lgb path = os.path.join(model_dir, "avm_v2_lightgbm.txt") if os.path.exists(path): self._models["lightgbm"] = lgb.Booster(model_file=path) logger.info("Loaded LightGBM AVM v2 model from %s", path) except Exception: logger.info("LightGBM model not available") # CatBoost try: from catboost import CatBoostRegressor path = os.path.join(model_dir, "avm_v2_catboost.cbm") if os.path.exists(path): model = CatBoostRegressor() model.load_model(path) self._models["catboost"] = model logger.info("Loaded CatBoost AVM v2 model from %s", path) except Exception: logger.info("CatBoost model not available") if self._models: self._model_version = f"ensemble-v2-{'+'.join(sorted(self._models.keys()))}" logger.info("AVM v2 ensemble active with: %s", list(self._models.keys())) else: logger.info("No trained AVM v2 models found — using heuristic fallback") # ── Prediction ────────────────────────────────────────────── def predict(self, req: AVMv2PredictRequest) -> AVMv2PredictResponse: """Run the ensemble prediction pipeline.""" if self._models: return self._predict_ensemble(req) return self._predict_heuristic(req) def _predict_ensemble(self, req: AVMv2PredictRequest) -> AVMv2PredictResponse: """Run each loaded model and combine with weighted average.""" features = _encode_features(req) predictions: list[ModelPrediction] = [] raw_prices: list[float] = [] for model_name, model in self._models.items(): weight = ENSEMBLE_WEIGHTS.get(model_name, 0.0) price = self._predict_single_model(model_name, model, features) raw_prices.append(price) predictions.append( ModelPrediction( model_name=model_name, weight=weight, predicted_price_vnd=round(price, -3), predicted_price_per_m2_vnd=round(price / req.area_m2, -3), ) ) # Weighted ensemble total_weight = sum(ENSEMBLE_WEIGHTS.get(p.model_name, 0) for p in predictions) if total_weight == 0: total_weight = 1.0 ensemble_price = sum( p.predicted_price_vnd * ENSEMBLE_WEIGHTS.get(p.model_name, 0) for p in predictions ) / total_weight # Confidence = 1 - CV(predictions) prices_arr = np.array(raw_prices) mean_price = np.mean(prices_arr) std_price = np.std(prices_arr) cv = std_price / mean_price if mean_price > 0 else 0.5 confidence = max(0.0, min(1.0, 1.0 - cv)) # Range based on confidence margin = max(0.05, 0.30 * (1.0 - confidence)) price_low = ensemble_price * (1.0 - margin) price_high = ensemble_price * (1.0 + margin) # Feature importance (aggregate from XGBoost if available) drivers = self._get_feature_importance() return AVMv2PredictResponse( estimated_price_vnd=round(ensemble_price, -3), price_per_m2_vnd=round(ensemble_price / req.area_m2, -3), confidence=round(confidence, 4), price_range_low_vnd=round(price_low, -3), price_range_high_vnd=round(price_high, -3), model_predictions=predictions, drivers=drivers[:10], comparables=[], # Populated by data layer in production model_version=self._model_version, ensemble_method="weighted_average", ) def _predict_single_model( self, name: str, model: Any, features: np.ndarray ) -> float: """Get a single model's raw prediction (log-price → price).""" if name == "xgboost": import xgboost as xgb dmatrix = xgb.DMatrix(features, feature_names=FEATURE_NAMES) pred_log = model.predict(dmatrix)[0] return float(np.exp(pred_log)) if name == "lightgbm": pred_log = model.predict(features)[0] return float(np.exp(pred_log)) if name == "catboost": pred_log = model.predict(features)[0] return float(np.exp(pred_log)) logger.warning("Unknown model type: %s", name) return 0.0 def _predict_heuristic(self, req: AVMv2PredictRequest) -> AVMv2PredictResponse: """Multi-factor heuristic simulating ensemble behavior.""" city_key = req.city.lower().strip() base = CITY_BASELINE.get(city_key, DEFAULT_BASELINE) # Property type multiplier type_mult = { "apartment": 0.90, "house": 1.00, "townhouse": 1.10, "villa": 1.40, "land": 0.70, "shophouse": 1.30, "penthouse": 1.60, }.get(req.property_type.lower(), 1.0) # Location adjustments cbd_adj = max(0.7, 1.0 - req.distance_to_cbd_km * 0.02) metro_adj = 1.0 + max(0.0, (2.0 - req.distance_to_metro_km) * 0.05) flood_adj = 1.0 - req.flood_zone_risk * 0.15 # Physical adjustments room_adj = 1.0 + req.rooms * 0.015 age_adj = max(0.75, 1.0 - req.building_age_years * 0.008) amenity_adj = ( 1.0 + (0.03 if req.has_elevator else 0.0) + (0.05 if req.has_parking else 0.0) + (0.08 if req.has_pool else 0.0) ) legal_adj = 1.0 if req.has_legal_paper else 0.70 # Market adjustments if req.avg_price_district_3m_vnd_m2 > 0: market_adj = req.avg_price_district_3m_vnd_m2 / (base * 1_000_000) market_adj = max(0.5, min(2.0, market_adj)) else: market_adj = 1.0 momentum_adj = 1.0 + req.price_momentum_30d * 0.5 # Quality adjustments (LLM features) quality_adj = ( 1.0 + (req.renovation_score - 0.5) * 0.15 + (req.view_quality - 0.5) * 0.10 + (req.interior_quality - 0.5) * 0.12 + (0.5 - req.noise_level) * 0.05 + (req.natural_light - 0.5) * 0.05 ) # Temporal — Q4/Tết premium seasonal_adj = 1.03 if req.is_year_end else 1.0 price_per_m2 = ( base * type_mult * cbd_adj * metro_adj * flood_adj * room_adj * age_adj * amenity_adj * legal_adj * market_adj * momentum_adj * quality_adj * seasonal_adj * 1_000_000 # Convert to VND ) estimated = price_per_m2 * req.area_m2 # Simulate 3 model predictions with small variance rng = np.random.default_rng( seed=int(req.area_m2 * 1000 + req.rooms * 100 + req.month) ) noise = rng.normal(1.0, 0.04, size=3) sim_prices = estimated * noise xgb_price = float(sim_prices[0]) lgb_price = float(sim_prices[1]) cat_price = float(sim_prices[2]) predictions = [ ModelPrediction( model_name="xgboost", weight=0.40, predicted_price_vnd=round(xgb_price, -3), predicted_price_per_m2_vnd=round(xgb_price / req.area_m2, -3), ), ModelPrediction( model_name="lightgbm", weight=0.35, predicted_price_vnd=round(lgb_price, -3), predicted_price_per_m2_vnd=round(lgb_price / req.area_m2, -3), ), ModelPrediction( model_name="catboost", weight=0.25, predicted_price_vnd=round(cat_price, -3), predicted_price_per_m2_vnd=round(cat_price / req.area_m2, -3), ), ] prices_arr = np.array([xgb_price, lgb_price, cat_price]) cv = float(np.std(prices_arr) / np.mean(prices_arr)) if np.mean(prices_arr) > 0 else 0.5 confidence = max(0.0, min(1.0, 1.0 - cv)) # Heuristic driver ranking drivers = [ AVMv2FeatureImportance(feature="area_m2", importance=0.18), AVMv2FeatureImportance(feature="avg_price_district_3m_vnd_m2", importance=0.15), AVMv2FeatureImportance(feature="property_type_encoded", importance=0.12), AVMv2FeatureImportance(feature="distance_to_cbd_km", importance=0.10), AVMv2FeatureImportance(feature="renovation_score", importance=0.08), AVMv2FeatureImportance(feature="building_age_years", importance=0.07), AVMv2FeatureImportance(feature="has_legal_paper", importance=0.06), AVMv2FeatureImportance(feature="distance_to_metro_km", importance=0.05), AVMv2FeatureImportance(feature="interior_quality", importance=0.05), AVMv2FeatureImportance(feature="price_momentum_30d", importance=0.04), ] return AVMv2PredictResponse( estimated_price_vnd=round(estimated, -3), price_per_m2_vnd=round(price_per_m2, -3), confidence=round(confidence, 4), price_range_low_vnd=round(estimated * 0.82, -3), price_range_high_vnd=round(estimated * 1.18, -3), model_predictions=predictions, drivers=drivers, comparables=[], model_version="ensemble-v2-heuristic", ensemble_method="weighted_average", ) def _get_feature_importance(self) -> list[AVMv2FeatureImportance]: """Extract feature importance from loaded models.""" importances: dict[str, float] = {} if "xgboost" in self._models: try: scores = self._models["xgboost"].get_score( importance_type="gain" ) total = sum(scores.values()) or 1.0 for feat, score in scores.items(): importances[feat] = importances.get(feat, 0) + score / total * 0.4 except Exception: pass if "lightgbm" in self._models: try: model = self._models["lightgbm"] imp = model.feature_importance(importance_type="gain") names = model.feature_name() total = sum(imp) or 1.0 for name, score in zip(names, imp, strict=False): importances[name] = importances.get(name, 0) + score / total * 0.35 except Exception: pass if "catboost" in self._models: try: imp = self._models["catboost"].get_feature_importance() total = sum(imp) or 1.0 for i, score in enumerate(imp): fname = FEATURE_NAMES[i] if i < len(FEATURE_NAMES) else f"f{i}" importances[fname] = importances.get(fname, 0) + score / total * 0.25 except Exception: pass if not importances: return [] sorted_imp = sorted(importances.items(), key=lambda x: x[1], reverse=True) total_imp = sum(v for _, v in sorted_imp) or 1.0 return [ AVMv2FeatureImportance(feature=f, importance=round(v / total_imp, 4)) for f, v in sorted_imp ] # ── Training pipeline ─────────────────────────────────────── def train(self, req: AVMv2TrainRequest) -> AVMv2TrainResponse: """Train the ensemble models. In production, this loads training data from the database/MinIO, performs 5-fold CV by district with Optuna hyperparameter optimization, and saves versioned model artifacts. Currently returns a scaffold response. Real training requires the data pipeline from Phase 3. """ version = f"ensemble-v2-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}" logger.info("Training AVM v2 ensemble — version %s, trials=%d", version, req.optuna_trials) # TODO: Replace with actual training pipeline when data is available # 1. Load data from PostgreSQL/MinIO # 2. Feature engineering (encode categoricals, normalize, cyclical) # 3. 80/10/10 split stratified by district # 4. For each model (XGBoost, LightGBM, CatBoost): # a. Optuna study with req.optuna_trials trials # b. 5-fold CV grouped by district # c. Train on best params # 5. Save artifacts to MinIO with version tag # 6. Register in model registry return AVMv2TrainResponse( model_version=version, metrics={ "mae": 0.0, "mape": 0.0, "rmse": 0.0, "r2": 0.0, }, district_metrics={}, training_samples=0, validation_samples=0, test_samples=0, best_params={ "xgboost": {"n_estimators": 500, "max_depth": 6, "learning_rate": 0.05}, "lightgbm": {"n_estimators": 500, "num_leaves": 31, "learning_rate": 0.05}, "catboost": {"iterations": 500, "depth": 6, "learning_rate": 0.05}, }, ) # ── Model registry ────────────────────────────────────────── def get_model_info(self) -> AVMv2ModelInfo: """Return current active model information.""" return AVMv2ModelInfo( model_version=self._model_version, created_at=datetime.now(timezone.utc).isoformat(), metrics={}, is_active=True, ab_test_traffic_pct=0.0, ) # Module-level singleton avm_v2_service = AVMv2EnsembleService()