| trigger |
|---|
| always_on |
Event-Driven Architecture Patterns
When to Use This Skill
Use this skill when:
- Implementing asynchronous communication between services
- Decoupling services for better scalability
- Publishing domain events for downstream consumers
- Consuming events from other services
- Implementing event sourcing patterns
- Implementing CQRS (Command Query Responsibility Segregation)
- Exposing event streams via HTTP (SSE/WebSocket)
- Handling eventual consistency across services
- Building reactive systems that respond to changes
- Integrating with Apache Kafka message broker
Core Concepts
Event-Driven vs Request-Response
| Aspect | Request-Response | Event-Driven |
|---|---|---|
| Communication | Synchronous | Asynchronous |
| Coupling | Tight | Loose |
| Blocking | Yes | No |
| Consistency | Immediate | Eventual |
| Infrastructure | Traefik API Gateway | Kafka |
Kafka Fundamentals
Topics: Named streams of events (e.g., user.created, order.placed)
- Organized by domain and action
- Divided into partitions for parallelism
Partitions: Physical division of topics
- Enables horizontal scaling
- Maintains ordering within each partition; events with the same partition key land in the same partition
- Multiple consumers can process different partitions
Consumer Groups: Group of consumers working together
- Each partition consumed by only one consumer in group
- Enables parallel processing
- Automatically rebalances on consumer join/leave
Producers: Services that publish events to topics
Consumers: Services that subscribe to topics and process events
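The consumer-group rule above (each partition has exactly one owner within a group, and ownership is recomputed when members join or leave) can be sketched as follows. This is a simplified round-robin stand-in, not Kafka's actual assignor; `assignPartitions` and the consumer names are hypothetical.

```typescript
// Illustrative round-robin assignment. Real Kafka assignors run inside the
// client library and group coordinator; this only shows the invariant that
// every partition has exactly one owner within a group.
function assignPartitions(
  partitionCount: number,
  consumers: readonly string[],
): Map<string, number[]> {
  const assignment = new Map<string, number[]>();
  for (const consumer of consumers) assignment.set(consumer, []);
  if (consumers.length === 0) return assignment;

  for (let partition = 0; partition < partitionCount; partition++) {
    const owner = consumers[partition % consumers.length] as string;
    assignment.get(owner)?.push(partition);
  }
  return assignment;
}

// One consumer owns every partition; a second consumer joining triggers a
// rebalance that splits the partitions between them.
const soloAssignment = assignPartitions(3, ['consumer-a']);
const rebalanced = assignPartitions(3, ['consumer-a', 'consumer-b']);
console.log(soloAssignment, rebalanced);
```

Consumers beyond the partition count receive an empty list, which is why adding more consumers than partitions does not increase parallelism.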
Event Structure
interface BaseEvent {
  eventId: string;        // Unique event identifier
  eventType: string;      // Event type (e.g., "user.created")
  eventVersion: string;   // Schema version (e.g., "1.0.0")
  timestamp: string;      // ISO 8601 timestamp
  source: string;         // Service that published the event
  correlationId?: string; // Request correlation ID
  traceId?: string;       // Distributed tracing ID
  data: unknown;          // Event payload
}
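A small factory keeps the envelope fields consistent across publishers. The sketch below is one possible shape; `createEvent`, `EventOptions`, the generic `data` parameter, and the `'1.0.0'` default are assumptions for illustration, not part of the original interface.

```typescript
interface BaseEvent<T = unknown> {
  eventId: string;
  eventType: string;
  eventVersion: string;
  timestamp: string;
  source: string;
  correlationId?: string;
  traceId?: string;
  data: T;
}

// Hypothetical options bag; `now` is injectable so tests get stable timestamps.
interface EventOptions {
  eventVersion?: string;
  correlationId?: string;
  traceId?: string;
  now?: Date;
}

function createEvent<T>(
  eventId: string, // the caller supplies a unique ID, e.g. a UUID
  eventType: string,
  source: string,
  data: T,
  options: EventOptions = {},
): BaseEvent<T> {
  const event: BaseEvent<T> = {
    eventId,
    eventType,
    eventVersion: options.eventVersion ?? '1.0.0', // assumed default
    timestamp: (options.now ?? new Date()).toISOString(),
    source,
    data,
  };
  // Only set optional fields when present so they are omitted from the JSON.
  if (options.correlationId !== undefined) event.correlationId = options.correlationId;
  if (options.traceId !== undefined) event.traceId = options.traceId;
  return event;
}

const userCreated = createEvent('evt-1', 'user.created', 'auth-service', { userId: 'u1' });
console.log(userCreated.timestamp);
```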
Event Naming Conventions
Event Type Format: {domain}.{action}.v{version}
user.created.v1
order.placed.v1
payment.processed.v2
Topic Naming: {domain}.{action}
user.created
order.placed
payment.processed
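The naming conventions can be enforced with a pair of helpers. This is a minimal sketch: the function names are hypothetical, and the allowed character set (lowercase letters, digits, hyphens) is an assumption, since the conventions above only fix the overall shape.

```typescript
interface ParsedEventType {
  domain: string;
  action: string;
  version: number;
}

// Builds "{domain}.{action}.v{version}", e.g. "user.created.v1".
function formatEventType(domain: string, action: string, version: number): string {
  return `${domain}.${action}.v${version}`;
}

// Returns null when the string does not follow the convention.
function parseEventType(eventType: string): ParsedEventType | null {
  const match = /^([a-z][a-z0-9-]*)\.([a-z][a-z0-9-]*)\.v(\d+)$/.exec(eventType);
  if (match === null) return null;
  return {
    domain: match[1] as string,
    action: match[2] as string,
    version: Number(match[3]),
  };
}

// Derives the unversioned topic name, as in the examples (e.g. "user.created").
function topicFor(parsed: ParsedEventType): string {
  return `${parsed.domain}.${parsed.action}`;
}

const parsedType = parseEventType('payment.processed.v2');
if (parsedType !== null) console.log(topicFor(parsedType));
```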
Key Patterns
1. Event Publishing
// Fire-and-forget with error logging
eventPublisher.publish('user.created', event, { partitionKey: user.id })
  .catch(err => logger.error('Failed to publish', { err }));
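The internals of `eventPublisher.publish` are not shown here. As a rough sketch of what such a publisher might hand to the broker client, the helper below turns an event into a key/value/headers record, which is the message shape KafkaJS accepts in `producer.send({ topic, messages: [...] })`. The `toKafkaMessage` name and the choice of headers are assumptions.

```typescript
interface BaseEvent {
  eventId: string;
  eventType: string;
  eventVersion: string;
  timestamp: string;
  source: string;
  correlationId?: string;
  traceId?: string;
  data: unknown;
}

interface KafkaMessage {
  key: string;                     // partition key: same key, same partition
  value: string;                   // JSON-serialized event envelope
  headers: Record<string, string>; // metadata readable without parsing the body
}

function toKafkaMessage(event: BaseEvent, partitionKey: string): KafkaMessage {
  const headers: Record<string, string> = {
    eventType: event.eventType,
    eventVersion: event.eventVersion,
  };
  if (event.correlationId !== undefined) headers['correlationId'] = event.correlationId;
  return { key: partitionKey, value: JSON.stringify(event), headers };
}

const message = toKafkaMessage(
  {
    eventId: 'evt-1',
    eventType: 'user.created',
    eventVersion: '1.0.0',
    timestamp: '2024-01-01T00:00:00.000Z',
    source: 'auth-service',
    data: { userId: 'u1' },
  },
  'u1',
);
console.log(message.key, message.headers);
```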
2. Event Consuming
consumer.on('user.created', {
  handle: async (event) => {
    await processEvent(event);
  },
});
await consumer.start(['user.created']);
3. Outbox Pattern (Transactional)
Store events in database within same transaction, then publish asynchronously:
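await prisma.$transaction(async (tx) => {
  const user = await tx.user.create({ data });
  // Pass tx so the outbox row commits or rolls back together with the user row
  // (assumes addToOutbox accepts the transaction client as its first argument)
  await outboxService.addToOutbox(tx, 'user.created', userData, 'user.created');
  return user;
});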
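The "publish asynchronously" half of the pattern is usually a relay that polls the outbox table. The sketch below uses an in-memory array in place of the table; `OutboxRow`, `relayOutbox`, and the `publish` callback are hypothetical names, and a real relay would read and update rows through the database.

```typescript
interface OutboxRow {
  id: number;
  topic: string;
  payload: string;
  publishedAt: Date | null; // null means "not yet published"
}

type Publish = (topic: string, payload: string) => Promise<void>;

// Publishes pending rows in insertion order and returns how many were sent.
async function relayOutbox(
  rows: OutboxRow[],
  publish: Publish,
  now: () => Date = () => new Date(),
): Promise<number> {
  let published = 0;
  for (const row of rows) {
    if (row.publishedAt !== null) continue;
    try {
      await publish(row.topic, row.payload);
    } catch {
      // Stop here so later rows cannot overtake the failed one;
      // the next poll retries from this row.
      break;
    }
    row.publishedAt = now();
    published++;
  }
  return published;
}
```

If the relay crashes after publishing but before marking the row, the row is published again on the next poll, so this gives at-least-once delivery and relies on idempotent consumers.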
4. Dead Letter Queue (DLQ)
After max retries, send failed events to DLQ topic for manual inspection:
after maxRetries → send to topic.dlq
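The retry-then-DLQ rule can be sketched as a wrapper around the handler. Everything here is illustrative: `handleWithDlq`, the envelope fields, and the `sendToDlq` callback are assumed names, `maxRetries` is treated as the total number of attempts, and no backoff is applied between attempts.

```typescript
interface FailedEventEnvelope {
  originalTopic: string;
  payload: string;
  error: string;    // message of the last failure, kept for manual inspection
  attempts: number;
}

type Handler = (payload: string) => Promise<void>;
type SendToDlq = (dlqTopic: string, envelope: FailedEventEnvelope) => Promise<void>;

async function handleWithDlq(
  topic: string,
  payload: string,
  handler: Handler,
  sendToDlq: SendToDlq,
  maxRetries = 3, // counted here as total attempts
): Promise<'processed' | 'dead-lettered'> {
  let lastError = 'unknown error';
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      await handler(payload);
      return 'processed';
    } catch (err) {
      lastError = err instanceof Error ? err.message : String(err);
    }
  }
  // All attempts failed: park the event on "<topic>.dlq" instead of dropping it.
  await sendToDlq(`${topic}.dlq`, {
    originalTopic: topic,
    payload,
    error: lastError,
    attempts: maxRetries,
  });
  return 'dead-lettered';
}
```

Keeping the original topic and the last error in the envelope makes it possible to replay the event after the underlying fault is fixed.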
5. Idempotency
Consumers must handle duplicate events:
if (await this.isProcessed(event.eventId)) return;
await processEvent(event);
await this.markProcessed(event.eventId);
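The check-then-mark sequence above leaves a window in which two concurrent deliveries of the same event can both pass the check. One variant, sketched below, claims the event ID before processing and releases it on failure. The in-memory `Set` is a stand-in for illustration only; a shared store with an atomic insert (for example a unique-constraint row) is assumed in a real deployment, and all names here are hypothetical.

```typescript
class InMemoryProcessedStore {
  private readonly seen = new Set<string>();

  // Returns true only for the first caller to claim this event ID.
  claim(eventId: string): boolean {
    if (this.seen.has(eventId)) return false;
    this.seen.add(eventId);
    return true;
  }

  // Undo a claim so a failed event can be retried later.
  release(eventId: string): void {
    this.seen.delete(eventId);
  }
}

// Returns true if the event was processed, false if it was a duplicate.
async function handleOnce<E extends { eventId: string }>(
  event: E,
  store: InMemoryProcessedStore,
  processEvent: (event: E) => Promise<void>,
): Promise<boolean> {
  if (!store.claim(event.eventId)) return false;
  try {
    await processEvent(event);
    return true;
  } catch (err) {
    store.release(event.eventId);
    throw err;
  }
}
```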
Best Practices
Partition Key Selection
- Use entity ID for ordering guarantees (same entity → same partition)
- Use correlation ID for request tracing
- Use user ID for user-scoped events
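- Avoid low-cardinality keys (e.g., a status or country code): they concentrate traffic on a few hot partitions, whereas high-cardinality keys distribute load evenly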
Event Ordering Guarantees
- Kafka guarantees ordering per partition
- Use partition key to ensure related events go to same partition
- Events in different partitions have no ordering guarantee
- Don't rely on global ordering across all events
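The link between partition keys and ordering comes from deterministic hashing: the same key always maps to the same partition. The sketch below uses a simple 31-based string hash purely for illustration; Kafka's Java client hashes keys with murmur2, so the actual partition numbers will differ.

```typescript
// Simple deterministic string hash (illustrative, NOT Kafka's murmur2).
function hashKey(key: string): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) | 0; // keep within 32-bit range
  }
  return hash >>> 0; // convert to an unsigned value
}

function partitionFor(key: string, partitionCount: number): number {
  if (!Number.isInteger(partitionCount) || partitionCount < 1) {
    throw new RangeError('partitionCount must be a positive integer');
  }
  return hashKey(key) % partitionCount;
}

// Every event keyed by "user-42" lands in one partition, so a single consumer
// sees user.created before user.updated for that user.
const first = partitionFor('user-42', 3);
const again = partitionFor('user-42', 3);
console.log(first === again);
```

Because the mapping depends on the partition count, adding partitions to an existing topic generally sends existing keys to different partitions, which can break per-key ordering across the change.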
Event Size Limits
- Recommended: < 1MB per event
- Kafka default: about 1 MB per message (broker setting message.max.bytes, configurable)
- For large payloads: Store data elsewhere, send reference in event
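The "store data elsewhere, send reference" approach is often called the claim-check pattern. The sketch below shows the decision point; `BlobStore`, `InMemoryBlobStore`, `prepareData`, and the byte budget are all assumptions, with the in-memory map standing in for object storage.

```typescript
const MAX_EVENT_BYTES = 1000000; // assumed budget, just under the 1 MB guidance

interface BlobStore {
  put(content: string): string; // returns a reference to the stored content
}

class InMemoryBlobStore implements BlobStore {
  private readonly blobs = new Map<string, string>();

  put(content: string): string {
    const ref = `blob-${this.blobs.size + 1}`;
    this.blobs.set(ref, content);
    return ref;
  }

  get(ref: string): string | undefined {
    return this.blobs.get(ref);
  }
}

type EventData = { inline: unknown } | { payloadRef: string; sizeBytes: number };

// Keeps small payloads inline; offloads large ones and returns a reference.
function prepareData(
  payload: unknown,
  store: BlobStore,
  maxBytes: number = MAX_EVENT_BYTES,
): EventData {
  const json = JSON.stringify(payload);
  const sizeBytes = new TextEncoder().encode(json).length; // UTF-8 byte length
  if (sizeBytes <= maxBytes) return { inline: payload };
  return { payloadRef: store.put(json), sizeBytes };
}
```

Consumers that receive a `payloadRef` fetch the content from the store, so the store must retain it at least as long as the topic retains the event.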
Performance Optimization
- Batch Publishing: Group multiple events for better throughput
- Async Publishing: Don't block request handlers
- Consumer Parallelism: Use multiple partitions and consumers
- Connection Pooling: Reuse Kafka client instances
- Compression: Enable compression for better network usage
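Batch publishing mostly comes down to grouping messages before sending. A minimal sketch of the grouping step follows; `chunk` is a hypothetical helper, and each resulting batch could be passed as the `messages` array of a single KafkaJS `producer.send` call.

```typescript
// Splits items into consecutive batches of at most batchSize elements.
function chunk<T>(items: readonly T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

const pendingEvents = ['e1', 'e2', 'e3', 'e4', 'e5'];
for (const batch of chunk(pendingEvents, 2)) {
  // One network round trip per batch instead of one per event.
  console.log(`would send ${batch.length} message(s)`);
}
```

With KafkaJS, compression is chosen per `send` call through its `compression` option (for example `CompressionTypes.GZIP`), so it combines naturally with batching.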
Common Mistakes
- Blocking on Publish: Slowing down request handlers
  // BAD: Await in request handler
  await eventPublisher.publish('user.created', event);
  res.json({ success: true });

  // GOOD: Fire and forget with error logging
  eventPublisher.publish('user.created', event)
    .catch(err => logger.error('Failed to publish', { err }));
  res.json({ success: true });
- No Idempotency: Duplicate event processing issues
  // BAD: No duplicate check
  async handle(event) {
    await createUser(event.data);
  }

  // GOOD: Check for duplicates
  async handle(event) {
    if (await this.isProcessed(event.eventId)) return;
    await createUser(event.data);
    await this.markProcessed(event.eventId);
  }
- Missing Partition Key: Events for same entity out of order
  // BAD: No partition key
  await publish('user.updated', event);

  // GOOD: Use entity ID as partition key
  await publish('user.updated', event, { partitionKey: userId });
- No Dead Letter Queue: Lost events on failure
  // GOOD: Always implement DLQ for failed events
  after maxRetries → send to topic.dlq
- Breaking Schema Changes: Use versioning strategy instead
- Global Ordering Expectations: Understand partition ordering only
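For the "Breaking Schema Changes" item, one common versioning strategy is to upcast old payloads to the newest shape at the consumer boundary, so handlers only deal with one version. The sketch below invents a schema change purely for illustration: `UserCreatedV1`, `UserCreatedV2`, and the name-splitting rule are hypothetical.

```typescript
// Hypothetical v1 payload: a single display name.
interface UserCreatedV1 {
  userId: string;
  name: string;
}

// Hypothetical v2 payload: the name split into two fields (a breaking change).
interface UserCreatedV2 {
  userId: string;
  firstName: string;
  lastName: string;
}

// Converts any supported version to the latest shape.
function upcastUserCreated(version: 1 | 2, data: unknown): UserCreatedV2 {
  if (version === 2) return data as UserCreatedV2;
  const v1 = data as UserCreatedV1;
  const [firstName, ...rest] = v1.name.trim().split(/\s+/);
  return {
    userId: v1.userId,
    firstName: firstName ?? '',
    lastName: rest.join(' '),
  };
}

const upgraded = upcastUserCreated(1, { userId: 'u1', name: 'Ada Lovelace' });
console.log(upgraded.firstName, upgraded.lastName);
```

Publishing the new shape under a new version (for example `user.created.v2`) lets existing v1 consumers keep working until they migrate.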
Quick Reference
| Concept | Description |
|---|---|
| Topic | Named stream of events (e.g., user.created) |
| Partition | Division of topic for parallelism |
| Consumer Group | Consumers sharing workload |
| Offset | Position in partition |
Event Structure:
{
  eventId: "uuid",           // Unique identifier
  eventType: "user.created", // Event type
  eventVersion: "1.0.0",     // Schema version
  timestamp: "ISO-8601",     // When published
  source: "auth-service",    // Publisher service
  correlationId: "uuid",     // Request correlation ID
  data: { ... }              // Event payload
}
Topic Naming:
{domain}.{action}
user.created
order.placed
payment.processed
Essential Commands:
# List topics
kafka-topics --list --bootstrap-server localhost:9092
# Create topic
kafka-topics --create --topic user.created --partitions 3 --bootstrap-server localhost:9092
# Consume from beginning
kafka-console-consumer --topic user.created --from-beginning --bootstrap-server localhost:9092
KafkaJS Quick Setup:
import { Kafka } from 'kafkajs';
const kafka = new Kafka({ brokers: ['localhost:9092'], clientId: 'my-app' });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'my-group' });
Environment Variables:
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=my-service
KAFKA_CONSUMER_GROUP_ID=my-service-consumers
SCHEMA_REGISTRY_URL=http://localhost:8081
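These variables can be validated once at startup and then passed to the client constructor. The loader below is a sketch: `loadKafkaSettings` and `KafkaSettings` are hypothetical names, and treating `KAFKA_BROKERS` as a comma-separated list and `SCHEMA_REGISTRY_URL` as optional are assumptions.

```typescript
interface KafkaSettings {
  brokers: string[];
  clientId: string;
  groupId: string;
  schemaRegistryUrl?: string;
}

type Env = Record<string, string | undefined>;

function requireVar(env: Env, name: string): string {
  const value = env[name]?.trim();
  if (!value) throw new Error(`${name} is required`);
  return value;
}

function loadKafkaSettings(env: Env): KafkaSettings {
  // Assumed format: "host1:9092,host2:9092"
  const brokers = requireVar(env, 'KAFKA_BROKERS')
    .split(',')
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
  if (brokers.length === 0) throw new Error('KAFKA_BROKERS must list at least one broker');

  const settings: KafkaSettings = {
    brokers,
    clientId: requireVar(env, 'KAFKA_CLIENT_ID'),
    groupId: requireVar(env, 'KAFKA_CONSUMER_GROUP_ID'),
  };
  const registry = env['SCHEMA_REGISTRY_URL']?.trim();
  if (registry) settings.schemaRegistryUrl = registry;
  return settings;
}

// In a Node.js service this would typically be called as loadKafkaSettings(process.env).
const settings = loadKafkaSettings({
  KAFKA_BROKERS: 'localhost:9092',
  KAFKA_CLIENT_ID: 'my-service',
  KAFKA_CONSUMER_GROUP_ID: 'my-service-consumers',
});
console.log(settings.brokers);
```

The resulting `brokers` and `clientId` map onto the `new Kafka({ brokers, clientId })` call in the quick setup, and `groupId` onto `kafka.consumer({ groupId })`.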
Resources
- Detailed Reference - Full code examples and implementation details
- KafkaJS Documentation - Node.js Kafka client
- Confluent Schema Registry - Schema versioning
- Kafka Best Practices - Official Kafka documentation
- Resilience Patterns - Circuit breaker, retry patterns
- Error Handling Patterns - Error handling best practices
- Observability & Monitoring - Logging, metrics, tracing
- Project Rules - GoodGo coding standards