- Added request/response flow diagrams to api-design and api-gateway-advanced skills for better visualization of processes. - Introduced configuration loading flow in configuration-management skill to clarify the configuration process. - Included error propagation flow in error-handling-patterns skill to illustrate error handling across layers. - Enhanced various skills with additional diagrams to improve understanding of complex concepts. These updates aim to provide clearer guidance and improve the overall documentation experience for developers.
20 KiB
Kiến Trúc Hướng Sự Kiện (Event-Driven Architecture)
Event-driven architecture patterns with Apache Kafka for GoodGo microservices. Use when implementing async communication, event publishing/consuming, event sourcing, CQRS, or integrating event streams with HTTP endpoints.
Các patterns kiến trúc hướng sự kiện với Apache Kafka cho GoodGo microservices. Sử dụng khi implement giao tiếp bất đồng bộ, publish/consume events, event sourcing, CQRS, hoặc tích hợp event streams với HTTP endpoints.
Tổng Quan
Event-Driven Architecture (EDA) enables asynchronous communication between services using Apache Kafka as the message broker. This pattern decouples services, improves scalability, and enables event sourcing, CQRS, and reactive systems. GoodGo platform uses Kafka for high-throughput event streaming and integrates with Traefik to expose events via SSE/WebSocket endpoints.
Kiến trúc hướng sự kiện (EDA) cho phép giao tiếp bất đồng bộ giữa các services sử dụng Apache Kafka làm message broker. Pattern này tách biệt các services, cải thiện khả năng mở rộng, và cho phép event sourcing, CQRS, và các hệ thống reactive. Nền tảng GoodGo sử dụng Kafka cho event streaming hiệu suất cao và tích hợp với Traefik để expose events qua SSE/WebSocket endpoints.
Khi Nào Sử Dụng
Use this skill when:
- Implementing asynchronous communication between services
- Decoupling services for better scalability
- Publishing domain events for downstream consumers
- Consuming events from other services
- Implementing event sourcing patterns
- Implementing CQRS (Command Query Responsibility Segregation)
- Exposing event streams via HTTP (SSE/WebSocket)
- Handling eventual consistency across services
- Building reactive systems that respond to changes
- Integrating with Apache Kafka message broker
Sử dụng skill này khi:
- Implement giao tiếp bất đồng bộ giữa các services
- Tách biệt services để cải thiện khả năng mở rộng
- Publish domain events cho downstream consumers
- Consume events từ các services khác
- Implement event sourcing patterns
- Implement CQRS (Command Query Responsibility Segregation)
- Expose event streams qua HTTP (SSE/WebSocket)
- Xử lý eventual consistency giữa các services
- Xây dựng reactive systems phản hồi với các thay đổi
- Tích hợp với Apache Kafka message broker
Khái Niệm Chính
Event-Driven vs Request-Response
Request-Response (Synchronous / Đồng Bộ):
- Client waits for response / Client đợi response
- Tight coupling between services / Liên kết chặt chẽ giữa các services
- Blocking operations / Các thao tác blocking
- Immediate consistency / Nhất quán ngay lập tức
- Use Traefik API Gateway for HTTP/REST / Sử dụng Traefik API Gateway cho HTTP/REST
Event-Driven (Asynchronous / Bất Đồng Bộ):
- Fire-and-forget publishing / Publish fire-and-forget
- Loose coupling between services / Liên kết lỏng lẻo giữa các services
- Non-blocking operations / Các thao tác non-blocking
- Eventual consistency / Nhất quán cuối cùng
- Use Kafka for message broker / Sử dụng Kafka cho message broker
Kafka Fundamentals / Các Khái Niệm Cơ Bản về Kafka
- Topics: Named streams of events (e.g.,
user.created,order.placed) / Các luồng sự kiện được đặt tên - Partitions: Physical division of topics for parallelism and scaling / Chia nhỏ vật lý của topics để song song hóa và mở rộng
- Consumer Groups: Groups of consumers that work together to process events / Các nhóm consumers làm việc cùng nhau để xử lý events
- Producers: Services that publish events to topics / Services phát hành events tới topics
- Consumers: Services that subscribe to topics and process events / Services đăng ký topics và xử lý events
Consumer Groups Architecture / Kiến Trúc Consumer Groups
The following diagram illustrates how consumer groups distribute work across partitions:
Biểu đồ sau minh họa cách consumer groups phân phối công việc qua các partitions:
graph TB
subgraph Topic["Topic: user.created"]
P0["Partition 0"]
P1["Partition 1"]
P2["Partition 2"]
end
subgraph ConsumerGroup["Consumer Group: notification-service"]
C1["Consumer 1"]
C2["Consumer 2"]
end
subgraph ConsumerGroup2["Consumer Group: analytics-service"]
C3["Consumer 3"]
C4["Consumer 4"]
C5["Consumer 5"]
end
P0 --> C1
P1 --> C2
P2 --> C1
P0 --> C3
P1 --> C4
P2 --> C5
style Topic fill:#e1f5ff
style ConsumerGroup fill:#fff4e1
style ConsumerGroup2 fill:#e8f5e9
Key Points:
- Each partition is consumed by only one consumer per consumer group / Mỗi partition chỉ được consume bởi một consumer trong mỗi consumer group
- Multiple consumer groups can independently consume from the same topic / Nhiều consumer groups có thể độc lập consume từ cùng một topic
- Consumers in a group automatically rebalance when members join or leave / Consumers trong một group tự động rebalance khi có thành viên tham gia hoặc rời khỏi
- More partitions enable better parallelism within a consumer group / Nhiều partitions hơn cho phép song song hóa tốt hơn trong consumer group
Traefik Integration / Tích Hợp Traefik
Traefik serves dual purpose:
- API Gateway: Routes synchronous HTTP/REST requests / Định tuyến các request HTTP/REST đồng bộ
- Event Streaming Gateway: Routes SSE/WebSocket connections to event streaming endpoints / Định tuyến các kết nối SSE/WebSocket tới event streaming endpoints
Services publish events to Kafka, then expose SSE/WebSocket endpoints that consume from Kafka for HTTP clients.
Services publish events vào Kafka, sau đó expose SSE/WebSocket endpoints consume từ Kafka cho HTTP clients.
Các Patterns Chính
Event Publishing / Phát Hành Events
// src/core/events/event-publisher.ts
import { producer } from '../config/kafka.config';
import { logger } from '@goodgo/logger';
import { v4 as uuidv4 } from 'uuid';
export class EventPublisher {
/**
* EN: Publish event to Kafka topic
* VI: Phát hành event tới Kafka topic
*/
async publish<T extends BaseEvent>(
topic: string,
event: Omit<T, 'eventId' | 'timestamp' | 'source'>,
options?: { partitionKey?: string }
): Promise<void> {
const fullEvent: T = {
...event,
eventId: uuidv4(),
timestamp: new Date().toISOString(),
source: this.serviceName,
} as T;
await producer.send({
topic,
messages: [{
key: options?.partitionKey || fullEvent.eventId,
value: JSON.stringify(fullEvent),
headers: {
'event-type': event.eventType,
'event-version': event.eventVersion,
},
}],
});
}
}
Tham Khảo: .cursor/skills/event-driven-architecture/SKILL.md
Event Publishing Flow / Luồng Phát Hành Events
The following sequence diagram shows how events are published from a service to Kafka:
Biểu đồ sequence sau cho thấy cách events được publish từ service tới Kafka:
sequenceDiagram
participant Service as Service Layer
participant Publisher as EventPublisher
participant Kafka as Kafka Broker
participant Topic as Topic Partition
Service->>Publisher: publish(topic, event, options)
activate Publisher
Publisher->>Publisher: Generate eventId
Publisher->>Publisher: Add timestamp & source
Publisher->>Publisher: Determine partition key
Publisher->>Kafka: send({ topic, messages })
activate Kafka
Kafka->>Topic: Route to partition
activate Topic
Topic-->>Kafka: Acknowledge
deactivate Topic
Kafka-->>Publisher: Success
deactivate Kafka
Publisher-->>Service: Complete (fire-and-forget)
deactivate Publisher
Note over Service,Publisher: Non-blocking operation
Key Points:
- Publishing is asynchronous and non-blocking / Publishing là bất đồng bộ và không blocking
- Partition key determines which partition receives the event / Partition key xác định partition nào nhận event
- Events are acknowledged by Kafka before completion / Events được Kafka xác nhận trước khi hoàn thành
- Fire-and-forget pattern prevents blocking request handlers / Pattern fire-and-forget ngăn chặn blocking request handlers
Event Consuming / Tiêu Thụ Events
// src/core/events/event-consumer.ts
import { kafka } from '../config/kafka.config';
export class EventConsumer {
private handlers: Map<string, EventHandler[]> = new Map();
/**
* EN: Register event handler
* VI: Đăng ký event handler
*/
on<T extends BaseEvent>(eventType: string, handler: EventHandler<T>): void {
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, []);
}
this.handlers.get(eventType)!.push(handler);
}
/**
* EN: Start consuming from topics
* VI: Bắt đầu consume từ topics
*/
async start(topics: string[]): Promise<void> {
await this.consumer.connect();
await this.consumer.subscribe({ topics, fromBeginning: false });
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event: BaseEvent = JSON.parse(message.value?.toString() || '{}');
const handlers = this.handlers.get(event.eventType) || [];
await Promise.all(handlers.map(h => h.handle(event)));
},
});
}
}
Outbox Pattern / Pattern Outbox
The Outbox pattern ensures transactional event publishing by storing events in the database within the same transaction as the business data.
Pattern Outbox đảm bảo publish events trong transaction bằng cách lưu events trong database trong cùng transaction với business data.
// EN: Store event in database within transaction
// VI: Lưu event vào database trong transaction
await prisma.outboxEvent.create({
data: {
eventType: 'user.created',
eventData: userData,
topic: 'user.created',
status: 'PENDING',
},
});
// EN: Separate process publishes from outbox to Kafka
// VI: Process riêng publish từ outbox tới Kafka
async function processOutbox() {
const events = await prisma.outboxEvent.findMany({
where: { status: 'PENDING' },
});
for (const event of events) {
await eventPublisher.publish(event.topic, event.eventData);
await prisma.outboxEvent.update({
where: { id: event.id },
data: { status: 'PUBLISHED' },
});
}
}
Outbox Pattern Flow / Luồng Outbox Pattern
The following sequence diagram illustrates the outbox pattern workflow:
Biểu đồ sequence sau minh họa luồng làm việc của outbox pattern:
sequenceDiagram
participant Service as Service Layer
participant DB as Database
participant Outbox as Outbox Table
participant Processor as Outbox Processor
participant Publisher as EventPublisher
participant Kafka as Kafka Broker
Service->>DB: Begin Transaction
activate DB
Service->>DB: Create business entity
Service->>Outbox: Insert event (status: PENDING)
Outbox-->>DB: Stored
Service->>DB: Commit Transaction
deactivate DB
Note over Service,DB: Event stored atomically with business data
loop Polling Interval
Processor->>Outbox: Find PENDING events
Outbox-->>Processor: Return events
Processor->>Publisher: publish(event)
activate Publisher
Publisher->>Kafka: Send to topic
Kafka-->>Publisher: Acknowledge
Publisher-->>Processor: Success
deactivate Publisher
Processor->>Outbox: Update status to PUBLISHED
end
Key Points:
- Events are stored in the database within the same transaction as business data / Events được lưu trong database trong cùng transaction với business data
- A separate background process (Outbox Processor) publishes events to Kafka / Một process nền riêng (Outbox Processor) publish events tới Kafka
- Ensures at-least-once delivery guarantee / Đảm bảo giao hàng ít nhất một lần
- Prevents lost events if Kafka is temporarily unavailable / Ngăn chặn mất events nếu Kafka tạm thời không khả dụng
SSE Endpoint / Endpoint SSE
Server-Sent Events (SSE) allows clients to receive event streams via HTTP.
Server-Sent Events (SSE) cho phép clients nhận event streams qua HTTP.
// src/modules/events/events.controller.ts
async streamEvents(req: Request, res: Response): Promise<void> {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const topic = req.query.topic as string;
const consumer = kafka.consumer({ groupId: `sse-${Date.now()}` });
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value?.toString() || '{}');
res.write(`data: ${JSON.stringify(event)}\n\n`);
},
});
req.on('close', async () => {
await consumer.disconnect();
});
}
Cấu Trúc Event / Event Structure
interface BaseEvent {
eventId: string; // EN: Unique event identifier / VI: Định danh event duy nhất
eventType: string; // EN: Event type (e.g., "user.created") / VI: Loại event
eventVersion: string; // EN: Schema version / VI: Phiên bản schema
timestamp: string; // EN: ISO 8601 timestamp / VI: Timestamp ISO 8601
source: string; // EN: Service that published the event / VI: Service phát hành event
correlationId?: string; // EN: Request correlation ID / VI: Correlation ID của request
traceId?: string; // EN: Distributed tracing ID / VI: ID phân tán tracing
data: unknown; // EN: Event payload / VI: Payload của event
}
Best Practices / Thực Hành Tốt
Event Naming Conventions / Quy Ước Đặt Tên Events
- Event Type:
{domain}.{action}.v{version}(e.g.,user.created.v1) / Loại Event:{domain}.{action}.v{version} - Topic:
{domain}.{entity}.{action}(e.g.,user.created) / Topic:{domain}.{entity}.{action} - Use lowercase with dots as separators / Sử dụng chữ thường với dấu chấm làm phân cách
- Keep names descriptive and consistent / Giữ tên mô tả và nhất quán
Partition Key Selection / Lựa Chọn Partition Key
- Use entity ID for ordering guarantees (same entity → same partition) / Sử dụng entity ID để đảm bảo thứ tự
- Use correlation ID for request tracing / Sử dụng correlation ID để trace request
- Use user ID for user-scoped events / Sử dụng user ID cho events phạm vi user
- Avoid high-cardinality keys (distributes evenly) / Tránh keys có độ phân tán cao
Event Ordering Guarantees / Đảm Bảo Thứ Tự Events
- Kafka guarantees ordering per partition / Kafka đảm bảo thứ tự theo partition
- Use partition key to ensure related events go to same partition / Sử dụng partition key để đảm bảo events liên quan cùng partition
- Events in different partitions have no ordering guarantee / Events ở các partitions khác nhau không có đảm bảo thứ tự
- Don't rely on global ordering across all events / Không phụ thuộc vào thứ tự toàn cục
Error Handling / Xử Lý Lỗi
- Implement Dead Letter Queue (DLQ) for failed events / Implement DLQ cho events failed
- Use retry with exponential backoff / Sử dụng retry với exponential backoff
- Log all event processing failures / Ghi log tất cả lỗi xử lý events
- Monitor consumer lag and DLQ size / Giám sát consumer lag và kích thước DLQ
Observability / Khả Năng Quan Sát
- Log all published and consumed events / Ghi log tất cả events đã publish và consume
- Track metrics: events published/consumed, processing duration, consumer lag / Theo dõi metrics: events published/consumed, thời gian xử lý, consumer lag
- Add distributed tracing to event flows / Thêm distributed tracing vào event flows
- Include correlation IDs for request tracking / Bao gồm correlation IDs để theo dõi request
Infrastructure Setup / Thiết Lập Hạ Tầng
Docker Compose (Local / Cục Bộ)
services:
kafka:
image: confluentinc/cp-kafka:7.4.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
schema-registry:
image: confluentinc/cp-schema-registry:7.4.0
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
Tham Khảo: deployments/local/docker-compose.yml
Testing / Kiểm Thử
Unit Testing / Kiểm Thử Đơn Vị
import { EventPublisher } from '../event-publisher';
import { producer } from '../../config/kafka.config';
jest.mock('../../config/kafka.config');
describe('EventPublisher', () => {
it('should publish event successfully', async () => {
const publisher = new EventPublisher();
const mockSend = jest.fn().mockResolvedValue({});
(producer.send as jest.Mock) = mockSend;
await publisher.publish('user.created', {
eventType: 'user.created',
eventVersion: '1.0.0',
data: { userId: '123' },
});
expect(mockSend).toHaveBeenCalled();
});
});
Integration Testing / Kiểm Thử Tích Hợp
Use Kafka test containers for integration testing:
Sử dụng Kafka test containers cho integration testing:
import { KafkaContainer } from '@testcontainers/kafka';
describe('Event Flow E2E', () => {
let kafkaContainer: StartedKafkaContainer;
beforeAll(async () => {
kafkaContainer = await new KafkaContainer().start();
process.env.KAFKA_BROKERS = kafkaContainer.getBootstrapServer();
});
it('should publish and consume event', async () => {
// EN: Test implementation
// VI: Implementation test
});
});
Use Cases / Các Trường Hợp Sử Dụng
User Created Event Flow / Luồng User Created Event
- Auth Service creates user in database / Auth Service tạo user trong database
- Publishes
user.createdevent to Kafka / Publish eventuser.createdtới Kafka - Notification Service consumes event and sends welcome email / Notification Service consume event và gửi email chào mừng
- Analytics Service consumes event and updates metrics / Analytics Service consume event và cập nhật metrics
Order Processing with Multiple Consumers / Xử Lý Order với Nhiều Consumers
- Order Service publishes
order.placedevent / Order Service publish eventorder.placed - Payment Service processes payment / Payment Service xử lý thanh toán
- Inventory Service reserves items / Inventory Service dự trữ items
- Notification Service sends confirmation / Notification Service gửi xác nhận
Skills Liên Quan
- Resilience Patterns - Circuit breaker, retry patterns / Circuit breaker, các patterns retry
- Error Handling Patterns - Error handling best practices / Best practices về error handling
- Observability & Monitoring - Logging, metrics, tracing / Logging, metrics, tracing
- Middleware Patterns - SSE endpoint middleware / Middleware SSE endpoint
- Project Rules - GoodGo coding standards / Tiêu chuẩn coding GoodGo
Tài Nguyên
- KafkaJS Documentation - Node.js Kafka client
- Confluent Schema Registry - Schema versioning
- Kafka Best Practices - Official Kafka documentation / Tài liệu chính thức Kafka
- Skill Source:
.cursor/skills/event-driven-architecture/SKILL.md- Source file đầy đủ / Full source file