feat(search): implement Search module with Typesense full-text & geo search
- TypesenseClient service with configurable connection - Collection schema for listings with facets, geo-point, and Vietnamese text - ListingIndexer service with PostGIS coordinate extraction for geo search - CQRS commands: SyncListing, ReindexAll (batch with pagination) - CQRS queries: SearchProperties (filters, sorting), GeoSearch (radius) - Event handlers for listing.approved/updated/deactivated auto-sync - REST endpoints: GET /search, GET /search/geo, POST /search/reindex (admin) - DTOs with class-validator validation and pagination Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -0,0 +1,3 @@
|
||||
export { TypesenseClientService } from './typesense-client.service';
|
||||
export { TypesenseSearchRepository } from './typesense-search.repository';
|
||||
export { ListingIndexerService } from './listing-indexer.service';
|
||||
@@ -0,0 +1,220 @@
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { PrismaService } from '@modules/shared/infrastructure/prisma.service';
|
||||
import { LoggerService } from '@modules/shared/infrastructure/logger.service';
|
||||
import {
|
||||
SEARCH_REPOSITORY,
|
||||
type ISearchRepository,
|
||||
type ListingDocument,
|
||||
} from '../../domain/repositories/search.repository';
|
||||
|
||||
@Injectable()
|
||||
export class ListingIndexerService {
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
@Inject(SEARCH_REPOSITORY) private readonly searchRepo: ISearchRepository,
|
||||
private readonly logger: LoggerService,
|
||||
) {}
|
||||
|
||||
async indexListing(listingId: string): Promise<void> {
|
||||
const doc = await this.fetchListingDocumentById(listingId);
|
||||
|
||||
if (!doc || doc.status !== 'ACTIVE') {
|
||||
this.logger.warn(`Listing ${listingId} not found or not active, skipping index`, 'ListingIndexer');
|
||||
return;
|
||||
}
|
||||
|
||||
await this.searchRepo.indexDocument(doc);
|
||||
this.logger.log(`Indexed listing ${listingId}`, 'ListingIndexer');
|
||||
}
|
||||
|
||||
async removeListing(listingId: string): Promise<void> {
|
||||
await this.searchRepo.removeDocument(listingId);
|
||||
this.logger.log(`Removed listing ${listingId} from index`, 'ListingIndexer');
|
||||
}
|
||||
|
||||
async reindexAll(): Promise<{ indexed: number; total: number }> {
|
||||
this.logger.log('Starting full reindex...', 'ListingIndexer');
|
||||
|
||||
await this.searchRepo.dropCollection();
|
||||
await this.searchRepo.ensureCollection();
|
||||
|
||||
const batchSize = 100;
|
||||
let offset = 0;
|
||||
let indexed = 0;
|
||||
|
||||
while (true) {
|
||||
const rows = await this.fetchListingsWithCoords(batchSize, offset);
|
||||
if (rows.length === 0) break;
|
||||
|
||||
await this.searchRepo.indexDocuments(rows);
|
||||
indexed += rows.length;
|
||||
offset += batchSize;
|
||||
|
||||
this.logger.log(`Reindex progress: ${indexed} documents`, 'ListingIndexer');
|
||||
}
|
||||
|
||||
this.logger.log(`Full reindex complete: ${indexed} documents`, 'ListingIndexer');
|
||||
return { indexed, total: indexed };
|
||||
}
|
||||
|
||||
private async fetchListingsWithCoords(
|
||||
limit: number,
|
||||
offset: number,
|
||||
): Promise<ListingDocument[]> {
|
||||
const rows = await this.prisma.$queryRaw<
|
||||
Array<{
|
||||
id: string;
|
||||
propertyId: string;
|
||||
transactionType: string;
|
||||
priceVND: bigint;
|
||||
pricePerM2: number | null;
|
||||
agentId: string | null;
|
||||
sellerId: string;
|
||||
status: string;
|
||||
viewCount: number;
|
||||
saveCount: number;
|
||||
publishedAt: Date | null;
|
||||
title: string;
|
||||
description: string;
|
||||
propertyType: string;
|
||||
areaM2: number;
|
||||
bedrooms: number | null;
|
||||
bathrooms: number | null;
|
||||
floors: number | null;
|
||||
direction: string | null;
|
||||
address: string;
|
||||
ward: string;
|
||||
district: string;
|
||||
city: string;
|
||||
projectName: string | null;
|
||||
amenities: unknown;
|
||||
lat: number | null;
|
||||
lng: number | null;
|
||||
}>
|
||||
>`
|
||||
SELECT
|
||||
l."id", l."propertyId", l."transactionType", l."priceVND", l."pricePerM2",
|
||||
l."agentId", l."sellerId", l."status", l."viewCount", l."saveCount", l."publishedAt",
|
||||
p."title", p."description", p."propertyType", p."areaM2",
|
||||
p."bedrooms", p."bathrooms", p."floors", p."direction",
|
||||
p."address", p."ward", p."district", p."city", p."projectName", p."amenities",
|
||||
ST_Y(p."location"::geometry) AS lat,
|
||||
ST_X(p."location"::geometry) AS lng
|
||||
FROM "Listing" l
|
||||
JOIN "Property" p ON l."propertyId" = p."id"
|
||||
WHERE l."status" = 'ACTIVE'
|
||||
ORDER BY l."publishedAt" DESC NULLS LAST
|
||||
LIMIT ${limit} OFFSET ${offset}
|
||||
`;
|
||||
|
||||
return rows.map((row) => ({
|
||||
id: row.id,
|
||||
listingId: row.id,
|
||||
propertyId: row.propertyId,
|
||||
title: row.title,
|
||||
description: row.description,
|
||||
propertyType: row.propertyType,
|
||||
transactionType: row.transactionType,
|
||||
priceVND: Number(row.priceVND),
|
||||
pricePerM2: row.pricePerM2,
|
||||
areaM2: row.areaM2,
|
||||
bedrooms: row.bedrooms,
|
||||
bathrooms: row.bathrooms,
|
||||
floors: row.floors,
|
||||
direction: row.direction,
|
||||
address: row.address,
|
||||
ward: row.ward,
|
||||
district: row.district,
|
||||
city: row.city,
|
||||
location: [row.lat ?? 0, row.lng ?? 0] as [number, number],
|
||||
agentId: row.agentId,
|
||||
sellerId: row.sellerId,
|
||||
status: row.status,
|
||||
publishedAt: row.publishedAt ? Math.floor(row.publishedAt.getTime() / 1000) : 0,
|
||||
viewCount: row.viewCount,
|
||||
saveCount: row.saveCount,
|
||||
projectName: row.projectName,
|
||||
amenities: Array.isArray(row.amenities) ? (row.amenities as string[]) : [],
|
||||
}));
|
||||
}
|
||||
|
||||
async fetchListingDocumentById(listingId: string): Promise<ListingDocument | null> {
|
||||
const rows = await this.prisma.$queryRaw<
|
||||
Array<{
|
||||
id: string;
|
||||
propertyId: string;
|
||||
transactionType: string;
|
||||
priceVND: bigint;
|
||||
pricePerM2: number | null;
|
||||
agentId: string | null;
|
||||
sellerId: string;
|
||||
status: string;
|
||||
viewCount: number;
|
||||
saveCount: number;
|
||||
publishedAt: Date | null;
|
||||
title: string;
|
||||
description: string;
|
||||
propertyType: string;
|
||||
areaM2: number;
|
||||
bedrooms: number | null;
|
||||
bathrooms: number | null;
|
||||
floors: number | null;
|
||||
direction: string | null;
|
||||
address: string;
|
||||
ward: string;
|
||||
district: string;
|
||||
city: string;
|
||||
projectName: string | null;
|
||||
amenities: unknown;
|
||||
lat: number | null;
|
||||
lng: number | null;
|
||||
}>
|
||||
>`
|
||||
SELECT
|
||||
l."id", l."propertyId", l."transactionType", l."priceVND", l."pricePerM2",
|
||||
l."agentId", l."sellerId", l."status", l."viewCount", l."saveCount", l."publishedAt",
|
||||
p."title", p."description", p."propertyType", p."areaM2",
|
||||
p."bedrooms", p."bathrooms", p."floors", p."direction",
|
||||
p."address", p."ward", p."district", p."city", p."projectName", p."amenities",
|
||||
ST_Y(p."location"::geometry) AS lat,
|
||||
ST_X(p."location"::geometry) AS lng
|
||||
FROM "Listing" l
|
||||
JOIN "Property" p ON l."propertyId" = p."id"
|
||||
WHERE l."id" = ${listingId}
|
||||
`;
|
||||
|
||||
if (rows.length === 0) return null;
|
||||
|
||||
const row = rows[0]!;
|
||||
return {
|
||||
id: row.id,
|
||||
listingId: row.id,
|
||||
propertyId: row.propertyId,
|
||||
title: row.title,
|
||||
description: row.description,
|
||||
propertyType: row.propertyType,
|
||||
transactionType: row.transactionType,
|
||||
priceVND: Number(row.priceVND),
|
||||
pricePerM2: row.pricePerM2,
|
||||
areaM2: row.areaM2,
|
||||
bedrooms: row.bedrooms,
|
||||
bathrooms: row.bathrooms,
|
||||
floors: row.floors,
|
||||
direction: row.direction,
|
||||
address: row.address,
|
||||
ward: row.ward,
|
||||
district: row.district,
|
||||
city: row.city,
|
||||
location: [row.lat ?? 0, row.lng ?? 0],
|
||||
agentId: row.agentId,
|
||||
sellerId: row.sellerId,
|
||||
status: row.status,
|
||||
publishedAt: row.publishedAt ? Math.floor(row.publishedAt.getTime() / 1000) : 0,
|
||||
viewCount: row.viewCount,
|
||||
saveCount: row.saveCount,
|
||||
projectName: row.projectName,
|
||||
amenities: Array.isArray(row.amenities) ? (row.amenities as string[]) : [],
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
import { Injectable, type OnModuleInit } from '@nestjs/common';
|
||||
import { LoggerService } from '@modules/shared/infrastructure/logger.service';
|
||||
import { Client as TypesenseClient } from 'typesense';
|
||||
|
||||
@Injectable()
|
||||
export class TypesenseClientService implements OnModuleInit {
|
||||
private client!: TypesenseClient;
|
||||
|
||||
constructor(private readonly logger: LoggerService) {}
|
||||
|
||||
onModuleInit(): void {
|
||||
this.client = new TypesenseClient({
|
||||
nodes: [
|
||||
{
|
||||
host: process.env['TYPESENSE_HOST'] || 'localhost',
|
||||
port: parseInt(process.env['TYPESENSE_PORT'] || '8108', 10),
|
||||
protocol: process.env['TYPESENSE_PROTOCOL'] || 'http',
|
||||
},
|
||||
],
|
||||
apiKey: process.env['TYPESENSE_API_KEY'] || 'ts_dev_key_change_me',
|
||||
connectionTimeoutSeconds: 5,
|
||||
retryIntervalSeconds: 0.1,
|
||||
numRetries: 3,
|
||||
});
|
||||
|
||||
this.logger.log('TypesenseClientService initialized', 'TypesenseClient');
|
||||
}
|
||||
|
||||
getClient(): TypesenseClient {
|
||||
return this.client;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,174 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { LoggerService } from '@modules/shared/infrastructure/logger.service';
|
||||
import {
|
||||
type ISearchRepository,
|
||||
type ListingDocument,
|
||||
type SearchParams,
|
||||
type SearchResult,
|
||||
} from '../../domain/repositories/search.repository';
|
||||
import { TypesenseClientService } from './typesense-client.service';
|
||||
import { Client as TypesenseClient } from 'typesense';
|
||||
import type { CollectionCreateSchema } from 'typesense/lib/Typesense/Collections';
|
||||
|
||||
const COLLECTION_NAME = 'listings';
|
||||
|
||||
const LISTING_SCHEMA: CollectionCreateSchema = {
|
||||
name: COLLECTION_NAME,
|
||||
fields: [
|
||||
{ name: 'listingId', type: 'string', facet: false },
|
||||
{ name: 'propertyId', type: 'string', facet: false },
|
||||
{ name: 'title', type: 'string', facet: false },
|
||||
{ name: 'description', type: 'string', facet: false },
|
||||
{ name: 'propertyType', type: 'string', facet: true },
|
||||
{ name: 'transactionType', type: 'string', facet: true },
|
||||
{ name: 'priceVND', type: 'int64', facet: false },
|
||||
{ name: 'pricePerM2', type: 'float', facet: false, optional: true },
|
||||
{ name: 'areaM2', type: 'float', facet: false },
|
||||
{ name: 'bedrooms', type: 'int32', facet: true, optional: true },
|
||||
{ name: 'bathrooms', type: 'int32', facet: true, optional: true },
|
||||
{ name: 'floors', type: 'int32', facet: false, optional: true },
|
||||
{ name: 'direction', type: 'string', facet: true, optional: true },
|
||||
{ name: 'address', type: 'string', facet: false },
|
||||
{ name: 'ward', type: 'string', facet: true },
|
||||
{ name: 'district', type: 'string', facet: true },
|
||||
{ name: 'city', type: 'string', facet: true },
|
||||
{ name: 'location', type: 'geopoint', facet: false },
|
||||
{ name: 'agentId', type: 'string', facet: false, optional: true },
|
||||
{ name: 'sellerId', type: 'string', facet: false },
|
||||
{ name: 'status', type: 'string', facet: true },
|
||||
{ name: 'publishedAt', type: 'int64', facet: false },
|
||||
{ name: 'viewCount', type: 'int32', facet: false },
|
||||
{ name: 'saveCount', type: 'int32', facet: false },
|
||||
{ name: 'projectName', type: 'string', facet: true, optional: true },
|
||||
{ name: 'amenities', type: 'string[]', facet: true, optional: true },
|
||||
],
|
||||
token_separators: ['-', '_'],
|
||||
enable_nested_fields: false,
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class TypesenseSearchRepository implements ISearchRepository {
|
||||
private readonly client: TypesenseClient;
|
||||
|
||||
constructor(
|
||||
private readonly typesenseClient: TypesenseClientService,
|
||||
private readonly logger: LoggerService,
|
||||
) {
|
||||
this.client = this.typesenseClient.getClient();
|
||||
}
|
||||
|
||||
async ensureCollection(): Promise<void> {
|
||||
try {
|
||||
await this.client.collections(COLLECTION_NAME).retrieve();
|
||||
this.logger.log(`Collection "${COLLECTION_NAME}" already exists`, 'TypesenseSearch');
|
||||
} catch {
|
||||
await this.client.collections().create(LISTING_SCHEMA);
|
||||
this.logger.log(`Collection "${COLLECTION_NAME}" created`, 'TypesenseSearch');
|
||||
}
|
||||
}
|
||||
|
||||
async dropCollection(): Promise<void> {
|
||||
try {
|
||||
await this.client.collections(COLLECTION_NAME).delete();
|
||||
this.logger.log(`Collection "${COLLECTION_NAME}" dropped`, 'TypesenseSearch');
|
||||
} catch {
|
||||
this.logger.warn(`Collection "${COLLECTION_NAME}" not found to drop`, 'TypesenseSearch');
|
||||
}
|
||||
}
|
||||
|
||||
async indexDocument(doc: ListingDocument): Promise<void> {
|
||||
await this.client
|
||||
.collections(COLLECTION_NAME)
|
||||
.documents()
|
||||
.upsert(doc as unknown as Record<string, unknown>);
|
||||
}
|
||||
|
||||
async indexDocuments(docs: ListingDocument[]): Promise<void> {
|
||||
if (docs.length === 0) return;
|
||||
|
||||
const results = await this.client
|
||||
.collections(COLLECTION_NAME)
|
||||
.documents()
|
||||
.import(docs as unknown as Record<string, unknown>[], { action: 'upsert' });
|
||||
|
||||
const failures = results.filter((r: { success: boolean }) => !r.success);
|
||||
if (failures.length > 0) {
|
||||
this.logger.warn(
|
||||
`${failures.length}/${docs.length} documents failed to index`,
|
||||
'TypesenseSearch',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async removeDocument(id: string): Promise<void> {
|
||||
try {
|
||||
await this.client.collections(COLLECTION_NAME).documents(id).delete();
|
||||
} catch {
|
||||
this.logger.warn(`Document ${id} not found for removal`, 'TypesenseSearch');
|
||||
}
|
||||
}
|
||||
|
||||
async search(params: SearchParams): Promise<SearchResult> {
|
||||
const page = params.page ?? 1;
|
||||
const perPage = params.perPage ?? 20;
|
||||
|
||||
let filterBy = params.filterBy || '';
|
||||
|
||||
if (params.geoPoint && params.geoRadiusKm) {
|
||||
const geoFilter = `location:(${params.geoPoint.lat}, ${params.geoPoint.lng}, ${params.geoRadiusKm} km)`;
|
||||
filterBy = filterBy ? `${filterBy} && ${geoFilter}` : geoFilter;
|
||||
}
|
||||
|
||||
const searchParams = {
|
||||
q: params.query || '*',
|
||||
query_by: 'title,description,address,district,city,projectName',
|
||||
query_by_weights: '5,3,2,2,1,2',
|
||||
filter_by: filterBy,
|
||||
sort_by: this.buildSortBy(params),
|
||||
page,
|
||||
per_page: perPage,
|
||||
highlight_full_fields: 'title,description',
|
||||
highlight_start_tag: '<mark>',
|
||||
highlight_end_tag: '</mark>',
|
||||
};
|
||||
|
||||
const result = await this.client
|
||||
.collections(COLLECTION_NAME)
|
||||
.documents()
|
||||
.search(searchParams);
|
||||
|
||||
const hits = (result.hits ?? []).map(
|
||||
(hit) => hit.document as unknown as ListingDocument,
|
||||
);
|
||||
const totalFound = (result.found as number) ?? 0;
|
||||
|
||||
return {
|
||||
hits,
|
||||
totalFound,
|
||||
page,
|
||||
perPage,
|
||||
totalPages: Math.ceil(totalFound / perPage),
|
||||
searchTimeMs: result.search_time_ms ?? 0,
|
||||
};
|
||||
}
|
||||
|
||||
private buildSortBy(params: SearchParams): string {
|
||||
if (params.geoPoint) {
|
||||
if (params.sortBy === 'distance' || (!params.sortBy && params.geoRadiusKm)) {
|
||||
return `location(${params.geoPoint.lat}, ${params.geoPoint.lng}):asc`;
|
||||
}
|
||||
}
|
||||
|
||||
switch (params.sortBy) {
|
||||
case 'price_asc':
|
||||
return 'priceVND:asc';
|
||||
case 'price_desc':
|
||||
return 'priceVND:desc';
|
||||
case 'date_desc':
|
||||
return 'publishedAt:desc';
|
||||
case 'relevance':
|
||||
default:
|
||||
return params.query && params.query !== '*' ? '_text_match:desc,publishedAt:desc' : 'publishedAt:desc';
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user