diff --git a/apps/api/src/modules/notifications/application/__tests__/residential-events.listener.spec.ts b/apps/api/src/modules/notifications/application/__tests__/residential-events.listener.spec.ts index 503291f..242ffb0 100644 --- a/apps/api/src/modules/notifications/application/__tests__/residential-events.listener.spec.ts +++ b/apps/api/src/modules/notifications/application/__tests__/residential-events.listener.spec.ts @@ -65,12 +65,14 @@ describe('ResidentialPriceDropListener', () => { userId: 'user-1', name: 'Quận 7 căn hộ', filters: { city: 'Hồ Chí Minh', district: 'Quận 7', priceMax: 3_000_000_000 }, + createdAt: new Date('2026-01-01T00:00:00Z'), }, { id: 'ss-2', userId: 'user-2', name: 'Quận 1', filters: { district: 'Quận 1' }, + createdAt: new Date('2026-01-02T00:00:00Z'), }, ]); @@ -101,7 +103,7 @@ describe('ResidentialPriceDropListener', () => { it('skips saved searches owned by the listing seller', async () => { prisma.listing.findUnique.mockResolvedValue(listing); prisma.savedSearch.findMany.mockResolvedValue([ - { id: 'ss-self', userId: 'seller-1', name: 'mine', filters: {} }, + { id: 'ss-self', userId: 'seller-1', name: 'mine', filters: {}, createdAt: new Date('2026-01-01T00:00:00Z') }, ]); const event = new ListingPriceChangedEvent('listing-1', 2_500_000_000n, 2_000_000_000n); @@ -117,6 +119,38 @@ describe('ResidentialPriceDropListener', () => { await expect(listener.handle(event)).resolves.not.toThrow(); expect(logger.warn).toHaveBeenCalled(); }); + + it('paginates across batches and emits to all matching users', async () => { + prisma.listing.findUnique.mockResolvedValue(listing); + + const BATCH = 500; + const makeRow = (n: number) => ({ + id: `ss-${n}`, + userId: `user-${n}`, + name: `Search ${n}`, + // All match: district + city + filters: { city: 'Hồ Chí Minh', district: 'Quận 7' }, + createdAt: new Date(Date.now() + n * 1000), + }); + + const page1 = Array.from({ length: BATCH }, (_, i) => makeRow(i)); + const page2 = [makeRow(BATCH)]; + + prisma.savedSearch.findMany + .mockResolvedValueOnce(page1) + .mockResolvedValueOnce(page2) + .mockResolvedValue([]); + + const event = new ListingPriceChangedEvent('listing-1', 2_500_000_000n, 2_000_000_000n); + await listener.handle(event); + + expect(prisma.savedSearch.findMany).toHaveBeenCalledTimes(2); + expect(gateway.emitResidentialEvent).toHaveBeenCalledTimes(BATCH + 1); + + // Second call must use cursor from last row of first batch + const secondCall = prisma.savedSearch.findMany.mock.calls[1][0]; + expect(secondCall.where.createdAt?.gt).toEqual(page1[BATCH - 1]!.createdAt); + }); }); describe('ResidentialNewListingInProjectListener', () => { @@ -149,9 +183,9 @@ describe('ResidentialNewListingInProjectListener', () => { }, }); prisma.savedSearch.findMany.mockResolvedValue([ - { id: 'ss-tracker', userId: 'user-10', name: 'VGP', filters: { projectId: 'project-vgp' } }, - { id: 'ss-other', userId: 'user-11', name: 'khác', filters: { projectId: 'project-other' } }, - { id: 'ss-no-project', userId: 'user-12', name: 'no-project', filters: {} }, + { id: 'ss-tracker', userId: 'user-10', name: 'VGP', filters: { projectId: 'project-vgp' }, createdAt: new Date('2026-01-01T00:00:00Z') }, + { id: 'ss-other', userId: 'user-11', name: 'khác', filters: { projectId: 'project-other' }, createdAt: new Date('2026-01-02T00:00:00Z') }, + { id: 'ss-no-project', userId: 'user-12', name: 'no-project', filters: {}, createdAt: new Date('2026-01-03T00:00:00Z') }, ]); const event = new ListingApprovedEvent('listing-9', 'admin-1'); diff --git a/apps/api/src/modules/notifications/application/listeners/residential-events.listener.ts b/apps/api/src/modules/notifications/application/listeners/residential-events.listener.ts index d448a34..627d34c 100644 --- a/apps/api/src/modules/notifications/application/listeners/residential-events.listener.ts +++ b/apps/api/src/modules/notifications/application/listeners/residential-events.listener.ts @@ -7,6 +7,9 @@ import { NotificationsGateway } from '../../presentation/gateways/notifications. const CONTEXT = 'ResidentialEventsListener'; +/** Rows processed per cursor-page. Aligns with idx_savedsearch_alert_enabled batch size. */ +const ALERT_BATCH_SIZE = 500; + /** * Shape of the `filters` JSON column on `SavedSearch`. Matches fields * consumed by the saved-search alert matcher. Anything else is ignored. @@ -63,31 +66,47 @@ export class ResidentialPriceDropListener }); if (!listing || !listing.property) return; - const savedSearches = await this.prisma.savedSearch.findMany({ - where: { alertEnabled: true }, - select: { id: true, userId: true, name: true, filters: true }, - }); - let matchCount = 0; - for (const search of savedSearches) { - if (search.userId === listing.sellerId) continue; + let cursor: Date | undefined; - const filters = normalizeFilters(search.filters); - if (!matchesFilters(listing, listing.property, filters)) continue; - - this.gateway.emitResidentialEvent(search.userId, 'residential:price-drop', { - listingId: listing.id, - savedSearchId: search.id, - savedSearchName: search.name, - title: listing.property.title, - oldPrice: event.oldPrice.toString(), - newPrice: event.newPrice.toString(), - district: listing.property.district, - city: listing.property.city, - occurredAt: event.occurredAt.toISOString(), + // Stream alert-enabled saved searches in bounded batches (keyset on createdAt). + // idx_savedsearch_alert_enabled covers WHERE alertEnabled = true ORDER BY createdAt. + do { + const batch = await this.prisma.savedSearch.findMany({ + where: { + alertEnabled: true, + ...(cursor ? { createdAt: { gt: cursor } } : {}), + }, + select: { id: true, userId: true, name: true, filters: true, createdAt: true }, + orderBy: { createdAt: 'asc' }, + take: ALERT_BATCH_SIZE, }); - matchCount++; - } + + if (batch.length === 0) break; + + for (const search of batch) { + if (search.userId === listing.sellerId) continue; + + const filters = normalizeFilters(search.filters); + if (!matchesFilters(listing, listing.property, filters)) continue; + + this.gateway.emitResidentialEvent(search.userId, 'residential:price-drop', { + listingId: listing.id, + savedSearchId: search.id, + savedSearchName: search.name, + title: listing.property.title, + oldPrice: event.oldPrice.toString(), + newPrice: event.newPrice.toString(), + district: listing.property.district, + city: listing.property.city, + occurredAt: event.occurredAt.toISOString(), + }); + matchCount++; + } + + cursor = batch[batch.length - 1]!.createdAt; + if (batch.length < ALERT_BATCH_SIZE) break; + } while (true); if (matchCount > 0) { this.logger.log( @@ -126,35 +145,51 @@ export class ResidentialNewListingInProjectListener const projectId = listing.property.projectDevelopmentId; - const savedSearches = await this.prisma.savedSearch.findMany({ - where: { alertEnabled: true }, - select: { id: true, userId: true, name: true, filters: true }, - }); - let matchCount = 0; - for (const search of savedSearches) { - if (search.userId === listing.sellerId) continue; + let cursor: Date | undefined; - const filters = normalizeFilters(search.filters); - if (filters.projectId !== projectId) continue; - - this.gateway.emitResidentialEvent( - search.userId, - 'residential:new-listing-in-project', - { - listingId: listing.id, - projectId, - savedSearchId: search.id, - savedSearchName: search.name, - title: listing.property.title, - price: listing.priceVND.toString(), - district: listing.property.district, - city: listing.property.city, - occurredAt: event.occurredAt.toISOString(), + // Stream alert-enabled saved searches in bounded batches (keyset on createdAt). + // idx_savedsearch_alert_enabled covers WHERE alertEnabled = true ORDER BY createdAt. + do { + const batch = await this.prisma.savedSearch.findMany({ + where: { + alertEnabled: true, + ...(cursor ? { createdAt: { gt: cursor } } : {}), }, - ); - matchCount++; - } + select: { id: true, userId: true, name: true, filters: true, createdAt: true }, + orderBy: { createdAt: 'asc' }, + take: ALERT_BATCH_SIZE, + }); + + if (batch.length === 0) break; + + for (const search of batch) { + if (search.userId === listing.sellerId) continue; + + const filters = normalizeFilters(search.filters); + if (filters.projectId !== projectId) continue; + + this.gateway.emitResidentialEvent( + search.userId, + 'residential:new-listing-in-project', + { + listingId: listing.id, + projectId, + savedSearchId: search.id, + savedSearchName: search.name, + title: listing.property.title, + price: listing.priceVND.toString(), + district: listing.property.district, + city: listing.property.city, + occurredAt: event.occurredAt.toISOString(), + }, + ); + matchCount++; + } + + cursor = batch[batch.length - 1]!.createdAt; + if (batch.length < ALERT_BATCH_SIZE) break; + } while (true); if (matchCount > 0) { this.logger.log( diff --git a/apps/api/src/modules/search/infrastructure/__tests__/saved-search-alert.handler.spec.ts b/apps/api/src/modules/search/infrastructure/__tests__/saved-search-alert.handler.spec.ts index 5a526b6..f6d8fe6 100644 --- a/apps/api/src/modules/search/infrastructure/__tests__/saved-search-alert.handler.spec.ts +++ b/apps/api/src/modules/search/infrastructure/__tests__/saved-search-alert.handler.spec.ts @@ -28,6 +28,7 @@ describe('SavedSearchAlertHandler', () => { filters: { district: 'Quan 7', propertyType: 'APARTMENT' }, alertEnabled: true, lastAlertAt: null, + createdAt: new Date('2026-01-01T00:00:00Z'), user: { id: 'user-1', email: 'user@example.com', fullName: 'Nguyen Van A' }, }; @@ -124,4 +125,59 @@ describe('SavedSearchAlertHandler', () => { expect(mockCommandBus.execute).not.toHaveBeenCalled(); }); + + it('paginates across multiple batches and processes all matching rows', async () => { + mockPrisma.listing.findUnique.mockResolvedValue(mockListing); + mockPrisma.savedSearch.update.mockResolvedValue({}); + + const BATCH = 500; + const makeRow = (n: number) => ({ + id: `saved-${n}`, + userId: `user-${n}`, + name: `Search ${n}`, + filters: { district: 'Quan 7', propertyType: 'APARTMENT' }, + alertEnabled: true, + lastAlertAt: null, + createdAt: new Date(Date.now() + n * 1000), + user: { id: `user-${n}`, email: `user${n}@example.com`, fullName: `User ${n}` }, + }); + + const page1 = Array.from({ length: BATCH }, (_, i) => makeRow(i)); + const page2 = Array.from({ length: BATCH }, (_, i) => makeRow(BATCH + i)); + const page3 = [makeRow(BATCH * 2)]; + + mockPrisma.savedSearch.findMany + .mockResolvedValueOnce(page1) + .mockResolvedValueOnce(page2) + .mockResolvedValueOnce(page3) + .mockResolvedValue([]); + + await handler.handle({ listingId: 'listing-1' }); + + expect(mockPrisma.savedSearch.findMany).toHaveBeenCalledTimes(3); + expect(mockCommandBus.execute).toHaveBeenCalledTimes(BATCH * 2 + 1); + + // Second call must carry cursor from last row of first batch + const secondCallArgs = mockPrisma.savedSearch.findMany.mock.calls[1][0]; + expect(secondCallArgs.where.createdAt?.gt).toEqual(page1[BATCH - 1]!.createdAt); + + // Third call must carry cursor from last row of second batch + const thirdCallArgs = mockPrisma.savedSearch.findMany.mock.calls[2][0]; + expect(thirdCallArgs.where.createdAt?.gt).toEqual(page2[BATCH - 1]!.createdAt); + }); + + it('uses orderBy createdAt asc and take=500 on first page', async () => { + mockPrisma.listing.findUnique.mockResolvedValue(mockListing); + mockPrisma.savedSearch.findMany.mockResolvedValue([]); + + await handler.handle({ listingId: 'listing-1' }); + + expect(mockPrisma.savedSearch.findMany).toHaveBeenCalledWith( + expect.objectContaining({ + orderBy: { createdAt: 'asc' }, + take: 500, + where: expect.objectContaining({ alertEnabled: true }), + }), + ); + }); }); diff --git a/apps/api/src/modules/search/infrastructure/cron/saved-search-alert-cron.service.ts b/apps/api/src/modules/search/infrastructure/cron/saved-search-alert-cron.service.ts index 9668c90..c43a024 100644 --- a/apps/api/src/modules/search/infrastructure/cron/saved-search-alert-cron.service.ts +++ b/apps/api/src/modules/search/infrastructure/cron/saved-search-alert-cron.service.ts @@ -4,10 +4,17 @@ import { Cron, CronExpression } from '@nestjs/schedule'; import { SendNotificationCommand } from '@modules/notifications'; import { PrismaService, LoggerService } from '@modules/shared'; +/** Rows processed per cursor-page. Aligns with idx_savedsearch_alert_enabled batch size. */ +const ALERT_BATCH_SIZE = 500; + /** * Daily cron job that checks saved searches against new listings published since lastAlertAt. * This complements the real-time event-based handler by catching any listings that * were missed (e.g., due to service downtime or event processing failures). + * + * Memory footprint is bounded: rows are streamed in pages of {@link ALERT_BATCH_SIZE} + * via keyset pagination on `createdAt`, which the partial index + * `idx_savedsearch_alert_enabled` covers directly. */ @Injectable() export class SavedSearchAlertCronService { @@ -22,34 +29,49 @@ export class SavedSearchAlertCronService { this.logger.log('Starting daily saved search alert processing...', 'SavedSearchAlertCron'); try { - const savedSearches = await this.prisma.savedSearch.findMany({ - where: { alertEnabled: true }, - include: { - user: { select: { id: true, email: true, fullName: true } }, - }, - }); - - if (savedSearches.length === 0) { - this.logger.log('No saved searches with alerts enabled', 'SavedSearchAlertCron'); - return; - } - let totalAlerts = 0; + let totalSearches = 0; + let cursor: Date | undefined; - for (const search of savedSearches) { - try { - const matchCount = await this.checkAndAlert(search); - totalAlerts += matchCount; - } catch (err) { - this.logger.warn( - `Failed to process alerts for saved search ${search.id}: ${err instanceof Error ? err.message : String(err)}`, - 'SavedSearchAlertCron', - ); + // Stream alert-enabled saved searches in bounded batches (keyset on createdAt). + // idx_savedsearch_alert_enabled covers WHERE alertEnabled = true ORDER BY createdAt. + do { + const batch = await this.prisma.savedSearch.findMany({ + where: { + alertEnabled: true, + ...(cursor ? { createdAt: { gt: cursor } } : {}), + }, + include: { + user: { select: { id: true, email: true, fullName: true } }, + }, + orderBy: { createdAt: 'asc' }, + take: ALERT_BATCH_SIZE, + }); + + if (batch.length === 0) break; + + totalSearches += batch.length; + + for (const search of batch) { + try { + const matchCount = await this.checkAndAlert(search); + totalAlerts += matchCount; + } catch (err) { + this.logger.warn( + `Failed to process alerts for saved search ${search.id}: ${err instanceof Error ? err.message : String(err)}`, + 'SavedSearchAlertCron', + ); + } } - } + + // Advance cursor to the last row's createdAt for the next page. + cursor = batch[batch.length - 1]!.createdAt; + + if (batch.length < ALERT_BATCH_SIZE) break; + } while (true); this.logger.log( - `Daily saved search alert processing completed: ${totalAlerts} alerts sent for ${savedSearches.length} searches`, + `Daily saved search alert processing completed: ${totalAlerts} alerts sent for ${totalSearches} searches`, 'SavedSearchAlertCron', ); } catch (err) { diff --git a/apps/api/src/modules/search/infrastructure/event-handlers/saved-search-alert.handler.ts b/apps/api/src/modules/search/infrastructure/event-handlers/saved-search-alert.handler.ts index 20d1e56..a8010f9 100644 --- a/apps/api/src/modules/search/infrastructure/event-handlers/saved-search-alert.handler.ts +++ b/apps/api/src/modules/search/infrastructure/event-handlers/saved-search-alert.handler.ts @@ -4,9 +4,16 @@ import { OnEvent } from '@nestjs/event-emitter'; import { SendNotificationCommand } from '@modules/notifications'; import { PrismaService, LoggerService } from '@modules/shared'; +/** Rows processed per cursor-page. Aligns with idx_savedsearch_alert_enabled batch size. */ +const ALERT_BATCH_SIZE = 500; + /** * When a new listing is approved, check all saved searches with alerts enabled * and notify users whose filters match the new listing. + * + * Memory footprint is bounded: rows are streamed in pages of {@link ALERT_BATCH_SIZE} + * via keyset pagination on `createdAt`, which the partial index + * `idx_savedsearch_alert_enabled` covers directly. */ @Injectable() export class SavedSearchAlertHandler { @@ -34,28 +41,44 @@ export class SavedSearchAlertHandler { return; } - // Find all saved searches with alerts enabled - const savedSearches = await this.prisma.savedSearch.findMany({ - where: { alertEnabled: true }, - include: { - user: { select: { id: true, email: true, fullName: true } }, - }, - }); - let matchCount = 0; + let cursor: Date | undefined; - for (const search of savedSearches) { - // Skip if search belongs to the listing owner - if (search.userId === listing.sellerId) { - continue; + // Stream alert-enabled saved searches in bounded batches (keyset on createdAt). + // idx_savedsearch_alert_enabled covers WHERE alertEnabled = true ORDER BY createdAt. + do { + const batch = await this.prisma.savedSearch.findMany({ + where: { + alertEnabled: true, + ...(cursor ? { createdAt: { gt: cursor } } : {}), + }, + include: { + user: { select: { id: true, email: true, fullName: true } }, + }, + orderBy: { createdAt: 'asc' }, + take: ALERT_BATCH_SIZE, + }); + + if (batch.length === 0) break; + + for (const search of batch) { + // Skip if search belongs to the listing owner + if (search.userId === listing.sellerId) { + continue; + } + + const filters = search.filters as Record; + if (this.matchesFilters(listing, listing.property, filters)) { + matchCount++; + await this.sendAlert(search, listing, listing.property); + } } - const filters = search.filters as Record; - if (this.matchesFilters(listing, listing.property, filters)) { - matchCount++; - await this.sendAlert(search, listing, listing.property); - } - } + // Advance cursor to the last row's createdAt for the next page. + cursor = batch[batch.length - 1]!.createdAt; + + if (batch.length < ALERT_BATCH_SIZE) break; + } while (true); if (matchCount > 0) { this.logger.log( diff --git a/apps/web/components/inquiries/__tests__/inquiry-detail-dialog.spec.tsx b/apps/web/components/inquiries/__tests__/inquiry-detail-dialog.spec.tsx index d36a40c..3e26e50 100644 --- a/apps/web/components/inquiries/__tests__/inquiry-detail-dialog.spec.tsx +++ b/apps/web/components/inquiries/__tests__/inquiry-detail-dialog.spec.tsx @@ -81,7 +81,8 @@ describe('InquiryDetailDialog', () => { render( , ); - expect(screen.getByText(/0912345678/)).toBeInTheDocument(); + // formatPhone formats VN numbers as "0xxx yyy zzz" — match with optional spaces + expect(screen.getByText(/0912[\s]?345[\s]?678/)).toBeInTheDocument(); }); it('renders inquiry message', () => { @@ -156,6 +157,7 @@ describe('InquiryDetailDialog', () => { render( , ); - expect(screen.getByText(/0987654321/)).toBeInTheDocument(); + // formatPhone formats VN numbers as "0xxx yyy zzz" — match with optional spaces + expect(screen.getByText(/0987[\s]?654[\s]?321/)).toBeInTheDocument(); }); }); diff --git a/apps/web/components/leads/__tests__/lead-detail-dialog.spec.tsx b/apps/web/components/leads/__tests__/lead-detail-dialog.spec.tsx index 80117b0..c95c796 100644 --- a/apps/web/components/leads/__tests__/lead-detail-dialog.spec.tsx +++ b/apps/web/components/leads/__tests__/lead-detail-dialog.spec.tsx @@ -69,7 +69,8 @@ describe('LeadDetailDialog', () => { it('renders phone number', () => { render(); - expect(screen.getByText(/0987654321/)).toBeInTheDocument(); + // formatPhone formats VN numbers as "0xxx yyy zzz" — match with optional spaces + expect(screen.getByText(/0987[\s]?654[\s]?321/)).toBeInTheDocument(); }); it('renders email when present', () => {