feat(messaging): add in-app messaging module with Conversation + Message models

Implements buyer-agent in-app messaging (Task 8.4):
- Prisma models: Conversation, ConversationParticipant, Message
- Full DDD module: domain entities, repository interfaces, CQRS commands/queries
- REST API: POST/GET conversations, POST/GET messages, PATCH read, DELETE messages
- WebSocket gateway (/messaging namespace): real-time message delivery, typing indicators, room-based routing
- 46 unit tests covering handlers, repositories, controller, and gateway

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-16 05:36:04 +07:00
parent 30d3039b94
commit 3b5da2dcf9
37 changed files with 2310 additions and 0 deletions

View File

@@ -13,6 +13,7 @@ import { InquiriesModule } from '@modules/inquiries';
import { LeadsModule } from '@modules/leads'; import { LeadsModule } from '@modules/leads';
import { ListingsModule } from '@modules/listings'; import { ListingsModule } from '@modules/listings';
import { McpIntegrationModule } from '@modules/mcp'; import { McpIntegrationModule } from '@modules/mcp';
import { MessagingModule } from '@modules/messaging';
import { HttpMetricsInterceptor, MetricsModule } from '@modules/metrics'; import { HttpMetricsInterceptor, MetricsModule } from '@modules/metrics';
import { NotificationsModule } from '@modules/notifications'; import { NotificationsModule } from '@modules/notifications';
import { PaymentsModule } from '@modules/payments'; import { PaymentsModule } from '@modules/payments';
@@ -46,6 +47,7 @@ import { AppController } from './app.controller';
AnalyticsModule, AnalyticsModule,
MetricsModule, MetricsModule,
McpIntegrationModule, McpIntegrationModule,
MessagingModule,
// ── Rate Limiting ── // ── Rate Limiting ──
// Default: 60 requests per 60 seconds per IP // Default: 60 requests per 60 seconds per IP

View File

@@ -0,0 +1,133 @@
import { CreateConversationCommand } from '../commands/create-conversation/create-conversation.command';
import { CreateConversationHandler } from '../commands/create-conversation/create-conversation.handler';
describe('CreateConversationHandler', () => {
let handler: CreateConversationHandler;
let mockConversationRepo: {
create: ReturnType<typeof vi.fn>;
findExistingBetweenUsers: ReturnType<typeof vi.fn>;
updateLastMessage: ReturnType<typeof vi.fn>;
incrementUnreadCount: ReturnType<typeof vi.fn>;
findById: ReturnType<typeof vi.fn>;
};
let mockMessageRepo: {
create: ReturnType<typeof vi.fn>;
};
let mockEventBus: { publish: ReturnType<typeof vi.fn> };
let mockLogger: { log: ReturnType<typeof vi.fn>; warn: ReturnType<typeof vi.fn>; error: ReturnType<typeof vi.fn> };
const conversationEntity = {
id: 'conv-1',
listingId: 'listing-1',
subject: null,
status: 'ACTIVE' as const,
lastMessage: null,
lastMessageAt: null,
createdAt: new Date(),
updatedAt: new Date(),
participants: [
{ id: 'p-1', conversationId: 'conv-1', userId: 'user-1', unreadCount: 0, lastReadAt: null, joinedAt: new Date() },
{ id: 'p-2', conversationId: 'conv-1', userId: 'user-2', unreadCount: 0, lastReadAt: null, joinedAt: new Date() },
],
};
const messageEntity = {
id: 'msg-1',
conversationId: 'conv-1',
senderId: 'user-1',
type: 'TEXT' as const,
content: 'Hello!',
metadata: null,
editedAt: null,
deletedAt: null,
createdAt: new Date(),
};
beforeEach(() => {
mockConversationRepo = {
create: vi.fn().mockResolvedValue(conversationEntity),
findExistingBetweenUsers: vi.fn().mockResolvedValue(null),
updateLastMessage: vi.fn().mockResolvedValue(undefined),
incrementUnreadCount: vi.fn().mockResolvedValue(undefined),
findById: vi.fn(),
};
mockMessageRepo = {
create: vi.fn().mockResolvedValue(messageEntity),
};
mockEventBus = { publish: vi.fn() };
mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn() };
handler = new CreateConversationHandler(
mockConversationRepo as any,
mockMessageRepo as any,
mockEventBus as any,
mockLogger as any,
);
});
it('creates a new conversation with participants', async () => {
const command = new CreateConversationCommand('user-1', 'user-2', 'listing-1');
const result = await handler.execute(command);
expect(mockConversationRepo.findExistingBetweenUsers).toHaveBeenCalledWith(
['user-1', 'user-2'],
'listing-1',
);
expect(mockConversationRepo.create).toHaveBeenCalledWith({
listingId: 'listing-1',
subject: undefined,
participantUserIds: ['user-1', 'user-2'],
});
expect(result.id).toBe('conv-1');
});
it('creates conversation with initial message', async () => {
const command = new CreateConversationCommand('user-1', 'user-2', 'listing-1', undefined, 'Hello!');
await handler.execute(command);
expect(mockMessageRepo.create).toHaveBeenCalledWith({
conversationId: 'conv-1',
senderId: 'user-1',
type: 'TEXT',
content: 'Hello!',
});
expect(mockConversationRepo.updateLastMessage).toHaveBeenCalled();
expect(mockConversationRepo.incrementUnreadCount).toHaveBeenCalledWith('conv-1', 'user-1');
expect(mockEventBus.publish).toHaveBeenCalled();
});
it('returns existing conversation when one exists between users', async () => {
mockConversationRepo.findExistingBetweenUsers.mockResolvedValue(conversationEntity);
const command = new CreateConversationCommand('user-1', 'user-2', 'listing-1');
const result = await handler.execute(command);
expect(result.id).toBe('conv-1');
expect(mockConversationRepo.create).not.toHaveBeenCalled();
});
it('sends message in existing conversation when initial message provided', async () => {
mockConversationRepo.findExistingBetweenUsers.mockResolvedValue(conversationEntity);
const command = new CreateConversationCommand('user-1', 'user-2', 'listing-1', undefined, 'Hello again!');
await handler.execute(command);
expect(mockConversationRepo.create).not.toHaveBeenCalled();
expect(mockMessageRepo.create).toHaveBeenCalledWith({
conversationId: 'conv-1',
senderId: 'user-1',
type: 'TEXT',
content: 'Hello again!',
});
});
it('throws when creating conversation with yourself', async () => {
const command = new CreateConversationCommand('user-1', 'user-1');
await expect(handler.execute(command)).rejects.toThrow('Không thể tạo hội thoại với chính mình');
});
});

View File

@@ -0,0 +1,44 @@
import { GetConversationsHandler } from '../queries/get-conversations/get-conversations.handler';
import { GetConversationsQuery } from '../queries/get-conversations/get-conversations.query';
describe('GetConversationsHandler', () => {
let handler: GetConversationsHandler;
let mockConversationRepo: {
findByUserId: ReturnType<typeof vi.fn>;
countByUserId: ReturnType<typeof vi.fn>;
};
const conversations = [
{
id: 'conv-1',
listingId: 'listing-1',
subject: null,
status: 'ACTIVE',
lastMessage: 'Hello!',
lastMessageAt: new Date(),
createdAt: new Date(),
updatedAt: new Date(),
participants: [],
},
];
beforeEach(() => {
mockConversationRepo = {
findByUserId: vi.fn().mockResolvedValue(conversations),
countByUserId: vi.fn().mockResolvedValue(1),
};
handler = new GetConversationsHandler(mockConversationRepo as any);
});
it('returns conversations with total count', async () => {
const query = new GetConversationsQuery('user-1', 20, 0);
const result = await handler.execute(query);
expect(result.conversations).toHaveLength(1);
expect(result.total).toBe(1);
expect(mockConversationRepo.findByUserId).toHaveBeenCalledWith('user-1', 20, 0);
expect(mockConversationRepo.countByUserId).toHaveBeenCalledWith('user-1');
});
});

View File

@@ -0,0 +1,71 @@
import { GetMessagesHandler } from '../queries/get-messages/get-messages.handler';
import { GetMessagesQuery } from '../queries/get-messages/get-messages.query';
describe('GetMessagesHandler', () => {
let handler: GetMessagesHandler;
let mockConversationRepo: {
findById: ReturnType<typeof vi.fn>;
};
let mockMessageRepo: {
findByConversationId: ReturnType<typeof vi.fn>;
};
const conversation = {
id: 'conv-1',
participants: [
{ userId: 'user-1' },
{ userId: 'user-2' },
],
};
const messages = [
{
id: 'msg-1',
conversationId: 'conv-1',
senderId: 'user-1',
type: 'TEXT',
content: 'Hello!',
metadata: null,
editedAt: null,
deletedAt: null,
createdAt: new Date(),
},
];
beforeEach(() => {
mockConversationRepo = {
findById: vi.fn().mockResolvedValue(conversation),
};
mockMessageRepo = {
findByConversationId: vi.fn().mockResolvedValue(messages),
};
handler = new GetMessagesHandler(
mockConversationRepo as any,
mockMessageRepo as any,
);
});
it('returns messages for a conversation', async () => {
const query = new GetMessagesQuery('conv-1', 'user-1', 50);
const result = await handler.execute(query);
expect(result).toHaveLength(1);
expect(mockMessageRepo.findByConversationId).toHaveBeenCalledWith('conv-1', 50, undefined);
});
it('throws NotFoundException when conversation does not exist', async () => {
mockConversationRepo.findById.mockResolvedValue(null);
const query = new GetMessagesQuery('conv-999', 'user-1');
await expect(handler.execute(query)).rejects.toThrow();
});
it('throws ForbiddenException when user is not a participant', async () => {
const query = new GetMessagesQuery('conv-1', 'user-3');
await expect(handler.execute(query)).rejects.toThrow('Bạn không phải là thành viên của hội thoại này');
});
});

View File

@@ -0,0 +1,55 @@
import { MarkConversationReadCommand } from '../commands/mark-read/mark-read.command';
import { MarkConversationReadHandler } from '../commands/mark-read/mark-read.handler';
describe('MarkConversationReadHandler', () => {
let handler: MarkConversationReadHandler;
let mockConversationRepo: {
findById: ReturnType<typeof vi.fn>;
resetUnreadCount: ReturnType<typeof vi.fn>;
};
let mockLogger: { log: ReturnType<typeof vi.fn>; warn: ReturnType<typeof vi.fn>; error: ReturnType<typeof vi.fn> };
const conversation = {
id: 'conv-1',
status: 'ACTIVE' as const,
participants: [
{ id: 'p-1', conversationId: 'conv-1', userId: 'user-1', unreadCount: 3, lastReadAt: null, joinedAt: new Date() },
{ id: 'p-2', conversationId: 'conv-1', userId: 'user-2', unreadCount: 0, lastReadAt: null, joinedAt: new Date() },
],
};
beforeEach(() => {
mockConversationRepo = {
findById: vi.fn().mockResolvedValue(conversation),
resetUnreadCount: vi.fn().mockResolvedValue(undefined),
};
mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn() };
handler = new MarkConversationReadHandler(
mockConversationRepo as any,
mockLogger as any,
);
});
it('marks conversation as read for the user', async () => {
const command = new MarkConversationReadCommand('conv-1', 'user-1');
await handler.execute(command);
expect(mockConversationRepo.resetUnreadCount).toHaveBeenCalledWith('conv-1', 'user-1');
});
it('throws NotFoundException when conversation does not exist', async () => {
mockConversationRepo.findById.mockResolvedValue(null);
const command = new MarkConversationReadCommand('conv-999', 'user-1');
await expect(handler.execute(command)).rejects.toThrow();
});
it('throws ForbiddenException when user is not a participant', async () => {
const command = new MarkConversationReadCommand('conv-1', 'user-3');
await expect(handler.execute(command)).rejects.toThrow('Bạn không phải là thành viên của hội thoại này');
});
});

View File

@@ -0,0 +1,113 @@
import { SendMessageCommand } from '../commands/send-message/send-message.command';
import { SendMessageHandler } from '../commands/send-message/send-message.handler';
describe('SendMessageHandler', () => {
let handler: SendMessageHandler;
let mockConversationRepo: {
findById: ReturnType<typeof vi.fn>;
updateLastMessage: ReturnType<typeof vi.fn>;
incrementUnreadCount: ReturnType<typeof vi.fn>;
};
let mockMessageRepo: {
create: ReturnType<typeof vi.fn>;
};
let mockEventBus: { publish: ReturnType<typeof vi.fn> };
let mockLogger: { log: ReturnType<typeof vi.fn>; warn: ReturnType<typeof vi.fn>; error: ReturnType<typeof vi.fn> };
const conversation = {
id: 'conv-1',
status: 'ACTIVE' as const,
participants: [
{ id: 'p-1', conversationId: 'conv-1', userId: 'user-1', unreadCount: 0, lastReadAt: null, joinedAt: new Date() },
{ id: 'p-2', conversationId: 'conv-1', userId: 'user-2', unreadCount: 0, lastReadAt: null, joinedAt: new Date() },
],
};
const messageEntity = {
id: 'msg-1',
conversationId: 'conv-1',
senderId: 'user-1',
type: 'TEXT' as const,
content: 'Hello!',
metadata: null,
editedAt: null,
deletedAt: null,
createdAt: new Date(),
};
beforeEach(() => {
mockConversationRepo = {
findById: vi.fn().mockResolvedValue(conversation),
updateLastMessage: vi.fn().mockResolvedValue(undefined),
incrementUnreadCount: vi.fn().mockResolvedValue(undefined),
};
mockMessageRepo = {
create: vi.fn().mockResolvedValue(messageEntity),
};
mockEventBus = { publish: vi.fn() };
mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn() };
handler = new SendMessageHandler(
mockConversationRepo as any,
mockMessageRepo as any,
mockEventBus as any,
mockLogger as any,
);
});
it('sends a message successfully', async () => {
const command = new SendMessageCommand('conv-1', 'user-1', 'Hello!');
const result = await handler.execute(command);
expect(result.id).toBe('msg-1');
expect(mockMessageRepo.create).toHaveBeenCalledWith({
conversationId: 'conv-1',
senderId: 'user-1',
type: 'TEXT',
content: 'Hello!',
metadata: undefined,
});
expect(mockConversationRepo.updateLastMessage).toHaveBeenCalled();
expect(mockConversationRepo.incrementUnreadCount).toHaveBeenCalledWith('conv-1', 'user-1');
expect(mockEventBus.publish).toHaveBeenCalled();
});
it('throws NotFoundException when conversation does not exist', async () => {
mockConversationRepo.findById.mockResolvedValue(null);
const command = new SendMessageCommand('conv-999', 'user-1', 'Hello!');
await expect(handler.execute(command)).rejects.toThrow();
});
it('throws ForbiddenException when user is not a participant', async () => {
const command = new SendMessageCommand('conv-1', 'user-3', 'Hello!');
await expect(handler.execute(command)).rejects.toThrow('Bạn không phải là thành viên của hội thoại này');
});
it('throws ValidationException when conversation is closed', async () => {
mockConversationRepo.findById.mockResolvedValue({ ...conversation, status: 'CLOSED' });
const command = new SendMessageCommand('conv-1', 'user-1', 'Hello!');
await expect(handler.execute(command)).rejects.toThrow('Hội thoại đã đóng');
});
it('publishes MessageSentEvent after successful send', async () => {
const command = new SendMessageCommand('conv-1', 'user-1', 'Hello!');
await handler.execute(command);
expect(mockEventBus.publish).toHaveBeenCalledWith(
expect.objectContaining({
eventName: 'message.sent',
aggregateId: 'msg-1',
conversationId: 'conv-1',
senderId: 'user-1',
content: 'Hello!',
}),
);
});
});

View File

@@ -0,0 +1,9 @@
export class CreateConversationCommand {
constructor(
public readonly initiatorUserId: string,
public readonly participantUserId: string,
public readonly listingId?: string,
public readonly subject?: string,
public readonly initialMessage?: string,
) {}
}

View File

@@ -0,0 +1,94 @@
import { Inject, InternalServerErrorException } from '@nestjs/common';
import { CommandHandler, type ICommandHandler } from '@nestjs/cqrs';
import { DomainException, ValidationException, type EventBusService, type LoggerService } from '@modules/shared';
import type { ConversationEntity } from '../../../domain/entities/conversation.entity';
import { MessageSentEvent } from '../../../domain/events/message-sent.event';
import {
CONVERSATION_REPOSITORY,
type IConversationRepository,
} from '../../../domain/repositories/conversation.repository';
import {
MESSAGE_REPOSITORY,
type IMessageRepository,
} from '../../../domain/repositories/message.repository';
import { CreateConversationCommand } from './create-conversation.command';
@CommandHandler(CreateConversationCommand)
export class CreateConversationHandler implements ICommandHandler<CreateConversationCommand> {
constructor(
@Inject(CONVERSATION_REPOSITORY)
private readonly conversationRepo: IConversationRepository,
@Inject(MESSAGE_REPOSITORY)
private readonly messageRepo: IMessageRepository,
private readonly eventBus: EventBusService,
private readonly logger: LoggerService,
) {}
async execute(command: CreateConversationCommand): Promise<ConversationEntity> {
try {
const { initiatorUserId, participantUserId, listingId, subject, initialMessage } = command;
if (initiatorUserId === participantUserId) {
throw new ValidationException('Không thể tạo hội thoại với chính mình');
}
// Check for existing conversation between these users for the same listing
const userIds = [initiatorUserId, participantUserId];
const existing = await this.conversationRepo.findExistingBetweenUsers(userIds, listingId);
if (existing) {
// If there's an initial message, send it in the existing conversation
if (initialMessage) {
const message = await this.messageRepo.create({
conversationId: existing.id,
senderId: initiatorUserId,
type: 'TEXT',
content: initialMessage,
});
await this.conversationRepo.updateLastMessage(existing.id, initialMessage, message.createdAt);
await this.conversationRepo.incrementUnreadCount(existing.id, initiatorUserId);
this.eventBus.publish(
new MessageSentEvent(message.id, existing.id, initiatorUserId, 'TEXT', initialMessage),
);
}
return existing;
}
// Create new conversation
const conversation = await this.conversationRepo.create({
listingId,
subject,
participantUserIds: userIds,
});
// Send initial message if provided
if (initialMessage) {
const message = await this.messageRepo.create({
conversationId: conversation.id,
senderId: initiatorUserId,
type: 'TEXT',
content: initialMessage,
});
await this.conversationRepo.updateLastMessage(conversation.id, initialMessage, message.createdAt);
await this.conversationRepo.incrementUnreadCount(conversation.id, initiatorUserId);
this.eventBus.publish(
new MessageSentEvent(message.id, conversation.id, initiatorUserId, 'TEXT', initialMessage),
);
}
this.logger.log(
`Conversation created: ${conversation.id} between [${userIds.join(', ')}]`,
'CreateConversationHandler',
);
return conversation;
} catch (error) {
if (error instanceof DomainException) throw error;
this.logger.error(
`Failed to create conversation: ${error instanceof Error ? error.message : error}`,
error instanceof Error ? error.stack : undefined,
this.constructor.name,
);
throw new InternalServerErrorException('Không thể tạo hội thoại');
}
}
}

View File

@@ -0,0 +1,6 @@
export class MarkConversationReadCommand {
constructor(
public readonly conversationId: string,
public readonly userId: string,
) {}
}

View File

@@ -0,0 +1,43 @@
import { Inject, InternalServerErrorException } from '@nestjs/common';
import { CommandHandler, type ICommandHandler } from '@nestjs/cqrs';
import { DomainException, ForbiddenException, NotFoundException, type LoggerService } from '@modules/shared';
import {
CONVERSATION_REPOSITORY,
type IConversationRepository,
} from '../../../domain/repositories/conversation.repository';
import { MarkConversationReadCommand } from './mark-read.command';
@CommandHandler(MarkConversationReadCommand)
export class MarkConversationReadHandler implements ICommandHandler<MarkConversationReadCommand> {
constructor(
@Inject(CONVERSATION_REPOSITORY)
private readonly conversationRepo: IConversationRepository,
private readonly logger: LoggerService,
) {}
async execute(command: MarkConversationReadCommand): Promise<void> {
try {
const { conversationId, userId } = command;
const conversation = await this.conversationRepo.findById(conversationId);
if (!conversation) {
throw new NotFoundException('Conversation', conversationId);
}
const isParticipant = conversation.participants.some((p) => p.userId === userId);
if (!isParticipant) {
throw new ForbiddenException('Bạn không phải là thành viên của hội thoại này');
}
await this.conversationRepo.resetUnreadCount(conversationId, userId);
} catch (error) {
if (error instanceof DomainException) throw error;
this.logger.error(
`Failed to mark conversation read: ${error instanceof Error ? error.message : error}`,
error instanceof Error ? error.stack : undefined,
this.constructor.name,
);
throw new InternalServerErrorException('Không thể đánh dấu đã đọc');
}
}
}

View File

@@ -0,0 +1,11 @@
import type { MessageType } from '../../../domain/value-objects/message-type.vo';
export class SendMessageCommand {
constructor(
public readonly conversationId: string,
public readonly senderId: string,
public readonly content: string,
public readonly type: MessageType = 'TEXT',
public readonly metadata?: Record<string, unknown>,
) {}
}

View File

@@ -0,0 +1,77 @@
import { Inject, InternalServerErrorException } from '@nestjs/common';
import { CommandHandler, type ICommandHandler } from '@nestjs/cqrs';
import { DomainException, ForbiddenException, NotFoundException, ValidationException, type EventBusService, type LoggerService } from '@modules/shared';
import type { MessageEntity } from '../../../domain/entities/message.entity';
import { MessageSentEvent } from '../../../domain/events/message-sent.event';
import {
CONVERSATION_REPOSITORY,
type IConversationRepository,
} from '../../../domain/repositories/conversation.repository';
import {
MESSAGE_REPOSITORY,
type IMessageRepository,
} from '../../../domain/repositories/message.repository';
import { SendMessageCommand } from './send-message.command';
@CommandHandler(SendMessageCommand)
export class SendMessageHandler implements ICommandHandler<SendMessageCommand> {
constructor(
@Inject(CONVERSATION_REPOSITORY)
private readonly conversationRepo: IConversationRepository,
@Inject(MESSAGE_REPOSITORY)
private readonly messageRepo: IMessageRepository,
private readonly eventBus: EventBusService,
private readonly logger: LoggerService,
) {}
async execute(command: SendMessageCommand): Promise<MessageEntity> {
try {
const { conversationId, senderId, content, type, metadata } = command;
// Verify conversation exists and sender is a participant
const conversation = await this.conversationRepo.findById(conversationId);
if (!conversation) {
throw new NotFoundException('Conversation', conversationId);
}
const isParticipant = conversation.participants.some((p) => p.userId === senderId);
if (!isParticipant) {
throw new ForbiddenException('Bạn không phải là thành viên của hội thoại này');
}
if (conversation.status !== 'ACTIVE') {
throw new ValidationException('Hội thoại đã đóng');
}
// Create message
const message = await this.messageRepo.create({
conversationId,
senderId,
type,
content,
metadata,
});
// Update conversation last message
await this.conversationRepo.updateLastMessage(conversationId, content, message.createdAt);
// Increment unread count for other participants
await this.conversationRepo.incrementUnreadCount(conversationId, senderId);
// Publish domain event
this.eventBus.publish(
new MessageSentEvent(message.id, conversationId, senderId, type, content),
);
return message;
} catch (error) {
if (error instanceof DomainException) throw error;
this.logger.error(
`Failed to send message: ${error instanceof Error ? error.message : error}`,
error instanceof Error ? error.stack : undefined,
this.constructor.name,
);
throw new InternalServerErrorException('Không thể gửi tin nhắn');
}
}
}

View File

@@ -0,0 +1,24 @@
import { Inject } from '@nestjs/common';
import { type IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import type { ConversationEntity } from '../../../domain/entities/conversation.entity';
import {
CONVERSATION_REPOSITORY,
type IConversationRepository,
} from '../../../domain/repositories/conversation.repository';
import { GetConversationsQuery } from './get-conversations.query';
@QueryHandler(GetConversationsQuery)
export class GetConversationsHandler implements IQueryHandler<GetConversationsQuery> {
constructor(
@Inject(CONVERSATION_REPOSITORY)
private readonly conversationRepo: IConversationRepository,
) {}
async execute(query: GetConversationsQuery): Promise<{ conversations: ConversationEntity[]; total: number }> {
const [conversations, total] = await Promise.all([
this.conversationRepo.findByUserId(query.userId, query.limit, query.offset),
this.conversationRepo.countByUserId(query.userId),
]);
return { conversations, total };
}
}

View File

@@ -0,0 +1,7 @@
export class GetConversationsQuery {
constructor(
public readonly userId: string,
public readonly limit: number = 20,
public readonly offset: number = 0,
) {}
}

View File

@@ -0,0 +1,38 @@
import { Inject } from '@nestjs/common';
import { type IQueryHandler, QueryHandler } from '@nestjs/cqrs';
import { ForbiddenException, NotFoundException } from '@modules/shared';
import type { MessageEntity } from '../../../domain/entities/message.entity';
import {
CONVERSATION_REPOSITORY,
type IConversationRepository,
} from '../../../domain/repositories/conversation.repository';
import {
MESSAGE_REPOSITORY,
type IMessageRepository,
} from '../../../domain/repositories/message.repository';
import { GetMessagesQuery } from './get-messages.query';
@QueryHandler(GetMessagesQuery)
export class GetMessagesHandler implements IQueryHandler<GetMessagesQuery> {
constructor(
@Inject(CONVERSATION_REPOSITORY)
private readonly conversationRepo: IConversationRepository,
@Inject(MESSAGE_REPOSITORY)
private readonly messageRepo: IMessageRepository,
) {}
async execute(query: GetMessagesQuery): Promise<MessageEntity[]> {
// Verify access
const conversation = await this.conversationRepo.findById(query.conversationId);
if (!conversation) {
throw new NotFoundException('Conversation', query.conversationId);
}
const isParticipant = conversation.participants.some((p) => p.userId === query.userId);
if (!isParticipant) {
throw new ForbiddenException('Bạn không phải là thành viên của hội thoại này');
}
return this.messageRepo.findByConversationId(query.conversationId, query.limit, query.before);
}
}

View File

@@ -0,0 +1,8 @@
export class GetMessagesQuery {
constructor(
public readonly conversationId: string,
public readonly userId: string,
public readonly limit: number = 50,
public readonly before?: string,
) {}
}

View File

@@ -0,0 +1,18 @@
import { MessageSentEvent } from '../events/message-sent.event';
describe('Messaging Domain', () => {
describe('MessageSentEvent', () => {
it('creates event with correct eventName', () => {
const event = new MessageSentEvent('msg-1', 'conv-1', 'user-1', 'TEXT', 'Hello!');
expect(event.eventName).toBe('message.sent');
expect(event.aggregateId).toBe('msg-1');
expect(event.messageId).toBe('msg-1');
expect(event.conversationId).toBe('conv-1');
expect(event.senderId).toBe('user-1');
expect(event.type).toBe('TEXT');
expect(event.content).toBe('Hello!');
expect(event.occurredAt).toBeInstanceOf(Date);
});
});
});

View File

@@ -0,0 +1,22 @@
import type { ConversationStatus } from '../value-objects/conversation-status.vo';
export interface ConversationParticipantEntity {
id: string;
conversationId: string;
userId: string;
unreadCount: number;
lastReadAt: Date | null;
joinedAt: Date;
}
export interface ConversationEntity {
id: string;
listingId: string | null;
subject: string | null;
status: ConversationStatus;
lastMessage: string | null;
lastMessageAt: Date | null;
createdAt: Date;
updatedAt: Date;
participants: ConversationParticipantEntity[];
}

View File

@@ -0,0 +1,13 @@
import type { MessageType } from '../value-objects/message-type.vo';
export interface MessageEntity {
id: string;
conversationId: string;
senderId: string;
type: MessageType;
content: string;
metadata: Record<string, unknown> | null;
editedAt: Date | null;
deletedAt: Date | null;
createdAt: Date;
}

View File

@@ -0,0 +1,19 @@
import { type DomainEvent } from '@modules/shared';
import type { MessageType } from '../value-objects/message-type.vo';
export class MessageSentEvent implements DomainEvent {
readonly eventName = 'message.sent';
readonly occurredAt = new Date();
constructor(
public readonly aggregateId: string,
public readonly conversationId: string,
public readonly senderId: string,
public readonly type: MessageType,
public readonly content: string,
) {}
get messageId(): string {
return this.aggregateId;
}
}

View File

@@ -0,0 +1,15 @@
export type { ConversationEntity, ConversationParticipantEntity } from './entities/conversation.entity';
export type { MessageEntity } from './entities/message.entity';
export { MessageSentEvent } from './events/message-sent.event';
export {
CONVERSATION_REPOSITORY,
type IConversationRepository,
type CreateConversationDto,
} from './repositories/conversation.repository';
export {
MESSAGE_REPOSITORY,
type IMessageRepository,
type CreateMessageDto,
} from './repositories/message.repository';
export type { ConversationStatus } from './value-objects/conversation-status.vo';
export { type MessageType, ALL_MESSAGE_TYPES } from './value-objects/message-type.vo';

View File

@@ -0,0 +1,22 @@
import type { ConversationEntity } from '../entities/conversation.entity';
import type { ConversationStatus } from '../value-objects/conversation-status.vo';
export const CONVERSATION_REPOSITORY = Symbol('CONVERSATION_REPOSITORY');
export interface CreateConversationDto {
listingId?: string;
subject?: string;
participantUserIds: string[];
}
export interface IConversationRepository {
create(dto: CreateConversationDto): Promise<ConversationEntity>;
findById(id: string): Promise<ConversationEntity | null>;
findByUserId(userId: string, limit?: number, offset?: number): Promise<ConversationEntity[]>;
findExistingBetweenUsers(userIds: string[], listingId?: string): Promise<ConversationEntity | null>;
updateStatus(id: string, status: ConversationStatus): Promise<void>;
updateLastMessage(id: string, content: string, sentAt: Date): Promise<void>;
incrementUnreadCount(conversationId: string, excludeUserId: string): Promise<void>;
resetUnreadCount(conversationId: string, userId: string): Promise<void>;
countByUserId(userId: string): Promise<number>;
}

View File

@@ -0,0 +1,19 @@
import type { MessageEntity } from '../entities/message.entity';
import type { MessageType } from '../value-objects/message-type.vo';
export const MESSAGE_REPOSITORY = Symbol('MESSAGE_REPOSITORY');
export interface CreateMessageDto {
conversationId: string;
senderId: string;
type: MessageType;
content: string;
metadata?: Record<string, unknown>;
}
export interface IMessageRepository {
create(dto: CreateMessageDto): Promise<MessageEntity>;
findByConversationId(conversationId: string, limit?: number, before?: string): Promise<MessageEntity[]>;
findById(id: string): Promise<MessageEntity | null>;
softDelete(id: string, senderId: string): Promise<void>;
}

View File

@@ -0,0 +1 @@
export type ConversationStatus = 'ACTIVE' | 'ARCHIVED' | 'CLOSED';

View File

@@ -0,0 +1,3 @@
export type MessageType = 'TEXT' | 'IMAGE' | 'FILE' | 'SYSTEM';
export const ALL_MESSAGE_TYPES: MessageType[] = ['TEXT', 'IMAGE', 'FILE', 'SYSTEM'];

View File

@@ -0,0 +1,4 @@
export { MessagingModule } from './messaging.module';
export { MessagingGateway } from './presentation/gateways/messaging.gateway';
export { CreateConversationCommand } from './application/commands/create-conversation/create-conversation.command';
export { SendMessageCommand } from './application/commands/send-message/send-message.command';

View File

@@ -0,0 +1,134 @@
import { PrismaConversationRepository } from '../repositories/prisma-conversation.repository';
describe('PrismaConversationRepository', () => {
let repository: PrismaConversationRepository;
let mockPrisma: {
conversation: {
create: ReturnType<typeof vi.fn>;
findUnique: ReturnType<typeof vi.fn>;
findMany: ReturnType<typeof vi.fn>;
findFirst: ReturnType<typeof vi.fn>;
update: ReturnType<typeof vi.fn>;
count: ReturnType<typeof vi.fn>;
};
conversationParticipant: {
updateMany: ReturnType<typeof vi.fn>;
};
};
const mockConversation = {
id: 'conv-1',
listingId: 'listing-1',
subject: null,
status: 'ACTIVE',
lastMessage: null,
lastMessageAt: null,
createdAt: new Date(),
updatedAt: new Date(),
participants: [
{ id: 'p-1', conversationId: 'conv-1', userId: 'user-1', unreadCount: 0, lastReadAt: null, joinedAt: new Date() },
{ id: 'p-2', conversationId: 'conv-1', userId: 'user-2', unreadCount: 0, lastReadAt: null, joinedAt: new Date() },
],
};
beforeEach(() => {
mockPrisma = {
conversation: {
create: vi.fn().mockResolvedValue(mockConversation),
findUnique: vi.fn().mockResolvedValue(mockConversation),
findMany: vi.fn().mockResolvedValue([mockConversation]),
findFirst: vi.fn().mockResolvedValue(null),
update: vi.fn().mockResolvedValue(mockConversation),
count: vi.fn().mockResolvedValue(1),
},
conversationParticipant: {
updateMany: vi.fn().mockResolvedValue({ count: 1 }),
},
};
repository = new PrismaConversationRepository(mockPrisma as any);
});
it('creates a conversation with participants', async () => {
const result = await repository.create({
listingId: 'listing-1',
participantUserIds: ['user-1', 'user-2'],
});
expect(mockPrisma.conversation.create).toHaveBeenCalledWith({
data: {
listingId: 'listing-1',
subject: null,
participants: {
create: [{ userId: 'user-1' }, { userId: 'user-2' }],
},
},
include: { participants: true },
});
expect(result.id).toBe('conv-1');
expect(result.participants).toHaveLength(2);
});
it('finds conversation by id', async () => {
const result = await repository.findById('conv-1');
expect(mockPrisma.conversation.findUnique).toHaveBeenCalledWith({
where: { id: 'conv-1' },
include: { participants: true },
});
expect(result?.id).toBe('conv-1');
});
it('returns null when conversation not found', async () => {
mockPrisma.conversation.findUnique.mockResolvedValue(null);
const result = await repository.findById('conv-999');
expect(result).toBeNull();
});
it('finds conversations by user id', async () => {
const result = await repository.findByUserId('user-1', 20, 0);
expect(mockPrisma.conversation.findMany).toHaveBeenCalledWith(
expect.objectContaining({
where: { participants: { some: { userId: 'user-1' } } },
take: 20,
skip: 0,
}),
);
expect(result).toHaveLength(1);
});
it('updates conversation status', async () => {
await repository.updateStatus('conv-1', 'CLOSED');
expect(mockPrisma.conversation.update).toHaveBeenCalledWith({
where: { id: 'conv-1' },
data: { status: 'CLOSED' },
});
});
it('increments unread count for other participants', async () => {
await repository.incrementUnreadCount('conv-1', 'user-1');
expect(mockPrisma.conversationParticipant.updateMany).toHaveBeenCalledWith({
where: { conversationId: 'conv-1', userId: { not: 'user-1' } },
data: { unreadCount: { increment: 1 } },
});
});
it('resets unread count for a user', async () => {
await repository.resetUnreadCount('conv-1', 'user-1');
expect(mockPrisma.conversationParticipant.updateMany).toHaveBeenCalledWith({
where: { conversationId: 'conv-1', userId: 'user-1' },
data: { unreadCount: 0, lastReadAt: expect.any(Date) },
});
});
it('counts conversations by user id', async () => {
const count = await repository.countByUserId('user-1');
expect(count).toBe(1);
});
});

View File

@@ -0,0 +1,88 @@
import { PrismaMessageRepository } from '../repositories/prisma-message.repository';
describe('PrismaMessageRepository', () => {
let repository: PrismaMessageRepository;
let mockPrisma: {
message: {
create: ReturnType<typeof vi.fn>;
findMany: ReturnType<typeof vi.fn>;
findUnique: ReturnType<typeof vi.fn>;
updateMany: ReturnType<typeof vi.fn>;
};
};
const mockMessage = {
id: 'msg-1',
conversationId: 'conv-1',
senderId: 'user-1',
type: 'TEXT',
content: 'Hello!',
metadata: null,
editedAt: null,
deletedAt: null,
createdAt: new Date(),
};
beforeEach(() => {
mockPrisma = {
message: {
create: vi.fn().mockResolvedValue(mockMessage),
findMany: vi.fn().mockResolvedValue([mockMessage]),
findUnique: vi.fn().mockResolvedValue(mockMessage),
updateMany: vi.fn().mockResolvedValue({ count: 1 }),
},
};
repository = new PrismaMessageRepository(mockPrisma as any);
});
it('creates a message', async () => {
const result = await repository.create({
conversationId: 'conv-1',
senderId: 'user-1',
type: 'TEXT',
content: 'Hello!',
});
expect(mockPrisma.message.create).toHaveBeenCalledWith({
data: {
conversationId: 'conv-1',
senderId: 'user-1',
type: 'TEXT',
content: 'Hello!',
metadata: undefined,
},
});
expect(result.id).toBe('msg-1');
expect(result.type).toBe('TEXT');
});
it('finds messages by conversation id', async () => {
const result = await repository.findByConversationId('conv-1', 50);
expect(result).toHaveLength(1);
expect(result[0]!.content).toBe('Hello!');
});
it('finds message by id', async () => {
const result = await repository.findById('msg-1');
expect(result?.id).toBe('msg-1');
});
it('returns null when message not found', async () => {
mockPrisma.message.findUnique.mockResolvedValue(null);
const result = await repository.findById('msg-999');
expect(result).toBeNull();
});
it('soft-deletes a message by sender only', async () => {
await repository.softDelete('msg-1', 'user-1');
expect(mockPrisma.message.updateMany).toHaveBeenCalledWith({
where: { id: 'msg-1', senderId: 'user-1' },
data: { deletedAt: expect.any(Date) },
});
});
});

View File

@@ -0,0 +1,2 @@
export { PrismaConversationRepository } from './repositories/prisma-conversation.repository';
export { PrismaMessageRepository } from './repositories/prisma-message.repository';

View File

@@ -0,0 +1,141 @@
import { Injectable } from '@nestjs/common';
import { type PrismaService } from '@modules/shared';
import type { ConversationEntity, ConversationParticipantEntity } from '../../domain/entities/conversation.entity';
import type {
IConversationRepository,
CreateConversationDto,
} from '../../domain/repositories/conversation.repository';
import type { ConversationStatus } from '../../domain/value-objects/conversation-status.vo';
@Injectable()
export class PrismaConversationRepository implements IConversationRepository {
constructor(private readonly prisma: PrismaService) {}
async create(dto: CreateConversationDto): Promise<ConversationEntity> {
const record = await this.prisma.conversation.create({
data: {
listingId: dto.listingId ?? null,
subject: dto.subject ?? null,
participants: {
create: dto.participantUserIds.map((userId) => ({ userId })),
},
},
include: { participants: true },
});
return this.toEntity(record);
}
async findById(id: string): Promise<ConversationEntity | null> {
const record = await this.prisma.conversation.findUnique({
where: { id },
include: { participants: true },
});
return record ? this.toEntity(record) : null;
}
async findByUserId(userId: string, limit = 20, offset = 0): Promise<ConversationEntity[]> {
const records = await this.prisma.conversation.findMany({
where: {
participants: { some: { userId } },
},
include: { participants: true },
orderBy: { lastMessageAt: { sort: 'desc', nulls: 'last' } },
take: limit,
skip: offset,
});
return records.map((r) => this.toEntity(r));
}
async findExistingBetweenUsers(
userIds: string[],
listingId?: string,
): Promise<ConversationEntity | null> {
const record = await this.prisma.conversation.findFirst({
where: {
status: 'ACTIVE',
...(listingId ? { listingId } : {}),
AND: userIds.map((userId) => ({
participants: { some: { userId } },
})),
participants: { every: { userId: { in: userIds } } },
},
include: { participants: true },
});
return record ? this.toEntity(record) : null;
}
async updateStatus(id: string, status: ConversationStatus): Promise<void> {
await this.prisma.conversation.update({
where: { id },
data: { status },
});
}
async updateLastMessage(id: string, content: string, sentAt: Date): Promise<void> {
await this.prisma.conversation.update({
where: { id },
data: {
lastMessage: content.length > 200 ? content.slice(0, 200) + '...' : content,
lastMessageAt: sentAt,
},
});
}
async incrementUnreadCount(conversationId: string, excludeUserId: string): Promise<void> {
await this.prisma.conversationParticipant.updateMany({
where: { conversationId, userId: { not: excludeUserId } },
data: { unreadCount: { increment: 1 } },
});
}
async resetUnreadCount(conversationId: string, userId: string): Promise<void> {
await this.prisma.conversationParticipant.updateMany({
where: { conversationId, userId },
data: { unreadCount: 0, lastReadAt: new Date() },
});
}
async countByUserId(userId: string): Promise<number> {
return this.prisma.conversation.count({
where: { participants: { some: { userId } } },
});
}
private toEntity(record: {
id: string;
listingId: string | null;
subject: string | null;
status: string;
lastMessage: string | null;
lastMessageAt: Date | null;
createdAt: Date;
updatedAt: Date;
participants: Array<{
id: string;
conversationId: string;
userId: string;
unreadCount: number;
lastReadAt: Date | null;
joinedAt: Date;
}>;
}): ConversationEntity {
return {
id: record.id,
listingId: record.listingId,
subject: record.subject,
status: record.status as ConversationStatus,
lastMessage: record.lastMessage,
lastMessageAt: record.lastMessageAt,
createdAt: record.createdAt,
updatedAt: record.updatedAt,
participants: record.participants.map((p): ConversationParticipantEntity => ({
id: p.id,
conversationId: p.conversationId,
userId: p.userId,
unreadCount: p.unreadCount,
lastReadAt: p.lastReadAt,
joinedAt: p.joinedAt,
})),
};
}
}

View File

@@ -0,0 +1,80 @@
import { Injectable } from '@nestjs/common';
import { type Prisma } from '@prisma/client';
import { type PrismaService } from '@modules/shared';
import type { MessageEntity } from '../../domain/entities/message.entity';
import type {
IMessageRepository,
CreateMessageDto,
} from '../../domain/repositories/message.repository';
import type { MessageType } from '../../domain/value-objects/message-type.vo';
@Injectable()
export class PrismaMessageRepository implements IMessageRepository {
constructor(private readonly prisma: PrismaService) {}
async create(dto: CreateMessageDto): Promise<MessageEntity> {
const record = await this.prisma.message.create({
data: {
conversationId: dto.conversationId,
senderId: dto.senderId,
type: dto.type,
content: dto.content,
metadata: (dto.metadata ?? undefined) as Prisma.InputJsonValue | undefined,
},
});
return this.toEntity(record);
}
async findByConversationId(
conversationId: string,
limit = 50,
before?: string,
): Promise<MessageEntity[]> {
const records = await this.prisma.message.findMany({
where: {
conversationId,
deletedAt: null,
...(before ? { createdAt: { lt: (await this.prisma.message.findUnique({ where: { id: before } }))?.createdAt } } : {}),
},
orderBy: { createdAt: 'desc' },
take: limit,
});
return records.map((r) => this.toEntity(r));
}
async findById(id: string): Promise<MessageEntity | null> {
const record = await this.prisma.message.findUnique({ where: { id } });
return record ? this.toEntity(record) : null;
}
async softDelete(id: string, senderId: string): Promise<void> {
await this.prisma.message.updateMany({
where: { id, senderId },
data: { deletedAt: new Date() },
});
}
private toEntity(record: {
id: string;
conversationId: string;
senderId: string;
type: string;
content: string;
metadata: unknown;
editedAt: Date | null;
deletedAt: Date | null;
createdAt: Date;
}): MessageEntity {
return {
id: record.id,
conversationId: record.conversationId,
senderId: record.senderId,
type: record.type as MessageType,
content: record.content,
metadata: record.metadata as Record<string, unknown> | null,
editedAt: record.editedAt,
deletedAt: record.deletedAt,
createdAt: record.createdAt,
};
}
}

View File

@@ -0,0 +1,44 @@
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { AuthModule } from '@modules/auth';
import { CreateConversationHandler } from './application/commands/create-conversation/create-conversation.handler';
import { MarkConversationReadHandler } from './application/commands/mark-read/mark-read.handler';
import { SendMessageHandler } from './application/commands/send-message/send-message.handler';
import { GetConversationsHandler } from './application/queries/get-conversations/get-conversations.handler';
import { GetMessagesHandler } from './application/queries/get-messages/get-messages.handler';
import { CONVERSATION_REPOSITORY } from './domain/repositories/conversation.repository';
import { MESSAGE_REPOSITORY } from './domain/repositories/message.repository';
import { PrismaConversationRepository } from './infrastructure/repositories/prisma-conversation.repository';
import { PrismaMessageRepository } from './infrastructure/repositories/prisma-message.repository';
import { MessagingController } from './presentation/controllers/messaging.controller';
import { MessagingGateway } from './presentation/gateways/messaging.gateway';
const CommandHandlers = [
CreateConversationHandler,
SendMessageHandler,
MarkConversationReadHandler,
];
const QueryHandlers = [
GetConversationsHandler,
GetMessagesHandler,
];
@Module({
imports: [CqrsModule, AuthModule],
controllers: [MessagingController],
providers: [
// Repositories
{ provide: CONVERSATION_REPOSITORY, useClass: PrismaConversationRepository },
{ provide: MESSAGE_REPOSITORY, useClass: PrismaMessageRepository },
// WebSocket Gateway
MessagingGateway,
// CQRS
...CommandHandlers,
...QueryHandlers,
],
exports: [MessagingGateway],
})
export class MessagingModule {}

View File

@@ -0,0 +1,97 @@
import { CreateConversationCommand } from '../../application/commands/create-conversation/create-conversation.command';
import { MarkConversationReadCommand } from '../../application/commands/mark-read/mark-read.command';
import { SendMessageCommand } from '../../application/commands/send-message/send-message.command';
import { GetConversationsQuery } from '../../application/queries/get-conversations/get-conversations.query';
import { GetMessagesQuery } from '../../application/queries/get-messages/get-messages.query';
import { MessagingController } from '../controllers/messaging.controller';
describe('MessagingController', () => {
let controller: MessagingController;
let mockCommandBus: { execute: ReturnType<typeof vi.fn> };
let mockQueryBus: { execute: ReturnType<typeof vi.fn> };
let mockMessageRepo: { softDelete: ReturnType<typeof vi.fn> };
const user = { sub: 'user-1', role: 'BUYER' };
beforeEach(() => {
mockCommandBus = { execute: vi.fn() };
mockQueryBus = { execute: vi.fn() };
mockMessageRepo = { softDelete: vi.fn() };
controller = new MessagingController(
mockCommandBus as any,
mockQueryBus as any,
mockMessageRepo as any,
);
});
describe('createConversation', () => {
it('dispatches CreateConversationCommand', async () => {
const dto = { participantUserId: 'user-2', listingId: 'listing-1', initialMessage: 'Hi' };
mockCommandBus.execute.mockResolvedValue({ id: 'conv-1' });
const result = await controller.createConversation(user as any, dto as any);
expect(mockCommandBus.execute).toHaveBeenCalledWith(
expect.any(CreateConversationCommand),
);
expect(result.id).toBe('conv-1');
});
});
describe('getConversations', () => {
it('dispatches GetConversationsQuery', async () => {
mockQueryBus.execute.mockResolvedValue({ conversations: [], total: 0 });
await controller.getConversations(user as any, 20, 0);
expect(mockQueryBus.execute).toHaveBeenCalledWith(
expect.any(GetConversationsQuery),
);
});
});
describe('getMessages', () => {
it('dispatches GetMessagesQuery', async () => {
mockQueryBus.execute.mockResolvedValue([]);
await controller.getMessages(user as any, 'conv-1', 50);
expect(mockQueryBus.execute).toHaveBeenCalledWith(
expect.any(GetMessagesQuery),
);
});
});
describe('sendMessage', () => {
it('dispatches SendMessageCommand', async () => {
const dto = { content: 'Hello!' };
mockCommandBus.execute.mockResolvedValue({ id: 'msg-1' });
const result = await controller.sendMessage(user as any, 'conv-1', dto as any);
expect(mockCommandBus.execute).toHaveBeenCalledWith(
expect.any(SendMessageCommand),
);
expect(result.id).toBe('msg-1');
});
});
describe('markAsRead', () => {
it('dispatches MarkConversationReadCommand', async () => {
await controller.markAsRead(user as any, 'conv-1');
expect(mockCommandBus.execute).toHaveBeenCalledWith(
expect.any(MarkConversationReadCommand),
);
});
});
describe('deleteMessage', () => {
it('calls messageRepo.softDelete with user id', async () => {
await controller.deleteMessage(user as any, 'msg-1');
expect(mockMessageRepo.softDelete).toHaveBeenCalledWith('msg-1', 'user-1');
});
});
});

View File

@@ -0,0 +1,199 @@
import { MessagingGateway } from '../gateways/messaging.gateway';
describe('MessagingGateway', () => {
let gateway: MessagingGateway;
let mockTokenService: { verifyAccessToken: ReturnType<typeof vi.fn> };
let mockLogger: { log: ReturnType<typeof vi.fn>; warn: ReturnType<typeof vi.fn>; error: ReturnType<typeof vi.fn>; debug: ReturnType<typeof vi.fn> };
let mockCommandBus: { execute: ReturnType<typeof vi.fn> };
let mockConversationRepo: {
findByUserId: ReturnType<typeof vi.fn>;
findById: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
mockTokenService = { verifyAccessToken: vi.fn() };
mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() };
mockCommandBus = { execute: vi.fn() };
mockConversationRepo = {
findByUserId: vi.fn().mockResolvedValue([]),
findById: vi.fn(),
};
gateway = new MessagingGateway(
mockTokenService as any,
mockLogger as any,
mockCommandBus as any,
mockConversationRepo as any,
);
gateway.server = {
to: vi.fn().mockReturnThis(),
emit: vi.fn(),
} as any;
});
describe('handleConnection', () => {
it('authenticates client and joins rooms', async () => {
mockTokenService.verifyAccessToken.mockReturnValue({ sub: 'user-1', role: 'BUYER' });
mockConversationRepo.findByUserId.mockResolvedValue([
{ id: 'conv-1' },
{ id: 'conv-2' },
]);
const client = {
id: 'socket-1',
data: {},
handshake: { auth: { token: 'valid-token' }, headers: {}, query: {} },
join: vi.fn(),
emit: vi.fn(),
disconnect: vi.fn(),
};
await gateway.handleConnection(client as any);
expect(client.data['userId']).toBe('user-1');
expect(client.join).toHaveBeenCalledWith('user:user-1');
expect(client.join).toHaveBeenCalledWith('conversation:conv-1');
expect(client.join).toHaveBeenCalledWith('conversation:conv-2');
});
it('disconnects client with no token', async () => {
const client = {
id: 'socket-1',
data: {},
handshake: { auth: {}, headers: {}, query: {} },
join: vi.fn(),
emit: vi.fn(),
disconnect: vi.fn(),
};
mockTokenService.verifyAccessToken.mockReturnValue(null);
await gateway.handleConnection(client as any);
expect(client.disconnect).toHaveBeenCalledWith(true);
});
it('disconnects client with invalid token', async () => {
mockTokenService.verifyAccessToken.mockReturnValue(null);
const client = {
id: 'socket-1',
data: {},
handshake: { auth: { token: 'invalid' }, headers: {}, query: {} },
join: vi.fn(),
emit: vi.fn(),
disconnect: vi.fn(),
};
await gateway.handleConnection(client as any);
expect(client.disconnect).toHaveBeenCalledWith(true);
});
});
describe('handleDisconnect', () => {
it('cleans up socket tracking', () => {
// First connect
(gateway as any).userSockets.set('user-1', new Set(['socket-1']));
const client = {
id: 'socket-1',
data: { userId: 'user-1' },
};
gateway.handleDisconnect(client as any);
expect((gateway as any).userSockets.has('user-1')).toBe(false);
});
});
describe('handleMessageSent', () => {
it('emits message:new to conversation room', async () => {
const event = {
aggregateId: 'msg-1',
conversationId: 'conv-1',
senderId: 'user-1',
type: 'TEXT',
content: 'Hello!',
occurredAt: new Date(),
};
await gateway.handleMessageSent(event as any);
expect(gateway.server.to).toHaveBeenCalledWith('conversation:conv-1');
});
});
describe('handleSendMessage', () => {
it('sends message via command bus', async () => {
mockCommandBus.execute.mockResolvedValue({ id: 'msg-1' });
const client = {
data: { userId: 'user-1' },
emit: vi.fn(),
};
await gateway.handleSendMessage(
client as any,
{ conversationId: 'conv-1', content: 'Hello!' },
);
expect(mockCommandBus.execute).toHaveBeenCalled();
expect(client.emit).toHaveBeenCalledWith('message:sent', {
messageId: 'msg-1',
conversationId: 'conv-1',
});
});
it('emits error when send fails', async () => {
mockCommandBus.execute.mockRejectedValue(new Error('Send failed'));
const client = {
data: { userId: 'user-1' },
emit: vi.fn(),
};
await gateway.handleSendMessage(
client as any,
{ conversationId: 'conv-1', content: 'Hello!' },
);
expect(client.emit).toHaveBeenCalledWith('message:error', {
conversationId: 'conv-1',
error: 'Send failed',
});
});
});
describe('handleJoinConversation', () => {
it('joins room when user is participant', async () => {
mockConversationRepo.findById.mockResolvedValue({
participants: [{ userId: 'user-1' }],
});
const client = {
data: { userId: 'user-1' },
join: vi.fn(),
};
await gateway.handleJoinConversation(client as any, { conversationId: 'conv-1' });
expect(client.join).toHaveBeenCalledWith('conversation:conv-1');
});
it('does not join room when user is not participant', async () => {
mockConversationRepo.findById.mockResolvedValue({
participants: [{ userId: 'user-2' }],
});
const client = {
data: { userId: 'user-1' },
join: vi.fn(),
};
await gateway.handleJoinConversation(client as any, { conversationId: 'conv-1' });
expect(client.join).not.toHaveBeenCalled();
});
});
});

View File

@@ -0,0 +1,188 @@
import {
Controller,
Get,
Post,
Patch,
Delete,
Body,
Param,
Query,
Inject,
UseGuards,
HttpCode,
HttpStatus,
} from '@nestjs/common';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { CommandBus, QueryBus } from '@nestjs/cqrs';
import { AuthGuard } from '@nestjs/passport';
import { ApiTags, ApiOperation, ApiResponse, ApiBearerAuth, ApiQuery, ApiProperty } from '@nestjs/swagger';
import { MessageType as PrismaMessageType } from '@prisma/client';
import { IsString, IsOptional, IsEnum, MaxLength } from 'class-validator';
import { CurrentUser, type JwtPayload } from '@modules/auth';
import { CreateConversationCommand } from '../../application/commands/create-conversation/create-conversation.command';
import { MarkConversationReadCommand } from '../../application/commands/mark-read/mark-read.command';
import { SendMessageCommand } from '../../application/commands/send-message/send-message.command';
import { GetConversationsQuery } from '../../application/queries/get-conversations/get-conversations.query';
import { GetMessagesQuery } from '../../application/queries/get-messages/get-messages.query';
import {
MESSAGE_REPOSITORY,
type IMessageRepository,
} from '../../domain/repositories/message.repository';
class CreateConversationDto {
@ApiProperty({ description: 'User ID of the other participant' })
@IsString()
participantUserId!: string;
@ApiProperty({ required: false, description: 'Associated listing ID' })
@IsString()
@IsOptional()
listingId?: string;
@ApiProperty({ required: false, description: 'Conversation subject' })
@IsString()
@IsOptional()
@MaxLength(200)
subject?: string;
@ApiProperty({ required: false, description: 'Initial message text' })
@IsString()
@IsOptional()
@MaxLength(5000)
initialMessage?: string;
}
class SendMessageDto {
@ApiProperty({ description: 'Message content' })
@IsString()
@MaxLength(5000)
content!: string;
@ApiProperty({ enum: PrismaMessageType, required: false, default: 'TEXT' })
@IsEnum(PrismaMessageType)
@IsOptional()
type?: PrismaMessageType;
@ApiProperty({ required: false, description: 'Additional metadata (e.g. file URL)' })
@IsOptional()
metadata?: Record<string, unknown>;
}
@ApiTags('messaging')
@ApiBearerAuth('JWT')
@Controller('messaging')
@UseGuards(AuthGuard('jwt'))
export class MessagingController {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus,
@Inject(MESSAGE_REPOSITORY)
private readonly messageRepo: IMessageRepository,
) {}
@Post('conversations')
@ApiOperation({ summary: 'Create a conversation or return existing one' })
@ApiResponse({ status: 201, description: 'Conversation created' })
@ApiResponse({ status: 400, description: 'Cannot create conversation with yourself' })
@ApiResponse({ status: 401, description: 'Unauthorized' })
async createConversation(
@CurrentUser() user: JwtPayload,
@Body() dto: CreateConversationDto,
) {
return this.commandBus.execute(
new CreateConversationCommand(
user.sub,
dto.participantUserId,
dto.listingId,
dto.subject,
dto.initialMessage,
),
);
}
@Get('conversations')
@ApiOperation({ summary: 'List user conversations' })
@ApiResponse({ status: 200, description: 'Conversations retrieved' })
@ApiResponse({ status: 401, description: 'Unauthorized' })
@ApiQuery({ name: 'limit', required: false, type: Number })
@ApiQuery({ name: 'offset', required: false, type: Number })
async getConversations(
@CurrentUser() user: JwtPayload,
@Query('limit') limit?: number,
@Query('offset') offset?: number,
) {
return this.queryBus.execute(
new GetConversationsQuery(user.sub, limit ?? 20, offset ?? 0),
);
}
@Get('conversations/:id/messages')
@ApiOperation({ summary: 'Get messages in a conversation' })
@ApiResponse({ status: 200, description: 'Messages retrieved' })
@ApiResponse({ status: 401, description: 'Unauthorized' })
@ApiResponse({ status: 403, description: 'Not a participant' })
@ApiResponse({ status: 404, description: 'Conversation not found' })
@ApiQuery({ name: 'limit', required: false, type: Number })
@ApiQuery({ name: 'before', required: false, type: String, description: 'Cursor: message ID for pagination' })
async getMessages(
@CurrentUser() user: JwtPayload,
@Param('id') conversationId: string,
@Query('limit') limit?: number,
@Query('before') before?: string,
) {
return this.queryBus.execute(
new GetMessagesQuery(conversationId, user.sub, limit ?? 50, before),
);
}
@Post('conversations/:id/messages')
@HttpCode(HttpStatus.CREATED)
@ApiOperation({ summary: 'Send a message in a conversation' })
@ApiResponse({ status: 201, description: 'Message sent' })
@ApiResponse({ status: 401, description: 'Unauthorized' })
@ApiResponse({ status: 403, description: 'Not a participant' })
@ApiResponse({ status: 404, description: 'Conversation not found' })
async sendMessage(
@CurrentUser() user: JwtPayload,
@Param('id') conversationId: string,
@Body() dto: SendMessageDto,
) {
return this.commandBus.execute(
new SendMessageCommand(
conversationId,
user.sub,
dto.content,
dto.type ?? 'TEXT',
dto.metadata,
),
);
}
@Patch('conversations/:id/read')
@HttpCode(HttpStatus.NO_CONTENT)
@ApiOperation({ summary: 'Mark a conversation as read' })
@ApiResponse({ status: 204, description: 'Conversation marked as read' })
@ApiResponse({ status: 401, description: 'Unauthorized' })
@ApiResponse({ status: 403, description: 'Not a participant' })
@ApiResponse({ status: 404, description: 'Conversation not found' })
async markAsRead(
@CurrentUser() user: JwtPayload,
@Param('id') conversationId: string,
) {
await this.commandBus.execute(
new MarkConversationReadCommand(conversationId, user.sub),
);
}
@Delete('conversations/:conversationId/messages/:messageId')
@HttpCode(HttpStatus.NO_CONTENT)
@ApiOperation({ summary: 'Soft-delete a message (sender only)' })
@ApiResponse({ status: 204, description: 'Message deleted' })
@ApiResponse({ status: 401, description: 'Unauthorized' })
async deleteMessage(
@CurrentUser() user: JwtPayload,
@Param('messageId') messageId: string,
) {
await this.messageRepo.softDelete(messageId, user.sub);
}
}

View File

@@ -0,0 +1,257 @@
import { Inject } from '@nestjs/common';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { CommandBus } from '@nestjs/cqrs';
import { OnEvent } from '@nestjs/event-emitter';
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
MessageBody,
ConnectedSocket,
type OnGatewayConnection,
type OnGatewayDisconnect,
type OnGatewayInit,
} from '@nestjs/websockets';
import type { Server, Socket } from 'socket.io';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { TokenService, type JwtPayload } from '@modules/auth';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports
import { LoggerService } from '@modules/shared';
import { MarkConversationReadCommand } from '../../application/commands/mark-read/mark-read.command';
import { SendMessageCommand } from '../../application/commands/send-message/send-message.command';
import type { MessageSentEvent } from '../../domain/events/message-sent.event';
import {
CONVERSATION_REPOSITORY,
type IConversationRepository,
} from '../../domain/repositories/conversation.repository';
@WebSocketGateway({
namespace: '/messaging',
cors: {
origin: (process.env['CORS_ORIGINS'] ?? 'http://localhost:3000')
.split(',')
.map((o) => o.trim()),
credentials: true,
},
})
export class MessagingGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
@WebSocketServer()
server!: Server;
private readonly userSockets = new Map<string, Set<string>>();
constructor(
private readonly tokenService: TokenService,
private readonly logger: LoggerService,
private readonly commandBus: CommandBus,
@Inject(CONVERSATION_REPOSITORY)
private readonly conversationRepo: IConversationRepository,
) {}
afterInit(): void {
this.logger.log('MessagingGateway initialized', 'MessagingGateway');
}
async handleConnection(client: Socket): Promise<void> {
try {
const payload = this.extractAndVerifyToken(client);
if (!payload) {
client.disconnect(true);
return;
}
client.data['userId'] = payload.sub;
// Join user's personal room for direct messaging events
await client.join(`user:${payload.sub}`);
// Join rooms for all active conversations
const conversations = await this.conversationRepo.findByUserId(payload.sub, 100);
for (const conv of conversations) {
await client.join(`conversation:${conv.id}`);
}
if (!this.userSockets.has(payload.sub)) {
this.userSockets.set(payload.sub, new Set());
}
this.userSockets.get(payload.sub)!.add(client.id);
this.logger.debug(
`WS messaging connected: user=${payload.sub} socket=${client.id} conversations=${conversations.length}`,
'MessagingGateway',
);
} catch (error) {
this.logger.error(
`WS messaging connection error: ${error instanceof Error ? error.message : error}`,
error instanceof Error ? error.stack : undefined,
'MessagingGateway',
);
client.disconnect(true);
}
}
handleDisconnect(client: Socket): void {
const userId = client.data['userId'] as string | undefined;
if (userId) {
const sockets = this.userSockets.get(userId);
if (sockets) {
sockets.delete(client.id);
if (sockets.size === 0) {
this.userSockets.delete(userId);
}
}
}
this.logger.debug(
`WS messaging disconnected: user=${userId ?? 'unknown'} socket=${client.id}`,
'MessagingGateway',
);
}
/* ────────────────────────────────────────────
* Client → Server message handlers
* ──────────────────────────────────────────── */
@SubscribeMessage('message:send')
async handleSendMessage(
@ConnectedSocket() client: Socket,
@MessageBody() data: { conversationId: string; content: string; type?: string },
): Promise<void> {
const userId = client.data['userId'] as string;
if (!userId) return;
try {
const message = await this.commandBus.execute(
new SendMessageCommand(
data.conversationId,
userId,
data.content,
(data.type as 'TEXT' | 'IMAGE' | 'FILE') ?? 'TEXT',
),
);
client.emit('message:sent', { messageId: message.id, conversationId: data.conversationId });
} catch (error) {
client.emit('message:error', {
conversationId: data.conversationId,
error: error instanceof Error ? error.message : 'Không thể gửi tin nhắn',
});
}
}
@SubscribeMessage('message:typing')
async handleTyping(
@ConnectedSocket() client: Socket,
@MessageBody() data: { conversationId: string },
): Promise<void> {
const userId = client.data['userId'] as string;
if (!userId) return;
client.to(`conversation:${data.conversationId}`).emit('message:typing', {
conversationId: data.conversationId,
userId,
});
}
@SubscribeMessage('message:stop-typing')
async handleStopTyping(
@ConnectedSocket() client: Socket,
@MessageBody() data: { conversationId: string },
): Promise<void> {
const userId = client.data['userId'] as string;
if (!userId) return;
client.to(`conversation:${data.conversationId}`).emit('message:stop-typing', {
conversationId: data.conversationId,
userId,
});
}
@SubscribeMessage('conversation:read')
async handleMarkRead(
@ConnectedSocket() client: Socket,
@MessageBody() data: { conversationId: string },
): Promise<void> {
const userId = client.data['userId'] as string;
if (!userId) return;
try {
await this.commandBus.execute(
new MarkConversationReadCommand(data.conversationId, userId),
);
client.emit('conversation:read-ack', { conversationId: data.conversationId });
} catch {
// Non-critical — silently fail
}
}
@SubscribeMessage('conversation:join')
async handleJoinConversation(
@ConnectedSocket() client: Socket,
@MessageBody() data: { conversationId: string },
): Promise<void> {
const userId = client.data['userId'] as string;
if (!userId) return;
// Verify the user is a participant before joining the room
const conversation = await this.conversationRepo.findById(data.conversationId);
if (conversation && conversation.participants.some((p) => p.userId === userId)) {
await client.join(`conversation:${data.conversationId}`);
}
}
/* ────────────────────────────────────────────
* Domain event handlers
* ──────────────────────────────────────────── */
@OnEvent('message.sent', { async: true })
async handleMessageSent(event: MessageSentEvent): Promise<void> {
try {
this.server.to(`conversation:${event.conversationId}`).emit('message:new', {
id: event.aggregateId,
conversationId: event.conversationId,
senderId: event.senderId,
type: event.type,
content: event.content,
createdAt: event.occurredAt.toISOString(),
});
} catch (error) {
this.logger.error(
`Failed to emit WS message for conversation ${event.conversationId}: ${
error instanceof Error ? error.message : error
}`,
error instanceof Error ? error.stack : undefined,
'MessagingGateway',
);
}
}
/* ────────────────────────────────────────────
* Private helpers
* ──────────────────────────────────────────── */
private extractAndVerifyToken(client: Socket): JwtPayload | null {
const raw: unknown =
client.handshake.auth?.['token'] ??
client.handshake.headers?.['authorization'] ??
client.handshake.query?.['token'];
if (!raw || typeof raw !== 'string') {
this.logger.warn(
`WS messaging auth failed: no token provided (socket=${client.id})`,
'MessagingGateway',
);
return null;
}
const token = raw.startsWith('Bearer ') ? raw.slice(7) : raw;
const payload = this.tokenService.verifyAccessToken(token);
if (!payload) {
this.logger.warn(
`WS messaging auth failed: invalid token (socket=${client.id})`,
'MessagingGateway',
);
}
return payload;
}
}

View File

@@ -880,3 +880,212 @@ model Review {
@@index([userId]) @@index([userId])
@@index([targetType, targetId, createdAt(sort: Desc)]) @@index([targetType, targetId, createdAt(sort: Desc)])
} }
// =============================================================================
// INDUSTRIAL PARKS (KCN)
// =============================================================================
enum IndustrialParkStatus {
PLANNING
UNDER_CONSTRUCTION
OPERATIONAL
FULL
}
enum IndustrialPropertyType {
INDUSTRIAL_LAND
READY_BUILT_FACTORY
READY_BUILT_WAREHOUSE
LOGISTICS_CENTER
OFFICE_IN_PARK
DATA_CENTER
}
enum IndustrialLeaseType {
LAND_LEASE
FACTORY_LEASE
WAREHOUSE_LEASE
SUBLEASE
}
enum IndustrialListingStatus {
DRAFT
ACTIVE
RESERVED
LEASED
EXPIRED
}
enum VietnamRegion {
NORTH
CENTRAL
SOUTH
}
model IndustrialPark {
id String @id @default(cuid())
name String
nameEn String?
slug String @unique
developer String
operator String?
status IndustrialParkStatus @default(PLANNING)
location Unsupported("geometry(Point, 4326)")
address String
district String
province String
region VietnamRegion
totalAreaHa Float
leasableAreaHa Float
occupancyRate Float @default(0) // 0-100
remainingAreaHa Float
tenantCount Int @default(0)
establishedYear Int?
landRentUsdM2Year Float?
rbfRentUsdM2Month Float?
rbwRentUsdM2Month Float?
managementFeeUsd Float?
infrastructure Json? // { electricity, water, wastewater, telecom, roads, fire }
connectivity Json? // { nearestPort, airport, highway, railway, seaport }
incentives Json? // { taxHoliday, importDuty, landRentReduction, specialZone }
targetIndustries String[]
existingTenants Json? // [{ name, country, industry }]
certifications Json? // ["ISO 14001", "Green park"]
media Json?
documents Json?
description String? @db.Text
descriptionEn String? @db.Text
isVerified Boolean @default(false)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
listings IndustrialListing[]
@@index([status])
@@index([province])
@@index([region])
@@index([developer])
@@index([location], type: Gist)
@@index([isVerified])
@@index([occupancyRate])
@@index([landRentUsdM2Year])
@@index([region, province, status])
@@index([createdAt])
}
model IndustrialListing {
id String @id @default(cuid())
parkId String
park IndustrialPark @relation(fields: [parkId], references: [id], onDelete: Cascade)
agentId String?
sellerId String
propertyType IndustrialPropertyType
leaseType IndustrialLeaseType
status IndustrialListingStatus @default(DRAFT)
title String
description String? @db.Text
areaM2 Float
ceilingHeightM Float?
floorLoadTonM2 Float?
columnSpacingM Float?
dockCount Int?
craneCapacityTon Float?
hasMezzanine Boolean @default(false)
hasOfficeArea Boolean @default(false)
officeAreaM2 Float?
priceUsdM2 Float?
pricingUnit String? // "usd/m2/month", "usd/m2/year"
totalLeasePrice Float?
managementFee Float?
depositMonths Int?
minLeaseYears Int?
maxLeaseYears Int?
leaseExpiry DateTime?
availableFrom DateTime?
powerCapacityKva Float?
waterSupplyM3Day Float?
media Json?
viewCount Int @default(0)
inquiryCount Int @default(0)
publishedAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@index([parkId])
@@index([propertyType])
@@index([leaseType])
@@index([status])
@@index([areaM2])
@@index([priceUsdM2])
@@index([sellerId])
@@index([agentId])
@@index([publishedAt])
@@index([parkId, status])
@@index([propertyType, leaseType, status])
@@index([status, publishedAt(sort: Desc)])
}
// =============================================================================
// MESSAGING (buyer ↔ agent / seller in-app chat)
// =============================================================================
enum ConversationStatus {
ACTIVE
ARCHIVED
CLOSED
}
model Conversation {
id String @id @default(cuid())
listingId String?
subject String?
status ConversationStatus @default(ACTIVE)
lastMessage String? @db.Text
lastMessageAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
participants ConversationParticipant[]
messages Message[]
@@index([status])
@@index([lastMessageAt(sort: Desc)])
@@index([listingId])
}
model ConversationParticipant {
id String @id @default(cuid())
conversationId String
conversation Conversation @relation(fields: [conversationId], references: [id], onDelete: Cascade)
userId String
unreadCount Int @default(0)
lastReadAt DateTime?
joinedAt DateTime @default(now())
@@unique([conversationId, userId])
@@index([userId])
@@index([conversationId])
}
enum MessageType {
TEXT
IMAGE
FILE
SYSTEM
}
model Message {
id String @id @default(cuid())
conversationId String
conversation Conversation @relation(fields: [conversationId], references: [id], onDelete: Cascade)
senderId String
type MessageType @default(TEXT)
content String @db.Text
metadata Json?
editedAt DateTime?
deletedAt DateTime?
createdAt DateTime @default(now())
@@index([conversationId, createdAt])
@@index([senderId])
}