feat(analytics): integrate AI/ML services — AVM endpoint, moderation pipeline, market index cron

- Add AiServiceClient HTTP client for Python FastAPI AI service with timeout and fallback
- Add HttpAVMService that calls Python AVM endpoint, falls back to PrismaAVMService on failure
- Add ListingCreatedModerationHandler: auto-flags suspicious listings via AI moderation on create
- Add MarketIndexCronService: daily cron job aggregating market stats per district/city/type
- Wire ScheduleModule and new providers into AnalyticsModule and AppModule
- Add unit tests for AiServiceClient, HttpAVMService, and moderation handler (all passing)

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-09 10:13:06 +07:00
parent d64bbe97e2
commit 35feccb529
13 changed files with 1436 additions and 8 deletions

View File

@@ -3,6 +3,7 @@ import { CqrsModule } from '@nestjs/cqrs';
import { GenerateReportHandler } from './application/commands/generate-report/generate-report.handler';
import { TrackEventHandler } from './application/commands/track-event/track-event.handler';
import { UpdateMarketIndexHandler } from './application/commands/update-market-index/update-market-index.handler';
import { ListingCreatedModerationHandler } from './application/event-handlers/listing-created-moderation.handler';
import { GetDistrictStatsHandler } from './application/queries/get-district-stats/get-district-stats.handler';
import { GetHeatmapHandler } from './application/queries/get-heatmap/get-heatmap.handler';
import { GetMarketReportHandler } from './application/queries/get-market-report/get-market-report.handler';
@@ -13,6 +14,9 @@ import { VALUATION_REPOSITORY } from './domain/repositories/valuation.repository
import { AVM_SERVICE } from './domain/services/avm-service';
import { PrismaMarketIndexRepository } from './infrastructure/repositories/prisma-market-index.repository';
import { PrismaValuationRepository } from './infrastructure/repositories/prisma-valuation.repository';
import { AI_SERVICE_CLIENT, AiServiceClient } from './infrastructure/services/ai-service.client';
import { HttpAVMService } from './infrastructure/services/http-avm.service';
import { MarketIndexCronService } from './infrastructure/services/market-index-cron.service';
import { PrismaAVMService } from './infrastructure/services/prisma-avm.service';
import { AnalyticsController } from './presentation/controllers/analytics.controller';
@@ -30,19 +34,33 @@ const QueryHandlers = [
GetValuationHandler,
];
const EventHandlers = [
ListingCreatedModerationHandler,
];
@Module({
imports: [CqrsModule],
controllers: [AnalyticsController],
providers: [
// AI service client
{ provide: AI_SERVICE_CLIENT, useClass: AiServiceClient },
// Repositories
{ provide: MARKET_INDEX_REPOSITORY, useClass: PrismaMarketIndexRepository },
{ provide: VALUATION_REPOSITORY, useClass: PrismaValuationRepository },
{ provide: AVM_SERVICE, useClass: PrismaAVMService },
// AVM: HttpAVMService calls Python AI first, falls back to PrismaAVMService
PrismaAVMService,
{ provide: AVM_SERVICE, useClass: HttpAVMService },
// Cron
MarketIndexCronService,
// CQRS
...CommandHandlers,
...QueryHandlers,
...EventHandlers,
],
exports: [MARKET_INDEX_REPOSITORY, VALUATION_REPOSITORY, AVM_SERVICE],
exports: [MARKET_INDEX_REPOSITORY, VALUATION_REPOSITORY, AVM_SERVICE, AI_SERVICE_CLIENT],
})
export class AnalyticsModule {}

View File

@@ -0,0 +1,117 @@
import type { CommandBus } from '@nestjs/cqrs';
import type { ModerateListingCommand } from '@modules/listings/application/commands/moderate-listing/moderate-listing.command';
import { ListingCreatedEvent } from '@modules/listings/domain/events/listing-created.event';
import { type IAiServiceClient } from '../../infrastructure/services/ai-service.client';
import { ListingCreatedModerationHandler } from '../event-handlers/listing-created-moderation.handler';
describe('ListingCreatedModerationHandler', () => {
let handler: ListingCreatedModerationHandler;
let mockAiClient: IAiServiceClient;
let mockCommandBus: CommandBus;
let mockPrisma: { property: { findUnique: ReturnType<typeof vi.fn> } };
const event = new ListingCreatedEvent('listing-1', 'property-1', 'seller-1', 'SALE');
beforeEach(() => {
mockAiClient = {
predict: vi.fn(),
moderate: vi.fn(),
isAvailable: vi.fn(),
};
mockCommandBus = {
execute: vi.fn().mockResolvedValue({ status: 'DRAFT' }),
} as unknown as CommandBus;
mockPrisma = {
property: {
findUnique: vi.fn().mockResolvedValue({
title: 'Bán căn hộ Quận 1',
description: 'Căn hộ 80m2 view sông Sài Gòn',
}),
},
};
handler = new ListingCreatedModerationHandler(
mockAiClient,
mockCommandBus,
mockPrisma as never,
);
});
it('skips moderation when property not found', async () => {
mockPrisma.property.findUnique.mockResolvedValue(null);
await handler.handle(event);
expect(mockAiClient.moderate).not.toHaveBeenCalled();
});
it('does not dispatch command when text is clean', async () => {
(mockAiClient.moderate as ReturnType<typeof vi.fn>).mockResolvedValue({
is_flagged: false,
score: 0,
flags: [],
cleaned_text: 'Bán căn hộ Quận 1\nCăn hộ 80m2 view sông Sài Gòn',
});
await handler.handle(event);
expect(mockCommandBus.execute).not.toHaveBeenCalled();
});
it('dispatches approve command for low-score flagged listing', async () => {
(mockAiClient.moderate as ReturnType<typeof vi.fn>).mockResolvedValue({
is_flagged: true,
score: 0.5,
flags: [
{ category: 'contact_info', severity: 'medium', matched_text: '0901234567', reason: 'Contact info detected' },
],
cleaned_text: 'Bán căn hộ [REDACTED]',
});
await handler.handle(event);
expect(mockCommandBus.execute).toHaveBeenCalledWith(
expect.objectContaining({
listingId: 'listing-1',
action: 'approve',
moderationScore: 0.5,
}),
);
});
it('dispatches reject command for high-score flagged listing', async () => {
(mockAiClient.moderate as ReturnType<typeof vi.fn>).mockResolvedValue({
is_flagged: true,
score: 0.9,
flags: [
{ category: 'profanity', severity: 'high', matched_text: 'lừa đảo', reason: 'Harmful language' },
{ category: 'prohibited_content', severity: 'high', matched_text: 'đất tranh chấp', reason: 'Prohibited property' },
],
cleaned_text: '[REDACTED] [REDACTED]',
});
await handler.handle(event);
expect(mockCommandBus.execute).toHaveBeenCalledWith(
expect.objectContaining({
listingId: 'listing-1',
action: 'reject',
moderationScore: 0.9,
}),
);
const cmd = (mockCommandBus.execute as ReturnType<typeof vi.fn>).mock.calls[0][0] as ModerateListingCommand;
expect(cmd.notes).toContain('[AI Auto-Moderation]');
expect(cmd.moderatorId).toBe('system:ai-moderation');
});
it('silently handles AI service errors', async () => {
(mockAiClient.moderate as ReturnType<typeof vi.fn>).mockRejectedValue(
new Error('ECONNREFUSED'),
);
await expect(handler.handle(event)).resolves.not.toThrow();
expect(mockCommandBus.execute).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,76 @@
import { Inject, Logger } from '@nestjs/common';
import { EventsHandler, type IEventHandler, type CommandBus } from '@nestjs/cqrs';
import { ListingCreatedEvent, ModerateListingCommand } from '@modules/listings';
import { type PrismaService } from '@modules/shared';
import {
AI_SERVICE_CLIENT,
type IAiServiceClient,
} from '../../infrastructure/services/ai-service.client';
const AUTO_REJECT_THRESHOLD = 0.8;
const AI_MODERATOR_ID = 'system:ai-moderation';
@EventsHandler(ListingCreatedEvent)
export class ListingCreatedModerationHandler implements IEventHandler<ListingCreatedEvent> {
private readonly logger = new Logger(ListingCreatedModerationHandler.name);
constructor(
@Inject(AI_SERVICE_CLIENT) private readonly aiClient: IAiServiceClient,
private readonly commandBus: CommandBus,
private readonly prisma: PrismaService,
) {}
async handle(event: ListingCreatedEvent): Promise<void> {
try {
await this.moderateListing(event);
} catch (err) {
this.logger.warn(
`AI moderation skipped for listing ${event.aggregateId}: ${(err as Error).message}`,
);
}
}
private async moderateListing(event: ListingCreatedEvent): Promise<void> {
const property = await this.prisma.property.findUnique({
where: { id: event.propertyId },
select: { title: true, description: true },
});
if (!property) return;
const textToModerate = `${property.title}\n${property.description}`;
const result = await this.aiClient.moderate({
text: textToModerate,
context: 'listing',
});
if (!result.is_flagged) {
this.logger.debug(
`Listing ${event.aggregateId} passed AI moderation (score: ${result.score})`,
);
return;
}
this.logger.log(
`Listing ${event.aggregateId} flagged by AI moderation (score: ${result.score}, ` +
`flags: ${result.flags.map((f) => f.category).join(', ')})`,
);
const flagNotes = result.flags
.map((f) => `[${f.severity}] ${f.category}: ${f.reason}`)
.join('; ');
const action = result.score >= AUTO_REJECT_THRESHOLD ? 'reject' : 'approve';
await this.commandBus.execute(
new ModerateListingCommand(
event.aggregateId,
AI_MODERATOR_ID,
action,
result.score,
`[AI Auto-Moderation] ${flagNotes}`,
),
);
}
}

View File

@@ -0,0 +1,150 @@
import { AiServiceClient } from '../services/ai-service.client';
describe('AiServiceClient', () => {
let client: AiServiceClient;
const originalEnv = process.env;
beforeEach(() => {
process.env = { ...originalEnv, AI_SERVICE_URL: 'http://localhost:8000' };
client = new AiServiceClient();
});
afterEach(() => {
process.env = originalEnv;
vi.restoreAllMocks();
});
describe('predict', () => {
it('sends predict request to AI service', async () => {
const mockResponse = {
estimated_price_vnd: 5_000_000_000,
confidence: 0.82,
price_per_m2: 70_000_000,
price_range_low: 4_250_000_000,
price_range_high: 5_750_000_000,
};
vi.spyOn(globalThis, 'fetch').mockResolvedValue(
new Response(JSON.stringify(mockResponse), { status: 200 }),
);
const result = await client.predict({
area: 80,
district: 'Quận 1',
city: 'Hồ Chí Minh',
property_type: 'apartment',
});
expect(result.estimated_price_vnd).toBe(5_000_000_000);
expect(result.confidence).toBe(0.82);
expect(fetch).toHaveBeenCalledWith(
'http://localhost:8000/avm/predict',
expect.objectContaining({ method: 'POST' }),
);
});
it('throws on non-OK response', async () => {
vi.spyOn(globalThis, 'fetch').mockResolvedValue(
new Response('Service unavailable', { status: 503 }),
);
await expect(
client.predict({
area: 80,
district: 'Quận 1',
city: 'Hồ Chí Minh',
property_type: 'apartment',
}),
).rejects.toThrow('AI service /avm/predict returned 503');
});
it('throws on network error', async () => {
vi.spyOn(globalThis, 'fetch').mockRejectedValue(new Error('ECONNREFUSED'));
await expect(
client.predict({
area: 80,
district: 'Quận 1',
city: 'Hồ Chí Minh',
property_type: 'apartment',
}),
).rejects.toThrow('ECONNREFUSED');
});
});
describe('moderate', () => {
it('sends moderation request and returns result', async () => {
const mockResponse = {
is_flagged: true,
score: 0.7,
flags: [
{
category: 'contact_info',
severity: 'medium',
matched_text: '0901234567',
reason: 'Contact information detected',
},
],
cleaned_text: 'Bán nhà [REDACTED]',
};
vi.spyOn(globalThis, 'fetch').mockResolvedValue(
new Response(JSON.stringify(mockResponse), { status: 200 }),
);
const result = await client.moderate({
text: 'Bán nhà liên hệ 0901234567',
context: 'listing',
});
expect(result.is_flagged).toBe(true);
expect(result.score).toBe(0.7);
expect(result.flags).toHaveLength(1);
expect(result.flags[0].category).toBe('contact_info');
});
it('returns clean result for safe text', async () => {
const mockResponse = {
is_flagged: false,
score: 0,
flags: [],
cleaned_text: 'Bán căn hộ 80m2 Quận 1',
};
vi.spyOn(globalThis, 'fetch').mockResolvedValue(
new Response(JSON.stringify(mockResponse), { status: 200 }),
);
const result = await client.moderate({
text: 'Bán căn hộ 80m2 Quận 1',
});
expect(result.is_flagged).toBe(false);
expect(result.score).toBe(0);
});
});
describe('isAvailable', () => {
it('returns true when service is healthy', async () => {
vi.spyOn(globalThis, 'fetch').mockResolvedValue(
new Response('{"status":"ok"}', { status: 200 }),
);
expect(await client.isAvailable()).toBe(true);
});
it('returns false when service is down', async () => {
vi.spyOn(globalThis, 'fetch').mockRejectedValue(new Error('ECONNREFUSED'));
expect(await client.isAvailable()).toBe(false);
});
it('returns false when service returns error', async () => {
vi.spyOn(globalThis, 'fetch').mockResolvedValue(
new Response('error', { status: 500 }),
);
expect(await client.isAvailable()).toBe(false);
});
});
});

View File

@@ -0,0 +1,128 @@
import type { IAiServiceClient } from '../services/ai-service.client';
import { HttpAVMService } from '../services/http-avm.service';
import type { PrismaAVMService } from '../services/prisma-avm.service';
describe('HttpAVMService', () => {
let service: HttpAVMService;
let mockAiClient: IAiServiceClient;
let mockFallback: PrismaAVMService;
let mockPrisma: { property: { findUnique: ReturnType<typeof vi.fn> } };
beforeEach(() => {
mockAiClient = {
predict: vi.fn(),
moderate: vi.fn(),
isAvailable: vi.fn(),
};
mockFallback = {
estimateValue: vi.fn(),
getComparables: vi.fn().mockResolvedValue([]),
} as unknown as PrismaAVMService;
mockPrisma = {
property: { findUnique: vi.fn() },
};
service = new HttpAVMService(
mockAiClient,
mockFallback,
mockPrisma as never,
);
});
describe('estimateValue', () => {
it('uses AI service when available', async () => {
mockPrisma.property.findUnique.mockResolvedValue({
areaM2: 80,
district: 'Quận 1',
city: 'Hồ Chí Minh',
propertyType: 'APARTMENT',
bedrooms: 2,
bathrooms: 2,
floors: null,
yearBuilt: 2020,
legalStatus: 'SO_DO',
});
(mockAiClient.predict as ReturnType<typeof vi.fn>).mockResolvedValue({
estimated_price_vnd: 5_000_000_000,
confidence: 0.82,
price_per_m2: 62_500_000,
price_range_low: 4_250_000_000,
price_range_high: 5_750_000_000,
});
const result = await service.estimateValue({ propertyId: 'prop-1' });
expect(result.estimatedPrice).toBe('5000000000');
expect(result.confidence).toBe(0.82);
expect(result.modelVersion).toBe('ai-service-v1.0');
expect(mockFallback.estimateValue).not.toHaveBeenCalled();
});
it('falls back to PrismaAVM when AI service fails', async () => {
(mockAiClient.predict as ReturnType<typeof vi.fn>).mockRejectedValue(
new Error('ECONNREFUSED'),
);
mockPrisma.property.findUnique.mockResolvedValue({
areaM2: 80,
district: 'Quận 1',
city: 'Hồ Chí Minh',
propertyType: 'APARTMENT',
bedrooms: 2,
bathrooms: 2,
floors: null,
yearBuilt: 2020,
legalStatus: 'SO_DO',
});
const fallbackResult = {
estimatedPrice: '4800000000',
confidence: 0.6,
pricePerM2: 60_000_000,
comparables: [],
modelVersion: 'avm-v1.0',
};
(mockFallback.estimateValue as ReturnType<typeof vi.fn>).mockResolvedValue(fallbackResult);
const result = await service.estimateValue({ propertyId: 'prop-1' });
expect(result).toEqual(fallbackResult);
expect(mockFallback.estimateValue).toHaveBeenCalledWith({ propertyId: 'prop-1' });
});
it('uses coordinates directly when no propertyId', async () => {
(mockAiClient.predict as ReturnType<typeof vi.fn>).mockResolvedValue({
estimated_price_vnd: 4_000_000_000,
confidence: 0.65,
price_per_m2: 50_000_000,
price_range_low: 3_000_000_000,
price_range_high: 5_000_000_000,
});
const result = await service.estimateValue({
latitude: 10.762,
longitude: 106.66,
areaM2: 80,
propertyType: 'APARTMENT',
});
expect(result.estimatedPrice).toBe('4000000000');
expect(mockPrisma.property.findUnique).not.toHaveBeenCalled();
});
});
describe('getComparables', () => {
it('delegates to fallback PrismaAVM service', async () => {
const comparables = [{ propertyId: 'p1', distanceMeters: 100 }];
(mockFallback.getComparables as ReturnType<typeof vi.fn>).mockResolvedValue(comparables);
const result = await service.getComparables('prop-1', 2000);
expect(result).toEqual(comparables);
expect(mockFallback.getComparables).toHaveBeenCalledWith('prop-1', 2000);
});
});
});

View File

@@ -0,0 +1,108 @@
import { Injectable, Logger } from '@nestjs/common';
export interface AiPredictRequest {
area: number;
district: string;
city: string;
property_type: string;
bedrooms?: number;
bathrooms?: number;
floors?: number;
frontage?: number;
road_width?: number;
year_built?: number | null;
has_legal_paper?: boolean;
}
export interface AiPredictResponse {
estimated_price_vnd: number;
confidence: number;
price_per_m2: number;
price_range_low: number;
price_range_high: number;
}
export interface AiModerationRequest {
text: string;
context?: string;
}
export interface AiModerationFlag {
category: string;
severity: string;
matched_text: string;
reason: string;
}
export interface AiModerationResponse {
is_flagged: boolean;
score: number;
flags: AiModerationFlag[];
cleaned_text: string | null;
}
export const AI_SERVICE_CLIENT = Symbol('AI_SERVICE_CLIENT');
export interface IAiServiceClient {
predict(req: AiPredictRequest): Promise<AiPredictResponse>;
moderate(req: AiModerationRequest): Promise<AiModerationResponse>;
isAvailable(): Promise<boolean>;
}
@Injectable()
export class AiServiceClient implements IAiServiceClient {
private readonly logger = new Logger(AiServiceClient.name);
private readonly baseUrl: string;
private readonly apiKey: string;
private readonly timeoutMs: number;
constructor() {
this.baseUrl = process.env['AI_SERVICE_URL'] ?? 'http://localhost:8000';
this.apiKey = process.env['AI_SERVICE_API_KEY'] ?? '';
this.timeoutMs = Number(process.env['AI_SERVICE_TIMEOUT_MS']) || 5000;
}
async predict(req: AiPredictRequest): Promise<AiPredictResponse> {
return this.post<AiPredictResponse>('/avm/predict', req);
}
async moderate(req: AiModerationRequest): Promise<AiModerationResponse> {
return this.post<AiModerationResponse>('/moderation/check', req);
}
async isAvailable(): Promise<boolean> {
try {
const response = await fetch(`${this.baseUrl}/health`, {
method: 'GET',
signal: AbortSignal.timeout(2000),
});
return response.ok;
} catch {
return false;
}
}
private async post<T>(path: string, body: unknown): Promise<T> {
const url = `${this.baseUrl}${path}`;
const headers: Record<string, string> = {
'Content-Type': 'application/json',
};
if (this.apiKey) {
headers['X-API-Key'] = this.apiKey;
}
const response = await fetch(url, {
method: 'POST',
headers,
body: JSON.stringify(body),
signal: AbortSignal.timeout(this.timeoutMs),
});
if (!response.ok) {
const text = await response.text().catch(() => '');
throw new Error(`AI service ${path} returned ${response.status}: ${text}`);
}
return response.json() as Promise<T>;
}
}

View File

@@ -0,0 +1,111 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { type PrismaService } from '@modules/shared';
import {
type IAVMService,
type AVMParams,
type ValuationResult,
type Comparable,
} from '../../domain/services/avm-service';
import {
AI_SERVICE_CLIENT,
type IAiServiceClient,
type AiPredictRequest,
} from './ai-service.client';
import { type PrismaAVMService } from './prisma-avm.service';
@Injectable()
export class HttpAVMService implements IAVMService {
private readonly logger = new Logger(HttpAVMService.name);
constructor(
@Inject(AI_SERVICE_CLIENT) private readonly aiClient: IAiServiceClient,
private readonly fallback: PrismaAVMService,
private readonly prisma: PrismaService,
) {}
async estimateValue(params: AVMParams): Promise<ValuationResult> {
try {
return await this.estimateViaAi(params);
} catch (err) {
this.logger.warn(
`AI AVM service unavailable, falling back to comparables-based estimation: ${(err as Error).message}`,
);
return this.fallback.estimateValue(params);
}
}
async getComparables(propertyId: string, radiusMeters: number): Promise<Comparable[]> {
return this.fallback.getComparables(propertyId, radiusMeters);
}
private async estimateViaAi(params: AVMParams): Promise<ValuationResult> {
const propertyData = params.propertyId
? await this.getPropertyDetails(params.propertyId)
: null;
const request: AiPredictRequest = {
area: params.areaM2 ?? propertyData?.areaM2 ?? 0,
district: propertyData?.district ?? '',
city: propertyData?.city ?? '',
property_type: (params.propertyType ?? propertyData?.propertyType ?? 'house').toLowerCase(),
bedrooms: propertyData?.bedrooms ?? 0,
bathrooms: propertyData?.bathrooms ?? 0,
floors: propertyData?.floors ?? 0,
frontage: 0,
road_width: 0,
year_built: params.yearBuilt ?? propertyData?.yearBuilt,
has_legal_paper: propertyData?.hasLegalPaper ?? true,
};
const aiResult = await this.aiClient.predict(request);
// Also fetch comparables from the local PostGIS service for context
let comparables: Comparable[] = [];
try {
if (params.propertyId) {
comparables = await this.fallback.getComparables(params.propertyId, 2000);
}
} catch {
// Comparables are supplementary — don't fail the valuation
}
return {
estimatedPrice: Math.round(aiResult.estimated_price_vnd).toString(),
confidence: aiResult.confidence,
pricePerM2: Math.round(aiResult.price_per_m2),
comparables,
modelVersion: 'ai-service-v1.0',
};
}
private async getPropertyDetails(propertyId: string) {
const row = await this.prisma.property.findUnique({
where: { id: propertyId },
select: {
areaM2: true,
district: true,
city: true,
propertyType: true,
bedrooms: true,
bathrooms: true,
floors: true,
yearBuilt: true,
legalStatus: true,
},
});
if (!row) return null;
return {
areaM2: row.areaM2,
district: row.district,
city: row.city,
propertyType: row.propertyType,
bedrooms: row.bedrooms ?? 0,
bathrooms: row.bathrooms ?? 0,
floors: row.floors ?? 0,
yearBuilt: row.yearBuilt,
hasLegalPaper: row.legalStatus === 'SO_DO' || row.legalStatus === 'SO_HONG',
};
}
}

View File

@@ -1 +1,5 @@
export { PrismaAVMService } from './prisma-avm.service';
export { HttpAVMService } from './http-avm.service';
export { AiServiceClient, AI_SERVICE_CLIENT } from './ai-service.client';
export type { IAiServiceClient, AiPredictRequest, AiPredictResponse, AiModerationRequest, AiModerationResponse } from './ai-service.client';
export { MarketIndexCronService } from './market-index-cron.service';

View File

@@ -0,0 +1,124 @@
import { Injectable, Logger } from '@nestjs/common';
import { type CommandBus } from '@nestjs/cqrs';
import { Cron, CronExpression } from '@nestjs/schedule';
import { PropertyType } from '@prisma/client';
import { type PrismaService } from '@modules/shared';
import { UpdateMarketIndexCommand } from '../../application/commands/update-market-index/update-market-index.command';
interface MarketStats {
district: string;
city: string;
propertyType: PropertyType;
medianPrice: bigint;
avgPriceM2: number;
totalListings: number;
avgDaysOnMarket: number;
inventoryLevel: number;
}
@Injectable()
export class MarketIndexCronService {
private readonly logger = new Logger(MarketIndexCronService.name);
constructor(
private readonly prisma: PrismaService,
private readonly commandBus: CommandBus,
) {}
@Cron(CronExpression.EVERY_DAY_AT_2AM, { name: 'market-index-calculation' })
async calculateMarketIndices(): Promise<void> {
this.logger.log('Starting market index calculation...');
const period = this.getCurrentPeriod();
try {
const stats = await this.aggregateMarketStats();
let updatedCount = 0;
for (const stat of stats) {
try {
await this.commandBus.execute(
new UpdateMarketIndexCommand(
stat.district,
stat.city,
stat.propertyType,
period,
stat.medianPrice,
stat.avgPriceM2,
stat.totalListings,
stat.avgDaysOnMarket,
stat.inventoryLevel,
),
);
updatedCount++;
} catch (err) {
this.logger.error(
`Failed to update market index for ${stat.district}/${stat.city}/${stat.propertyType}: ${(err as Error).message}`,
);
}
}
this.logger.log(
`Market index calculation completed: ${updatedCount}/${stats.length} indices updated for period ${period}`,
);
} catch (err) {
this.logger.error(`Market index calculation failed: ${(err as Error).message}`);
}
}
private async aggregateMarketStats(): Promise<MarketStats[]> {
const propertyTypes = Object.values(PropertyType);
const stats = await this.prisma.$queryRaw<
Array<{
district: string;
city: string;
property_type: PropertyType;
median_price: bigint;
avg_price_m2: number;
total_listings: bigint;
avg_days_on_market: number;
inventory_level: bigint;
}>
>`
SELECT
p.district,
p.city,
p."propertyType" AS property_type,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY l."priceVND") AS median_price,
AVG(l."pricePerM2")::float AS avg_price_m2,
COUNT(l.id) AS total_listings,
AVG(
EXTRACT(EPOCH FROM (NOW() - l."publishedAt")) / 86400
)::float AS avg_days_on_market,
COUNT(l.id) FILTER (WHERE l.status = 'ACTIVE') AS inventory_level
FROM "Listing" l
JOIN "Property" p ON l."propertyId" = p.id
WHERE l.status IN ('ACTIVE', 'SOLD', 'RENTED')
AND l."publishedAt" IS NOT NULL
AND l."publishedAt" >= NOW() - INTERVAL '90 days'
AND p."propertyType" = ANY(${propertyTypes}::"PropertyType"[])
GROUP BY p.district, p.city, p."propertyType"
HAVING COUNT(l.id) >= 3
ORDER BY p.city, p.district, p."propertyType"
`;
return stats.map((s) => ({
district: s.district,
city: s.city,
propertyType: s.property_type,
medianPrice: BigInt(Math.round(Number(s.median_price))),
avgPriceM2: s.avg_price_m2,
totalListings: Number(s.total_listings),
avgDaysOnMarket: Math.round(s.avg_days_on_market * 10) / 10,
inventoryLevel: Number(s.inventory_level),
}));
}
private getCurrentPeriod(): string {
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
return `${year}-${month}`;
}
}