import { Injectable } from '@nestjs/common';
import { CommandBus } from '@nestjs/cqrs';
import { Cron, CronExpression } from '@nestjs/schedule';

import { SendNotificationCommand } from '@modules/notifications';
import { PrismaService, LoggerService } from '@modules/shared';

/** Rows processed per cursor-page. Aligns with idx_savedsearch_alert_enabled batch size. */
const ALERT_BATCH_SIZE = 500;

/**
 * Daily cron job that checks saved searches against new listings published since lastAlertAt.
 * This complements the real-time event-based handler by catching any listings that
 * were missed (e.g., due to service downtime or event processing failures).
 *
 * Memory footprint is bounded: rows are fetched in pages of {@link ALERT_BATCH_SIZE}
 * via keyset pagination on `createdAt`, which the partial index
 * `idx_savedsearch_alert_enabled` covers directly. `id` acts as a tie-breaker so that
 * rows sharing a `createdAt` value across a page boundary are not skipped.
 */
@Injectable()
export class SavedSearchAlertCronService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly commandBus: CommandBus,
    private readonly logger: LoggerService,
  ) {}

  @Cron(CronExpression.EVERY_DAY_AT_8AM, { name: 'saved-search-daily-alerts' })
  async processAlerts(): Promise<void> {
    this.logger.log('Starting daily saved search alert processing...', 'SavedSearchAlertCron');

    try {
      let totalAlerts = 0;
      let totalSearches = 0;
      // Keyset cursor: position of the last row seen on the previous page.
      let cursor: { createdAt: Date; id: string } | undefined;

      // Fetch alert-enabled saved searches in bounded batches (keyset on createdAt, then id).
      // idx_savedsearch_alert_enabled covers WHERE alertEnabled = true ORDER BY createdAt.
      do {
        const batch = await this.prisma.savedSearch.findMany({
          where: {
            alertEnabled: true,
            ...(cursor
              ? {
                  OR: [
                    { createdAt: { gt: cursor.createdAt } },
                    // Same timestamp as the cursor row: continue after its id.
                    { createdAt: cursor.createdAt, id: { gt: cursor.id } },
                  ],
                }
              : {}),
          },
          include: {
            user: { select: { id: true, email: true, fullName: true } },
          },
          orderBy: [{ createdAt: 'asc' }, { id: 'asc' }],
          take: ALERT_BATCH_SIZE,
        });

        if (batch.length === 0) break;

        totalSearches += batch.length;

        // A failure on one saved search must not abort the rest of the batch.
        for (const search of batch) {
          try {
            const alertsSent = await this.checkAndAlert(search);
            totalAlerts += alertsSent;
          } catch (err) {
            this.logger.warn(
              `Failed to process alerts for saved search ${search.id}: ${err instanceof Error ? err.message : String(err)}`,
              'SavedSearchAlertCron',
            );
          }
        }

        // Advance the cursor to the last row of this page.
        const last = batch[batch.length - 1]!;
        cursor = { createdAt: last.createdAt, id: last.id };

        // A short page means there are no more rows.
        if (batch.length < ALERT_BATCH_SIZE) break;
        // eslint-disable-next-line no-constant-condition -- exit via `break` above
      } while (true);

      this.logger.log(
        `Daily saved search alert processing completed: ${totalAlerts} alerts sent for ${totalSearches} searches`,
        'SavedSearchAlertCron',
      );
    } catch (err) {
      this.logger.error(
        `Daily saved search alert processing failed: ${err instanceof Error ? err.message : String(err)}`,
        undefined,
        'SavedSearchAlertCron',
      );
    }
  }

  /**
   * Looks up listings published since the search's last alert and, if any match,
   * sends one digest email. Returns the number of alerts sent (0 or 1).
   */
  private async checkAndAlert(
    search: {
      id: string;
      name: string;
      userId: string;
      filters: unknown;
      lastAlertAt: Date | null;
      user: { id: string; email: string | null; fullName: string | null };
    },
  ): Promise<number> {
    const filters = search.filters as Record<string, unknown>;

    // Build query for new listings since the last alert (default: the past 24 hours).
    const sinceDate = search.lastAlertAt ?? new Date(Date.now() - 24 * 60 * 60 * 1000);

    const where: Record<string, unknown> = {
      status: 'ACTIVE',
      publishedAt: { gte: sinceDate },
      // Do not alert users about their own listings.
      sellerId: { not: search.userId },
      property: this.buildPropertyWhereClause(filters),
    };

    if (filters['transactionType']) {
      where['transactionType'] = filters['transactionType'];
    }

    if (filters['priceMin'] || filters['priceMax']) {
      where['priceVND'] = {
        ...(filters['priceMin'] ? { gte: BigInt(Number(filters['priceMin'])) } : {}),
        ...(filters['priceMax'] ?
          { lte: BigInt(Number(filters['priceMax'])) } : {}),
      };
    }

    const newListings = await this.prisma.listing.findMany({
      where,
      include: { property: true },
      take: 10,
      orderBy: { publishedAt: 'desc' },
    });

    if (newListings.length === 0) {
      return 0;
    }

    // Send a digest notification
    if (!search.user.email) {
      this.logger.warn(
        `User ${search.user.id} has no email, skipping saved search digest alert`,
        'SavedSearchAlertCron',
      );
      return 0;
    }

    try {
      await this.commandBus.execute(
        new SendNotificationCommand(
          search.user.id,
          'EMAIL',
          'saved_search_digest',
          {
            userName: search.user.fullName ?? 'Người dùng',
            searchName: search.name,
            matchCount: newListings.length,
            listings: newListings.slice(0, 5).map((l) => ({
              title: l.property.title,
              price: Number(l.priceVND).toLocaleString('vi-VN'),
              district: l.property.district,
              city: l.property.city,
              url: `/listings/${l.id}`,
            })),
          },
          search.user.email,
        ),
      );

      // Update lastAlertAt
      await this.prisma.savedSearch.update({
        where: { id: search.id },
        data: { lastAlertAt: new Date() },
      });

      return 1;
    } catch (err) {
      this.logger.warn(
        `Failed to send digest alert for search ${search.id}: ${err instanceof Error ? err.message : String(err)}`,
        'SavedSearchAlertCron',
      );
      return 0;
    }
  }

  private buildPropertyWhereClause(filters: Record<string, unknown>): Record<string, unknown> {
    const propertyWhere: Record<string, unknown> = {};

    if (filters['propertyType']) {
      propertyWhere['propertyType'] = filters['propertyType'];
    }

    if (filters['district']) {
      propertyWhere['district'] = filters['district'];
    }

    if (filters['city']) {
      propertyWhere['city'] = filters['city'];
    }

    if (filters['areaMin'] || filters['areaMax']) {
      propertyWhere['areaM2'] = {
        ...(filters['areaMin'] ? { gte: Number(filters['areaMin']) } : {}),
        ...(filters['areaMax'] ? { lte: Number(filters['areaMax']) } : {}),
      };
    }

    if (filters['bedrooms']) {
      propertyWhere['bedrooms'] = { gte: Number(filters['bedrooms']) };
    }

    return propertyWhere;
  }
}