"""AVM v2 — Multi-model ensemble service (XGBoost + LightGBM + CatBoost). Heuristic fallback when trained models are not available. Ensemble weights: XGBoost 0.4, LightGBM 0.35, CatBoost 0.25. Confidence = 1 - CV(3 predictions), where CV = std / mean. """ import json import logging import os import shutil from datetime import datetime, timezone from pathlib import Path from typing import Any import numpy as np from app.models.avm import AVMPredictRequest from app.models.avm_v2 import ( ABComparisonRequest, ABComparisonResponse, AVMv1Summary, AVMv2Comparable, AVMv2FeatureImportance, AVMv2FeatureImportanceResponse, AVMv2ModelInfo, AVMv2PredictRequest, AVMv2PredictResponse, AVMv2Summary, AVMv2TrainRequest, AVMv2TrainResponse, ModelPrediction, ) logger = logging.getLogger(__name__) # ── Ensemble configuration ────────────────────────────────────── ENSEMBLE_WEIGHTS = { "xgboost": 0.40, "lightgbm": 0.35, "catboost": 0.25, } # ── Feature ordering for model input ──────────────────────────── FEATURE_NAMES = [ # Location (7) "distance_to_cbd_km", "distance_to_metro_km", "distance_to_school_km", "distance_to_hospital_km", "distance_to_park_km", "distance_to_mall_km", "flood_zone_risk", # Neighborhood (1) "neighborhood_score", # Physical (13) "property_type_encoded", "area_m2", "rooms", "floor_level", "total_floors", "direction_encoded", "floor_ratio", "building_age_years", "has_elevator", "has_parking", "has_pool", "has_legal_paper", "developer_reputation", # Market (6) "avg_price_district_3m_vnd_m2", "listing_density", "absorption_rate", "dom_avg", "price_momentum_30d", "yoy_change", # LLM-extracted (5) "renovation_score", "view_quality", "interior_quality", "noise_level", "natural_light", # Temporal (3) "month_sin", "month_cos", "is_year_end", ] DIRECTION_MAP = { "south": 0, "southeast": 1, "east": 2, "southwest": 3, "northeast": 4, "west": 5, "northwest": 6, "north": 7, "unknown": 4, # neutral mid-value } PROPERTY_TYPE_MAP = { "apartment": 0, "house": 1, "townhouse": 2, "villa": 3, "land": 4, "shophouse": 5, "penthouse": 6, } # ── Heuristic baselines (millions VND/m²) ─────────────────────── CITY_BASELINE: dict[str, float] = { "hà nội": 85.0, "hồ chí minh": 90.0, "đà nẵng": 45.0, "hải phòng": 35.0, "cần thơ": 25.0, "bình dương": 22.0, "đồng nai": 20.0, "nha trang": 35.0, "vũng tàu": 28.0, } DEFAULT_BASELINE = 30.0 # ── Heuristic feature importance ──────────────────────────────── # Used both inside heuristic predict responses and by the dedicated # feature-importance endpoint when no trained booster is loaded. _HEURISTIC_DRIVERS: list[tuple[str, float]] = [ ("area_m2", 0.14), ("avg_price_district_3m_vnd_m2", 0.12), ("neighborhood_score", 0.10), ("property_type_encoded", 0.10), ("distance_to_cbd_km", 0.08), ("developer_reputation", 0.07), ("renovation_score", 0.07), ("building_age_years", 0.06), ("direction_encoded", 0.05), ("floor_level", 0.05), ("has_legal_paper", 0.05), ("distance_to_metro_km", 0.04), ("interior_quality", 0.04), ("price_momentum_30d", 0.03), ] def _heuristic_drivers() -> list[AVMv2FeatureImportance]: return [AVMv2FeatureImportance(feature=f, importance=w) for f, w in _HEURISTIC_DRIVERS] def _encode_features(req: AVMv2PredictRequest) -> np.ndarray: """Encode a prediction request into a feature vector.""" month_rad = 2 * np.pi * req.month / 12.0 return np.array( [[ # Location (7) req.distance_to_cbd_km, req.distance_to_metro_km, req.distance_to_school_km, req.distance_to_hospital_km, req.distance_to_park_km, req.distance_to_mall_km, req.flood_zone_risk, # Neighborhood (1) req.neighborhood_score, # Physical (13) PROPERTY_TYPE_MAP.get(req.property_type.lower(), 1), req.area_m2, req.rooms, float(req.floor_level), float(req.total_floors), float(DIRECTION_MAP.get(req.direction.lower(), 4)), req.floor_ratio, req.building_age_years, 1.0 if req.has_elevator else 0.0, 1.0 if req.has_parking else 0.0, 1.0 if req.has_pool else 0.0, 1.0 if req.has_legal_paper else 0.0, req.developer_reputation, # Market (6) req.avg_price_district_3m_vnd_m2, req.listing_density, req.absorption_rate, req.dom_avg, req.price_momentum_30d, req.yoy_change, # LLM-extracted (5) req.renovation_score, req.view_quality, req.interior_quality, req.noise_level, req.natural_light, # Temporal (3) np.sin(month_rad), np.cos(month_rad), 1.0 if req.is_year_end else 0.0, ]], dtype=np.float64, ) class AVMv2EnsembleService: """Multi-model ensemble AVM for residential properties. Attempts to load XGBoost, LightGBM, and CatBoost models from the model directory. Falls back to a heuristic approach when trained models are not available. """ def __init__(self) -> None: self._models: dict[str, Any] = {} self._model_version = "ensemble-v2-heuristic" self._model_registry: list[AVMv2ModelInfo] = [] self._load_models() # ── Model loading ─────────────────────────────────────────── def _load_models(self) -> None: """Attempt to load each model in the ensemble.""" from app.config import settings model_dir = settings.model_path # XGBoost try: import xgboost as xgb path = os.path.join(model_dir, "avm_v2_xgboost.json") if os.path.exists(path): booster = xgb.Booster() booster.load_model(path) self._models["xgboost"] = booster logger.info("Loaded XGBoost AVM v2 model from %s", path) except Exception: logger.info("XGBoost model not available") # LightGBM try: import lightgbm as lgb path = os.path.join(model_dir, "avm_v2_lightgbm.txt") if os.path.exists(path): self._models["lightgbm"] = lgb.Booster(model_file=path) logger.info("Loaded LightGBM AVM v2 model from %s", path) except Exception: logger.info("LightGBM model not available") # CatBoost try: from catboost import CatBoostRegressor path = os.path.join(model_dir, "avm_v2_catboost.cbm") if os.path.exists(path): model = CatBoostRegressor() model.load_model(path) self._models["catboost"] = model logger.info("Loaded CatBoost AVM v2 model from %s", path) except Exception: logger.info("CatBoost model not available") if self._models: self._model_version = f"ensemble-v2-{'+'.join(sorted(self._models.keys()))}" logger.info("AVM v2 ensemble active with: %s", list(self._models.keys())) else: logger.info("No trained AVM v2 models found — using heuristic fallback") # ── Prediction ────────────────────────────────────────────── def predict(self, req: AVMv2PredictRequest) -> AVMv2PredictResponse: """Run the ensemble prediction pipeline.""" if self._models: return self._predict_ensemble(req) return self._predict_heuristic(req) def predict_with_ab(self, req: AVMv2PredictRequest) -> tuple[AVMv2PredictResponse, bool]: """Run prediction respecting the A/B test traffic split. Returns ``(response, used_v2)`` where ``used_v2`` is ``True`` when the request was served by the v2 ensemble and ``False`` when it was served by the v1-equivalent heuristic baseline (i.e. outside the v2 cohort). The random draw is seeded from the request features so the same property always lands in the same cohort within a training cycle. """ info = self.get_model_info() traffic_pct = info.ab_test_traffic_pct if traffic_pct <= 0.0: # A/B disabled — always use v2 return self.predict(req), True if traffic_pct >= 1.0: return self.predict(req), True # Deterministic per-property cohort assignment rng = np.random.default_rng( seed=int(req.area_m2 * 1000 + req.rooms * 100 + req.month + hash(req.district) % 10000) ) use_v2 = rng.random() < traffic_pct if use_v2: return self.predict(req), True # Outside v2 cohort: return heuristic baseline (v1-equivalent) return self._predict_heuristic(req), False def _predict_ensemble(self, req: AVMv2PredictRequest) -> AVMv2PredictResponse: """Run each loaded model and combine with weighted average.""" features = _encode_features(req) predictions: list[ModelPrediction] = [] raw_prices: list[float] = [] for model_name, model in self._models.items(): weight = ENSEMBLE_WEIGHTS.get(model_name, 0.0) price = self._predict_single_model(model_name, model, features) raw_prices.append(price) predictions.append( ModelPrediction( model_name=model_name, weight=weight, predicted_price_vnd=round(price, -3), predicted_price_per_m2_vnd=round(price / req.area_m2, -3), ) ) # Weighted ensemble total_weight = sum(ENSEMBLE_WEIGHTS.get(p.model_name, 0) for p in predictions) if total_weight == 0: total_weight = 1.0 ensemble_price = sum( p.predicted_price_vnd * ENSEMBLE_WEIGHTS.get(p.model_name, 0) for p in predictions ) / total_weight # Confidence = 1 - CV(predictions) prices_arr = np.array(raw_prices) mean_price = np.mean(prices_arr) std_price = np.std(prices_arr) cv = std_price / mean_price if mean_price > 0 else 0.5 confidence = max(0.0, min(1.0, 1.0 - cv)) # Range based on confidence margin = max(0.05, 0.30 * (1.0 - confidence)) price_low = ensemble_price * (1.0 - margin) price_high = ensemble_price * (1.0 + margin) # Feature importance (aggregate from XGBoost if available) drivers = self._get_feature_importance() return AVMv2PredictResponse( estimated_price_vnd=round(ensemble_price, -3), price_per_m2_vnd=round(ensemble_price / req.area_m2, -3), confidence=round(confidence, 4), price_range_low_vnd=round(price_low, -3), price_range_high_vnd=round(price_high, -3), model_predictions=predictions, drivers=drivers[:10], comparables=[], # Populated by data layer in production model_version=self._model_version, ensemble_method="weighted_average", ) def _predict_single_model( self, name: str, model: Any, features: np.ndarray ) -> float: """Get a single model's raw prediction (log-price → price).""" if name == "xgboost": import xgboost as xgb dmatrix = xgb.DMatrix(features, feature_names=FEATURE_NAMES) pred_log = model.predict(dmatrix)[0] return float(np.exp(pred_log)) if name == "lightgbm": pred_log = model.predict(features)[0] return float(np.exp(pred_log)) if name == "catboost": pred_log = model.predict(features)[0] return float(np.exp(pred_log)) logger.warning("Unknown model type: %s", name) return 0.0 def _predict_heuristic(self, req: AVMv2PredictRequest) -> AVMv2PredictResponse: """Multi-factor heuristic simulating ensemble behavior.""" city_key = req.city.lower().strip() base = CITY_BASELINE.get(city_key, DEFAULT_BASELINE) # Property type multiplier type_mult = { "apartment": 0.90, "house": 1.00, "townhouse": 1.10, "villa": 1.40, "land": 0.70, "shophouse": 1.30, "penthouse": 1.60, }.get(req.property_type.lower(), 1.0) # Location adjustments cbd_adj = max(0.7, 1.0 - req.distance_to_cbd_km * 0.02) metro_adj = 1.0 + max(0.0, (2.0 - req.distance_to_metro_km) * 0.05) flood_adj = 1.0 - req.flood_zone_risk * 0.15 # Neighborhood adjustment: ±15% swing around 0.5 midpoint neighborhood_adj = 1.0 + (req.neighborhood_score - 0.5) * 0.30 # Physical adjustments room_adj = 1.0 + req.rooms * 0.015 age_adj = max(0.75, 1.0 - req.building_age_years * 0.008) amenity_adj = ( 1.0 + (0.03 if req.has_elevator else 0.0) + (0.05 if req.has_parking else 0.0) + (0.08 if req.has_pool else 0.0) ) legal_adj = 1.0 if req.has_legal_paper else 0.70 # Floor level premium (apartments/penthouses: higher floors = premium) floor_adj = 1.0 if req.floor_level > 0 and req.property_type.lower() in ("apartment", "penthouse"): if req.total_floors > 0: relative_floor = req.floor_level / req.total_floors # Mid-to-high floors get up to +8% premium, ground floor -3% floor_adj = 1.0 + (relative_floor - 0.3) * 0.12 floor_adj = max(0.97, min(1.08, floor_adj)) else: # No total_floors info: mild premium for higher floors floor_adj = min(1.08, 1.0 + req.floor_level * 0.003) # Direction premium (Vietnamese preference: south/southeast best) direction_adj = { "south": 1.05, "southeast": 1.04, "east": 1.02, "southwest": 1.01, "northeast": 1.0, "west": 0.98, "northwest": 0.97, "north": 0.96, "unknown": 1.0, }.get(req.direction.lower(), 1.0) # Developer reputation: ±10% swing developer_adj = 1.0 + (req.developer_reputation - 0.5) * 0.20 # Market adjustments if req.avg_price_district_3m_vnd_m2 > 0: market_adj = req.avg_price_district_3m_vnd_m2 / (base * 1_000_000) market_adj = max(0.5, min(2.0, market_adj)) else: market_adj = 1.0 momentum_adj = 1.0 + req.price_momentum_30d * 0.5 # Quality adjustments (LLM features) quality_adj = ( 1.0 + (req.renovation_score - 0.5) * 0.15 + (req.view_quality - 0.5) * 0.10 + (req.interior_quality - 0.5) * 0.12 + (0.5 - req.noise_level) * 0.05 + (req.natural_light - 0.5) * 0.05 ) # Temporal — Q4/Tết premium seasonal_adj = 1.03 if req.is_year_end else 1.0 price_per_m2 = ( base * type_mult * cbd_adj * metro_adj * flood_adj * neighborhood_adj * room_adj * age_adj * amenity_adj * legal_adj * floor_adj * direction_adj * developer_adj * market_adj * momentum_adj * quality_adj * seasonal_adj * 1_000_000 # Convert to VND ) estimated = price_per_m2 * req.area_m2 # Simulate 3 model predictions with small variance rng = np.random.default_rng( seed=int(req.area_m2 * 1000 + req.rooms * 100 + req.month) ) noise = rng.normal(1.0, 0.04, size=3) sim_prices = estimated * noise xgb_price = float(sim_prices[0]) lgb_price = float(sim_prices[1]) cat_price = float(sim_prices[2]) predictions = [ ModelPrediction( model_name="xgboost", weight=0.40, predicted_price_vnd=round(xgb_price, -3), predicted_price_per_m2_vnd=round(xgb_price / req.area_m2, -3), ), ModelPrediction( model_name="lightgbm", weight=0.35, predicted_price_vnd=round(lgb_price, -3), predicted_price_per_m2_vnd=round(lgb_price / req.area_m2, -3), ), ModelPrediction( model_name="catboost", weight=0.25, predicted_price_vnd=round(cat_price, -3), predicted_price_per_m2_vnd=round(cat_price / req.area_m2, -3), ), ] prices_arr = np.array([xgb_price, lgb_price, cat_price]) cv = float(np.std(prices_arr) / np.mean(prices_arr)) if np.mean(prices_arr) > 0 else 0.5 confidence = max(0.0, min(1.0, 1.0 - cv)) # Heuristic driver ranking drivers = _heuristic_drivers() return AVMv2PredictResponse( estimated_price_vnd=round(estimated, -3), price_per_m2_vnd=round(price_per_m2, -3), confidence=round(confidence, 4), price_range_low_vnd=round(estimated * 0.82, -3), price_range_high_vnd=round(estimated * 1.18, -3), model_predictions=predictions, drivers=drivers, comparables=[], model_version="ensemble-v2-heuristic", ensemble_method="weighted_average", ) def _get_feature_importance(self) -> list[AVMv2FeatureImportance]: """Extract feature importance from loaded models.""" importances: dict[str, float] = {} if "xgboost" in self._models: try: scores = self._models["xgboost"].get_score( importance_type="gain" ) total = sum(scores.values()) or 1.0 for feat, score in scores.items(): importances[feat] = importances.get(feat, 0) + score / total * 0.4 except Exception: pass if "lightgbm" in self._models: try: model = self._models["lightgbm"] imp = model.feature_importance(importance_type="gain") names = model.feature_name() total = sum(imp) or 1.0 for name, score in zip(names, imp, strict=False): importances[name] = importances.get(name, 0) + score / total * 0.35 except Exception: pass if "catboost" in self._models: try: imp = self._models["catboost"].get_feature_importance() total = sum(imp) or 1.0 for i, score in enumerate(imp): fname = FEATURE_NAMES[i] if i < len(FEATURE_NAMES) else f"f{i}" importances[fname] = importances.get(fname, 0) + score / total * 0.25 except Exception: pass if not importances: return [] sorted_imp = sorted(importances.items(), key=lambda x: x[1], reverse=True) total_imp = sum(v for _, v in sorted_imp) or 1.0 return [ AVMv2FeatureImportance(feature=f, importance=round(v / total_imp, 4)) for f, v in sorted_imp ] def get_feature_importance(self) -> AVMv2FeatureImportanceResponse: """Return global feature importance for the active ensemble. Prefers trained-booster importances (weighted gain aggregation). Falls back to a curated heuristic ranking when no boosters are loaded so the endpoint stays available during scaffolded / heuristic-only runs. """ drivers = self._get_feature_importance() if self._models else [] if drivers: return AVMv2FeatureImportanceResponse( model_version=self._model_version, source="model", drivers=drivers, ) return AVMv2FeatureImportanceResponse( model_version=self._model_version, source="heuristic", drivers=_heuristic_drivers(), ) # ── Training pipeline ─────────────────────────────────────── def train(self, req: AVMv2TrainRequest) -> AVMv2TrainResponse: """Train the ensemble models on available data. Pipeline: 1. Load training data from CSV/database export 2. Feature engineering (encode, normalize, cyclical) 3. Train/val/test split stratified by district 4. For each model: Optuna hyperparameter optimization 5. Save versioned artifacts + register in model registry """ from app.config import settings version = f"ensemble-v2-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}" logger.info("Training AVM v2 ensemble — version %s, trials=%d", version, req.optuna_trials) model_dir = Path(settings.model_path) data_path = model_dir / "training_data.csv" # Check for training data if not data_path.exists(): logger.warning("No training data found at %s — returning scaffold", data_path) return AVMv2TrainResponse( model_version=version, metrics={"mae": 0.0, "mape": 0.0, "rmse": 0.0, "r2": 0.0}, district_metrics={}, training_samples=0, validation_samples=0, test_samples=0, best_params={}, ) # Load and prepare data import pandas as pd from sklearn.model_selection import GroupShuffleSplit df = pd.read_csv(data_path) logger.info("Loaded %d training samples", len(df)) # Feature engineering X, y, groups = self._prepare_training_data(df) if len(X) < 50: logger.warning("Insufficient training data (%d samples)", len(X)) return AVMv2TrainResponse( model_version=version, metrics={"mae": 0.0, "mape": 0.0, "rmse": 0.0, "r2": 0.0}, district_metrics={}, training_samples=len(X), validation_samples=0, test_samples=0, best_params={}, ) # Split: train/val/test grouped by district gss_test = GroupShuffleSplit(n_splits=1, test_size=req.test_size, random_state=42) train_val_idx, test_idx = next(gss_test.split(X, y, groups)) X_trainval, y_trainval = X[train_val_idx], y[train_val_idx] X_test, y_test = X[test_idx], y[test_idx] groups_test = groups[test_idx] groups_trainval = groups[train_val_idx] val_ratio = req.val_size / (1.0 - req.test_size) gss_val = GroupShuffleSplit(n_splits=1, test_size=val_ratio, random_state=42) train_idx, val_idx = next(gss_val.split(X_trainval, y_trainval, groups_trainval)) X_train, y_train = X_trainval[train_idx], y_trainval[train_idx] X_val, y_val = X_trainval[val_idx], y_trainval[val_idx] logger.info("Split: train=%d, val=%d, test=%d", len(X_train), len(X_val), len(X_test)) # Train each model with Optuna best_params: dict[str, dict] = {} trained_models: dict[str, Any] = {} xgb_params, xgb_model = self._train_xgboost(X_train, y_train, X_val, y_val, req.optuna_trials) if xgb_model: best_params["xgboost"] = xgb_params trained_models["xgboost"] = xgb_model lgb_params, lgb_model = self._train_lightgbm(X_train, y_train, X_val, y_val, req.optuna_trials) if lgb_model: best_params["lightgbm"] = lgb_params trained_models["lightgbm"] = lgb_model cat_params, cat_model = self._train_catboost(X_train, y_train, X_val, y_val, req.optuna_trials) if cat_model: best_params["catboost"] = cat_params trained_models["catboost"] = cat_model # Evaluate ensemble on test set metrics = self._evaluate_ensemble(trained_models, X_test, y_test, groups_test) # Save versioned artifacts version_dir = model_dir / "versions" / version version_dir.mkdir(parents=True, exist_ok=True) for name, model in trained_models.items(): self._save_model(name, model, version_dir) # Also save to active model directory self._save_model(name, model, model_dir) # Register in model registry registry_entry = AVMv2ModelInfo( model_version=version, created_at=datetime.now(timezone.utc).isoformat(), metrics={k: v for k, v in metrics.items() if k != "district_metrics"}, is_active=True, ab_test_traffic_pct=0.0, ) self._register_model(registry_entry, model_dir) # Reload models self._models = trained_models self._model_version = version return AVMv2TrainResponse( model_version=version, metrics={k: v for k, v in metrics.items() if k != "district_metrics"}, district_metrics=metrics.get("district_metrics", {}), training_samples=len(X_train), validation_samples=len(X_val), test_samples=len(X_test), best_params=best_params, ) def _prepare_training_data( self, df: "pd.DataFrame" ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Encode a DataFrame into feature matrix, target vector, and group labels.""" import pandas as pd # noqa: F811 feature_cols = [ "distance_to_cbd_km", "distance_to_metro_km", "distance_to_school_km", "distance_to_hospital_km", "distance_to_park_km", "distance_to_mall_km", "flood_zone_risk", "neighborhood_score", "property_type", "area_m2", "rooms", "floor_level", "total_floors", "direction", "floor_ratio", "building_age_years", "has_elevator", "has_parking", "has_pool", "has_legal_paper", "developer_reputation", "avg_price_district_3m_vnd_m2", "listing_density", "absorption_rate", "dom_avg", "price_momentum_30d", "yoy_change", "renovation_score", "view_quality", "interior_quality", "noise_level", "natural_light", "month", ] # Fill missing columns with defaults for col in feature_cols: if col not in df.columns: df[col] = 0.0 if col not in ("property_type", "direction") else "unknown" # Encode categoricals df["property_type_encoded"] = df["property_type"].str.lower().map(PROPERTY_TYPE_MAP).fillna(1) df["direction_encoded"] = df["direction"].str.lower().map(DIRECTION_MAP).fillna(4) # Cyclical month encoding month_rad = 2 * np.pi * df["month"].astype(float) / 12.0 df["month_sin"] = np.sin(month_rad) df["month_cos"] = np.cos(month_rad) df["is_year_end_encoded"] = (df["month"].astype(int).isin([10, 11, 12])).astype(float) # Boolean encoding for col in ["has_elevator", "has_parking", "has_pool", "has_legal_paper"]: df[col] = df[col].astype(float) encoded_feature_cols = [ "distance_to_cbd_km", "distance_to_metro_km", "distance_to_school_km", "distance_to_hospital_km", "distance_to_park_km", "distance_to_mall_km", "flood_zone_risk", "neighborhood_score", "property_type_encoded", "area_m2", "rooms", "floor_level", "total_floors", "direction_encoded", "floor_ratio", "building_age_years", "has_elevator", "has_parking", "has_pool", "has_legal_paper", "developer_reputation", "avg_price_district_3m_vnd_m2", "listing_density", "absorption_rate", "dom_avg", "price_momentum_30d", "yoy_change", "renovation_score", "view_quality", "interior_quality", "noise_level", "natural_light", "month_sin", "month_cos", "is_year_end_encoded", ] X = df[encoded_feature_cols].values.astype(np.float64) y = np.log(df["price_vnd"].values.astype(np.float64)) # Log-price target groups = df.get("district", pd.Series(["default"] * len(df))).values return X, y, groups def _train_xgboost( self, X_train: np.ndarray, y_train: np.ndarray, X_val: np.ndarray, y_val: np.ndarray, n_trials: int, ) -> tuple[dict, Any]: """Train XGBoost with Optuna hyperparameter optimization.""" try: import optuna import xgboost as xgb optuna.logging.set_verbosity(optuna.logging.WARNING) dtrain = xgb.DMatrix(X_train, label=y_train, feature_names=FEATURE_NAMES) dval = xgb.DMatrix(X_val, label=y_val, feature_names=FEATURE_NAMES) def objective(trial: optuna.Trial) -> float: params = { "objective": "reg:squarederror", "eval_metric": "rmse", "max_depth": trial.suggest_int("max_depth", 3, 10), "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True), "subsample": trial.suggest_float("subsample", 0.6, 1.0), "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0), "min_child_weight": trial.suggest_int("min_child_weight", 1, 10), "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True), "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True), "verbosity": 0, } n_rounds = trial.suggest_int("n_rounds", 100, 1000) model = xgb.train( params, dtrain, num_boost_round=n_rounds, evals=[(dval, "val")], verbose_eval=False, early_stopping_rounds=50, ) preds = model.predict(dval) rmse = float(np.sqrt(np.mean((preds - y_val) ** 2))) return rmse study = optuna.create_study(direction="minimize") study.optimize(objective, n_trials=n_trials, show_progress_bar=False) # Retrain with best params on full train set best = study.best_params n_rounds = best.pop("n_rounds", 500) best.update({"objective": "reg:squarederror", "eval_metric": "rmse", "verbosity": 0}) model = xgb.train( best, dtrain, num_boost_round=n_rounds, evals=[(dval, "val")], verbose_eval=False, early_stopping_rounds=50, ) logger.info("XGBoost trained — best RMSE: %.4f", study.best_value) return best, model except Exception as e: logger.warning("XGBoost training failed: %s", e) return {}, None def _train_lightgbm( self, X_train: np.ndarray, y_train: np.ndarray, X_val: np.ndarray, y_val: np.ndarray, n_trials: int, ) -> tuple[dict, Any]: """Train LightGBM with Optuna hyperparameter optimization.""" try: import lightgbm as lgb import optuna optuna.logging.set_verbosity(optuna.logging.WARNING) dtrain = lgb.Dataset(X_train, label=y_train, feature_name=FEATURE_NAMES) dval = lgb.Dataset(X_val, label=y_val, feature_name=FEATURE_NAMES, reference=dtrain) def objective(trial: optuna.Trial) -> float: params = { "objective": "regression", "metric": "rmse", "num_leaves": trial.suggest_int("num_leaves", 15, 127), "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True), "feature_fraction": trial.suggest_float("feature_fraction", 0.6, 1.0), "bagging_fraction": trial.suggest_float("bagging_fraction", 0.6, 1.0), "bagging_freq": trial.suggest_int("bagging_freq", 1, 7), "min_child_samples": trial.suggest_int("min_child_samples", 5, 50), "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True), "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True), "verbosity": -1, } n_rounds = trial.suggest_int("n_rounds", 100, 1000) callbacks = [lgb.early_stopping(50, verbose=False), lgb.log_evaluation(period=0)] model = lgb.train( params, dtrain, num_boost_round=n_rounds, valid_sets=[dval], callbacks=callbacks, ) preds = model.predict(X_val) rmse = float(np.sqrt(np.mean((preds - y_val) ** 2))) return rmse study = optuna.create_study(direction="minimize") study.optimize(objective, n_trials=n_trials, show_progress_bar=False) best = study.best_params n_rounds = best.pop("n_rounds", 500) best.update({"objective": "regression", "metric": "rmse", "verbosity": -1}) callbacks = [lgb.early_stopping(50, verbose=False), lgb.log_evaluation(period=0)] model = lgb.train( best, dtrain, num_boost_round=n_rounds, valid_sets=[dval], callbacks=callbacks, ) logger.info("LightGBM trained — best RMSE: %.4f", study.best_value) return best, model except Exception as e: logger.warning("LightGBM training failed: %s", e) return {}, None def _train_catboost( self, X_train: np.ndarray, y_train: np.ndarray, X_val: np.ndarray, y_val: np.ndarray, n_trials: int, ) -> tuple[dict, Any]: """Train CatBoost with Optuna hyperparameter optimization.""" try: import optuna from catboost import CatBoostRegressor, Pool optuna.logging.set_verbosity(optuna.logging.WARNING) train_pool = Pool(X_train, label=y_train, feature_names=FEATURE_NAMES) val_pool = Pool(X_val, label=y_val, feature_names=FEATURE_NAMES) def objective(trial: optuna.Trial) -> float: params = { "iterations": trial.suggest_int("iterations", 100, 1000), "depth": trial.suggest_int("depth", 4, 10), "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True), "l2_leaf_reg": trial.suggest_float("l2_leaf_reg", 1e-8, 10.0, log=True), "bagging_temperature": trial.suggest_float("bagging_temperature", 0.0, 1.0), "random_strength": trial.suggest_float("random_strength", 1e-8, 10.0, log=True), "verbose": 0, "loss_function": "RMSE", "early_stopping_rounds": 50, } model = CatBoostRegressor(**params) model.fit(train_pool, eval_set=val_pool, verbose=0) preds = model.predict(val_pool) rmse = float(np.sqrt(np.mean((preds - y_val) ** 2))) return rmse study = optuna.create_study(direction="minimize") study.optimize(objective, n_trials=n_trials, show_progress_bar=False) best = study.best_params best.update({"verbose": 0, "loss_function": "RMSE", "early_stopping_rounds": 50}) model = CatBoostRegressor(**best) model.fit(train_pool, eval_set=val_pool, verbose=0) logger.info("CatBoost trained — best RMSE: %.4f", study.best_value) return best, model except Exception as e: logger.warning("CatBoost training failed: %s", e) return {}, None def _evaluate_ensemble( self, models: dict[str, Any], X_test: np.ndarray, y_test: np.ndarray, groups_test: np.ndarray | None = None, ) -> dict: """Evaluate ensemble performance on a test set.""" if not models: return {"mae": 0.0, "mape": 0.0, "rmse": 0.0, "r2": 0.0} predictions = [] weights = [] for name, model in models.items(): w = ENSEMBLE_WEIGHTS.get(name, 0.0) features = X_test if name == "xgboost": import xgboost as xgb preds = model.predict(xgb.DMatrix(features, feature_names=FEATURE_NAMES)) elif name == "lightgbm": preds = model.predict(features) elif name == "catboost": preds = model.predict(features) else: continue predictions.append(preds * w) weights.append(w) total_weight = sum(weights) or 1.0 ensemble_preds = sum(predictions) / total_weight # Metrics in log-space then convert y_actual = np.exp(y_test) y_pred = np.exp(ensemble_preds) mae = float(np.mean(np.abs(y_actual - y_pred))) mape = float(np.mean(np.abs((y_actual - y_pred) / y_actual))) * 100 rmse = float(np.sqrt(np.mean((y_actual - y_pred) ** 2))) ss_res = np.sum((y_actual - y_pred) ** 2) ss_tot = np.sum((y_actual - np.mean(y_actual)) ** 2) r2 = float(1.0 - ss_res / ss_tot) if ss_tot > 0 else 0.0 global_metrics = { "mae": round(mae, 2), "mape": round(mape, 2), "rmse": round(rmse, 2), "r2": round(r2, 4), } # Per-district breakdown district_metrics: dict[str, dict] = {} if groups_test is not None and len(groups_test) == len(y_actual): unique_districts = np.unique(groups_test) for district in unique_districts: mask = groups_test == district if mask.sum() < 3: # Too few samples for reliable per-district stats continue d_actual = y_actual[mask] d_pred = y_pred[mask] d_mae = float(np.mean(np.abs(d_actual - d_pred))) d_mape = float(np.mean(np.abs((d_actual - d_pred) / d_actual))) * 100 d_rmse = float(np.sqrt(np.mean((d_actual - d_pred) ** 2))) d_ss_res = np.sum((d_actual - d_pred) ** 2) d_ss_tot = np.sum((d_actual - np.mean(d_actual)) ** 2) d_r2 = float(1.0 - d_ss_res / d_ss_tot) if d_ss_tot > 0 else 0.0 district_metrics[str(district)] = { "mae": round(d_mae, 2), "mape": round(d_mape, 2), "rmse": round(d_rmse, 2), "r2": round(d_r2, 4), "samples": int(mask.sum()), } global_metrics["district_metrics"] = district_metrics # type: ignore[assignment] return global_metrics def _save_model(self, name: str, model: Any, directory: Path) -> None: """Save a trained model to the specified directory.""" if name == "xgboost": model.save_model(str(directory / "avm_v2_xgboost.json")) elif name == "lightgbm": model.save_model(str(directory / "avm_v2_lightgbm.txt")) elif name == "catboost": model.save_model(str(directory / "avm_v2_catboost.cbm")) # ── Model registry ────────────────────────────────────────── def _get_registry_path(self, model_dir: Path | None = None) -> Path: """Get the path to the model registry JSON file.""" if model_dir is None: from app.config import settings model_dir = Path(settings.model_path) return model_dir / "model_registry.json" def _load_registry(self, model_dir: Path | None = None) -> list[dict]: """Load the model registry from disk.""" path = self._get_registry_path(model_dir) if path.exists(): with open(path) as f: return json.load(f) return [] def _save_registry(self, entries: list[dict], model_dir: Path | None = None) -> None: """Save the model registry to disk.""" path = self._get_registry_path(model_dir) path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w") as f: json.dump(entries, f, indent=2) def _register_model(self, info: AVMv2ModelInfo, model_dir: Path) -> None: """Register a new model version and mark it as active.""" entries = self._load_registry(model_dir) # Deactivate previous active models for entry in entries: entry["is_active"] = False entries.append({ "model_version": info.model_version, "created_at": info.created_at, "metrics": info.metrics, "is_active": True, "ab_test_traffic_pct": info.ab_test_traffic_pct, }) self._save_registry(entries, model_dir) self._model_registry = [ AVMv2ModelInfo(**e) for e in entries ] def get_model_info(self) -> AVMv2ModelInfo: """Return current active model information.""" # Check registry for active model entries = self._load_registry() for entry in reversed(entries): if entry.get("is_active"): return AVMv2ModelInfo(**entry) return AVMv2ModelInfo( model_version=self._model_version, created_at=datetime.now(timezone.utc).isoformat(), metrics={}, is_active=True, ab_test_traffic_pct=0.0, ) def list_versions(self) -> list[AVMv2ModelInfo]: """List all registered model versions.""" entries = self._load_registry() return [AVMv2ModelInfo(**e) for e in entries] def set_ab_traffic(self, traffic_pct: float) -> AVMv2ModelInfo: """Set the A/B test traffic percentage for the currently active model. ``traffic_pct=0.10`` routes 10% of ``/predict`` calls to the v2 ensemble; the remaining 90% receive the heuristic baseline response (matching v1 behaviour). Set to ``1.0`` to fully switch to v2, or ``0.0`` to disable the A/B split (v2 always used when called directly). """ from app.config import settings model_dir = Path(settings.model_path) entries = self._load_registry(model_dir) updated: dict | None = None for entry in reversed(entries): if entry.get("is_active"): entry["ab_test_traffic_pct"] = traffic_pct updated = entry break if updated is None: raise ValueError("No active model found in registry") self._save_registry(entries, model_dir) self._model_registry = [AVMv2ModelInfo(**e) for e in entries] return AVMv2ModelInfo(**updated) def rollback(self, target_version: str) -> AVMv2ModelInfo: """Rollback to a previously trained model version. Copies the target version's artifacts to the active model directory and updates the registry. """ from app.config import settings model_dir = Path(settings.model_path) version_dir = model_dir / "versions" / target_version if not version_dir.exists(): raise ValueError(f"Model version {target_version} not found") # Copy versioned artifacts to active directory for artifact in version_dir.iterdir(): if artifact.is_file(): shutil.copy2(artifact, model_dir / artifact.name) # Update registry entries = self._load_registry(model_dir) found = False for entry in entries: entry["is_active"] = entry["model_version"] == target_version if entry["model_version"] == target_version: found = True if not found: raise ValueError(f"Model version {target_version} not in registry") self._save_registry(entries, model_dir) # Reload models from disk self._models = {} self._load_models() self._model_version = target_version logger.info("Rolled back to model version %s", target_version) active = next(e for e in entries if e["is_active"]) return AVMv2ModelInfo(**active) # ── A/B comparison ───────────────────────────────────────── def compare_v1(self, req: ABComparisonRequest) -> ABComparisonResponse: """Compare v1 and v2 predictions on the same property.""" from app.services.avm_service import avm_service # Build v1 request v1_req = AVMPredictRequest( area=req.area_m2, district=req.district, city=req.city, property_type=req.property_type, bedrooms=req.bedrooms or req.rooms, floors=req.floors, frontage=req.frontage, has_legal_paper=req.has_legal_paper, ) v1_result = avm_service.predict(v1_req) # Build v2 request v2_req = AVMv2PredictRequest( district=req.district, city=req.city, property_type=req.property_type, area_m2=req.area_m2, rooms=req.rooms or req.bedrooms, has_legal_paper=req.has_legal_paper, neighborhood_score=req.neighborhood_score, distance_to_cbd_km=req.distance_to_cbd_km, distance_to_metro_km=req.distance_to_metro_km, flood_zone_risk=req.flood_zone_risk, building_age_years=req.building_age_years, floor_level=req.floor_level, total_floors=req.total_floors, direction=req.direction, has_elevator=req.has_elevator, has_parking=req.has_parking, has_pool=req.has_pool, developer_reputation=req.developer_reputation, renovation_score=req.renovation_score, view_quality=req.view_quality, interior_quality=req.interior_quality, month=req.month, quarter=req.quarter, is_year_end=req.is_year_end, ) v2_result = self.predict(v2_req) # Compute diffs price_diff = v2_result.estimated_price_vnd - v1_result.estimated_price_vnd price_diff_pct = ( (price_diff / v1_result.estimated_price_vnd * 100) if v1_result.estimated_price_vnd > 0 else 0.0 ) confidence_diff = v2_result.confidence - v1_result.confidence # Recommendation logic if v2_result.confidence > v1_result.confidence + 0.05: recommendation = "v2 — higher confidence from ensemble model agreement" elif v1_result.confidence > v2_result.confidence + 0.05: recommendation = "v1 — higher confidence, v2 models may disagree on this property" elif abs(price_diff_pct) < 5: recommendation = "Both models agree (< 5% price difference)" else: recommendation = "v2 — richer feature set captures more market factors" return ABComparisonResponse( v1=AVMv1Summary( estimated_price_vnd=v1_result.estimated_price_vnd, confidence=v1_result.confidence, price_per_m2=v1_result.price_per_m2, price_range_low=v1_result.price_range_low, price_range_high=v1_result.price_range_high, ), v2=AVMv2Summary( estimated_price_vnd=v2_result.estimated_price_vnd, confidence=v2_result.confidence, price_per_m2_vnd=v2_result.price_per_m2_vnd, price_range_low_vnd=v2_result.price_range_low_vnd, price_range_high_vnd=v2_result.price_range_high_vnd, model_version=v2_result.model_version, ensemble_method=v2_result.ensemble_method, ), price_diff_vnd=round(price_diff, -3), price_diff_pct=round(price_diff_pct, 2), confidence_diff=round(confidence_diff, 4), recommendation=recommendation, ) # Module-level singleton avm_v2_service = AVMv2EnsembleService()