Files
pos-system/docs/en/architecture/event-driven-architecture.md

376 lines
11 KiB
Markdown

# Event-Driven Architecture
> Event-driven architecture for asynchronous communication using Apache Kafka
## Overview Diagram
```mermaid
graph TD
subgraph "Event Producers"
IAM[IAM Service]
Service1[Service A]
end
subgraph "Event Broker"
Kafka[Apache Kafka]
Topics[Topics: user.events, auth.events]
end
subgraph "Event Consumers"
Consumer1[Notification Service]
Consumer2[Audit Service]
end
IAM -->|Publish| Kafka
Service1 -->|Publish| Kafka
Kafka --> Topics
Topics -->|Subscribe| Consumer1
Topics -->|Subscribe| Consumer2
style Kafka fill:#e1f5ff
style Topics fill:#fff4e1
```
## Architecture Description
The GoodGo platform implements Event-Driven Architecture (EDA) for asynchronous communication between microservices.
**Core Principles**:
1. **Event-First Design**: All state changes emit domain events
2. **Loose Coupling**: Services communicate through events
3. **Eventual Consistency**: Accept temporary inconsistency
4. **Event Sourcing**: Store changes as event sequence
5. **CQRS Pattern**: Separate read/write operations
**Technology Stack**:
- Apache Kafka - Event streaming platform
- Schema Registry - Avro schemas for validation
- KafkaJS - Node.js client library
- Event Sourcing - Custom implementation in IAM
## Event Flow
```mermaid
sequenceDiagram
participant Producer as IAM Service
participant Kafka as Kafka Broker
participant Consumer as Notification Service
Producer->>Kafka: Publish Event (user.created)
Kafka->>Consumer: Deliver Event
Consumer->>Consumer: Process Event
Consumer-->>Kafka: Acknowledge
```
**Steps**: Publish → Distribute → Consume → Retry (if failed) → DLQ (after max retries) → Acknowledge
## Event Structure
```typescript
interface BaseEvent {
eventId: string; // UUID
eventType: string; // user.created.v1
eventVersion: string; // 1.0.0
timestamp: string; // ISO 8601
source: string; // iam-service
correlationId?: string; // Request correlation
data: unknown; // Event payload
}
```
**Example**:
```json
{
"eventId": "550e8400-e29b-41d4-a716-446655440000",
"eventType": "user.created.v1",
"timestamp": "2024-01-15T10:30:00Z",
"source": "iam-service",
"data": {
"userId": "user_123",
"email": "user@example.com"
}
}
```
## Kafka Topics
```mermaid
graph LR
UserCreated[user.created<br/>Partitions: 3]
AuthLogin[auth.login.success<br/>Partitions: 5]
AuditEvents[audit.events<br/>Partitions: 10]
style UserCreated fill:#e1f5ff
style AuthLogin fill:#fff4e1
style AuditEvents fill:#f8d7da
```
**Naming Convention**: `{domain}.{action}.{version}`
**Examples**:
- `user.created.v1`
- `auth.login.success.v1`
- `audit.event.logged.v1`
## Error Handling
```mermaid
graph TD
Event[Event] --> Process[Process]
Process -->|Success| Ack[Acknowledge]
Process -->|Failure| Retry[Retry 3x]
Retry -->|Max Retries| DLQ[Dead Letter Queue]
DLQ --> Alert[Alert Team]
```
**Strategy**:
1. Retry with exponential backoff (100ms → 200ms → 400ms)
2. Max 3 attempts
3. Move to DLQ after max retries
4. Manual review and reprocess
## System Context
```mermaid
C4Context
title Event-Driven Architecture Context
System(iam, "IAM Service", "Event producer")
System(service_a, "Service A", "Event producer")
System(notification, "Notification Service", "Event consumer")
System(audit, "Audit Service", "Event consumer")
System_Ext(kafka, "Apache Kafka", "Event streaming platform")
System_Ext(registry, "Schema Registry", "Schema management")
System_Ext(monitoring, "Monitoring", "Kafka metrics & alerts")
Rel(iam, kafka, "Publishes events", "Kafka Protocol")
Rel(service_a, kafka, "Publishes events", "Kafka Protocol")
Rel(kafka, notification, "Delivers events", "Kafka Protocol")
Rel(kafka, audit, "Delivers events", "Kafka Protocol")
Rel(kafka, registry, "Validates schemas", "HTTP")
Rel(kafka, monitoring, "Sends metrics", "JMX")
```
**Context Description**:
- **Producers**: IAM Service and other services publish domain events
- **Kafka**: Central event broker, manages topics and partitions
- **Consumers**: Notification and Audit services consume events
- **Schema Registry**: Manages and validates Avro schemas
- **Monitoring**: Collects metrics from Kafka cluster
## Performance Characteristics
| Metric | Target | Notes |
|--------|--------|-------|
| **Event Publish Latency (P95)** | < 10ms | Fire-and-forget, async |
| **Event Delivery Latency (P95)** | < 100ms | End-to-end from publish to consume |
| **Throughput** | 10,000 events/s | Per topic, scalable with partitions |
| **Consumer Lag** | < 1000 messages | Per partition, monitored |
| **Event Size** | < 1MB | Recommended max size |
| **Retention** | 7 days | Default, configurable per topic |
| **Replication Factor** | 3 | For fault tolerance |
**Performance Optimizations**:
- **Batch Publishing**: Group multiple events to reduce network overhead
- **Compression**: Use Snappy or LZ4 compression
- **Partitioning**: Divide topics into multiple partitions for parallel processing
- **Consumer Groups**: Multiple consumers in same group for horizontal scaling
- **Async Publishing**: Fire-and-forget pattern, don't block request handlers
## Security Considerations
**Event Encryption**:
- TLS in-transit for all Kafka connections
- Optional payload encryption for sensitive data
- End-to-end encryption with custom encryption layer
**Access Control**:
- Kafka ACLs (Access Control Lists) per topic
- SASL/SCRAM authentication for producers and consumers
- Separate credentials per service
- Principle of least privilege - grant only necessary permissions
**Schema Validation**:
- Avro schemas in Schema Registry
- Schema evolution with backward/forward compatibility
- Reject events that don't match schema
**Audit**:
- Log all event publishes and consumes
- Correlation IDs to trace event flow
- Retention policy for audit logs (7 years)
**Data Retention**:
- Default 7 days retention
- Configurable per topic
- Automatic deletion after retention period
- GDPR compliance (right to erasure)
## Deployment
```mermaid
graph TD
subgraph "Kafka Cluster"
subgraph "Brokers"
Broker1[Kafka Broker 1<br/>Leader for partitions 0,3,6]
Broker2[Kafka Broker 2<br/>Leader for partitions 1,4,7]
Broker3[Kafka Broker 3<br/>Leader for partitions 2,5,8]
end
subgraph "Coordination"
ZK[Zookeeper Ensemble<br/>3 nodes]
end
Broker1 --> ZK
Broker2 --> ZK
Broker3 --> ZK
end
subgraph "Producers"
IAM[IAM Service]
ServiceA[Service A]
end
subgraph "Consumers"
Notification[Notification Service<br/>Consumer Group: notifications]
Audit[Audit Service<br/>Consumer Group: audit]
end
IAM --> Broker1
IAM --> Broker2
IAM --> Broker3
ServiceA --> Broker1
ServiceA --> Broker2
ServiceA --> Broker3
Broker1 --> Notification
Broker2 --> Notification
Broker3 --> Notification
Broker1 --> Audit
Broker2 --> Audit
Broker3 --> Audit
style Broker1 fill:#e1f5ff
style Broker2 fill:#fff4e1
style Broker3 fill:#d4edda
style ZK fill:#f0e1ff
```
**Kafka Cluster Configuration**:
- **Brokers**: 3 brokers minimum (5 for production)
- **Replication Factor**: 3 (for fault tolerance)
- **Min In-Sync Replicas**: 2 (ensure data durability)
- **Partitions**: 3-10 per topic (based on throughput needs)
- **Zookeeper**: 3-node ensemble (for coordination)
**Resource Allocation**:
| Component | CPU | Memory | Disk |
|-----------|-----|--------|------|
| **Kafka Broker** | 2 cores | 4GB RAM | 100GB SSD |
| **Zookeeper** | 1 core | 2GB RAM | 20GB SSD |
| **Schema Registry** | 500m | 1GB RAM | 10GB |
**Topic Configuration**:
```yaml
user.created:
partitions: 3
replication-factor: 3
retention-ms: 604800000 # 7 days
compression-type: snappy
auth.login.success:
partitions: 5
replication-factor: 3
retention-ms: 604800000
compression-type: snappy
audit.events:
partitions: 10
replication-factor: 3
retention-ms: 220752000000 # 7 years
compression-type: lz4
```
**High Availability**:
- Multiple brokers with partition replication
- Automatic leader election when broker fails
- Consumer group rebalancing
- Monitoring and alerting for broker health
## Monitoring & Observability
**Key Metrics**:
**Kafka Broker Metrics**:
- `kafka_server_brokertopicmetrics_messagesinpersec` - Messages in/sec
- `kafka_server_brokertopicmetrics_bytesinpersec` - Bytes in/sec
- `kafka_server_brokertopicmetrics_bytesoutpersec` - Bytes out/sec
- `kafka_controller_kafkacontroller_activecontrollercount` - Active controller
- `kafka_server_replicamanager_underreplicatedpartitions` - Under-replicated partitions
**Consumer Metrics**:
- `kafka_consumer_fetch_manager_records_lag_max` - Max consumer lag
- `kafka_consumer_fetch_manager_records_consumed_rate` - Records consumed/sec
- `kafka_consumer_coordinator_commit_latency_avg` - Commit latency
**Producer Metrics**:
- `kafka_producer_record_send_total` - Total records sent
- `kafka_producer_record_error_total` - Total send errors
- `kafka_producer_request_latency_avg` - Request latency
**Application Metrics**:
```typescript
// Custom metrics for event processing
const eventPublished = new Counter({
name: 'events_published_total',
help: 'Total events published',
labelNames: ['event_type', 'topic']
});
const eventConsumed = new Counter({
name: 'events_consumed_total',
help: 'Total events consumed',
labelNames: ['event_type', 'topic', 'consumer_group']
});
const eventProcessingDuration = new Histogram({
name: 'event_processing_duration_seconds',
help: 'Event processing duration',
labelNames: ['event_type'],
buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5]
});
```
**Dashboards**:
- Kafka Cluster Overview (brokers, topics, partitions)
- Producer Performance (throughput, latency, errors)
- Consumer Performance (lag, throughput, errors)
- Topic Metrics (messages/sec, bytes/sec, retention)
**Logging**:
```typescript
// Structured logging for events
logger.info('Event published', {
eventId: event.eventId,
eventType: event.eventType,
topic: 'user.created',
correlationId: event.correlationId
});
logger.info('Event consumed', {
eventId: event.eventId,
eventType: event.eventType,
topic: 'user.created',
consumerGroup: 'notifications',
processingTime: duration
});
```
## Related Documentation
- [System Design](./system-design.md) - Overall architecture
- [IAM Architecture](./iam-proposal.md) - Event sourcing implementation