Files
pos-system/docs/en/skills/event-driven-architecture.md
Ho Ngoc Hai 478254400a Refactor service documentation and enhance bilingual support
- Updated service template structure in `ARCHITECTURE.md` and `README.md` for clarity and usability.
- Enhanced bilingual documentation across skills, increasing the number of available skills from 15 to 25.
- Added new sections on event-driven architecture, inter-service communication, and performance optimization.
- Improved formatting and removed outdated references to streamline the documentation experience.
2026-01-01 10:06:27 +07:00

9.3 KiB

name, description
name description
event-driven-architecture Event-driven architecture patterns with Apache Kafka for GoodGo microservices. Use when implementing async communication, event publishing/consuming, event sourcing, CQRS, or integrating event streams with HTTP endpoints.

Event-Driven Architecture Patterns

When to Use This Skill

Use this skill when:

  • Implementing asynchronous communication between services
  • Decoupling services for better scalability
  • Publishing domain events for downstream consumers
  • Consuming events from other services
  • Implementing event sourcing patterns
  • Implementing CQRS (Command Query Responsibility Segregation)
  • Exposing event streams via HTTP (SSE/WebSocket)
  • Handling eventual consistency across services
  • Building reactive systems that respond to changes
  • Integrating with Apache Kafka message broker

Core Concepts

Event-Driven vs Request-Response

Request-Response (Synchronous):

  • Client waits for response
  • Tight coupling between services
  • Blocking operations
  • Immediate consistency
  • Use Traefik API Gateway for HTTP/REST

Event-Driven (Asynchronous):

  • Fire-and-forget publishing
  • Loose coupling between services
  • Non-blocking operations
  • Eventual consistency
  • Use Kafka for message broker

Kafka Fundamentals

  • Topics: Named streams of events (e.g., user.created, order.placed)
  • Partitions: Physical division of topics for parallelism and scaling
  • Consumer Groups: Groups of consumers that work together to process events
  • Producers: Services that publish events to topics
  • Consumers: Services that subscribe to topics and process events

Traefik Integration

Traefik serves dual purpose:

  • API Gateway: Routes synchronous HTTP/REST requests
  • Event Streaming Gateway: Routes SSE/WebSocket connections to event streaming endpoints

Services publish events to Kafka, then expose SSE/WebSocket endpoints that consume from Kafka for HTTP clients.

Key Patterns

Event Publishing

// src/core/events/event-publisher.ts
import { producer } from '../config/kafka.config';
import { logger } from '@goodgo/logger';
import { v4 as uuidv4 } from 'uuid';

export class EventPublisher {
  async publish<T extends BaseEvent>(
    topic: string,
    event: Omit<T, 'eventId' | 'timestamp' | 'source'>,
    options?: { partitionKey?: string }
  ): Promise<void> {
    const fullEvent: T = {
      ...event,
      eventId: uuidv4(),
      timestamp: new Date().toISOString(),
      source: this.serviceName,
    } as T;

    await producer.send({
      topic,
      messages: [{
        key: options?.partitionKey || fullEvent.eventId,
        value: JSON.stringify(fullEvent),
        headers: {
          'event-type': event.eventType,
          'event-version': event.eventVersion,
        },
      }],
    });
  }
}

Event Consuming

// src/core/events/event-consumer.ts
import { kafka } from '../config/kafka.config';

export class EventConsumer {
  private handlers: Map<string, EventHandler[]> = new Map();

  on<T extends BaseEvent>(eventType: string, handler: EventHandler<T>): void {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, []);
    }
    this.handlers.get(eventType)!.push(handler);
  }

  async start(topics: string[]): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topics, fromBeginning: false });

    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const event: BaseEvent = JSON.parse(message.value?.toString() || '{}');
        const handlers = this.handlers.get(event.eventType) || [];
        await Promise.all(handlers.map(h => h.handle(event)));
      },
    });
  }
}

Outbox Pattern for Transactional Publishing

// Store event in database within transaction
await prisma.outboxEvent.create({
  data: {
    eventType: 'user.created',
    eventData: userData,
    topic: 'user.created',
    status: 'PENDING',
  },
});

// Separate process publishes from outbox to Kafka
async function processOutbox() {
  const events = await prisma.outboxEvent.findMany({
    where: { status: 'PENDING' },
  });
  
  for (const event of events) {
    await eventPublisher.publish(event.topic, event.eventData);
    await prisma.outboxEvent.update({
      where: { id: event.id },
      data: { status: 'PUBLISHED' },
    });
  }
}

SSE Endpoint for Event Streaming

// src/modules/events/events.controller.ts
async streamEvents(req: Request, res: Response): Promise<void> {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const topic = req.query.topic as string;
  const consumer = kafka.consumer({ groupId: `sse-${Date.now()}` });

  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const event = JSON.parse(message.value?.toString() || '{}');
      res.write(`data: ${JSON.stringify(event)}\n\n`);
    },
  });

  req.on('close', async () => {
    await consumer.disconnect();
  });
}

Event Structure

interface BaseEvent {
  eventId: string;
  eventType: string;
  eventVersion: string;
  timestamp: string;
  source: string;
  correlationId?: string;
  traceId?: string;
  data: unknown;
}

Best Practices

Event Naming Conventions

  • Event Type: {domain}.{action}.v{version} (e.g., user.created.v1)
  • Topic: {domain}.{entity}.{action} (e.g., user.created)
  • Use lowercase with dots as separators
  • Keep names descriptive and consistent

Partition Key Selection

  • Use entity ID for ordering guarantees (same entity → same partition)
  • Use correlation ID for request tracing
  • Use user ID for user-scoped events
  • Avoid high-cardinality keys (distributes evenly)

Event Ordering Guarantees

  • Kafka guarantees ordering per partition
  • Use partition key to ensure related events go to same partition
  • Events in different partitions have no ordering guarantee
  • Don't rely on global ordering across all events

Error Handling

  • Implement Dead Letter Queue (DLQ) for failed events
  • Use retry with exponential backoff
  • Log all event processing failures
  • Monitor consumer lag and DLQ size

Observability

  • Log all published and consumed events
  • Track metrics: events published/consumed, processing duration, consumer lag
  • Add distributed tracing to event flows
  • Include correlation IDs for request tracking

Infrastructure Setup

Docker Compose (Local)

services:
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092

Testing

Unit Testing

import { EventPublisher } from '../event-publisher';
import { producer } from '../../config/kafka.config';

jest.mock('../../config/kafka.config');

describe('EventPublisher', () => {
  it('should publish event successfully', async () => {
    const publisher = new EventPublisher();
    const mockSend = jest.fn().mockResolvedValue({});
    (producer.send as jest.Mock) = mockSend;

    await publisher.publish('user.created', {
      eventType: 'user.created',
      eventVersion: '1.0.0',
      data: { userId: '123' },
    });

    expect(mockSend).toHaveBeenCalled();
  });
});

Integration Testing with Test Containers

import { KafkaContainer } from '@testcontainers/kafka';

describe('Event Flow E2E', () => {
  let kafkaContainer: StartedKafkaContainer;

  beforeAll(async () => {
    kafkaContainer = await new KafkaContainer().start();
    process.env.KAFKA_BROKERS = kafkaContainer.getBootstrapServer();
  });

  it('should publish and consume event', async () => {
    // Test implementation
  });
});

Common Use Cases

User Created Event Flow

  1. Auth Service creates user in database
  2. Publishes user.created event to Kafka
  3. Notification Service consumes event and sends welcome email
  4. Analytics Service consumes event and updates metrics

Order Processing with Multiple Consumers

  1. Order Service publishes order.placed event
  2. Payment Service processes payment
  3. Inventory Service reserves items
  4. Notification Service sends confirmation

Resources