From 3b5da2dcf93e21c8c665cad0938a038086045dd8 Mon Sep 17 00:00:00 2001 From: Ho Ngoc Hai Date: Thu, 16 Apr 2026 05:36:04 +0700 Subject: [PATCH] feat(messaging): add in-app messaging module with Conversation + Message models Implements buyer-agent in-app messaging (Task 8.4): - Prisma models: Conversation, ConversationParticipant, Message - Full DDD module: domain entities, repository interfaces, CQRS commands/queries - REST API: POST/GET conversations, POST/GET messages, PATCH read, DELETE messages - WebSocket gateway (/messaging namespace): real-time message delivery, typing indicators, room-based routing - 46 unit tests covering handlers, repositories, controller, and gateway Co-Authored-By: Paperclip --- apps/api/src/app.module.ts | 2 + .../create-conversation.handler.spec.ts | 133 +++++++++ .../get-conversations.handler.spec.ts | 44 +++ .../__tests__/get-messages.handler.spec.ts | 71 +++++ .../__tests__/mark-read.handler.spec.ts | 55 ++++ .../__tests__/send-message.handler.spec.ts | 113 ++++++++ .../create-conversation.command.ts | 9 + .../create-conversation.handler.ts | 94 +++++++ .../commands/mark-read/mark-read.command.ts | 6 + .../commands/mark-read/mark-read.handler.ts | 43 +++ .../send-message/send-message.command.ts | 11 + .../send-message/send-message.handler.ts | 77 ++++++ .../get-conversations.handler.ts | 24 ++ .../get-conversations.query.ts | 7 + .../get-messages/get-messages.handler.ts | 38 +++ .../get-messages/get-messages.query.ts | 8 + .../domain/__tests__/messaging-domain.spec.ts | 18 ++ .../domain/entities/conversation.entity.ts | 22 ++ .../domain/entities/message.entity.ts | 13 + .../domain/events/message-sent.event.ts | 19 ++ .../api/src/modules/messaging/domain/index.ts | 15 + .../repositories/conversation.repository.ts | 22 ++ .../domain/repositories/message.repository.ts | 19 ++ .../value-objects/conversation-status.vo.ts | 1 + .../domain/value-objects/message-type.vo.ts | 3 + apps/api/src/modules/messaging/index.ts | 4 + .../prisma-conversation.repository.spec.ts | 134 +++++++++ .../prisma-message.repository.spec.ts | 88 ++++++ .../modules/messaging/infrastructure/index.ts | 2 + .../prisma-conversation.repository.ts | 141 ++++++++++ .../repositories/prisma-message.repository.ts | 80 ++++++ .../src/modules/messaging/messaging.module.ts | 44 +++ .../__tests__/messaging.controller.spec.ts | 97 +++++++ .../__tests__/messaging.gateway.spec.ts | 199 ++++++++++++++ .../controllers/messaging.controller.ts | 188 +++++++++++++ .../gateways/messaging.gateway.ts | 257 ++++++++++++++++++ prisma/schema.prisma | 209 ++++++++++++++ 37 files changed, 2310 insertions(+) create mode 100644 apps/api/src/modules/messaging/application/__tests__/create-conversation.handler.spec.ts create mode 100644 apps/api/src/modules/messaging/application/__tests__/get-conversations.handler.spec.ts create mode 100644 apps/api/src/modules/messaging/application/__tests__/get-messages.handler.spec.ts create mode 100644 apps/api/src/modules/messaging/application/__tests__/mark-read.handler.spec.ts create mode 100644 apps/api/src/modules/messaging/application/__tests__/send-message.handler.spec.ts create mode 100644 apps/api/src/modules/messaging/application/commands/create-conversation/create-conversation.command.ts create mode 100644 apps/api/src/modules/messaging/application/commands/create-conversation/create-conversation.handler.ts create mode 100644 apps/api/src/modules/messaging/application/commands/mark-read/mark-read.command.ts create mode 100644 apps/api/src/modules/messaging/application/commands/mark-read/mark-read.handler.ts create mode 100644 apps/api/src/modules/messaging/application/commands/send-message/send-message.command.ts create mode 100644 apps/api/src/modules/messaging/application/commands/send-message/send-message.handler.ts create mode 100644 apps/api/src/modules/messaging/application/queries/get-conversations/get-conversations.handler.ts create mode 100644 apps/api/src/modules/messaging/application/queries/get-conversations/get-conversations.query.ts create mode 100644 apps/api/src/modules/messaging/application/queries/get-messages/get-messages.handler.ts create mode 100644 apps/api/src/modules/messaging/application/queries/get-messages/get-messages.query.ts create mode 100644 apps/api/src/modules/messaging/domain/__tests__/messaging-domain.spec.ts create mode 100644 apps/api/src/modules/messaging/domain/entities/conversation.entity.ts create mode 100644 apps/api/src/modules/messaging/domain/entities/message.entity.ts create mode 100644 apps/api/src/modules/messaging/domain/events/message-sent.event.ts create mode 100644 apps/api/src/modules/messaging/domain/index.ts create mode 100644 apps/api/src/modules/messaging/domain/repositories/conversation.repository.ts create mode 100644 apps/api/src/modules/messaging/domain/repositories/message.repository.ts create mode 100644 apps/api/src/modules/messaging/domain/value-objects/conversation-status.vo.ts create mode 100644 apps/api/src/modules/messaging/domain/value-objects/message-type.vo.ts create mode 100644 apps/api/src/modules/messaging/index.ts create mode 100644 apps/api/src/modules/messaging/infrastructure/__tests__/prisma-conversation.repository.spec.ts create mode 100644 apps/api/src/modules/messaging/infrastructure/__tests__/prisma-message.repository.spec.ts create mode 100644 apps/api/src/modules/messaging/infrastructure/index.ts create mode 100644 apps/api/src/modules/messaging/infrastructure/repositories/prisma-conversation.repository.ts create mode 100644 apps/api/src/modules/messaging/infrastructure/repositories/prisma-message.repository.ts create mode 100644 apps/api/src/modules/messaging/messaging.module.ts create mode 100644 apps/api/src/modules/messaging/presentation/__tests__/messaging.controller.spec.ts create mode 100644 apps/api/src/modules/messaging/presentation/__tests__/messaging.gateway.spec.ts create mode 100644 apps/api/src/modules/messaging/presentation/controllers/messaging.controller.ts create mode 100644 apps/api/src/modules/messaging/presentation/gateways/messaging.gateway.ts diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index e46ea07..4f3bb93 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -13,6 +13,7 @@ import { InquiriesModule } from '@modules/inquiries'; import { LeadsModule } from '@modules/leads'; import { ListingsModule } from '@modules/listings'; import { McpIntegrationModule } from '@modules/mcp'; +import { MessagingModule } from '@modules/messaging'; import { HttpMetricsInterceptor, MetricsModule } from '@modules/metrics'; import { NotificationsModule } from '@modules/notifications'; import { PaymentsModule } from '@modules/payments'; @@ -46,6 +47,7 @@ import { AppController } from './app.controller'; AnalyticsModule, MetricsModule, McpIntegrationModule, + MessagingModule, // ── Rate Limiting ── // Default: 60 requests per 60 seconds per IP diff --git a/apps/api/src/modules/messaging/application/__tests__/create-conversation.handler.spec.ts b/apps/api/src/modules/messaging/application/__tests__/create-conversation.handler.spec.ts new file mode 100644 index 0000000..48b81d9 --- /dev/null +++ b/apps/api/src/modules/messaging/application/__tests__/create-conversation.handler.spec.ts @@ -0,0 +1,133 @@ +import { CreateConversationCommand } from '../commands/create-conversation/create-conversation.command'; +import { CreateConversationHandler } from '../commands/create-conversation/create-conversation.handler'; + +describe('CreateConversationHandler', () => { + let handler: CreateConversationHandler; + let mockConversationRepo: { + create: ReturnType; + findExistingBetweenUsers: ReturnType; + updateLastMessage: ReturnType; + incrementUnreadCount: ReturnType; + findById: ReturnType; + }; + let mockMessageRepo: { + create: ReturnType; + }; + let mockEventBus: { publish: ReturnType }; + let mockLogger: { log: ReturnType; warn: ReturnType; error: ReturnType }; + + const conversationEntity = { + id: 'conv-1', + listingId: 'listing-1', + subject: null, + status: 'ACTIVE' as const, + lastMessage: null, + lastMessageAt: null, + createdAt: new Date(), + updatedAt: new Date(), + participants: [ + { id: 'p-1', conversationId: 'conv-1', userId: 'user-1', unreadCount: 0, lastReadAt: null, joinedAt: new Date() }, + { id: 'p-2', conversationId: 'conv-1', userId: 'user-2', unreadCount: 0, lastReadAt: null, joinedAt: new Date() }, + ], + }; + + const messageEntity = { + id: 'msg-1', + conversationId: 'conv-1', + senderId: 'user-1', + type: 'TEXT' as const, + content: 'Hello!', + metadata: null, + editedAt: null, + deletedAt: null, + createdAt: new Date(), + }; + + beforeEach(() => { + mockConversationRepo = { + create: vi.fn().mockResolvedValue(conversationEntity), + findExistingBetweenUsers: vi.fn().mockResolvedValue(null), + updateLastMessage: vi.fn().mockResolvedValue(undefined), + incrementUnreadCount: vi.fn().mockResolvedValue(undefined), + findById: vi.fn(), + }; + mockMessageRepo = { + create: vi.fn().mockResolvedValue(messageEntity), + }; + mockEventBus = { publish: vi.fn() }; + mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn() }; + + handler = new CreateConversationHandler( + mockConversationRepo as any, + mockMessageRepo as any, + mockEventBus as any, + mockLogger as any, + ); + }); + + it('creates a new conversation with participants', async () => { + const command = new CreateConversationCommand('user-1', 'user-2', 'listing-1'); + + const result = await handler.execute(command); + + expect(mockConversationRepo.findExistingBetweenUsers).toHaveBeenCalledWith( + ['user-1', 'user-2'], + 'listing-1', + ); + expect(mockConversationRepo.create).toHaveBeenCalledWith({ + listingId: 'listing-1', + subject: undefined, + participantUserIds: ['user-1', 'user-2'], + }); + expect(result.id).toBe('conv-1'); + }); + + it('creates conversation with initial message', async () => { + const command = new CreateConversationCommand('user-1', 'user-2', 'listing-1', undefined, 'Hello!'); + + await handler.execute(command); + + expect(mockMessageRepo.create).toHaveBeenCalledWith({ + conversationId: 'conv-1', + senderId: 'user-1', + type: 'TEXT', + content: 'Hello!', + }); + expect(mockConversationRepo.updateLastMessage).toHaveBeenCalled(); + expect(mockConversationRepo.incrementUnreadCount).toHaveBeenCalledWith('conv-1', 'user-1'); + expect(mockEventBus.publish).toHaveBeenCalled(); + }); + + it('returns existing conversation when one exists between users', async () => { + mockConversationRepo.findExistingBetweenUsers.mockResolvedValue(conversationEntity); + + const command = new CreateConversationCommand('user-1', 'user-2', 'listing-1'); + + const result = await handler.execute(command); + + expect(result.id).toBe('conv-1'); + expect(mockConversationRepo.create).not.toHaveBeenCalled(); + }); + + it('sends message in existing conversation when initial message provided', async () => { + mockConversationRepo.findExistingBetweenUsers.mockResolvedValue(conversationEntity); + + const command = new CreateConversationCommand('user-1', 'user-2', 'listing-1', undefined, 'Hello again!'); + + await handler.execute(command); + + expect(mockConversationRepo.create).not.toHaveBeenCalled(); + expect(mockMessageRepo.create).toHaveBeenCalledWith({ + conversationId: 'conv-1', + senderId: 'user-1', + type: 'TEXT', + content: 'Hello again!', + }); + }); + + it('throws when creating conversation with yourself', async () => { + const command = new CreateConversationCommand('user-1', 'user-1'); + + await expect(handler.execute(command)).rejects.toThrow('Không thể tạo hội thoại với chính mình'); + }); +}); diff --git a/apps/api/src/modules/messaging/application/__tests__/get-conversations.handler.spec.ts b/apps/api/src/modules/messaging/application/__tests__/get-conversations.handler.spec.ts new file mode 100644 index 0000000..889743c --- /dev/null +++ b/apps/api/src/modules/messaging/application/__tests__/get-conversations.handler.spec.ts @@ -0,0 +1,44 @@ +import { GetConversationsHandler } from '../queries/get-conversations/get-conversations.handler'; +import { GetConversationsQuery } from '../queries/get-conversations/get-conversations.query'; + +describe('GetConversationsHandler', () => { + let handler: GetConversationsHandler; + let mockConversationRepo: { + findByUserId: ReturnType; + countByUserId: ReturnType; + }; + + const conversations = [ + { + id: 'conv-1', + listingId: 'listing-1', + subject: null, + status: 'ACTIVE', + lastMessage: 'Hello!', + lastMessageAt: new Date(), + createdAt: new Date(), + updatedAt: new Date(), + participants: [], + }, + ]; + + beforeEach(() => { + mockConversationRepo = { + findByUserId: vi.fn().mockResolvedValue(conversations), + countByUserId: vi.fn().mockResolvedValue(1), + }; + + handler = new GetConversationsHandler(mockConversationRepo as any); + }); + + it('returns conversations with total count', async () => { + const query = new GetConversationsQuery('user-1', 20, 0); + + const result = await handler.execute(query); + + expect(result.conversations).toHaveLength(1); + expect(result.total).toBe(1); + expect(mockConversationRepo.findByUserId).toHaveBeenCalledWith('user-1', 20, 0); + expect(mockConversationRepo.countByUserId).toHaveBeenCalledWith('user-1'); + }); +}); diff --git a/apps/api/src/modules/messaging/application/__tests__/get-messages.handler.spec.ts b/apps/api/src/modules/messaging/application/__tests__/get-messages.handler.spec.ts new file mode 100644 index 0000000..d871bef --- /dev/null +++ b/apps/api/src/modules/messaging/application/__tests__/get-messages.handler.spec.ts @@ -0,0 +1,71 @@ +import { GetMessagesHandler } from '../queries/get-messages/get-messages.handler'; +import { GetMessagesQuery } from '../queries/get-messages/get-messages.query'; + +describe('GetMessagesHandler', () => { + let handler: GetMessagesHandler; + let mockConversationRepo: { + findById: ReturnType; + }; + let mockMessageRepo: { + findByConversationId: ReturnType; + }; + + const conversation = { + id: 'conv-1', + participants: [ + { userId: 'user-1' }, + { userId: 'user-2' }, + ], + }; + + const messages = [ + { + id: 'msg-1', + conversationId: 'conv-1', + senderId: 'user-1', + type: 'TEXT', + content: 'Hello!', + metadata: null, + editedAt: null, + deletedAt: null, + createdAt: new Date(), + }, + ]; + + beforeEach(() => { + mockConversationRepo = { + findById: vi.fn().mockResolvedValue(conversation), + }; + mockMessageRepo = { + findByConversationId: vi.fn().mockResolvedValue(messages), + }; + + handler = new GetMessagesHandler( + mockConversationRepo as any, + mockMessageRepo as any, + ); + }); + + it('returns messages for a conversation', async () => { + const query = new GetMessagesQuery('conv-1', 'user-1', 50); + + const result = await handler.execute(query); + + expect(result).toHaveLength(1); + expect(mockMessageRepo.findByConversationId).toHaveBeenCalledWith('conv-1', 50, undefined); + }); + + it('throws NotFoundException when conversation does not exist', async () => { + mockConversationRepo.findById.mockResolvedValue(null); + + const query = new GetMessagesQuery('conv-999', 'user-1'); + + await expect(handler.execute(query)).rejects.toThrow(); + }); + + it('throws ForbiddenException when user is not a participant', async () => { + const query = new GetMessagesQuery('conv-1', 'user-3'); + + await expect(handler.execute(query)).rejects.toThrow('Bạn không phải là thành viên của hội thoại này'); + }); +}); diff --git a/apps/api/src/modules/messaging/application/__tests__/mark-read.handler.spec.ts b/apps/api/src/modules/messaging/application/__tests__/mark-read.handler.spec.ts new file mode 100644 index 0000000..d66c7ca --- /dev/null +++ b/apps/api/src/modules/messaging/application/__tests__/mark-read.handler.spec.ts @@ -0,0 +1,55 @@ +import { MarkConversationReadCommand } from '../commands/mark-read/mark-read.command'; +import { MarkConversationReadHandler } from '../commands/mark-read/mark-read.handler'; + +describe('MarkConversationReadHandler', () => { + let handler: MarkConversationReadHandler; + let mockConversationRepo: { + findById: ReturnType; + resetUnreadCount: ReturnType; + }; + let mockLogger: { log: ReturnType; warn: ReturnType; error: ReturnType }; + + const conversation = { + id: 'conv-1', + status: 'ACTIVE' as const, + participants: [ + { id: 'p-1', conversationId: 'conv-1', userId: 'user-1', unreadCount: 3, lastReadAt: null, joinedAt: new Date() }, + { id: 'p-2', conversationId: 'conv-1', userId: 'user-2', unreadCount: 0, lastReadAt: null, joinedAt: new Date() }, + ], + }; + + beforeEach(() => { + mockConversationRepo = { + findById: vi.fn().mockResolvedValue(conversation), + resetUnreadCount: vi.fn().mockResolvedValue(undefined), + }; + mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn() }; + + handler = new MarkConversationReadHandler( + mockConversationRepo as any, + mockLogger as any, + ); + }); + + it('marks conversation as read for the user', async () => { + const command = new MarkConversationReadCommand('conv-1', 'user-1'); + + await handler.execute(command); + + expect(mockConversationRepo.resetUnreadCount).toHaveBeenCalledWith('conv-1', 'user-1'); + }); + + it('throws NotFoundException when conversation does not exist', async () => { + mockConversationRepo.findById.mockResolvedValue(null); + + const command = new MarkConversationReadCommand('conv-999', 'user-1'); + + await expect(handler.execute(command)).rejects.toThrow(); + }); + + it('throws ForbiddenException when user is not a participant', async () => { + const command = new MarkConversationReadCommand('conv-1', 'user-3'); + + await expect(handler.execute(command)).rejects.toThrow('Bạn không phải là thành viên của hội thoại này'); + }); +}); diff --git a/apps/api/src/modules/messaging/application/__tests__/send-message.handler.spec.ts b/apps/api/src/modules/messaging/application/__tests__/send-message.handler.spec.ts new file mode 100644 index 0000000..8da0e9a --- /dev/null +++ b/apps/api/src/modules/messaging/application/__tests__/send-message.handler.spec.ts @@ -0,0 +1,113 @@ +import { SendMessageCommand } from '../commands/send-message/send-message.command'; +import { SendMessageHandler } from '../commands/send-message/send-message.handler'; + +describe('SendMessageHandler', () => { + let handler: SendMessageHandler; + let mockConversationRepo: { + findById: ReturnType; + updateLastMessage: ReturnType; + incrementUnreadCount: ReturnType; + }; + let mockMessageRepo: { + create: ReturnType; + }; + let mockEventBus: { publish: ReturnType }; + let mockLogger: { log: ReturnType; warn: ReturnType; error: ReturnType }; + + const conversation = { + id: 'conv-1', + status: 'ACTIVE' as const, + participants: [ + { id: 'p-1', conversationId: 'conv-1', userId: 'user-1', unreadCount: 0, lastReadAt: null, joinedAt: new Date() }, + { id: 'p-2', conversationId: 'conv-1', userId: 'user-2', unreadCount: 0, lastReadAt: null, joinedAt: new Date() }, + ], + }; + + const messageEntity = { + id: 'msg-1', + conversationId: 'conv-1', + senderId: 'user-1', + type: 'TEXT' as const, + content: 'Hello!', + metadata: null, + editedAt: null, + deletedAt: null, + createdAt: new Date(), + }; + + beforeEach(() => { + mockConversationRepo = { + findById: vi.fn().mockResolvedValue(conversation), + updateLastMessage: vi.fn().mockResolvedValue(undefined), + incrementUnreadCount: vi.fn().mockResolvedValue(undefined), + }; + mockMessageRepo = { + create: vi.fn().mockResolvedValue(messageEntity), + }; + mockEventBus = { publish: vi.fn() }; + mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn() }; + + handler = new SendMessageHandler( + mockConversationRepo as any, + mockMessageRepo as any, + mockEventBus as any, + mockLogger as any, + ); + }); + + it('sends a message successfully', async () => { + const command = new SendMessageCommand('conv-1', 'user-1', 'Hello!'); + + const result = await handler.execute(command); + + expect(result.id).toBe('msg-1'); + expect(mockMessageRepo.create).toHaveBeenCalledWith({ + conversationId: 'conv-1', + senderId: 'user-1', + type: 'TEXT', + content: 'Hello!', + metadata: undefined, + }); + expect(mockConversationRepo.updateLastMessage).toHaveBeenCalled(); + expect(mockConversationRepo.incrementUnreadCount).toHaveBeenCalledWith('conv-1', 'user-1'); + expect(mockEventBus.publish).toHaveBeenCalled(); + }); + + it('throws NotFoundException when conversation does not exist', async () => { + mockConversationRepo.findById.mockResolvedValue(null); + + const command = new SendMessageCommand('conv-999', 'user-1', 'Hello!'); + + await expect(handler.execute(command)).rejects.toThrow(); + }); + + it('throws ForbiddenException when user is not a participant', async () => { + const command = new SendMessageCommand('conv-1', 'user-3', 'Hello!'); + + await expect(handler.execute(command)).rejects.toThrow('Bạn không phải là thành viên của hội thoại này'); + }); + + it('throws ValidationException when conversation is closed', async () => { + mockConversationRepo.findById.mockResolvedValue({ ...conversation, status: 'CLOSED' }); + + const command = new SendMessageCommand('conv-1', 'user-1', 'Hello!'); + + await expect(handler.execute(command)).rejects.toThrow('Hội thoại đã đóng'); + }); + + it('publishes MessageSentEvent after successful send', async () => { + const command = new SendMessageCommand('conv-1', 'user-1', 'Hello!'); + + await handler.execute(command); + + expect(mockEventBus.publish).toHaveBeenCalledWith( + expect.objectContaining({ + eventName: 'message.sent', + aggregateId: 'msg-1', + conversationId: 'conv-1', + senderId: 'user-1', + content: 'Hello!', + }), + ); + }); +}); diff --git a/apps/api/src/modules/messaging/application/commands/create-conversation/create-conversation.command.ts b/apps/api/src/modules/messaging/application/commands/create-conversation/create-conversation.command.ts new file mode 100644 index 0000000..7e15579 --- /dev/null +++ b/apps/api/src/modules/messaging/application/commands/create-conversation/create-conversation.command.ts @@ -0,0 +1,9 @@ +export class CreateConversationCommand { + constructor( + public readonly initiatorUserId: string, + public readonly participantUserId: string, + public readonly listingId?: string, + public readonly subject?: string, + public readonly initialMessage?: string, + ) {} +} diff --git a/apps/api/src/modules/messaging/application/commands/create-conversation/create-conversation.handler.ts b/apps/api/src/modules/messaging/application/commands/create-conversation/create-conversation.handler.ts new file mode 100644 index 0000000..0edba8b --- /dev/null +++ b/apps/api/src/modules/messaging/application/commands/create-conversation/create-conversation.handler.ts @@ -0,0 +1,94 @@ +import { Inject, InternalServerErrorException } from '@nestjs/common'; +import { CommandHandler, type ICommandHandler } from '@nestjs/cqrs'; +import { DomainException, ValidationException, type EventBusService, type LoggerService } from '@modules/shared'; +import type { ConversationEntity } from '../../../domain/entities/conversation.entity'; +import { MessageSentEvent } from '../../../domain/events/message-sent.event'; +import { + CONVERSATION_REPOSITORY, + type IConversationRepository, +} from '../../../domain/repositories/conversation.repository'; +import { + MESSAGE_REPOSITORY, + type IMessageRepository, +} from '../../../domain/repositories/message.repository'; +import { CreateConversationCommand } from './create-conversation.command'; + +@CommandHandler(CreateConversationCommand) +export class CreateConversationHandler implements ICommandHandler { + constructor( + @Inject(CONVERSATION_REPOSITORY) + private readonly conversationRepo: IConversationRepository, + @Inject(MESSAGE_REPOSITORY) + private readonly messageRepo: IMessageRepository, + private readonly eventBus: EventBusService, + private readonly logger: LoggerService, + ) {} + + async execute(command: CreateConversationCommand): Promise { + try { + const { initiatorUserId, participantUserId, listingId, subject, initialMessage } = command; + + if (initiatorUserId === participantUserId) { + throw new ValidationException('Không thể tạo hội thoại với chính mình'); + } + + // Check for existing conversation between these users for the same listing + const userIds = [initiatorUserId, participantUserId]; + const existing = await this.conversationRepo.findExistingBetweenUsers(userIds, listingId); + if (existing) { + // If there's an initial message, send it in the existing conversation + if (initialMessage) { + const message = await this.messageRepo.create({ + conversationId: existing.id, + senderId: initiatorUserId, + type: 'TEXT', + content: initialMessage, + }); + await this.conversationRepo.updateLastMessage(existing.id, initialMessage, message.createdAt); + await this.conversationRepo.incrementUnreadCount(existing.id, initiatorUserId); + this.eventBus.publish( + new MessageSentEvent(message.id, existing.id, initiatorUserId, 'TEXT', initialMessage), + ); + } + return existing; + } + + // Create new conversation + const conversation = await this.conversationRepo.create({ + listingId, + subject, + participantUserIds: userIds, + }); + + // Send initial message if provided + if (initialMessage) { + const message = await this.messageRepo.create({ + conversationId: conversation.id, + senderId: initiatorUserId, + type: 'TEXT', + content: initialMessage, + }); + await this.conversationRepo.updateLastMessage(conversation.id, initialMessage, message.createdAt); + await this.conversationRepo.incrementUnreadCount(conversation.id, initiatorUserId); + this.eventBus.publish( + new MessageSentEvent(message.id, conversation.id, initiatorUserId, 'TEXT', initialMessage), + ); + } + + this.logger.log( + `Conversation created: ${conversation.id} between [${userIds.join(', ')}]`, + 'CreateConversationHandler', + ); + + return conversation; + } catch (error) { + if (error instanceof DomainException) throw error; + this.logger.error( + `Failed to create conversation: ${error instanceof Error ? error.message : error}`, + error instanceof Error ? error.stack : undefined, + this.constructor.name, + ); + throw new InternalServerErrorException('Không thể tạo hội thoại'); + } + } +} diff --git a/apps/api/src/modules/messaging/application/commands/mark-read/mark-read.command.ts b/apps/api/src/modules/messaging/application/commands/mark-read/mark-read.command.ts new file mode 100644 index 0000000..70d6d54 --- /dev/null +++ b/apps/api/src/modules/messaging/application/commands/mark-read/mark-read.command.ts @@ -0,0 +1,6 @@ +export class MarkConversationReadCommand { + constructor( + public readonly conversationId: string, + public readonly userId: string, + ) {} +} diff --git a/apps/api/src/modules/messaging/application/commands/mark-read/mark-read.handler.ts b/apps/api/src/modules/messaging/application/commands/mark-read/mark-read.handler.ts new file mode 100644 index 0000000..5598b23 --- /dev/null +++ b/apps/api/src/modules/messaging/application/commands/mark-read/mark-read.handler.ts @@ -0,0 +1,43 @@ +import { Inject, InternalServerErrorException } from '@nestjs/common'; +import { CommandHandler, type ICommandHandler } from '@nestjs/cqrs'; +import { DomainException, ForbiddenException, NotFoundException, type LoggerService } from '@modules/shared'; +import { + CONVERSATION_REPOSITORY, + type IConversationRepository, +} from '../../../domain/repositories/conversation.repository'; +import { MarkConversationReadCommand } from './mark-read.command'; + +@CommandHandler(MarkConversationReadCommand) +export class MarkConversationReadHandler implements ICommandHandler { + constructor( + @Inject(CONVERSATION_REPOSITORY) + private readonly conversationRepo: IConversationRepository, + private readonly logger: LoggerService, + ) {} + + async execute(command: MarkConversationReadCommand): Promise { + try { + const { conversationId, userId } = command; + + const conversation = await this.conversationRepo.findById(conversationId); + if (!conversation) { + throw new NotFoundException('Conversation', conversationId); + } + + const isParticipant = conversation.participants.some((p) => p.userId === userId); + if (!isParticipant) { + throw new ForbiddenException('Bạn không phải là thành viên của hội thoại này'); + } + + await this.conversationRepo.resetUnreadCount(conversationId, userId); + } catch (error) { + if (error instanceof DomainException) throw error; + this.logger.error( + `Failed to mark conversation read: ${error instanceof Error ? error.message : error}`, + error instanceof Error ? error.stack : undefined, + this.constructor.name, + ); + throw new InternalServerErrorException('Không thể đánh dấu đã đọc'); + } + } +} diff --git a/apps/api/src/modules/messaging/application/commands/send-message/send-message.command.ts b/apps/api/src/modules/messaging/application/commands/send-message/send-message.command.ts new file mode 100644 index 0000000..be982f3 --- /dev/null +++ b/apps/api/src/modules/messaging/application/commands/send-message/send-message.command.ts @@ -0,0 +1,11 @@ +import type { MessageType } from '../../../domain/value-objects/message-type.vo'; + +export class SendMessageCommand { + constructor( + public readonly conversationId: string, + public readonly senderId: string, + public readonly content: string, + public readonly type: MessageType = 'TEXT', + public readonly metadata?: Record, + ) {} +} diff --git a/apps/api/src/modules/messaging/application/commands/send-message/send-message.handler.ts b/apps/api/src/modules/messaging/application/commands/send-message/send-message.handler.ts new file mode 100644 index 0000000..21f6d7e --- /dev/null +++ b/apps/api/src/modules/messaging/application/commands/send-message/send-message.handler.ts @@ -0,0 +1,77 @@ +import { Inject, InternalServerErrorException } from '@nestjs/common'; +import { CommandHandler, type ICommandHandler } from '@nestjs/cqrs'; +import { DomainException, ForbiddenException, NotFoundException, ValidationException, type EventBusService, type LoggerService } from '@modules/shared'; +import type { MessageEntity } from '../../../domain/entities/message.entity'; +import { MessageSentEvent } from '../../../domain/events/message-sent.event'; +import { + CONVERSATION_REPOSITORY, + type IConversationRepository, +} from '../../../domain/repositories/conversation.repository'; +import { + MESSAGE_REPOSITORY, + type IMessageRepository, +} from '../../../domain/repositories/message.repository'; +import { SendMessageCommand } from './send-message.command'; + +@CommandHandler(SendMessageCommand) +export class SendMessageHandler implements ICommandHandler { + constructor( + @Inject(CONVERSATION_REPOSITORY) + private readonly conversationRepo: IConversationRepository, + @Inject(MESSAGE_REPOSITORY) + private readonly messageRepo: IMessageRepository, + private readonly eventBus: EventBusService, + private readonly logger: LoggerService, + ) {} + + async execute(command: SendMessageCommand): Promise { + try { + const { conversationId, senderId, content, type, metadata } = command; + + // Verify conversation exists and sender is a participant + const conversation = await this.conversationRepo.findById(conversationId); + if (!conversation) { + throw new NotFoundException('Conversation', conversationId); + } + + const isParticipant = conversation.participants.some((p) => p.userId === senderId); + if (!isParticipant) { + throw new ForbiddenException('Bạn không phải là thành viên của hội thoại này'); + } + + if (conversation.status !== 'ACTIVE') { + throw new ValidationException('Hội thoại đã đóng'); + } + + // Create message + const message = await this.messageRepo.create({ + conversationId, + senderId, + type, + content, + metadata, + }); + + // Update conversation last message + await this.conversationRepo.updateLastMessage(conversationId, content, message.createdAt); + + // Increment unread count for other participants + await this.conversationRepo.incrementUnreadCount(conversationId, senderId); + + // Publish domain event + this.eventBus.publish( + new MessageSentEvent(message.id, conversationId, senderId, type, content), + ); + + return message; + } catch (error) { + if (error instanceof DomainException) throw error; + this.logger.error( + `Failed to send message: ${error instanceof Error ? error.message : error}`, + error instanceof Error ? error.stack : undefined, + this.constructor.name, + ); + throw new InternalServerErrorException('Không thể gửi tin nhắn'); + } + } +} diff --git a/apps/api/src/modules/messaging/application/queries/get-conversations/get-conversations.handler.ts b/apps/api/src/modules/messaging/application/queries/get-conversations/get-conversations.handler.ts new file mode 100644 index 0000000..0519d81 --- /dev/null +++ b/apps/api/src/modules/messaging/application/queries/get-conversations/get-conversations.handler.ts @@ -0,0 +1,24 @@ +import { Inject } from '@nestjs/common'; +import { type IQueryHandler, QueryHandler } from '@nestjs/cqrs'; +import type { ConversationEntity } from '../../../domain/entities/conversation.entity'; +import { + CONVERSATION_REPOSITORY, + type IConversationRepository, +} from '../../../domain/repositories/conversation.repository'; +import { GetConversationsQuery } from './get-conversations.query'; + +@QueryHandler(GetConversationsQuery) +export class GetConversationsHandler implements IQueryHandler { + constructor( + @Inject(CONVERSATION_REPOSITORY) + private readonly conversationRepo: IConversationRepository, + ) {} + + async execute(query: GetConversationsQuery): Promise<{ conversations: ConversationEntity[]; total: number }> { + const [conversations, total] = await Promise.all([ + this.conversationRepo.findByUserId(query.userId, query.limit, query.offset), + this.conversationRepo.countByUserId(query.userId), + ]); + return { conversations, total }; + } +} diff --git a/apps/api/src/modules/messaging/application/queries/get-conversations/get-conversations.query.ts b/apps/api/src/modules/messaging/application/queries/get-conversations/get-conversations.query.ts new file mode 100644 index 0000000..1d6dbcf --- /dev/null +++ b/apps/api/src/modules/messaging/application/queries/get-conversations/get-conversations.query.ts @@ -0,0 +1,7 @@ +export class GetConversationsQuery { + constructor( + public readonly userId: string, + public readonly limit: number = 20, + public readonly offset: number = 0, + ) {} +} diff --git a/apps/api/src/modules/messaging/application/queries/get-messages/get-messages.handler.ts b/apps/api/src/modules/messaging/application/queries/get-messages/get-messages.handler.ts new file mode 100644 index 0000000..58f443e --- /dev/null +++ b/apps/api/src/modules/messaging/application/queries/get-messages/get-messages.handler.ts @@ -0,0 +1,38 @@ +import { Inject } from '@nestjs/common'; +import { type IQueryHandler, QueryHandler } from '@nestjs/cqrs'; +import { ForbiddenException, NotFoundException } from '@modules/shared'; +import type { MessageEntity } from '../../../domain/entities/message.entity'; +import { + CONVERSATION_REPOSITORY, + type IConversationRepository, +} from '../../../domain/repositories/conversation.repository'; +import { + MESSAGE_REPOSITORY, + type IMessageRepository, +} from '../../../domain/repositories/message.repository'; +import { GetMessagesQuery } from './get-messages.query'; + +@QueryHandler(GetMessagesQuery) +export class GetMessagesHandler implements IQueryHandler { + constructor( + @Inject(CONVERSATION_REPOSITORY) + private readonly conversationRepo: IConversationRepository, + @Inject(MESSAGE_REPOSITORY) + private readonly messageRepo: IMessageRepository, + ) {} + + async execute(query: GetMessagesQuery): Promise { + // Verify access + const conversation = await this.conversationRepo.findById(query.conversationId); + if (!conversation) { + throw new NotFoundException('Conversation', query.conversationId); + } + + const isParticipant = conversation.participants.some((p) => p.userId === query.userId); + if (!isParticipant) { + throw new ForbiddenException('Bạn không phải là thành viên của hội thoại này'); + } + + return this.messageRepo.findByConversationId(query.conversationId, query.limit, query.before); + } +} diff --git a/apps/api/src/modules/messaging/application/queries/get-messages/get-messages.query.ts b/apps/api/src/modules/messaging/application/queries/get-messages/get-messages.query.ts new file mode 100644 index 0000000..7abfffc --- /dev/null +++ b/apps/api/src/modules/messaging/application/queries/get-messages/get-messages.query.ts @@ -0,0 +1,8 @@ +export class GetMessagesQuery { + constructor( + public readonly conversationId: string, + public readonly userId: string, + public readonly limit: number = 50, + public readonly before?: string, + ) {} +} diff --git a/apps/api/src/modules/messaging/domain/__tests__/messaging-domain.spec.ts b/apps/api/src/modules/messaging/domain/__tests__/messaging-domain.spec.ts new file mode 100644 index 0000000..ef15d29 --- /dev/null +++ b/apps/api/src/modules/messaging/domain/__tests__/messaging-domain.spec.ts @@ -0,0 +1,18 @@ +import { MessageSentEvent } from '../events/message-sent.event'; + +describe('Messaging Domain', () => { + describe('MessageSentEvent', () => { + it('creates event with correct eventName', () => { + const event = new MessageSentEvent('msg-1', 'conv-1', 'user-1', 'TEXT', 'Hello!'); + + expect(event.eventName).toBe('message.sent'); + expect(event.aggregateId).toBe('msg-1'); + expect(event.messageId).toBe('msg-1'); + expect(event.conversationId).toBe('conv-1'); + expect(event.senderId).toBe('user-1'); + expect(event.type).toBe('TEXT'); + expect(event.content).toBe('Hello!'); + expect(event.occurredAt).toBeInstanceOf(Date); + }); + }); +}); diff --git a/apps/api/src/modules/messaging/domain/entities/conversation.entity.ts b/apps/api/src/modules/messaging/domain/entities/conversation.entity.ts new file mode 100644 index 0000000..86d5661 --- /dev/null +++ b/apps/api/src/modules/messaging/domain/entities/conversation.entity.ts @@ -0,0 +1,22 @@ +import type { ConversationStatus } from '../value-objects/conversation-status.vo'; + +export interface ConversationParticipantEntity { + id: string; + conversationId: string; + userId: string; + unreadCount: number; + lastReadAt: Date | null; + joinedAt: Date; +} + +export interface ConversationEntity { + id: string; + listingId: string | null; + subject: string | null; + status: ConversationStatus; + lastMessage: string | null; + lastMessageAt: Date | null; + createdAt: Date; + updatedAt: Date; + participants: ConversationParticipantEntity[]; +} diff --git a/apps/api/src/modules/messaging/domain/entities/message.entity.ts b/apps/api/src/modules/messaging/domain/entities/message.entity.ts new file mode 100644 index 0000000..5c1c382 --- /dev/null +++ b/apps/api/src/modules/messaging/domain/entities/message.entity.ts @@ -0,0 +1,13 @@ +import type { MessageType } from '../value-objects/message-type.vo'; + +export interface MessageEntity { + id: string; + conversationId: string; + senderId: string; + type: MessageType; + content: string; + metadata: Record | null; + editedAt: Date | null; + deletedAt: Date | null; + createdAt: Date; +} diff --git a/apps/api/src/modules/messaging/domain/events/message-sent.event.ts b/apps/api/src/modules/messaging/domain/events/message-sent.event.ts new file mode 100644 index 0000000..18c32a3 --- /dev/null +++ b/apps/api/src/modules/messaging/domain/events/message-sent.event.ts @@ -0,0 +1,19 @@ +import { type DomainEvent } from '@modules/shared'; +import type { MessageType } from '../value-objects/message-type.vo'; + +export class MessageSentEvent implements DomainEvent { + readonly eventName = 'message.sent'; + readonly occurredAt = new Date(); + + constructor( + public readonly aggregateId: string, + public readonly conversationId: string, + public readonly senderId: string, + public readonly type: MessageType, + public readonly content: string, + ) {} + + get messageId(): string { + return this.aggregateId; + } +} diff --git a/apps/api/src/modules/messaging/domain/index.ts b/apps/api/src/modules/messaging/domain/index.ts new file mode 100644 index 0000000..8c34fe7 --- /dev/null +++ b/apps/api/src/modules/messaging/domain/index.ts @@ -0,0 +1,15 @@ +export type { ConversationEntity, ConversationParticipantEntity } from './entities/conversation.entity'; +export type { MessageEntity } from './entities/message.entity'; +export { MessageSentEvent } from './events/message-sent.event'; +export { + CONVERSATION_REPOSITORY, + type IConversationRepository, + type CreateConversationDto, +} from './repositories/conversation.repository'; +export { + MESSAGE_REPOSITORY, + type IMessageRepository, + type CreateMessageDto, +} from './repositories/message.repository'; +export type { ConversationStatus } from './value-objects/conversation-status.vo'; +export { type MessageType, ALL_MESSAGE_TYPES } from './value-objects/message-type.vo'; diff --git a/apps/api/src/modules/messaging/domain/repositories/conversation.repository.ts b/apps/api/src/modules/messaging/domain/repositories/conversation.repository.ts new file mode 100644 index 0000000..948b577 --- /dev/null +++ b/apps/api/src/modules/messaging/domain/repositories/conversation.repository.ts @@ -0,0 +1,22 @@ +import type { ConversationEntity } from '../entities/conversation.entity'; +import type { ConversationStatus } from '../value-objects/conversation-status.vo'; + +export const CONVERSATION_REPOSITORY = Symbol('CONVERSATION_REPOSITORY'); + +export interface CreateConversationDto { + listingId?: string; + subject?: string; + participantUserIds: string[]; +} + +export interface IConversationRepository { + create(dto: CreateConversationDto): Promise; + findById(id: string): Promise; + findByUserId(userId: string, limit?: number, offset?: number): Promise; + findExistingBetweenUsers(userIds: string[], listingId?: string): Promise; + updateStatus(id: string, status: ConversationStatus): Promise; + updateLastMessage(id: string, content: string, sentAt: Date): Promise; + incrementUnreadCount(conversationId: string, excludeUserId: string): Promise; + resetUnreadCount(conversationId: string, userId: string): Promise; + countByUserId(userId: string): Promise; +} diff --git a/apps/api/src/modules/messaging/domain/repositories/message.repository.ts b/apps/api/src/modules/messaging/domain/repositories/message.repository.ts new file mode 100644 index 0000000..9e8130a --- /dev/null +++ b/apps/api/src/modules/messaging/domain/repositories/message.repository.ts @@ -0,0 +1,19 @@ +import type { MessageEntity } from '../entities/message.entity'; +import type { MessageType } from '../value-objects/message-type.vo'; + +export const MESSAGE_REPOSITORY = Symbol('MESSAGE_REPOSITORY'); + +export interface CreateMessageDto { + conversationId: string; + senderId: string; + type: MessageType; + content: string; + metadata?: Record; +} + +export interface IMessageRepository { + create(dto: CreateMessageDto): Promise; + findByConversationId(conversationId: string, limit?: number, before?: string): Promise; + findById(id: string): Promise; + softDelete(id: string, senderId: string): Promise; +} diff --git a/apps/api/src/modules/messaging/domain/value-objects/conversation-status.vo.ts b/apps/api/src/modules/messaging/domain/value-objects/conversation-status.vo.ts new file mode 100644 index 0000000..67cc52c --- /dev/null +++ b/apps/api/src/modules/messaging/domain/value-objects/conversation-status.vo.ts @@ -0,0 +1 @@ +export type ConversationStatus = 'ACTIVE' | 'ARCHIVED' | 'CLOSED'; diff --git a/apps/api/src/modules/messaging/domain/value-objects/message-type.vo.ts b/apps/api/src/modules/messaging/domain/value-objects/message-type.vo.ts new file mode 100644 index 0000000..70d3286 --- /dev/null +++ b/apps/api/src/modules/messaging/domain/value-objects/message-type.vo.ts @@ -0,0 +1,3 @@ +export type MessageType = 'TEXT' | 'IMAGE' | 'FILE' | 'SYSTEM'; + +export const ALL_MESSAGE_TYPES: MessageType[] = ['TEXT', 'IMAGE', 'FILE', 'SYSTEM']; diff --git a/apps/api/src/modules/messaging/index.ts b/apps/api/src/modules/messaging/index.ts new file mode 100644 index 0000000..4b1f70c --- /dev/null +++ b/apps/api/src/modules/messaging/index.ts @@ -0,0 +1,4 @@ +export { MessagingModule } from './messaging.module'; +export { MessagingGateway } from './presentation/gateways/messaging.gateway'; +export { CreateConversationCommand } from './application/commands/create-conversation/create-conversation.command'; +export { SendMessageCommand } from './application/commands/send-message/send-message.command'; diff --git a/apps/api/src/modules/messaging/infrastructure/__tests__/prisma-conversation.repository.spec.ts b/apps/api/src/modules/messaging/infrastructure/__tests__/prisma-conversation.repository.spec.ts new file mode 100644 index 0000000..76f7ce7 --- /dev/null +++ b/apps/api/src/modules/messaging/infrastructure/__tests__/prisma-conversation.repository.spec.ts @@ -0,0 +1,134 @@ +import { PrismaConversationRepository } from '../repositories/prisma-conversation.repository'; + +describe('PrismaConversationRepository', () => { + let repository: PrismaConversationRepository; + let mockPrisma: { + conversation: { + create: ReturnType; + findUnique: ReturnType; + findMany: ReturnType; + findFirst: ReturnType; + update: ReturnType; + count: ReturnType; + }; + conversationParticipant: { + updateMany: ReturnType; + }; + }; + + const mockConversation = { + id: 'conv-1', + listingId: 'listing-1', + subject: null, + status: 'ACTIVE', + lastMessage: null, + lastMessageAt: null, + createdAt: new Date(), + updatedAt: new Date(), + participants: [ + { id: 'p-1', conversationId: 'conv-1', userId: 'user-1', unreadCount: 0, lastReadAt: null, joinedAt: new Date() }, + { id: 'p-2', conversationId: 'conv-1', userId: 'user-2', unreadCount: 0, lastReadAt: null, joinedAt: new Date() }, + ], + }; + + beforeEach(() => { + mockPrisma = { + conversation: { + create: vi.fn().mockResolvedValue(mockConversation), + findUnique: vi.fn().mockResolvedValue(mockConversation), + findMany: vi.fn().mockResolvedValue([mockConversation]), + findFirst: vi.fn().mockResolvedValue(null), + update: vi.fn().mockResolvedValue(mockConversation), + count: vi.fn().mockResolvedValue(1), + }, + conversationParticipant: { + updateMany: vi.fn().mockResolvedValue({ count: 1 }), + }, + }; + repository = new PrismaConversationRepository(mockPrisma as any); + }); + + it('creates a conversation with participants', async () => { + const result = await repository.create({ + listingId: 'listing-1', + participantUserIds: ['user-1', 'user-2'], + }); + + expect(mockPrisma.conversation.create).toHaveBeenCalledWith({ + data: { + listingId: 'listing-1', + subject: null, + participants: { + create: [{ userId: 'user-1' }, { userId: 'user-2' }], + }, + }, + include: { participants: true }, + }); + expect(result.id).toBe('conv-1'); + expect(result.participants).toHaveLength(2); + }); + + it('finds conversation by id', async () => { + const result = await repository.findById('conv-1'); + + expect(mockPrisma.conversation.findUnique).toHaveBeenCalledWith({ + where: { id: 'conv-1' }, + include: { participants: true }, + }); + expect(result?.id).toBe('conv-1'); + }); + + it('returns null when conversation not found', async () => { + mockPrisma.conversation.findUnique.mockResolvedValue(null); + + const result = await repository.findById('conv-999'); + + expect(result).toBeNull(); + }); + + it('finds conversations by user id', async () => { + const result = await repository.findByUserId('user-1', 20, 0); + + expect(mockPrisma.conversation.findMany).toHaveBeenCalledWith( + expect.objectContaining({ + where: { participants: { some: { userId: 'user-1' } } }, + take: 20, + skip: 0, + }), + ); + expect(result).toHaveLength(1); + }); + + it('updates conversation status', async () => { + await repository.updateStatus('conv-1', 'CLOSED'); + + expect(mockPrisma.conversation.update).toHaveBeenCalledWith({ + where: { id: 'conv-1' }, + data: { status: 'CLOSED' }, + }); + }); + + it('increments unread count for other participants', async () => { + await repository.incrementUnreadCount('conv-1', 'user-1'); + + expect(mockPrisma.conversationParticipant.updateMany).toHaveBeenCalledWith({ + where: { conversationId: 'conv-1', userId: { not: 'user-1' } }, + data: { unreadCount: { increment: 1 } }, + }); + }); + + it('resets unread count for a user', async () => { + await repository.resetUnreadCount('conv-1', 'user-1'); + + expect(mockPrisma.conversationParticipant.updateMany).toHaveBeenCalledWith({ + where: { conversationId: 'conv-1', userId: 'user-1' }, + data: { unreadCount: 0, lastReadAt: expect.any(Date) }, + }); + }); + + it('counts conversations by user id', async () => { + const count = await repository.countByUserId('user-1'); + + expect(count).toBe(1); + }); +}); diff --git a/apps/api/src/modules/messaging/infrastructure/__tests__/prisma-message.repository.spec.ts b/apps/api/src/modules/messaging/infrastructure/__tests__/prisma-message.repository.spec.ts new file mode 100644 index 0000000..228c626 --- /dev/null +++ b/apps/api/src/modules/messaging/infrastructure/__tests__/prisma-message.repository.spec.ts @@ -0,0 +1,88 @@ +import { PrismaMessageRepository } from '../repositories/prisma-message.repository'; + +describe('PrismaMessageRepository', () => { + let repository: PrismaMessageRepository; + let mockPrisma: { + message: { + create: ReturnType; + findMany: ReturnType; + findUnique: ReturnType; + updateMany: ReturnType; + }; + }; + + const mockMessage = { + id: 'msg-1', + conversationId: 'conv-1', + senderId: 'user-1', + type: 'TEXT', + content: 'Hello!', + metadata: null, + editedAt: null, + deletedAt: null, + createdAt: new Date(), + }; + + beforeEach(() => { + mockPrisma = { + message: { + create: vi.fn().mockResolvedValue(mockMessage), + findMany: vi.fn().mockResolvedValue([mockMessage]), + findUnique: vi.fn().mockResolvedValue(mockMessage), + updateMany: vi.fn().mockResolvedValue({ count: 1 }), + }, + }; + repository = new PrismaMessageRepository(mockPrisma as any); + }); + + it('creates a message', async () => { + const result = await repository.create({ + conversationId: 'conv-1', + senderId: 'user-1', + type: 'TEXT', + content: 'Hello!', + }); + + expect(mockPrisma.message.create).toHaveBeenCalledWith({ + data: { + conversationId: 'conv-1', + senderId: 'user-1', + type: 'TEXT', + content: 'Hello!', + metadata: undefined, + }, + }); + expect(result.id).toBe('msg-1'); + expect(result.type).toBe('TEXT'); + }); + + it('finds messages by conversation id', async () => { + const result = await repository.findByConversationId('conv-1', 50); + + expect(result).toHaveLength(1); + expect(result[0]!.content).toBe('Hello!'); + }); + + it('finds message by id', async () => { + const result = await repository.findById('msg-1'); + + expect(result?.id).toBe('msg-1'); + }); + + it('returns null when message not found', async () => { + mockPrisma.message.findUnique.mockResolvedValue(null); + + const result = await repository.findById('msg-999'); + + expect(result).toBeNull(); + }); + + it('soft-deletes a message by sender only', async () => { + await repository.softDelete('msg-1', 'user-1'); + + expect(mockPrisma.message.updateMany).toHaveBeenCalledWith({ + where: { id: 'msg-1', senderId: 'user-1' }, + data: { deletedAt: expect.any(Date) }, + }); + }); +}); diff --git a/apps/api/src/modules/messaging/infrastructure/index.ts b/apps/api/src/modules/messaging/infrastructure/index.ts new file mode 100644 index 0000000..831003c --- /dev/null +++ b/apps/api/src/modules/messaging/infrastructure/index.ts @@ -0,0 +1,2 @@ +export { PrismaConversationRepository } from './repositories/prisma-conversation.repository'; +export { PrismaMessageRepository } from './repositories/prisma-message.repository'; diff --git a/apps/api/src/modules/messaging/infrastructure/repositories/prisma-conversation.repository.ts b/apps/api/src/modules/messaging/infrastructure/repositories/prisma-conversation.repository.ts new file mode 100644 index 0000000..9a10b8c --- /dev/null +++ b/apps/api/src/modules/messaging/infrastructure/repositories/prisma-conversation.repository.ts @@ -0,0 +1,141 @@ +import { Injectable } from '@nestjs/common'; +import { type PrismaService } from '@modules/shared'; +import type { ConversationEntity, ConversationParticipantEntity } from '../../domain/entities/conversation.entity'; +import type { + IConversationRepository, + CreateConversationDto, +} from '../../domain/repositories/conversation.repository'; +import type { ConversationStatus } from '../../domain/value-objects/conversation-status.vo'; + +@Injectable() +export class PrismaConversationRepository implements IConversationRepository { + constructor(private readonly prisma: PrismaService) {} + + async create(dto: CreateConversationDto): Promise { + const record = await this.prisma.conversation.create({ + data: { + listingId: dto.listingId ?? null, + subject: dto.subject ?? null, + participants: { + create: dto.participantUserIds.map((userId) => ({ userId })), + }, + }, + include: { participants: true }, + }); + return this.toEntity(record); + } + + async findById(id: string): Promise { + const record = await this.prisma.conversation.findUnique({ + where: { id }, + include: { participants: true }, + }); + return record ? this.toEntity(record) : null; + } + + async findByUserId(userId: string, limit = 20, offset = 0): Promise { + const records = await this.prisma.conversation.findMany({ + where: { + participants: { some: { userId } }, + }, + include: { participants: true }, + orderBy: { lastMessageAt: { sort: 'desc', nulls: 'last' } }, + take: limit, + skip: offset, + }); + return records.map((r) => this.toEntity(r)); + } + + async findExistingBetweenUsers( + userIds: string[], + listingId?: string, + ): Promise { + const record = await this.prisma.conversation.findFirst({ + where: { + status: 'ACTIVE', + ...(listingId ? { listingId } : {}), + AND: userIds.map((userId) => ({ + participants: { some: { userId } }, + })), + participants: { every: { userId: { in: userIds } } }, + }, + include: { participants: true }, + }); + return record ? this.toEntity(record) : null; + } + + async updateStatus(id: string, status: ConversationStatus): Promise { + await this.prisma.conversation.update({ + where: { id }, + data: { status }, + }); + } + + async updateLastMessage(id: string, content: string, sentAt: Date): Promise { + await this.prisma.conversation.update({ + where: { id }, + data: { + lastMessage: content.length > 200 ? content.slice(0, 200) + '...' : content, + lastMessageAt: sentAt, + }, + }); + } + + async incrementUnreadCount(conversationId: string, excludeUserId: string): Promise { + await this.prisma.conversationParticipant.updateMany({ + where: { conversationId, userId: { not: excludeUserId } }, + data: { unreadCount: { increment: 1 } }, + }); + } + + async resetUnreadCount(conversationId: string, userId: string): Promise { + await this.prisma.conversationParticipant.updateMany({ + where: { conversationId, userId }, + data: { unreadCount: 0, lastReadAt: new Date() }, + }); + } + + async countByUserId(userId: string): Promise { + return this.prisma.conversation.count({ + where: { participants: { some: { userId } } }, + }); + } + + private toEntity(record: { + id: string; + listingId: string | null; + subject: string | null; + status: string; + lastMessage: string | null; + lastMessageAt: Date | null; + createdAt: Date; + updatedAt: Date; + participants: Array<{ + id: string; + conversationId: string; + userId: string; + unreadCount: number; + lastReadAt: Date | null; + joinedAt: Date; + }>; + }): ConversationEntity { + return { + id: record.id, + listingId: record.listingId, + subject: record.subject, + status: record.status as ConversationStatus, + lastMessage: record.lastMessage, + lastMessageAt: record.lastMessageAt, + createdAt: record.createdAt, + updatedAt: record.updatedAt, + participants: record.participants.map((p): ConversationParticipantEntity => ({ + id: p.id, + conversationId: p.conversationId, + userId: p.userId, + unreadCount: p.unreadCount, + lastReadAt: p.lastReadAt, + joinedAt: p.joinedAt, + })), + }; + } +} diff --git a/apps/api/src/modules/messaging/infrastructure/repositories/prisma-message.repository.ts b/apps/api/src/modules/messaging/infrastructure/repositories/prisma-message.repository.ts new file mode 100644 index 0000000..5268bca --- /dev/null +++ b/apps/api/src/modules/messaging/infrastructure/repositories/prisma-message.repository.ts @@ -0,0 +1,80 @@ +import { Injectable } from '@nestjs/common'; +import { type Prisma } from '@prisma/client'; +import { type PrismaService } from '@modules/shared'; +import type { MessageEntity } from '../../domain/entities/message.entity'; +import type { + IMessageRepository, + CreateMessageDto, +} from '../../domain/repositories/message.repository'; +import type { MessageType } from '../../domain/value-objects/message-type.vo'; + +@Injectable() +export class PrismaMessageRepository implements IMessageRepository { + constructor(private readonly prisma: PrismaService) {} + + async create(dto: CreateMessageDto): Promise { + const record = await this.prisma.message.create({ + data: { + conversationId: dto.conversationId, + senderId: dto.senderId, + type: dto.type, + content: dto.content, + metadata: (dto.metadata ?? undefined) as Prisma.InputJsonValue | undefined, + }, + }); + return this.toEntity(record); + } + + async findByConversationId( + conversationId: string, + limit = 50, + before?: string, + ): Promise { + const records = await this.prisma.message.findMany({ + where: { + conversationId, + deletedAt: null, + ...(before ? { createdAt: { lt: (await this.prisma.message.findUnique({ where: { id: before } }))?.createdAt } } : {}), + }, + orderBy: { createdAt: 'desc' }, + take: limit, + }); + return records.map((r) => this.toEntity(r)); + } + + async findById(id: string): Promise { + const record = await this.prisma.message.findUnique({ where: { id } }); + return record ? this.toEntity(record) : null; + } + + async softDelete(id: string, senderId: string): Promise { + await this.prisma.message.updateMany({ + where: { id, senderId }, + data: { deletedAt: new Date() }, + }); + } + + private toEntity(record: { + id: string; + conversationId: string; + senderId: string; + type: string; + content: string; + metadata: unknown; + editedAt: Date | null; + deletedAt: Date | null; + createdAt: Date; + }): MessageEntity { + return { + id: record.id, + conversationId: record.conversationId, + senderId: record.senderId, + type: record.type as MessageType, + content: record.content, + metadata: record.metadata as Record | null, + editedAt: record.editedAt, + deletedAt: record.deletedAt, + createdAt: record.createdAt, + }; + } +} diff --git a/apps/api/src/modules/messaging/messaging.module.ts b/apps/api/src/modules/messaging/messaging.module.ts new file mode 100644 index 0000000..51fc11e --- /dev/null +++ b/apps/api/src/modules/messaging/messaging.module.ts @@ -0,0 +1,44 @@ +import { Module } from '@nestjs/common'; +import { CqrsModule } from '@nestjs/cqrs'; +import { AuthModule } from '@modules/auth'; +import { CreateConversationHandler } from './application/commands/create-conversation/create-conversation.handler'; +import { MarkConversationReadHandler } from './application/commands/mark-read/mark-read.handler'; +import { SendMessageHandler } from './application/commands/send-message/send-message.handler'; +import { GetConversationsHandler } from './application/queries/get-conversations/get-conversations.handler'; +import { GetMessagesHandler } from './application/queries/get-messages/get-messages.handler'; +import { CONVERSATION_REPOSITORY } from './domain/repositories/conversation.repository'; +import { MESSAGE_REPOSITORY } from './domain/repositories/message.repository'; +import { PrismaConversationRepository } from './infrastructure/repositories/prisma-conversation.repository'; +import { PrismaMessageRepository } from './infrastructure/repositories/prisma-message.repository'; +import { MessagingController } from './presentation/controllers/messaging.controller'; +import { MessagingGateway } from './presentation/gateways/messaging.gateway'; + +const CommandHandlers = [ + CreateConversationHandler, + SendMessageHandler, + MarkConversationReadHandler, +]; + +const QueryHandlers = [ + GetConversationsHandler, + GetMessagesHandler, +]; + +@Module({ + imports: [CqrsModule, AuthModule], + controllers: [MessagingController], + providers: [ + // Repositories + { provide: CONVERSATION_REPOSITORY, useClass: PrismaConversationRepository }, + { provide: MESSAGE_REPOSITORY, useClass: PrismaMessageRepository }, + + // WebSocket Gateway + MessagingGateway, + + // CQRS + ...CommandHandlers, + ...QueryHandlers, + ], + exports: [MessagingGateway], +}) +export class MessagingModule {} diff --git a/apps/api/src/modules/messaging/presentation/__tests__/messaging.controller.spec.ts b/apps/api/src/modules/messaging/presentation/__tests__/messaging.controller.spec.ts new file mode 100644 index 0000000..3dbf36f --- /dev/null +++ b/apps/api/src/modules/messaging/presentation/__tests__/messaging.controller.spec.ts @@ -0,0 +1,97 @@ +import { CreateConversationCommand } from '../../application/commands/create-conversation/create-conversation.command'; +import { MarkConversationReadCommand } from '../../application/commands/mark-read/mark-read.command'; +import { SendMessageCommand } from '../../application/commands/send-message/send-message.command'; +import { GetConversationsQuery } from '../../application/queries/get-conversations/get-conversations.query'; +import { GetMessagesQuery } from '../../application/queries/get-messages/get-messages.query'; +import { MessagingController } from '../controllers/messaging.controller'; + +describe('MessagingController', () => { + let controller: MessagingController; + let mockCommandBus: { execute: ReturnType }; + let mockQueryBus: { execute: ReturnType }; + let mockMessageRepo: { softDelete: ReturnType }; + + const user = { sub: 'user-1', role: 'BUYER' }; + + beforeEach(() => { + mockCommandBus = { execute: vi.fn() }; + mockQueryBus = { execute: vi.fn() }; + mockMessageRepo = { softDelete: vi.fn() }; + + controller = new MessagingController( + mockCommandBus as any, + mockQueryBus as any, + mockMessageRepo as any, + ); + }); + + describe('createConversation', () => { + it('dispatches CreateConversationCommand', async () => { + const dto = { participantUserId: 'user-2', listingId: 'listing-1', initialMessage: 'Hi' }; + mockCommandBus.execute.mockResolvedValue({ id: 'conv-1' }); + + const result = await controller.createConversation(user as any, dto as any); + + expect(mockCommandBus.execute).toHaveBeenCalledWith( + expect.any(CreateConversationCommand), + ); + expect(result.id).toBe('conv-1'); + }); + }); + + describe('getConversations', () => { + it('dispatches GetConversationsQuery', async () => { + mockQueryBus.execute.mockResolvedValue({ conversations: [], total: 0 }); + + await controller.getConversations(user as any, 20, 0); + + expect(mockQueryBus.execute).toHaveBeenCalledWith( + expect.any(GetConversationsQuery), + ); + }); + }); + + describe('getMessages', () => { + it('dispatches GetMessagesQuery', async () => { + mockQueryBus.execute.mockResolvedValue([]); + + await controller.getMessages(user as any, 'conv-1', 50); + + expect(mockQueryBus.execute).toHaveBeenCalledWith( + expect.any(GetMessagesQuery), + ); + }); + }); + + describe('sendMessage', () => { + it('dispatches SendMessageCommand', async () => { + const dto = { content: 'Hello!' }; + mockCommandBus.execute.mockResolvedValue({ id: 'msg-1' }); + + const result = await controller.sendMessage(user as any, 'conv-1', dto as any); + + expect(mockCommandBus.execute).toHaveBeenCalledWith( + expect.any(SendMessageCommand), + ); + expect(result.id).toBe('msg-1'); + }); + }); + + describe('markAsRead', () => { + it('dispatches MarkConversationReadCommand', async () => { + await controller.markAsRead(user as any, 'conv-1'); + + expect(mockCommandBus.execute).toHaveBeenCalledWith( + expect.any(MarkConversationReadCommand), + ); + }); + }); + + describe('deleteMessage', () => { + it('calls messageRepo.softDelete with user id', async () => { + await controller.deleteMessage(user as any, 'msg-1'); + + expect(mockMessageRepo.softDelete).toHaveBeenCalledWith('msg-1', 'user-1'); + }); + }); +}); diff --git a/apps/api/src/modules/messaging/presentation/__tests__/messaging.gateway.spec.ts b/apps/api/src/modules/messaging/presentation/__tests__/messaging.gateway.spec.ts new file mode 100644 index 0000000..78aec1b --- /dev/null +++ b/apps/api/src/modules/messaging/presentation/__tests__/messaging.gateway.spec.ts @@ -0,0 +1,199 @@ +import { MessagingGateway } from '../gateways/messaging.gateway'; + +describe('MessagingGateway', () => { + let gateway: MessagingGateway; + let mockTokenService: { verifyAccessToken: ReturnType }; + let mockLogger: { log: ReturnType; warn: ReturnType; error: ReturnType; debug: ReturnType }; + let mockCommandBus: { execute: ReturnType }; + let mockConversationRepo: { + findByUserId: ReturnType; + findById: ReturnType; + }; + + beforeEach(() => { + mockTokenService = { verifyAccessToken: vi.fn() }; + mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }; + mockCommandBus = { execute: vi.fn() }; + mockConversationRepo = { + findByUserId: vi.fn().mockResolvedValue([]), + findById: vi.fn(), + }; + + gateway = new MessagingGateway( + mockTokenService as any, + mockLogger as any, + mockCommandBus as any, + mockConversationRepo as any, + ); + gateway.server = { + to: vi.fn().mockReturnThis(), + emit: vi.fn(), + } as any; + }); + + describe('handleConnection', () => { + it('authenticates client and joins rooms', async () => { + mockTokenService.verifyAccessToken.mockReturnValue({ sub: 'user-1', role: 'BUYER' }); + mockConversationRepo.findByUserId.mockResolvedValue([ + { id: 'conv-1' }, + { id: 'conv-2' }, + ]); + + const client = { + id: 'socket-1', + data: {}, + handshake: { auth: { token: 'valid-token' }, headers: {}, query: {} }, + join: vi.fn(), + emit: vi.fn(), + disconnect: vi.fn(), + }; + + await gateway.handleConnection(client as any); + + expect(client.data['userId']).toBe('user-1'); + expect(client.join).toHaveBeenCalledWith('user:user-1'); + expect(client.join).toHaveBeenCalledWith('conversation:conv-1'); + expect(client.join).toHaveBeenCalledWith('conversation:conv-2'); + }); + + it('disconnects client with no token', async () => { + const client = { + id: 'socket-1', + data: {}, + handshake: { auth: {}, headers: {}, query: {} }, + join: vi.fn(), + emit: vi.fn(), + disconnect: vi.fn(), + }; + + mockTokenService.verifyAccessToken.mockReturnValue(null); + + await gateway.handleConnection(client as any); + + expect(client.disconnect).toHaveBeenCalledWith(true); + }); + + it('disconnects client with invalid token', async () => { + mockTokenService.verifyAccessToken.mockReturnValue(null); + + const client = { + id: 'socket-1', + data: {}, + handshake: { auth: { token: 'invalid' }, headers: {}, query: {} }, + join: vi.fn(), + emit: vi.fn(), + disconnect: vi.fn(), + }; + + await gateway.handleConnection(client as any); + + expect(client.disconnect).toHaveBeenCalledWith(true); + }); + }); + + describe('handleDisconnect', () => { + it('cleans up socket tracking', () => { + // First connect + (gateway as any).userSockets.set('user-1', new Set(['socket-1'])); + + const client = { + id: 'socket-1', + data: { userId: 'user-1' }, + }; + + gateway.handleDisconnect(client as any); + + expect((gateway as any).userSockets.has('user-1')).toBe(false); + }); + }); + + describe('handleMessageSent', () => { + it('emits message:new to conversation room', async () => { + const event = { + aggregateId: 'msg-1', + conversationId: 'conv-1', + senderId: 'user-1', + type: 'TEXT', + content: 'Hello!', + occurredAt: new Date(), + }; + + await gateway.handleMessageSent(event as any); + + expect(gateway.server.to).toHaveBeenCalledWith('conversation:conv-1'); + }); + }); + + describe('handleSendMessage', () => { + it('sends message via command bus', async () => { + mockCommandBus.execute.mockResolvedValue({ id: 'msg-1' }); + + const client = { + data: { userId: 'user-1' }, + emit: vi.fn(), + }; + + await gateway.handleSendMessage( + client as any, + { conversationId: 'conv-1', content: 'Hello!' }, + ); + + expect(mockCommandBus.execute).toHaveBeenCalled(); + expect(client.emit).toHaveBeenCalledWith('message:sent', { + messageId: 'msg-1', + conversationId: 'conv-1', + }); + }); + + it('emits error when send fails', async () => { + mockCommandBus.execute.mockRejectedValue(new Error('Send failed')); + + const client = { + data: { userId: 'user-1' }, + emit: vi.fn(), + }; + + await gateway.handleSendMessage( + client as any, + { conversationId: 'conv-1', content: 'Hello!' }, + ); + + expect(client.emit).toHaveBeenCalledWith('message:error', { + conversationId: 'conv-1', + error: 'Send failed', + }); + }); + }); + + describe('handleJoinConversation', () => { + it('joins room when user is participant', async () => { + mockConversationRepo.findById.mockResolvedValue({ + participants: [{ userId: 'user-1' }], + }); + + const client = { + data: { userId: 'user-1' }, + join: vi.fn(), + }; + + await gateway.handleJoinConversation(client as any, { conversationId: 'conv-1' }); + + expect(client.join).toHaveBeenCalledWith('conversation:conv-1'); + }); + + it('does not join room when user is not participant', async () => { + mockConversationRepo.findById.mockResolvedValue({ + participants: [{ userId: 'user-2' }], + }); + + const client = { + data: { userId: 'user-1' }, + join: vi.fn(), + }; + + await gateway.handleJoinConversation(client as any, { conversationId: 'conv-1' }); + + expect(client.join).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/apps/api/src/modules/messaging/presentation/controllers/messaging.controller.ts b/apps/api/src/modules/messaging/presentation/controllers/messaging.controller.ts new file mode 100644 index 0000000..674c4f3 --- /dev/null +++ b/apps/api/src/modules/messaging/presentation/controllers/messaging.controller.ts @@ -0,0 +1,188 @@ +import { + Controller, + Get, + Post, + Patch, + Delete, + Body, + Param, + Query, + Inject, + UseGuards, + HttpCode, + HttpStatus, +} from '@nestjs/common'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports +import { CommandBus, QueryBus } from '@nestjs/cqrs'; +import { AuthGuard } from '@nestjs/passport'; +import { ApiTags, ApiOperation, ApiResponse, ApiBearerAuth, ApiQuery, ApiProperty } from '@nestjs/swagger'; +import { MessageType as PrismaMessageType } from '@prisma/client'; +import { IsString, IsOptional, IsEnum, MaxLength } from 'class-validator'; +import { CurrentUser, type JwtPayload } from '@modules/auth'; +import { CreateConversationCommand } from '../../application/commands/create-conversation/create-conversation.command'; +import { MarkConversationReadCommand } from '../../application/commands/mark-read/mark-read.command'; +import { SendMessageCommand } from '../../application/commands/send-message/send-message.command'; +import { GetConversationsQuery } from '../../application/queries/get-conversations/get-conversations.query'; +import { GetMessagesQuery } from '../../application/queries/get-messages/get-messages.query'; +import { + MESSAGE_REPOSITORY, + type IMessageRepository, +} from '../../domain/repositories/message.repository'; + +class CreateConversationDto { + @ApiProperty({ description: 'User ID of the other participant' }) + @IsString() + participantUserId!: string; + + @ApiProperty({ required: false, description: 'Associated listing ID' }) + @IsString() + @IsOptional() + listingId?: string; + + @ApiProperty({ required: false, description: 'Conversation subject' }) + @IsString() + @IsOptional() + @MaxLength(200) + subject?: string; + + @ApiProperty({ required: false, description: 'Initial message text' }) + @IsString() + @IsOptional() + @MaxLength(5000) + initialMessage?: string; +} + +class SendMessageDto { + @ApiProperty({ description: 'Message content' }) + @IsString() + @MaxLength(5000) + content!: string; + + @ApiProperty({ enum: PrismaMessageType, required: false, default: 'TEXT' }) + @IsEnum(PrismaMessageType) + @IsOptional() + type?: PrismaMessageType; + + @ApiProperty({ required: false, description: 'Additional metadata (e.g. file URL)' }) + @IsOptional() + metadata?: Record; +} + +@ApiTags('messaging') +@ApiBearerAuth('JWT') +@Controller('messaging') +@UseGuards(AuthGuard('jwt')) +export class MessagingController { + constructor( + private readonly commandBus: CommandBus, + private readonly queryBus: QueryBus, + @Inject(MESSAGE_REPOSITORY) + private readonly messageRepo: IMessageRepository, + ) {} + + @Post('conversations') + @ApiOperation({ summary: 'Create a conversation or return existing one' }) + @ApiResponse({ status: 201, description: 'Conversation created' }) + @ApiResponse({ status: 400, description: 'Cannot create conversation with yourself' }) + @ApiResponse({ status: 401, description: 'Unauthorized' }) + async createConversation( + @CurrentUser() user: JwtPayload, + @Body() dto: CreateConversationDto, + ) { + return this.commandBus.execute( + new CreateConversationCommand( + user.sub, + dto.participantUserId, + dto.listingId, + dto.subject, + dto.initialMessage, + ), + ); + } + + @Get('conversations') + @ApiOperation({ summary: 'List user conversations' }) + @ApiResponse({ status: 200, description: 'Conversations retrieved' }) + @ApiResponse({ status: 401, description: 'Unauthorized' }) + @ApiQuery({ name: 'limit', required: false, type: Number }) + @ApiQuery({ name: 'offset', required: false, type: Number }) + async getConversations( + @CurrentUser() user: JwtPayload, + @Query('limit') limit?: number, + @Query('offset') offset?: number, + ) { + return this.queryBus.execute( + new GetConversationsQuery(user.sub, limit ?? 20, offset ?? 0), + ); + } + + @Get('conversations/:id/messages') + @ApiOperation({ summary: 'Get messages in a conversation' }) + @ApiResponse({ status: 200, description: 'Messages retrieved' }) + @ApiResponse({ status: 401, description: 'Unauthorized' }) + @ApiResponse({ status: 403, description: 'Not a participant' }) + @ApiResponse({ status: 404, description: 'Conversation not found' }) + @ApiQuery({ name: 'limit', required: false, type: Number }) + @ApiQuery({ name: 'before', required: false, type: String, description: 'Cursor: message ID for pagination' }) + async getMessages( + @CurrentUser() user: JwtPayload, + @Param('id') conversationId: string, + @Query('limit') limit?: number, + @Query('before') before?: string, + ) { + return this.queryBus.execute( + new GetMessagesQuery(conversationId, user.sub, limit ?? 50, before), + ); + } + + @Post('conversations/:id/messages') + @HttpCode(HttpStatus.CREATED) + @ApiOperation({ summary: 'Send a message in a conversation' }) + @ApiResponse({ status: 201, description: 'Message sent' }) + @ApiResponse({ status: 401, description: 'Unauthorized' }) + @ApiResponse({ status: 403, description: 'Not a participant' }) + @ApiResponse({ status: 404, description: 'Conversation not found' }) + async sendMessage( + @CurrentUser() user: JwtPayload, + @Param('id') conversationId: string, + @Body() dto: SendMessageDto, + ) { + return this.commandBus.execute( + new SendMessageCommand( + conversationId, + user.sub, + dto.content, + dto.type ?? 'TEXT', + dto.metadata, + ), + ); + } + + @Patch('conversations/:id/read') + @HttpCode(HttpStatus.NO_CONTENT) + @ApiOperation({ summary: 'Mark a conversation as read' }) + @ApiResponse({ status: 204, description: 'Conversation marked as read' }) + @ApiResponse({ status: 401, description: 'Unauthorized' }) + @ApiResponse({ status: 403, description: 'Not a participant' }) + @ApiResponse({ status: 404, description: 'Conversation not found' }) + async markAsRead( + @CurrentUser() user: JwtPayload, + @Param('id') conversationId: string, + ) { + await this.commandBus.execute( + new MarkConversationReadCommand(conversationId, user.sub), + ); + } + + @Delete('conversations/:conversationId/messages/:messageId') + @HttpCode(HttpStatus.NO_CONTENT) + @ApiOperation({ summary: 'Soft-delete a message (sender only)' }) + @ApiResponse({ status: 204, description: 'Message deleted' }) + @ApiResponse({ status: 401, description: 'Unauthorized' }) + async deleteMessage( + @CurrentUser() user: JwtPayload, + @Param('messageId') messageId: string, + ) { + await this.messageRepo.softDelete(messageId, user.sub); + } +} diff --git a/apps/api/src/modules/messaging/presentation/gateways/messaging.gateway.ts b/apps/api/src/modules/messaging/presentation/gateways/messaging.gateway.ts new file mode 100644 index 0000000..d9727f0 --- /dev/null +++ b/apps/api/src/modules/messaging/presentation/gateways/messaging.gateway.ts @@ -0,0 +1,257 @@ +import { Inject } from '@nestjs/common'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports +import { CommandBus } from '@nestjs/cqrs'; +import { OnEvent } from '@nestjs/event-emitter'; +import { + WebSocketGateway, + WebSocketServer, + SubscribeMessage, + MessageBody, + ConnectedSocket, + type OnGatewayConnection, + type OnGatewayDisconnect, + type OnGatewayInit, +} from '@nestjs/websockets'; +import type { Server, Socket } from 'socket.io'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports +import { TokenService, type JwtPayload } from '@modules/auth'; +// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports +import { LoggerService } from '@modules/shared'; +import { MarkConversationReadCommand } from '../../application/commands/mark-read/mark-read.command'; +import { SendMessageCommand } from '../../application/commands/send-message/send-message.command'; +import type { MessageSentEvent } from '../../domain/events/message-sent.event'; +import { + CONVERSATION_REPOSITORY, + type IConversationRepository, +} from '../../domain/repositories/conversation.repository'; + +@WebSocketGateway({ + namespace: '/messaging', + cors: { + origin: (process.env['CORS_ORIGINS'] ?? 'http://localhost:3000') + .split(',') + .map((o) => o.trim()), + credentials: true, + }, +}) +export class MessagingGateway + implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect +{ + @WebSocketServer() + server!: Server; + + private readonly userSockets = new Map>(); + + constructor( + private readonly tokenService: TokenService, + private readonly logger: LoggerService, + private readonly commandBus: CommandBus, + @Inject(CONVERSATION_REPOSITORY) + private readonly conversationRepo: IConversationRepository, + ) {} + + afterInit(): void { + this.logger.log('MessagingGateway initialized', 'MessagingGateway'); + } + + async handleConnection(client: Socket): Promise { + try { + const payload = this.extractAndVerifyToken(client); + if (!payload) { + client.disconnect(true); + return; + } + + client.data['userId'] = payload.sub; + + // Join user's personal room for direct messaging events + await client.join(`user:${payload.sub}`); + + // Join rooms for all active conversations + const conversations = await this.conversationRepo.findByUserId(payload.sub, 100); + for (const conv of conversations) { + await client.join(`conversation:${conv.id}`); + } + + if (!this.userSockets.has(payload.sub)) { + this.userSockets.set(payload.sub, new Set()); + } + this.userSockets.get(payload.sub)!.add(client.id); + + this.logger.debug( + `WS messaging connected: user=${payload.sub} socket=${client.id} conversations=${conversations.length}`, + 'MessagingGateway', + ); + } catch (error) { + this.logger.error( + `WS messaging connection error: ${error instanceof Error ? error.message : error}`, + error instanceof Error ? error.stack : undefined, + 'MessagingGateway', + ); + client.disconnect(true); + } + } + + handleDisconnect(client: Socket): void { + const userId = client.data['userId'] as string | undefined; + if (userId) { + const sockets = this.userSockets.get(userId); + if (sockets) { + sockets.delete(client.id); + if (sockets.size === 0) { + this.userSockets.delete(userId); + } + } + } + this.logger.debug( + `WS messaging disconnected: user=${userId ?? 'unknown'} socket=${client.id}`, + 'MessagingGateway', + ); + } + + /* ──────────────────────────────────────────── + * Client → Server message handlers + * ──────────────────────────────────────────── */ + + @SubscribeMessage('message:send') + async handleSendMessage( + @ConnectedSocket() client: Socket, + @MessageBody() data: { conversationId: string; content: string; type?: string }, + ): Promise { + const userId = client.data['userId'] as string; + if (!userId) return; + + try { + const message = await this.commandBus.execute( + new SendMessageCommand( + data.conversationId, + userId, + data.content, + (data.type as 'TEXT' | 'IMAGE' | 'FILE') ?? 'TEXT', + ), + ); + client.emit('message:sent', { messageId: message.id, conversationId: data.conversationId }); + } catch (error) { + client.emit('message:error', { + conversationId: data.conversationId, + error: error instanceof Error ? error.message : 'Không thể gửi tin nhắn', + }); + } + } + + @SubscribeMessage('message:typing') + async handleTyping( + @ConnectedSocket() client: Socket, + @MessageBody() data: { conversationId: string }, + ): Promise { + const userId = client.data['userId'] as string; + if (!userId) return; + + client.to(`conversation:${data.conversationId}`).emit('message:typing', { + conversationId: data.conversationId, + userId, + }); + } + + @SubscribeMessage('message:stop-typing') + async handleStopTyping( + @ConnectedSocket() client: Socket, + @MessageBody() data: { conversationId: string }, + ): Promise { + const userId = client.data['userId'] as string; + if (!userId) return; + + client.to(`conversation:${data.conversationId}`).emit('message:stop-typing', { + conversationId: data.conversationId, + userId, + }); + } + + @SubscribeMessage('conversation:read') + async handleMarkRead( + @ConnectedSocket() client: Socket, + @MessageBody() data: { conversationId: string }, + ): Promise { + const userId = client.data['userId'] as string; + if (!userId) return; + + try { + await this.commandBus.execute( + new MarkConversationReadCommand(data.conversationId, userId), + ); + client.emit('conversation:read-ack', { conversationId: data.conversationId }); + } catch { + // Non-critical — silently fail + } + } + + @SubscribeMessage('conversation:join') + async handleJoinConversation( + @ConnectedSocket() client: Socket, + @MessageBody() data: { conversationId: string }, + ): Promise { + const userId = client.data['userId'] as string; + if (!userId) return; + + // Verify the user is a participant before joining the room + const conversation = await this.conversationRepo.findById(data.conversationId); + if (conversation && conversation.participants.some((p) => p.userId === userId)) { + await client.join(`conversation:${data.conversationId}`); + } + } + + /* ──────────────────────────────────────────── + * Domain event handlers + * ──────────────────────────────────────────── */ + + @OnEvent('message.sent', { async: true }) + async handleMessageSent(event: MessageSentEvent): Promise { + try { + this.server.to(`conversation:${event.conversationId}`).emit('message:new', { + id: event.aggregateId, + conversationId: event.conversationId, + senderId: event.senderId, + type: event.type, + content: event.content, + createdAt: event.occurredAt.toISOString(), + }); + } catch (error) { + this.logger.error( + `Failed to emit WS message for conversation ${event.conversationId}: ${ + error instanceof Error ? error.message : error + }`, + error instanceof Error ? error.stack : undefined, + 'MessagingGateway', + ); + } + } + + /* ──────────────────────────────────────────── + * Private helpers + * ──────────────────────────────────────────── */ + + private extractAndVerifyToken(client: Socket): JwtPayload | null { + const raw: unknown = + client.handshake.auth?.['token'] ?? + client.handshake.headers?.['authorization'] ?? + client.handshake.query?.['token']; + + if (!raw || typeof raw !== 'string') { + this.logger.warn( + `WS messaging auth failed: no token provided (socket=${client.id})`, + 'MessagingGateway', + ); + return null; + } + + const token = raw.startsWith('Bearer ') ? raw.slice(7) : raw; + const payload = this.tokenService.verifyAccessToken(token); + if (!payload) { + this.logger.warn( + `WS messaging auth failed: invalid token (socket=${client.id})`, + 'MessagingGateway', + ); + } + return payload; + } +} diff --git a/prisma/schema.prisma b/prisma/schema.prisma index fda4eb4..0ed8726 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -880,3 +880,212 @@ model Review { @@index([userId]) @@index([targetType, targetId, createdAt(sort: Desc)]) } + +// ============================================================================= +// INDUSTRIAL PARKS (KCN) +// ============================================================================= + +enum IndustrialParkStatus { + PLANNING + UNDER_CONSTRUCTION + OPERATIONAL + FULL +} + +enum IndustrialPropertyType { + INDUSTRIAL_LAND + READY_BUILT_FACTORY + READY_BUILT_WAREHOUSE + LOGISTICS_CENTER + OFFICE_IN_PARK + DATA_CENTER +} + +enum IndustrialLeaseType { + LAND_LEASE + FACTORY_LEASE + WAREHOUSE_LEASE + SUBLEASE +} + +enum IndustrialListingStatus { + DRAFT + ACTIVE + RESERVED + LEASED + EXPIRED +} + +enum VietnamRegion { + NORTH + CENTRAL + SOUTH +} + +model IndustrialPark { + id String @id @default(cuid()) + name String + nameEn String? + slug String @unique + developer String + operator String? + status IndustrialParkStatus @default(PLANNING) + location Unsupported("geometry(Point, 4326)") + address String + district String + province String + region VietnamRegion + totalAreaHa Float + leasableAreaHa Float + occupancyRate Float @default(0) // 0-100 + remainingAreaHa Float + tenantCount Int @default(0) + establishedYear Int? + landRentUsdM2Year Float? + rbfRentUsdM2Month Float? + rbwRentUsdM2Month Float? + managementFeeUsd Float? + infrastructure Json? // { electricity, water, wastewater, telecom, roads, fire } + connectivity Json? // { nearestPort, airport, highway, railway, seaport } + incentives Json? // { taxHoliday, importDuty, landRentReduction, specialZone } + targetIndustries String[] + existingTenants Json? // [{ name, country, industry }] + certifications Json? // ["ISO 14001", "Green park"] + media Json? + documents Json? + description String? @db.Text + descriptionEn String? @db.Text + isVerified Boolean @default(false) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + listings IndustrialListing[] + + @@index([status]) + @@index([province]) + @@index([region]) + @@index([developer]) + @@index([location], type: Gist) + @@index([isVerified]) + @@index([occupancyRate]) + @@index([landRentUsdM2Year]) + @@index([region, province, status]) + @@index([createdAt]) +} + +model IndustrialListing { + id String @id @default(cuid()) + parkId String + park IndustrialPark @relation(fields: [parkId], references: [id], onDelete: Cascade) + agentId String? + sellerId String + propertyType IndustrialPropertyType + leaseType IndustrialLeaseType + status IndustrialListingStatus @default(DRAFT) + title String + description String? @db.Text + areaM2 Float + ceilingHeightM Float? + floorLoadTonM2 Float? + columnSpacingM Float? + dockCount Int? + craneCapacityTon Float? + hasMezzanine Boolean @default(false) + hasOfficeArea Boolean @default(false) + officeAreaM2 Float? + priceUsdM2 Float? + pricingUnit String? // "usd/m2/month", "usd/m2/year" + totalLeasePrice Float? + managementFee Float? + depositMonths Int? + minLeaseYears Int? + maxLeaseYears Int? + leaseExpiry DateTime? + availableFrom DateTime? + powerCapacityKva Float? + waterSupplyM3Day Float? + media Json? + viewCount Int @default(0) + inquiryCount Int @default(0) + publishedAt DateTime? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([parkId]) + @@index([propertyType]) + @@index([leaseType]) + @@index([status]) + @@index([areaM2]) + @@index([priceUsdM2]) + @@index([sellerId]) + @@index([agentId]) + @@index([publishedAt]) + @@index([parkId, status]) + @@index([propertyType, leaseType, status]) + @@index([status, publishedAt(sort: Desc)]) +} + +// ============================================================================= +// MESSAGING (buyer ↔ agent / seller in-app chat) +// ============================================================================= + +enum ConversationStatus { + ACTIVE + ARCHIVED + CLOSED +} + +model Conversation { + id String @id @default(cuid()) + listingId String? + subject String? + status ConversationStatus @default(ACTIVE) + lastMessage String? @db.Text + lastMessageAt DateTime? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + participants ConversationParticipant[] + messages Message[] + + @@index([status]) + @@index([lastMessageAt(sort: Desc)]) + @@index([listingId]) +} + +model ConversationParticipant { + id String @id @default(cuid()) + conversationId String + conversation Conversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) + userId String + unreadCount Int @default(0) + lastReadAt DateTime? + joinedAt DateTime @default(now()) + + @@unique([conversationId, userId]) + @@index([userId]) + @@index([conversationId]) +} + +enum MessageType { + TEXT + IMAGE + FILE + SYSTEM +} + +model Message { + id String @id @default(cuid()) + conversationId String + conversation Conversation @relation(fields: [conversationId], references: [id], onDelete: Cascade) + senderId String + type MessageType @default(TEXT) + content String @db.Text + metadata Json? + editedAt DateTime? + deletedAt DateTime? + createdAt DateTime @default(now()) + + @@index([conversationId, createdAt]) + @@index([senderId]) +}