feat(ai-services): add Python FastAPI AI/ML services container

Create libs/ai-services/ with FastAPI app providing:
- POST /avm/predict — XGBoost-backed property price prediction (heuristic fallback)
- POST /avm/extract-features — Vietnamese NLP feature extraction from listing text
- POST /moderation/check — content moderation with rule-based flagging
- GET /health — health check endpoint

Includes Dockerfile (Python 3.12), docker-compose integration, Pydantic models,
and 9 passing tests covering all endpoints.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-08 03:08:39 +07:00
parent 4ef54027d6
commit b392bc3570
20 changed files with 730 additions and 0 deletions

View File

@@ -0,0 +1,229 @@
import logging
import re
import numpy as np
from app.models.avm import (
AVMPredictRequest,
AVMPredictResponse,
ExtractedFeatures,
FeatureExtractRequest,
FeatureExtractResponse,
)
logger = logging.getLogger(__name__)

# Integer code fed to the model for each supported property type.
PROPERTY_TYPE_MAP = dict(
    apartment=0,
    house=1,
    townhouse=2,
    villa=3,
    land=4,
    shophouse=5,
)

# Baseline price in millions of VND per square metre. Cities without an
# entry in CITY_BASELINE use DEFAULT_BASELINE instead.
DEFAULT_BASELINE = 30.0
CITY_BASELINE = {
    "hà nội": 85.0,
    "hồ chí minh": 90.0,
    "đà nẵng": 45.0,
    "hải phòng": 35.0,
    "cần thơ": 25.0,
}
class AVMService:
    """Automated Valuation Model service.

    Uses XGBoost when a trained model is available,
    falls back to heuristic pricing for development/demo.
    """

    # Heuristic multiplier applied to the city baseline for each property
    # type; unknown types use 1.0.
    _TYPE_MULTIPLIERS = {
        "apartment": 0.9,
        "house": 1.0,
        "townhouse": 1.1,
        "villa": 1.4,
        "land": 0.7,
        "shophouse": 1.3,
    }

    def __init__(self) -> None:
        self._model = None
        self._load_model()

    def _load_model(self) -> None:
        """Try to load the XGBoost booster; leave ``_model`` as None on failure.

        Any failure (xgboost or settings not importable, model file missing
        or unreadable) is treated as "no model" so the service can still
        answer with the heuristic. The underlying error is logged at DEBUG
        level so it is not lost.
        """
        try:
            import xgboost as xgb
            from app.config import settings

            model_file = f"{settings.model_path}/avm_model.json"
            # Build into a local first so a failed load never leaves a
            # half-initialised booster on the instance.
            booster = xgb.Booster()
            booster.load_model(model_file)
        except Exception:
            logger.info("No trained AVM model found — using heuristic fallback")
            logger.debug("AVM model load failure details", exc_info=True)
            self._model = None
            return
        self._model = booster
        logger.info("Loaded XGBoost AVM model from %s", model_file)

    def predict(self, req: AVMPredictRequest) -> AVMPredictResponse:
        """Estimate a price with the loaded model, or the heuristic if none."""
        if self._model is not None:
            return self._predict_xgboost(req)
        return self._predict_heuristic(req)

    def _predict_xgboost(self, req: AVMPredictRequest) -> AVMPredictResponse:
        """Predict with the loaded booster.

        The raw prediction is exponentiated, i.e. it is treated as a
        natural-log price. Confidence (0.82) and the +/-15% range are fixed
        constants, not model outputs.
        """
        import xgboost as xgb

        # NOTE(review): column order presumably has to match the order used
        # when the model was trained — confirm against the training code.
        features = np.array(
            [[
                req.area,
                PROPERTY_TYPE_MAP.get(req.property_type.lower(), 1),
                req.bedrooms,
                req.bathrooms,
                req.floors,
                req.frontage,
                req.road_width,
                req.year_built or 2020,
                1.0 if req.has_legal_paper else 0.0,
            ]]
        )
        dmatrix = xgb.DMatrix(features)
        pred_log = self._model.predict(dmatrix)[0]
        estimated = float(np.exp(pred_log))
        # Guard the division: a zero area would otherwise raise
        # ZeroDivisionError. (The request model may already reject it.)
        price_per_m2 = estimated / req.area if req.area else 0.0
        return AVMPredictResponse(
            estimated_price_vnd=estimated,
            confidence=0.82,
            price_per_m2=price_per_m2,
            price_range_low=estimated * 0.85,
            price_range_high=estimated * 1.15,
        )

    def _predict_heuristic(self, req: AVMPredictRequest) -> AVMPredictResponse:
        """Estimate a price from the city baseline and simple multipliers.

        price/m² = baseline × type × bedrooms × frontage × legal × 1,000,000.
        All monetary outputs are rounded to the nearest 1,000 VND; confidence
        (0.65) and the +/-25% range are fixed constants.
        """
        import unicodedata

        city_key = req.city.lower().strip()
        base = CITY_BASELINE.get(city_key)
        if base is None:
            # Vietnamese text may arrive in decomposed (NFD) form, which is
            # visually identical but does not compare equal to the
            # precomposed keys. Retry with the NFC form before giving up.
            base = CITY_BASELINE.get(
                unicodedata.normalize("NFC", city_key), DEFAULT_BASELINE
            )

        type_mult = self._TYPE_MULTIPLIERS.get(req.property_type.lower(), 1.0)

        # Adjustments
        bedroom_adj = 1.0 + req.bedrooms * 0.02
        if req.frontage > 0:
            frontage_adj = 1.0 + (req.frontage / 10.0) * 0.15
        else:
            frontage_adj = 1.0
        legal_adj = 1.0 if req.has_legal_paper else 0.7

        price_per_m2 = base * type_mult * bedroom_adj * frontage_adj * legal_adj * 1_000_000
        estimated = price_per_m2 * req.area
        return AVMPredictResponse(
            estimated_price_vnd=round(estimated, -3),
            confidence=0.65,
            price_per_m2=round(price_per_m2, -3),
            price_range_low=round(estimated * 0.75, -3),
            price_range_high=round(estimated * 1.25, -3),
        )
class FeatureExtractService:
    """Extract real-estate features from Vietnamese listing text."""

    _AREA_PATTERN = re.compile(r"(\d+(?:[.,]\d+)?)\s*(?:m2|m²|mét vuông)", re.IGNORECASE)
    _BEDROOM_PATTERN = re.compile(r"(\d+)\s*(?:phòng ngủ|pn|PN)", re.IGNORECASE)
    _BATHROOM_PATTERN = re.compile(r"(\d+)\s*(?:phòng tắm|wc|WC|toilet)", re.IGNORECASE)
    _FLOOR_PATTERN = re.compile(r"(\d+)\s*(?:tầng|lầu)", re.IGNORECASE)
    _FRONTAGE_PATTERN = re.compile(r"(?:mặt tiền|ngang)\s*(\d+(?:[.,]\d+)?)\s*m", re.IGNORECASE)
    _ROAD_WIDTH_PATTERN = re.compile(r"(?:đường|hẻm)\s*(\d+(?:[.,]\d+)?)\s*m", re.IGNORECASE)
    # Group 1 is the amount, group 2 the unit word.
    _PRICE_PATTERN = re.compile(
        r"(\d+(?:[.,]\d+)?)\s*(tỷ|tỉ|triệu)", re.IGNORECASE
    )
    _BILLION_UNITS = ("tỷ", "tỉ")
    _LEGAL_KEYWORDS = ["sổ đỏ", "sổ hồng", "chính chủ", "pháp lý rõ ràng"]
    # Checked in insertion order; the first keyword found in the text wins.
    _PROPERTY_TYPES = {
        "căn hộ": "apartment",
        "chung cư": "apartment",
        "nhà phố": "townhouse",
        "nhà riêng": "house",
        "biệt thự": "villa",
        "đất": "land",
        "đất nền": "land",
        "shophouse": "shophouse",
    }

    @staticmethod
    def _parse_decimal(raw: str) -> float:
        """Parse a number that may use a comma as its decimal separator."""
        return float(raw.replace(",", "."))

    @classmethod
    def _parse_price(cls, text: str) -> float | None:
        """Return the first price mentioned in *text* in VND, or None."""
        m = cls._PRICE_PATTERN.search(text)
        if m is None:
            return None
        amount = cls._parse_decimal(m.group(1))
        # Read the unit from its own capture group rather than slicing the
        # tail of the match.
        if m.group(2).lower() in cls._BILLION_UNITS:
            return amount * 1_000_000_000
        return amount * 1_000_000

    @staticmethod
    def _tokenize(text: str) -> tuple[list[str], list[dict]]:
        """Tokenize and tag *text* with underthesea when it is installed.

        Without underthesea the tokens are a plain whitespace split and no
        entities are returned.
        """
        entities: list[dict] = []
        try:
            from underthesea import ner, word_tokenize

            tokens = word_tokenize(text)
            for chunk in ner(text):
                # Keep chunks whose fourth field (the entity label) is not
                # the "O" (outside) tag.
                if len(chunk) >= 4 and chunk[3] != "O":
                    entities.append({"text": chunk[0], "label": chunk[3]})
        except ImportError:
            logger.warning("underthesea not available — skipping NLP tokenization")
            tokens = text.split()
        return tokens, entities

    def extract(self, req: FeatureExtractRequest) -> FeatureExtractResponse:
        """Pull structured listing features out of ``req.text``.

        Numeric fields are set only when their pattern matches, so unmatched
        fields keep the defaults of ``ExtractedFeatures``. ``has_legal_paper``
        is always set. Tokens and entities come from underthesea when
        available.
        """
        import unicodedata

        raw_text = req.text
        # Decomposed (NFD) Vietnamese looks identical but would not match the
        # precomposed literals in the patterns and keyword lists. Matching
        # runs on the NFC form; for NFC input this is a no-op.
        text = unicodedata.normalize("NFC", raw_text)
        text_lower = text.lower()
        features = ExtractedFeatures()

        # (attribute, pattern, converter), in the order fields are assigned.
        numeric_fields = (
            ("area", self._AREA_PATTERN, self._parse_decimal),
            ("bedrooms", self._BEDROOM_PATTERN, int),
            ("bathrooms", self._BATHROOM_PATTERN, int),
            ("floors", self._FLOOR_PATTERN, int),
            ("frontage", self._FRONTAGE_PATTERN, self._parse_decimal),
            ("road_width", self._ROAD_WIDTH_PATTERN, self._parse_decimal),
        )
        for attr, pattern, convert in numeric_fields:
            m = pattern.search(text)
            if m:
                setattr(features, attr, convert(m.group(1)))

        # Price
        price = self._parse_price(text)
        if price is not None:
            features.price_mentioned = price

        # Legal
        features.has_legal_paper = any(kw in text_lower for kw in self._LEGAL_KEYWORDS)

        # Property type
        property_type = next(
            (en for vn, en in self._PROPERTY_TYPES.items() if vn in text_lower),
            None,
        )
        if property_type is not None:
            features.property_type = property_type

        # Tokenization and NER run on the text exactly as received.
        tokens, entities = self._tokenize(raw_text)

        return FeatureExtractResponse(
            features=features,
            tokens=tokens,
            entities=entities,
        )
# Module-level service instances, created once when this module is imported.
# Constructing AVMService runs its model-loading attempt immediately.
avm_service = AVMService()
feature_extract_service = FeatureExtractService()

View File

@@ -0,0 +1,96 @@
import re
from app.models.moderation import ModerationFlag, ModerationRequest, ModerationResponse
# Blocklist categories with patterns and severity.
# Each rule has: "category" and "severity" (copied onto every flag it
# produces), "patterns" (compiled regexes; every match yields one flag) and
# "reason" (human-readable explanation attached to the flag).
_RULES: list[dict] = [
    {
        "category": "contact_info",
        "severity": "medium",
        "patterns": [
            # Vietnamese phone numbers: 0 followed by 9-10 digits. The digit
            # lookarounds stop this from firing inside longer numbers such as
            # a price of 20000000000.
            re.compile(r"(?<!\d)0\d{9,10}(?!\d)"),
            re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.]+\b"),  # Email
            re.compile(r"(?:zalo|viber|telegram|whatsapp)\s*[:\-]?\s*\d+", re.IGNORECASE),
        ],
        "reason": "Contact information detected — may bypass platform messaging",
    },
    {
        "category": "spam",
        "severity": "low",
        "patterns": [
            # A character repeated 6+ times. Digits and whitespace are
            # excluded so amounts like 15000000 or runs of spaces are not
            # flagged (and later redacted) as spam.
            re.compile(r"([^\d\s])\1{5,}"),
            re.compile(r"(!!!|\.\.\.){3,}"),  # Excessive punctuation
            re.compile(r"(?:click|nhấn|bấm)\s+(?:here|vào đây|link)", re.IGNORECASE),
        ],
        "reason": "Spam-like content pattern",
    },
    {
        "category": "profanity",
        "severity": "high",
        "patterns": [
            re.compile(
                r"\b(?:lừa đảo|scam|fake|giả mạo)\b",
                re.IGNORECASE,
            ),
        ],
        "reason": "Potentially harmful or fraudulent language",
    },
    {
        "category": "prohibited_content",
        "severity": "high",
        "patterns": [
            re.compile(
                r"\b(?:đất rừng phòng hộ|đất quốc phòng|đất tranh chấp)\b",
                re.IGNORECASE,
            ),
        ],
        "reason": "Listing references prohibited property types",
    },
]
class ModerationService:
    """Rule-based content moderation driven by the module-level ``_RULES``."""

    _SEVERITY_WEIGHTS = {"low": 0.2, "medium": 0.5, "high": 0.9}
    _UNKNOWN_SEVERITY_WEIGHT = 0.5
    _REDACTION_MARKER = "[REDACTED]"

    @classmethod
    def _redact(cls, text: str, spans: list[tuple[int, int]]) -> str:
        """Replace every ``(start, end)`` span of *text* with the marker.

        Overlapping spans are merged first, so text hit by several rules
        becomes a single marker and no fragment of a match is left behind.
        Text outside the spans is never touched.
        """
        merged: list[list[int]] = []
        for start, end in sorted(spans):
            if merged and start < merged[-1][1]:
                merged[-1][1] = max(merged[-1][1], end)
            else:
                merged.append([start, end])

        pieces: list[str] = []
        cursor = 0
        for start, end in merged:
            pieces.append(text[cursor:start])
            pieces.append(cls._REDACTION_MARKER)
            cursor = end
        pieces.append(text[cursor:])
        return "".join(pieces)

    def check(self, req: ModerationRequest) -> ModerationResponse:
        """Run every rule pattern over ``req.text`` and summarise the result.

        Each regex match yields one flag. The score is
        ``0.7 * max + 0.3 * mean`` of the per-flag severity weights, capped
        at 1.0 and rounded to 3 decimals. When nothing matches, the text is
        returned unchanged with a score of 0.0.
        """
        import unicodedata

        original = req.text
        # Match on the NFC form so decomposed (NFD) Vietnamese cannot slip
        # past the precomposed keyword patterns. No-op for NFC input.
        text = unicodedata.normalize("NFC", original)

        flags: list[ModerationFlag] = []
        spans: list[tuple[int, int]] = []
        for rule in _RULES:
            for pattern in rule["patterns"]:
                for match in pattern.finditer(text):
                    flags.append(
                        ModerationFlag(
                            category=rule["category"],
                            severity=rule["severity"],
                            matched_text=match.group(),
                            reason=rule["reason"],
                        )
                    )
                    spans.append(match.span())

        if not flags:
            return ModerationResponse(
                is_flagged=False,
                score=0.0,
                flags=[],
                cleaned_text=original,
            )

        # Compute aggregate score
        weights = [
            self._SEVERITY_WEIGHTS.get(f.severity, self._UNKNOWN_SEVERITY_WEIGHT)
            for f in flags
        ]
        max_score = max(weights)
        avg_score = sum(weights) / len(weights)
        score = round(min(1.0, max_score * 0.7 + avg_score * 0.3), 3)

        # Redact by match position rather than str.replace on the matched
        # text, which would also hit identical substrings the rules did not
        # match.
        return ModerationResponse(
            is_flagged=True,
            score=score,
            flags=flags,
            cleaned_text=self._redact(text, spans),
        )
# Module-level service instance, created once when this module is imported.
moderation_service = ModerationService()