| name | description |
|---|---|
| event-driven-architecture | Event-driven architecture patterns with Apache Kafka for GoodGo microservices. Use when implementing async communication, event publishing/consuming, event sourcing, CQRS, or integrating event streams with HTTP endpoints. |
Event-Driven Architecture Patterns
When to Use This Skill
Use this skill when:
- Implementing asynchronous communication between services
- Decoupling services for better scalability
- Publishing domain events for downstream consumers
- Consuming events from other services
- Implementing event sourcing patterns
- Implementing CQRS (Command Query Responsibility Segregation)
- Exposing event streams via HTTP (SSE/WebSocket)
- Handling eventual consistency across services
- Building reactive systems that respond to changes
- Integrating with Apache Kafka message broker
Core Concepts
Event-Driven vs Request-Response
Request-Response (Synchronous):
- Client waits for response
- Tight coupling between services
- Blocking operations
- Immediate consistency
- Use Traefik API Gateway for HTTP/REST
Event-Driven (Asynchronous):
- Fire-and-forget publishing
- Loose coupling between services
- Non-blocking operations
- Eventual consistency
- Use Kafka for message broker
Kafka Fundamentals
- Topics: Named streams of events (e.g., user.created, order.placed)
- Partitions: Physical division of topics for parallelism and scaling
- Consumer Groups: Groups of consumers that work together to process events
- Producers: Services that publish events to topics
- Consumers: Services that subscribe to topics and process events
Consumer Groups Architecture
The following diagram illustrates how consumer groups distribute work across partitions:
graph TB
subgraph Topic["Topic: user.created"]
P0["Partition 0"]
P1["Partition 1"]
P2["Partition 2"]
end
subgraph ConsumerGroup["Consumer Group: notification-service"]
C1["Consumer 1"]
C2["Consumer 2"]
end
subgraph ConsumerGroup2["Consumer Group: analytics-service"]
C3["Consumer 3"]
C4["Consumer 4"]
C5["Consumer 5"]
end
P0 --> C1
P1 --> C2
P2 --> C1
P0 --> C3
P1 --> C4
P2 --> C5
style Topic fill:#e1f5ff
style ConsumerGroup fill:#fff4e1
style ConsumerGroup2 fill:#e8f5e9
Key Points:
- Each partition is consumed by only one consumer per consumer group
- Multiple consumer groups can independently consume from the same topic
- Consumers in a group automatically rebalance when members join or leave
- More partitions enable better parallelism within a consumer group
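The assignment shown in the diagram can be reproduced with a small sketch. This is a simplified round-robin illustration, not Kafka's actual assignor; `assignPartitions` and the consumer names are hypothetical.

```typescript
// Round-robin assignment of a topic's partitions to the consumers of ONE group.
// Illustration only: real assignment is negotiated by the Kafka group coordinator.
function assignPartitions(
  partitions: number[],
  consumers: string[]
): Map<string, number[]> {
  const assignment = new Map<string, number[]>();
  consumers.forEach((consumer, consumerIndex) => {
    // Each partition index maps to exactly one consumer, so no partition is shared.
    assignment.set(
      consumer,
      partitions.filter((_, partitionIndex) => partitionIndex % consumers.length === consumerIndex)
    );
  });
  return assignment;
}

// Two groups read the same three partitions independently.
const notificationGroup = assignPartitions([0, 1, 2], ['consumer-1', 'consumer-2']);
const analyticsGroup = assignPartitions([0, 1, 2], ['consumer-3', 'consumer-4', 'consumer-5']);

// A fourth consumer in a three-partition topic receives nothing.
const oversizedGroup = assignPartitions([0, 1, 2], ['a', 'b', 'c', 'd']);
```

With two consumers, one of them owns two partitions; with four, one sits idle. That is why the partition count caps the useful parallelism of a group.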
Traefik Integration
Traefik serves a dual purpose:
- API Gateway: Routes synchronous HTTP/REST requests
- Event Streaming Gateway: Routes SSE/WebSocket connections to event streaming endpoints
Services publish events to Kafka, then expose SSE/WebSocket endpoints that consume from Kafka for HTTP clients.
Key Patterns
Event Publishing
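// src/core/events/event-publisher.ts
import { producer } from '../config/kafka.config';
import { logger } from '@goodgo/logger';
import { v4 as uuidv4 } from 'uuid';
import type { BaseEvent } from './types'; // assumed path; see Event Structure

export class EventPublisher {
  // serviceName fills the `source` field; the SERVICE_NAME fallback is an assumption.
  constructor(
    private readonly serviceName: string = process.env.SERVICE_NAME ?? 'unknown-service'
  ) {}

  async publish<T extends BaseEvent>(
    topic: string,
    event: Omit<T, 'eventId' | 'timestamp' | 'source'>,
    options?: { partitionKey?: string }
  ): Promise<void> {
    // Enrich the caller's payload with the metadata every event carries.
    const fullEvent: T = {
      ...event,
      eventId: uuidv4(),
      timestamp: new Date().toISOString(),
      source: this.serviceName,
    } as T;

    await producer.send({
      topic,
      messages: [{
        // Events sharing a key land on the same partition; fall back to the unique eventId.
        key: options?.partitionKey ?? fullEvent.eventId,
        value: JSON.stringify(fullEvent),
        headers: {
          'event-type': fullEvent.eventType,
          'event-version': fullEvent.eventVersion,
        },
      }],
    });
  }
}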
Event Publishing Flow
The following sequence diagram shows how events are published from a service to Kafka:
sequenceDiagram
participant Service as Service Layer
participant Publisher as EventPublisher
participant Kafka as Kafka Broker
participant Topic as Topic Partition
Service->>Publisher: publish(topic, event, options)
activate Publisher
Publisher->>Publisher: Generate eventId
Publisher->>Publisher: Add timestamp & source
Publisher->>Publisher: Determine partition key
Publisher->>Kafka: send({ topic, messages })
activate Kafka
Kafka->>Topic: Route to partition
activate Topic
Topic-->>Kafka: Acknowledge
deactivate Topic
Kafka-->>Publisher: Success
deactivate Kafka
Publisher-->>Service: Complete (fire-and-forget)
deactivate Publisher
Note over Service,Publisher: Non-blocking operation
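Key Points:
- publish() is asynchronous: it returns a promise and does not block the event loop
- Partition key determines which partition receives the event
- The promise resolves only after Kafka acknowledges the write
- A request handler that must not wait on the broker can call publish() without awaiting it (fire-and-forget), but should attach a .catch() so failures are logged rather than lost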
Event Consuming
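// src/core/events/event-consumer.ts
import type { Consumer } from 'kafkajs';
import { kafka } from '../config/kafka.config';
import type { BaseEvent, EventHandler } from './types'; // assumed path; see Event Structure

export class EventConsumer {
  private readonly consumer: Consumer;
  private handlers: Map<string, EventHandler[]> = new Map();

  constructor(groupId: string) {
    // Instances that share a groupId split the topic's partitions between them.
    this.consumer = kafka.consumer({ groupId });
  }

  on<T extends BaseEvent>(eventType: string, handler: EventHandler<T>): void {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, []);
    }
    this.handlers.get(eventType)!.push(handler);
  }

  async start(topics: string[]): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topics, fromBeginning: false });
    await this.consumer.run({
      eachMessage: async ({ message }) => {
        // Skip empty payloads instead of parsing a placeholder object.
        if (!message.value) return;
        const event: BaseEvent = JSON.parse(message.value.toString());
        const handlers = this.handlers.get(event.eventType) || [];
        // If any handler rejects, eachMessage rejects and the message is not marked as processed.
        await Promise.all(handlers.map(h => h.handle(event)));
      },
    });
  }
}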
Outbox Pattern for Transactional Publishing
The Outbox pattern ensures transactional consistency by storing events in the database within the same transaction as business data, then publishing them asynchronously.
Outbox Pattern Flow
The following sequence diagram illustrates the outbox pattern workflow:
sequenceDiagram
participant Service as Service Layer
participant DB as Database
participant Outbox as Outbox Table
participant Processor as Outbox Processor
participant Publisher as EventPublisher
participant Kafka as Kafka Broker
Service->>DB: Begin Transaction
activate DB
Service->>DB: Create business entity
Service->>Outbox: Insert event (status: PENDING)
Outbox-->>DB: Stored
Service->>DB: Commit Transaction
deactivate DB
Note over Service,DB: Event stored atomically with business data
loop Polling Interval
Processor->>Outbox: Find PENDING events
Outbox-->>Processor: Return events
Processor->>Publisher: publish(event)
activate Publisher
Publisher->>Kafka: Send to topic
Kafka-->>Publisher: Acknowledge
Publisher-->>Processor: Success
deactivate Publisher
Processor->>Outbox: Update status to PUBLISHED
end
Key Points:
- Events are stored in the database within the same transaction as business data
- A separate background process (Outbox Processor) publishes events to Kafka
- Ensures at-least-once delivery guarantee
- Prevents lost events if Kafka is temporarily unavailable
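// Store the business entity and its event in one transaction
// (tx.user is an assumed model name; use whichever entity the event describes)
await prisma.$transaction(async (tx) => {
  await tx.user.create({ data: userData });
  await tx.outboxEvent.create({
    data: {
      eventType: 'user.created',
      eventData: userData,
      topic: 'user.created',
      status: 'PENDING',
    },
  });
});

// Separate process publishes from outbox to Kafka
async function processOutbox() {
  const events = await prisma.outboxEvent.findMany({
    where: { status: 'PENDING' },
  });
  for (const event of events) {
    // Wrap the stored payload so the publisher receives eventType and eventVersion.
    // The version is hard-coded here; persist it on the outbox row if it varies.
    await eventPublisher.publish(event.topic, {
      eventType: event.eventType,
      eventVersion: '1.0.0',
      data: event.eventData,
    });
    // A crash between publish and update re-sends the event on the next run,
    // so consumers must tolerate duplicates (at-least-once delivery).
    await prisma.outboxEvent.update({
      where: { id: event.id },
      data: { status: 'PUBLISHED' },
    });
  }
}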
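The at-least-once behaviour of the processor can be exercised without a database or broker. The sketch below uses hypothetical in-memory stand-ins (`OutboxRow`, `drainOutbox`, and an injected `publish` function) and is not the project's actual processor. It shows one possible policy: a row is marked PUBLISHED only after the publish call resolves, and a failure stops the run so later rows are not sent out of order.

```typescript
type OutboxStatus = 'PENDING' | 'PUBLISHED';

interface OutboxRow {
  id: number;
  topic: string;
  payload: string;
  status: OutboxStatus;
}

// Publishes PENDING rows in order and returns how many were published in this run.
async function drainOutbox(
  rows: OutboxRow[],
  publish: (topic: string, payload: string) => Promise<void>
): Promise<number> {
  let published = 0;
  for (const row of rows) {
    if (row.status !== 'PENDING') continue;
    try {
      await publish(row.topic, row.payload);
    } catch {
      // Leave the row PENDING and stop; the next polling run retries from here.
      break;
    }
    // Status changes only after the broker accepted the event.
    row.status = 'PUBLISHED';
    published++;
  }
  return published;
}
```

If the broker is down, every row stays PENDING and nothing is lost. Once it recovers, the next run publishes the backlog in the original order.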
SSE Endpoint for Event Streaming
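// src/modules/events/events.controller.ts
import { randomUUID } from 'node:crypto';
import type { Request, Response } from 'express'; // assumes an Express-style handler
import { kafka } from '../../core/config/kafka.config';

// Method of the events controller class (class body omitted)
async streamEvents(req: Request, res: Response): Promise<void> {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const topic = req.query.topic as string;
  // A unique group per connection means every client receives every event.
  // randomUUID avoids the collision two clients could hit with Date.now().
  const consumer = kafka.consumer({ groupId: `sse-${randomUUID()}` });
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: false });
  await consumer.run({
    eachMessage: async ({ message }) => {
      // Skip empty payloads instead of streaming a placeholder object.
      if (!message.value) return;
      const event = JSON.parse(message.value.toString());
      res.write(`data: ${JSON.stringify(event)}\n\n`);
    },
  });

  // Release the consumer when the client disconnects.
  req.on('close', async () => {
    await consumer.disconnect();
  });
}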
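The endpoint above writes only a `data:` line per event. A slightly richer frame can also carry the `id:` and `event:` fields defined by the Server-Sent Events format, which lets browsers resume with `Last-Event-ID` and filter by event name. The helper below is a hypothetical sketch (`formatSseFrame` and `SseEvent` are not part of the codebase) and assumes `eventId` and `eventType` never contain line breaks.

```typescript
interface SseEvent {
  eventId: string;
  eventType: string;
  [field: string]: unknown;
}

// Builds one SSE frame: three field lines followed by the blank line that ends the frame.
function formatSseFrame(event: SseEvent): string {
  return [
    `id: ${event.eventId}`,
    `event: ${event.eventType}`,
    // JSON.stringify escapes line breaks, so the payload always fits on a single data line.
    `data: ${JSON.stringify(event)}`,
    '',
    '',
  ].join('\n');
}

const frame = formatSseFrame({ eventId: 'e1', eventType: 'user.created', data: { userId: '123' } });
```

Once the `event:` field is set, an `EventSource` client receives the frame through `addEventListener('user.created', ...)` rather than `onmessage`, so adding the field is a client-visible change.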
Event Structure
interface BaseEvent {
eventId: string;
eventType: string;
eventVersion: string;
timestamp: string;
source: string;
correlationId?: string;
traceId?: string;
data: unknown;
}
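Because consumers receive events as raw JSON, a runtime check is useful before treating a payload as a `BaseEvent`. The following is a minimal sketch; `parseEvent` is a hypothetical helper, and the interface is repeated only so the block is self-contained.

```typescript
interface BaseEvent {
  eventId: string;
  eventType: string;
  eventVersion: string;
  timestamp: string;
  source: string;
  correlationId?: string;
  traceId?: string;
  data: unknown;
}

const REQUIRED_STRING_FIELDS = ['eventId', 'eventType', 'eventVersion', 'timestamp', 'source'];

// Returns the parsed event, or null when the payload is missing, malformed, or incomplete.
function parseEvent(raw: string | null | undefined): BaseEvent | null {
  if (!raw) return null;
  let candidate: unknown;
  try {
    candidate = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (typeof candidate !== 'object' || candidate === null) return null;
  const record = candidate as Record<string, unknown>;
  const hasMetadata = REQUIRED_STRING_FIELDS.every(field => typeof record[field] === 'string');
  return hasMetadata && 'data' in record ? (candidate as BaseEvent) : null;
}
```

A consumer could route payloads for which this returns null to a dead letter topic instead of passing a half-formed object to its handlers.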
Best Practices
Event Naming Conventions
- Event Type: {domain}.{action}.v{version} (e.g., user.created.v1)
- Topic: {domain}.{entity}.{action} (e.g., user.created)
- Use lowercase with dots as separators
- Keep names descriptive and consistent
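These conventions can be enforced in code rather than by review. The sketch below is one possible approach: `buildEventType`, `topicFor`, and the regular expression are hypothetical, and the topic derivation assumes the relationship shown in the examples above (user.created.v1 is published to user.created).

```typescript
// Lowercase dot-separated segments followed by a positive integer version, e.g. user.created.v1
const EVENT_TYPE_PATTERN = /^[a-z][a-z0-9]*(\.[a-z][a-z0-9]*)+\.v[1-9][0-9]*$/;

function buildEventType(domain: string, action: string, version: number): string {
  const eventType = `${domain}.${action}.v${version}`;
  if (!EVENT_TYPE_PATTERN.test(eventType)) {
    throw new Error(`Invalid event type: ${eventType}`);
  }
  return eventType;
}

// Strips the version suffix to obtain the topic name.
function topicFor(eventType: string): string {
  return eventType.replace(/\.v[0-9]+$/, '');
}

const userCreated = buildEventType('user', 'created', 1);
const userCreatedTopic = topicFor(userCreated);
```

Rejecting names such as `User.Created.v1` or `user.created.v1.5` at build time keeps the event-type header and topic names consistent across services.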
Partition Key Selection
- Use entity ID for ordering guarantees (same entity → same partition)
- Use correlation ID for request tracing
- Use user ID for user-scoped events
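- Avoid low-cardinality keys (e.g., a status or country code): they concentrate traffic on a few hot partitions, whereas high-cardinality keys distribute events evenly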
Event Ordering Guarantees
- Kafka guarantees ordering per partition
- Use partition key to ensure related events go to same partition
- Events in different partitions have no ordering guarantee
- Don't rely on global ordering across all events
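Per-key ordering follows from the fact that the partition is a deterministic function of the key. The sketch below illustrates this with a simple FNV-1a-style hash. This is an assumption made for illustration: Kafka clients use their own hash (murmur2 in the default partitioner), so the actual partition numbers will differ. `hashKey` and `partitionFor` are hypothetical names.

```typescript
// Deterministic 32-bit string hash (FNV-1a style, applied to UTF-16 code units).
function hashKey(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep the value as an unsigned 32-bit integer
  }
  return hash;
}

// Maps a key to a partition index in the range [0, partitionCount).
function partitionFor(key: string, partitionCount: number): number {
  return hashKey(key) % partitionCount;
}

// Every event for the same entity resolves to the same partition.
const firstEvent = partitionFor('user-42', 3);
const secondEvent = partitionFor('user-42', 3);
console.log(firstEvent === secondEvent); // true: the mapping is deterministic
```

Because the result depends on `partitionCount`, adding partitions to an existing topic can move a key to a different partition. Ordering for that key is then only guaranteed among events published after the change.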
Error Handling
- Implement Dead Letter Queue (DLQ) for failed events
- Use retry with exponential backoff
- Log all event processing failures
- Monitor consumer lag and DLQ size
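The retry and DLQ practices above can be combined in one wrapper around a handler. The following is a minimal sketch under stated assumptions: `processWithRetry`, `RetryOptions`, and the injected `sendToDlq` function are hypothetical, and the DLQ itself (for example a separate Kafka topic) is left to the caller.

```typescript
interface RetryOptions {
  maxAttempts: number;
  baseDelayMs: number;
  sleep?: (ms: number) => Promise<void>; // injectable so tests do not have to wait
}

const defaultSleep = (ms: number): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

async function processWithRetry<E>(
  event: E,
  handler: (event: E) => Promise<void>,
  sendToDlq: (event: E, error: unknown) => Promise<void>,
  { maxAttempts, baseDelayMs, sleep = defaultSleep }: RetryOptions
): Promise<'processed' | 'dead-lettered'> {
  if (maxAttempts < 1) throw new RangeError('maxAttempts must be at least 1');
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(event);
      return 'processed';
    } catch (error) {
      if (attempt === maxAttempts) {
        // Retries exhausted: park the event so the partition is not blocked.
        await sendToDlq(event, error);
        return 'dead-lettered';
      }
      // Exponential backoff: base, 2x base, 4x base, ...
      await sleep(baseDelayMs * 2 ** (attempt - 1));
    }
  }
  throw new Error('unreachable');
}
```

With `maxAttempts: 3` and `baseDelayMs: 100`, a handler that always fails is tried three times with waits of 100 ms and 200 ms in between, and the event is then handed to `sendToDlq`. Adding random jitter to the delay is a common refinement that this sketch leaves out.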
Observability
- Log all published and consumed events
- Track metrics: events published/consumed, processing duration, consumer lag
- Add distributed tracing to event flows
- Include correlation IDs for request tracking
Infrastructure Setup
Docker Compose (Local)
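services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Containers reach the broker at kafka:29092; host processes use localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092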
Testing
Unit Testing
import { EventPublisher } from '../event-publisher';
import { producer } from '../../config/kafka.config';

jest.mock('../../config/kafka.config');

describe('EventPublisher', () => {
  it('should publish event successfully', async () => {
    const publisher = new EventPublisher();
    // Configure the auto-mocked producer instead of reassigning the import.
    const mockSend = producer.send as jest.Mock;
    mockSend.mockResolvedValue([]);

    await publisher.publish('user.created', {
      eventType: 'user.created',
      eventVersion: '1.0.0',
      data: { userId: '123' },
    });

    // Check the target topic, not just that send was called.
    expect(mockSend).toHaveBeenCalledWith(
      expect.objectContaining({ topic: 'user.created' })
    );
  });
});
Integration Testing with Test Containers
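import { KafkaContainer, StartedKafkaContainer } from '@testcontainers/kafka';

describe('Event Flow E2E', () => {
  let kafkaContainer: StartedKafkaContainer;

  beforeAll(async () => {
    // Port 9093 is assumed to be the listener the Kafka module exposes to the host.
    kafkaContainer = await new KafkaContainer().withExposedPorts(9093).start();
    process.env.KAFKA_BROKERS = `${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9093)}`;
  }, 120_000); // container startup can exceed Jest's default timeout

  afterAll(async () => {
    await kafkaContainer.stop();
  });

  it('should publish and consume event', async () => {
    // Test implementation
  });
});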
Common Use Cases
User Created Event Flow
- Auth Service creates user in database
- Publishes user.created event to Kafka
- Notification Service consumes event and sends welcome email
- Analytics Service consumes event and updates metrics
Order Processing with Multiple Consumers
- Order Service publishes order.placed event
- Payment Service processes payment
- Inventory Service reserves items
- Notification Service sends confirmation
Related Skills
- Resilience Patterns - Circuit breaker, retry patterns
- Error Handling Patterns - Error handling best practices
- Observability & Monitoring - Logging, metrics, tracing
- Middleware Patterns - SSE endpoint middleware
- Project Rules - GoodGo coding standards
Resources
- KafkaJS Documentation - Node.js Kafka client
- Confluent Schema Registry - Schema versioning
- Kafka Best Practices - Official Kafka documentation
- Skill Source: .cursor/skills/event-driven-architecture/SKILL.md