"""Regex rule-based text moderation: flags, scores, and redacts matched content."""

import re

from app.models.moderation import ModerationFlag, ModerationRequest, ModerationResponse

# Blocklist categories with patterns and severity.
# Each rule carries: "category", "severity", a list of compiled "patterns",
# and a human-readable "reason" copied onto every flag the rule produces.
_RULES: list[dict] = [
    {
        "category": "contact_info",
        "severity": "medium",
        "patterns": [
            # Vietnamese phone numbers.
            # NOTE(review): no digit boundaries, so this also matches inside
            # longer digit runs (e.g. large numeric amounts) — confirm intended.
            re.compile(r"0\d{9,10}"),
            re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.]+\b"),  # Email
            # Messenger handle followed by a number
            re.compile(r"(?:zalo|viber|telegram|whatsapp)\s*[:\-]?\s*\d+", re.IGNORECASE),
        ],
        "reason": "Contact information detected — may bypass platform messaging",
    },
    {
        "category": "spam",
        "severity": "low",
        "patterns": [
            re.compile(r"(.)\1{5,}"),  # Repeated characters
            re.compile(r"(!!!|\.\.\.){3,}"),  # Excessive punctuation
            re.compile(r"(?:click|nhấn|bấm)\s+(?:here|vào đây|link)", re.IGNORECASE),
        ],
        "reason": "Spam-like content pattern",
    },
    {
        "category": "profanity",
        "severity": "high",
        "patterns": [
            re.compile(
                r"\b(?:lừa đảo|scam|fake|giả mạo)\b",
                re.IGNORECASE,
            ),
        ],
        "reason": "Potentially harmful or fraudulent language",
    },
    {
        "category": "prohibited_content",
        "severity": "high",
        "patterns": [
            re.compile(
                r"\b(?:đất rừng phòng hộ|đất quốc phòng|đất tranh chấp)\b",
                re.IGNORECASE,
            ),
        ],
        "reason": "Listing references prohibited property types",
    },
]

# Per-severity contribution to the aggregate score; severities not listed
# here fall back to _DEFAULT_SEVERITY_WEIGHT.
_SEVERITY_WEIGHTS: dict[str, float] = {"low": 0.2, "medium": 0.5, "high": 0.9}
_DEFAULT_SEVERITY_WEIGHT = 0.5

# Placeholder substituted for every redacted region of the text.
_REDACTION_MARKER = "[REDACTED]"


def _redact_spans(text: str, spans: list[tuple[int, int]]) -> str:
    """Return *text* with each ``(start, end)`` span replaced by the marker.

    Overlapping spans are merged into a single redacted region so the output
    carries one marker per region and no partially-redacted residue. Spans
    that merely touch (one ends where the next starts) stay separate.
    """
    pieces: list[str] = []
    cursor = 0  # end of the last redacted region; also start of pending text
    for start, end in sorted(spans):
        if start == end:
            # Nothing to hide in an empty match.
            continue
        if end <= cursor:
            # Entirely inside a region that has already been redacted.
            continue
        if start < cursor:
            # Overlaps the previous region: extend it, no second marker.
            cursor = end
            continue
        pieces.append(text[cursor:start])
        pieces.append(_REDACTION_MARKER)
        cursor = end
    pieces.append(text[cursor:])
    return "".join(pieces)


class ModerationService:
    """Checks text against the module's regex rules and redacts the hits."""

    def check(self, req: ModerationRequest) -> ModerationResponse:
        """Scan ``req.text`` with every rule and build the moderation result.

        One ``ModerationFlag`` is emitted per regex match, in rule order,
        then pattern order, then position in the text.

        Args:
            req: Request whose ``text`` attribute is scanned.

        Returns:
            An unflagged response (score ``0.0``, text unchanged) when nothing
            matches. Otherwise a flagged response whose ``score`` is
            ``0.7 * max_weight + 0.3 * mean_weight`` over the flags' severity
            weights, capped at ``1.0`` and rounded to 3 decimals, and whose
            ``cleaned_text`` has every matched region replaced by
            ``[REDACTED]``.
        """
        text = req.text
        flags: list[ModerationFlag] = []
        # Match positions are tracked separately so redaction can operate on
        # the exact regions that matched rather than on their text.
        spans: list[tuple[int, int]] = []

        for rule in _RULES:
            for pattern in rule["patterns"]:
                for match in pattern.finditer(text):
                    flags.append(
                        ModerationFlag(
                            category=rule["category"],
                            severity=rule["severity"],
                            matched_text=match.group(),
                            reason=rule["reason"],
                        )
                    )
                    spans.append(match.span())

        if not flags:
            return ModerationResponse(
                is_flagged=False,
                score=0.0,
                flags=[],
                cleaned_text=text,
            )

        # Compute aggregate score: dominated by the worst flag, nudged by the
        # average severity of everything that matched.
        weights = [
            _SEVERITY_WEIGHTS.get(flag.severity, _DEFAULT_SEVERITY_WEIGHT)
            for flag in flags
        ]
        max_score = max(weights)
        avg_score = sum(weights) / len(weights)
        score = round(min(1.0, max_score * 0.7 + avg_score * 0.3), 3)

        # Redact by position, not by str.replace on the matched text: replace
        # would also rewrite unmatched occurrences of the same string, and
        # would leave residue when matches overlap (the longer match is no
        # longer found once the shorter one has been replaced).
        cleaned = _redact_spans(text, spans)

        return ModerationResponse(
            is_flagged=True,
            score=score,
            flags=flags,
            cleaned_text=cleaned,
        )


# Shared module-level instance.
moderation_service = ModerationService()