diff --git a/apps/api/src/modules/notifications/application/__tests__/residential-events.listener.spec.ts b/apps/api/src/modules/notifications/application/__tests__/residential-events.listener.spec.ts new file mode 100644 index 0000000..503291f --- /dev/null +++ b/apps/api/src/modules/notifications/application/__tests__/residential-events.listener.spec.ts @@ -0,0 +1,223 @@ +import { ListingApprovedEvent } from '@modules/admin/domain/events/listing-approved.event'; +import { InquiryReadEvent } from '@modules/inquiries/domain/events/inquiry-read.event'; +import { ListingPriceChangedEvent } from '@modules/listings/domain/events/listing-price-changed.event'; +import { + ResidentialInquiryReplyListener, + ResidentialNewListingInProjectListener, + ResidentialPriceDropListener, +} from '../listeners/residential-events.listener'; + +function createMockPrisma() { + return { + listing: { findUnique: vi.fn() }, + savedSearch: { findMany: vi.fn().mockResolvedValue([]) }, + }; +} + +function createMockGateway() { + return { + emitResidentialEvent: vi.fn(), + }; +} + +function createMockLogger() { + return { log: vi.fn(), warn: vi.fn(), error: vi.fn() }; +} + +describe('ResidentialPriceDropListener', () => { + let listener: ResidentialPriceDropListener; + let prisma: ReturnType; + let gateway: ReturnType; + let logger: ReturnType; + + const listing = { + id: 'listing-1', + sellerId: 'seller-1', + transactionType: 'SALE', + priceVND: 2_000_000_000n, + property: { + title: 'Căn hộ 2PN Quận 7', + propertyType: 'APARTMENT', + areaM2: 70, + bedrooms: 2, + district: 'Quận 7', + city: 'Hồ Chí Minh', + projectDevelopmentId: null, + }, + }; + + beforeEach(() => { + prisma = createMockPrisma(); + gateway = createMockGateway(); + logger = createMockLogger(); + listener = new ResidentialPriceDropListener( + prisma as any, + gateway as any, + logger as any, + ); + }); + + it('emits residential:price-drop to each user with a matching saved search', async () => { + prisma.listing.findUnique.mockResolvedValue(listing); + prisma.savedSearch.findMany.mockResolvedValue([ + { + id: 'ss-1', + userId: 'user-1', + name: 'Quận 7 căn hộ', + filters: { city: 'Hồ Chí Minh', district: 'Quận 7', priceMax: 3_000_000_000 }, + }, + { + id: 'ss-2', + userId: 'user-2', + name: 'Quận 1', + filters: { district: 'Quận 1' }, + }, + ]); + + const event = new ListingPriceChangedEvent('listing-1', 2_500_000_000n, 2_000_000_000n); + await listener.handle(event); + + expect(gateway.emitResidentialEvent).toHaveBeenCalledTimes(1); + expect(gateway.emitResidentialEvent).toHaveBeenCalledWith( + 'user-1', + 'residential:price-drop', + expect.objectContaining({ + listingId: 'listing-1', + savedSearchId: 'ss-1', + oldPrice: '2500000000', + newPrice: '2000000000', + }), + ); + }); + + it('does not emit when the new price is not lower than the old price', async () => { + const event = new ListingPriceChangedEvent('listing-1', 1_000_000_000n, 1_200_000_000n); + await listener.handle(event); + + expect(prisma.listing.findUnique).not.toHaveBeenCalled(); + expect(gateway.emitResidentialEvent).not.toHaveBeenCalled(); + }); + + it('skips saved searches owned by the listing seller', async () => { + prisma.listing.findUnique.mockResolvedValue(listing); + prisma.savedSearch.findMany.mockResolvedValue([ + { id: 'ss-self', userId: 'seller-1', name: 'mine', filters: {} }, + ]); + + const event = new ListingPriceChangedEvent('listing-1', 2_500_000_000n, 2_000_000_000n); + await listener.handle(event); + + expect(gateway.emitResidentialEvent).not.toHaveBeenCalled(); + }); + + it('swallows infrastructure errors without throwing', async () => { + prisma.listing.findUnique.mockRejectedValue(new Error('db down')); + + const event = new ListingPriceChangedEvent('listing-1', 2_000_000_000n, 1_000_000_000n); + await expect(listener.handle(event)).resolves.not.toThrow(); + expect(logger.warn).toHaveBeenCalled(); + }); +}); + +describe('ResidentialNewListingInProjectListener', () => { + let listener: ResidentialNewListingInProjectListener; + let prisma: ReturnType; + let gateway: ReturnType; + let logger: ReturnType; + + beforeEach(() => { + prisma = createMockPrisma(); + gateway = createMockGateway(); + logger = createMockLogger(); + listener = new ResidentialNewListingInProjectListener( + prisma as any, + gateway as any, + logger as any, + ); + }); + + it('emits residential:new-listing-in-project to users tracking the project', async () => { + prisma.listing.findUnique.mockResolvedValue({ + id: 'listing-9', + sellerId: 'seller-9', + priceVND: 3_500_000_000n, + property: { + title: 'Vinhomes Grand Park S5.02', + district: 'Quận 9', + city: 'Hồ Chí Minh', + projectDevelopmentId: 'project-vgp', + }, + }); + prisma.savedSearch.findMany.mockResolvedValue([ + { id: 'ss-tracker', userId: 'user-10', name: 'VGP', filters: { projectId: 'project-vgp' } }, + { id: 'ss-other', userId: 'user-11', name: 'khác', filters: { projectId: 'project-other' } }, + { id: 'ss-no-project', userId: 'user-12', name: 'no-project', filters: {} }, + ]); + + const event = new ListingApprovedEvent('listing-9', 'admin-1'); + await listener.handle(event); + + expect(gateway.emitResidentialEvent).toHaveBeenCalledTimes(1); + expect(gateway.emitResidentialEvent).toHaveBeenCalledWith( + 'user-10', + 'residential:new-listing-in-project', + expect.objectContaining({ + listingId: 'listing-9', + projectId: 'project-vgp', + price: '3500000000', + }), + ); + }); + + it('does not emit when the listing has no linked project', async () => { + prisma.listing.findUnique.mockResolvedValue({ + id: 'listing-9', + sellerId: 'seller-9', + priceVND: 1n, + property: { title: 't', district: 'd', city: 'c', projectDevelopmentId: null }, + }); + + const event = new ListingApprovedEvent('listing-9', 'admin-1'); + await listener.handle(event); + + expect(prisma.savedSearch.findMany).not.toHaveBeenCalled(); + expect(gateway.emitResidentialEvent).not.toHaveBeenCalled(); + }); +}); + +describe('ResidentialInquiryReplyListener', () => { + let listener: ResidentialInquiryReplyListener; + let gateway: ReturnType; + let logger: ReturnType; + + beforeEach(() => { + gateway = createMockGateway(); + logger = createMockLogger(); + listener = new ResidentialInquiryReplyListener(gateway as any, logger as any); + }); + + it('emits residential:inquiry-reply to the inquiry author', async () => { + const event = new InquiryReadEvent('inq-1', 'listing-1', 'user-author'); + + await listener.handle(event); + + expect(gateway.emitResidentialEvent).toHaveBeenCalledWith( + 'user-author', + 'residential:inquiry-reply', + expect.objectContaining({ + inquiryId: 'inq-1', + listingId: 'listing-1', + }), + ); + }); + + it('swallows emission errors without throwing', async () => { + gateway.emitResidentialEvent.mockImplementation(() => { + throw new Error('server error'); + }); + const event = new InquiryReadEvent('inq-2', 'listing-2', 'user-2'); + + await expect(listener.handle(event)).resolves.not.toThrow(); + expect(logger.warn).toHaveBeenCalled(); + }); +}); diff --git a/apps/api/src/modules/notifications/application/listeners/residential-events.listener.ts b/apps/api/src/modules/notifications/application/listeners/residential-events.listener.ts new file mode 100644 index 0000000..6e5d425 --- /dev/null +++ b/apps/api/src/modules/notifications/application/listeners/residential-events.listener.ts @@ -0,0 +1,242 @@ +import { EventsHandler, type IEventHandler } from '@nestjs/cqrs'; +import { ListingApprovedEvent } from '@modules/admin'; +import { InquiryReadEvent } from '@modules/inquiries'; +import { ListingPriceChangedEvent } from '@modules/listings'; +import { type LoggerService, type PrismaService } from '@modules/shared'; +import { type NotificationsGateway } from '../../presentation/gateways/notifications.gateway'; + +const CONTEXT = 'ResidentialEventsListener'; + +/** + * Shape of the `filters` JSON column on `SavedSearch`. Matches fields + * consumed by the saved-search alert matcher. Anything else is ignored. + */ +interface SavedSearchFilters { + transactionType?: string; + propertyType?: string; + projectId?: string; + district?: string; + city?: string; + priceMin?: number; + priceMax?: number; + areaMin?: number; + areaMax?: number; + bedrooms?: number; +} + +/** + * Fans residential domain events out as Socket.IO events on the + * `/notifications` namespace so subscribed users get live updates + * without waiting for the email/push pipeline. + * + * Three WS events are emitted: + * • `residential:price-drop` — listing price lowered and matches an + * alert-enabled saved search. + * • `residential:new-listing-in-project` — approved listing lives in + * a project that the user tracks via `filters.projectId`. + * • `residential:inquiry-reply` — the listing owner/agent marked the + * user's inquiry as read, signalling that a reply is incoming. + * + * Redis pub/sub fan-out is handled by {@link RedisIoAdapter}, so the + * broadcast reaches the user's socket regardless of which API pod + * holds the connection. + */ +@EventsHandler(ListingPriceChangedEvent) +export class ResidentialPriceDropListener + implements IEventHandler +{ + constructor( + private readonly prisma: PrismaService, + private readonly gateway: NotificationsGateway, + private readonly logger: LoggerService, + ) {} + + async handle(event: ListingPriceChangedEvent): Promise { + if (event.newPrice >= event.oldPrice) { + return; + } + + try { + const listing = await this.prisma.listing.findUnique({ + where: { id: event.aggregateId }, + include: { property: true }, + }); + if (!listing || !listing.property) return; + + const savedSearches = await this.prisma.savedSearch.findMany({ + where: { alertEnabled: true }, + select: { id: true, userId: true, name: true, filters: true }, + }); + + let matchCount = 0; + for (const search of savedSearches) { + if (search.userId === listing.sellerId) continue; + + const filters = normalizeFilters(search.filters); + if (!matchesFilters(listing, listing.property, filters)) continue; + + this.gateway.emitResidentialEvent(search.userId, 'residential:price-drop', { + listingId: listing.id, + savedSearchId: search.id, + savedSearchName: search.name, + title: listing.property.title, + oldPrice: event.oldPrice.toString(), + newPrice: event.newPrice.toString(), + district: listing.property.district, + city: listing.property.city, + occurredAt: event.occurredAt.toISOString(), + }); + matchCount++; + } + + if (matchCount > 0) { + this.logger.log( + `Emitted residential:price-drop to ${matchCount} users for listing ${listing.id}`, + CONTEXT, + ); + } + } catch (err) { + this.logger.warn( + `Price-drop WS emission failed for listing ${event.aggregateId}: ${ + err instanceof Error ? err.message : String(err) + }`, + CONTEXT, + ); + } + } +} + +@EventsHandler(ListingApprovedEvent) +export class ResidentialNewListingInProjectListener + implements IEventHandler +{ + constructor( + private readonly prisma: PrismaService, + private readonly gateway: NotificationsGateway, + private readonly logger: LoggerService, + ) {} + + async handle(event: ListingApprovedEvent): Promise { + try { + const listing = await this.prisma.listing.findUnique({ + where: { id: event.aggregateId }, + include: { property: true }, + }); + if (!listing || !listing.property?.projectDevelopmentId) return; + + const projectId = listing.property.projectDevelopmentId; + + const savedSearches = await this.prisma.savedSearch.findMany({ + where: { alertEnabled: true }, + select: { id: true, userId: true, name: true, filters: true }, + }); + + let matchCount = 0; + for (const search of savedSearches) { + if (search.userId === listing.sellerId) continue; + + const filters = normalizeFilters(search.filters); + if (filters.projectId !== projectId) continue; + + this.gateway.emitResidentialEvent( + search.userId, + 'residential:new-listing-in-project', + { + listingId: listing.id, + projectId, + savedSearchId: search.id, + savedSearchName: search.name, + title: listing.property.title, + price: listing.priceVND.toString(), + district: listing.property.district, + city: listing.property.city, + occurredAt: event.occurredAt.toISOString(), + }, + ); + matchCount++; + } + + if (matchCount > 0) { + this.logger.log( + `Emitted residential:new-listing-in-project to ${matchCount} users for project ${projectId}`, + CONTEXT, + ); + } + } catch (err) { + this.logger.warn( + `New-listing-in-project WS emission failed for listing ${event.aggregateId}: ${ + err instanceof Error ? err.message : String(err) + }`, + CONTEXT, + ); + } + } +} + +@EventsHandler(InquiryReadEvent) +export class ResidentialInquiryReplyListener + implements IEventHandler +{ + constructor( + private readonly gateway: NotificationsGateway, + private readonly logger: LoggerService, + ) {} + + async handle(event: InquiryReadEvent): Promise { + try { + this.gateway.emitResidentialEvent(event.userId, 'residential:inquiry-reply', { + inquiryId: event.aggregateId, + listingId: event.listingId, + occurredAt: event.occurredAt.toISOString(), + }); + } catch (err) { + this.logger.warn( + `Inquiry-reply WS emission failed for inquiry ${event.aggregateId}: ${ + err instanceof Error ? err.message : String(err) + }`, + CONTEXT, + ); + } + } +} + +/* ──────────────────────────────────────────── + * Private helpers + * ──────────────────────────────────────────── */ + +function normalizeFilters(raw: unknown): SavedSearchFilters { + if (!raw || typeof raw !== 'object') return {}; + return raw as SavedSearchFilters; +} + +function matchesFilters( + listing: { transactionType: string; priceVND: bigint; sellerId: string }, + property: { + propertyType: string; + areaM2: number; + bedrooms: number | null; + district: string; + city: string; + }, + filters: SavedSearchFilters, +): boolean { + if (filters.transactionType && filters.transactionType !== listing.transactionType) return false; + if (filters.propertyType && filters.propertyType !== property.propertyType) return false; + if (filters.district && filters.district !== property.district) return false; + if (filters.city && filters.city !== property.city) return false; + + const price = Number(listing.priceVND); + if (filters.priceMin !== undefined && price < Number(filters.priceMin)) return false; + if (filters.priceMax !== undefined && price > Number(filters.priceMax)) return false; + if (filters.areaMin !== undefined && property.areaM2 < Number(filters.areaMin)) return false; + if (filters.areaMax !== undefined && property.areaM2 > Number(filters.areaMax)) return false; + if ( + filters.bedrooms !== undefined && + property.bedrooms !== null && + property.bedrooms < Number(filters.bedrooms) + ) { + return false; + } + + return true; +} diff --git a/apps/api/src/modules/notifications/notifications.module.ts b/apps/api/src/modules/notifications/notifications.module.ts index 2303c2a..90407f0 100644 --- a/apps/api/src/modules/notifications/notifications.module.ts +++ b/apps/api/src/modules/notifications/notifications.module.ts @@ -14,6 +14,11 @@ import { PaymentFailedListener } from './application/listeners/payment-failed.li import { PaymentRefundedListener } from './application/listeners/payment-refunded.listener'; import { PhoneChangeRequestedListener } from './application/listeners/phone-change-requested.listener'; import { QuotaExceededListener } from './application/listeners/quota-exceeded.listener'; +import { + ResidentialInquiryReplyListener, + ResidentialNewListingInProjectListener, + ResidentialPriceDropListener, +} from './application/listeners/residential-events.listener'; import { SubscriptionExpiredListener } from './application/listeners/subscription-expired.listener'; import { SubscriptionExpiringListener } from './application/listeners/subscription-expiring.listener'; import { SubscriptionRenewedListener } from './application/listeners/subscription-renewed.listener'; @@ -51,6 +56,9 @@ const EventListeners = [ UserKycUpdatedListener, EmailChangeRequestedListener, PhoneChangeRequestedListener, + ResidentialPriceDropListener, + ResidentialNewListingInProjectListener, + ResidentialInquiryReplyListener, ]; @Module({ diff --git a/apps/api/src/modules/notifications/presentation/__tests__/notifications.gateway.spec.ts b/apps/api/src/modules/notifications/presentation/__tests__/notifications.gateway.spec.ts index 772e5a1..5d0c018 100644 --- a/apps/api/src/modules/notifications/presentation/__tests__/notifications.gateway.spec.ts +++ b/apps/api/src/modules/notifications/presentation/__tests__/notifications.gateway.spec.ts @@ -332,4 +332,25 @@ describe('NotificationsGateway', () => { expect(mockRedisService.del).not.toHaveBeenCalled(); }); }); + + describe('emitResidentialEvent', () => { + it('emits the residential event to the user room and records a ws metric', () => { + const roomEmit = vi.fn(); + mockServer.to.mockReturnValue({ emit: roomEmit }); + + gateway.emitResidentialEvent('user-42', 'residential:price-drop', { + listingId: 'listing-1', + }); + + expect(mockServer.to).toHaveBeenCalledWith('user:user-42'); + expect(roomEmit).toHaveBeenCalledWith('residential:price-drop', { + listingId: 'listing-1', + }); + expect(mockMetrics.recordWsMessage).toHaveBeenCalledWith( + '/notifications', + 'residential:price-drop', + 'out', + ); + }); + }); }); diff --git a/apps/api/src/modules/notifications/presentation/gateways/notifications.gateway.ts b/apps/api/src/modules/notifications/presentation/gateways/notifications.gateway.ts index d384c3b..54278bc 100644 --- a/apps/api/src/modules/notifications/presentation/gateways/notifications.gateway.ts +++ b/apps/api/src/modules/notifications/presentation/gateways/notifications.gateway.ts @@ -212,6 +212,23 @@ export class NotificationsGateway } } + /** + * Emit a residential WS event (price drop, new listing in subscribed + * project, inquiry reply) to a single user's private room. + * + * The Redis pub/sub adapter fans the broadcast out to every API + * instance, so the target user receives the payload regardless of + * which node their socket is attached to. + */ + emitResidentialEvent( + userId: string, + event: 'residential:price-drop' | 'residential:new-listing-in-project' | 'residential:inquiry-reply', + payload: Record, + ): void { + this.server.to(`user:${userId}`).emit(event, payload); + this.metrics.recordWsMessage(NAMESPACE_LABEL, event, 'out'); + } + /* ──────────────────────────────────────────── * Private helpers * ──────────────────────────────────────────── */