Files
Ho Ngoc Hai 66f952a4a8 feat(ai-services): complete AVM v2 ensemble — upload endpoint, per-district metrics, A/B routing
- Add POST /avm/v2/upload-training-data so AvmRetrainCronService can push
  CSV rows before triggering retraining (was called but missing)
- Add per-district MAE/MAPE/RMSE/R² to _evaluate_ensemble output;
  district_metrics are now returned in AVMv2TrainResponse and stored
  separately from global metrics in the model registry
- Add predict_with_ab() that applies the active model's ab_test_traffic_pct
  for deterministic per-property cohort assignment (v2 vs heuristic baseline)
- Add POST /avm/v2/ab-config to set traffic_pct on the active registry entry
- Add AVMv2ABConfigRequest schema
- Expand test suite: 24 → 28 tests covering upload, A/B config, and new
  validation paths; all green

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-21 04:39:57 +07:00

302 lines
11 KiB
Python

"""AVM v2 — Residential multi-model ensemble request/response schemas."""
from pydantic import BaseModel, Field
class AVMv2PredictRequest(BaseModel):
    """Extended feature set for residential AVM v2 ensemble."""

    # Only `district`, `city`, `property_type` and `area_m2` are required;
    # every other feature declares a default, so partial payloads validate.

    # --- Location features ---
    district: str = Field(..., min_length=1, description="District name")
    city: str = Field(..., min_length=1, description="City name")
    distance_to_cbd_km: float = Field(
        default=0.0,
        ge=0,
        description="Distance to CBD in km",
    )
    distance_to_metro_km: float = Field(
        default=0.0,
        ge=0,
        description="Distance to nearest metro station in km",
    )
    distance_to_school_km: float = Field(
        default=0.0,
        ge=0,
        description="Distance to nearest school in km",
    )
    distance_to_hospital_km: float = Field(
        default=0.0,
        ge=0,
        description="Distance to nearest hospital in km",
    )
    distance_to_park_km: float = Field(
        default=0.0,
        ge=0,
        description="Distance to nearest park in km",
    )
    distance_to_mall_km: float = Field(
        default=0.0,
        ge=0,
        description="Distance to nearest mall/shopping center in km",
    )
    flood_zone_risk: float = Field(
        default=0.0,
        ge=0,
        le=1,
        description="Flood zone risk score (0=safe, 1=high risk)",
    )

    # --- Neighborhood features ---
    neighborhood_score: float = Field(
        default=0.5,
        ge=0,
        le=1,
        description="Overall neighborhood quality score (0-1, aggregated from safety, amenities, walkability)",
    )

    # --- Physical features ---
    property_type: str = Field(
        ...,
        description="e.g. apartment, house, villa, land",
    )
    area_m2: float = Field(..., gt=0, description="Property area in m²")
    rooms: int = Field(default=0, ge=0, description="Total rooms (bedrooms)")
    floor_level: int = Field(
        default=0,
        ge=0,
        description="Floor level (0=ground or N/A, relevant for apartments/penthouses)",
    )
    total_floors: int = Field(
        default=0,
        ge=0,
        description="Total floors in the building (0=N/A)",
    )
    # Free-form string: the listed directions are documentation only and are
    # not enforced by this schema.
    direction: str = Field(
        default="unknown",
        description="Facing direction: north, south, east, west, northeast, northwest, southeast, southwest, unknown",
    )
    floor_ratio: float = Field(
        default=1.0,
        gt=0,
        description="Total floor area / land area ratio",
    )
    building_age_years: int = Field(
        default=0,
        ge=0,
        description="Building age in years",
    )
    has_elevator: bool = Field(
        default=False,
        description="Building has elevator",
    )
    has_parking: bool = Field(
        default=False,
        description="Property has dedicated parking",
    )
    has_pool: bool = Field(
        default=False,
        description="Property has swimming pool",
    )
    # The only boolean feature that defaults to True.
    has_legal_paper: bool = Field(
        default=True,
        description="Has sổ đỏ/sổ hồng",
    )
    developer_reputation: float = Field(
        default=0.5,
        ge=0,
        le=1,
        description="Project developer reputation score (0-1, based on past projects, delivery record)",
    )

    # --- Market features ---
    avg_price_district_3m_vnd_m2: float = Field(
        default=0.0,
        ge=0,
        description="Avg price per m² in the district over last 3 months (VND)",
    )
    listing_density: float = Field(
        default=0.0,
        ge=0,
        description="Number of active listings per km² in the district",
    )
    absorption_rate: float = Field(
        default=0.0,
        ge=0,
        le=1,
        description="Percentage of listings sold in last 30 days (0-1)",
    )
    dom_avg: float = Field(
        default=0.0,
        ge=0,
        description="Average days on market in the district",
    )
    # NOTE(review): the two descriptions below state a -1..+1 range, but no
    # ge/le bound is declared, so out-of-range values pass validation.
    # Confirm whether that is intentional before tightening.
    price_momentum_30d: float = Field(
        default=0.0,
        description="Price change percentage in last 30 days (-1 to +1)",
    )
    yoy_change: float = Field(
        default=0.0,
        description="Year-over-year price change percentage (-1 to +1)",
    )

    # --- LLM-extracted features ---
    renovation_score: float = Field(
        default=0.5,
        ge=0,
        le=1,
        description="Renovation quality score (0-1)",
    )
    view_quality: float = Field(
        default=0.5,
        ge=0,
        le=1,
        description="View quality score (0-1)",
    )
    interior_quality: float = Field(
        default=0.5,
        ge=0,
        le=1,
        description="Interior quality score (0-1)",
    )
    noise_level: float = Field(
        default=0.5,
        ge=0,
        le=1,
        description="Noise level score (0=quiet, 1=noisy)",
    )
    natural_light: float = Field(
        default=0.5,
        ge=0,
        le=1,
        description="Natural light score (0-1)",
    )

    # --- Temporal features ---
    # `month`, `quarter` and `is_year_end` are validated independently; this
    # schema does not check that they agree with one another.
    month: int = Field(
        default=1,
        ge=1,
        le=12,
        description="Transaction month (1-12)",
    )
    quarter: int = Field(
        default=1,
        ge=1,
        le=4,
        description="Transaction quarter (1-4)",
    )
    is_year_end: bool = Field(
        default=False,
        description="Whether in Q4 / Tết season",
    )
class AVMv2Comparable(BaseModel):
    """A comparable property used for context."""

    # All fields are required; none declares a default.
    district: str
    property_type: str
    area_m2: float
    price_vnd: float
    price_per_m2_vnd: float
    # Validated to lie within the closed interval [0, 1].
    similarity_score: float = Field(
        ...,
        ge=0,
        le=1,
    )
class AVMv2FeatureImportance(BaseModel):
    """Feature contribution to the prediction."""

    # Name of the feature this entry describes.
    feature: str
    # Required; validated to lie within the closed interval [0, 1].
    importance: float = Field(
        ...,
        ge=0,
        le=1,
    )
class ModelPrediction(BaseModel):
    """Individual model prediction within the ensemble."""

    # Every field is required and carries no range constraint; in
    # particular `weight` is not bounded by this schema.
    model_name: str
    weight: float

    predicted_price_vnd: float
    predicted_price_per_m2_vnd: float
class AVMv2PredictResponse(BaseModel):
    """Multi-model ensemble prediction response."""

    # --- Headline estimate (all required) ---
    estimated_price_vnd: float = Field(
        ...,
        description="Weighted ensemble estimated price in VND",
    )
    price_per_m2_vnd: float = Field(
        ...,
        description="Price per m² in VND",
    )
    confidence: float = Field(
        ...,
        ge=0,
        le=1,
        description="Confidence = 1 - CV(predictions across 3 models)",
    )
    # The bounds are validated independently; low <= high is not enforced
    # by this schema.
    price_range_low_vnd: float = Field(
        ...,
        description="Lower bound estimate in VND",
    )
    price_range_high_vnd: float = Field(
        ...,
        description="Upper bound estimate in VND",
    )

    # --- Ensemble breakdown ---
    # `default_factory=list` gives each instance its own empty list.
    model_predictions: list[ModelPrediction] = Field(
        description="Individual predictions from each model in the ensemble",
        default_factory=list,
    )

    # --- Explainability ---
    drivers: list[AVMv2FeatureImportance] = Field(
        description="Top feature drivers ranked by importance",
        default_factory=list,
    )
    comparables: list[AVMv2Comparable] = Field(
        description="Similar properties for reference",
        default_factory=list,
    )

    # --- Model metadata ---
    model_version: str = Field(
        default="ensemble-v2-heuristic",
        description="Ensemble version used",
    )
    ensemble_method: str = Field(
        default="weighted_average",
        description="Ensemble strategy",
    )
class AVMv2TrainRequest(BaseModel):
    """Request to trigger model retraining."""

    # Every field is optional; an empty payload yields the defaults below.
    force: bool = Field(
        default=False,
        description="Force retrain even if recent model exists",
    )
    optuna_trials: int = Field(
        default=100,
        ge=10,
        le=500,
        description="Number of Optuna trials",
    )
    # Each split ratio is bounded to [0.05, 0.3] on its own; their sum is
    # not constrained here.
    test_size: float = Field(
        default=0.1,
        ge=0.05,
        le=0.3,
        description="Test split ratio",
    )
    val_size: float = Field(
        default=0.1,
        ge=0.05,
        le=0.3,
        description="Validation split ratio",
    )
class AVMv2TrainResponse(BaseModel):
    """Training result summary."""

    model_version: str

    # Both metric containers are untyped dicts, so key and value shapes are
    # not validated by this schema.
    metrics: dict = Field(
        description="MAE, MAPE, RMSE, R²",
        default_factory=dict,
    )
    district_metrics: dict = Field(
        description="Per-district breakdown of metrics",
        default_factory=dict,
    )

    # Sample counts are required and unconstrained.
    training_samples: int
    validation_samples: int
    test_samples: int

    best_params: dict = Field(
        description="Optuna best hyperparameters per model",
        default_factory=dict,
    )
class AVMv2ModelInfo(BaseModel):
    """Model registry entry information."""

    model_version: str
    # Plain string; no timestamp format is validated here.
    created_at: str
    metrics: dict
    # An entry is considered active unless the caller says otherwise.
    is_active: bool = True
    # Fraction in [0, 1]; defaults to 0.0.
    ab_test_traffic_pct: float = Field(
        default=0.0,
        ge=0,
        le=1,
    )
class AVMv2RollbackRequest(BaseModel):
    """Request to rollback to a specific model version."""

    # Required and must be non-empty (min_length=1).
    target_version: str = Field(
        ...,
        min_length=1,
        description="Model version to roll back to",
    )
class AVMv2ABConfigRequest(BaseModel):
    """Request to update the A/B test traffic percentage for the active model."""

    # Required; expressed as a fraction in [0, 1], not as a 0-100 percentage.
    traffic_pct: float = Field(
        ...,
        ge=0,
        le=1,
        description="Fraction of /predict calls routed to v2 (0=disabled, 0.10=10%, 1=100%)",
    )
class AVMv2FeatureImportanceResponse(BaseModel):
    """Global feature importance across the loaded ensemble.

    `source` is `"model"` when importances come from the trained boosters
    (weighted XGBoost gain + LightGBM gain + CatBoost importance), or
    `"heuristic"` when the service is running without trained artifacts.
    """

    model_version: str
    # Typed as a plain string: the two documented values are not enforced
    # by this schema.
    source: str = Field(
        ...,
        description="One of: model, heuristic",
    )
    # Each instance gets its own empty list when no drivers are supplied.
    drivers: list[AVMv2FeatureImportance] = Field(
        default_factory=list,
    )
class AVMv1Summary(BaseModel):
    """Compact summary of a v1 prediction for comparison."""

    # All fields are required and unconstrained. Unlike AVMv2Summary, the
    # per-m² and range field names here carry no `_vnd` suffix.
    estimated_price_vnd: float
    confidence: float

    price_per_m2: float
    price_range_low: float
    price_range_high: float
class AVMv2Summary(BaseModel):
    """Compact summary of a v2 prediction for comparison."""

    # All fields are required; no range constraints are applied here.
    estimated_price_vnd: float
    confidence: float

    price_per_m2_vnd: float
    price_range_low_vnd: float
    price_range_high_vnd: float

    # Provenance of the estimate.
    model_version: str
    ensemble_method: str
class ABComparisonRequest(BaseModel):
    """Request for A/B comparison between v1 and v2."""

    # --- Required core attributes ---
    district: str = Field(..., min_length=1)
    city: str = Field(..., min_length=1)
    property_type: str
    area_m2: float = Field(..., gt=0)

    # --- Shared optional attributes ---
    rooms: int = Field(default=0, ge=0)
    # NOTE(review): `bedrooms` is described as an alias for `rooms`, but the
    # two are independent fields and nothing in this model keeps them in
    # sync — confirm the caller reconciles them.
    bedrooms: int = Field(
        default=0,
        ge=0,
        description="Alias for rooms, used by v1",
    )
    floors: int = Field(default=0, ge=0)
    frontage: float = Field(default=0.0, ge=0)
    has_legal_paper: bool = True

    # --- v2-specific features (optional, defaults applied) ---
    neighborhood_score: float = Field(default=0.5, ge=0, le=1)
    distance_to_cbd_km: float = Field(default=0.0, ge=0)
    distance_to_metro_km: float = Field(default=0.0, ge=0)
    flood_zone_risk: float = Field(default=0.0, ge=0, le=1)
    building_age_years: int = Field(default=0, ge=0)
    floor_level: int = Field(default=0, ge=0)
    total_floors: int = Field(default=0, ge=0)
    direction: str = "unknown"
    has_elevator: bool = False
    has_parking: bool = False
    has_pool: bool = False
    developer_reputation: float = Field(default=0.5, ge=0, le=1)
    renovation_score: float = Field(default=0.5, ge=0, le=1)
    view_quality: float = Field(default=0.5, ge=0, le=1)
    interior_quality: float = Field(default=0.5, ge=0, le=1)
    month: int = Field(default=1, ge=1, le=12)
    quarter: int = Field(default=1, ge=1, le=4)
    is_year_end: bool = False
class ABComparisonResponse(BaseModel):
    """Side-by-side A/B comparison of v1 vs v2 predictions."""

    # The two per-version summaries being compared.
    v1: AVMv1Summary
    v2: AVMv2Summary

    # Derived deltas. All are required, so the producer must supply them;
    # this schema performs no computation itself.
    price_diff_vnd: float = Field(
        ...,
        description="v2 - v1 price difference",
    )
    price_diff_pct: float = Field(
        ...,
        description="Percentage difference ((v2-v1)/v1 * 100)",
    )
    confidence_diff: float = Field(
        ...,
        description="v2 - v1 confidence difference",
    )

    recommendation: str = Field(
        ...,
        description="Which model to prefer and why",
    )