import logging
import re

import numpy as np

from app.models.avm import (
    AVMPredictRequest,
    AVMPredictResponse,
    ExtractedFeatures,
    FeatureExtractRequest,
    FeatureExtractResponse,
)

logger = logging.getLogger(__name__)

# Property type encoding for the model
PROPERTY_TYPE_MAP = {
    "apartment": 0,
    "house": 1,
    "townhouse": 2,
    "villa": 3,
    "land": 4,
    "shophouse": 5,
}

# City-level price multiplier (baseline: millions VND/m²)
CITY_BASELINE = {
    "hà nội": 85.0,
    "hồ chí minh": 90.0,
    "đà nẵng": 45.0,
    "hải phòng": 35.0,
    "cần thơ": 25.0,
}
DEFAULT_BASELINE = 30.0

# Per-property-type multiplier used only by the heuristic fallback.
# Kept at module level so it is built once rather than on every prediction.
_HEURISTIC_TYPE_MULTIPLIER = {
    "apartment": 0.9,
    "house": 1.0,
    "townhouse": 1.1,
    "villa": 1.4,
    "land": 0.7,
    "shophouse": 1.3,
}


class AVMService:
    """Automated Valuation Model service.

    Uses XGBoost when a trained model is available, falls back to
    heuristic pricing for development/demo.
    """

    def __init__(self) -> None:
        self._model = None
        self._load_model()

    def _load_model(self) -> None:
        """Try to load the XGBoost booster; leave ``self._model`` as None on failure.

        Any failure (xgboost or settings not importable, model file missing or
        unreadable) is deliberately non-fatal: the service then answers with
        the heuristic estimator.
        """
        try:
            import xgboost as xgb

            from app.config import settings

            model_file = f"{settings.model_path}/avm_model.json"
            self._model = xgb.Booster()
            self._model.load_model(model_file)
            logger.info("Loaded XGBoost AVM model from %s", model_file)
        except Exception:
            # Best-effort boundary: fall back rather than crash at import time.
            logger.info("No trained AVM model found — using heuristic fallback")
            # Keep the real cause available; otherwise a corrupt model file or
            # a broken xgboost install is indistinguishable from "no model".
            logger.debug("AVM model load failed", exc_info=True)
            self._model = None

    def predict(self, req: AVMPredictRequest) -> AVMPredictResponse:
        """Estimate a price with the trained model if loaded, else heuristically."""
        if self._model is not None:
            return self._predict_xgboost(req)
        return self._predict_heuristic(req)

    def _predict_xgboost(self, req: AVMPredictRequest) -> AVMPredictResponse:
        """Predict with the loaded booster.

        The model output is exponentiated, i.e. it is treated as a log-price.
        Confidence is a fixed 0.82 and the range is the estimate -15% / +15%.
        """
        import xgboost as xgb

        # NOTE(review): the column order presumably has to match the order
        # used when the model was trained — confirm against the training code.
        features = np.array(
            [
                [
                    req.area,
                    PROPERTY_TYPE_MAP.get(req.property_type.lower(), 1),
                    req.bedrooms,
                    req.bathrooms,
                    req.floors,
                    req.frontage,
                    req.road_width,
                    req.year_built or 2020,
                    1.0 if req.has_legal_paper else 0.0,
                ]
            ]
        )
        dmatrix = xgb.DMatrix(features)
        pred_log = self._model.predict(dmatrix)[0]
        estimated = float(np.exp(pred_log))

        # A zero area would raise ZeroDivisionError (and a negative one would
        # yield a negative unit price); report 0.0 instead of failing.
        price_per_m2 = estimated / req.area if req.area > 0 else 0.0

        return AVMPredictResponse(
            estimated_price_vnd=estimated,
            confidence=0.82,
            price_per_m2=price_per_m2,
            price_range_low=estimated * 0.85,
            price_range_high=estimated * 1.15,
        )

    def _predict_heuristic(self, req: AVMPredictRequest) -> AVMPredictResponse:
        """Rule-of-thumb estimate used when no trained model is loaded.

        Price per m² is the city baseline (in millions VND) scaled by property
        type, bedroom count, frontage and legal status. Monetary outputs are
        rounded to the nearest thousand; confidence is a fixed 0.65 and the
        range is the estimate -25% / +25%.
        """
        city_key = req.city.lower().strip()
        base = CITY_BASELINE.get(city_key, DEFAULT_BASELINE)

        # Property type multiplier
        type_mult = _HEURISTIC_TYPE_MULTIPLIER.get(req.property_type.lower(), 1.0)

        # Adjustments
        bedroom_adj = 1.0 + req.bedrooms * 0.02
        if req.frontage > 0:
            frontage_adj = 1.0 + (req.frontage / 10.0) * 0.15
        else:
            frontage_adj = 1.0
        legal_adj = 1.0 if req.has_legal_paper else 0.7

        price_per_m2 = (
            base * type_mult * bedroom_adj * frontage_adj * legal_adj * 1_000_000
        )
        estimated = price_per_m2 * req.area

        return AVMPredictResponse(
            estimated_price_vnd=round(estimated, -3),
            confidence=0.65,
            price_per_m2=round(price_per_m2, -3),
            price_range_low=round(estimated * 0.75, -3),
            price_range_high=round(estimated * 1.25, -3),
        )


class FeatureExtractService:
    """Extract real-estate features from Vietnamese listing text."""

    _AREA_PATTERN = re.compile(
        r"(\d+(?:[.,]\d+)?)\s*(?:m2|m²|mét vuông)", re.IGNORECASE
    )
    _BEDROOM_PATTERN = re.compile(r"(\d+)\s*(?:phòng ngủ|pn|PN)", re.IGNORECASE)
    _BATHROOM_PATTERN = re.compile(
        r"(\d+)\s*(?:phòng tắm|wc|WC|toilet)", re.IGNORECASE
    )
    _FLOOR_PATTERN = re.compile(r"(\d+)\s*(?:tầng|lầu)", re.IGNORECASE)
    _FRONTAGE_PATTERN = re.compile(
        r"(?:mặt tiền|ngang)\s*(\d+(?:[.,]\d+)?)\s*m", re.IGNORECASE
    )
    _ROAD_WIDTH_PATTERN = re.compile(
        r"(?:đường|hẻm)\s*(\d+(?:[.,]\d+)?)\s*m", re.IGNORECASE
    )
    # Group 1 is the amount, group 2 the unit word. Capturing the unit avoids
    # re-deriving it by slicing the tail of the match out of the source text.
    _PRICE_PATTERN = re.compile(
        r"(\d+(?:[.,]\d+)?)\s*(tỷ|tỉ|triệu)", re.IGNORECASE
    )
    # Unit spellings that scale the amount by 1e9; any other matched unit
    # ("triệu") scales by 1e6.
    _BILLION_UNITS = ("tỷ", "tỉ")

    _LEGAL_KEYWORDS = ["sổ đỏ", "sổ hồng", "chính chủ", "pháp lý rõ ràng"]

    # Checked in insertion order; the first keyword found in the text wins.
    _PROPERTY_TYPES = {
        "căn hộ": "apartment",
        "chung cư": "apartment",
        "nhà phố": "townhouse",
        "nhà riêng": "house",
        "biệt thự": "villa",
        "đất": "land",
        "đất nền": "land",
        "shophouse": "shophouse",
    }

    @staticmethod
    def _parse_decimal(raw: str) -> float:
        """Convert a matched number to float, accepting ',' as decimal mark."""
        return float(raw.replace(",", "."))

    def extract(self, req: FeatureExtractRequest) -> FeatureExtractResponse:
        """Pull structured features, tokens and named entities out of ``req.text``.

        Numeric features come from the first match of each class-level regex;
        fields whose pattern does not match are left at the
        ``ExtractedFeatures`` defaults. Tokenization and NER use underthesea
        when it can be imported, otherwise tokens are a plain whitespace split
        and no entities are returned.
        """
        text = req.text
        features = ExtractedFeatures()

        # Area
        m = self._AREA_PATTERN.search(text)
        if m:
            features.area = self._parse_decimal(m.group(1))

        # Bedrooms
        m = self._BEDROOM_PATTERN.search(text)
        if m:
            features.bedrooms = int(m.group(1))

        # Bathrooms
        m = self._BATHROOM_PATTERN.search(text)
        if m:
            features.bathrooms = int(m.group(1))

        # Floors
        m = self._FLOOR_PATTERN.search(text)
        if m:
            features.floors = int(m.group(1))

        # Frontage
        m = self._FRONTAGE_PATTERN.search(text)
        if m:
            features.frontage = self._parse_decimal(m.group(1))

        # Road width
        m = self._ROAD_WIDTH_PATTERN.search(text)
        if m:
            features.road_width = self._parse_decimal(m.group(1))

        # Price
        m = self._PRICE_PATTERN.search(text)
        if m:
            val = self._parse_decimal(m.group(1))
            unit = m.group(2).lower()
            if unit in self._BILLION_UNITS:
                features.price_mentioned = val * 1_000_000_000
            else:
                features.price_mentioned = val * 1_000_000

        # Legal
        text_lower = text.lower()
        features.has_legal_paper = any(
            kw in text_lower for kw in self._LEGAL_KEYWORDS
        )

        # Property type
        for vn_type, en_type in self._PROPERTY_TYPES.items():
            if vn_type in text_lower:
                features.property_type = en_type
                break

        # Tokenization and NER via underthesea
        tokens: list[str] = []
        entities: list[dict] = []
        try:
            from underthesea import ner, word_tokenize

            tokens = word_tokenize(text)
            ner_results = ner(text)
            for chunk in ner_results:
                # Keep only chunks that carry a 4th field other than "O".
                if len(chunk) >= 4 and chunk[3] != "O":
                    entities.append({"text": chunk[0], "label": chunk[3]})
        except ImportError:
            logger.warning("underthesea not available — skipping NLP tokenization")
            tokens = text.split()

        return FeatureExtractResponse(
            features=features,
            tokens=tokens,
            entities=entities,
        )


avm_service = AVMService()
feature_extract_service = FeatureExtractService()