feat(notifications): R8.1 Stringee SMS adapter + rate limiting (TEC-2764)

- Add NotificationChannelPort domain port for SMS/transactional channels.
- Refactor StringeeSmsService to implement the port; routes OTP template
  keys through the tighter otp bucket and transactional keys through the
  wider bucket.
- Add SmsRateLimiterService using a Redis sorted-set sliding window with
  per-minute + per-hour limits per phone; fails open on Redis errors.
- Rate-limit violations throw DomainException(TOO_MANY_REQUESTS, 429)
  with retryAfterSeconds in the details payload.
- Cover adapter + rate limiter with unit tests (22 specs); all 148
  notifications tests still green.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-18 15:37:45 +07:00
parent 8c6e3b92d0
commit caa0a58afd
8 changed files with 463 additions and 23 deletions

View File

@@ -14,3 +14,9 @@ export {
NotificationChannel,
ALL_CHANNELS,
} from './value-objects/notification-channel.vo';
export {
SMS_NOTIFICATION_CHANNEL,
type NotificationChannelPort,
type SendChannelMessageDto,
type SendChannelMessageResult,
} from './ports/notification-channel.port';

View File

@@ -0,0 +1,21 @@
import { type NotificationChannel } from '../value-objects/notification-channel.vo';
export interface SendChannelMessageDto {
recipient: string;
subject: string;
body: string;
templateKey: string;
metadata?: Record<string, unknown>;
}
export interface SendChannelMessageResult {
messageId: string;
}
export interface NotificationChannelPort {
readonly channel: NotificationChannel;
readonly isAvailable: boolean;
send(dto: SendChannelMessageDto): Promise<SendChannelMessageResult>;
}
export const SMS_NOTIFICATION_CHANNEL = Symbol('SMS_NOTIFICATION_CHANNEL');

View File

@@ -0,0 +1,77 @@
import {
SMS_RATE_LIMIT_BUCKETS,
SmsRateLimiterService,
} from '../services/sms-rate-limiter.service';
describe('SmsRateLimiterService', () => {
let mockRedis: { getClient: ReturnType<typeof vi.fn> };
let mockClient: { eval: ReturnType<typeof vi.fn> };
let mockLogger: {
log: ReturnType<typeof vi.fn>;
warn: ReturnType<typeof vi.fn>;
error: ReturnType<typeof vi.fn>;
};
let service: SmsRateLimiterService;
beforeEach(() => {
mockClient = { eval: vi.fn() };
mockRedis = { getClient: vi.fn().mockReturnValue(mockClient) };
mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn() };
service = new SmsRateLimiterService(mockRedis as any, mockLogger as any);
});
it('allows the request when Lua script reports under limit', async () => {
mockClient.eval.mockResolvedValue([1, 0]);
const decision = await service.check('+84901234567', 'otp');
expect(decision.allowed).toBe(true);
expect(decision.current).toBe(1);
expect(decision.limit).toBe(SMS_RATE_LIMIT_BUCKETS.otp.limit);
expect(decision.retryAfterSeconds).toBe(0);
expect(decision.bucket).toBe('otp');
});
it('blocks the request and returns retryAfter when limit reached', async () => {
mockClient.eval.mockResolvedValue([SMS_RATE_LIMIT_BUCKETS.otp.limit, 12_345]);
const decision = await service.check('+84901234567', 'otp');
expect(decision.allowed).toBe(false);
expect(decision.retryAfterSeconds).toBeGreaterThanOrEqual(1);
expect(mockLogger.warn).toHaveBeenCalledWith(
expect.stringContaining('SMS rate limit hit'),
'SmsRateLimiterService',
);
});
it('namespaces the key per phone and bucket', async () => {
mockClient.eval.mockResolvedValue([1, 0]);
await service.check('+84901234567', 'transactional');
expect(mockClient.eval).toHaveBeenCalledWith(
expect.any(String),
1,
'sms_rate_limit:transactional:+84901234567',
expect.any(Number),
SMS_RATE_LIMIT_BUCKETS.transactional.windowSeconds * 1000,
SMS_RATE_LIMIT_BUCKETS.transactional.limit,
expect.any(String),
SMS_RATE_LIMIT_BUCKETS.transactional.windowSeconds,
);
});
it('fails open when Redis throws (allows the send, logs warning)', async () => {
mockClient.eval.mockRejectedValue(new Error('redis down'));
const decision = await service.check('+84901234567', 'otpHourly');
expect(decision.allowed).toBe(true);
expect(decision.current).toBe(0);
expect(mockLogger.warn).toHaveBeenCalledWith(
expect.stringContaining('Redis error'),
'SmsRateLimiterService',
);
});
});

View File

@@ -1,5 +1,23 @@
import { HttpStatus } from '@nestjs/common';
import { DomainException } from '@modules/shared';
import { StringeeSmsService } from '../services/stringee-sms.service';
const allowedDecision = {
allowed: true,
current: 1,
limit: 5,
retryAfterSeconds: 0,
bucket: 'otp' as const,
};
const blockedDecision = {
allowed: false,
current: 5,
limit: 5,
retryAfterSeconds: 42,
bucket: 'otp' as const,
};
describe('StringeeSmsService', () => {
let service: StringeeSmsService;
let mockLogger: {
@@ -7,10 +25,12 @@ describe('StringeeSmsService', () => {
warn: ReturnType<typeof vi.fn>;
error: ReturnType<typeof vi.fn>;
};
let mockRateLimiter: { check: ReturnType<typeof vi.fn> };
beforeEach(() => {
mockLogger = { log: vi.fn(), warn: vi.fn(), error: vi.fn() };
service = new StringeeSmsService(mockLogger as any);
mockRateLimiter = { check: vi.fn().mockResolvedValue(allowedDecision) };
service = new StringeeSmsService(mockLogger as any, mockRateLimiter as any);
vi.restoreAllMocks();
});
@@ -56,6 +76,12 @@ describe('StringeeSmsService', () => {
});
});
describe('NotificationChannelPort contract', () => {
it('exposes the SMS channel identifier', () => {
expect(service.channel).toBe('SMS');
});
});
describe('sendNotification', () => {
beforeEach(() => {
process.env['STRINGEE_API_KEY'] = 'test-api-key';
@@ -183,7 +209,7 @@ describe('StringeeSmsService', () => {
});
it('throws when not initialized', async () => {
const uninitService = new StringeeSmsService(mockLogger as any);
const uninitService = new StringeeSmsService(mockLogger as any, mockRateLimiter as any);
await expect(
uninitService.sendNotification({ to: '0901234567', message: 'Test' }),
@@ -217,5 +243,117 @@ describe('StringeeSmsService', () => {
expect(callBody.text).toContain('GoodGo');
expect(callBody.text).toContain('5 phut');
});
it('applies the OTP rate-limit bucket before sending', async () => {
const mockResponse = {
ok: true,
json: vi.fn().mockResolvedValue({ r: 0, message_id: 'otp-456' }),
text: vi.fn(),
};
vi.spyOn(globalThis, 'fetch').mockResolvedValue(mockResponse as any);
await service.sendOTP({ to: '0901234567', code: '987654' });
expect(mockRateLimiter.check).toHaveBeenNthCalledWith(1, '+84901234567', 'otp');
expect(mockRateLimiter.check).toHaveBeenNthCalledWith(2, '+84901234567', 'otpHourly');
});
});
describe('rate limiting', () => {
beforeEach(() => {
process.env['STRINGEE_API_KEY'] = 'test-api-key';
service.onModuleInit();
});
it('rejects with TOO_MANY_REQUESTS when per-minute bucket is blocked', async () => {
mockRateLimiter.check.mockResolvedValueOnce(blockedDecision);
const fetchSpy = vi.spyOn(globalThis, 'fetch');
await expect(
service.sendOTP({ to: '0901234567', code: '123456' }),
).rejects.toMatchObject({
errorCode: 'TOO_MANY_REQUESTS',
status: HttpStatus.TOO_MANY_REQUESTS,
});
expect(fetchSpy).not.toHaveBeenCalled();
});
it('checks hourly bucket when per-minute passes', async () => {
mockRateLimiter.check
.mockResolvedValueOnce(allowedDecision)
.mockResolvedValueOnce({ ...blockedDecision, bucket: 'otpHourly' as const });
const fetchSpy = vi.spyOn(globalThis, 'fetch');
await expect(
service.sendOTP({ to: '0901234567', code: '123456' }),
).rejects.toBeInstanceOf(DomainException);
expect(fetchSpy).not.toHaveBeenCalled();
expect(mockRateLimiter.check).toHaveBeenCalledTimes(2);
});
it('uses transactional bucket for generic notifications', async () => {
const mockResponse = {
ok: true,
json: vi.fn().mockResolvedValue({ r: 0, message_id: 'tx-1' }),
text: vi.fn(),
};
vi.spyOn(globalThis, 'fetch').mockResolvedValue(mockResponse as any);
await service.sendNotification({ to: '0901234567', message: 'Payment confirmed' });
expect(mockRateLimiter.check).toHaveBeenNthCalledWith(1, '+84901234567', 'transactional');
expect(mockRateLimiter.check).toHaveBeenNthCalledWith(
2,
'+84901234567',
'transactionalHourly',
);
});
});
describe('NotificationChannelPort.send', () => {
beforeEach(() => {
process.env['STRINGEE_API_KEY'] = 'test-api-key';
service.onModuleInit();
});
it('routes OTP template keys through the otp bucket', async () => {
const mockResponse = {
ok: true,
json: vi.fn().mockResolvedValue({ r: 0, message_id: 'port-otp' }),
text: vi.fn(),
};
vi.spyOn(globalThis, 'fetch').mockResolvedValue(mockResponse as any);
await service.send({
recipient: '0901234567',
subject: 'OTP',
body: '<p>Code 123456</p>',
templateKey: 'user.phone_change_otp',
});
expect(mockRateLimiter.check).toHaveBeenNthCalledWith(1, '+84901234567', 'otp');
const body = JSON.parse((globalThis.fetch as any).mock.calls[0][1].body);
expect(body.text).toBe('Code 123456');
});
it('strips HTML and uses transactional bucket for non-OTP templates', async () => {
const mockResponse = {
ok: true,
json: vi.fn().mockResolvedValue({ r: 0, message_id: 'port-tx' }),
text: vi.fn(),
};
vi.spyOn(globalThis, 'fetch').mockResolvedValue(mockResponse as any);
await service.send({
recipient: '0901234567',
subject: 'Subscription renewed',
body: '<p>Your <b>GoodGo</b> plan is active.</p>',
templateKey: 'subscription.renewed',
});
expect(mockRateLimiter.check).toHaveBeenNthCalledWith(1, '+84901234567', 'transactional');
const body = JSON.parse((globalThis.fetch as any).mock.calls[0][1].body);
expect(body.text).toBe('Your GoodGo plan is active.');
});
});
});

View File

@@ -3,6 +3,13 @@ export { PrismaNotificationPreferenceRepository } from './repositories/prisma-no
export { EmailService, type SendEmailDto } from './services/email.service';
export { FcmService, type SendPushDto } from './services/fcm.service';
export { StringeeSmsService, type SendSmsDto, type SendOtpDto } from './services/stringee-sms.service';
export {
SmsRateLimiterService,
SMS_RATE_LIMIT_BUCKETS,
type SmsRateLimitBucket,
type SmsRateLimitDecision,
type SmsRateLimitOptions,
} from './services/sms-rate-limiter.service';
export { TemplateService, type RenderedTemplate, type TemplateDefinition } from './services/template.service';
export { ZaloOaService, type SendZaloOaDto, type ZaloOaMessageResult } from './services/zalo-oa.service';
export { getZaloZnsTemplates, type ZaloZnsTemplateConfig } from './services/zalo-zns-templates';

View File

@@ -0,0 +1,121 @@
import { Injectable } from '@nestjs/common';
import { type LoggerService, type RedisService } from '@modules/shared';
export interface SmsRateLimitOptions {
limit: number;
windowSeconds: number;
}
export interface SmsRateLimitDecision {
allowed: boolean;
current: number;
limit: number;
retryAfterSeconds: number;
bucket: string;
}
export const SMS_RATE_LIMIT_BUCKETS = {
otp: { limit: 5, windowSeconds: 60 } satisfies SmsRateLimitOptions,
otpHourly: { limit: 10, windowSeconds: 60 * 60 } satisfies SmsRateLimitOptions,
transactional: { limit: 20, windowSeconds: 60 } satisfies SmsRateLimitOptions,
transactionalHourly: { limit: 100, windowSeconds: 60 * 60 } satisfies SmsRateLimitOptions,
} as const;
export type SmsRateLimitBucket = keyof typeof SMS_RATE_LIMIT_BUCKETS;
const SLIDING_WINDOW_LUA = `
local key = KEYS[1]
local now = tonumber(ARGV[1])
local windowMs = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local requestId = ARGV[4]
local windowSec = tonumber(ARGV[5])
redis.call('ZREMRANGEBYSCORE', key, 0, now - windowMs)
local current = redis.call('ZCARD', key)
if current < limit then
redis.call('ZADD', key, now, requestId)
redis.call('EXPIRE', key, windowSec + 1)
return {current + 1, 0}
else
local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
local retryAfterMs = 0
if #oldest >= 2 then
retryAfterMs = tonumber(oldest[2]) + windowMs - now
if retryAfterMs < 0 then retryAfterMs = 0 end
end
return {current, retryAfterMs}
end
`;
let requestCounter = 0;
@Injectable()
export class SmsRateLimiterService {
constructor(
private readonly redis: RedisService,
private readonly logger: LoggerService,
) {}
async check(phone: string, bucket: SmsRateLimitBucket): Promise<SmsRateLimitDecision> {
const options = SMS_RATE_LIMIT_BUCKETS[bucket];
const key = `sms_rate_limit:${bucket}:${phone}`;
try {
const client = this.redis.getClient();
const now = Date.now();
const requestId = `${now}:${process.pid}:${++requestCounter}`;
const result = (await client.eval(
SLIDING_WINDOW_LUA,
1,
key,
now,
options.windowSeconds * 1000,
options.limit,
requestId,
options.windowSeconds,
)) as [number, number];
const current = result[0];
const retryAfterMs = result[1];
const allowed = retryAfterMs === 0 && current <= options.limit;
const retryAfterSeconds = allowed ? 0 : Math.max(1, Math.ceil(retryAfterMs / 1000));
if (!allowed) {
this.logger.warn(
`SMS rate limit hit for ${this.maskPhone(phone)} bucket=${bucket} ` +
`current=${current}/${options.limit} retryAfter=${retryAfterSeconds}s`,
'SmsRateLimiterService',
);
}
return {
allowed,
current,
limit: options.limit,
retryAfterSeconds,
bucket,
};
} catch (error) {
this.logger.warn(
`SMS rate limit check failed (Redis error), failing open for ${this.maskPhone(phone)}: ` +
`${error instanceof Error ? error.message : 'unknown'}`,
'SmsRateLimiterService',
);
return {
allowed: true,
current: 0,
limit: options.limit,
retryAfterSeconds: 0,
bucket,
};
}
}
private maskPhone(phone: string): string {
if (phone.length <= 4) return '***';
return `${phone.slice(0, 3)}***${phone.slice(-2)}`;
}
}

View File

@@ -1,9 +1,21 @@
import { Injectable, type OnModuleInit } from '@nestjs/common';
import { type LoggerService } from '@modules/shared';
import { HttpStatus, Injectable, type OnModuleInit } from '@nestjs/common';
import { DomainException, ErrorCode, type LoggerService } from '@modules/shared';
import type {
NotificationChannelPort,
SendChannelMessageDto,
SendChannelMessageResult,
} from '../../domain/ports/notification-channel.port';
import { type NotificationChannel } from '../../domain/value-objects/notification-channel.vo';
import {
type SmsRateLimitBucket,
type SmsRateLimiterService,
} from './sms-rate-limiter.service';
export interface SendSmsDto {
to: string;
message: string;
/** Rate-limit bucket; defaults to `transactional`. OTP flows should pass `otp`. */
bucket?: SmsRateLimitBucket;
}
export interface SendOtpDto {
@@ -13,15 +25,26 @@ export interface SendOtpDto {
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 1000;
const OTP_TEMPLATE_KEYS = new Set([
'user.phone_change_otp',
'auth.login_otp',
'auth.kyc_otp',
'auth.phone_verify_otp',
]);
@Injectable()
export class StringeeSmsService implements OnModuleInit {
export class StringeeSmsService implements OnModuleInit, NotificationChannelPort {
readonly channel: NotificationChannel = 'SMS';
private apiKey = '';
private brandName = '';
private initialized = false;
private readonly baseUrl = 'https://api.stringee.com/v1/sms';
constructor(private readonly logger: LoggerService) {}
constructor(
private readonly logger: LoggerService,
private readonly rateLimiter: SmsRateLimiterService,
) {}
onModuleInit(): void {
this.apiKey = process.env['STRINGEE_API_KEY'] ?? '';
@@ -46,26 +69,63 @@ export class StringeeSmsService implements OnModuleInit {
return this.initialized;
}
async sendOTP(dto: SendOtpDto): Promise<{ messageId: string }> {
async sendOTP(dto: SendOtpDto): Promise<SendChannelMessageResult> {
const message = `[${this.brandName}] Ma xac thuc cua ban la: ${dto.code}. Ma co hieu luc trong 5 phut.`;
return this.sendWithRetry({ to: dto.to, message });
return this.dispatch({ to: dto.to, message, bucket: 'otp' });
}
async sendNotification(dto: SendSmsDto): Promise<{ messageId: string }> {
return this.sendWithRetry(dto);
async sendNotification(dto: SendSmsDto): Promise<SendChannelMessageResult> {
return this.dispatch(dto);
}
private async sendWithRetry(dto: SendSmsDto): Promise<{ messageId: string }> {
async send(dto: SendChannelMessageDto): Promise<SendChannelMessageResult> {
const bucket: SmsRateLimitBucket = OTP_TEMPLATE_KEYS.has(dto.templateKey) ? 'otp' : 'transactional';
const plainText = this.stripHtml(dto.body);
return this.dispatch({ to: dto.recipient, message: plainText, bucket });
}
private async dispatch(dto: SendSmsDto): Promise<SendChannelMessageResult> {
if (!this.initialized) {
throw new Error('Stringee SMS not initialized — STRINGEE_API_KEY not configured');
}
const phone = this.normalizePhone(dto.to);
const bucket: SmsRateLimitBucket = dto.bucket ?? 'transactional';
await this.enforceRateLimit(phone, bucket);
return this.sendWithRetry(phone, dto.message);
}
private async enforceRateLimit(phone: string, bucket: SmsRateLimitBucket): Promise<void> {
const perMinute = await this.rateLimiter.check(phone, bucket);
if (!perMinute.allowed) {
throw new DomainException(
ErrorCode.TOO_MANY_REQUESTS,
`SMS rate limit exceeded. Retry after ${perMinute.retryAfterSeconds}s.`,
HttpStatus.TOO_MANY_REQUESTS,
{ bucket: perMinute.bucket, retryAfterSeconds: perMinute.retryAfterSeconds },
);
}
const hourlyBucket: SmsRateLimitBucket = bucket === 'otp' ? 'otpHourly' : 'transactionalHourly';
const perHour = await this.rateLimiter.check(phone, hourlyBucket);
if (!perHour.allowed) {
throw new DomainException(
ErrorCode.TOO_MANY_REQUESTS,
`Hourly SMS limit exceeded. Retry after ${perHour.retryAfterSeconds}s.`,
HttpStatus.TOO_MANY_REQUESTS,
{ bucket: perHour.bucket, retryAfterSeconds: perHour.retryAfterSeconds },
);
}
}
private async sendWithRetry(phone: string, message: string): Promise<SendChannelMessageResult> {
let lastError: Error | undefined;
for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
const result = await this.send(dto);
return result;
return await this.postToStringee(phone, message);
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error));
@@ -87,13 +147,11 @@ export class StringeeSmsService implements OnModuleInit {
throw lastError;
}
private async send(dto: SendSmsDto): Promise<{ messageId: string }> {
const phone = this.normalizePhone(dto.to);
private async postToStringee(phone: string, message: string): Promise<SendChannelMessageResult> {
const body = {
from: { type: 'sms', number: this.brandName, alias: this.brandName },
to: [{ type: 'sms', number: phone }],
text: dto.message,
text: message,
};
const response = await fetch(this.baseUrl, {
@@ -112,7 +170,6 @@ export class StringeeSmsService implements OnModuleInit {
const data = (await response.json()) as { message_id?: string; r?: number; message?: string };
// Stringee returns r=0 on success
if (data.r !== undefined && data.r !== 0) {
throw new Error(`Stringee SMS rejected (code ${data.r}): ${data.message ?? 'Unknown reason'}`);
}
@@ -127,10 +184,6 @@ export class StringeeSmsService implements OnModuleInit {
return { messageId };
}
/**
* Normalize VN phone numbers to E.164 format (+84...).
* Accepts: 0901234567, +84901234567, 84901234567
*/
private normalizePhone(phone: string): string {
const cleaned = phone.replace(/[\s\-()]/g, '');
@@ -146,6 +199,10 @@ export class StringeeSmsService implements OnModuleInit {
return cleaned;
}
private stripHtml(html: string): string {
return html.replace(/<[^>]*>/g, '').trim();
}
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

View File

@@ -24,12 +24,14 @@ import { SubscriptionExpiringListener } from './application/listeners/subscripti
import { SubscriptionRenewedListener } from './application/listeners/subscription-renewed.listener';
import { UserKycUpdatedListener } from './application/listeners/user-kyc-updated.listener';
import { UserRegisteredListener } from './application/listeners/user-registered.listener';
import { SMS_NOTIFICATION_CHANNEL } from './domain/ports/notification-channel.port';
import { NOTIFICATION_PREFERENCE_REPOSITORY } from './domain/repositories/notification-preference.repository';
import { NOTIFICATION_REPOSITORY } from './domain/repositories/notification.repository';
import { PrismaNotificationPreferenceRepository } from './infrastructure/repositories/prisma-notification-preference.repository';
import { PrismaNotificationRepository } from './infrastructure/repositories/prisma-notification.repository';
import { EmailService } from './infrastructure/services/email.service';
import { FcmService } from './infrastructure/services/fcm.service';
import { SmsRateLimiterService } from './infrastructure/services/sms-rate-limiter.service';
import { StringeeSmsService } from './infrastructure/services/stringee-sms.service';
import { TemplateService } from './infrastructure/services/template.service';
import { ZaloOaService } from './infrastructure/services/zalo-oa.service';
@@ -72,7 +74,9 @@ const EventListeners = [
// Services
EmailService,
FcmService,
SmsRateLimiterService,
StringeeSmsService,
{ provide: SMS_NOTIFICATION_CHANNEL, useExisting: StringeeSmsService },
ZaloOaService,
TemplateService,
@@ -85,6 +89,15 @@ const EventListeners = [
// Event Listeners
...EventListeners,
],
exports: [EmailService, FcmService, StringeeSmsService, ZaloOaService, TemplateService, NotificationsGateway],
exports: [
EmailService,
FcmService,
SmsRateLimiterService,
StringeeSmsService,
SMS_NOTIFICATION_CHANNEL,
ZaloOaService,
TemplateService,
NotificationsGateway,
],
})
export class NotificationsModule {}