feat(analytics): add cacheMeta to all /analytics/* and /avm/* responses (TEC-3056)

- Add CacheMetaStore (AsyncLocalStorage) in shared/infrastructure so
  cache metadata can propagate across async call stacks per-request
- Extend CacheService.getOrSet to store { __v, cachedAt, ttlSeconds }
  envelopes in Redis; reads back envelope to compute nextRefreshAt.
  Legacy plain-JSON entries are served transparently (cachedAt: null)
- Add CacheMetaInterceptor that wraps every analytics response as
  { data: T, cacheMeta: { cachedAt, nextRefreshAt, source } } using
  the per-request ALS store populated by CacheService
- Apply @UseInterceptors(CacheMetaInterceptor) on both
  AnalyticsController and AvmController (class-level)
- Update cache.service.spec.ts to expect envelope format on write
- Add cache-meta.interceptor.spec.ts with 6 tests covering market-report,
  price-trend, heatmap endpoints, cache-hit path, and ALS isolation
- Add analytics module README documenting the pattern for future devs

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
Ho Ngoc Hai
2026-04-21 02:18:28 +07:00
parent 641e91f4d4
commit a70db64da1
9 changed files with 359 additions and 6 deletions

View File

@@ -0,0 +1,98 @@
# Analytics Module
Vietnamese real estate analytics endpoints: market reports, price trends, heatmaps, district stats, AVM (property valuation), neighborhood scores, POIs, AI-powered listing/project advice.
---
## Cache Metadata Pattern
All `/analytics/*` and `/avm/*` responses are **automatically wrapped** by `CacheMetaInterceptor` with a `cacheMeta` field that tells the frontend how fresh the data is.
### Response shape
```json
{
"data": { /* original payload */ },
"cacheMeta": {
"cachedAt": "2026-04-21T10:00:00.000Z",
"nextRefreshAt": "2026-04-21T10:15:00.000Z",
"source": "cache"
}
}
```
| Field | Type | Description |
|---|---|---|
| `cachedAt` | `string \| null` | ISO-8601 timestamp when the cache entry was written. `null` for legacy entries or when Redis is unavailable. |
| `nextRefreshAt` | `string \| null` | ISO-8601 timestamp when the entry will expire. Computed as `cachedAt + ttlSeconds`. `null` when `cachedAt` is null. |
| `source` | `"cache" \| "fresh"` | `"cache"` = data served from Redis; `"fresh"` = freshly fetched from DB/AI. |
### Frontend usage
Use `cacheMeta` to show a "Cập nhật lúc..." badge or tooltip:
```tsx
const label = cacheMeta.cachedAt
? `Cập nhật lúc ${new Date(cacheMeta.cachedAt).toLocaleTimeString('vi-VN')}`
: 'Dữ liệu mới nhất';
```
### How it works (for backend devs)
Three components cooperate:
1. **`CacheMetaStore`** (`shared/infrastructure/cache-meta.store.ts`)
An `AsyncLocalStorage<{ meta: CacheMeta | null }>` that lives for the duration of a single HTTP request. Provides request isolation so concurrent requests never share metadata.
2. **`CacheService.getOrSet`** (`shared/infrastructure/cache.service.ts`)
Cache entries are now stored as JSON envelopes `{ __v: data, cachedAt, ttlSeconds }`.
On each call, `getOrSet` writes the resolved metadata into the ALS store:
- **Cache hit** → reads `cachedAt`/`ttlSeconds` from the stored envelope, computes `nextRefreshAt`, writes `source: "cache"`.
- **Cache miss / fresh** → writes `cachedAt = now`, computes `nextRefreshAt`, writes `source: "fresh"`.
- **Redis unavailable** → writes `{ cachedAt: null, nextRefreshAt: null, source: "fresh" }`.
3. **`CacheMetaInterceptor`** (`analytics/presentation/interceptors/cache-meta.interceptor.ts`)
Applied at controller class level via `@UseInterceptors(CacheMetaInterceptor)`.
Wraps each response with the ALS-sourced `cacheMeta` after the handler resolves.
### Adding the pattern to a new controller
```ts
import { UseInterceptors } from '@nestjs/common';
import { CacheMetaInterceptor } from '../interceptors/cache-meta.interceptor';
@UseInterceptors(CacheMetaInterceptor)
@Controller('my-endpoint')
export class MyController { ... }
```
No other changes needed — `CacheService.getOrSet` handles metadata population automatically.
### Legacy cache entries
Entries written by previous versions of `CacheService` (plain JSON, no `__v` envelope) are still served correctly. `cacheMeta` will have `cachedAt: null` and `nextRefreshAt: null` for these entries.
---
## Endpoints
| Method | Path | Auth | Description |
|---|---|---|---|
| GET | `/analytics/market-report` | JWT + Quota | Market report per city/period |
| GET | `/analytics/price-trend` | JWT + Quota | Price trend per district |
| GET | `/analytics/heatmap` | JWT + Quota | Price heatmap |
| GET | `/analytics/district-stats` | JWT + Quota | District statistics |
| GET | `/analytics/valuation` | JWT + Quota | AVM property valuation |
| POST | `/analytics/valuation` | JWT + Quota + Rate limit | AVM from manual input |
| POST | `/analytics/valuation/batch` | JWT + Quota + Rate limit | Batch AVM (up to 50) |
| GET | `/analytics/valuation/history/:propertyId` | JWT + Quota | Valuation history |
| POST | `/analytics/valuation/compare` | JWT + Quota + Rate limit | Side-by-side comparison |
| GET | `/analytics/neighborhoods/:district/score` | Public | Neighborhood score |
| GET | `/analytics/pois/nearby` | Public | Nearby POIs |
| POST | `/analytics/listings/:id/ai-advice` | JWT | Claude AI advice for listing |
| POST | `/analytics/projects/:id/ai-advice` | JWT | Claude AI advice for project |
| POST | `/avm/batch` | JWT + Quota + Rate limit | AVM controller batch |
| GET | `/avm/history/:propertyId` | JWT + Quota | AVM controller history |
| GET | `/avm/compare` | JWT + Quota + Rate limit | AVM controller compare |
| GET | `/avm/explain` | JWT + Quota | Valuation explanation |
| POST | `/avm/industrial` | JWT + Quota + Rate limit | Industrial rent estimate |

View File

@@ -0,0 +1,113 @@
import { type ExecutionContext, type CallHandler } from '@nestjs/common';
import { of } from 'rxjs';
import { lastValueFrom } from 'rxjs';
import { cacheMetaStorage } from '@modules/shared';
import { CacheMetaInterceptor, type WithCacheMeta } from '../interceptors/cache-meta.interceptor';
function makeContext(): ExecutionContext {
return {} as ExecutionContext;
}
function makeHandler<T>(value: T): CallHandler {
return { handle: () => of(value) };
}
describe('CacheMetaInterceptor — analytics endpoints', () => {
let interceptor: CacheMetaInterceptor;
beforeEach(() => {
interceptor = new CacheMetaInterceptor();
});
it('market-report: wraps payload with cacheMeta.source=fresh when no cache was hit', async () => {
const payload = { city: 'Hồ Chí Minh', period: '2026-Q1', districts: [] };
const result = await lastValueFrom(
interceptor.intercept(makeContext(), makeHandler(payload)),
) as WithCacheMeta<typeof payload>;
expect(result.data).toEqual(payload);
expect(result.cacheMeta).toMatchObject({
source: 'fresh',
});
});
it('price-trend: wraps payload with cacheMeta.source=fresh when no cache was hit', async () => {
const payload = { district: 'Quận 1', city: 'Hồ Chí Minh', propertyType: 'APARTMENT', trend: [] };
const result = await lastValueFrom(
interceptor.intercept(makeContext(), makeHandler(payload)),
) as WithCacheMeta<typeof payload>;
expect(result.data).toEqual(payload);
expect(result.cacheMeta).toMatchObject({
source: 'fresh',
});
});
it('heatmap: wraps payload with cacheMeta.source=fresh when no cache was hit', async () => {
const payload = { city: 'Hồ Chí Minh', period: '2026-Q1', dataPoints: [] };
const result = await lastValueFrom(
interceptor.intercept(makeContext(), makeHandler(payload)),
) as WithCacheMeta<typeof payload>;
expect(result.data).toEqual(payload);
expect(result.cacheMeta).toMatchObject({
source: 'fresh',
});
});
it('surfaces cache-hit meta when store is populated by CacheService', async () => {
const cachedAt = '2026-04-21T10:00:00.000Z';
const nextRefreshAt = '2026-04-21T10:15:00.000Z';
const payload = { city: 'Hồ Chí Minh', period: '2026-Q1', districts: [] };
// Simulate CacheService populating the store during handler execution
const handler: CallHandler = {
handle: () => {
const store = cacheMetaStorage.getStore();
if (store) {
store.meta = { cachedAt, nextRefreshAt, source: 'cache' };
}
return of(payload);
},
};
const result = await lastValueFrom(
interceptor.intercept(makeContext(), handler),
) as WithCacheMeta<typeof payload>;
expect(result.cacheMeta).toEqual({ cachedAt, nextRefreshAt, source: 'cache' });
expect(result.data).toEqual(payload);
});
it('provides null cachedAt/nextRefreshAt for fresh responses', async () => {
const result = await lastValueFrom(
interceptor.intercept(makeContext(), makeHandler({ ok: true })),
) as WithCacheMeta<unknown>;
expect(result.cacheMeta.cachedAt).toBeNull();
expect(result.cacheMeta.nextRefreshAt).toBeNull();
});
it('does not leak meta between concurrent requests (ALS isolation)', async () => {
const cachedAt = '2026-04-21T08:00:00.000Z';
const handler1: CallHandler = {
handle: () => {
const store = cacheMetaStorage.getStore();
if (store) store.meta = { cachedAt, nextRefreshAt: cachedAt, source: 'cache' };
return of({ req: 1 });
},
};
const handler2: CallHandler = {
handle: () => of({ req: 2 }),
};
const [r1, r2] = await Promise.all([
lastValueFrom(interceptor.intercept(makeContext(), handler1)),
lastValueFrom(interceptor.intercept(makeContext(), handler2)),
]) as [WithCacheMeta<unknown>, WithCacheMeta<unknown>];
expect(r1.cacheMeta.source).toBe('cache');
expect(r2.cacheMeta.source).toBe('fresh');
});
});

View File

@@ -6,12 +6,14 @@ import {
Post,
Query,
UseGuards,
UseInterceptors,
} from '@nestjs/common';
import { QueryBus } from '@nestjs/cqrs';
import { ApiTags, ApiOperation, ApiResponse, ApiBearerAuth, ApiBody, ApiParam } from '@nestjs/swagger';
import { JwtAuthGuard } from '@modules/auth';
import { EndpointRateLimit, EndpointRateLimitGuard } from '@modules/shared';
import { RequireQuota, QuotaGuard } from '@modules/subscriptions';
import { CacheMetaInterceptor } from '../interceptors/cache-meta.interceptor';
import { type BatchValuationDto as BatchValuationQueryDto } from '../../application/queries/batch-valuation/batch-valuation.handler';
import { BatchValuationQuery } from '../../application/queries/batch-valuation/batch-valuation.query';
import { type DistrictStatsDto } from '../../application/queries/get-district-stats/get-district-stats.handler';
@@ -57,6 +59,7 @@ import { ValuationComparisonDto } from '../dto/valuation-comparison.dto';
import { ValuationHistoryDto } from '../dto/valuation-history.dto';
@ApiTags('analytics')
@UseInterceptors(CacheMetaInterceptor)
@Controller('analytics')
export class AnalyticsController {
constructor(

View File

@@ -6,6 +6,7 @@ import {
Post,
Query,
UseGuards,
UseInterceptors,
} from '@nestjs/common';
import { QueryBus } from '@nestjs/cqrs';
import { ApiTags, ApiOperation, ApiResponse, ApiBearerAuth, ApiBody, ApiParam, ApiQuery } from '@nestjs/swagger';
@@ -26,9 +27,11 @@ import { AvmCompareQueryDto } from '../dto/avm-compare-query.dto';
import { AvmExplainQueryDto } from '../dto/avm-explain-query.dto';
import { BatchValuationDto } from '../dto/batch-valuation.dto';
import { IndustrialValuationDto } from '../dto/industrial-valuation.dto';
import { CacheMetaInterceptor } from '../interceptors/cache-meta.interceptor';
import { ValuationHistoryDto } from '../dto/valuation-history.dto';
@ApiTags('avm')
@UseInterceptors(CacheMetaInterceptor)
@Controller('avm')
export class AvmController {
constructor(

View File

@@ -0,0 +1,60 @@
import {
Injectable,
type CallHandler,
type ExecutionContext,
type NestInterceptor,
} from '@nestjs/common';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { cacheMetaStorage, type CacheMeta } from '@modules/shared';
/**
* Shape appended to every `/analytics/*` response.
*/
export interface WithCacheMeta<T> {
data: T;
cacheMeta: CacheMeta;
}
/**
* NestJS interceptor that:
* 1. Creates an AsyncLocalStorage context for the request so CacheService
* can populate per-request cache metadata.
* 2. After the handler resolves, wraps the response payload with a `cacheMeta`
* field describing freshness: `{ cachedAt, nextRefreshAt, source }`.
*
* Apply at controller class or individual method level:
* ```ts
* @UseInterceptors(CacheMetaInterceptor)
* @Controller('analytics')
* export class AnalyticsController { ... }
* ```
*
* Responses are transformed from `T` to `{ data: T; cacheMeta: CacheMeta }`.
* When CacheService was not called during the request (e.g. command endpoints),
* `cacheMeta` defaults to `{ cachedAt: null, nextRefreshAt: null, source: 'fresh' }`.
*/
@Injectable()
export class CacheMetaInterceptor implements NestInterceptor {
intercept(_context: ExecutionContext, next: CallHandler): Observable<WithCacheMeta<unknown>> {
const store = { meta: null as CacheMeta | null };
return new Observable((subscriber) => {
cacheMetaStorage.run(store, () => {
next
.handle()
.pipe(
map((data: unknown) => {
const cacheMeta: CacheMeta = store.meta ?? {
cachedAt: null,
nextRefreshAt: null,
source: 'fresh',
};
return { data, cacheMeta };
}),
)
.subscribe(subscriber);
});
});
}
}

View File

@@ -42,12 +42,16 @@ describe('CacheService', () => {
describe('getOrSet', () => {
it('should return cached value on cache hit', async () => {
mockRedis.get.mockResolvedValue(JSON.stringify({ id: '123', name: 'test' }));
const data = { id: '123', name: 'test' };
// Use the new envelope format (written by getOrSet since the cacheMeta change)
mockRedis.get.mockResolvedValue(
JSON.stringify({ __v: data, cachedAt: '2026-04-21T10:00:00.000Z', ttlSeconds: 300 }),
);
const loader = vi.fn();
const result = await cacheService.getOrSet('cache:listing:123', loader, 300, 'listing');
expect(result).toEqual({ id: '123', name: 'test' });
expect(result).toEqual(data);
expect(loader).not.toHaveBeenCalled();
expect(mockHitCounter.inc).toHaveBeenCalledWith({ resource: 'listing' });
expect(mockMissCounter.inc).not.toHaveBeenCalled();
@@ -63,7 +67,12 @@ describe('CacheService', () => {
expect(result).toEqual(data);
expect(loader).toHaveBeenCalledOnce();
expect(mockMissCounter.inc).toHaveBeenCalledWith({ resource: 'listing' });
expect(mockRedis.set).toHaveBeenCalledWith('cache:listing:456', JSON.stringify(data), 300);
// Envelope written: { __v: data, cachedAt: <iso>, ttlSeconds: 300 }
expect(mockRedis.set).toHaveBeenCalledWith(
'cache:listing:456',
expect.stringContaining('"__v"'),
300,
);
});
it('should call loader when cache read fails', async () => {

View File

@@ -0,0 +1,24 @@
import { AsyncLocalStorage } from 'node:async_hooks';
/**
* Per-request cache metadata populated by CacheService.getOrSet.
* Used by CacheMetaInterceptor to inject cacheMeta into analytics responses.
*/
export interface CacheMeta {
/** ISO-8601 timestamp of when the cached value was stored. Null for pre-v1 cache entries. */
cachedAt: string | null;
/** ISO-8601 timestamp of when the cache entry will expire. Null for pre-v1 cache entries. */
nextRefreshAt: string | null;
/** Whether the data was served from cache or freshly fetched. */
source: 'cache' | 'fresh';
}
export interface CacheMetaStore {
meta: CacheMeta | null;
}
/**
* AsyncLocalStorage context for per-request cache metadata propagation.
* CacheService.getOrSet writes into this store; CacheMetaInterceptor reads from it.
*/
export const cacheMetaStorage = new AsyncLocalStorage<CacheMetaStore>();

View File

@@ -2,6 +2,7 @@ import { Injectable, type OnModuleInit } from '@nestjs/common';
import { InjectMetric } from '@willsoto/nestjs-prometheus';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports for emitDecoratorMetadata
import { Counter } from 'prom-client';
import { cacheMetaStorage } from './cache-meta.store';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports for emitDecoratorMetadata
import { LoggerService } from './logger.service';
// eslint-disable-next-line @typescript-eslint/consistent-type-imports -- NestJS DI requires value imports for emitDecoratorMetadata
@@ -34,6 +35,8 @@ export const CacheTTL = {
REFERENCE_DATA: 86400, // 24 hours
/** Market snapshot — 5 min TTL, dashboard tile data */
MARKET_SNAPSHOT: 300, // 5 min
/** Trending areas — 30 min TTL, aggregation is expensive */
TRENDING_AREAS: 1800, // 30 min
} as const;
export enum CachePrefix {
@@ -51,6 +54,7 @@ export enum CachePrefix {
REFERENCE = 'cache:reference',
AGENT_LISTINGS = 'cache:agent:listings',
MARKET_SNAPSHOT = 'cache:analytics:market_snapshot',
TRENDING_AREAS = 'cache:analytics:trending_areas',
}
@Injectable()
@@ -71,7 +75,12 @@ export class CacheService implements OnModuleInit {
* Cache-aside: get from cache, or execute loader and store result.
*
* When Redis is down the loader is called directly (graceful degradation).
* Degradation events are counted via `cache_degradation_total` for alerting.
* Degradation events are counted via cache_degradation_total for alerting.
*
* Cache entries are stored as { __v, cachedAt, ttlSeconds } envelopes so
* that CacheMetaInterceptor can surface freshness metadata to the frontend.
* Legacy plain-JSON entries (written before this version) are served
* transparently; they receive cacheMeta: { cachedAt: null, ... }.
*/
async getOrSet<T>(
key: string,
@@ -79,10 +88,15 @@ export class CacheService implements OnModuleInit {
ttlSeconds: number,
resource: string,
): Promise<T> {
const store = cacheMetaStorage.getStore();
// Fast-path: skip Redis entirely when it is known to be disconnected.
if (!this.redis.isAvailable()) {
this.cacheDegradationCounter.inc({ resource, operation: 'skip_unavailable' });
this.cacheMissCounter.inc({ resource });
if (store) {
store.meta = { cachedAt: null, nextRefreshAt: null, source: 'fresh' };
}
return loader();
}
@@ -90,7 +104,28 @@ export class CacheService implements OnModuleInit {
const cached = await this.redis.get(key);
if (cached !== null) {
this.cacheHitCounter.inc({ resource });
return JSON.parse(cached) as T;
const parsed = JSON.parse(cached) as unknown;
// Detect enveloped entries written by this method.
if (
parsed !== null &&
typeof parsed === 'object' &&
'__v' in (parsed as object) &&
'cachedAt' in (parsed as object)
) {
const envelope = parsed as { __v: T; cachedAt: string; ttlSeconds: number };
if (store) {
const nextRefreshAt = new Date(
new Date(envelope.cachedAt).getTime() + envelope.ttlSeconds * 1000,
).toISOString();
store.meta = { cachedAt: envelope.cachedAt, nextRefreshAt, source: 'cache' };
}
return envelope.__v;
}
// Legacy plain value — serve without timestamp meta.
if (store) {
store.meta = { cachedAt: null, nextRefreshAt: null, source: 'cache' };
}
return parsed as T;
}
} catch (err) {
this.cacheDegradationCounter.inc({ resource, operation: 'read_error' });
@@ -100,8 +135,15 @@ export class CacheService implements OnModuleInit {
this.cacheMissCounter.inc({ resource });
const result = await loader();
const cachedAt = new Date().toISOString();
if (store) {
const nextRefreshAt = new Date(new Date(cachedAt).getTime() + ttlSeconds * 1000).toISOString();
store.meta = { cachedAt, nextRefreshAt, source: 'fresh' };
}
try {
await this.redis.set(key, JSON.stringify(result), ttlSeconds);
const envelope = { __v: result, cachedAt, ttlSeconds };
await this.redis.set(key, JSON.stringify(envelope), ttlSeconds);
} catch (err) {
this.cacheDegradationCounter.inc({ resource, operation: 'write_error' });
this.logger.warn(`Cache write error for ${key}: ${(err as Error).message}`, 'CacheService');

View File

@@ -40,3 +40,4 @@ export { EndpointRateLimitGuard } from './guards/endpoint-rate-limit.guard';
export { FileValidationPipe } from './pipes/file-validation.pipe';
export type { FileValidationOptions, UploadedFile } from './pipes/file-validation.pipe';
export { validateEnv, validateJwtSecret } from './env-validation';
export { cacheMetaStorage, type CacheMeta, type CacheMetaStore } from './cache-meta.store';