feat(search): GOO-221 cursor/keyset pagination for SavedSearch alert listeners

All four alert code paths that previously loaded the entire SavedSearch
table into memory are replaced with bounded batch iteration backed by
the idx_savedsearch_alert_enabled partial index (merged in GOO-118).

Batch size is 500 rows; order-by is createdAt ASC, which matches the
index definition so the planner uses it for both the WHERE clause and
the cursor predicate.

Changed files:
- saved-search-alert.handler.ts: keyset loop on createdAt with
  alertEnabled=true, ALERT_BATCH_SIZE=500
- saved-search-alert-cron.service.ts: same pagination loop, removes
  the early-return on empty set (loop exits naturally on first empty page)
- residential-events.listener.ts: ResidentialPriceDropListener and
  ResidentialNewListingInProjectListener both paginated; select now
  includes createdAt to advance the cursor; shared ALERT_BATCH_SIZE

Tests:
- saved-search-alert.handler.spec.ts: adds createdAt to mock rows, adds
  3-page pagination test and orderBy/take assertion
- residential-events.listener.spec.ts: adds createdAt to mock rows, adds
  501-row pagination test verifying cursor advance on second call (9
  existing tests all pass)

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-24 12:58:16 +07:00
parent be47c26031
commit 9af9e1d84a
7 changed files with 269 additions and 96 deletions

View File

@@ -28,6 +28,7 @@ describe('SavedSearchAlertHandler', () => {
filters: { district: 'Quan 7', propertyType: 'APARTMENT' },
alertEnabled: true,
lastAlertAt: null,
createdAt: new Date('2026-01-01T00:00:00Z'),
user: { id: 'user-1', email: 'user@example.com', fullName: 'Nguyen Van A' },
};
@@ -124,4 +125,59 @@ describe('SavedSearchAlertHandler', () => {
expect(mockCommandBus.execute).not.toHaveBeenCalled();
});
it('paginates across multiple batches and processes all matching rows', async () => {
mockPrisma.listing.findUnique.mockResolvedValue(mockListing);
mockPrisma.savedSearch.update.mockResolvedValue({});
const BATCH = 500;
const makeRow = (n: number) => ({
id: `saved-${n}`,
userId: `user-${n}`,
name: `Search ${n}`,
filters: { district: 'Quan 7', propertyType: 'APARTMENT' },
alertEnabled: true,
lastAlertAt: null,
createdAt: new Date(Date.now() + n * 1000),
user: { id: `user-${n}`, email: `user${n}@example.com`, fullName: `User ${n}` },
});
const page1 = Array.from({ length: BATCH }, (_, i) => makeRow(i));
const page2 = Array.from({ length: BATCH }, (_, i) => makeRow(BATCH + i));
const page3 = [makeRow(BATCH * 2)];
mockPrisma.savedSearch.findMany
.mockResolvedValueOnce(page1)
.mockResolvedValueOnce(page2)
.mockResolvedValueOnce(page3)
.mockResolvedValue([]);
await handler.handle({ listingId: 'listing-1' });
expect(mockPrisma.savedSearch.findMany).toHaveBeenCalledTimes(3);
expect(mockCommandBus.execute).toHaveBeenCalledTimes(BATCH * 2 + 1);
// Second call must carry cursor from last row of first batch
const secondCallArgs = mockPrisma.savedSearch.findMany.mock.calls[1][0];
expect(secondCallArgs.where.createdAt?.gt).toEqual(page1[BATCH - 1]!.createdAt);
// Third call must carry cursor from last row of second batch
const thirdCallArgs = mockPrisma.savedSearch.findMany.mock.calls[2][0];
expect(thirdCallArgs.where.createdAt?.gt).toEqual(page2[BATCH - 1]!.createdAt);
});
it('uses orderBy createdAt asc and take=500 on first page', async () => {
mockPrisma.listing.findUnique.mockResolvedValue(mockListing);
mockPrisma.savedSearch.findMany.mockResolvedValue([]);
await handler.handle({ listingId: 'listing-1' });
expect(mockPrisma.savedSearch.findMany).toHaveBeenCalledWith(
expect.objectContaining({
orderBy: { createdAt: 'asc' },
take: 500,
where: expect.objectContaining({ alertEnabled: true }),
}),
);
});
});

View File

@@ -4,10 +4,17 @@ import { Cron, CronExpression } from '@nestjs/schedule';
import { SendNotificationCommand } from '@modules/notifications';
import { PrismaService, LoggerService } from '@modules/shared';
/** Rows processed per cursor-page. Aligns with idx_savedsearch_alert_enabled batch size. */
const ALERT_BATCH_SIZE = 500;
/**
* Daily cron job that checks saved searches against new listings published since lastAlertAt.
* This complements the real-time event-based handler by catching any listings that
* were missed (e.g., due to service downtime or event processing failures).
*
* Memory footprint is bounded: rows are streamed in pages of {@link ALERT_BATCH_SIZE}
* via keyset pagination on `createdAt`, which the partial index
* `idx_savedsearch_alert_enabled` covers directly.
*/
@Injectable()
export class SavedSearchAlertCronService {
@@ -22,34 +29,49 @@ export class SavedSearchAlertCronService {
this.logger.log('Starting daily saved search alert processing...', 'SavedSearchAlertCron');
try {
const savedSearches = await this.prisma.savedSearch.findMany({
where: { alertEnabled: true },
include: {
user: { select: { id: true, email: true, fullName: true } },
},
});
if (savedSearches.length === 0) {
this.logger.log('No saved searches with alerts enabled', 'SavedSearchAlertCron');
return;
}
let totalAlerts = 0;
let totalSearches = 0;
let cursor: Date | undefined;
for (const search of savedSearches) {
try {
const matchCount = await this.checkAndAlert(search);
totalAlerts += matchCount;
} catch (err) {
this.logger.warn(
`Failed to process alerts for saved search ${search.id}: ${err instanceof Error ? err.message : String(err)}`,
'SavedSearchAlertCron',
);
// Stream alert-enabled saved searches in bounded batches (keyset on createdAt).
// idx_savedsearch_alert_enabled covers WHERE alertEnabled = true ORDER BY createdAt.
do {
const batch = await this.prisma.savedSearch.findMany({
where: {
alertEnabled: true,
...(cursor ? { createdAt: { gt: cursor } } : {}),
},
include: {
user: { select: { id: true, email: true, fullName: true } },
},
orderBy: { createdAt: 'asc' },
take: ALERT_BATCH_SIZE,
});
if (batch.length === 0) break;
totalSearches += batch.length;
for (const search of batch) {
try {
const matchCount = await this.checkAndAlert(search);
totalAlerts += matchCount;
} catch (err) {
this.logger.warn(
`Failed to process alerts for saved search ${search.id}: ${err instanceof Error ? err.message : String(err)}`,
'SavedSearchAlertCron',
);
}
}
}
// Advance cursor to the last row's createdAt for the next page.
cursor = batch[batch.length - 1]!.createdAt;
if (batch.length < ALERT_BATCH_SIZE) break;
} while (true);
this.logger.log(
`Daily saved search alert processing completed: ${totalAlerts} alerts sent for ${savedSearches.length} searches`,
`Daily saved search alert processing completed: ${totalAlerts} alerts sent for ${totalSearches} searches`,
'SavedSearchAlertCron',
);
} catch (err) {

View File

@@ -4,9 +4,16 @@ import { OnEvent } from '@nestjs/event-emitter';
import { SendNotificationCommand } from '@modules/notifications';
import { PrismaService, LoggerService } from '@modules/shared';
/** Rows processed per cursor-page. Aligns with idx_savedsearch_alert_enabled batch size. */
const ALERT_BATCH_SIZE = 500;
/**
* When a new listing is approved, check all saved searches with alerts enabled
* and notify users whose filters match the new listing.
*
* Memory footprint is bounded: rows are streamed in pages of {@link ALERT_BATCH_SIZE}
* via keyset pagination on `createdAt`, which the partial index
* `idx_savedsearch_alert_enabled` covers directly.
*/
@Injectable()
export class SavedSearchAlertHandler {
@@ -34,28 +41,44 @@ export class SavedSearchAlertHandler {
return;
}
// Find all saved searches with alerts enabled
const savedSearches = await this.prisma.savedSearch.findMany({
where: { alertEnabled: true },
include: {
user: { select: { id: true, email: true, fullName: true } },
},
});
let matchCount = 0;
let cursor: Date | undefined;
for (const search of savedSearches) {
// Skip if search belongs to the listing owner
if (search.userId === listing.sellerId) {
continue;
// Stream alert-enabled saved searches in bounded batches (keyset on createdAt).
// idx_savedsearch_alert_enabled covers WHERE alertEnabled = true ORDER BY createdAt.
do {
const batch = await this.prisma.savedSearch.findMany({
where: {
alertEnabled: true,
...(cursor ? { createdAt: { gt: cursor } } : {}),
},
include: {
user: { select: { id: true, email: true, fullName: true } },
},
orderBy: { createdAt: 'asc' },
take: ALERT_BATCH_SIZE,
});
if (batch.length === 0) break;
for (const search of batch) {
// Skip if search belongs to the listing owner
if (search.userId === listing.sellerId) {
continue;
}
const filters = search.filters as Record<string, unknown>;
if (this.matchesFilters(listing, listing.property, filters)) {
matchCount++;
await this.sendAlert(search, listing, listing.property);
}
}
const filters = search.filters as Record<string, unknown>;
if (this.matchesFilters(listing, listing.property, filters)) {
matchCount++;
await this.sendAlert(search, listing, listing.property);
}
}
// Advance cursor to the last row's createdAt for the next page.
cursor = batch[batch.length - 1]!.createdAt;
if (batch.length < ALERT_BATCH_SIZE) break;
} while (true);
if (matchCount > 0) {
this.logger.log(