feat(ai-services): AVM v2 residential — expanded features, training pipeline, model versioning

Add neighborhood_score, developer_reputation, floor_level, direction premiums
to the multi-model ensemble. Implement real Optuna-based training pipeline
for XGBoost/LightGBM/CatBoost with grouped train/val/test splits. Add
file-based model registry with rollback and list-versions endpoints.
23 Python tests covering all new features.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-16 17:55:03 +07:00
parent 6cf2c23170
commit 9eaec46a37
4 changed files with 743 additions and 56 deletions

View File

@@ -5,9 +5,12 @@ Ensemble weights: XGBoost 0.4, LightGBM 0.35, CatBoost 0.25.
Confidence = 1 - CV(3 predictions), where CV = std / mean.
"""
import json
import logging
import os
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import numpy as np
@@ -47,16 +50,22 @@ FEATURE_NAMES = [
"distance_to_park_km",
"distance_to_mall_km",
"flood_zone_risk",
# Physical (8)
# Neighborhood (1)
"neighborhood_score",
# Physical (13)
"property_type_encoded",
"area_m2",
"rooms",
"floor_level",
"total_floors",
"direction_encoded",
"floor_ratio",
"building_age_years",
"has_elevator",
"has_parking",
"has_pool",
"has_legal_paper",
"developer_reputation",
# Market (6)
"avg_price_district_3m_vnd_m2",
"listing_density",
@@ -76,6 +85,18 @@ FEATURE_NAMES = [
"is_year_end",
]
DIRECTION_MAP = {
"south": 0,
"southeast": 1,
"east": 2,
"southwest": 3,
"northeast": 4,
"west": 5,
"northwest": 6,
"north": 7,
"unknown": 4, # neutral mid-value
}
PROPERTY_TYPE_MAP = {
"apartment": 0,
"house": 1,
@@ -106,7 +127,7 @@ def _encode_features(req: AVMv2PredictRequest) -> np.ndarray:
month_rad = 2 * np.pi * req.month / 12.0
return np.array(
[[
# Location
# Location (7)
req.distance_to_cbd_km,
req.distance_to_metro_km,
req.distance_to_school_km,
@@ -114,30 +135,36 @@ def _encode_features(req: AVMv2PredictRequest) -> np.ndarray:
req.distance_to_park_km,
req.distance_to_mall_km,
req.flood_zone_risk,
# Physical
# Neighborhood (1)
req.neighborhood_score,
# Physical (13)
PROPERTY_TYPE_MAP.get(req.property_type.lower(), 1),
req.area_m2,
req.rooms,
float(req.floor_level),
float(req.total_floors),
float(DIRECTION_MAP.get(req.direction.lower(), 4)),
req.floor_ratio,
req.building_age_years,
1.0 if req.has_elevator else 0.0,
1.0 if req.has_parking else 0.0,
1.0 if req.has_pool else 0.0,
1.0 if req.has_legal_paper else 0.0,
# Market
req.developer_reputation,
# Market (6)
req.avg_price_district_3m_vnd_m2,
req.listing_density,
req.absorption_rate,
req.dom_avg,
req.price_momentum_30d,
req.yoy_change,
# LLM-extracted
# LLM-extracted (5)
req.renovation_score,
req.view_quality,
req.interior_quality,
req.noise_level,
req.natural_light,
# Temporal
# Temporal (3)
np.sin(month_rad),
np.cos(month_rad),
1.0 if req.is_year_end else 0.0,
@@ -319,6 +346,9 @@ class AVMv2EnsembleService:
metro_adj = 1.0 + max(0.0, (2.0 - req.distance_to_metro_km) * 0.05)
flood_adj = 1.0 - req.flood_zone_risk * 0.15
# Neighborhood adjustment: ±15% swing around 0.5 midpoint
neighborhood_adj = 1.0 + (req.neighborhood_score - 0.5) * 0.30
# Physical adjustments
room_adj = 1.0 + req.rooms * 0.015
age_adj = max(0.75, 1.0 - req.building_age_years * 0.008)
@@ -330,6 +360,34 @@ class AVMv2EnsembleService:
)
legal_adj = 1.0 if req.has_legal_paper else 0.70
# Floor level premium (apartments/penthouses: higher floors = premium)
floor_adj = 1.0
if req.floor_level > 0 and req.property_type.lower() in ("apartment", "penthouse"):
if req.total_floors > 0:
relative_floor = req.floor_level / req.total_floors
# Mid-to-high floors get up to +8% premium, ground floor -3%
floor_adj = 1.0 + (relative_floor - 0.3) * 0.12
floor_adj = max(0.97, min(1.08, floor_adj))
else:
# No total_floors info: mild premium for higher floors
floor_adj = min(1.08, 1.0 + req.floor_level * 0.003)
# Direction premium (Vietnamese preference: south/southeast best)
direction_adj = {
"south": 1.05,
"southeast": 1.04,
"east": 1.02,
"southwest": 1.01,
"northeast": 1.0,
"west": 0.98,
"northwest": 0.97,
"north": 0.96,
"unknown": 1.0,
}.get(req.direction.lower(), 1.0)
# Developer reputation: ±10% swing
developer_adj = 1.0 + (req.developer_reputation - 0.5) * 0.20
# Market adjustments
if req.avg_price_district_3m_vnd_m2 > 0:
market_adj = req.avg_price_district_3m_vnd_m2 / (base * 1_000_000)
@@ -357,10 +415,14 @@ class AVMv2EnsembleService:
* cbd_adj
* metro_adj
* flood_adj
* neighborhood_adj
* room_adj
* age_adj
* amenity_adj
* legal_adj
* floor_adj
* direction_adj
* developer_adj
* market_adj
* momentum_adj
* quality_adj
@@ -407,16 +469,20 @@ class AVMv2EnsembleService:
# Heuristic driver ranking
drivers = [
AVMv2FeatureImportance(feature="area_m2", importance=0.18),
AVMv2FeatureImportance(feature="avg_price_district_3m_vnd_m2", importance=0.15),
AVMv2FeatureImportance(feature="property_type_encoded", importance=0.12),
AVMv2FeatureImportance(feature="distance_to_cbd_km", importance=0.10),
AVMv2FeatureImportance(feature="renovation_score", importance=0.08),
AVMv2FeatureImportance(feature="building_age_years", importance=0.07),
AVMv2FeatureImportance(feature="has_legal_paper", importance=0.06),
AVMv2FeatureImportance(feature="distance_to_metro_km", importance=0.05),
AVMv2FeatureImportance(feature="interior_quality", importance=0.05),
AVMv2FeatureImportance(feature="price_momentum_30d", importance=0.04),
AVMv2FeatureImportance(feature="area_m2", importance=0.14),
AVMv2FeatureImportance(feature="avg_price_district_3m_vnd_m2", importance=0.12),
AVMv2FeatureImportance(feature="neighborhood_score", importance=0.10),
AVMv2FeatureImportance(feature="property_type_encoded", importance=0.10),
AVMv2FeatureImportance(feature="distance_to_cbd_km", importance=0.08),
AVMv2FeatureImportance(feature="developer_reputation", importance=0.07),
AVMv2FeatureImportance(feature="renovation_score", importance=0.07),
AVMv2FeatureImportance(feature="building_age_years", importance=0.06),
AVMv2FeatureImportance(feature="direction_encoded", importance=0.05),
AVMv2FeatureImportance(feature="floor_level", importance=0.05),
AVMv2FeatureImportance(feature="has_legal_paper", importance=0.05),
AVMv2FeatureImportance(feature="distance_to_metro_km", importance=0.04),
AVMv2FeatureImportance(feature="interior_quality", importance=0.04),
AVMv2FeatureImportance(feature="price_momentum_30d", importance=0.03),
]
return AVMv2PredictResponse(
@@ -481,52 +547,455 @@ class AVMv2EnsembleService:
# ── Training pipeline ───────────────────────────────────────
def train(self, req: AVMv2TrainRequest) -> AVMv2TrainResponse:
"""Train the ensemble models.
"""Train the ensemble models on available data.
In production, this loads training data from the database/MinIO,
performs 5-fold CV by district with Optuna hyperparameter optimization,
and saves versioned model artifacts.
Currently returns a scaffold response. Real training requires
the data pipeline from Phase 3.
Pipeline:
1. Load training data from CSV/database export
2. Feature engineering (encode, normalize, cyclical)
3. Train/val/test split stratified by district
4. For each model: Optuna hyperparameter optimization
5. Save versioned artifacts + register in model registry
"""
from app.config import settings
version = f"ensemble-v2-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}"
logger.info("Training AVM v2 ensemble — version %s, trials=%d", version, req.optuna_trials)
# TODO: Replace with actual training pipeline when data is available
# 1. Load data from PostgreSQL/MinIO
# 2. Feature engineering (encode categoricals, normalize, cyclical)
# 3. 80/10/10 split stratified by district
# 4. For each model (XGBoost, LightGBM, CatBoost):
# a. Optuna study with req.optuna_trials trials
# b. 5-fold CV grouped by district
# c. Train on best params
# 5. Save artifacts to MinIO with version tag
# 6. Register in model registry
model_dir = Path(settings.model_path)
data_path = model_dir / "training_data.csv"
# Check for training data
if not data_path.exists():
logger.warning("No training data found at %s — returning scaffold", data_path)
return AVMv2TrainResponse(
model_version=version,
metrics={"mae": 0.0, "mape": 0.0, "rmse": 0.0, "r2": 0.0},
district_metrics={},
training_samples=0,
validation_samples=0,
test_samples=0,
best_params={},
)
# Load and prepare data
import pandas as pd
from sklearn.model_selection import GroupShuffleSplit
df = pd.read_csv(data_path)
logger.info("Loaded %d training samples", len(df))
# Feature engineering
X, y, groups = self._prepare_training_data(df)
if len(X) < 50:
logger.warning("Insufficient training data (%d samples)", len(X))
return AVMv2TrainResponse(
model_version=version,
metrics={"mae": 0.0, "mape": 0.0, "rmse": 0.0, "r2": 0.0},
district_metrics={},
training_samples=len(X),
validation_samples=0,
test_samples=0,
best_params={},
)
# Split: train/val/test grouped by district
gss_test = GroupShuffleSplit(n_splits=1, test_size=req.test_size, random_state=42)
train_val_idx, test_idx = next(gss_test.split(X, y, groups))
X_trainval, y_trainval = X[train_val_idx], y[train_val_idx]
X_test, y_test = X[test_idx], y[test_idx]
groups_trainval = groups[train_val_idx]
val_ratio = req.val_size / (1.0 - req.test_size)
gss_val = GroupShuffleSplit(n_splits=1, test_size=val_ratio, random_state=42)
train_idx, val_idx = next(gss_val.split(X_trainval, y_trainval, groups_trainval))
X_train, y_train = X_trainval[train_idx], y_trainval[train_idx]
X_val, y_val = X_trainval[val_idx], y_trainval[val_idx]
logger.info("Split: train=%d, val=%d, test=%d", len(X_train), len(X_val), len(X_test))
# Train each model with Optuna
best_params: dict[str, dict] = {}
trained_models: dict[str, Any] = {}
xgb_params, xgb_model = self._train_xgboost(X_train, y_train, X_val, y_val, req.optuna_trials)
if xgb_model:
best_params["xgboost"] = xgb_params
trained_models["xgboost"] = xgb_model
lgb_params, lgb_model = self._train_lightgbm(X_train, y_train, X_val, y_val, req.optuna_trials)
if lgb_model:
best_params["lightgbm"] = lgb_params
trained_models["lightgbm"] = lgb_model
cat_params, cat_model = self._train_catboost(X_train, y_train, X_val, y_val, req.optuna_trials)
if cat_model:
best_params["catboost"] = cat_params
trained_models["catboost"] = cat_model
# Evaluate ensemble on test set
metrics = self._evaluate_ensemble(trained_models, X_test, y_test)
# Save versioned artifacts
version_dir = model_dir / "versions" / version
version_dir.mkdir(parents=True, exist_ok=True)
for name, model in trained_models.items():
self._save_model(name, model, version_dir)
# Also save to active model directory
self._save_model(name, model, model_dir)
# Register in model registry
registry_entry = AVMv2ModelInfo(
model_version=version,
created_at=datetime.now(timezone.utc).isoformat(),
metrics=metrics,
is_active=True,
ab_test_traffic_pct=0.0,
)
self._register_model(registry_entry, model_dir)
# Reload models
self._models = trained_models
self._model_version = version
return AVMv2TrainResponse(
model_version=version,
metrics={
"mae": 0.0,
"mape": 0.0,
"rmse": 0.0,
"r2": 0.0,
},
metrics=metrics,
district_metrics={},
training_samples=0,
validation_samples=0,
test_samples=0,
best_params={
"xgboost": {"n_estimators": 500, "max_depth": 6, "learning_rate": 0.05},
"lightgbm": {"n_estimators": 500, "num_leaves": 31, "learning_rate": 0.05},
"catboost": {"iterations": 500, "depth": 6, "learning_rate": 0.05},
},
training_samples=len(X_train),
validation_samples=len(X_val),
test_samples=len(X_test),
best_params=best_params,
)
def _prepare_training_data(
self, df: "pd.DataFrame"
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Encode a DataFrame into feature matrix, target vector, and group labels."""
import pandas as pd # noqa: F811
feature_cols = [
"distance_to_cbd_km", "distance_to_metro_km", "distance_to_school_km",
"distance_to_hospital_km", "distance_to_park_km", "distance_to_mall_km",
"flood_zone_risk", "neighborhood_score",
"property_type", "area_m2", "rooms", "floor_level", "total_floors",
"direction", "floor_ratio", "building_age_years",
"has_elevator", "has_parking", "has_pool", "has_legal_paper",
"developer_reputation",
"avg_price_district_3m_vnd_m2", "listing_density", "absorption_rate",
"dom_avg", "price_momentum_30d", "yoy_change",
"renovation_score", "view_quality", "interior_quality",
"noise_level", "natural_light",
"month",
]
# Fill missing columns with defaults
for col in feature_cols:
if col not in df.columns:
df[col] = 0.0 if col not in ("property_type", "direction") else "unknown"
# Encode categoricals
df["property_type_encoded"] = df["property_type"].str.lower().map(PROPERTY_TYPE_MAP).fillna(1)
df["direction_encoded"] = df["direction"].str.lower().map(DIRECTION_MAP).fillna(4)
# Cyclical month encoding
month_rad = 2 * np.pi * df["month"].astype(float) / 12.0
df["month_sin"] = np.sin(month_rad)
df["month_cos"] = np.cos(month_rad)
df["is_year_end_encoded"] = (df["month"].astype(int).isin([10, 11, 12])).astype(float)
# Boolean encoding
for col in ["has_elevator", "has_parking", "has_pool", "has_legal_paper"]:
df[col] = df[col].astype(float)
encoded_feature_cols = [
"distance_to_cbd_km", "distance_to_metro_km", "distance_to_school_km",
"distance_to_hospital_km", "distance_to_park_km", "distance_to_mall_km",
"flood_zone_risk", "neighborhood_score",
"property_type_encoded", "area_m2", "rooms", "floor_level", "total_floors",
"direction_encoded", "floor_ratio", "building_age_years",
"has_elevator", "has_parking", "has_pool", "has_legal_paper",
"developer_reputation",
"avg_price_district_3m_vnd_m2", "listing_density", "absorption_rate",
"dom_avg", "price_momentum_30d", "yoy_change",
"renovation_score", "view_quality", "interior_quality",
"noise_level", "natural_light",
"month_sin", "month_cos", "is_year_end_encoded",
]
X = df[encoded_feature_cols].values.astype(np.float64)
y = np.log(df["price_vnd"].values.astype(np.float64)) # Log-price target
groups = df.get("district", pd.Series(["default"] * len(df))).values
return X, y, groups
def _train_xgboost(
self,
X_train: np.ndarray, y_train: np.ndarray,
X_val: np.ndarray, y_val: np.ndarray,
n_trials: int,
) -> tuple[dict, Any]:
"""Train XGBoost with Optuna hyperparameter optimization."""
try:
import optuna
import xgboost as xgb
optuna.logging.set_verbosity(optuna.logging.WARNING)
dtrain = xgb.DMatrix(X_train, label=y_train, feature_names=FEATURE_NAMES)
dval = xgb.DMatrix(X_val, label=y_val, feature_names=FEATURE_NAMES)
def objective(trial: optuna.Trial) -> float:
params = {
"objective": "reg:squarederror",
"eval_metric": "rmse",
"max_depth": trial.suggest_int("max_depth", 3, 10),
"learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
"subsample": trial.suggest_float("subsample", 0.6, 1.0),
"colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
"min_child_weight": trial.suggest_int("min_child_weight", 1, 10),
"reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True),
"reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True),
"verbosity": 0,
}
n_rounds = trial.suggest_int("n_rounds", 100, 1000)
model = xgb.train(
params, dtrain, num_boost_round=n_rounds,
evals=[(dval, "val")], verbose_eval=False,
early_stopping_rounds=50,
)
preds = model.predict(dval)
rmse = float(np.sqrt(np.mean((preds - y_val) ** 2)))
return rmse
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
# Retrain with best params on full train set
best = study.best_params
n_rounds = best.pop("n_rounds", 500)
best.update({"objective": "reg:squarederror", "eval_metric": "rmse", "verbosity": 0})
model = xgb.train(
best, dtrain, num_boost_round=n_rounds,
evals=[(dval, "val")], verbose_eval=False,
early_stopping_rounds=50,
)
logger.info("XGBoost trained — best RMSE: %.4f", study.best_value)
return best, model
except Exception as e:
logger.warning("XGBoost training failed: %s", e)
return {}, None
def _train_lightgbm(
self,
X_train: np.ndarray, y_train: np.ndarray,
X_val: np.ndarray, y_val: np.ndarray,
n_trials: int,
) -> tuple[dict, Any]:
"""Train LightGBM with Optuna hyperparameter optimization."""
try:
import lightgbm as lgb
import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)
dtrain = lgb.Dataset(X_train, label=y_train, feature_name=FEATURE_NAMES)
dval = lgb.Dataset(X_val, label=y_val, feature_name=FEATURE_NAMES, reference=dtrain)
def objective(trial: optuna.Trial) -> float:
params = {
"objective": "regression",
"metric": "rmse",
"num_leaves": trial.suggest_int("num_leaves", 15, 127),
"learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
"feature_fraction": trial.suggest_float("feature_fraction", 0.6, 1.0),
"bagging_fraction": trial.suggest_float("bagging_fraction", 0.6, 1.0),
"bagging_freq": trial.suggest_int("bagging_freq", 1, 7),
"min_child_samples": trial.suggest_int("min_child_samples", 5, 50),
"reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True),
"reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True),
"verbosity": -1,
}
n_rounds = trial.suggest_int("n_rounds", 100, 1000)
callbacks = [lgb.early_stopping(50, verbose=False), lgb.log_evaluation(period=0)]
model = lgb.train(
params, dtrain, num_boost_round=n_rounds,
valid_sets=[dval], callbacks=callbacks,
)
preds = model.predict(X_val)
rmse = float(np.sqrt(np.mean((preds - y_val) ** 2)))
return rmse
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
best = study.best_params
n_rounds = best.pop("n_rounds", 500)
best.update({"objective": "regression", "metric": "rmse", "verbosity": -1})
callbacks = [lgb.early_stopping(50, verbose=False), lgb.log_evaluation(period=0)]
model = lgb.train(
best, dtrain, num_boost_round=n_rounds,
valid_sets=[dval], callbacks=callbacks,
)
logger.info("LightGBM trained — best RMSE: %.4f", study.best_value)
return best, model
except Exception as e:
logger.warning("LightGBM training failed: %s", e)
return {}, None
def _train_catboost(
self,
X_train: np.ndarray, y_train: np.ndarray,
X_val: np.ndarray, y_val: np.ndarray,
n_trials: int,
) -> tuple[dict, Any]:
"""Train CatBoost with Optuna hyperparameter optimization."""
try:
import optuna
from catboost import CatBoostRegressor, Pool
optuna.logging.set_verbosity(optuna.logging.WARNING)
train_pool = Pool(X_train, label=y_train, feature_names=FEATURE_NAMES)
val_pool = Pool(X_val, label=y_val, feature_names=FEATURE_NAMES)
def objective(trial: optuna.Trial) -> float:
params = {
"iterations": trial.suggest_int("iterations", 100, 1000),
"depth": trial.suggest_int("depth", 4, 10),
"learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
"l2_leaf_reg": trial.suggest_float("l2_leaf_reg", 1e-8, 10.0, log=True),
"bagging_temperature": trial.suggest_float("bagging_temperature", 0.0, 1.0),
"random_strength": trial.suggest_float("random_strength", 1e-8, 10.0, log=True),
"verbose": 0,
"loss_function": "RMSE",
"early_stopping_rounds": 50,
}
model = CatBoostRegressor(**params)
model.fit(train_pool, eval_set=val_pool, verbose=0)
preds = model.predict(val_pool)
rmse = float(np.sqrt(np.mean((preds - y_val) ** 2)))
return rmse
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
best = study.best_params
best.update({"verbose": 0, "loss_function": "RMSE", "early_stopping_rounds": 50})
model = CatBoostRegressor(**best)
model.fit(train_pool, eval_set=val_pool, verbose=0)
logger.info("CatBoost trained — best RMSE: %.4f", study.best_value)
return best, model
except Exception as e:
logger.warning("CatBoost training failed: %s", e)
return {}, None
def _evaluate_ensemble(
self, models: dict[str, Any], X_test: np.ndarray, y_test: np.ndarray
) -> dict:
"""Evaluate ensemble performance on a test set."""
if not models:
return {"mae": 0.0, "mape": 0.0, "rmse": 0.0, "r2": 0.0}
predictions = []
weights = []
for name, model in models.items():
w = ENSEMBLE_WEIGHTS.get(name, 0.0)
features = X_test
if name == "xgboost":
import xgboost as xgb
preds = model.predict(xgb.DMatrix(features, feature_names=FEATURE_NAMES))
elif name == "lightgbm":
preds = model.predict(features)
elif name == "catboost":
preds = model.predict(features)
else:
continue
predictions.append(preds * w)
weights.append(w)
total_weight = sum(weights) or 1.0
ensemble_preds = sum(predictions) / total_weight
# Metrics in log-space then convert
y_actual = np.exp(y_test)
y_pred = np.exp(ensemble_preds)
mae = float(np.mean(np.abs(y_actual - y_pred)))
mape = float(np.mean(np.abs((y_actual - y_pred) / y_actual))) * 100
rmse = float(np.sqrt(np.mean((y_actual - y_pred) ** 2)))
ss_res = np.sum((y_actual - y_pred) ** 2)
ss_tot = np.sum((y_actual - np.mean(y_actual)) ** 2)
r2 = float(1.0 - ss_res / ss_tot) if ss_tot > 0 else 0.0
return {
"mae": round(mae, 2),
"mape": round(mape, 2),
"rmse": round(rmse, 2),
"r2": round(r2, 4),
}
def _save_model(self, name: str, model: Any, directory: Path) -> None:
"""Save a trained model to the specified directory."""
if name == "xgboost":
model.save_model(str(directory / "avm_v2_xgboost.json"))
elif name == "lightgbm":
model.save_model(str(directory / "avm_v2_lightgbm.txt"))
elif name == "catboost":
model.save_model(str(directory / "avm_v2_catboost.cbm"))
# ── Model registry ──────────────────────────────────────────
def _get_registry_path(self, model_dir: Path | None = None) -> Path:
"""Get the path to the model registry JSON file."""
if model_dir is None:
from app.config import settings
model_dir = Path(settings.model_path)
return model_dir / "model_registry.json"
def _load_registry(self, model_dir: Path | None = None) -> list[dict]:
"""Load the model registry from disk."""
path = self._get_registry_path(model_dir)
if path.exists():
with open(path) as f:
return json.load(f)
return []
def _save_registry(self, entries: list[dict], model_dir: Path | None = None) -> None:
"""Save the model registry to disk."""
path = self._get_registry_path(model_dir)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
json.dump(entries, f, indent=2)
def _register_model(self, info: AVMv2ModelInfo, model_dir: Path) -> None:
"""Register a new model version and mark it as active."""
entries = self._load_registry(model_dir)
# Deactivate previous active models
for entry in entries:
entry["is_active"] = False
entries.append({
"model_version": info.model_version,
"created_at": info.created_at,
"metrics": info.metrics,
"is_active": True,
"ab_test_traffic_pct": info.ab_test_traffic_pct,
})
self._save_registry(entries, model_dir)
self._model_registry = [
AVMv2ModelInfo(**e) for e in entries
]
def get_model_info(self) -> AVMv2ModelInfo:
"""Return current active model information."""
# Check registry for active model
entries = self._load_registry()
for entry in reversed(entries):
if entry.get("is_active"):
return AVMv2ModelInfo(**entry)
return AVMv2ModelInfo(
model_version=self._model_version,
created_at=datetime.now(timezone.utc).isoformat(),
@@ -535,6 +1004,52 @@ class AVMv2EnsembleService:
ab_test_traffic_pct=0.0,
)
def list_versions(self) -> list[AVMv2ModelInfo]:
"""List all registered model versions."""
entries = self._load_registry()
return [AVMv2ModelInfo(**e) for e in entries]
def rollback(self, target_version: str) -> AVMv2ModelInfo:
"""Rollback to a previously trained model version.
Copies the target version's artifacts to the active model directory
and updates the registry.
"""
from app.config import settings
model_dir = Path(settings.model_path)
version_dir = model_dir / "versions" / target_version
if not version_dir.exists():
raise ValueError(f"Model version {target_version} not found")
# Copy versioned artifacts to active directory
for artifact in version_dir.iterdir():
if artifact.is_file():
shutil.copy2(artifact, model_dir / artifact.name)
# Update registry
entries = self._load_registry(model_dir)
found = False
for entry in entries:
entry["is_active"] = entry["model_version"] == target_version
if entry["model_version"] == target_version:
found = True
if not found:
raise ValueError(f"Model version {target_version} not in registry")
self._save_registry(entries, model_dir)
# Reload models from disk
self._models = {}
self._load_models()
self._model_version = target_version
logger.info("Rolled back to model version %s", target_version)
active = next(e for e in entries if e["is_active"])
return AVMv2ModelInfo(**active)
# ── A/B comparison ─────────────────────────────────────────
def compare_v1(self, req: ABComparisonRequest) -> ABComparisonResponse:
@@ -562,13 +1077,18 @@ class AVMv2EnsembleService:
area_m2=req.area_m2,
rooms=req.rooms or req.bedrooms,
has_legal_paper=req.has_legal_paper,
neighborhood_score=req.neighborhood_score,
distance_to_cbd_km=req.distance_to_cbd_km,
distance_to_metro_km=req.distance_to_metro_km,
flood_zone_risk=req.flood_zone_risk,
building_age_years=req.building_age_years,
floor_level=req.floor_level,
total_floors=req.total_floors,
direction=req.direction,
has_elevator=req.has_elevator,
has_parking=req.has_parking,
has_pool=req.has_pool,
developer_reputation=req.developer_reputation,
renovation_score=req.renovation_score,
view_quality=req.view_quality,
interior_quality=req.interior_quality,