Files
goodgo-platform/libs/ai-services/app/services/avm_v2_service.py
Ho Ngoc Hai 66f952a4a8 feat(ai-services): complete AVM v2 ensemble — upload endpoint, per-district metrics, A/B routing
- Add POST /avm/v2/upload-training-data so AvmRetrainCronService can push
  CSV rows before triggering retraining (was called but missing)
- Add per-district MAE/MAPE/RMSE/R² to _evaluate_ensemble output;
  district_metrics are now returned in AVMv2TrainResponse and stored
  separately from global metrics in the model registry
- Add predict_with_ab() that applies the active model's ab_test_traffic_pct
  for deterministic per-property cohort assignment (v2 vs heuristic baseline)
- Add POST /avm/v2/ab-config to set traffic_pct on the active registry entry
- Add AVMv2ABConfigRequest schema
- Expand test suite: 24 → 28 tests covering upload, A/B config, and new
  validation paths; all green

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-21 04:39:57 +07:00

1260 lines
48 KiB
Python

"""AVM v2 — Multi-model ensemble service (XGBoost + LightGBM + CatBoost).
Heuristic fallback when trained models are not available.
Ensemble weights: XGBoost 0.4, LightGBM 0.35, CatBoost 0.25.
Confidence = 1 - CV(3 predictions), where CV = std / mean.
"""
import json
import logging
import os
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import numpy as np
from app.models.avm import AVMPredictRequest
from app.models.avm_v2 import (
ABComparisonRequest,
ABComparisonResponse,
AVMv1Summary,
AVMv2Comparable,
AVMv2FeatureImportance,
AVMv2FeatureImportanceResponse,
AVMv2ModelInfo,
AVMv2PredictRequest,
AVMv2PredictResponse,
AVMv2Summary,
AVMv2TrainRequest,
AVMv2TrainResponse,
ModelPrediction,
)
logger = logging.getLogger(__name__)
# ── Ensemble configuration ──────────────────────────────────────
# Relative weight of each model in the weighted-average ensemble. The three
# values sum to 1.0. Code that combines predictions divides by the sum of the
# weights of the models actually used, so a partial ensemble (for example
# only XGBoost and CatBoost) is still normalised correctly.
ENSEMBLE_WEIGHTS = {
    "xgboost": 0.40,
    "lightgbm": 0.35,
    "catboost": 0.25,
}
# ── Feature ordering for model input ────────────────────────────
# Canonical column order (35 features) of the model input matrix.
# `_encode_features` emits its values in exactly this order, and the trainers
# pass this list as the feature names of their datasets, so any change here
# must be mirrored in the encoders.
FEATURE_NAMES = [
    # Location (7)
    "distance_to_cbd_km",
    "distance_to_metro_km",
    "distance_to_school_km",
    "distance_to_hospital_km",
    "distance_to_park_km",
    "distance_to_mall_km",
    "flood_zone_risk",
    # Neighborhood (1)
    "neighborhood_score",
    # Physical (13)
    "property_type_encoded",
    "area_m2",
    "rooms",
    "floor_level",
    "total_floors",
    "direction_encoded",
    "floor_ratio",
    "building_age_years",
    "has_elevator",
    "has_parking",
    "has_pool",
    "has_legal_paper",
    "developer_reputation",
    # Market (6)
    "avg_price_district_3m_vnd_m2",
    "listing_density",
    "absorption_rate",
    "dom_avg",
    "price_momentum_30d",
    "yoy_change",
    # LLM-extracted (5)
    "renovation_score",
    "view_quality",
    "interior_quality",
    "noise_level",
    "natural_light",
    # Temporal (3): month is encoded cyclically as a sin/cos pair
    "month_sin",
    "month_cos",
    "is_year_end",
]
# Integer codes for the direction a property faces. Lookups lowercase the
# input first. "unknown" shares code 4 with "northeast" (chosen as a neutral
# mid-value), and the encoders also fall back to 4 for any direction that is
# not listed here.
DIRECTION_MAP = {
    "south": 0,
    "southeast": 1,
    "east": 2,
    "southwest": 3,
    "northeast": 4,
    "west": 5,
    "northwest": 6,
    "north": 7,
    "unknown": 4,  # neutral mid-value (same code as "northeast")
}
# Integer codes for the property type. Lookups lowercase the input first; a
# type that is not listed here is encoded as 1 ("house") by the encoders.
PROPERTY_TYPE_MAP = {
    "apartment": 0,
    "house": 1,
    "townhouse": 2,
    "villa": 3,
    "land": 4,
    "shophouse": 5,
    "penthouse": 6,
}
# ── Heuristic baselines (millions VND/m²) ───────────────────────
# Baseline price per m² by city, used by the heuristic fallback. Keys are
# lowercase city names with Vietnamese diacritics; `_predict_heuristic`
# lowercases and strips the request's city before the lookup, so any other
# spelling (for example without diacritics) falls through to DEFAULT_BASELINE.
CITY_BASELINE: dict[str, float] = {
    "hà nội": 85.0,
    "hồ chí minh": 90.0,
    "đà nẵng": 45.0,
    "hải phòng": 35.0,
    "cần thơ": 25.0,
    "bình dương": 22.0,
    "đồng nai": 20.0,
    "nha trang": 35.0,
    "vũng tàu": 28.0,
}
# Baseline (millions VND/m²) for cities that are not in CITY_BASELINE.
DEFAULT_BASELINE = 30.0
# ── Heuristic feature importance ────────────────────────────────
# Used both inside heuristic predict responses and by the dedicated
# feature-importance endpoint when no trained booster is loaded.
# Each entry is (feature name, relative importance). Entries are listed in
# descending order of importance and the importances sum to 1.0.
_HEURISTIC_DRIVERS: list[tuple[str, float]] = [
    ("area_m2", 0.14),
    ("avg_price_district_3m_vnd_m2", 0.12),
    ("neighborhood_score", 0.10),
    ("property_type_encoded", 0.10),
    ("distance_to_cbd_km", 0.08),
    ("developer_reputation", 0.07),
    ("renovation_score", 0.07),
    ("building_age_years", 0.06),
    ("direction_encoded", 0.05),
    ("floor_level", 0.05),
    ("has_legal_paper", 0.05),
    ("distance_to_metro_km", 0.04),
    ("interior_quality", 0.04),
    ("price_momentum_30d", 0.03),
]
def _heuristic_drivers() -> list[AVMv2FeatureImportance]:
    """Build the curated heuristic driver ranking as response models.

    Returns:
        One ``AVMv2FeatureImportance`` per ``_HEURISTIC_DRIVERS`` entry, in
        the same order as the table.
    """
    ranking: list[AVMv2FeatureImportance] = []
    for feature_name, weight in _HEURISTIC_DRIVERS:
        ranking.append(AVMv2FeatureImportance(feature=feature_name, importance=weight))
    return ranking
def _encode_features(req: AVMv2PredictRequest) -> np.ndarray:
    """Turn a prediction request into a single-row model input matrix.

    The columns follow the order of ``FEATURE_NAMES``. Categorical fields are
    mapped to integer codes (unknown property types become 1, unknown
    directions become 4), booleans become 0.0/1.0 and the month is encoded
    as a sin/cos pair.

    Args:
        req: The prediction request to encode.

    Returns:
        A ``float64`` array of shape ``(1, number_of_features)``.
    """
    angle = 2 * np.pi * req.month / 12.0

    def _flag(value: Any) -> float:
        # Booleans are fed to the models as 0.0 / 1.0.
        return 1.0 if value else 0.0

    location = [
        req.distance_to_cbd_km,
        req.distance_to_metro_km,
        req.distance_to_school_km,
        req.distance_to_hospital_km,
        req.distance_to_park_km,
        req.distance_to_mall_km,
        req.flood_zone_risk,
    ]
    neighborhood = [req.neighborhood_score]
    physical = [
        PROPERTY_TYPE_MAP.get(req.property_type.lower(), 1),
        req.area_m2,
        req.rooms,
        float(req.floor_level),
        float(req.total_floors),
        float(DIRECTION_MAP.get(req.direction.lower(), 4)),
        req.floor_ratio,
        req.building_age_years,
        _flag(req.has_elevator),
        _flag(req.has_parking),
        _flag(req.has_pool),
        _flag(req.has_legal_paper),
        req.developer_reputation,
    ]
    market = [
        req.avg_price_district_3m_vnd_m2,
        req.listing_density,
        req.absorption_rate,
        req.dom_avg,
        req.price_momentum_30d,
        req.yoy_change,
    ]
    llm_extracted = [
        req.renovation_score,
        req.view_quality,
        req.interior_quality,
        req.noise_level,
        req.natural_light,
    ]
    temporal = [np.sin(angle), np.cos(angle), _flag(req.is_year_end)]

    row = location + neighborhood + physical + market + llm_extracted + temporal
    return np.array([row], dtype=np.float64)
class AVMv2EnsembleService:
"""Multi-model ensemble AVM for residential properties.
Attempts to load XGBoost, LightGBM, and CatBoost models from
the model directory. Falls back to a heuristic approach when
trained models are not available.
"""
def __init__(self) -> None:
    """Start in heuristic mode, then try to load trained models from disk."""
    # Defaults describe the heuristic fallback; _load_models() replaces the
    # version string when at least one trained model is found.
    self._model_registry: list[AVMv2ModelInfo] = []
    self._model_version = "ensemble-v2-heuristic"
    self._models: dict[str, Any] = {}
    self._load_models()
# ── Model loading ───────────────────────────────────────────
def _load_models(self) -> None:
    """Attempt to load each model of the ensemble from ``settings.model_path``.

    For every model whose artifact file exists, the matching library is
    imported and the artifact is loaded into ``self._models``. A missing
    library is logged at INFO level (the model is simply unavailable); any
    other failure, such as a corrupt or incompatible artifact, is logged as
    a WARNING with the traceback instead of being reported as
    "not available". Models that fail to load are skipped, so the service
    may run with a partial ensemble or with the heuristic fallback.
    """
    from app.config import settings

    model_dir = settings.model_path

    def _load_xgboost(path: str) -> Any:
        import xgboost as xgb

        booster = xgb.Booster()
        booster.load_model(path)
        return booster

    def _load_lightgbm(path: str) -> Any:
        import lightgbm as lgb

        return lgb.Booster(model_file=path)

    def _load_catboost(path: str) -> Any:
        from catboost import CatBoostRegressor

        model = CatBoostRegressor()
        model.load_model(path)
        return model

    # (registry key, display name, artifact file name, loader)
    loaders = (
        ("xgboost", "XGBoost", "avm_v2_xgboost.json", _load_xgboost),
        ("lightgbm", "LightGBM", "avm_v2_lightgbm.txt", _load_lightgbm),
        ("catboost", "CatBoost", "avm_v2_catboost.cbm", _load_catboost),
    )
    for key, label, filename, loader in loaders:
        path = os.path.join(model_dir, filename)
        if not os.path.exists(path):
            continue
        try:
            loaded = loader(path)
        except ImportError:
            logger.info("%s model not available", label)
        except Exception:
            # The artifact exists but could not be loaded: surface the cause.
            logger.warning(
                "Failed to load %s AVM v2 model from %s", label, path, exc_info=True
            )
        else:
            self._models[key] = loaded
            logger.info("Loaded %s AVM v2 model from %s", label, path)

    if self._models:
        self._model_version = f"ensemble-v2-{'+'.join(sorted(self._models.keys()))}"
        logger.info("AVM v2 ensemble active with: %s", list(self._models.keys()))
    else:
        logger.info("No trained AVM v2 models found — using heuristic fallback")
# ── Prediction ──────────────────────────────────────────────
def predict(self, req: AVMv2PredictRequest) -> AVMv2PredictResponse:
    """Price a property with the trained ensemble, or the heuristic if none is loaded."""
    handler = self._predict_ensemble if self._models else self._predict_heuristic
    return handler(req)
def predict_with_ab(self, req: AVMv2PredictRequest) -> tuple[AVMv2PredictResponse, bool]:
    """Run prediction respecting the A/B test traffic split.

    The active model's ``ab_test_traffic_pct`` is treated as the fraction
    (0.0-1.0) of properties routed to v2. Values ``<= 0.0`` (A/B disabled)
    and ``>= 1.0`` both route everything to v2.

    Cohort assignment is deterministic per property: the random draw is
    seeded from the request's area, rooms, month and a SHA-256 based bucket
    of the district. A stable digest is used instead of the builtin
    ``hash()`` because string hashes are salted per process, which would
    move a property between cohorts across restarts and workers.

    Args:
        req: The prediction request.

    Returns:
        ``(response, used_v2)`` where ``used_v2`` is ``True`` when the
        request was served by ``predict`` and ``False`` when it was served
        by the heuristic baseline (outside the v2 cohort).
    """
    import hashlib

    info = self.get_model_info()
    traffic_pct = info.ab_test_traffic_pct
    if traffic_pct <= 0.0 or traffic_pct >= 1.0:
        # A/B disabled, or 100% of traffic on v2: no cohort draw needed.
        return self.predict(req), True

    # Process-independent bucket in [0, 10000) derived from the district.
    district_digest = hashlib.sha256(str(req.district).encode("utf-8")).digest()
    district_bucket = int.from_bytes(district_digest[:8], "big") % 10000

    rng = np.random.default_rng(
        seed=int(req.area_m2 * 1000 + req.rooms * 100 + req.month + district_bucket)
    )
    if rng.random() < traffic_pct:
        return self.predict(req), True
    # Outside v2 cohort: return heuristic baseline (v1-equivalent)
    return self._predict_heuristic(req), False
def _predict_ensemble(self, req: AVMv2PredictRequest) -> AVMv2PredictResponse:
    """Price the property with every loaded model and blend the results.

    The estimate is the weighted average of the per-model prices (rounded to
    the nearest 1,000 VND), normalised by the weights of the models that are
    loaded. Confidence is ``1 - CV`` of the raw per-model prices, clamped to
    [0, 1], and the price range widens as confidence drops (at least ±5%).

    Args:
        req: The prediction request.

    Returns:
        The ensemble response, including per-model predictions and up to
        ten feature-importance drivers.
    """
    feature_row = _encode_features(req)
    area = req.area_m2

    model_prices: list[float] = []
    per_model: list[ModelPrediction] = []
    for key, booster in self._models.items():
        raw_price = self._predict_single_model(key, booster, feature_row)
        model_prices.append(raw_price)
        per_model.append(
            ModelPrediction(
                model_name=key,
                weight=ENSEMBLE_WEIGHTS.get(key, 0.0),
                predicted_price_vnd=round(raw_price, -3),
                predicted_price_per_m2_vnd=round(raw_price / area, -3),
            )
        )

    # Weighted average over the models that actually produced a prediction.
    weight_sum = sum(ENSEMBLE_WEIGHTS.get(p.model_name, 0) for p in per_model)
    weighted_total = sum(
        p.predicted_price_vnd * ENSEMBLE_WEIGHTS.get(p.model_name, 0)
        for p in per_model
    )
    blended = weighted_total / (1.0 if weight_sum == 0 else weight_sum)

    # Agreement between models drives the confidence score.
    spread = np.array(model_prices)
    centre = np.mean(spread)
    deviation = np.std(spread)
    if centre > 0:
        cv = deviation / centre
    else:
        cv = 0.5
    confidence = max(0.0, min(1.0, 1.0 - cv))

    # Lower confidence -> wider range, never narrower than ±5%.
    margin = max(0.05, 0.30 * (1.0 - confidence))
    lower_bound = blended * (1.0 - margin)
    upper_bound = blended * (1.0 + margin)

    ranked_drivers = self._get_feature_importance()

    return AVMv2PredictResponse(
        estimated_price_vnd=round(blended, -3),
        price_per_m2_vnd=round(blended / area, -3),
        confidence=round(confidence, 4),
        price_range_low_vnd=round(lower_bound, -3),
        price_range_high_vnd=round(upper_bound, -3),
        model_predictions=per_model,
        drivers=ranked_drivers[:10],
        comparables=[],  # Populated by data layer in production
        model_version=self._model_version,
        ensemble_method="weighted_average",
    )
def _predict_single_model(
self, name: str, model: Any, features: np.ndarray
) -> float:
"""Get a single model's raw prediction (log-price → price)."""
if name == "xgboost":
import xgboost as xgb
dmatrix = xgb.DMatrix(features, feature_names=FEATURE_NAMES)
pred_log = model.predict(dmatrix)[0]
return float(np.exp(pred_log))
if name == "lightgbm":
pred_log = model.predict(features)[0]
return float(np.exp(pred_log))
if name == "catboost":
pred_log = model.predict(features)[0]
return float(np.exp(pred_log))
logger.warning("Unknown model type: %s", name)
return 0.0
def _predict_heuristic(self, req: AVMv2PredictRequest) -> AVMv2PredictResponse:
    """Estimate a price from hand-tuned multipliers when no model is loaded.

    A per-city baseline (millions VND/m²) is multiplied by a chain of
    adjustment factors for property type, location, physical attributes,
    market conditions, quality scores and season. Three "model" predictions
    are then simulated by applying seeded Gaussian noise (σ = 4%) to the
    estimate so the response has the same shape as a real ensemble response.

    Args:
        req: The prediction request.

    Returns:
        A response with ``model_version="ensemble-v2-heuristic"`` and a
        fixed ±18% price range.
    """
    city_key = req.city.lower().strip()
    base = CITY_BASELINE.get(city_key, DEFAULT_BASELINE)
    kind = req.property_type.lower()

    # Property type multiplier
    type_multipliers = {
        "apartment": 0.90,
        "house": 1.00,
        "townhouse": 1.10,
        "villa": 1.40,
        "land": 0.70,
        "shophouse": 1.30,
        "penthouse": 1.60,
    }
    type_mult = type_multipliers.get(kind, 1.0)

    # Location: CBD distance discount (floored at 0.7), metro proximity bonus
    # within 2 km, flood-risk discount of up to 15%.
    cbd_adj = max(0.7, 1.0 - req.distance_to_cbd_km * 0.02)
    metro_adj = 1.0 + max(0.0, (2.0 - req.distance_to_metro_km) * 0.05)
    flood_adj = 1.0 - req.flood_zone_risk * 0.15

    # Neighborhood: ±15% swing around the 0.5 midpoint
    neighborhood_adj = 1.0 + (req.neighborhood_score - 0.5) * 0.30

    # Physical attributes
    room_adj = 1.0 + req.rooms * 0.015
    age_adj = max(0.75, 1.0 - req.building_age_years * 0.008)
    amenity_adj = (
        1.0
        + (0.03 if req.has_elevator else 0.0)
        + (0.05 if req.has_parking else 0.0)
        + (0.08 if req.has_pool else 0.0)
    )
    legal_adj = 1.0 if req.has_legal_paper else 0.70

    # Floor premium applies to apartments/penthouses above ground level only.
    floor_adj = 1.0
    if req.floor_level > 0 and kind in ("apartment", "penthouse"):
        if req.total_floors > 0:
            # Relative height in the building, clamped to [-3%, +8%].
            relative_floor = req.floor_level / req.total_floors
            floor_adj = max(0.97, min(1.08, 1.0 + (relative_floor - 0.3) * 0.12))
        else:
            # Building height unknown: mild premium per floor, capped at +8%.
            floor_adj = min(1.08, 1.0 + req.floor_level * 0.003)

    # Direction premium (Vietnamese preference: south/southeast best)
    direction_multipliers = {
        "south": 1.05,
        "southeast": 1.04,
        "east": 1.02,
        "southwest": 1.01,
        "northeast": 1.0,
        "west": 0.98,
        "northwest": 0.97,
        "north": 0.96,
        "unknown": 1.0,
    }
    direction_adj = direction_multipliers.get(req.direction.lower(), 1.0)

    # Developer reputation: ±10% swing
    developer_adj = 1.0 + (req.developer_reputation - 0.5) * 0.20

    # Market: ratio of the district's recent price to the city baseline,
    # clamped to [0.5, 2.0]; neutral when no district price is supplied.
    market_adj = 1.0
    if req.avg_price_district_3m_vnd_m2 > 0:
        district_ratio = req.avg_price_district_3m_vnd_m2 / (base * 1_000_000)
        market_adj = max(0.5, min(2.0, district_ratio))
    momentum_adj = 1.0 + req.price_momentum_30d * 0.5

    # Quality adjustments (LLM features)
    quality_adj = (
        1.0
        + (req.renovation_score - 0.5) * 0.15
        + (req.view_quality - 0.5) * 0.10
        + (req.interior_quality - 0.5) * 0.12
        + (0.5 - req.noise_level) * 0.05
        + (req.natural_light - 0.5) * 0.05
    )

    # Temporal: Q4/Tết premium
    seasonal_adj = 1.03 if req.is_year_end else 1.0

    # Apply the factors left to right; the final factor converts millions
    # of VND to VND.
    factors = (
        type_mult,
        cbd_adj,
        metro_adj,
        flood_adj,
        neighborhood_adj,
        room_adj,
        age_adj,
        amenity_adj,
        legal_adj,
        floor_adj,
        direction_adj,
        developer_adj,
        market_adj,
        momentum_adj,
        quality_adj,
        seasonal_adj,
        1_000_000,
    )
    price_per_m2 = base
    for factor in factors:
        price_per_m2 = price_per_m2 * factor
    estimated = price_per_m2 * req.area_m2

    # Simulate three model outputs; the seed depends only on the request so
    # the same property always gets the same simulated spread.
    rng = np.random.default_rng(
        seed=int(req.area_m2 * 1000 + req.rooms * 100 + req.month)
    )
    simulated = estimated * rng.normal(1.0, 0.04, size=3)
    member_prices = [float(value) for value in simulated]
    members = (("xgboost", 0.40), ("lightgbm", 0.35), ("catboost", 0.25))
    predictions = [
        ModelPrediction(
            model_name=member_name,
            weight=member_weight,
            predicted_price_vnd=round(member_price, -3),
            predicted_price_per_m2_vnd=round(member_price / req.area_m2, -3),
        )
        for (member_name, member_weight), member_price in zip(members, member_prices)
    ]

    prices_arr = np.array(member_prices)
    mean_price = np.mean(prices_arr)
    if mean_price > 0:
        cv = float(np.std(prices_arr) / mean_price)
    else:
        cv = 0.5
    confidence = max(0.0, min(1.0, 1.0 - cv))

    return AVMv2PredictResponse(
        estimated_price_vnd=round(estimated, -3),
        price_per_m2_vnd=round(price_per_m2, -3),
        confidence=round(confidence, 4),
        price_range_low_vnd=round(estimated * 0.82, -3),
        price_range_high_vnd=round(estimated * 1.18, -3),
        model_predictions=predictions,
        drivers=_heuristic_drivers(),
        comparables=[],
        model_version="ensemble-v2-heuristic",
        ensemble_method="weighted_average",
    )
def _get_feature_importance(self) -> list[AVMv2FeatureImportance]:
    """Aggregate gain-based feature importance across the loaded models.

    Each model's importances are normalised to sum to 1, scaled by that
    model's ``ENSEMBLE_WEIGHTS`` entry and summed per feature. The combined
    scores are then renormalised to sum to 1 and sorted in descending order.

    Extraction is best effort: a model whose importances cannot be read is
    skipped, and the failure is logged with its traceback rather than being
    swallowed silently.

    Returns:
        Importances rounded to 4 decimals, or an empty list when no model
        yielded any importance.
    """
    importances: dict[str, float] = {}

    def _accumulate(model_key: str, pairs: Any, total: float) -> None:
        # Add one model's normalised, weight-scaled contribution.
        weight = ENSEMBLE_WEIGHTS.get(model_key, 0.0)
        for feat, score in pairs:
            importances[feat] = importances.get(feat, 0) + score / total * weight

    if "xgboost" in self._models:
        try:
            scores = self._models["xgboost"].get_score(importance_type="gain")
            _accumulate("xgboost", scores.items(), sum(scores.values()) or 1.0)
        except Exception:
            logger.warning("Could not read XGBoost feature importance", exc_info=True)

    if "lightgbm" in self._models:
        try:
            model = self._models["lightgbm"]
            imp = model.feature_importance(importance_type="gain")
            names = model.feature_name()
            _accumulate("lightgbm", zip(names, imp, strict=False), sum(imp) or 1.0)
        except Exception:
            logger.warning("Could not read LightGBM feature importance", exc_info=True)

    if "catboost" in self._models:
        try:
            imp = self._models["catboost"].get_feature_importance()
            # CatBoost reports importances by position; map them to names.
            named = [
                (FEATURE_NAMES[i] if i < len(FEATURE_NAMES) else f"f{i}", score)
                for i, score in enumerate(imp)
            ]
            _accumulate("catboost", named, sum(imp) or 1.0)
        except Exception:
            logger.warning("Could not read CatBoost feature importance", exc_info=True)

    if not importances:
        return []

    sorted_imp = sorted(importances.items(), key=lambda x: x[1], reverse=True)
    total_imp = sum(v for _, v in sorted_imp) or 1.0
    return [
        AVMv2FeatureImportance(feature=f, importance=round(v / total_imp, 4))
        for f, v in sorted_imp
    ]
def get_feature_importance(self) -> AVMv2FeatureImportanceResponse:
    """Return global feature importance for the active ensemble.

    Importances extracted from the loaded models are preferred
    (``source="model"``). When no model is loaded, or none yields any
    importance, the curated heuristic ranking is returned instead
    (``source="heuristic"``) so the endpoint always has data.
    """
    model_drivers: list[AVMv2FeatureImportance] = []
    if self._models:
        model_drivers = self._get_feature_importance()

    if not model_drivers:
        return AVMv2FeatureImportanceResponse(
            model_version=self._model_version,
            source="heuristic",
            drivers=_heuristic_drivers(),
        )

    return AVMv2FeatureImportanceResponse(
        model_version=self._model_version,
        source="model",
        drivers=model_drivers,
    )
# ── Training pipeline ───────────────────────────────────────
def train(self, req: AVMv2TrainRequest) -> AVMv2TrainResponse:
    """Train the ensemble models on the available data.

    Pipeline:
        1. Load ``training_data.csv`` from ``settings.model_path``.
        2. Encode features with ``_prepare_training_data``.
        3. Split into train/val/test, grouped by district when possible.
        4. Tune and fit XGBoost, LightGBM and CatBoost with Optuna.
        5. Evaluate the ensemble on the test split.
        6. Save versioned artifacts, register the version and activate it.

    A zero-metric response is returned, and nothing is saved, registered or
    activated, when the CSV is missing, when fewer than 50 usable rows
    remain, or when none of the three models could be trained. The last
    case keeps a failed run from replacing a working ensemble with an
    empty one.

    Args:
        req: Supplies ``optuna_trials``, ``test_size`` and ``val_size``.

    Returns:
        The new version identifier with global and per-district test
        metrics, split sizes and the best hyperparameters per model.
    """
    from app.config import settings

    version = f"ensemble-v2-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}"
    logger.info("Training AVM v2 ensemble — version %s, trials=%d", version, req.optuna_trials)
    model_dir = Path(settings.model_path)
    data_path = model_dir / "training_data.csv"

    def _empty_response(train_n: int = 0, val_n: int = 0, test_n: int = 0) -> AVMv2TrainResponse:
        # Response used whenever no model version is produced.
        return AVMv2TrainResponse(
            model_version=version,
            metrics={"mae": 0.0, "mape": 0.0, "rmse": 0.0, "r2": 0.0},
            district_metrics={},
            training_samples=train_n,
            validation_samples=val_n,
            test_samples=test_n,
            best_params={},
        )

    # Check for training data
    if not data_path.exists():
        logger.warning("No training data found at %s — returning scaffold", data_path)
        return _empty_response()

    # Load and prepare data
    import pandas as pd
    from sklearn.model_selection import GroupShuffleSplit, ShuffleSplit

    df = pd.read_csv(data_path)
    logger.info("Loaded %d training samples", len(df))

    # Feature engineering
    X, y, groups = self._prepare_training_data(df)
    if len(X) < 50:
        logger.warning("Insufficient training data (%d samples)", len(X))
        return _empty_response(train_n=len(X))

    def _split(features: np.ndarray, group_labels: np.ndarray, holdout: float) -> tuple:
        # Keep districts intact across the split when there are at least
        # two of them; otherwise (for example every row labelled "default")
        # a group split is impossible, so fall back to a plain shuffle.
        if len(pd.unique(group_labels)) >= 2:
            splitter = GroupShuffleSplit(n_splits=1, test_size=holdout, random_state=42)
            return next(splitter.split(features, groups=group_labels))
        splitter = ShuffleSplit(n_splits=1, test_size=holdout, random_state=42)
        return next(splitter.split(features))

    # Split: train/val/test grouped by district
    train_val_idx, test_idx = _split(X, groups, req.test_size)
    X_trainval, y_trainval = X[train_val_idx], y[train_val_idx]
    X_test, y_test = X[test_idx], y[test_idx]
    groups_test = groups[test_idx]
    groups_trainval = groups[train_val_idx]

    # val_size is a fraction of the full data set; rescale it to the part
    # that remains after the test split.
    val_ratio = req.val_size / (1.0 - req.test_size)
    train_idx, val_idx = _split(X_trainval, groups_trainval, val_ratio)
    X_train, y_train = X_trainval[train_idx], y_trainval[train_idx]
    X_val, y_val = X_trainval[val_idx], y_trainval[val_idx]
    logger.info("Split: train=%d, val=%d, test=%d", len(X_train), len(X_val), len(X_test))

    # Train each model with Optuna; a trainer returns ({}, None) on failure.
    best_params: dict[str, dict] = {}
    trained_models: dict[str, Any] = {}
    trainers = (
        ("xgboost", self._train_xgboost),
        ("lightgbm", self._train_lightgbm),
        ("catboost", self._train_catboost),
    )
    for model_name, trainer in trainers:
        params, model = trainer(X_train, y_train, X_val, y_val, req.optuna_trials)
        if model is not None:
            best_params[model_name] = params
            trained_models[model_name] = model

    if not trained_models:
        logger.error(
            "AVM v2 training produced no models for version %s; keeping the current models",
            version,
        )
        return _empty_response(len(X_train), len(X_val), len(X_test))

    # Evaluate ensemble on test set
    metrics = self._evaluate_ensemble(trained_models, X_test, y_test, groups_test)
    global_metrics = {k: v for k, v in metrics.items() if k != "district_metrics"}

    # Save versioned artifacts
    version_dir = model_dir / "versions" / version
    version_dir.mkdir(parents=True, exist_ok=True)
    for name, model in trained_models.items():
        self._save_model(name, model, version_dir)
        # Also save to active model directory
        self._save_model(name, model, model_dir)

    # Register in model registry
    registry_entry = AVMv2ModelInfo(
        model_version=version,
        created_at=datetime.now(timezone.utc).isoformat(),
        metrics=global_metrics,
        is_active=True,
        ab_test_traffic_pct=0.0,
    )
    self._register_model(registry_entry, model_dir)

    # Serve the freshly trained models from now on
    self._models = trained_models
    self._model_version = version

    return AVMv2TrainResponse(
        model_version=version,
        metrics=global_metrics,
        district_metrics=metrics.get("district_metrics", {}),
        training_samples=len(X_train),
        validation_samples=len(X_val),
        test_samples=len(X_test),
        best_params=best_params,
    )
def _prepare_training_data(
    self, df: "pd.DataFrame"
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Encode a DataFrame into feature matrix, target vector, and group labels.

    The input frame is not modified; all work happens on a copy. Rows whose
    ``price_vnd`` is missing, non-numeric, non-finite or not strictly
    positive are dropped, because the target is ``log(price_vnd)`` and such
    rows would produce NaN or ``-inf`` targets.

    Args:
        df: Raw training rows. ``price_vnd`` is required. Missing feature
            columns are filled with ``0.0`` (``"unknown"`` for
            ``property_type`` and ``direction``).

    Returns:
        ``(X, y, groups)`` where ``X`` is a float64 matrix whose columns
        follow ``FEATURE_NAMES``, ``y`` is the log-price target, and
        ``groups`` holds the district label of each row (``"default"`` for
        every row when there is no ``district`` column, ``"unknown"`` for
        rows with a missing district).

    Raises:
        KeyError: If ``df`` has no ``price_vnd`` column.
    """
    import pandas as pd  # noqa: F811

    feature_cols = [
        "distance_to_cbd_km", "distance_to_metro_km", "distance_to_school_km",
        "distance_to_hospital_km", "distance_to_park_km", "distance_to_mall_km",
        "flood_zone_risk", "neighborhood_score",
        "property_type", "area_m2", "rooms", "floor_level", "total_floors",
        "direction", "floor_ratio", "building_age_years",
        "has_elevator", "has_parking", "has_pool", "has_legal_paper",
        "developer_reputation",
        "avg_price_district_3m_vnd_m2", "listing_density", "absorption_rate",
        "dom_avg", "price_momentum_30d", "yoy_change",
        "renovation_score", "view_quality", "interior_quality",
        "noise_level", "natural_light",
        "month",
    ]

    # Keep only rows with a usable price, and copy so the caller's frame is
    # left untouched by the column additions below.
    price = pd.to_numeric(df["price_vnd"], errors="coerce")
    valid = (price > 0) & np.isfinite(price)
    dropped = int((~valid).sum())
    if dropped:
        logger.warning(
            "Dropping %d training rows with missing or non-positive price_vnd", dropped
        )
    df = df.loc[valid].copy()
    price = price.loc[valid]

    # Fill missing columns with defaults
    for col in feature_cols:
        if col not in df.columns:
            df[col] = 0.0 if col not in ("property_type", "direction") else "unknown"

    # Encode categoricals; astype(str) keeps the .str accessor usable even
    # when a column holds only missing values.
    df["property_type_encoded"] = (
        df["property_type"].astype(str).str.lower().map(PROPERTY_TYPE_MAP).fillna(1)
    )
    df["direction_encoded"] = (
        df["direction"].astype(str).str.lower().map(DIRECTION_MAP).fillna(4)
    )

    # Cyclical month encoding
    month_rad = 2 * np.pi * df["month"].astype(float) / 12.0
    df["month_sin"] = np.sin(month_rad)
    df["month_cos"] = np.cos(month_rad)
    # Year-end is derived from the month (Q4); a missing month counts as
    # "not year end".
    df["is_year_end_encoded"] = (
        df["month"].fillna(0).astype(int).isin([10, 11, 12])
    ).astype(float)

    # Boolean encoding
    for col in ["has_elevator", "has_parking", "has_pool", "has_legal_paper"]:
        df[col] = df[col].astype(float)

    # Column order comes from FEATURE_NAMES so training and inference cannot
    # drift apart; only the derived year-end flag has a different name here.
    encoded_feature_cols = [
        "is_year_end_encoded" if name == "is_year_end" else name
        for name in FEATURE_NAMES
    ]

    X = df[encoded_feature_cols].values.astype(np.float64)
    y = np.log(price.values.astype(np.float64))  # Log-price target
    if "district" in df.columns:
        # Uniform string labels keep grouping and np.unique well defined.
        groups = df["district"].fillna("unknown").astype(str).values
    else:
        groups = np.full(len(df), "default", dtype=object)
    return X, y, groups
def _train_xgboost(
    self,
    X_train: np.ndarray, y_train: np.ndarray,
    X_val: np.ndarray, y_val: np.ndarray,
    n_trials: int,
) -> tuple[dict, Any]:
    """Tune and fit an XGBoost regressor on the log-price target.

    Optuna minimises validation RMSE over ``n_trials`` trials; the best
    parameters are then used for a final fit on the training split, with
    early stopping on the validation split.

    Returns:
        ``(best_params, booster)``, or ``({}, None)`` if anything fails
        (including a missing library); the failure is logged as a warning.
    """
    try:
        import optuna
        import xgboost as xgb

        optuna.logging.set_verbosity(optuna.logging.WARNING)
        train_matrix = xgb.DMatrix(X_train, label=y_train, feature_names=FEATURE_NAMES)
        val_matrix = xgb.DMatrix(X_val, label=y_val, feature_names=FEATURE_NAMES)

        def _fit(params: dict, rounds: int) -> Any:
            # One boosting run with early stopping on the validation split.
            return xgb.train(
                params, train_matrix, num_boost_round=rounds,
                evals=[(val_matrix, "val")], verbose_eval=False,
                early_stopping_rounds=50,
            )

        def objective(trial: optuna.Trial) -> float:
            search_space = {
                "objective": "reg:squarederror",
                "eval_metric": "rmse",
                "max_depth": trial.suggest_int("max_depth", 3, 10),
                "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
                "subsample": trial.suggest_float("subsample", 0.6, 1.0),
                "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
                "min_child_weight": trial.suggest_int("min_child_weight", 1, 10),
                "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True),
                "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True),
                "verbosity": 0,
            }
            rounds = trial.suggest_int("n_rounds", 100, 1000)
            candidate = _fit(search_space, rounds)
            residuals = candidate.predict(val_matrix) - y_val
            return float(np.sqrt(np.mean(residuals ** 2)))

        study = optuna.create_study(direction="minimize")
        study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

        # Refit with the winning parameters; n_rounds is a training-loop
        # argument rather than a booster parameter, so it is split off.
        tuned = study.best_params
        final_rounds = tuned.pop("n_rounds", 500)
        tuned.update({"objective": "reg:squarederror", "eval_metric": "rmse", "verbosity": 0})
        final_model = _fit(tuned, final_rounds)
        logger.info("XGBoost trained — best RMSE: %.4f", study.best_value)
        return tuned, final_model
    except Exception as e:
        logger.warning("XGBoost training failed: %s", e)
        return {}, None
def _train_lightgbm(
    self,
    X_train: np.ndarray, y_train: np.ndarray,
    X_val: np.ndarray, y_val: np.ndarray,
    n_trials: int,
) -> tuple[dict, Any]:
    """Tune and fit a LightGBM regressor on the log-price target.

    Optuna minimises validation RMSE over ``n_trials`` trials; the best
    parameters are then used for a final fit on the training split, with
    early stopping on the validation split.

    Returns:
        ``(best_params, booster)``, or ``({}, None)`` if anything fails
        (including a missing library); the failure is logged as a warning.
    """
    try:
        import lightgbm as lgb
        import optuna

        optuna.logging.set_verbosity(optuna.logging.WARNING)
        train_set = lgb.Dataset(X_train, label=y_train, feature_name=FEATURE_NAMES)
        val_set = lgb.Dataset(X_val, label=y_val, feature_name=FEATURE_NAMES, reference=train_set)

        def _fit(params: dict, rounds: int) -> Any:
            # Fresh callbacks for every run: early stopping after 50 stale
            # rounds, per-iteration logging disabled.
            run_callbacks = [lgb.early_stopping(50, verbose=False), lgb.log_evaluation(period=0)]
            return lgb.train(
                params, train_set, num_boost_round=rounds,
                valid_sets=[val_set], callbacks=run_callbacks,
            )

        def objective(trial: optuna.Trial) -> float:
            search_space = {
                "objective": "regression",
                "metric": "rmse",
                "num_leaves": trial.suggest_int("num_leaves", 15, 127),
                "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
                "feature_fraction": trial.suggest_float("feature_fraction", 0.6, 1.0),
                "bagging_fraction": trial.suggest_float("bagging_fraction", 0.6, 1.0),
                "bagging_freq": trial.suggest_int("bagging_freq", 1, 7),
                "min_child_samples": trial.suggest_int("min_child_samples", 5, 50),
                "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True),
                "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True),
                "verbosity": -1,
            }
            rounds = trial.suggest_int("n_rounds", 100, 1000)
            candidate = _fit(search_space, rounds)
            residuals = candidate.predict(X_val) - y_val
            return float(np.sqrt(np.mean(residuals ** 2)))

        study = optuna.create_study(direction="minimize")
        study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

        # Refit with the winning parameters; n_rounds is a training-loop
        # argument rather than a booster parameter, so it is split off.
        tuned = study.best_params
        final_rounds = tuned.pop("n_rounds", 500)
        tuned.update({"objective": "regression", "metric": "rmse", "verbosity": -1})
        final_model = _fit(tuned, final_rounds)
        logger.info("LightGBM trained — best RMSE: %.4f", study.best_value)
        return tuned, final_model
    except Exception as e:
        logger.warning("LightGBM training failed: %s", e)
        return {}, None
def _train_catboost(
    self,
    X_train: np.ndarray, y_train: np.ndarray,
    X_val: np.ndarray, y_val: np.ndarray,
    n_trials: int,
) -> tuple[dict, Any]:
    """Tune and fit a CatBoost regressor on the log-price target.

    Optuna minimises validation RMSE over ``n_trials`` trials; the best
    parameters are then used for a final fit on the training split, with
    the validation split as the evaluation set.

    Returns:
        ``(best_params, model)``, or ``({}, None)`` if anything fails
        (including a missing library); the failure is logged as a warning.
    """
    try:
        import optuna
        from catboost import CatBoostRegressor, Pool

        optuna.logging.set_verbosity(optuna.logging.WARNING)
        fit_pool = Pool(X_train, label=y_train, feature_names=FEATURE_NAMES)
        eval_pool = Pool(X_val, label=y_val, feature_names=FEATURE_NAMES)

        def _fit(params: dict) -> Any:
            # One silent training run evaluated against the validation pool.
            regressor = CatBoostRegressor(**params)
            regressor.fit(fit_pool, eval_set=eval_pool, verbose=0)
            return regressor

        def objective(trial: optuna.Trial) -> float:
            search_space = {
                "iterations": trial.suggest_int("iterations", 100, 1000),
                "depth": trial.suggest_int("depth", 4, 10),
                "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
                "l2_leaf_reg": trial.suggest_float("l2_leaf_reg", 1e-8, 10.0, log=True),
                "bagging_temperature": trial.suggest_float("bagging_temperature", 0.0, 1.0),
                "random_strength": trial.suggest_float("random_strength", 1e-8, 10.0, log=True),
                "verbose": 0,
                "loss_function": "RMSE",
                "early_stopping_rounds": 50,
            }
            candidate = _fit(search_space)
            residuals = candidate.predict(eval_pool) - y_val
            return float(np.sqrt(np.mean(residuals ** 2)))

        study = optuna.create_study(direction="minimize")
        study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

        # Refit with the winning parameters plus the fixed settings.
        tuned = study.best_params
        tuned.update({"verbose": 0, "loss_function": "RMSE", "early_stopping_rounds": 50})
        final_model = _fit(tuned)
        logger.info("CatBoost trained — best RMSE: %.4f", study.best_value)
        return tuned, final_model
    except Exception as e:
        logger.warning("CatBoost training failed: %s", e)
        return {}, None
def _evaluate_ensemble(
self, models: dict[str, Any], X_test: np.ndarray, y_test: np.ndarray,
groups_test: np.ndarray | None = None,
) -> dict:
"""Evaluate ensemble performance on a test set."""
if not models:
return {"mae": 0.0, "mape": 0.0, "rmse": 0.0, "r2": 0.0}
predictions = []
weights = []
for name, model in models.items():
w = ENSEMBLE_WEIGHTS.get(name, 0.0)
features = X_test
if name == "xgboost":
import xgboost as xgb
preds = model.predict(xgb.DMatrix(features, feature_names=FEATURE_NAMES))
elif name == "lightgbm":
preds = model.predict(features)
elif name == "catboost":
preds = model.predict(features)
else:
continue
predictions.append(preds * w)
weights.append(w)
total_weight = sum(weights) or 1.0
ensemble_preds = sum(predictions) / total_weight
# Metrics in log-space then convert
y_actual = np.exp(y_test)
y_pred = np.exp(ensemble_preds)
mae = float(np.mean(np.abs(y_actual - y_pred)))
mape = float(np.mean(np.abs((y_actual - y_pred) / y_actual))) * 100
rmse = float(np.sqrt(np.mean((y_actual - y_pred) ** 2)))
ss_res = np.sum((y_actual - y_pred) ** 2)
ss_tot = np.sum((y_actual - np.mean(y_actual)) ** 2)
r2 = float(1.0 - ss_res / ss_tot) if ss_tot > 0 else 0.0
global_metrics = {
"mae": round(mae, 2),
"mape": round(mape, 2),
"rmse": round(rmse, 2),
"r2": round(r2, 4),
}
# Per-district breakdown
district_metrics: dict[str, dict] = {}
if groups_test is not None and len(groups_test) == len(y_actual):
unique_districts = np.unique(groups_test)
for district in unique_districts:
mask = groups_test == district
if mask.sum() < 3:
# Too few samples for reliable per-district stats
continue
d_actual = y_actual[mask]
d_pred = y_pred[mask]
d_mae = float(np.mean(np.abs(d_actual - d_pred)))
d_mape = float(np.mean(np.abs((d_actual - d_pred) / d_actual))) * 100
d_rmse = float(np.sqrt(np.mean((d_actual - d_pred) ** 2)))
d_ss_res = np.sum((d_actual - d_pred) ** 2)
d_ss_tot = np.sum((d_actual - np.mean(d_actual)) ** 2)
d_r2 = float(1.0 - d_ss_res / d_ss_tot) if d_ss_tot > 0 else 0.0
district_metrics[str(district)] = {
"mae": round(d_mae, 2),
"mape": round(d_mape, 2),
"rmse": round(d_rmse, 2),
"r2": round(d_r2, 4),
"samples": int(mask.sum()),
}
global_metrics["district_metrics"] = district_metrics # type: ignore[assignment]
return global_metrics
def _save_model(self, name: str, model: Any, directory: Path) -> None:
"""Save a trained model to the specified directory."""
if name == "xgboost":
model.save_model(str(directory / "avm_v2_xgboost.json"))
elif name == "lightgbm":
model.save_model(str(directory / "avm_v2_lightgbm.txt"))
elif name == "catboost":
model.save_model(str(directory / "avm_v2_catboost.cbm"))
# ── Model registry ──────────────────────────────────────────
def _get_registry_path(self, model_dir: Path | None = None) -> Path:
"""Get the path to the model registry JSON file."""
if model_dir is None:
from app.config import settings
model_dir = Path(settings.model_path)
return model_dir / "model_registry.json"
def _load_registry(self, model_dir: Path | None = None) -> list[dict]:
"""Load the model registry from disk."""
path = self._get_registry_path(model_dir)
if path.exists():
with open(path) as f:
return json.load(f)
return []
def _save_registry(self, entries: list[dict], model_dir: Path | None = None) -> None:
"""Save the model registry to disk."""
path = self._get_registry_path(model_dir)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
json.dump(entries, f, indent=2)
def _register_model(self, info: AVMv2ModelInfo, model_dir: Path) -> None:
    """Append ``info`` to the registry as the only active model version.

    Every entry already in the registry is flagged inactive, the new entry
    is appended with ``is_active`` forced to ``True`` (the flag on ``info``
    is not consulted), the registry is written back to ``model_dir`` and
    the in-memory ``self._model_registry`` is rebuilt from the result.

    Args:
        info: Version, creation time, metrics and A/B traffic share of the
            newly trained model.
        model_dir: Directory holding the registry file.
    """
    registry = self._load_registry(model_dir)

    # Only one version may be active at a time.
    for existing in registry:
        existing["is_active"] = False

    new_entry = dict(
        model_version=info.model_version,
        created_at=info.created_at,
        metrics=info.metrics,
        is_active=True,
        ab_test_traffic_pct=info.ab_test_traffic_pct,
    )
    registry.append(new_entry)

    self._save_registry(registry, model_dir)
    # Keep the in-memory view in step with what was just persisted.
    self._model_registry = [AVMv2ModelInfo(**record) for record in registry]
def get_model_info(self) -> AVMv2ModelInfo:
    """Return information about the currently active model.

    The registry is scanned from its last entry backwards and the first
    entry flagged ``is_active`` wins. When the registry holds no active
    entry, a placeholder is returned built from ``self._model_version``,
    the current UTC time, empty metrics and an A/B traffic share of 0.0.
    """
    active_entry = next(
        (
            record
            for record in reversed(self._load_registry())
            if record.get("is_active")
        ),
        None,
    )
    if active_entry is not None:
        return AVMv2ModelInfo(**active_entry)

    # Nothing registered as active: describe the in-memory model instead.
    now_utc = datetime.now(timezone.utc)
    return AVMv2ModelInfo(
        model_version=self._model_version,
        created_at=now_utc.isoformat(),
        metrics={},
        is_active=True,
        ab_test_traffic_pct=0.0,
    )
def list_versions(self) -> list[AVMv2ModelInfo]:
    """Return every registry entry as an ``AVMv2ModelInfo``, in registry order."""
    return [AVMv2ModelInfo(**record) for record in self._load_registry()]
def set_ab_traffic(self, traffic_pct: float) -> AVMv2ModelInfo:
    """Set the A/B test traffic percentage for the currently active model.

    ``traffic_pct=0.10`` routes 10% of ``/predict`` calls to the v2
    ensemble; the remaining 90% receive the heuristic baseline response
    (matching v1 behaviour). Set to ``1.0`` to fully switch to v2, or
    ``0.0`` to disable the A/B split (v2 always used when called directly).

    Args:
        traffic_pct: Fraction of traffic for v2, between 0.0 and 1.0
            inclusive.

    Returns:
        The updated registry entry of the active model.

    Raises:
        ValueError: If ``traffic_pct`` is outside ``[0.0, 1.0]`` (NaN
            included), or if the registry has no active model.
    """
    # Validate before touching the registry: the value is persisted to
    # disk, so a bad fraction must never get that far. The chained
    # comparison is False for NaN, so NaN is rejected as well.
    if not (0.0 <= traffic_pct <= 1.0):
        raise ValueError(
            f"traffic_pct must be between 0.0 and 1.0, got {traffic_pct!r}"
        )

    from app.config import settings

    model_dir = Path(settings.model_path)
    entries = self._load_registry(model_dir)

    # Scan from the end so the most recently appended active entry wins.
    updated: dict | None = None
    for entry in reversed(entries):
        if entry.get("is_active"):
            entry["ab_test_traffic_pct"] = traffic_pct
            updated = entry
            break
    if updated is None:
        raise ValueError("No active model found in registry")

    self._save_registry(entries, model_dir)
    # Keep the in-memory view in step with what was just persisted.
    self._model_registry = [AVMv2ModelInfo(**e) for e in entries]
    return AVMv2ModelInfo(**updated)
def rollback(self, target_version: str) -> AVMv2ModelInfo:
    """Rollback to a previously trained model version.

    Copies the target version's artifacts from
    ``<model_path>/versions/<target_version>`` into the active model
    directory, marks that version as the only active registry entry,
    reloads the models from disk and updates ``self._model_version``.

    Both preconditions are checked before any file is copied, so a
    rejected rollback leaves the active artifacts, the registry and the
    loaded models unchanged.

    Args:
        target_version: Version identifier to restore.

    Returns:
        The registry entry that is active after the rollback.

    Raises:
        ValueError: If the version directory does not exist, or if the
            version is not listed in the registry.
    """
    from app.config import settings

    model_dir = Path(settings.model_path)
    version_dir = model_dir / "versions" / target_version
    if not version_dir.exists():
        raise ValueError(f"Model version {target_version} not found")

    # Check the registry BEFORE overwriting anything: a directory that
    # exists but was never registered must not replace the active
    # artifacts and then fail, which would leave files, registry and
    # loaded models out of sync.
    entries = self._load_registry(model_dir)
    if not any(entry["model_version"] == target_version for entry in entries):
        raise ValueError(f"Model version {target_version} not in registry")

    # Copy versioned artifacts to the active directory (files only).
    for artifact in version_dir.iterdir():
        if artifact.is_file():
            shutil.copy2(artifact, model_dir / artifact.name)

    # Make the target the only active entry in the registry.
    for entry in entries:
        entry["is_active"] = entry["model_version"] == target_version
    self._save_registry(entries, model_dir)

    # Drop the loaded models and reload from the artifacts just restored.
    self._models = {}
    self._load_models()
    self._model_version = target_version
    logger.info("Rolled back to model version %s", target_version)

    active = next(e for e in entries if e["is_active"])
    return AVMv2ModelInfo(**active)
# ── A/B comparison ─────────────────────────────────────────
def compare_v1(self, req: ABComparisonRequest) -> ABComparisonResponse:
    """Run one property through AVM v1 and AVM v2 and report the differences.

    Args:
        req: Property description carrying the fields of both model
            versions.

    Returns:
        A summary of each prediction, the v2-minus-v1 price difference
        (absolute, rounded to the nearest 1,000, and as a percentage of
        the v1 price), the v2-minus-v1 confidence difference, and a
        textual recommendation.
    """
    from app.services.avm_service import avm_service

    # v1 uses ``area`` / ``bedrooms``; fall back to ``rooms`` when
    # ``bedrooms`` is falsy.
    v1_result = avm_service.predict(
        AVMPredictRequest(
            area=req.area_m2,
            district=req.district,
            city=req.city,
            property_type=req.property_type,
            bedrooms=req.bedrooms or req.rooms,
            floors=req.floors,
            frontage=req.frontage,
            has_legal_paper=req.has_legal_paper,
        )
    )

    # Fields copied verbatim from the comparison request into the v2 request.
    passthrough_fields = (
        "district",
        "city",
        "property_type",
        "area_m2",
        "has_legal_paper",
        "neighborhood_score",
        "distance_to_cbd_km",
        "distance_to_metro_km",
        "flood_zone_risk",
        "building_age_years",
        "floor_level",
        "total_floors",
        "direction",
        "has_elevator",
        "has_parking",
        "has_pool",
        "developer_reputation",
        "renovation_score",
        "view_quality",
        "interior_quality",
        "month",
        "quarter",
        "is_year_end",
    )
    v2_kwargs = {field: getattr(req, field) for field in passthrough_fields}
    # Mirror image of the v1 fallback: prefer ``rooms``, else ``bedrooms``.
    v2_kwargs["rooms"] = req.rooms or req.bedrooms
    v2_result = self.predict(AVMv2PredictRequest(**v2_kwargs))

    # Differences are always expressed as v2 minus v1.
    v1_price = v1_result.estimated_price_vnd
    price_diff = v2_result.estimated_price_vnd - v1_price
    if v1_price > 0:
        price_diff_pct = price_diff / v1_price * 100
    else:
        # Avoid dividing by a zero or negative v1 price.
        price_diff_pct = 0.0
    confidence_diff = v2_result.confidence - v1_result.confidence

    # A confidence lead of more than 0.05 decides the recommendation;
    # otherwise fall back to how closely the two prices agree.
    if v2_result.confidence > v1_result.confidence + 0.05:
        recommendation = "v2 — higher confidence from ensemble model agreement"
    elif v1_result.confidence > v2_result.confidence + 0.05:
        recommendation = "v1 — higher confidence, v2 models may disagree on this property"
    elif abs(price_diff_pct) < 5:
        recommendation = "Both models agree (< 5% price difference)"
    else:
        recommendation = "v2 — richer feature set captures more market factors"

    v1_summary = AVMv1Summary(
        estimated_price_vnd=v1_result.estimated_price_vnd,
        confidence=v1_result.confidence,
        price_per_m2=v1_result.price_per_m2,
        price_range_low=v1_result.price_range_low,
        price_range_high=v1_result.price_range_high,
    )
    v2_summary = AVMv2Summary(
        estimated_price_vnd=v2_result.estimated_price_vnd,
        confidence=v2_result.confidence,
        price_per_m2_vnd=v2_result.price_per_m2_vnd,
        price_range_low_vnd=v2_result.price_range_low_vnd,
        price_range_high_vnd=v2_result.price_range_high_vnd,
        model_version=v2_result.model_version,
        ensemble_method=v2_result.ensemble_method,
    )
    return ABComparisonResponse(
        v1=v1_summary,
        v2=v2_summary,
        price_diff_vnd=round(price_diff, -3),
        price_diff_pct=round(price_diff_pct, 2),
        confidence_diff=round(confidence_diff, 4),
        recommendation=recommendation,
    )
# Module-level singleton: one AVMv2EnsembleService is constructed when this
# module is first imported, and every importer of ``avm_v2_service`` shares it.
avm_v2_service = AVMv2EnsembleService()