feat(notifications): R2.8 residential WS events (TEC-2759)
- Add emitResidentialEvent helper on NotificationsGateway that fans residential:price-drop, residential:new-listing-in-project, and residential:inquiry-reply to the user's /notifications room. - Wire three CQRS @EventsHandler listeners on ListingPriceChangedEvent (only when newPrice < oldPrice, match saved searches), ListingApprovedEvent (match saved searches with filters.projectId against property.projectDevelopmentId), and InquiryReadEvent (notify inquiry author). - Redis pub/sub fan-out already handled by RedisIoAdapter from TEC-2766, so these broadcasts work across API instances. - Unit tests for all three listeners and the new gateway helper. Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -0,0 +1,223 @@
|
||||
import { ListingApprovedEvent } from '@modules/admin/domain/events/listing-approved.event';
|
||||
import { InquiryReadEvent } from '@modules/inquiries/domain/events/inquiry-read.event';
|
||||
import { ListingPriceChangedEvent } from '@modules/listings/domain/events/listing-price-changed.event';
|
||||
import {
|
||||
ResidentialInquiryReplyListener,
|
||||
ResidentialNewListingInProjectListener,
|
||||
ResidentialPriceDropListener,
|
||||
} from '../listeners/residential-events.listener';
|
||||
|
||||
function createMockPrisma() {
|
||||
return {
|
||||
listing: { findUnique: vi.fn() },
|
||||
savedSearch: { findMany: vi.fn().mockResolvedValue([]) },
|
||||
};
|
||||
}
|
||||
|
||||
function createMockGateway() {
|
||||
return {
|
||||
emitResidentialEvent: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
function createMockLogger() {
|
||||
return { log: vi.fn(), warn: vi.fn(), error: vi.fn() };
|
||||
}
|
||||
|
||||
describe('ResidentialPriceDropListener', () => {
|
||||
let listener: ResidentialPriceDropListener;
|
||||
let prisma: ReturnType<typeof createMockPrisma>;
|
||||
let gateway: ReturnType<typeof createMockGateway>;
|
||||
let logger: ReturnType<typeof createMockLogger>;
|
||||
|
||||
const listing = {
|
||||
id: 'listing-1',
|
||||
sellerId: 'seller-1',
|
||||
transactionType: 'SALE',
|
||||
priceVND: 2_000_000_000n,
|
||||
property: {
|
||||
title: 'Căn hộ 2PN Quận 7',
|
||||
propertyType: 'APARTMENT',
|
||||
areaM2: 70,
|
||||
bedrooms: 2,
|
||||
district: 'Quận 7',
|
||||
city: 'Hồ Chí Minh',
|
||||
projectDevelopmentId: null,
|
||||
},
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
prisma = createMockPrisma();
|
||||
gateway = createMockGateway();
|
||||
logger = createMockLogger();
|
||||
listener = new ResidentialPriceDropListener(
|
||||
prisma as any,
|
||||
gateway as any,
|
||||
logger as any,
|
||||
);
|
||||
});
|
||||
|
||||
it('emits residential:price-drop to each user with a matching saved search', async () => {
|
||||
prisma.listing.findUnique.mockResolvedValue(listing);
|
||||
prisma.savedSearch.findMany.mockResolvedValue([
|
||||
{
|
||||
id: 'ss-1',
|
||||
userId: 'user-1',
|
||||
name: 'Quận 7 căn hộ',
|
||||
filters: { city: 'Hồ Chí Minh', district: 'Quận 7', priceMax: 3_000_000_000 },
|
||||
},
|
||||
{
|
||||
id: 'ss-2',
|
||||
userId: 'user-2',
|
||||
name: 'Quận 1',
|
||||
filters: { district: 'Quận 1' },
|
||||
},
|
||||
]);
|
||||
|
||||
const event = new ListingPriceChangedEvent('listing-1', 2_500_000_000n, 2_000_000_000n);
|
||||
await listener.handle(event);
|
||||
|
||||
expect(gateway.emitResidentialEvent).toHaveBeenCalledTimes(1);
|
||||
expect(gateway.emitResidentialEvent).toHaveBeenCalledWith(
|
||||
'user-1',
|
||||
'residential:price-drop',
|
||||
expect.objectContaining({
|
||||
listingId: 'listing-1',
|
||||
savedSearchId: 'ss-1',
|
||||
oldPrice: '2500000000',
|
||||
newPrice: '2000000000',
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('does not emit when the new price is not lower than the old price', async () => {
|
||||
const event = new ListingPriceChangedEvent('listing-1', 1_000_000_000n, 1_200_000_000n);
|
||||
await listener.handle(event);
|
||||
|
||||
expect(prisma.listing.findUnique).not.toHaveBeenCalled();
|
||||
expect(gateway.emitResidentialEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('skips saved searches owned by the listing seller', async () => {
|
||||
prisma.listing.findUnique.mockResolvedValue(listing);
|
||||
prisma.savedSearch.findMany.mockResolvedValue([
|
||||
{ id: 'ss-self', userId: 'seller-1', name: 'mine', filters: {} },
|
||||
]);
|
||||
|
||||
const event = new ListingPriceChangedEvent('listing-1', 2_500_000_000n, 2_000_000_000n);
|
||||
await listener.handle(event);
|
||||
|
||||
expect(gateway.emitResidentialEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('swallows infrastructure errors without throwing', async () => {
|
||||
prisma.listing.findUnique.mockRejectedValue(new Error('db down'));
|
||||
|
||||
const event = new ListingPriceChangedEvent('listing-1', 2_000_000_000n, 1_000_000_000n);
|
||||
await expect(listener.handle(event)).resolves.not.toThrow();
|
||||
expect(logger.warn).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('ResidentialNewListingInProjectListener', () => {
|
||||
let listener: ResidentialNewListingInProjectListener;
|
||||
let prisma: ReturnType<typeof createMockPrisma>;
|
||||
let gateway: ReturnType<typeof createMockGateway>;
|
||||
let logger: ReturnType<typeof createMockLogger>;
|
||||
|
||||
beforeEach(() => {
|
||||
prisma = createMockPrisma();
|
||||
gateway = createMockGateway();
|
||||
logger = createMockLogger();
|
||||
listener = new ResidentialNewListingInProjectListener(
|
||||
prisma as any,
|
||||
gateway as any,
|
||||
logger as any,
|
||||
);
|
||||
});
|
||||
|
||||
it('emits residential:new-listing-in-project to users tracking the project', async () => {
|
||||
prisma.listing.findUnique.mockResolvedValue({
|
||||
id: 'listing-9',
|
||||
sellerId: 'seller-9',
|
||||
priceVND: 3_500_000_000n,
|
||||
property: {
|
||||
title: 'Vinhomes Grand Park S5.02',
|
||||
district: 'Quận 9',
|
||||
city: 'Hồ Chí Minh',
|
||||
projectDevelopmentId: 'project-vgp',
|
||||
},
|
||||
});
|
||||
prisma.savedSearch.findMany.mockResolvedValue([
|
||||
{ id: 'ss-tracker', userId: 'user-10', name: 'VGP', filters: { projectId: 'project-vgp' } },
|
||||
{ id: 'ss-other', userId: 'user-11', name: 'khác', filters: { projectId: 'project-other' } },
|
||||
{ id: 'ss-no-project', userId: 'user-12', name: 'no-project', filters: {} },
|
||||
]);
|
||||
|
||||
const event = new ListingApprovedEvent('listing-9', 'admin-1');
|
||||
await listener.handle(event);
|
||||
|
||||
expect(gateway.emitResidentialEvent).toHaveBeenCalledTimes(1);
|
||||
expect(gateway.emitResidentialEvent).toHaveBeenCalledWith(
|
||||
'user-10',
|
||||
'residential:new-listing-in-project',
|
||||
expect.objectContaining({
|
||||
listingId: 'listing-9',
|
||||
projectId: 'project-vgp',
|
||||
price: '3500000000',
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('does not emit when the listing has no linked project', async () => {
|
||||
prisma.listing.findUnique.mockResolvedValue({
|
||||
id: 'listing-9',
|
||||
sellerId: 'seller-9',
|
||||
priceVND: 1n,
|
||||
property: { title: 't', district: 'd', city: 'c', projectDevelopmentId: null },
|
||||
});
|
||||
|
||||
const event = new ListingApprovedEvent('listing-9', 'admin-1');
|
||||
await listener.handle(event);
|
||||
|
||||
expect(prisma.savedSearch.findMany).not.toHaveBeenCalled();
|
||||
expect(gateway.emitResidentialEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('ResidentialInquiryReplyListener', () => {
|
||||
let listener: ResidentialInquiryReplyListener;
|
||||
let gateway: ReturnType<typeof createMockGateway>;
|
||||
let logger: ReturnType<typeof createMockLogger>;
|
||||
|
||||
beforeEach(() => {
|
||||
gateway = createMockGateway();
|
||||
logger = createMockLogger();
|
||||
listener = new ResidentialInquiryReplyListener(gateway as any, logger as any);
|
||||
});
|
||||
|
||||
it('emits residential:inquiry-reply to the inquiry author', async () => {
|
||||
const event = new InquiryReadEvent('inq-1', 'listing-1', 'user-author');
|
||||
|
||||
await listener.handle(event);
|
||||
|
||||
expect(gateway.emitResidentialEvent).toHaveBeenCalledWith(
|
||||
'user-author',
|
||||
'residential:inquiry-reply',
|
||||
expect.objectContaining({
|
||||
inquiryId: 'inq-1',
|
||||
listingId: 'listing-1',
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('swallows emission errors without throwing', async () => {
|
||||
gateway.emitResidentialEvent.mockImplementation(() => {
|
||||
throw new Error('server error');
|
||||
});
|
||||
const event = new InquiryReadEvent('inq-2', 'listing-2', 'user-2');
|
||||
|
||||
await expect(listener.handle(event)).resolves.not.toThrow();
|
||||
expect(logger.warn).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,242 @@
|
||||
import { EventsHandler, type IEventHandler } from '@nestjs/cqrs';
|
||||
import { ListingApprovedEvent } from '@modules/admin';
|
||||
import { InquiryReadEvent } from '@modules/inquiries';
|
||||
import { ListingPriceChangedEvent } from '@modules/listings';
|
||||
import { type LoggerService, type PrismaService } from '@modules/shared';
|
||||
import { type NotificationsGateway } from '../../presentation/gateways/notifications.gateway';
|
||||
|
||||
const CONTEXT = 'ResidentialEventsListener';
|
||||
|
||||
/**
|
||||
* Shape of the `filters` JSON column on `SavedSearch`. Matches fields
|
||||
* consumed by the saved-search alert matcher. Anything else is ignored.
|
||||
*/
|
||||
interface SavedSearchFilters {
|
||||
transactionType?: string;
|
||||
propertyType?: string;
|
||||
projectId?: string;
|
||||
district?: string;
|
||||
city?: string;
|
||||
priceMin?: number;
|
||||
priceMax?: number;
|
||||
areaMin?: number;
|
||||
areaMax?: number;
|
||||
bedrooms?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fans residential domain events out as Socket.IO events on the
|
||||
* `/notifications` namespace so subscribed users get live updates
|
||||
* without waiting for the email/push pipeline.
|
||||
*
|
||||
* Three WS events are emitted:
|
||||
* • `residential:price-drop` — listing price lowered and matches an
|
||||
* alert-enabled saved search.
|
||||
* • `residential:new-listing-in-project` — approved listing lives in
|
||||
* a project that the user tracks via `filters.projectId`.
|
||||
* • `residential:inquiry-reply` — the listing owner/agent marked the
|
||||
* user's inquiry as read, signalling that a reply is incoming.
|
||||
*
|
||||
* Redis pub/sub fan-out is handled by {@link RedisIoAdapter}, so the
|
||||
* broadcast reaches the user's socket regardless of which API pod
|
||||
* holds the connection.
|
||||
*/
|
||||
@EventsHandler(ListingPriceChangedEvent)
|
||||
export class ResidentialPriceDropListener
|
||||
implements IEventHandler<ListingPriceChangedEvent>
|
||||
{
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly gateway: NotificationsGateway,
|
||||
private readonly logger: LoggerService,
|
||||
) {}
|
||||
|
||||
async handle(event: ListingPriceChangedEvent): Promise<void> {
|
||||
if (event.newPrice >= event.oldPrice) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const listing = await this.prisma.listing.findUnique({
|
||||
where: { id: event.aggregateId },
|
||||
include: { property: true },
|
||||
});
|
||||
if (!listing || !listing.property) return;
|
||||
|
||||
const savedSearches = await this.prisma.savedSearch.findMany({
|
||||
where: { alertEnabled: true },
|
||||
select: { id: true, userId: true, name: true, filters: true },
|
||||
});
|
||||
|
||||
let matchCount = 0;
|
||||
for (const search of savedSearches) {
|
||||
if (search.userId === listing.sellerId) continue;
|
||||
|
||||
const filters = normalizeFilters(search.filters);
|
||||
if (!matchesFilters(listing, listing.property, filters)) continue;
|
||||
|
||||
this.gateway.emitResidentialEvent(search.userId, 'residential:price-drop', {
|
||||
listingId: listing.id,
|
||||
savedSearchId: search.id,
|
||||
savedSearchName: search.name,
|
||||
title: listing.property.title,
|
||||
oldPrice: event.oldPrice.toString(),
|
||||
newPrice: event.newPrice.toString(),
|
||||
district: listing.property.district,
|
||||
city: listing.property.city,
|
||||
occurredAt: event.occurredAt.toISOString(),
|
||||
});
|
||||
matchCount++;
|
||||
}
|
||||
|
||||
if (matchCount > 0) {
|
||||
this.logger.log(
|
||||
`Emitted residential:price-drop to ${matchCount} users for listing ${listing.id}`,
|
||||
CONTEXT,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`Price-drop WS emission failed for listing ${event.aggregateId}: ${
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
CONTEXT,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@EventsHandler(ListingApprovedEvent)
|
||||
export class ResidentialNewListingInProjectListener
|
||||
implements IEventHandler<ListingApprovedEvent>
|
||||
{
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly gateway: NotificationsGateway,
|
||||
private readonly logger: LoggerService,
|
||||
) {}
|
||||
|
||||
async handle(event: ListingApprovedEvent): Promise<void> {
|
||||
try {
|
||||
const listing = await this.prisma.listing.findUnique({
|
||||
where: { id: event.aggregateId },
|
||||
include: { property: true },
|
||||
});
|
||||
if (!listing || !listing.property?.projectDevelopmentId) return;
|
||||
|
||||
const projectId = listing.property.projectDevelopmentId;
|
||||
|
||||
const savedSearches = await this.prisma.savedSearch.findMany({
|
||||
where: { alertEnabled: true },
|
||||
select: { id: true, userId: true, name: true, filters: true },
|
||||
});
|
||||
|
||||
let matchCount = 0;
|
||||
for (const search of savedSearches) {
|
||||
if (search.userId === listing.sellerId) continue;
|
||||
|
||||
const filters = normalizeFilters(search.filters);
|
||||
if (filters.projectId !== projectId) continue;
|
||||
|
||||
this.gateway.emitResidentialEvent(
|
||||
search.userId,
|
||||
'residential:new-listing-in-project',
|
||||
{
|
||||
listingId: listing.id,
|
||||
projectId,
|
||||
savedSearchId: search.id,
|
||||
savedSearchName: search.name,
|
||||
title: listing.property.title,
|
||||
price: listing.priceVND.toString(),
|
||||
district: listing.property.district,
|
||||
city: listing.property.city,
|
||||
occurredAt: event.occurredAt.toISOString(),
|
||||
},
|
||||
);
|
||||
matchCount++;
|
||||
}
|
||||
|
||||
if (matchCount > 0) {
|
||||
this.logger.log(
|
||||
`Emitted residential:new-listing-in-project to ${matchCount} users for project ${projectId}`,
|
||||
CONTEXT,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`New-listing-in-project WS emission failed for listing ${event.aggregateId}: ${
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
CONTEXT,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@EventsHandler(InquiryReadEvent)
|
||||
export class ResidentialInquiryReplyListener
|
||||
implements IEventHandler<InquiryReadEvent>
|
||||
{
|
||||
constructor(
|
||||
private readonly gateway: NotificationsGateway,
|
||||
private readonly logger: LoggerService,
|
||||
) {}
|
||||
|
||||
async handle(event: InquiryReadEvent): Promise<void> {
|
||||
try {
|
||||
this.gateway.emitResidentialEvent(event.userId, 'residential:inquiry-reply', {
|
||||
inquiryId: event.aggregateId,
|
||||
listingId: event.listingId,
|
||||
occurredAt: event.occurredAt.toISOString(),
|
||||
});
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`Inquiry-reply WS emission failed for inquiry ${event.aggregateId}: ${
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
CONTEXT,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* ────────────────────────────────────────────
|
||||
* Private helpers
|
||||
* ──────────────────────────────────────────── */
|
||||
|
||||
function normalizeFilters(raw: unknown): SavedSearchFilters {
|
||||
if (!raw || typeof raw !== 'object') return {};
|
||||
return raw as SavedSearchFilters;
|
||||
}
|
||||
|
||||
function matchesFilters(
|
||||
listing: { transactionType: string; priceVND: bigint; sellerId: string },
|
||||
property: {
|
||||
propertyType: string;
|
||||
areaM2: number;
|
||||
bedrooms: number | null;
|
||||
district: string;
|
||||
city: string;
|
||||
},
|
||||
filters: SavedSearchFilters,
|
||||
): boolean {
|
||||
if (filters.transactionType && filters.transactionType !== listing.transactionType) return false;
|
||||
if (filters.propertyType && filters.propertyType !== property.propertyType) return false;
|
||||
if (filters.district && filters.district !== property.district) return false;
|
||||
if (filters.city && filters.city !== property.city) return false;
|
||||
|
||||
const price = Number(listing.priceVND);
|
||||
if (filters.priceMin !== undefined && price < Number(filters.priceMin)) return false;
|
||||
if (filters.priceMax !== undefined && price > Number(filters.priceMax)) return false;
|
||||
if (filters.areaMin !== undefined && property.areaM2 < Number(filters.areaMin)) return false;
|
||||
if (filters.areaMax !== undefined && property.areaM2 > Number(filters.areaMax)) return false;
|
||||
if (
|
||||
filters.bedrooms !== undefined &&
|
||||
property.bedrooms !== null &&
|
||||
property.bedrooms < Number(filters.bedrooms)
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
@@ -14,6 +14,11 @@ import { PaymentFailedListener } from './application/listeners/payment-failed.li
|
||||
import { PaymentRefundedListener } from './application/listeners/payment-refunded.listener';
|
||||
import { PhoneChangeRequestedListener } from './application/listeners/phone-change-requested.listener';
|
||||
import { QuotaExceededListener } from './application/listeners/quota-exceeded.listener';
|
||||
import {
|
||||
ResidentialInquiryReplyListener,
|
||||
ResidentialNewListingInProjectListener,
|
||||
ResidentialPriceDropListener,
|
||||
} from './application/listeners/residential-events.listener';
|
||||
import { SubscriptionExpiredListener } from './application/listeners/subscription-expired.listener';
|
||||
import { SubscriptionExpiringListener } from './application/listeners/subscription-expiring.listener';
|
||||
import { SubscriptionRenewedListener } from './application/listeners/subscription-renewed.listener';
|
||||
@@ -51,6 +56,9 @@ const EventListeners = [
|
||||
UserKycUpdatedListener,
|
||||
EmailChangeRequestedListener,
|
||||
PhoneChangeRequestedListener,
|
||||
ResidentialPriceDropListener,
|
||||
ResidentialNewListingInProjectListener,
|
||||
ResidentialInquiryReplyListener,
|
||||
];
|
||||
|
||||
@Module({
|
||||
|
||||
@@ -332,4 +332,25 @@ describe('NotificationsGateway', () => {
|
||||
expect(mockRedisService.del).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('emitResidentialEvent', () => {
|
||||
it('emits the residential event to the user room and records a ws metric', () => {
|
||||
const roomEmit = vi.fn();
|
||||
mockServer.to.mockReturnValue({ emit: roomEmit });
|
||||
|
||||
gateway.emitResidentialEvent('user-42', 'residential:price-drop', {
|
||||
listingId: 'listing-1',
|
||||
});
|
||||
|
||||
expect(mockServer.to).toHaveBeenCalledWith('user:user-42');
|
||||
expect(roomEmit).toHaveBeenCalledWith('residential:price-drop', {
|
||||
listingId: 'listing-1',
|
||||
});
|
||||
expect(mockMetrics.recordWsMessage).toHaveBeenCalledWith(
|
||||
'/notifications',
|
||||
'residential:price-drop',
|
||||
'out',
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -212,6 +212,23 @@ export class NotificationsGateway
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a residential WS event (price drop, new listing in subscribed
|
||||
* project, inquiry reply) to a single user's private room.
|
||||
*
|
||||
* The Redis pub/sub adapter fans the broadcast out to every API
|
||||
* instance, so the target user receives the payload regardless of
|
||||
* which node their socket is attached to.
|
||||
*/
|
||||
emitResidentialEvent(
|
||||
userId: string,
|
||||
event: 'residential:price-drop' | 'residential:new-listing-in-project' | 'residential:inquiry-reply',
|
||||
payload: Record<string, unknown>,
|
||||
): void {
|
||||
this.server.to(`user:${userId}`).emit(event, payload);
|
||||
this.metrics.recordWsMessage(NAMESPACE_LABEL, event, 'out');
|
||||
}
|
||||
|
||||
/* ────────────────────────────────────────────
|
||||
* Private helpers
|
||||
* ──────────────────────────────────────────── */
|
||||
|
||||
Reference in New Issue
Block a user