feat(listings): add cron to auto-expire featured listings (TEC-2924)
- New FeaturedListingExpiryCronService runs every 5 minutes and clears Listing.featuredUntil when the promotion period has ended - Uses a single atomic UPDATE ... RETURNING so concurrent instances do not double-process rows (idempotent) - Publishes ListingFeaturedExpiredEvent via CQRS EventBus for downstream cache/search index invalidation - Unit test covers event emission, no-op path, error path, and concurrency Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -0,0 +1,88 @@
|
||||
import { ListingFeaturedExpiredEvent } from '../../domain/events/listing-featured-expired.event';
|
||||
import { FeaturedListingExpiryCronService } from '../../infrastructure/cron/featured-listing-expiry-cron.service';
|
||||
|
||||
describe('FeaturedListingExpiryCronService', () => {
|
||||
let service: FeaturedListingExpiryCronService;
|
||||
let mockPrisma: { $queryRaw: ReturnType<typeof vi.fn> };
|
||||
let mockEventBus: { publish: ReturnType<typeof vi.fn> };
|
||||
let mockLogger: {
|
||||
log: ReturnType<typeof vi.fn>;
|
||||
debug: ReturnType<typeof vi.fn>;
|
||||
error: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
mockPrisma = { $queryRaw: vi.fn() };
|
||||
mockEventBus = { publish: vi.fn() };
|
||||
mockLogger = { log: vi.fn(), debug: vi.fn(), error: vi.fn() };
|
||||
|
||||
service = new FeaturedListingExpiryCronService(
|
||||
mockPrisma as any,
|
||||
mockEventBus as any,
|
||||
mockLogger as any,
|
||||
);
|
||||
});
|
||||
|
||||
it('publishes ListingFeaturedExpiredEvent for each expired listing and logs the count', async () => {
|
||||
mockPrisma.$queryRaw.mockResolvedValue([{ id: 'listing-a' }, { id: 'listing-b' }]);
|
||||
|
||||
await service.expireFeaturedListings();
|
||||
|
||||
expect(mockPrisma.$queryRaw).toHaveBeenCalledTimes(1);
|
||||
expect(mockEventBus.publish).toHaveBeenCalledTimes(2);
|
||||
|
||||
const events = mockEventBus.publish.mock.calls.map((c) => c[0]) as ListingFeaturedExpiredEvent[];
|
||||
expect(events[0]).toBeInstanceOf(ListingFeaturedExpiredEvent);
|
||||
expect(events.map((e) => e.aggregateId).sort()).toEqual(['listing-a', 'listing-b']);
|
||||
expect(events[0].eventName).toBe('listing.featured_expired');
|
||||
expect(events[0].expiredAt).toBeInstanceOf(Date);
|
||||
|
||||
expect(mockLogger.log).toHaveBeenCalledWith(
|
||||
expect.stringContaining('Expired 2 featured listing(s)'),
|
||||
'FeaturedListingExpiryCronService',
|
||||
);
|
||||
});
|
||||
|
||||
it('is a no-op when no rows match the predicate (idempotent across runs)', async () => {
|
||||
mockPrisma.$queryRaw.mockResolvedValue([]);
|
||||
|
||||
await service.expireFeaturedListings();
|
||||
|
||||
expect(mockEventBus.publish).not.toHaveBeenCalled();
|
||||
expect(mockLogger.log).not.toHaveBeenCalled();
|
||||
expect(mockLogger.debug).toHaveBeenCalledWith(
|
||||
'No featured listings to expire',
|
||||
'FeaturedListingExpiryCronService',
|
||||
);
|
||||
});
|
||||
|
||||
it('catches DB errors and logs them without throwing', async () => {
|
||||
const boom = new Error('connection lost');
|
||||
mockPrisma.$queryRaw.mockRejectedValue(boom);
|
||||
|
||||
await expect(service.expireFeaturedListings()).resolves.toBeUndefined();
|
||||
|
||||
expect(mockEventBus.publish).not.toHaveBeenCalled();
|
||||
expect(mockLogger.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining('connection lost'),
|
||||
expect.any(String),
|
||||
'FeaturedListingExpiryCronService',
|
||||
);
|
||||
});
|
||||
|
||||
it('uses a single atomic UPDATE ... RETURNING so concurrent runs do not double-update', async () => {
|
||||
// Simulate two parallel runs against the same DB; only the first sees the row.
|
||||
mockPrisma.$queryRaw
|
||||
.mockResolvedValueOnce([{ id: 'listing-1' }])
|
||||
.mockResolvedValueOnce([]);
|
||||
|
||||
await Promise.all([
|
||||
service.expireFeaturedListings(),
|
||||
service.expireFeaturedListings(),
|
||||
]);
|
||||
|
||||
// Only one event published across both runs.
|
||||
expect(mockEventBus.publish).toHaveBeenCalledTimes(1);
|
||||
expect(mockPrisma.$queryRaw).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
@@ -2,3 +2,4 @@ export { ListingCreatedEvent } from './listing-created.event';
|
||||
export { ListingApprovedEvent } from './listing-approved.event';
|
||||
export { ListingPriceChangedEvent } from './listing-price-changed.event';
|
||||
export { ListingSoldEvent } from './listing-sold.event';
|
||||
export { ListingFeaturedExpiredEvent } from './listing-featured-expired.event';
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
import { type DomainEvent } from '@modules/shared';
|
||||
|
||||
export class ListingFeaturedExpiredEvent implements DomainEvent {
|
||||
readonly eventName = 'listing.featured_expired';
|
||||
readonly occurredAt = new Date();
|
||||
|
||||
constructor(
|
||||
public readonly aggregateId: string,
|
||||
public readonly expiredAt: Date,
|
||||
) {}
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { EventBus } from '@nestjs/cqrs';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
import { Prisma } from '@prisma/client';
|
||||
import { PrismaService, LoggerService } from '@modules/shared';
|
||||
import { ListingFeaturedExpiredEvent } from '../../domain/events/listing-featured-expired.event';
|
||||
|
||||
/**
|
||||
* Cron service that automatically clears the `featuredUntil` column on listings
|
||||
* whose featured promotion period has ended (`featuredUntil < now()`).
|
||||
*
|
||||
* Design notes:
|
||||
* - Runs every 5 minutes (`@Cron(CronExpression.EVERY_5_MINUTES)`).
|
||||
* - Uses a single atomic SQL `UPDATE ... RETURNING id` so the operation is safe to
|
||||
* run on multiple instances concurrently — only one transaction will satisfy
|
||||
* the `featuredUntil < NOW()` predicate per row, and subsequent runs see no
|
||||
* rows to update (idempotent).
|
||||
* - Publishes `ListingFeaturedExpiredEvent` per affected listing so search index
|
||||
* and cache layers can invalidate.
|
||||
*/
|
||||
@Injectable()
|
||||
export class FeaturedListingExpiryCronService {
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly eventBus: EventBus,
|
||||
private readonly logger: LoggerService,
|
||||
) {}
|
||||
|
||||
@Cron(CronExpression.EVERY_5_MINUTES, { name: 'featured-listing-expiry' })
|
||||
async expireFeaturedListings(): Promise<void> {
|
||||
const startedAt = new Date();
|
||||
try {
|
||||
const expired = await this.prisma.$queryRaw<Array<{ id: string }>>(Prisma.sql`
|
||||
UPDATE "Listing"
|
||||
SET "featuredUntil" = NULL,
|
||||
"updatedAt" = NOW()
|
||||
WHERE "featuredUntil" IS NOT NULL
|
||||
AND "featuredUntil" < NOW()
|
||||
RETURNING id
|
||||
`);
|
||||
|
||||
if (expired.length === 0) {
|
||||
this.logger.debug(
|
||||
'No featured listings to expire',
|
||||
'FeaturedListingExpiryCronService',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const row of expired) {
|
||||
this.eventBus.publish(new ListingFeaturedExpiredEvent(row.id, startedAt));
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
`Expired ${expired.length} featured listing(s) at ${startedAt.toISOString()}`,
|
||||
'FeaturedListingExpiryCronService',
|
||||
);
|
||||
} catch (err) {
|
||||
this.logger.error(
|
||||
`Failed to expire featured listings: ${(err as Error).message}`,
|
||||
(err as Error).stack,
|
||||
'FeaturedListingExpiryCronService',
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,7 @@ import { PROPERTY_REPOSITORY } from './domain/repositories/property.repository';
|
||||
import { DUPLICATE_DETECTOR } from './domain/services/duplicate-detector';
|
||||
import { ModerationService } from './domain/services/moderation.service';
|
||||
import { PRICE_VALIDATOR } from './domain/services/price-validator';
|
||||
import { FeaturedListingExpiryCronService } from './infrastructure/cron/featured-listing-expiry-cron.service';
|
||||
import { PrismaListingRepository } from './infrastructure/repositories/prisma-listing.repository';
|
||||
import { PrismaPropertyRepository } from './infrastructure/repositories/prisma-property.repository';
|
||||
import { MEDIA_STORAGE_SERVICE, MinioMediaStorageService } from './infrastructure/services/media-storage.service';
|
||||
@@ -75,6 +76,9 @@ const EventHandlers = [
|
||||
...CommandHandlers,
|
||||
...QueryHandlers,
|
||||
...EventHandlers,
|
||||
|
||||
// Cron
|
||||
FeaturedListingExpiryCronService,
|
||||
],
|
||||
exports: [LISTING_REPOSITORY, PROPERTY_REPOSITORY],
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user