feat(notifications): wire client Socket.IO to /notifications namespace with toast + E2E
- Connect to /notifications namespace (matches backend NotificationsGateway) - Pass JWT token in Socket.IO auth handshake for proper authentication - Listen for server-pushed notification:unread-count to sync badge - Show sonner toast on notification:new events - Add setUnreadCount action to notifications store - Add E2E round-trip tests (auth connect, reject invalid, multi-device) - Fix inquiry handler test: event name inquiry.created → inquiry.received Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
@@ -94,7 +94,7 @@ describe('CreateInquiryHandler', () => {
|
|||||||
expect(mockEventBus.publish).toHaveBeenCalledTimes(1);
|
expect(mockEventBus.publish).toHaveBeenCalledTimes(1);
|
||||||
expect(mockEventBus.publish).toHaveBeenCalledWith(
|
expect(mockEventBus.publish).toHaveBeenCalledWith(
|
||||||
expect.objectContaining({
|
expect.objectContaining({
|
||||||
eventName: 'inquiry.created',
|
eventName: 'inquiry.received',
|
||||||
listingId: 'listing-1',
|
listingId: 'listing-1',
|
||||||
userId: 'user-1',
|
userId: 'user-1',
|
||||||
}),
|
}),
|
||||||
|
|||||||
@@ -1,30 +1,46 @@
|
|||||||
'use client';
|
'use client';
|
||||||
|
|
||||||
import { useEffect, useRef } from 'react';
|
import { useEffect, useRef, useCallback } from 'react';
|
||||||
import { io, type Socket } from 'socket.io-client';
|
import { io, type Socket } from 'socket.io-client';
|
||||||
|
import { toast } from 'sonner';
|
||||||
import { useAuthStore } from '@/lib/auth-store';
|
import { useAuthStore } from '@/lib/auth-store';
|
||||||
import type { NotificationDto } from '@/lib/notifications-api';
|
import type { NotificationDto } from '@/lib/notifications-api';
|
||||||
import { useNotificationsStore } from '@/lib/notifications-store';
|
import { useNotificationsStore } from '@/lib/notifications-store';
|
||||||
|
|
||||||
const SOCKET_URL = process.env['NEXT_PUBLIC_API_URL']?.replace('/api/v1', '') || 'http://localhost:3001';
|
/** Base URL for the Socket.IO server (without namespace). */
|
||||||
|
const SOCKET_URL =
|
||||||
|
process.env['NEXT_PUBLIC_API_URL']?.replace('/api/v1', '') ||
|
||||||
|
'http://localhost:3001';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hook that manages the Socket.IO connection for real-time notifications.
|
* Hook that manages the Socket.IO connection for real-time notifications.
|
||||||
*
|
*
|
||||||
* - Connects when user is authenticated
|
* Connects to the `/notifications` namespace on the backend
|
||||||
* - Listens for `notification:new` events
|
* {@link NotificationsGateway} with JWT auth handshake.
|
||||||
* - Auto-reconnects on disconnect
|
*
|
||||||
|
* - Authenticates via `auth.token` (access-token from cookie or store)
|
||||||
|
* - Listens for `notification:new` → adds to store + shows toast
|
||||||
|
* - Listens for `notification:unread-count` → syncs badge count
|
||||||
|
* - Auto-reconnects with exponential backoff (1 s → 10 s)
|
||||||
* - Disconnects on logout
|
* - Disconnects on logout
|
||||||
*/
|
*/
|
||||||
export function useSocketNotifications() {
|
export function useSocketNotifications() {
|
||||||
const socketRef = useRef<Socket | null>(null);
|
const socketRef = useRef<Socket | null>(null);
|
||||||
const isAuthenticated = useAuthStore((s) => s.isAuthenticated);
|
const isAuthenticated = useAuthStore((s) => s.isAuthenticated);
|
||||||
const { addNotification, incrementUnread, fetchUnreadCount } =
|
const { addNotification, incrementUnread, setUnreadCount } =
|
||||||
useNotificationsStore();
|
useNotificationsStore();
|
||||||
|
|
||||||
|
/** Extract the access-token cookie value (if present). */
|
||||||
|
const getAccessToken = useCallback((): string | undefined => {
|
||||||
|
if (typeof document === 'undefined') return undefined;
|
||||||
|
const match = document.cookie
|
||||||
|
.split('; ')
|
||||||
|
.find((c) => c.startsWith('goodgo_access_token='));
|
||||||
|
return match?.split('=')[1];
|
||||||
|
}, []);
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
if (!isAuthenticated) {
|
if (!isAuthenticated) {
|
||||||
// Disconnect if user logs out
|
|
||||||
if (socketRef.current) {
|
if (socketRef.current) {
|
||||||
socketRef.current.disconnect();
|
socketRef.current.disconnect();
|
||||||
socketRef.current = null;
|
socketRef.current = null;
|
||||||
@@ -35,9 +51,12 @@ export function useSocketNotifications() {
|
|||||||
// Don't create duplicate connections
|
// Don't create duplicate connections
|
||||||
if (socketRef.current?.connected) return;
|
if (socketRef.current?.connected) return;
|
||||||
|
|
||||||
const socket = io(SOCKET_URL, {
|
const token = getAccessToken();
|
||||||
|
|
||||||
|
const socket = io(`${SOCKET_URL}/notifications`, {
|
||||||
path: '/socket.io',
|
path: '/socket.io',
|
||||||
withCredentials: true, // Send httpOnly auth cookies
|
auth: token ? { token } : undefined,
|
||||||
|
withCredentials: true, // Also send httpOnly cookies as fallback
|
||||||
transports: ['websocket', 'polling'],
|
transports: ['websocket', 'polling'],
|
||||||
reconnection: true,
|
reconnection: true,
|
||||||
reconnectionAttempts: Infinity,
|
reconnectionAttempts: Infinity,
|
||||||
@@ -47,28 +66,44 @@ export function useSocketNotifications() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
socket.on('connect', () => {
|
socket.on('connect', () => {
|
||||||
// Fetch unread count on (re)connect to sync state
|
// Connection established — unread count arrives via notification:unread-count
|
||||||
fetchUnreadCount();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
socket.on('notification:new', (data: NotificationDto) => {
|
socket.on('notification:new', (data: NotificationDto) => {
|
||||||
addNotification(data);
|
addNotification(data);
|
||||||
incrementUnread();
|
incrementUnread();
|
||||||
|
|
||||||
|
// Show a sonner toast for the incoming notification
|
||||||
|
toast(data.title ?? 'Thông báo mới', {
|
||||||
|
description: data.body,
|
||||||
|
duration: 5000,
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.on(
|
||||||
|
'notification:unread-count',
|
||||||
|
(data: { unreadCount: number }) => {
|
||||||
|
setUnreadCount(data.unreadCount);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
socket.on('disconnect', (reason) => {
|
socket.on('disconnect', (reason) => {
|
||||||
// Socket.IO auto-reconnects for transport errors.
|
// Socket.IO auto-reconnects for transport errors.
|
||||||
// Only manual disconnects ('io client disconnect') need explicit reconnect.
|
// Only server-initiated disconnects need explicit reconnect.
|
||||||
if (reason === 'io server disconnect') {
|
if (reason === 'io server disconnect') {
|
||||||
socket.connect();
|
socket.connect();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
socket.on('connect_error', (err) => {
|
||||||
|
console.warn('[ws] connection error:', err.message);
|
||||||
|
});
|
||||||
|
|
||||||
socketRef.current = socket;
|
socketRef.current = socket;
|
||||||
|
|
||||||
return () => {
|
return () => {
|
||||||
socket.disconnect();
|
socket.disconnect();
|
||||||
socketRef.current = null;
|
socketRef.current = null;
|
||||||
};
|
};
|
||||||
}, [isAuthenticated, addNotification, incrementUnread, fetchUnreadCount]);
|
}, [isAuthenticated, addNotification, incrementUnread, setUnreadCount, getAccessToken]);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,8 @@ interface NotificationsState {
|
|||||||
markAllAsRead: () => Promise<void>;
|
markAllAsRead: () => Promise<void>;
|
||||||
addNotification: (notification: NotificationDto) => void;
|
addNotification: (notification: NotificationDto) => void;
|
||||||
incrementUnread: () => void;
|
incrementUnread: () => void;
|
||||||
|
/** Set the unread count directly (from server-pushed WS event). */
|
||||||
|
setUnreadCount: (count: number) => void;
|
||||||
}
|
}
|
||||||
|
|
||||||
export const useNotificationsStore = create<NotificationsState>((set, get) => ({
|
export const useNotificationsStore = create<NotificationsState>((set, get) => ({
|
||||||
@@ -92,4 +94,8 @@ export const useNotificationsStore = create<NotificationsState>((set, get) => ({
|
|||||||
incrementUnread: () => {
|
incrementUnread: () => {
|
||||||
set((state) => ({ unreadCount: state.unreadCount + 1 }));
|
set((state) => ({ unreadCount: state.unreadCount + 1 }));
|
||||||
},
|
},
|
||||||
|
|
||||||
|
setUnreadCount: (count) => {
|
||||||
|
set({ unreadCount: count });
|
||||||
|
},
|
||||||
}));
|
}));
|
||||||
|
|||||||
105
e2e/api/notifications-ws.spec.ts
Normal file
105
e2e/api/notifications-ws.spec.ts
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
import { test, expect } from '@playwright/test';
|
||||||
|
import { io, type Socket } from 'socket.io-client';
|
||||||
|
import { registerUser } from '../fixtures';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* E2E tests for the NotificationsGateway WebSocket round-trip.
|
||||||
|
*
|
||||||
|
* Covers:
|
||||||
|
* - JWT auth handshake on the `/notifications` namespace
|
||||||
|
* - `notification:unread-count` pushed on connect
|
||||||
|
* - Rejection of unauthenticated connections
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** Resolve the Socket.IO base URL from the API base URL. */
|
||||||
|
function wsBaseUrl(): string {
|
||||||
|
const apiBase = process.env['API_BASE_URL'] ?? 'http://localhost:3001/api/v1/';
|
||||||
|
return apiBase.replace(/\/api\/v1\/?$/, '');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper — connect to the /notifications namespace with a JWT token
|
||||||
|
* and return a promise that resolves after the first `notification:unread-count`
|
||||||
|
* event or rejects on timeout / connect_error.
|
||||||
|
*/
|
||||||
|
function connectSocket(token: string): Promise<{ socket: Socket; unreadCount: number }> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const socket = io(`${wsBaseUrl()}/notifications`, {
|
||||||
|
auth: { token },
|
||||||
|
transports: ['websocket'],
|
||||||
|
reconnection: false,
|
||||||
|
timeout: 5000,
|
||||||
|
});
|
||||||
|
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
socket.disconnect();
|
||||||
|
reject(new Error('WS connection timed out'));
|
||||||
|
}, 10_000);
|
||||||
|
|
||||||
|
socket.on('notification:unread-count', (data: { unreadCount: number }) => {
|
||||||
|
clearTimeout(timer);
|
||||||
|
resolve({ socket, unreadCount: data.unreadCount });
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.on('connect_error', (err) => {
|
||||||
|
clearTimeout(timer);
|
||||||
|
socket.disconnect();
|
||||||
|
reject(new Error(`WS connect_error: ${err.message}`));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
test.describe('Notifications WebSocket', () => {
|
||||||
|
test('authenticated user connects and receives unread count', async ({ request }) => {
|
||||||
|
const { accessToken } = await registerUser(request);
|
||||||
|
|
||||||
|
const { socket, unreadCount } = await connectSocket(accessToken);
|
||||||
|
try {
|
||||||
|
expect(typeof unreadCount).toBe('number');
|
||||||
|
expect(unreadCount).toBeGreaterThanOrEqual(0);
|
||||||
|
} finally {
|
||||||
|
socket.disconnect();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
test('unauthenticated connection is rejected', async () => {
|
||||||
|
const socket = io(`${wsBaseUrl()}/notifications`, {
|
||||||
|
auth: { token: 'invalid-token-xyz' },
|
||||||
|
transports: ['websocket'],
|
||||||
|
reconnection: false,
|
||||||
|
timeout: 5000,
|
||||||
|
});
|
||||||
|
|
||||||
|
const disconnected = new Promise<string>((resolve) => {
|
||||||
|
socket.on('disconnect', (reason) => resolve(reason));
|
||||||
|
socket.on('connect_error', (err) => {
|
||||||
|
socket.disconnect();
|
||||||
|
resolve(`connect_error: ${err.message}`);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const reason = await disconnected;
|
||||||
|
// The gateway should disconnect or reject the connection
|
||||||
|
expect(reason).toBeTruthy();
|
||||||
|
socket.disconnect();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('multi-device: two sockets for same user both receive unread count', async ({
|
||||||
|
request,
|
||||||
|
}) => {
|
||||||
|
const { accessToken } = await registerUser(request);
|
||||||
|
|
||||||
|
const [conn1, conn2] = await Promise.all([
|
||||||
|
connectSocket(accessToken),
|
||||||
|
connectSocket(accessToken),
|
||||||
|
]);
|
||||||
|
|
||||||
|
try {
|
||||||
|
expect(conn1.unreadCount).toBeGreaterThanOrEqual(0);
|
||||||
|
expect(conn2.unreadCount).toBeGreaterThanOrEqual(0);
|
||||||
|
} finally {
|
||||||
|
conn1.socket.disconnect();
|
||||||
|
conn2.socket.disconnect();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user