feat: Thêm các module kỹ năng mới cho outbox pattern, API aggregation, event sourcing, service discovery và saga pattern cùng với các ví dụ tham chiếu.

This commit is contained in:
Ho Ngoc Hai
2026-01-15 22:39:16 +07:00
parent bcadf2b8e4
commit 76629ab7d3
11 changed files with 5054 additions and 1 deletions

View File

@@ -0,0 +1,477 @@
---
name: api-aggregation
description: API Gateway Aggregation và Backend for Frontend (BFF) patterns. Use for response composition, request routing, và client-specific APIs.
compatibility: ".NET 10+, YARP, Ocelot, GraphQL"
metadata:
author: Velik Ho
version: "1.0"
---
# API Aggregation & BFF / API Aggregation và BFF Pattern
Patterns cho API Gateway aggregation và Backend for Frontend trong microservices.
## When to Use This Skill / Khi Nào Sử Dụng
Use this skill when:
- Aggregating responses from multiple services / Tổng hợp responses từ nhiều services
- Creating client-specific APIs / Tạo APIs riêng cho từng client
- Reducing chatty client-server communication / Giảm giao tiếp "chatty"
- Implementing API composition / Triển khai API composition
## Core Concepts / Khái Niệm Cốt Lõi
### Gateway Patterns Overview
```
┌─────────────────────────────────────────────────────────────┐
│ GATEWAY PATTERNS │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ GATEWAY ROUTING (Reverse Proxy) │ │
│ │ Client ──▶ Gateway ──▶ Service A │ │
│ │ (Route) Service B │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ GATEWAY AGGREGATION │ │
│ │ Client ──▶ Gateway ──┬─▶ Service A ─┐ │ │
│ │ └─▶ Service B ─┼─▶ Combined │ │
│ │ └─▶ Service C ─┘ Response │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ BACKEND FOR FRONTEND (BFF) │ │
│ │ Mobile ──▶ Mobile BFF ──▶ Services │ │
│ │ Web ──▶ Web BFF ──▶ Services │ │
│ │ IoT ──▶ IoT BFF ──▶ Services │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
```
### When to Use What / Khi Nào Dùng Gì
| Pattern | Use Case | Example |
|---------|----------|---------|
| **Gateway Routing** | Simple proxy | Route `/api/users/*` to User Service |
| **Gateway Aggregation** | Combine multiple calls | Product page = Product + Reviews + Stock |
| **BFF** | Client-specific needs | Mobile needs different data than Web |
### Benefits and Trade-offs
| Benefit | Trade-off |
|---------|-----------|
| Reduces client complexity | Single point of failure |
| Optimizes for client needs | Additional latency hop |
| Enables caching | Gateway becomes bottleneck |
| Centralizes cross-cutting concerns | More complexity to maintain |
## Key Patterns / Mẫu Chính
### YARP Reverse Proxy Configuration
```csharp
/// <summary>
/// EN: Configure YARP reverse proxy for routing.
/// VI: Cấu hình YARP reverse proxy cho routing.
/// </summary>
// Program.cs
builder.Services.AddReverseProxy()
.LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"));
var app = builder.Build();
app.MapReverseProxy();
app.Run();
// appsettings.json
{
"ReverseProxy": {
"Routes": {
"users-route": {
"ClusterId": "users-cluster",
"Match": {
"Path": "/api/users/{**catch-all}"
},
"Transforms": [
{ "PathRemovePrefix": "/api/users" }
]
},
"orders-route": {
"ClusterId": "orders-cluster",
"Match": {
"Path": "/api/orders/{**catch-all}"
}
}
},
"Clusters": {
"users-cluster": {
"Destinations": {
"user-service": {
"Address": "http://user-service:5001"
}
}
},
"orders-cluster": {
"Destinations": {
"order-service": {
"Address": "http://order-service:5002"
}
}
}
}
}
}
```
### API Aggregation Controller
```csharp
/// <summary>
/// EN: Aggregation controller combining multiple service responses.
/// VI: Controller aggregation kết hợp responses từ nhiều services.
/// </summary>
[ApiController]
[Route("api/v1/aggregator")]
public class AggregatorController : ControllerBase
{
private readonly IProductServiceClient _productClient;
private readonly IReviewServiceClient _reviewClient;
private readonly IInventoryServiceClient _inventoryClient;
private readonly ILogger<AggregatorController> _logger;
public AggregatorController(
IProductServiceClient productClient,
IReviewServiceClient reviewClient,
IInventoryServiceClient inventoryClient,
ILogger<AggregatorController> logger)
{
_productClient = productClient;
_reviewClient = reviewClient;
_inventoryClient = inventoryClient;
_logger = logger;
}
/// <summary>
/// EN: Get product details with reviews and stock info.
/// VI: Lấy chi tiết sản phẩm với reviews và thông tin tồn kho.
/// </summary>
[HttpGet("products/{productId}")]
public async Task<ActionResult<ProductDetailsDto>> GetProductDetails(
Guid productId,
CancellationToken ct)
{
// EN: Make parallel calls to services
// VI: Gọi song song đến các services
var productTask = _productClient.GetProductAsync(productId, ct);
var reviewsTask = _reviewClient.GetReviewsAsync(productId, ct);
var stockTask = _inventoryClient.GetStockAsync(productId, ct);
await Task.WhenAll(productTask, reviewsTask, stockTask);
var product = await productTask;
if (product == null)
return NotFound();
var reviews = await reviewsTask;
var stock = await stockTask;
// EN: Aggregate responses
// VI: Tổng hợp responses
return Ok(new ProductDetailsDto
{
Id = product.Id,
Name = product.Name,
Description = product.Description,
Price = product.Price,
ImageUrl = product.ImageUrl,
AverageRating = reviews.AverageRating,
ReviewCount = reviews.TotalCount,
TopReviews = reviews.Items.Take(3).ToList(),
InStock = stock.AvailableQuantity > 0,
AvailableQuantity = stock.AvailableQuantity
});
}
/// <summary>
/// EN: Get user dashboard data from multiple services.
/// VI: Lấy dữ liệu dashboard user từ nhiều services.
/// </summary>
[HttpGet("dashboard")]
[Authorize]
public async Task<ActionResult<DashboardDto>> GetDashboard(CancellationToken ct)
{
var userId = User.GetUserId();
var profileTask = _userClient.GetProfileAsync(userId, ct);
var ordersTask = _orderClient.GetRecentOrdersAsync(userId, 5, ct);
var notificationsTask = _notificationClient.GetUnreadCountAsync(userId, ct);
var walletTask = _walletClient.GetBalanceAsync(userId, ct);
await Task.WhenAll(profileTask, ordersTask, notificationsTask, walletTask);
return Ok(new DashboardDto
{
Profile = await profileTask,
RecentOrders = await ordersTask,
UnreadNotifications = await notificationsTask,
WalletBalance = await walletTask
});
}
}
```
### BFF for Mobile
```csharp
/// <summary>
/// EN: Mobile BFF with optimized responses.
/// VI: Mobile BFF với responses được tối ưu.
/// </summary>
[ApiController]
[Route("mobile/api/v1")]
public class MobileBffController : ControllerBase
{
private readonly IMediator _mediator;
public MobileBffController(IMediator mediator)
{
_mediator = mediator;
}
/// <summary>
/// EN: Mobile-optimized product list.
/// VI: Danh sách sản phẩm tối ưu cho mobile.
/// </summary>
[HttpGet("products")]
public async Task<ActionResult<MobileProductListDto>> GetProducts(
[FromQuery] int page = 1,
[FromQuery] int pageSize = 10,
CancellationToken ct = default)
{
// EN: Mobile gets smaller images and fewer fields
// VI: Mobile nhận images nhỏ hơn và ít fields hơn
var result = await _mediator.Send(new GetMobileProductsQuery
{
Page = page,
PageSize = pageSize,
ImageSize = "thumbnail" // Only thumbnails for mobile
}, ct);
return Ok(result);
}
/// <summary>
/// EN: Mobile-optimized checkout.
/// VI: Checkout tối ưu cho mobile.
/// </summary>
[HttpPost("checkout")]
[Authorize]
public async Task<ActionResult<MobileCheckoutResultDto>> Checkout(
MobileCheckoutRequest request,
CancellationToken ct)
{
// EN: Single endpoint handles entire checkout for mobile
// VI: Một endpoint xử lý toàn bộ checkout cho mobile
var result = await _mediator.Send(new MobileCheckoutCommand
{
UserId = User.GetUserId(),
CartId = request.CartId,
PaymentMethodId = request.PaymentMethodId,
ShippingAddressId = request.ShippingAddressId,
UseWalletBalance = request.UseWalletBalance
}, ct);
return Ok(result);
}
}
```
### GraphQL BFF
```csharp
/// <summary>
/// EN: GraphQL query for flexible aggregation.
/// VI: GraphQL query cho aggregation linh hoạt.
/// </summary>
public class Query
{
public async Task<ProductType?> GetProduct(
[ID] Guid productId,
ProductDataLoader productLoader,
ReviewDataLoader reviewLoader,
StockDataLoader stockLoader)
{
return await productLoader.LoadAsync(productId);
}
}
public class ProductType : ObjectType<Product>
{
protected override void Configure(IObjectTypeDescriptor<Product> descriptor)
{
descriptor.Field(p => p.Id).ID();
descriptor.Field(p => p.Name);
descriptor.Field(p => p.Price);
// EN: Lazy load reviews only when requested
// VI: Lazy load reviews chỉ khi được yêu cầu
descriptor
.Field("reviews")
.ResolveWith<ProductResolvers>(r => r.GetReviews(default!, default!))
.Type<ListType<ReviewType>>();
// EN: Lazy load stock only when requested
// VI: Lazy load stock chỉ khi được yêu cầu
descriptor
.Field("stock")
.ResolveWith<ProductResolvers>(r => r.GetStock(default!, default!))
.Type<StockType>();
}
}
public class ProductResolvers
{
public async Task<IEnumerable<Review>> GetReviews(
[Parent] Product product,
ReviewDataLoader loader)
{
return await loader.LoadAsync(product.Id);
}
public async Task<Stock?> GetStock(
[Parent] Product product,
StockDataLoader loader)
{
return await loader.LoadAsync(product.Id);
}
}
```
## Common Mistakes / Lỗi Thường Gặp
### 1. Sequential Instead of Parallel Calls
```csharp
// ❌ BAD: Sequential calls
public async Task<ProductDetailsDto> GetProductDetails(Guid productId)
{
var product = await _productClient.GetAsync(productId);
var reviews = await _reviewClient.GetAsync(productId); // Waits for product
var stock = await _stockClient.GetAsync(productId); // Waits for reviews
// Total time = product + reviews + stock
}
// ✅ GOOD: Parallel calls
public async Task<ProductDetailsDto> GetProductDetails(Guid productId)
{
var productTask = _productClient.GetAsync(productId);
var reviewsTask = _reviewClient.GetAsync(productId);
var stockTask = _stockClient.GetAsync(productId);
await Task.WhenAll(productTask, reviewsTask, stockTask);
// Total time = max(product, reviews, stock)
}
```
### 2. No Fallback on Partial Failure
```csharp
// ❌ BAD: Fails entirely if one service fails
public async Task<DashboardDto> GetDashboard()
{
var profile = await _userClient.GetProfileAsync(userId);
var orders = await _orderClient.GetOrdersAsync(userId); // If this fails, whole request fails
var notifications = await _notificationClient.GetAsync(userId);
}
// ✅ GOOD: Graceful degradation
public async Task<DashboardDto> GetDashboard()
{
var profileTask = _userClient.GetProfileAsync(userId);
var ordersTask = GetOrdersSafeAsync(userId);
var notificationsTask = GetNotificationsSafeAsync(userId);
await Task.WhenAll(profileTask, ordersTask, notificationsTask);
return new DashboardDto
{
Profile = await profileTask,
Orders = await ordersTask ?? Array.Empty<Order>(), // Empty if failed
Notifications = await notificationsTask ?? 0 // Zero if failed
};
}
private async Task<IEnumerable<Order>?> GetOrdersSafeAsync(string userId)
{
try
{
return await _orderClient.GetOrdersAsync(userId);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to get orders for user {UserId}", userId);
return null; // Return null, not throw
}
}
```
### 3. Missing Caching
```csharp
// ❌ BAD: No caching, repeated calls
public async Task<ProductDetailsDto> GetProductDetails(Guid productId)
{
var product = await _productClient.GetAsync(productId); // Called every time
return MapToDto(product);
}
// ✅ GOOD: With caching
public async Task<ProductDetailsDto> GetProductDetails(Guid productId)
{
var cacheKey = $"product:{productId}";
var cached = await _cache.GetAsync<ProductDetailsDto>(cacheKey);
if (cached != null)
return cached;
var product = await _productClient.GetAsync(productId);
var dto = MapToDto(product);
await _cache.SetAsync(cacheKey, dto, TimeSpan.FromMinutes(5));
return dto;
}
```
## Quick Reference / Tham Chiếu Nhanh
### Pattern Selection Guide
| Need | Pattern | Implementation |
|------|---------|----------------|
| Route requests | Gateway Routing | YARP, Ocelot |
| Combine data | Gateway Aggregation | Custom controller |
| Client-specific API | BFF | Separate project per client |
| Flexible queries | GraphQL | HotChocolate |
### Performance Checklist
| Optimization | Applied? |
|-------------|----------|
| Parallel service calls | ✅ |
| Response caching | ✅ |
| Graceful degradation | ✅ |
| Connection pooling | ✅ |
| Timeout configuration | ✅ |
## Resources / Tài Nguyên
- [Detailed Examples](./references/REFERENCE.md) - Full code examples
- [API Design](../api-design/SKILL.md) - REST API patterns
- [Redis Caching](../redis-caching/SKILL.md) - Caching strategies
- [Error Handling](../error-handling-patterns/SKILL.md) - Resilience patterns
- [Inter-service Communication](../inter-service-communication/SKILL.md) - HTTP clients

View File

@@ -0,0 +1,551 @@
# API Aggregation & BFF - Reference Examples
## Complete Implementation Examples
### 1. YARP Advanced Configuration
```csharp
/// <summary>
/// EN: Advanced YARP configuration with transforms and load balancing.
/// VI: Cấu hình YARP nâng cao với transforms và load balancing.
/// </summary>
// Program.cs
builder.Services.AddReverseProxy()
.LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"))
.AddTransforms(transforms =>
{
// EN: Add correlation ID to all requests
transforms.AddRequestTransform(ctx =>
{
var correlationId = ctx.HttpContext.Request.Headers["X-Correlation-Id"]
.FirstOrDefault() ?? Guid.NewGuid().ToString();
ctx.ProxyRequest.Headers.TryAddWithoutValidation(
"X-Correlation-Id", correlationId);
return ValueTask.CompletedTask;
});
// EN: Add timing header to responses
transforms.AddResponseTransform(ctx =>
{
ctx.HttpContext.Response.Headers["X-Processed-At"] =
DateTime.UtcNow.ToString("O");
return ValueTask.CompletedTask;
});
});
// EN: Add health checks for backends
builder.Services.AddHealthChecks()
.AddUrlGroup(new Uri("http://user-service:5001/health"), name: "user-service")
.AddUrlGroup(new Uri("http://order-service:5002/health"), name: "order-service");
var app = builder.Build();
// EN: Add authentication before proxy
app.UseAuthentication();
app.UseAuthorization();
app.MapReverseProxy(proxyPipeline =>
{
proxyPipeline.UseSessionAffinity();
proxyPipeline.UseLoadBalancing();
proxyPipeline.UsePassiveHealthChecks();
});
```
### 2. Complete Service Clients with Resilience
```csharp
/// <summary>
/// EN: HTTP client configuration with resilience.
/// VI: Cấu hình HTTP client với resilience.
/// </summary>
public static class HttpClientExtensions
{
public static IServiceCollection AddServiceClients(
this IServiceCollection services,
IConfiguration configuration)
{
// EN: Product Service Client
services.AddHttpClient<IProductServiceClient, ProductServiceClient>(client =>
{
client.BaseAddress = new Uri(configuration["Services:Products:BaseUrl"]!);
client.DefaultRequestHeaders.Add("Accept", "application/json");
})
.AddStandardResilienceHandler();
// EN: Review Service Client
services.AddHttpClient<IReviewServiceClient, ReviewServiceClient>(client =>
{
client.BaseAddress = new Uri(configuration["Services:Reviews:BaseUrl"]!);
})
.AddStandardResilienceHandler();
// EN: Inventory Service Client
services.AddHttpClient<IInventoryServiceClient, InventoryServiceClient>(client =>
{
client.BaseAddress = new Uri(configuration["Services:Inventory:BaseUrl"]!);
})
.AddStandardResilienceHandler();
return services;
}
}
/// <summary>
/// EN: Product service client implementation.
/// VI: Triển khai client cho product service.
/// </summary>
public interface IProductServiceClient
{
Task<ProductDto?> GetProductAsync(Guid productId, CancellationToken ct = default);
Task<PagedResult<ProductDto>> GetProductsAsync(int page, int pageSize, CancellationToken ct = default);
}
public class ProductServiceClient : IProductServiceClient
{
private readonly HttpClient _httpClient;
private readonly ILogger<ProductServiceClient> _logger;
public ProductServiceClient(
HttpClient httpClient,
ILogger<ProductServiceClient> logger)
{
_httpClient = httpClient;
_logger = logger;
}
public async Task<ProductDto?> GetProductAsync(Guid productId, CancellationToken ct = default)
{
try
{
var response = await _httpClient.GetAsync($"/api/products/{productId}", ct);
if (response.StatusCode == HttpStatusCode.NotFound)
return null;
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<ProductDto>(ct);
}
catch (HttpRequestException ex)
{
_logger.LogError(ex, "Failed to get product {ProductId}", productId);
throw;
}
}
public async Task<PagedResult<ProductDto>> GetProductsAsync(
int page,
int pageSize,
CancellationToken ct = default)
{
var response = await _httpClient.GetAsync(
$"/api/products?page={page}&pageSize={pageSize}", ct);
response.EnsureSuccessStatusCode();
return await response.Content.ReadFromJsonAsync<PagedResult<ProductDto>>(ct)
?? new PagedResult<ProductDto>();
}
}
```
### 3. Aggregation with Timeout and Fallback
```csharp
/// <summary>
/// EN: Aggregator with timeout and fallback handling.
/// VI: Aggregator với xử lý timeout và fallback.
/// </summary>
public class ResilientAggregatorService
{
private readonly IProductServiceClient _productClient;
private readonly IReviewServiceClient _reviewClient;
private readonly IInventoryServiceClient _inventoryClient;
private readonly IDistributedCache _cache;
private readonly ILogger<ResilientAggregatorService> _logger;
private static readonly TimeSpan AggregationTimeout = TimeSpan.FromSeconds(5);
private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(5);
public ResilientAggregatorService(
IProductServiceClient productClient,
IReviewServiceClient reviewClient,
IInventoryServiceClient inventoryClient,
IDistributedCache cache,
ILogger<ResilientAggregatorService> logger)
{
_productClient = productClient;
_reviewClient = reviewClient;
_inventoryClient = inventoryClient;
_cache = cache;
_logger = logger;
}
public async Task<ProductDetailsDto?> GetProductDetailsAsync(
Guid productId,
CancellationToken ct = default)
{
// EN: Try cache first
var cacheKey = $"product-details:{productId}";
var cached = await _cache.GetStringAsync(cacheKey, ct);
if (!string.IsNullOrEmpty(cached))
{
return JsonSerializer.Deserialize<ProductDetailsDto>(cached);
}
// EN: Create timeout cancellation token
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
timeoutCts.CancelAfter(AggregationTimeout);
// EN: Fetch product (required)
var product = await _productClient.GetProductAsync(productId, timeoutCts.Token);
if (product == null)
return null;
// EN: Fetch optional data with fallbacks
var reviewsTask = GetReviewsSafeAsync(productId, timeoutCts.Token);
var stockTask = GetStockSafeAsync(productId, timeoutCts.Token);
await Task.WhenAll(reviewsTask, stockTask);
var reviews = await reviewsTask;
var stock = await stockTask;
var result = new ProductDetailsDto
{
Id = product.Id,
Name = product.Name,
Description = product.Description,
Price = product.Price,
ImageUrl = product.ImageUrl,
// EN: Reviews with fallback
AverageRating = reviews?.AverageRating ?? 0,
ReviewCount = reviews?.TotalCount ?? 0,
TopReviews = reviews?.Items.Take(3).ToList() ?? new List<ReviewDto>(),
ReviewsAvailable = reviews != null,
// EN: Stock with fallback
InStock = stock?.AvailableQuantity > 0 ?? true, // Assume in stock if unknown
AvailableQuantity = stock?.AvailableQuantity,
StockAvailable = stock != null
};
// EN: Cache the result
await _cache.SetStringAsync(
cacheKey,
JsonSerializer.Serialize(result),
new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = CacheDuration
},
ct);
return result;
}
private async Task<ReviewSummaryDto?> GetReviewsSafeAsync(
Guid productId,
CancellationToken ct)
{
try
{
return await _reviewClient.GetReviewSummaryAsync(productId, ct);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Failed to get reviews for product {ProductId}", productId);
return null;
}
}
private async Task<StockDto?> GetStockSafeAsync(
Guid productId,
CancellationToken ct)
{
try
{
return await _inventoryClient.GetStockAsync(productId, ct);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Failed to get stock for product {ProductId}", productId);
return null;
}
}
}
```
### 4. Complete BFF Project Structure
```csharp
/// <summary>
/// EN: Mobile BFF project structure and implementation.
/// VI: Cấu trúc và triển khai Mobile BFF project.
/// </summary>
// MobileBff/Program.cs
var builder = WebApplication.CreateBuilder(args);
// EN: Add services
builder.Services.AddControllers();
builder.Services.AddMediatR(cfg =>
cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));
builder.Services.AddServiceClients(builder.Configuration);
builder.Services.AddStackExchangeRedisCache(options =>
{
options.Configuration = builder.Configuration.GetConnectionString("Redis");
});
// EN: Add authentication
builder.Services.AddJwtAuthentication(builder.Configuration);
var app = builder.Build();
app.UseAuthentication();
app.UseAuthorization();
app.MapControllers();
app.Run();
// MobileBff/Features/Home/GetHomeScreenQuery.cs
public record GetHomeScreenQuery : IRequest<HomeScreenDto>
{
public string UserId { get; init; } = default!;
}
public record HomeScreenDto
{
public UserProfileSummaryDto? Profile { get; init; }
public List<FeaturedProductDto> FeaturedProducts { get; init; } = new();
public List<CategoryDto> Categories { get; init; } = new();
public List<PromotionDto> ActivePromotions { get; init; } = new();
public int CartItemCount { get; init; }
public int UnreadNotifications { get; init; }
}
public class GetHomeScreenQueryHandler : IRequestHandler<GetHomeScreenQuery, HomeScreenDto>
{
private readonly IUserServiceClient _userClient;
private readonly IProductServiceClient _productClient;
private readonly ICategoryServiceClient _categoryClient;
private readonly IPromotionServiceClient _promotionClient;
private readonly ICartServiceClient _cartClient;
private readonly INotificationServiceClient _notificationClient;
private readonly ILogger<GetHomeScreenQueryHandler> _logger;
public GetHomeScreenQueryHandler(
IUserServiceClient userClient,
IProductServiceClient productClient,
ICategoryServiceClient categoryClient,
IPromotionServiceClient promotionClient,
ICartServiceClient cartClient,
INotificationServiceClient notificationClient,
ILogger<GetHomeScreenQueryHandler> logger)
{
_userClient = userClient;
_productClient = productClient;
_categoryClient = categoryClient;
_promotionClient = promotionClient;
_cartClient = cartClient;
_notificationClient = notificationClient;
_logger = logger;
}
public async Task<HomeScreenDto> Handle(
GetHomeScreenQuery request,
CancellationToken ct)
{
// EN: All calls in parallel with safe wrappers
var profileTask = SafeCallAsync(() =>
_userClient.GetProfileSummaryAsync(request.UserId, ct));
var featuredTask = SafeCallAsync(() =>
_productClient.GetFeaturedAsync(6, ct));
var categoriesTask = SafeCallAsync(() =>
_categoryClient.GetTopCategoriesAsync(8, ct));
var promotionsTask = SafeCallAsync(() =>
_promotionClient.GetActiveAsync(ct));
var cartCountTask = SafeCallAsync(() =>
_cartClient.GetItemCountAsync(request.UserId, ct));
var notificationCountTask = SafeCallAsync(() =>
_notificationClient.GetUnreadCountAsync(request.UserId, ct));
await Task.WhenAll(
profileTask, featuredTask, categoriesTask,
promotionsTask, cartCountTask, notificationCountTask);
return new HomeScreenDto
{
Profile = await profileTask,
FeaturedProducts = (await featuredTask)?.ToList() ?? new(),
Categories = (await categoriesTask)?.ToList() ?? new(),
ActivePromotions = (await promotionsTask)?.ToList() ?? new(),
CartItemCount = await cartCountTask ?? 0,
UnreadNotifications = await notificationCountTask ?? 0
};
}
private async Task<T?> SafeCallAsync<T>(Func<Task<T>> call)
{
try
{
return await call();
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Service call failed in home screen aggregation");
return default;
}
}
}
```
### 5. GraphQL BFF with DataLoaders
```csharp
/// <summary>
/// EN: GraphQL BFF with efficient data loading.
/// VI: GraphQL BFF với data loading hiệu quả.
/// </summary>
// GraphQL/Query.cs
public class Query
{
[UseProjection]
[UseFiltering]
[UseSorting]
public async Task<IQueryable<Product>> GetProducts(
[Service] IProductRepository repository) =>
repository.GetAll();
public async Task<Product?> GetProduct(
[ID] Guid id,
ProductByIdDataLoader loader) =>
await loader.LoadAsync(id);
}
// GraphQL/DataLoaders/ProductByIdDataLoader.cs
public class ProductByIdDataLoader : BatchDataLoader<Guid, Product>
{
private readonly IProductServiceClient _client;
public ProductByIdDataLoader(
IProductServiceClient client,
IBatchScheduler batchScheduler,
DataLoaderOptions? options = null)
: base(batchScheduler, options)
{
_client = client;
}
protected override async Task<IReadOnlyDictionary<Guid, Product>> LoadBatchAsync(
IReadOnlyList<Guid> keys,
CancellationToken ct)
{
// EN: Batch load all requested products
var products = await _client.GetProductsByIdsAsync(keys, ct);
return products.ToDictionary(p => p.Id);
}
}
// GraphQL/Types/ProductType.cs
public class ProductType : ObjectType<Product>
{
protected override void Configure(IObjectTypeDescriptor<Product> descriptor)
{
descriptor.Field(p => p.Id).Type<NonNullType<IdType>>();
descriptor.Field(p => p.Name).Type<NonNullType<StringType>>();
descriptor.Field(p => p.Price).Type<NonNullType<DecimalType>>();
// EN: Reviews field with data loader
descriptor
.Field("reviews")
.Type<NonNullType<ListType<NonNullType<ReviewType>>>>()
.Resolve(async ctx =>
{
var loader = ctx.DataLoader<ReviewsByProductIdDataLoader>();
return await loader.LoadAsync(ctx.Parent<Product>().Id);
});
// EN: Stock field with data loader
descriptor
.Field("stock")
.Type<StockType>()
.Resolve(async ctx =>
{
var loader = ctx.DataLoader<StockByProductIdDataLoader>();
return await loader.LoadAsync(ctx.Parent<Product>().Id);
});
// EN: Related products
descriptor
.Field("relatedProducts")
.Type<ListType<ProductType>>()
.Argument("limit", a => a.Type<IntType>().DefaultValue(4))
.Resolve(async ctx =>
{
var product = ctx.Parent<Product>();
var limit = ctx.ArgumentValue<int>("limit");
var loader = ctx.DataLoader<RelatedProductsDataLoader>();
var related = await loader.LoadAsync(product.CategoryId);
return related.Where(p => p.Id != product.Id).Take(limit);
});
}
}
```
## Docker Compose for API Gateway
```yaml
version: '3.8'
services:
api-gateway:
build:
context: ./src/ApiGateway
ports:
- "5000:80"
environment:
- ASPNETCORE_ENVIRONMENT=Development
- ReverseProxy__Clusters__users-cluster__Destinations__user-service__Address=http://user-service:80
- ReverseProxy__Clusters__orders-cluster__Destinations__order-service__Address=http://order-service:80
depends_on:
- user-service
- order-service
mobile-bff:
build:
context: ./src/MobileBff
ports:
- "5001:80"
environment:
- Services__Products__BaseUrl=http://product-service:80
- Services__Reviews__BaseUrl=http://review-service:80
- ConnectionStrings__Redis=redis:6379
web-bff:
build:
context: ./src/WebBff
ports:
- "5002:80"
user-service:
build:
context: ./src/UserService
expose:
- "80"
order-service:
build:
context: ./src/OrderService
expose:
- "80"
redis:
image: redis:7-alpine
ports:
- "6379:6379"
```

View File

@@ -0,0 +1,455 @@
---
name: event-sourcing
description: Event Sourcing pattern - lưu trữ thay đổi trạng thái dưới dạng chuỗi sự kiện. Use for audit trails, temporal queries, CQRS integration, và event replay.
compatibility: ".NET 10+, EventStoreDB, Marten, EF Core"
metadata:
author: Velik Ho
version: "1.0"
---
# Event Sourcing / Event Sourcing Pattern
Event Sourcing pattern cho GoodGo microservices - lưu trữ mọi thay đổi trạng thái dưới dạng sự kiện bất biến.
## When to Use This Skill / Khi Nào Sử Dụng
Use this skill when:
- Building audit trails / Xây dựng audit trails
- Implementing temporal queries (time travel) / Truy vấn theo thời gian
- Debugging production issues / Debug lỗi production
- CQRS with event-driven projections / CQRS với projections
- Ensuring data consistency / Đảm bảo tính nhất quán dữ liệu
## Core Concepts / Khái Niệm Cốt Lõi
### Traditional vs Event Sourcing / Truyền Thống vs Event Sourcing
```
┌─────────────────────────────────────────────────────────────┐
│ TRADITIONAL (State-Based) │
├─────────────────────────────────────────────────────────────┤
│ Order Table │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Id: 1, Status: Shipped, Amount: 500, Updated: 10:30 │ │
│ └─────────────────────────────────────────────────────┘ │
│ ❌ Lost: Why was it shipped? What was previous status? │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ EVENT SOURCING (Event-Based) │
├─────────────────────────────────────────────────────────────┤
│ Event Stream: Order-1 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 1. OrderCreated { Amount: 500, UserId: "u1" } @10:00│ │
│ │ 2. OrderPaid { PaymentId: "p1" } @10:15 │ │
│ │ 3. OrderShipped { TrackingNo: "TN123" } @10:30 │ │
│ └─────────────────────────────────────────────────────┘ │
│ ✅ Full history preserved │
└─────────────────────────────────────────────────────────────┘
```
### Key Components / Thành Phần Chính
| Component | Purpose | Description |
|-----------|---------|-------------|
| **Event** | Immutable fact | Sự kiện đã xảy ra, không thể thay đổi |
| **Event Stream** | Ordered sequence | Chuỗi events theo thời gian cho mỗi aggregate |
| **Event Store** | Append-only log | Database lưu trữ events |
| **Projection** | Read model | View được xây dựng từ events |
| **Snapshot** | State cache | Cache trạng thái để tối ưu replay |
### Event Sourcing Flow / Luồng Event Sourcing
```
┌─────────┐ ┌─────────────┐ ┌─────────────┐
│ Command │───▶│ Aggregate │───▶│ Events │
└─────────┘ │ (Domain) │ │ (Facts) │
└─────────────┘ └──────┬──────┘
┌─────────────────────────┼─────────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Event Store │ │ Projection │ │ Snapshot │
│ (Write) │ │ (Read) │ │ (Cache) │
└─────────────┘ └─────────────┘ └─────────────┘
```
## Key Patterns / Mẫu Chính
### Domain Event Definition / Định Nghĩa Domain Event
```csharp
/// <summary>
/// EN: Base interface for all domain events.
/// VI: Interface cơ sở cho tất cả domain events.
/// </summary>
public interface IDomainEvent
{
Guid EventId { get; }
DateTime OccurredOn { get; }
int Version { get; }
}
/// <summary>
/// EN: Order created event.
/// VI: Event tạo order.
/// </summary>
public record OrderCreated : IDomainEvent
{
public Guid EventId { get; init; } = Guid.NewGuid();
public DateTime OccurredOn { get; init; } = DateTime.UtcNow;
public int Version { get; init; }
public Guid OrderId { get; init; }
public string UserId { get; init; } = default!;
public decimal TotalAmount { get; init; }
public Address ShippingAddress { get; init; } = default!;
}
/// <summary>
/// EN: Order item added event.
/// VI: Event thêm item vào order.
/// </summary>
public record OrderItemAdded : IDomainEvent
{
public Guid EventId { get; init; } = Guid.NewGuid();
public DateTime OccurredOn { get; init; } = DateTime.UtcNow;
public int Version { get; init; }
public Guid OrderId { get; init; }
public Guid ProductId { get; init; }
public int Quantity { get; init; }
public decimal UnitPrice { get; init; }
}
```
### Event-Sourced Aggregate / Aggregate Event-Sourced
```csharp
/// <summary>
/// EN: Base class for event-sourced aggregates.
/// VI: Lớp cơ sở cho aggregate event-sourced.
/// </summary>
public abstract class EventSourcedAggregate
{
private readonly List<IDomainEvent> _uncommittedEvents = new();
public Guid Id { get; protected set; }
public int Version { get; protected set; } = -1;
public IReadOnlyList<IDomainEvent> UncommittedEvents => _uncommittedEvents.AsReadOnly();
protected void Apply(IDomainEvent @event)
{
When(@event);
_uncommittedEvents.Add(@event);
Version++;
}
protected abstract void When(IDomainEvent @event);
public void Load(IEnumerable<IDomainEvent> history)
{
foreach (var @event in history)
{
When(@event);
Version++;
}
}
public void ClearUncommittedEvents() => _uncommittedEvents.Clear();
}
/// <summary>
/// EN: Order aggregate with event sourcing.
/// VI: Order aggregate với event sourcing.
/// </summary>
public class Order : EventSourcedAggregate
{
public string UserId { get; private set; } = default!;
public OrderStatus Status { get; private set; }
public decimal TotalAmount { get; private set; }
private readonly List<OrderItem> _items = new();
public IReadOnlyList<OrderItem> Items => _items.AsReadOnly();
// EN: For rehydration / VI: Để khôi phục từ events
private Order() { }
// EN: Factory method for creation / VI: Factory method để tạo mới
public static Order Create(Guid orderId, string userId, Address shippingAddress)
{
var order = new Order();
order.Apply(new OrderCreated
{
OrderId = orderId,
UserId = userId,
ShippingAddress = shippingAddress,
Version = 0
});
return order;
}
public void AddItem(Guid productId, int quantity, decimal unitPrice)
{
Apply(new OrderItemAdded
{
OrderId = Id,
ProductId = productId,
Quantity = quantity,
UnitPrice = unitPrice,
Version = Version + 1
});
}
protected override void When(IDomainEvent @event)
{
switch (@event)
{
case OrderCreated e:
Id = e.OrderId;
UserId = e.UserId;
Status = OrderStatus.Created;
break;
case OrderItemAdded e:
_items.Add(new OrderItem(e.ProductId, e.Quantity, e.UnitPrice));
TotalAmount += e.Quantity * e.UnitPrice;
break;
case OrderPaid:
Status = OrderStatus.Paid;
break;
}
}
}
```
### Event Store Repository / Repository Event Store
```csharp
/// <summary>
/// EN: Repository for event-sourced aggregates.
/// VI: Repository cho aggregate event-sourced.
/// </summary>
public interface IEventSourcedRepository<T> where T : EventSourcedAggregate
{
Task<T?> GetByIdAsync(Guid id, CancellationToken ct = default);
Task SaveAsync(T aggregate, CancellationToken ct = default);
}
/// <summary>
/// EN: EF Core implementation of event store.
/// VI: Event store triển khai với EF Core.
/// </summary>
public class EfCoreEventStore<T> : IEventSourcedRepository<T>
where T : EventSourcedAggregate, new()
{
private readonly EventStoreDbContext _context;
private readonly ILogger<EfCoreEventStore<T>> _logger;
public EfCoreEventStore(
EventStoreDbContext context,
ILogger<EfCoreEventStore<T>> logger)
{
_context = context;
_logger = logger;
}
public async Task<T?> GetByIdAsync(Guid id, CancellationToken ct = default)
{
var streamName = $"{typeof(T).Name}-{id}";
var events = await _context.Events
.Where(e => e.StreamId == streamName)
.OrderBy(e => e.Version)
.ToListAsync(ct);
if (!events.Any())
return null;
var aggregate = new T();
aggregate.Load(events.Select(e => DeserializeEvent(e)));
return aggregate;
}
public async Task SaveAsync(T aggregate, CancellationToken ct = default)
{
var streamName = $"{typeof(T).Name}-{aggregate.Id}";
var expectedVersion = aggregate.Version - aggregate.UncommittedEvents.Count;
// EN: Optimistic concurrency check
// VI: Kiểm tra optimistic concurrency
var currentVersion = await _context.Events
.Where(e => e.StreamId == streamName)
.MaxAsync(e => (int?)e.Version, ct) ?? -1;
if (currentVersion != expectedVersion)
throw new ConcurrencyException(
$"Expected version {expectedVersion} but found {currentVersion}");
foreach (var @event in aggregate.UncommittedEvents)
{
_context.Events.Add(new StoredEvent
{
Id = @event.EventId,
StreamId = streamName,
EventType = @event.GetType().AssemblyQualifiedName!,
Data = JsonSerializer.Serialize(@event, @event.GetType()),
Version = @event.Version,
OccurredOn = @event.OccurredOn
});
}
await _context.SaveChangesAsync(ct);
aggregate.ClearUncommittedEvents();
_logger.LogInformation(
"Saved {Count} events to stream {Stream}",
aggregate.UncommittedEvents.Count,
streamName);
}
}
```
### Projections / Projections
```csharp
/// <summary>
/// EN: Projection handler for order read models.
/// VI: Projection handler cho order read models.
/// </summary>
public class OrderProjection : IEventHandler<OrderCreated>, IEventHandler<OrderPaid>
{
private readonly ReadDbContext _readDb;
public OrderProjection(ReadDbContext readDb)
{
_readDb = readDb;
}
public async Task HandleAsync(OrderCreated @event, CancellationToken ct)
{
var readModel = new OrderReadModel
{
Id = @event.OrderId,
UserId = @event.UserId,
Status = "Created",
TotalAmount = @event.TotalAmount,
CreatedAt = @event.OccurredOn
};
_readDb.Orders.Add(readModel);
await _readDb.SaveChangesAsync(ct);
}
public async Task HandleAsync(OrderPaid @event, CancellationToken ct)
{
var order = await _readDb.Orders.FindAsync(@event.OrderId);
if (order != null)
{
order.Status = "Paid";
order.PaidAt = @event.OccurredOn;
await _readDb.SaveChangesAsync(ct);
}
}
}
```
## Common Mistakes / Lỗi Thường Gặp
### 1. Mutable Events
```csharp
// ❌ BAD: Mutable event
public class OrderCreated
{
public string Status { get; set; } // Mutable!
}
// ✅ GOOD: Immutable record
public record OrderCreated
{
public string Status { get; init; } // Immutable
}
```
### 2. Storing Derived Data in Events
```csharp
// ❌ BAD: Storing computed values
public record OrderItemAdded
{
public decimal UnitPrice { get; init; }
public int Quantity { get; init; }
public decimal TotalPrice { get; init; } // Derived!
}
// ✅ GOOD: Store only facts, compute when needed
public record OrderItemAdded
{
public decimal UnitPrice { get; init; }
public int Quantity { get; init; }
// TotalPrice computed in aggregate
}
```
### 3. Large Event Streams Without Snapshots
```csharp
// ❌ BAD: Loading thousands of events
var order = await _repo.GetByIdAsync(orderId); // Slow!
// ✅ GOOD: Use snapshots for performance
public async Task<Order?> GetByIdAsync(Guid id, CancellationToken ct)
{
// Load from snapshot if exists
var snapshot = await _snapshotStore.GetLatestAsync<Order>(id, ct);
var fromVersion = snapshot?.Version ?? 0;
// Load only events after snapshot
var events = await _eventStore.GetEventsAsync(id, fromVersion, ct);
var aggregate = snapshot ?? new Order();
aggregate.Load(events);
return aggregate;
}
```
## Quick Reference / Tham Chiếu Nhanh
### When to Use Event Sourcing
| Scenario | Recommendation |
|----------|---------------|
| Simple CRUD | ❌ Overkill |
| Complex domains | ✅ Use ES |
| Audit requirements | ✅ Use ES |
| Temporal queries | ✅ Use ES |
| High write volume | ⚠️ Consider carefully |
### Event Naming Conventions
| Type | Convention | Example |
|------|-----------|---------|
| Create | `{Entity}Created` | `OrderCreated` |
| Update | `{Entity}{Property}Changed` | `OrderStatusChanged` |
| Delete | `{Entity}Deleted` | `OrderDeleted` |
| Action | `{Entity}{Action}` | `OrderShipped` |
### Snapshot Strategy
| Trigger | When to Snapshot |
|---------|-----------------|
| Version-based | Every N events (e.g., 100) |
| Time-based | Every N minutes |
| Size-based | When aggregate size > threshold |
## Resources / Tài Nguyên
- [Detailed Examples](./references/REFERENCE.md) - Full code examples
- [CQRS MediatR](../cqrs-mediatr/SKILL.md) - CQRS patterns
- [Outbox Pattern](../outbox-pattern/SKILL.md) - Reliable event publishing
- [Repository Pattern](../repository-pattern/SKILL.md) - Data access patterns
- [Domain Driven Design](../domain-driven-design/SKILL.md) - DDD patterns

View File

@@ -0,0 +1,525 @@
# Event Sourcing - Reference Examples
## Complete Implementation Examples
### 1. Event Store DbContext
```csharp
/// <summary>
/// EN: DbContext for storing events.
/// VI: DbContext để lưu events.
/// </summary>
public class EventStoreDbContext : DbContext
{
public DbSet<StoredEvent> Events => Set<StoredEvent>();
public DbSet<Snapshot> Snapshots => Set<Snapshot>();
public EventStoreDbContext(DbContextOptions<EventStoreDbContext> options)
: base(options) { }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<StoredEvent>(entity =>
{
entity.ToTable("EventStore");
entity.HasKey(e => e.Id);
entity.Property(e => e.StreamId)
.HasMaxLength(200)
.IsRequired();
entity.Property(e => e.EventType)
.HasMaxLength(500)
.IsRequired();
entity.Property(e => e.Data)
.HasColumnType("jsonb")
.IsRequired();
entity.HasIndex(e => new { e.StreamId, e.Version })
.IsUnique();
});
modelBuilder.Entity<Snapshot>(entity =>
{
entity.ToTable("Snapshots");
entity.HasKey(e => e.Id);
entity.Property(e => e.AggregateId)
.IsRequired();
entity.Property(e => e.Data)
.HasColumnType("jsonb")
.IsRequired();
entity.HasIndex(e => new { e.AggregateType, e.AggregateId, e.Version })
.IsUnique();
});
}
}
/// <summary>
/// EN: Stored event entity.
/// VI: Entity lưu event.
/// </summary>
public class StoredEvent
{
public Guid Id { get; set; }
public string StreamId { get; set; } = default!;
public string EventType { get; set; } = default!;
public string Data { get; set; } = default!;
public int Version { get; set; }
public DateTime OccurredOn { get; set; }
}
/// <summary>
/// EN: Snapshot entity.
/// VI: Entity snapshot.
/// </summary>
public class Snapshot
{
public Guid Id { get; set; }
public string AggregateType { get; set; } = default!;
public Guid AggregateId { get; set; }
public string Data { get; set; } = default!;
public int Version { get; set; }
public DateTime CreatedAt { get; set; }
}
```
### 2. Complete Order Aggregate
```csharp
/// <summary>
/// EN: Complete order aggregate with all events.
/// VI: Order aggregate hoàn chỉnh với tất cả events.
/// </summary>
public class Order : EventSourcedAggregate
{
public string UserId { get; private set; } = default!;
public OrderStatus Status { get; private set; }
public decimal TotalAmount { get; private set; }
public Address? ShippingAddress { get; private set; }
public string? PaymentId { get; private set; }
public string? TrackingNumber { get; private set; }
private readonly List<OrderItem> _items = new();
public IReadOnlyList<OrderItem> Items => _items.AsReadOnly();
private Order() { }
public static Order Create(Guid orderId, string userId, Address shippingAddress)
{
if (string.IsNullOrEmpty(userId))
throw new ArgumentException("UserId is required");
var order = new Order();
order.Apply(new OrderCreated
{
OrderId = orderId,
UserId = userId,
ShippingAddress = shippingAddress,
Version = 0
});
return order;
}
public void AddItem(Guid productId, int quantity, decimal unitPrice)
{
if (Status != OrderStatus.Created)
throw new InvalidOperationException("Cannot add items to non-draft order");
if (quantity <= 0)
throw new ArgumentException("Quantity must be positive");
Apply(new OrderItemAdded
{
OrderId = Id,
ProductId = productId,
Quantity = quantity,
UnitPrice = unitPrice,
Version = Version + 1
});
}
public void MarkAsPaid(string paymentId)
{
if (Status != OrderStatus.Created)
throw new InvalidOperationException("Order is not in Created status");
Apply(new OrderPaid
{
OrderId = Id,
PaymentId = paymentId,
Version = Version + 1
});
}
public void Ship(string trackingNumber)
{
if (Status != OrderStatus.Paid)
throw new InvalidOperationException("Order must be paid before shipping");
Apply(new OrderShipped
{
OrderId = Id,
TrackingNumber = trackingNumber,
Version = Version + 1
});
}
public void Cancel(string reason)
{
if (Status is OrderStatus.Shipped or OrderStatus.Delivered)
throw new InvalidOperationException("Cannot cancel shipped/delivered order");
Apply(new OrderCancelled
{
OrderId = Id,
Reason = reason,
Version = Version + 1
});
}
protected override void When(IDomainEvent @event)
{
switch (@event)
{
case OrderCreated e:
Id = e.OrderId;
UserId = e.UserId;
ShippingAddress = e.ShippingAddress;
Status = OrderStatus.Created;
break;
case OrderItemAdded e:
_items.Add(new OrderItem(e.ProductId, e.Quantity, e.UnitPrice));
TotalAmount += e.Quantity * e.UnitPrice;
break;
case OrderPaid e:
Status = OrderStatus.Paid;
PaymentId = e.PaymentId;
break;
case OrderShipped e:
Status = OrderStatus.Shipped;
TrackingNumber = e.TrackingNumber;
break;
case OrderCancelled:
Status = OrderStatus.Cancelled;
break;
}
}
}
```
### 3. Snapshot Service
```csharp
/// <summary>
/// EN: Service for managing aggregate snapshots.
/// VI: Service quản lý snapshots của aggregates.
/// </summary>
public interface ISnapshotService
{
Task<T?> GetLatestSnapshotAsync<T>(Guid aggregateId, CancellationToken ct = default)
where T : EventSourcedAggregate;
Task SaveSnapshotAsync<T>(T aggregate, CancellationToken ct = default)
where T : EventSourcedAggregate;
}
public class SnapshotService : ISnapshotService
{
private readonly EventStoreDbContext _context;
private readonly ILogger<SnapshotService> _logger;
private const int SnapshotThreshold = 100;
public SnapshotService(
EventStoreDbContext context,
ILogger<SnapshotService> logger)
{
_context = context;
_logger = logger;
}
public async Task<T?> GetLatestSnapshotAsync<T>(
Guid aggregateId,
CancellationToken ct = default) where T : EventSourcedAggregate
{
var snapshot = await _context.Snapshots
.Where(s => s.AggregateType == typeof(T).Name
&& s.AggregateId == aggregateId)
.OrderByDescending(s => s.Version)
.FirstOrDefaultAsync(ct);
if (snapshot == null)
return null;
return JsonSerializer.Deserialize<T>(snapshot.Data);
}
public async Task SaveSnapshotAsync<T>(
T aggregate,
CancellationToken ct = default) where T : EventSourcedAggregate
{
// EN: Only snapshot if threshold reached
// VI: Chỉ snapshot nếu đạt ngưỡng
if (aggregate.Version % SnapshotThreshold != 0)
return;
var snapshot = new Snapshot
{
Id = Guid.NewGuid(),
AggregateType = typeof(T).Name,
AggregateId = aggregate.Id,
Data = JsonSerializer.Serialize(aggregate),
Version = aggregate.Version,
CreatedAt = DateTime.UtcNow
};
_context.Snapshots.Add(snapshot);
await _context.SaveChangesAsync(ct);
_logger.LogInformation(
"Created snapshot for {AggregateType} {AggregateId} at version {Version}",
typeof(T).Name,
aggregate.Id,
aggregate.Version);
}
}
```
### 4. Projection Dispatcher
```csharp
/// <summary>
/// EN: Dispatches events to projections.
/// VI: Dispatch events đến projections.
/// </summary>
public interface IProjectionDispatcher
{
Task DispatchAsync(IDomainEvent @event, CancellationToken ct = default);
}
public class ProjectionDispatcher : IProjectionDispatcher
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<ProjectionDispatcher> _logger;
public ProjectionDispatcher(
IServiceProvider serviceProvider,
ILogger<ProjectionDispatcher> logger)
{
_serviceProvider = serviceProvider;
_logger = logger;
}
public async Task DispatchAsync(IDomainEvent @event, CancellationToken ct = default)
{
var eventType = @event.GetType();
var handlerType = typeof(IEventHandler<>).MakeGenericType(eventType);
var handlers = _serviceProvider.GetServices(handlerType);
foreach (var handler in handlers)
{
try
{
var method = handlerType.GetMethod("HandleAsync");
await (Task)method!.Invoke(handler, new object[] { @event, ct })!;
_logger.LogDebug(
"Projected {EventType} to {HandlerType}",
eventType.Name,
handler!.GetType().Name);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to project {EventType} to {HandlerType}",
eventType.Name,
handler!.GetType().Name);
throw;
}
}
}
}
```
### 5. Command Handler with Event Sourcing
```csharp
/// <summary>
/// EN: Command handler using event sourcing.
/// VI: Command handler sử dụng event sourcing.
/// </summary>
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, OrderResult>
{
private readonly IEventSourcedRepository<Order> _repository;
private readonly IProjectionDispatcher _projections;
private readonly ISnapshotService _snapshots;
private readonly ILogger<CreateOrderCommandHandler> _logger;
public CreateOrderCommandHandler(
IEventSourcedRepository<Order> repository,
IProjectionDispatcher projections,
ISnapshotService snapshots,
ILogger<CreateOrderCommandHandler> logger)
{
_repository = repository;
_projections = projections;
_snapshots = snapshots;
_logger = logger;
}
public async Task<OrderResult> Handle(
CreateOrderCommand request,
CancellationToken ct)
{
var orderId = Guid.NewGuid();
var order = Order.Create(orderId, request.UserId, request.ShippingAddress);
foreach (var item in request.Items)
{
order.AddItem(item.ProductId, item.Quantity, item.UnitPrice);
}
// EN: Save events to event store
// VI: Lưu events vào event store
await _repository.SaveAsync(order, ct);
// EN: Update projections
// VI: Cập nhật projections
foreach (var @event in order.UncommittedEvents)
{
await _projections.DispatchAsync(@event, ct);
}
// EN: Save snapshot if threshold reached
// VI: Tạo snapshot nếu đạt ngưỡng
await _snapshots.SaveSnapshotAsync(order, ct);
_logger.LogInformation("Order created: {OrderId}", orderId);
return new OrderResult(orderId);
}
}
```
### 6. DI Registration
```csharp
/// <summary>
/// EN: Register event sourcing services.
/// VI: Đăng ký event sourcing services.
/// </summary>
public static class EventSourcingServiceExtensions
{
public static IServiceCollection AddEventSourcing(
this IServiceCollection services,
IConfiguration configuration)
{
// EN: Register Event Store DbContext
services.AddDbContext<EventStoreDbContext>(options =>
options.UseNpgsql(configuration.GetConnectionString("EventStore")));
// EN: Register repositories
services.AddScoped(typeof(IEventSourcedRepository<>), typeof(EfCoreEventStore<>));
// EN: Register snapshot service
services.AddScoped<ISnapshotService, SnapshotService>();
// EN: Register projection dispatcher
services.AddScoped<IProjectionDispatcher, ProjectionDispatcher>();
// EN: Register projections
services.AddScoped<IEventHandler<OrderCreated>, OrderProjection>();
services.AddScoped<IEventHandler<OrderPaid>, OrderProjection>();
services.AddScoped<IEventHandler<OrderShipped>, OrderProjection>();
return services;
}
}
```
### 7. Temporal Query Example
```csharp
/// <summary>
/// EN: Get order state at specific point in time.
/// VI: Lấy trạng thái order tại thời điểm cụ thể.
/// </summary>
public class GetOrderAtTimeQueryHandler
: IRequestHandler<GetOrderAtTimeQuery, OrderStateDto?>
{
private readonly EventStoreDbContext _context;
public GetOrderAtTimeQueryHandler(EventStoreDbContext context)
{
_context = context;
}
public async Task<OrderStateDto?> Handle(
GetOrderAtTimeQuery request,
CancellationToken ct)
{
var streamId = $"Order-{request.OrderId}";
// EN: Get events up to the specified time
// VI: Lấy events đến thời điểm chỉ định
var events = await _context.Events
.Where(e => e.StreamId == streamId && e.OccurredOn <= request.AsOfTime)
.OrderBy(e => e.Version)
.ToListAsync(ct);
if (!events.Any())
return null;
// EN: Replay events to reconstruct state
// VI: Phát lại events để khôi phục trạng thái
var order = new Order();
order.Load(events.Select(DeserializeEvent));
return new OrderStateDto
{
OrderId = order.Id,
Status = order.Status.ToString(),
TotalAmount = order.TotalAmount,
ItemCount = order.Items.Count,
AsOfTime = request.AsOfTime
};
}
}
```
## Database Migration
```sql
-- EN: Create EventStore table / VI: Tạo bảng EventStore
CREATE TABLE "EventStore" (
"Id" uuid PRIMARY KEY,
"StreamId" varchar(200) NOT NULL,
"EventType" varchar(500) NOT NULL,
"Data" jsonb NOT NULL,
"Version" integer NOT NULL,
"OccurredOn" timestamp with time zone NOT NULL
);
CREATE UNIQUE INDEX "IX_EventStore_StreamId_Version"
ON "EventStore" ("StreamId", "Version");
-- EN: Create Snapshots table / VI: Tạo bảng Snapshots
CREATE TABLE "Snapshots" (
"Id" uuid PRIMARY KEY,
"AggregateType" varchar(200) NOT NULL,
"AggregateId" uuid NOT NULL,
"Data" jsonb NOT NULL,
"Version" integer NOT NULL,
"CreatedAt" timestamp with time zone NOT NULL
);
CREATE UNIQUE INDEX "IX_Snapshots_Type_Id_Version"
ON "Snapshots" ("AggregateType", "AggregateId", "Version");
```

View File

@@ -0,0 +1,422 @@
---
name: outbox-pattern
description: Transactional Outbox Pattern - đảm bảo atomicity khi publish events. Use for reliable messaging, at-least-once delivery, và event consistency.
compatibility: ".NET 10+, EF Core, MassTransit, PostgreSQL"
metadata:
author: Velik Ho
version: "1.0"
---
# Outbox Pattern / Transactional Outbox
Pattern đảm bảo tính nhất quán khi publish events từ microservices.
## When to Use This Skill / Khi Nào Sử Dụng
Use this skill when:
- Publishing integration events reliably / Publish integration events tin cậy
- Ensuring at-least-once delivery / Đảm bảo gửi ít nhất một lần
- Avoiding dual-write problem / Tránh vấn đề dual-write
- Distributed transactions not feasible / Không khả thi dùng distributed transactions
## Core Concepts / Khái Niệm Cốt Lõi
### The Dual-Write Problem / Vấn Đề Dual-Write
```
┌─────────────────────────────────────────────────────────────┐
│ ❌ DUAL-WRITE PROBLEM │
├─────────────────────────────────────────────────────────────┤
│ │
│ Service A │
│ ┌─────────────────────────────────────────────┐ │
│ │ 1. await _db.SaveChangesAsync(); ✅ │ │
│ │ 2. await _bus.PublishAsync(event); ❌ FAIL! │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ Result: DB updated BUT message NOT sent = INCONSISTENCY │
│ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ✅ OUTBOX PATTERN │
├─────────────────────────────────────────────────────────────┤
│ │
│ Service A │
│ ┌─────────────────────────────────────────────┐ │
│ │ BEGIN TRANSACTION │ │
│ │ 1. INSERT INTO Orders ... │ │
│ │ 2. INSERT INTO Outbox (event) │ │
│ │ COMMIT │ │
│ └─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Outbox Processor (Background) │ │
│ │ 1. SELECT * FROM Outbox WHERE Sent = false │ │
│ │ 2. Publish to Message Bus │ │
│ │ 3. UPDATE Outbox SET Sent = true │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ Result: ATOMIC - Both or Neither │
│ │
└─────────────────────────────────────────────────────────────┘
```
### Outbox Flow / Luồng Outbox
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Command │───▶│ Handler │───▶│ DB + Outbox│───▶│ Processor │
│ (API) │ │ (Domain) │ │ (Same Tx) │ │ (Background)│
└─────────────┘ └─────────────┘ └─────────────┘ └──────┬──────┘
┌─────────────┐
│ Message Bus │
│ (RabbitMQ) │
└─────────────┘
```
### Key Components / Thành Phần Chính
| Component | Purpose | Implementation |
|-----------|---------|----------------|
| **Outbox Table** | Store pending events | EF Core entity |
| **Outbox Processor** | Publish pending events | BackgroundService |
| **Idempotency** | Prevent duplicates | Event ID tracking |
## Key Patterns / Mẫu Chính
### Outbox Entity / Entity Outbox
```csharp
/// <summary>
/// EN: Outbox message entity for transactional messaging.
/// VI: Entity outbox message cho transactional messaging.
/// </summary>
public class OutboxMessage
{
public Guid Id { get; set; }
public string EventType { get; set; } = default!;
public string Payload { get; set; } = default!;
public DateTime CreatedAt { get; set; }
public DateTime? ProcessedAt { get; set; }
public bool IsProcessed { get; set; }
public int RetryCount { get; set; }
public string? Error { get; set; }
}
```
### DbContext Configuration / Cấu Hình DbContext
```csharp
/// <summary>
/// EN: DbContext with outbox support.
/// VI: DbContext với hỗ trợ outbox.
/// </summary>
public class OrderDbContext : DbContext
{
public DbSet<Order> Orders => Set<Order>();
public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<OutboxMessage>(entity =>
{
entity.ToTable("OutboxMessages");
entity.HasKey(e => e.Id);
entity.Property(e => e.EventType)
.HasMaxLength(500)
.IsRequired();
entity.Property(e => e.Payload)
.HasColumnType("jsonb")
.IsRequired();
entity.HasIndex(e => new { e.IsProcessed, e.CreatedAt })
.HasFilter("\"IsProcessed\" = false");
});
}
}
```
### Unit of Work with Outbox / Unit of Work với Outbox
```csharp
/// <summary>
/// EN: Unit of Work that saves domain events to outbox.
/// VI: Unit of Work lưu domain events vào outbox.
/// </summary>
public interface IUnitOfWork
{
Task<int> SaveChangesAsync(CancellationToken ct = default);
}
public class UnitOfWork : IUnitOfWork
{
private readonly OrderDbContext _context;
private readonly ILogger<UnitOfWork> _logger;
public UnitOfWork(OrderDbContext context, ILogger<UnitOfWork> logger)
{
_context = context;
_logger = logger;
}
public async Task<int> SaveChangesAsync(CancellationToken ct = default)
{
// EN: Convert domain events to outbox messages
// VI: Chuyển domain events thành outbox messages
var aggregates = _context.ChangeTracker
.Entries<IAggregateRoot>()
.Where(e => e.Entity.DomainEvents.Any())
.Select(e => e.Entity)
.ToList();
foreach (var aggregate in aggregates)
{
foreach (var domainEvent in aggregate.DomainEvents)
{
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = domainEvent.GetType().AssemblyQualifiedName!,
Payload = JsonSerializer.Serialize(domainEvent, domainEvent.GetType()),
CreatedAt = DateTime.UtcNow,
IsProcessed = false
};
_context.OutboxMessages.Add(outboxMessage);
}
aggregate.ClearDomainEvents();
}
var result = await _context.SaveChangesAsync(ct);
_logger.LogDebug("Saved {Count} changes with outbox messages", result);
return result;
}
}
```
### Outbox Processor / Outbox Processor
```csharp
/// <summary>
/// EN: Background service that processes outbox messages.
/// VI: Background service xử lý outbox messages.
/// </summary>
public class OutboxProcessor : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<OutboxProcessor> _logger;
private readonly TimeSpan _pollingInterval = TimeSpan.FromSeconds(5);
private const int BatchSize = 100;
private const int MaxRetries = 5;
public OutboxProcessor(
IServiceScopeFactory scopeFactory,
ILogger<OutboxProcessor> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken ct)
{
_logger.LogInformation("Outbox Processor started");
while (!ct.IsCancellationRequested)
{
try
{
await ProcessPendingMessagesAsync(ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing outbox messages");
}
await Task.Delay(_pollingInterval, ct);
}
}
private async Task ProcessPendingMessagesAsync(CancellationToken ct)
{
using var scope = _scopeFactory.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<OrderDbContext>();
var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();
var messages = await context.OutboxMessages
.Where(m => !m.IsProcessed && m.RetryCount < MaxRetries)
.OrderBy(m => m.CreatedAt)
.Take(BatchSize)
.ToListAsync(ct);
foreach (var message in messages)
{
try
{
var eventType = Type.GetType(message.EventType);
if (eventType == null)
{
_logger.LogWarning("Unknown event type: {EventType}", message.EventType);
continue;
}
var @event = JsonSerializer.Deserialize(message.Payload, eventType);
await publishEndpoint.Publish(@event!, eventType, ct);
message.IsProcessed = true;
message.ProcessedAt = DateTime.UtcNow;
_logger.LogDebug("Published outbox message {Id}", message.Id);
}
catch (Exception ex)
{
message.RetryCount++;
message.Error = ex.Message;
_logger.LogWarning(ex, "Failed to publish message {Id}, retry {Retry}",
message.Id, message.RetryCount);
}
}
await context.SaveChangesAsync(ct);
}
}
```
### Command Handler with Outbox / Command Handler với Outbox
```csharp
/// <summary>
/// EN: Command handler that raises domain events saved to outbox.
/// VI: Command handler phát domain events được lưu vào outbox.
/// </summary>
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, OrderResult>
{
private readonly IOrderRepository _repository;
private readonly ILogger<CreateOrderCommandHandler> _logger;
public CreateOrderCommandHandler(
IOrderRepository repository,
ILogger<CreateOrderCommandHandler> logger)
{
_repository = repository;
_logger = logger;
}
public async Task<OrderResult> Handle(CreateOrderCommand request, CancellationToken ct)
{
var order = new Order(request.UserId, request.ShippingAddress);
foreach (var item in request.Items)
{
order.AddItem(item.ProductId, item.Quantity, item.UnitPrice);
}
// EN: Domain events are raised inside aggregate
// VI: Domain events được phát trong aggregate
await _repository.AddAsync(order, ct);
// EN: SaveChanges converts domain events to outbox messages
// VI: SaveChanges chuyển domain events thành outbox messages
await _repository.UnitOfWork.SaveChangesAsync(ct);
_logger.LogInformation("Order created: {OrderId}", order.Id);
return new OrderResult(order.Id);
}
}
```
## Common Mistakes / Lỗi Thường Gặp
### 1. Publishing Before Saving
```csharp
// ❌ BAD: Publish before DB commit
await _publishEndpoint.Publish(new OrderCreatedEvent(order.Id));
await _dbContext.Orders.AddAsync(order);
await _dbContext.SaveChangesAsync();
// If SaveChanges fails, message is already sent!
// ✅ GOOD: Use outbox pattern
order.AddDomainEvent(new OrderCreatedEvent(order.Id));
await _dbContext.Orders.AddAsync(order);
await _dbContext.SaveChangesAsync(); // Saves both order AND outbox in same transaction
```
### 2. No Idempotency in Consumers
```csharp
// ❌ BAD: No idempotency check
public async Task Consume(ConsumeContext<OrderCreatedEvent> context)
{
await _service.ProcessOrder(context.Message.OrderId);
// May process duplicate messages!
}
// ✅ GOOD: Idempotent consumer
public async Task Consume(ConsumeContext<OrderCreatedEvent> context)
{
var eventId = context.MessageId;
if (await _processedMessages.ExistsAsync(eventId))
return; // Already processed
await _service.ProcessOrder(context.Message.OrderId);
await _processedMessages.MarkAsync(eventId);
}
```
### 3. No Retry Limit
```csharp
// ❌ BAD: Infinite retries
var messages = await context.OutboxMessages
.Where(m => !m.IsProcessed) // Will keep retrying failed messages forever
.ToListAsync();
// ✅ GOOD: Retry limit with dead letter
var messages = await context.OutboxMessages
.Where(m => !m.IsProcessed && m.RetryCount < MaxRetries)
.ToListAsync();
// Move failed messages to dead letter after max retries
```
## Quick Reference / Tham Chiếu Nhanh
### Outbox Table Schema
| Column | Type | Purpose |
|--------|------|---------|
| Id | UUID | Primary key |
| EventType | VARCHAR(500) | Full type name for deserialization |
| Payload | JSONB | Serialized event data |
| CreatedAt | TIMESTAMP | When event was created |
| ProcessedAt | TIMESTAMP | When event was published |
| IsProcessed | BOOLEAN | Processing status |
| RetryCount | INT | Failed attempts |
| Error | TEXT | Last error message |
### Delivery Guarantees
| Pattern | Guarantee | Trade-off |
|---------|-----------|-----------|
| Direct publish | Best-effort | May lose messages |
| Outbox | At-least-once | May duplicate messages |
| Inbox + Outbox | Exactly-once | More complexity |
## Resources / Tài Nguyên
- [Detailed Examples](./references/REFERENCE.md) - Full code examples
- [Event Sourcing](../event-sourcing/SKILL.md) - Event-based persistence
- [Inter-service Communication](../inter-service-communication/SKILL.md) - MassTransit
- [SAGA Pattern](../saga-pattern/SKILL.md) - Distributed transactions
- [Repository Pattern](../repository-pattern/SKILL.md) - Unit of Work

View File

@@ -0,0 +1,509 @@
# Outbox Pattern - Reference Examples
## Complete Implementation Examples
### 1. Complete Outbox Entity Configuration
```csharp
/// <summary>
/// EN: EF Core configuration for OutboxMessage.
/// VI: Cấu hình EF Core cho OutboxMessage.
/// </summary>
public class OutboxMessageConfiguration : IEntityTypeConfiguration<OutboxMessage>
{
public void Configure(EntityTypeBuilder<OutboxMessage> builder)
{
builder.ToTable("OutboxMessages");
builder.HasKey(e => e.Id);
builder.Property(e => e.EventType)
.HasMaxLength(500)
.IsRequired();
builder.Property(e => e.Payload)
.HasColumnType("jsonb")
.IsRequired();
builder.Property(e => e.CreatedAt)
.IsRequired();
builder.Property(e => e.Error)
.HasMaxLength(2000);
// EN: Index for fast retrieval of unprocessed messages
// VI: Index để truy vấn nhanh messages chưa xử lý
builder.HasIndex(e => new { e.IsProcessed, e.CreatedAt })
.HasFilter("\"IsProcessed\" = false")
.HasDatabaseName("IX_OutboxMessages_Pending");
// EN: Index for retry tracking
// VI: Index để theo dõi retry
builder.HasIndex(e => e.RetryCount)
.HasFilter("\"IsProcessed\" = false AND \"RetryCount\" >= 3")
.HasDatabaseName("IX_OutboxMessages_FailedRetries");
}
}
```
### 2. Inbox Table for Idempotency
```csharp
/// <summary>
/// EN: Inbox message for tracking processed events.
/// VI: Inbox message để theo dõi events đã xử lý.
/// </summary>
public class InboxMessage
{
public Guid Id { get; set; } // Same as incoming MessageId
public string ConsumerType { get; set; } = default!;
public DateTime ProcessedAt { get; set; }
}
/// <summary>
/// EN: Inbox repository for idempotency.
/// VI: Repository inbox cho idempotency.
/// </summary>
public class InboxRepository : IInboxRepository
{
private readonly AppDbContext _context;
public InboxRepository(AppDbContext context)
{
_context = context;
}
public async Task<bool> ExistsAsync(Guid messageId, string consumerType, CancellationToken ct)
{
return await _context.InboxMessages
.AnyAsync(m => m.Id == messageId && m.ConsumerType == consumerType, ct);
}
public async Task MarkAsProcessedAsync(Guid messageId, string consumerType, CancellationToken ct)
{
_context.InboxMessages.Add(new InboxMessage
{
Id = messageId,
ConsumerType = consumerType,
ProcessedAt = DateTime.UtcNow
});
await _context.SaveChangesAsync(ct);
}
}
```
### 3. Idempotent Consumer Base Class
```csharp
/// <summary>
/// EN: Base class for idempotent message consumers.
/// VI: Lớp cơ sở cho consumer idempotent.
/// </summary>
public abstract class IdempotentConsumer<TMessage> : IConsumer<TMessage>
where TMessage : class
{
private readonly IInboxRepository _inbox;
private readonly ILogger _logger;
protected IdempotentConsumer(
IInboxRepository inbox,
ILogger logger)
{
_inbox = inbox;
_logger = logger;
}
public async Task Consume(ConsumeContext<TMessage> context)
{
var messageId = context.MessageId ?? Guid.NewGuid();
var consumerType = GetType().Name;
// EN: Check if already processed
// VI: Kiểm tra đã xử lý chưa
if (await _inbox.ExistsAsync(messageId, consumerType, context.CancellationToken))
{
_logger.LogInformation(
"Message {MessageId} already processed by {Consumer}, skipping",
messageId, consumerType);
return;
}
try
{
await HandleAsync(context.Message, context.CancellationToken);
// EN: Mark as processed
// VI: Đánh dấu đã xử lý
await _inbox.MarkAsProcessedAsync(messageId, consumerType, context.CancellationToken);
_logger.LogInformation(
"Processed message {MessageId} by {Consumer}",
messageId, consumerType);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to process message {MessageId} by {Consumer}",
messageId, consumerType);
throw;
}
}
protected abstract Task HandleAsync(TMessage message, CancellationToken ct);
}
/// <summary>
/// EN: Example idempotent consumer.
/// VI: Ví dụ consumer idempotent.
/// </summary>
public class OrderCreatedConsumer : IdempotentConsumer<OrderCreatedIntegrationEvent>
{
private readonly IInventoryService _inventory;
public OrderCreatedConsumer(
IInboxRepository inbox,
IInventoryService inventory,
ILogger<OrderCreatedConsumer> logger)
: base(inbox, logger)
{
_inventory = inventory;
}
protected override async Task HandleAsync(
OrderCreatedIntegrationEvent message,
CancellationToken ct)
{
foreach (var item in message.Items)
{
await _inventory.ReserveAsync(
item.ProductId,
item.Quantity,
message.OrderId,
ct);
}
}
}
```
### 4. Enhanced Outbox Processor with Parallel Processing
```csharp
/// <summary>
/// EN: Enhanced outbox processor with parallel processing.
/// VI: Outbox processor nâng cao với xử lý song song.
/// </summary>
public class EnhancedOutboxProcessor : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<EnhancedOutboxProcessor> _logger;
private readonly OutboxOptions _options;
public EnhancedOutboxProcessor(
IServiceScopeFactory scopeFactory,
IOptions<OutboxOptions> options,
ILogger<EnhancedOutboxProcessor> logger)
{
_scopeFactory = scopeFactory;
_options = options.Value;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken ct)
{
_logger.LogInformation(
"Outbox Processor started with {Interval}s interval, batch size {BatchSize}",
_options.PollingIntervalSeconds,
_options.BatchSize);
using var timer = new PeriodicTimer(
TimeSpan.FromSeconds(_options.PollingIntervalSeconds));
while (await timer.WaitForNextTickAsync(ct))
{
try
{
var processedCount = await ProcessBatchAsync(ct);
if (processedCount > 0)
_logger.LogInformation("Processed {Count} outbox messages", processedCount);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogError(ex, "Error processing outbox batch");
}
}
}
private async Task<int> ProcessBatchAsync(CancellationToken ct)
{
using var scope = _scopeFactory.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();
// EN: Use SKIP LOCKED for concurrent processing
// VI: Dùng SKIP LOCKED cho xử lý đồng thời
var messages = await context.OutboxMessages
.FromSqlRaw(@"
SELECT * FROM ""OutboxMessages""
WHERE ""IsProcessed"" = false
AND ""RetryCount"" < @p0
ORDER BY ""CreatedAt""
LIMIT @p1
FOR UPDATE SKIP LOCKED",
_options.MaxRetries,
_options.BatchSize)
.ToListAsync(ct);
if (!messages.Any())
return 0;
// EN: Process in parallel with semaphore
// VI: Xử lý song song với semaphore
using var semaphore = new SemaphoreSlim(_options.MaxParallelism);
var tasks = messages.Select(async message =>
{
await semaphore.WaitAsync(ct);
try
{
await ProcessMessageAsync(message, publishEndpoint, ct);
}
finally
{
semaphore.Release();
}
});
await Task.WhenAll(tasks);
await context.SaveChangesAsync(ct);
return messages.Count;
}
private async Task ProcessMessageAsync(
OutboxMessage message,
IPublishEndpoint publishEndpoint,
CancellationToken ct)
{
try
{
var eventType = Type.GetType(message.EventType);
if (eventType == null)
{
message.Error = $"Unknown event type: {message.EventType}";
message.IsProcessed = true; // Dead letter
return;
}
var @event = JsonSerializer.Deserialize(message.Payload, eventType);
await publishEndpoint.Publish(@event!, eventType, ct);
message.IsProcessed = true;
message.ProcessedAt = DateTime.UtcNow;
message.Error = null;
}
catch (Exception ex)
{
message.RetryCount++;
message.Error = ex.Message.Length > 2000
? ex.Message[..2000]
: ex.Message;
_logger.LogWarning(ex,
"Failed to publish message {Id}, retry {Retry}/{Max}",
message.Id, message.RetryCount, _options.MaxRetries);
}
}
}
public class OutboxOptions
{
public int PollingIntervalSeconds { get; set; } = 5;
public int BatchSize { get; set; } = 100;
public int MaxRetries { get; set; } = 5;
public int MaxParallelism { get; set; } = 4;
}
```
### 5. MassTransit Outbox Integration
```csharp
/// <summary>
/// EN: Configure MassTransit with built-in outbox.
/// VI: Cấu hình MassTransit với outbox tích hợp.
/// </summary>
public static class MassTransitOutboxExtensions
{
public static IServiceCollection AddMassTransitWithOutbox(
this IServiceCollection services,
IConfiguration configuration)
{
services.AddMassTransit(x =>
{
x.AddConsumer<OrderCreatedConsumer>();
x.AddConsumer<PaymentProcessedConsumer>();
// EN: Add Entity Framework Outbox
// VI: Thêm Entity Framework Outbox
x.AddEntityFrameworkOutbox<AppDbContext>(o =>
{
o.UsePostgres();
o.UseBusOutbox();
// EN: Configure delivery service
// VI: Cấu hình delivery service
o.QueryDelay = TimeSpan.FromSeconds(5);
o.QueryMessageLimit = 100;
o.QueryTimeout = TimeSpan.FromSeconds(30);
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(configuration["RabbitMQ:Host"], "/", h =>
{
h.Username(configuration["RabbitMQ:Username"]!);
h.Password(configuration["RabbitMQ:Password"]!);
});
cfg.ConfigureEndpoints(context);
});
});
return services;
}
}
/// <summary>
/// EN: Using MassTransit outbox in handler.
/// VI: Sử dụng MassTransit outbox trong handler.
/// </summary>
public class CreateOrderHandler : IRequestHandler<CreateOrderCommand, OrderResult>
{
private readonly AppDbContext _context;
private readonly IPublishEndpoint _publishEndpoint;
public CreateOrderHandler(
AppDbContext context,
IPublishEndpoint publishEndpoint)
{
_context = context;
_publishEndpoint = publishEndpoint;
}
public async Task<OrderResult> Handle(CreateOrderCommand request, CancellationToken ct)
{
var order = new Order(request.UserId, request.Address);
_context.Orders.Add(order);
// EN: MassTransit automatically saves to outbox table
// VI: MassTransit tự động lưu vào outbox table
await _publishEndpoint.Publish(new OrderCreatedIntegrationEvent
{
OrderId = order.Id,
UserId = order.UserId
}, ct);
// EN: Both order and outbox message saved in same transaction
// VI: Cả order và outbox message được lưu trong cùng transaction
await _context.SaveChangesAsync(ct);
return new OrderResult(order.Id);
}
}
```
### 6. Cleanup Job for Processed Messages
```csharp
/// <summary>
/// EN: Background job to clean up processed outbox messages.
/// VI: Job background để dọn dẹp outbox messages đã xử lý.
/// </summary>
public class OutboxCleanupJob : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<OutboxCleanupJob> _logger;
private readonly TimeSpan _cleanupInterval = TimeSpan.FromHours(1);
private readonly TimeSpan _retentionPeriod = TimeSpan.FromDays(7);
public OutboxCleanupJob(
IServiceScopeFactory scopeFactory,
ILogger<OutboxCleanupJob> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
await Task.Delay(_cleanupInterval, ct);
try
{
await CleanupAsync(ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during outbox cleanup");
}
}
}
private async Task CleanupAsync(CancellationToken ct)
{
using var scope = _scopeFactory.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var cutoffDate = DateTime.UtcNow - _retentionPeriod;
var deletedCount = await context.OutboxMessages
.Where(m => m.IsProcessed && m.ProcessedAt < cutoffDate)
.ExecuteDeleteAsync(ct);
if (deletedCount > 0)
_logger.LogInformation("Cleaned up {Count} processed outbox messages", deletedCount);
// EN: Clean up inbox messages too
// VI: Dọn dẹp inbox messages
var inboxDeletedCount = await context.InboxMessages
.Where(m => m.ProcessedAt < cutoffDate)
.ExecuteDeleteAsync(ct);
if (inboxDeletedCount > 0)
_logger.LogInformation("Cleaned up {Count} inbox messages", inboxDeletedCount);
}
}
```
## Database Migrations
```sql
-- EN: Create OutboxMessages table / VI: Tạo bảng OutboxMessages
CREATE TABLE "OutboxMessages" (
"Id" uuid PRIMARY KEY,
"EventType" varchar(500) NOT NULL,
"Payload" jsonb NOT NULL,
"CreatedAt" timestamp with time zone NOT NULL,
"ProcessedAt" timestamp with time zone,
"IsProcessed" boolean NOT NULL DEFAULT false,
"RetryCount" integer NOT NULL DEFAULT 0,
"Error" varchar(2000)
);
CREATE INDEX "IX_OutboxMessages_Pending"
ON "OutboxMessages" ("IsProcessed", "CreatedAt")
WHERE "IsProcessed" = false;
-- EN: Create InboxMessages table / VI: Tạo bảng InboxMessages
CREATE TABLE "InboxMessages" (
"Id" uuid NOT NULL,
"ConsumerType" varchar(500) NOT NULL,
"ProcessedAt" timestamp with time zone NOT NULL,
PRIMARY KEY ("Id", "ConsumerType")
);
CREATE INDEX "IX_InboxMessages_ProcessedAt"
ON "InboxMessages" ("ProcessedAt");
```

View File

@@ -0,0 +1,519 @@
---
name: saga-pattern
description: SAGA Pattern - quản lý distributed transactions qua chuỗi local transactions với compensating actions. Use for checkout flows, order processing, và multi-service orchestration.
compatibility: ".NET 10+, MassTransit, RabbitMQ, EF Core"
metadata:
author: Velik Ho
version: "1.0"
---
# SAGA Pattern / Mẫu SAGA
Pattern quản lý distributed transactions trong microservices bằng chuỗi local transactions.
## When to Use This Skill / Khi Nào Sử Dụng
Use this skill when:
- Multi-service transactions required / Cần transactions đa dịch vụ
- Distributed transaction (2PC) not feasible / 2PC không khả thi
- Long-running business processes / Quy trình nghiệp vụ chạy lâu
- Need compensating transactions / Cần transactions bù trừ
## Core Concepts / Khái Niệm Cốt Lõi
### Why SAGA? / Tại Sao Dùng SAGA?
```
┌─────────────────────────────────────────────────────────────┐
│ ❌ DISTRIBUTED TRANSACTION (2PC) - PROBLEMS │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌───────┐ ┌───────┐ ┌───────┐ │
│ │Order │────▶│Payment│────▶│Inventory│ │
│ │ DB │ │ DB │ │ DB │ │
│ └───────┘ └───────┘ └───────┘ │
│ │ │ │ │
│ └─────────────┴─────────────┘ │
│ Transaction Coordinator │
│ │
│ Problems: │
│ • Locks resources across services │
│ • Single point of failure │
│ • Poor performance │
│ • Not always supported (NoSQL, external APIs) │
│ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ✅ SAGA - LOCAL TRANSACTIONS │
├─────────────────────────────────────────────────────────────┤
│ │
│ Step 1 Step 2 Step 3 │
│ ┌───────┐ ┌───────┐ ┌───────┐ │
│ │Create │─────▶│Reserve│─────▶│Charge │ │
│ │Order │ │Stock │ │Payment│ │
│ └───────┘ └───────┘ └───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ [Local Tx] [Local Tx] [Local Tx] │
│ │
│ If Step 3 fails: │
│ ┌───────┐ ┌───────┐ │
│ │Release│◀─────│Cancel │ │
│ │Stock │ │Order │ │
│ └───────┘ └───────┘ │
│ Compensate Compensate │
│ │
└─────────────────────────────────────────────────────────────┘
```
### Choreography vs Orchestration
```
┌─────────────────────────────────────────────────────────────┐
│ CHOREOGRAPHY │
├─────────────────────────────────────────────────────────────┤
│ │
│ Order Event Bus Inventory Payment │
│ │ │ │ │ │
│ │──Create──▶│ │ │ │
│ │ │──OrderCreated─▶│ │ │
│ │ │ │──Reserve──▶│ │
│ │ │ │ │──Charge────▶ │
│ │ │◀─────PaymentProcessed──────│ │
│ │
│ ✅ Loose coupling, simple services │
│ ❌ Hard to track flow, complex rollback │
│ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ORCHESTRATION │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ SAGA │ │
│ │ Orchestrator│ │
│ └─────┬───────┘ │
│ ┌──────────────┼──────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Order │ │Inventory│ │ Payment │ │
│ │ Service │ │ Service │ │ Service │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ ✅ Clear flow, centralized error handling │
│ ❌ More coupling, orchestrator complexity │
│ │
└─────────────────────────────────────────────────────────────┘
```
### SAGA Components / Thành Phần SAGA
| Component | Purpose | Example |
|-----------|---------|---------|
| **Saga State Machine** | Tracks saga state | OrderSaga |
| **Saga Step** | Individual transaction | ReserveInventory |
| **Compensating Action** | Undo step on failure | ReleaseInventory |
| **Saga Data** | Shared state across steps | OrderId, Items |
## Key Patterns / Mẫu Chính
### Saga State Machine with MassTransit
```csharp
/// <summary>
/// EN: Order saga state tracking.
/// VI: Theo dõi state của Order saga.
/// </summary>
public class OrderSagaState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; } = default!;
// EN: Saga data / VI: Dữ liệu saga
public Guid OrderId { get; set; }
public string UserId { get; set; } = default!;
public decimal TotalAmount { get; set; }
public string? PaymentId { get; set; }
public string? FailureReason { get; set; }
}
/// <summary>
/// EN: Order saga state machine definition.
/// VI: Định nghĩa state machine cho Order saga.
/// </summary>
public class OrderSaga : MassTransitStateMachine<OrderSagaState>
{
// EN: States / VI: Các trạng thái
public State OrderCreated { get; private set; } = null!;
public State InventoryReserved { get; private set; } = null!;
public State PaymentProcessed { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public State Failed { get; private set; } = null!;
// EN: Events / VI: Các sự kiện
public Event<OrderSubmitted> OrderSubmittedEvent { get; private set; } = null!;
public Event<InventoryReserved> InventoryReservedEvent { get; private set; } = null!;
public Event<InventoryReservationFailed> InventoryFailedEvent { get; private set; } = null!;
public Event<PaymentProcessed> PaymentProcessedEvent { get; private set; } = null!;
public Event<PaymentFailed> PaymentFailedEvent { get; private set; } = null!;
public OrderSaga()
{
InstanceState(x => x.CurrentState);
// EN: Define events / VI: Định nghĩa events
Event(() => OrderSubmittedEvent, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => InventoryReservedEvent, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => InventoryFailedEvent, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PaymentProcessedEvent, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PaymentFailedEvent, x => x.CorrelateById(m => m.Message.OrderId));
// EN: Define state transitions / VI: Định nghĩa chuyển trạng thái
Initially(
When(OrderSubmittedEvent)
.Then(context =>
{
context.Saga.OrderId = context.Message.OrderId;
context.Saga.UserId = context.Message.UserId;
context.Saga.TotalAmount = context.Message.TotalAmount;
})
.Publish(context => new ReserveInventory
{
OrderId = context.Saga.OrderId,
Items = context.Message.Items
})
.TransitionTo(OrderCreated)
);
During(OrderCreated,
When(InventoryReservedEvent)
.Publish(context => new ProcessPayment
{
OrderId = context.Saga.OrderId,
UserId = context.Saga.UserId,
Amount = context.Saga.TotalAmount
})
.TransitionTo(InventoryReserved),
When(InventoryFailedEvent)
.Then(context => context.Saga.FailureReason = context.Message.Reason)
.Publish(context => new CancelOrder
{
OrderId = context.Saga.OrderId,
Reason = context.Message.Reason
})
.TransitionTo(Failed)
);
During(InventoryReserved,
When(PaymentProcessedEvent)
.Then(context => context.Saga.PaymentId = context.Message.PaymentId)
.Publish(context => new CompleteOrder
{
OrderId = context.Saga.OrderId,
PaymentId = context.Message.PaymentId
})
.TransitionTo(Completed)
.Finalize(),
When(PaymentFailedEvent)
.Then(context => context.Saga.FailureReason = context.Message.Reason)
// EN: Compensating action - release inventory
// VI: Action bù trừ - giải phóng inventory
.Publish(context => new ReleaseInventory
{
OrderId = context.Saga.OrderId
})
.Publish(context => new CancelOrder
{
OrderId = context.Saga.OrderId,
Reason = context.Message.Reason
})
.TransitionTo(Failed)
);
SetCompletedWhenFinalized();
}
}
```
### Saga Events / Events của SAGA
```csharp
/// <summary>
/// EN: Events for order saga.
/// VI: Events cho order saga.
/// </summary>
// EN: Trigger events / VI: Events kích hoạt
public record OrderSubmitted
{
public Guid OrderId { get; init; }
public string UserId { get; init; } = default!;
public decimal TotalAmount { get; init; }
public List<OrderItemInfo> Items { get; init; } = new();
}
// EN: Command messages / VI: Command messages
public record ReserveInventory
{
public Guid OrderId { get; init; }
public List<OrderItemInfo> Items { get; init; } = new();
}
public record ProcessPayment
{
public Guid OrderId { get; init; }
public string UserId { get; init; } = default!;
public decimal Amount { get; init; }
}
public record ReleaseInventory // Compensating
{
public Guid OrderId { get; init; }
}
public record CancelOrder // Compensating
{
public Guid OrderId { get; init; }
public string Reason { get; init; } = default!;
}
public record CompleteOrder
{
public Guid OrderId { get; init; }
public string PaymentId { get; init; } = default!;
}
// EN: Response events / VI: Events phản hồi
public record InventoryReserved
{
public Guid OrderId { get; init; }
}
public record InventoryReservationFailed
{
public Guid OrderId { get; init; }
public string Reason { get; init; } = default!;
}
public record PaymentProcessed
{
public Guid OrderId { get; init; }
public string PaymentId { get; init; } = default!;
}
public record PaymentFailed
{
public Guid OrderId { get; init; }
public string Reason { get; init; } = default!;
}
```
### Saga Consumer / Consumer của SAGA
```csharp
/// <summary>
/// EN: Inventory service consumer for saga.
/// VI: Consumer của Inventory service cho saga.
/// </summary>
public class ReserveInventoryConsumer : IConsumer<ReserveInventory>
{
private readonly IInventoryRepository _repository;
private readonly ILogger<ReserveInventoryConsumer> _logger;
public ReserveInventoryConsumer(
IInventoryRepository repository,
ILogger<ReserveInventoryConsumer> logger)
{
_repository = repository;
_logger = logger;
}
public async Task Consume(ConsumeContext<ReserveInventory> context)
{
var message = context.Message;
try
{
foreach (var item in message.Items)
{
var success = await _repository.ReserveAsync(
item.ProductId,
item.Quantity,
message.OrderId,
context.CancellationToken);
if (!success)
{
// EN: Not enough stock, publish failure
// VI: Không đủ hàng, publish failure
await context.Publish(new InventoryReservationFailed
{
OrderId = message.OrderId,
Reason = $"Insufficient stock for product {item.ProductId}"
});
return;
}
}
// EN: All items reserved successfully
// VI: Tất cả items đã được reserve thành công
await context.Publish(new InventoryReserved
{
OrderId = message.OrderId
});
_logger.LogInformation("Inventory reserved for order {OrderId}", message.OrderId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to reserve inventory for order {OrderId}", message.OrderId);
await context.Publish(new InventoryReservationFailed
{
OrderId = message.OrderId,
Reason = ex.Message
});
}
}
}
/// <summary>
/// EN: Compensating consumer to release inventory.
/// VI: Consumer bù trừ để giải phóng inventory.
/// </summary>
public class ReleaseInventoryConsumer : IConsumer<ReleaseInventory>
{
private readonly IInventoryRepository _repository;
private readonly ILogger<ReleaseInventoryConsumer> _logger;
public ReleaseInventoryConsumer(
IInventoryRepository repository,
ILogger<ReleaseInventoryConsumer> logger)
{
_repository = repository;
_logger = logger;
}
public async Task Consume(ConsumeContext<ReleaseInventory> context)
{
await _repository.ReleaseReservationAsync(
context.Message.OrderId,
context.CancellationToken);
_logger.LogInformation(
"Released inventory reservation for order {OrderId}",
context.Message.OrderId);
}
}
```
## Common Mistakes / Lỗi Thường Gặp
### 1. Missing Compensating Actions
```csharp
// ❌ BAD: No compensation handling
During(InventoryReserved,
When(PaymentFailedEvent)
.TransitionTo(Failed) // Inventory still reserved!
);
// ✅ GOOD: Proper compensation
During(InventoryReserved,
When(PaymentFailedEvent)
.Publish(context => new ReleaseInventory
{
OrderId = context.Saga.OrderId
})
.TransitionTo(Failed)
);
```
### 2. Non-Idempotent Compensations
```csharp
// ❌ BAD: Can fail if called twice
public async Task Consume(ConsumeContext<ReleaseInventory> context)
{
var reservation = await _repo.GetReservationAsync(context.Message.OrderId);
await _repo.DeleteAsync(reservation); // Throws if already deleted
}
// ✅ GOOD: Idempotent compensation
public async Task Consume(ConsumeContext<ReleaseInventory> context)
{
var reservation = await _repo.GetReservationAsync(context.Message.OrderId);
if (reservation != null)
{
await _repo.DeleteAsync(reservation);
}
// Safe to call multiple times
}
```
### 3. No Timeout Handling
```csharp
// ❌ BAD: Saga can hang forever
public OrderSaga()
{
During(OrderCreated,
When(InventoryReservedEvent)
.TransitionTo(InventoryReserved)
);
// What if InventoryReservedEvent never arrives?
}
// ✅ GOOD: Add timeout handling
public OrderSaga()
{
Schedule(() => ReservationTimeout, x => x.ReservationTimeoutToken,
s => s.Delay = TimeSpan.FromMinutes(5));
During(OrderCreated,
When(InventoryReservedEvent)
.Unschedule(ReservationTimeout)
.TransitionTo(InventoryReserved),
When(ReservationTimeout.Received)
.Then(ctx => ctx.Saga.FailureReason = "Reservation timeout")
.Publish(ctx => new CancelOrder { OrderId = ctx.Saga.OrderId })
.TransitionTo(Failed)
);
}
```
## Quick Reference / Tham Chiếu Nhanh
### Choreography vs Orchestration
| Aspect | Choreography | Orchestration |
|--------|-------------|---------------|
| Coupling | Low | Higher |
| Complexity | Distributed | Centralized |
| Debugging | Hard | Easier |
| Single point of failure | No | Yes (orchestrator) |
| Best for | Simple flows | Complex flows |
### SAGA Design Checklist
| Step | Required |
|------|----------|
| Define all saga steps | ✅ |
| Define compensating action for each step | ✅ |
| Make compensations idempotent | ✅ |
| Add timeout handling | ✅ |
| Log state transitions | ✅ |
| Design for partial failures | ✅ |
## Resources / Tài Nguyên
- [Detailed Examples](./references/REFERENCE.md) - Full code examples
- [Outbox Pattern](../outbox-pattern/SKILL.md) - Reliable event publishing
- [Inter-service Communication](../inter-service-communication/SKILL.md) - MassTransit
- [Event Sourcing](../event-sourcing/SKILL.md) - Event-based persistence
- [Error Handling](../error-handling-patterns/SKILL.md) - Resilience patterns

View File

@@ -0,0 +1,513 @@
# SAGA Pattern - Reference Examples
## Complete Implementation Examples
### 1. Complete MassTransit Configuration
```csharp
/// <summary>
/// EN: Configure MassTransit with saga support.
/// VI: Cấu hình MassTransit với hỗ trợ saga.
/// </summary>
public static class SagaServiceExtensions
{
public static IServiceCollection AddSagaSupport(
this IServiceCollection services,
IConfiguration configuration)
{
services.AddMassTransit(x =>
{
// EN: Add saga state machine
// VI: Thêm saga state machine
x.AddSagaStateMachine<OrderSaga, OrderSagaState>()
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Pessimistic;
r.AddDbContext<DbContext, SagaDbContext>((provider, builder) =>
{
builder.UseNpgsql(configuration.GetConnectionString("SagaDb"));
});
});
// EN: Add consumers
x.AddConsumer<ReserveInventoryConsumer>();
x.AddConsumer<ReleaseInventoryConsumer>();
x.AddConsumer<ProcessPaymentConsumer>();
x.AddConsumer<RefundPaymentConsumer>();
x.AddConsumer<CompleteOrderConsumer>();
x.AddConsumer<CancelOrderConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(configuration["RabbitMQ:Host"], "/", h =>
{
h.Username(configuration["RabbitMQ:Username"]!);
h.Password(configuration["RabbitMQ:Password"]!);
});
// EN: Configure retry policy
cfg.UseMessageRetry(r => r.Intervals(
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(15)));
cfg.ConfigureEndpoints(context);
});
});
return services;
}
}
```
### 2. Saga DbContext
```csharp
/// <summary>
/// EN: DbContext for saga state persistence.
/// VI: DbContext để lưu trữ saga state.
/// </summary>
public class SagaDbContext : DbContext
{
public DbSet<OrderSagaState> OrderSagas => Set<OrderSagaState>();
public SagaDbContext(DbContextOptions<SagaDbContext> options)
: base(options) { }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<OrderSagaState>(entity =>
{
entity.ToTable("OrderSagas");
entity.HasKey(x => x.CorrelationId);
entity.Property(x => x.CurrentState)
.HasMaxLength(100)
.IsRequired();
entity.Property(x => x.UserId)
.HasMaxLength(100);
entity.Property(x => x.PaymentId)
.HasMaxLength(100);
entity.Property(x => x.FailureReason)
.HasMaxLength(500);
// EN: Optimistic concurrency
entity.Property(x => x.RowVersion)
.IsRowVersion();
entity.HasIndex(x => x.OrderId);
entity.HasIndex(x => x.CurrentState);
});
}
}
/// <summary>
/// EN: Extended saga state with row version.
/// VI: Saga state mở rộng với row version.
/// </summary>
public class OrderSagaState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; } = default!;
public Guid OrderId { get; set; }
public string UserId { get; set; } = default!;
public decimal TotalAmount { get; set; }
public string? PaymentId { get; set; }
public string? FailureReason { get; set; }
// EN: For timeout scheduling
public Guid? ReservationTimeoutToken { get; set; }
public Guid? PaymentTimeoutToken { get; set; }
// EN: Optimistic concurrency
public byte[] RowVersion { get; set; } = null!;
}
```
### 3. Complete Saga with Timeouts
```csharp
/// <summary>
/// EN: Complete order saga with timeout handling.
/// VI: Order saga hoàn chỉnh với xử lý timeout.
/// </summary>
public class OrderSagaWithTimeouts : MassTransitStateMachine<OrderSagaState>
{
public State OrderCreated { get; private set; } = null!;
public State InventoryReserved { get; private set; } = null!;
public State PaymentProcessing { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public State Failed { get; private set; } = null!;
public State Compensating { get; private set; } = null!;
public Event<OrderSubmitted> OrderSubmittedEvent { get; private set; } = null!;
public Event<InventoryReserved> InventoryReservedEvent { get; private set; } = null!;
public Event<InventoryReservationFailed> InventoryFailedEvent { get; private set; } = null!;
public Event<PaymentProcessed> PaymentProcessedEvent { get; private set; } = null!;
public Event<PaymentFailed> PaymentFailedEvent { get; private set; } = null!;
public Event<InventoryReleased> InventoryReleasedEvent { get; private set; } = null!;
// EN: Timeout schedules / VI: Lịch timeout
public Schedule<OrderSagaState, ReservationTimeout> ReservationTimeoutSchedule { get; private set; } = null!;
public Schedule<OrderSagaState, PaymentTimeout> PaymentTimeoutSchedule { get; private set; } = null!;
public OrderSagaWithTimeouts()
{
InstanceState(x => x.CurrentState);
// EN: Configure events
ConfigureEvents();
// EN: Configure timeouts
ConfigureTimeouts();
// EN: Configure state machine
ConfigureStateMachine();
SetCompletedWhenFinalized();
}
private void ConfigureEvents()
{
Event(() => OrderSubmittedEvent, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => InventoryReservedEvent, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => InventoryFailedEvent, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PaymentProcessedEvent, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PaymentFailedEvent, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => InventoryReleasedEvent, x => x.CorrelateById(m => m.Message.OrderId));
}
private void ConfigureTimeouts()
{
Schedule(() => ReservationTimeoutSchedule,
x => x.ReservationTimeoutToken,
s => s.Delay = TimeSpan.FromMinutes(5));
Schedule(() => PaymentTimeoutSchedule,
x => x.PaymentTimeoutToken,
s => s.Delay = TimeSpan.FromMinutes(10));
}
private void ConfigureStateMachine()
{
Initially(
When(OrderSubmittedEvent)
.Then(InitializeSagaData)
.Schedule(ReservationTimeoutSchedule, ctx => new ReservationTimeout
{
OrderId = ctx.Saga.OrderId
})
.Publish(ctx => new ReserveInventory
{
OrderId = ctx.Saga.OrderId,
Items = ctx.Message.Items
})
.TransitionTo(OrderCreated)
);
During(OrderCreated,
When(InventoryReservedEvent)
.Unschedule(ReservationTimeoutSchedule)
.Schedule(PaymentTimeoutSchedule, ctx => new PaymentTimeout
{
OrderId = ctx.Saga.OrderId
})
.Publish(ctx => new ProcessPayment
{
OrderId = ctx.Saga.OrderId,
UserId = ctx.Saga.UserId,
Amount = ctx.Saga.TotalAmount
})
.TransitionTo(InventoryReserved),
When(InventoryFailedEvent)
.Unschedule(ReservationTimeoutSchedule)
.Then(ctx => ctx.Saga.FailureReason = ctx.Message.Reason)
.Publish(ctx => new CancelOrder
{
OrderId = ctx.Saga.OrderId,
Reason = ctx.Message.Reason
})
.TransitionTo(Failed),
When(ReservationTimeoutSchedule.Received)
.Then(ctx => ctx.Saga.FailureReason = "Inventory reservation timeout")
.Publish(ctx => new CancelOrder
{
OrderId = ctx.Saga.OrderId,
Reason = "Timeout waiting for inventory"
})
.TransitionTo(Failed)
);
During(InventoryReserved,
When(PaymentProcessedEvent)
.Unschedule(PaymentTimeoutSchedule)
.Then(ctx => ctx.Saga.PaymentId = ctx.Message.PaymentId)
.Publish(ctx => new CompleteOrder
{
OrderId = ctx.Saga.OrderId,
PaymentId = ctx.Message.PaymentId
})
.TransitionTo(Completed)
.Finalize(),
When(PaymentFailedEvent)
.Unschedule(PaymentTimeoutSchedule)
.Then(ctx => ctx.Saga.FailureReason = ctx.Message.Reason)
.Publish(ctx => new ReleaseInventory
{
OrderId = ctx.Saga.OrderId
})
.TransitionTo(Compensating),
When(PaymentTimeoutSchedule.Received)
.Then(ctx => ctx.Saga.FailureReason = "Payment processing timeout")
.Publish(ctx => new ReleaseInventory
{
OrderId = ctx.Saga.OrderId
})
.TransitionTo(Compensating)
);
During(Compensating,
When(InventoryReleasedEvent)
.Publish(ctx => new CancelOrder
{
OrderId = ctx.Saga.OrderId,
Reason = ctx.Saga.FailureReason ?? "Unknown error"
})
.TransitionTo(Failed)
);
}
private static void InitializeSagaData(BehaviorContext<OrderSagaState, OrderSubmitted> ctx)
{
ctx.Saga.OrderId = ctx.Message.OrderId;
ctx.Saga.UserId = ctx.Message.UserId;
ctx.Saga.TotalAmount = ctx.Message.TotalAmount;
}
}
public record ReservationTimeout { public Guid OrderId { get; init; } }
public record PaymentTimeout { public Guid OrderId { get; init; } }
```
### 4. Choreography-Style Saga
```csharp
/// <summary>
/// EN: Choreography-style saga using domain events.
/// VI: Saga kiểu choreography dùng domain events.
/// </summary>
// EN: Order Service publishes event after creation
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, OrderResult>
{
private readonly IOrderRepository _repository;
private readonly IPublishEndpoint _publishEndpoint;
public async Task<OrderResult> Handle(CreateOrderCommand request, CancellationToken ct)
{
var order = new Order(request.UserId, request.Address);
foreach (var item in request.Items)
order.AddItem(item.ProductId, item.Quantity, item.UnitPrice);
await _repository.AddAsync(order, ct);
await _repository.UnitOfWork.SaveChangesAsync(ct);
// EN: Publish event for choreography
// VI: Publish event cho choreography
await _publishEndpoint.Publish(new OrderCreatedIntegrationEvent
{
OrderId = order.Id,
UserId = order.UserId,
TotalAmount = order.TotalAmount,
Items = order.Items.Select(i => new OrderItemInfo(
i.ProductId, i.Quantity, i.UnitPrice)).ToList()
}, ct);
return new OrderResult(order.Id);
}
}
// EN: Inventory Service reacts to OrderCreated
public class OrderCreatedInventoryConsumer : IConsumer<OrderCreatedIntegrationEvent>
{
private readonly IInventoryService _inventory;
private readonly IPublishEndpoint _publishEndpoint;
public async Task Consume(ConsumeContext<OrderCreatedIntegrationEvent> context)
{
try
{
foreach (var item in context.Message.Items)
{
await _inventory.ReserveAsync(
item.ProductId,
item.Quantity,
context.Message.OrderId);
}
// EN: Notify success
await context.Publish(new InventoryReservedIntegrationEvent
{
OrderId = context.Message.OrderId
});
}
catch (InsufficientStockException ex)
{
// EN: Notify failure
await context.Publish(new InventoryReservationFailedIntegrationEvent
{
OrderId = context.Message.OrderId,
Reason = ex.Message
});
}
}
}
// EN: Payment Service reacts to InventoryReserved
public class InventoryReservedPaymentConsumer : IConsumer<InventoryReservedIntegrationEvent>
{
private readonly IPaymentService _payment;
private readonly IOrderRepository _orders;
private readonly IPublishEndpoint _publishEndpoint;
public async Task Consume(ConsumeContext<InventoryReservedIntegrationEvent> context)
{
var order = await _orders.GetByIdAsync(context.Message.OrderId);
try
{
var paymentId = await _payment.ChargeAsync(
order!.UserId,
order.TotalAmount);
await context.Publish(new PaymentProcessedIntegrationEvent
{
OrderId = context.Message.OrderId,
PaymentId = paymentId
});
}
catch (PaymentFailedException ex)
{
await context.Publish(new PaymentFailedIntegrationEvent
{
OrderId = context.Message.OrderId,
Reason = ex.Message
});
}
}
}
// EN: Inventory Service compensates on payment failure
public class PaymentFailedInventoryConsumer : IConsumer<PaymentFailedIntegrationEvent>
{
private readonly IInventoryService _inventory;
public async Task Consume(ConsumeContext<PaymentFailedIntegrationEvent> context)
{
// EN: Compensating action
await _inventory.ReleaseReservationAsync(context.Message.OrderId);
}
}
```
### 5. Saga Monitoring and Querying
```csharp
/// <summary>
/// EN: Query service for saga status.
/// VI: Service truy vấn trạng thái saga.
/// </summary>
public class SagaQueryService
{
private readonly SagaDbContext _context;
public SagaQueryService(SagaDbContext context)
{
_context = context;
}
public async Task<OrderSagaStatusDto?> GetOrderSagaStatusAsync(
Guid orderId,
CancellationToken ct = default)
{
var saga = await _context.OrderSagas
.FirstOrDefaultAsync(s => s.OrderId == orderId, ct);
if (saga == null)
return null;
return new OrderSagaStatusDto
{
OrderId = saga.OrderId,
CurrentState = saga.CurrentState,
PaymentId = saga.PaymentId,
FailureReason = saga.FailureReason,
IsCompleted = saga.CurrentState == "Completed",
IsFailed = saga.CurrentState == "Failed"
};
}
public async Task<List<OrderSagaStatusDto>> GetStuckSagasAsync(
TimeSpan maxAge,
CancellationToken ct = default)
{
var cutoff = DateTime.UtcNow - maxAge;
return await _context.OrderSagas
.Where(s => s.CurrentState != "Completed"
&& s.CurrentState != "Failed"
&& s.CreatedAt < cutoff)
.Select(s => new OrderSagaStatusDto
{
OrderId = s.OrderId,
CurrentState = s.CurrentState,
FailureReason = "Potentially stuck"
})
.ToListAsync(ct);
}
}
public record OrderSagaStatusDto
{
public Guid OrderId { get; init; }
public string CurrentState { get; init; } = default!;
public string? PaymentId { get; init; }
public string? FailureReason { get; init; }
public bool IsCompleted { get; init; }
public bool IsFailed { get; init; }
}
```
## Database Migrations
```sql
-- EN: Create OrderSagas table / VI: Tạo bảng OrderSagas
CREATE TABLE "OrderSagas" (
"CorrelationId" uuid PRIMARY KEY,
"CurrentState" varchar(100) NOT NULL,
"OrderId" uuid NOT NULL,
"UserId" varchar(100),
"TotalAmount" decimal(18,2) NOT NULL DEFAULT 0,
"PaymentId" varchar(100),
"FailureReason" varchar(500),
"ReservationTimeoutToken" uuid,
"PaymentTimeoutToken" uuid,
"RowVersion" bytea NOT NULL,
"CreatedAt" timestamp with time zone NOT NULL DEFAULT NOW()
);
CREATE INDEX "IX_OrderSagas_OrderId" ON "OrderSagas" ("OrderId");
CREATE INDEX "IX_OrderSagas_CurrentState" ON "OrderSagas" ("CurrentState");
CREATE INDEX "IX_OrderSagas_StuckSagas"
ON "OrderSagas" ("CurrentState", "CreatedAt")
WHERE "CurrentState" NOT IN ('Completed', 'Failed');
```

View File

@@ -0,0 +1,518 @@
---
name: service-discovery
description: Service Discovery và Service Registry patterns. Use for dynamic service location, health checking, và load balancing trong microservices.
compatibility: ".NET 10+, Consul, Kubernetes, DNS"
metadata:
author: Velik Ho
version: "1.0"
---
# Service Discovery / Service Discovery Pattern
Patterns cho service discovery và dynamic service location trong microservices.
## When to Use This Skill / Khi Nào Sử Dụng
Use this skill when:
- Services need to find each other dynamically / Services cần tìm nhau động
- Deploying to Kubernetes / Triển khai trên Kubernetes
- Implementing client-side load balancing / Triển khai load balancing phía client
- Health checking service instances / Kiểm tra health của service instances
## Core Concepts / Khái Niệm Cốt Lõi
### Service Discovery Patterns
```
┌─────────────────────────────────────────────────────────────┐
│ SERVICE DISCOVERY PATTERNS │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ CLIENT-SIDE DISCOVERY │ │
│ │ │ │
│ │ Client ──Query──▶ Registry ──▶ Service List │ │
│ │ │ │ │
│ │ └──Direct Call──▶ Service Instance │ │
│ │ │ │
│ │ Pros: Simpler, fewer hops │ │
│ │ Cons: Client needs discovery logic │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ SERVER-SIDE DISCOVERY │ │
│ │ │ │
│ │ Client ──▶ Load Balancer ──Query──▶ Registry │ │
│ │ │ │ │
│ │ └──▶ Service Instance │ │
│ │ │ │
│ │ Pros: Client stays simple │ │
│ │ Cons: Extra network hop │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
```
### Service Registry / Registry dịch vụ
```
┌─────────────────────────────────────────────────────────────┐
│ SERVICE REGISTRY │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Service: user-service │ │
│ │ Instances: │ │
│ │ - host: 10.0.1.10, port: 5001, health: ✅ │ │
│ │ - host: 10.0.1.11, port: 5001, health: ✅ │ │
│ │ - host: 10.0.1.12, port: 5001, health: ❌ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Service: order-service │ │
│ │ Instances: │ │
│ │ - host: 10.0.2.10, port: 5002, health: ✅ │ │
│ │ - host: 10.0.2.11, port: 5002, health: ✅ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
```
### Discovery Options Comparison
| Option | Type | Complexity | Best For |
|--------|------|------------|----------|
| **Kubernetes DNS** | Server-side | Low | K8s deployments |
| **Consul** | Client-side | Medium | Multi-platform |
| **Eureka** | Client-side | Medium | Spring ecosystem |
| **DNS + Load Balancer** | Server-side | Low | Simple setups |
## Key Patterns / Mẫu Chính
### Kubernetes Service Discovery
```yaml
# EN: Kubernetes Service for internal discovery
# VI: Kubernetes Service cho discovery nội bộ
apiVersion: v1
kind: Service
metadata:
name: user-service
namespace: goodgo
spec:
selector:
app: user-service
ports:
- port: 80
targetPort: 5001
type: ClusterIP
---
# EN: Deployment with health probes
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
namespace: goodgo
spec:
replicas: 3
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
spec:
containers:
- name: user-service
image: goodgo/user-service:latest
ports:
- containerPort: 5001
livenessProbe:
httpGet:
path: /health/live
port: 5001
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
httpGet:
path: /health/ready
port: 5001
initialDelaySeconds: 5
periodSeconds: 5
```
### Using Kubernetes DNS in .NET
```csharp
/// <summary>
/// EN: HTTP client using Kubernetes DNS.
/// VI: HTTP client sử dụng Kubernetes DNS.
/// </summary>
// Program.cs
builder.Services.AddHttpClient<IUserServiceClient, UserServiceClient>(client =>
{
// EN: Use Kubernetes service DNS name
// VI: Sử dụng Kubernetes service DNS name
client.BaseAddress = new Uri("http://user-service.goodgo.svc.cluster.local");
})
.AddStandardResilienceHandler();
// appsettings.json for different environments
{
"Services": {
"UserService": {
"BaseUrl": "http://user-service.goodgo.svc.cluster.local" // K8s
// "BaseUrl": "http://localhost:5001" // Local dev
}
}
}
```
### Consul Service Registration
```csharp
/// <summary>
/// EN: Register service with Consul.
/// VI: Đăng ký service với Consul.
/// </summary>
public static class ConsulServiceExtensions
{
public static IServiceCollection AddConsulServiceDiscovery(
this IServiceCollection services,
IConfiguration configuration)
{
services.AddSingleton<IConsulClient, ConsulClient>(sp =>
{
var consulAddress = configuration["Consul:Address"];
return new ConsulClient(cfg =>
{
cfg.Address = new Uri(consulAddress!);
});
});
services.AddHostedService<ConsulRegistrationService>();
return services;
}
}
/// <summary>
/// EN: Background service for Consul registration.
/// VI: Background service cho Consul registration.
/// </summary>
public class ConsulRegistrationService : IHostedService
{
private readonly IConsulClient _consulClient;
private readonly IConfiguration _configuration;
private readonly ILogger<ConsulRegistrationService> _logger;
private string? _registrationId;
public ConsulRegistrationService(
IConsulClient consulClient,
IConfiguration configuration,
ILogger<ConsulRegistrationService> logger)
{
_consulClient = consulClient;
_configuration = configuration;
_logger = logger;
}
public async Task StartAsync(CancellationToken ct)
{
var serviceName = _configuration["Service:Name"]!;
var serviceHost = _configuration["Service:Host"]!;
var servicePort = int.Parse(_configuration["Service:Port"]!);
_registrationId = $"{serviceName}-{Guid.NewGuid():N}";
var registration = new AgentServiceRegistration
{
ID = _registrationId,
Name = serviceName,
Address = serviceHost,
Port = servicePort,
Tags = new[] { "api", "v1" },
Check = new AgentServiceCheck
{
HTTP = $"http://{serviceHost}:{servicePort}/health",
Interval = TimeSpan.FromSeconds(10),
Timeout = TimeSpan.FromSeconds(5),
DeregisterCriticalServiceAfter = TimeSpan.FromMinutes(1)
}
};
await _consulClient.Agent.ServiceRegister(registration, ct);
_logger.LogInformation(
"Registered service {ServiceName} with Consul as {RegistrationId}",
serviceName, _registrationId);
}
public async Task StopAsync(CancellationToken ct)
{
if (_registrationId != null)
{
await _consulClient.Agent.ServiceDeregister(_registrationId, ct);
_logger.LogInformation("Deregistered service {RegistrationId}", _registrationId);
}
}
}
```
### Client-Side Load Balancing with Consul
```csharp
/// <summary>
/// EN: HTTP client with Consul-based service discovery.
/// VI: HTTP client với service discovery dựa trên Consul.
/// </summary>
public class ConsulServiceDiscoveryClient
{
private readonly IConsulClient _consulClient;
private readonly ILogger<ConsulServiceDiscoveryClient> _logger;
public ConsulServiceDiscoveryClient(
IConsulClient consulClient,
ILogger<ConsulServiceDiscoveryClient> logger)
{
_consulClient = consulClient;
_logger = logger;
}
public async Task<Uri?> GetServiceUriAsync(
string serviceName,
CancellationToken ct = default)
{
var services = await _consulClient.Health.Service(
serviceName,
tag: null,
passingOnly: true,
ct);
if (services.Response.Length == 0)
{
_logger.LogWarning("No healthy instances found for {ServiceName}", serviceName);
return null;
}
// EN: Simple round-robin load balancing
// VI: Load balancing round-robin đơn giản
var service = services.Response[Random.Shared.Next(services.Response.Length)];
var uri = new Uri($"http://{service.Service.Address}:{service.Service.Port}");
_logger.LogDebug(
"Resolved {ServiceName} to {Uri}",
serviceName, uri);
return uri;
}
}
/// <summary>
/// EN: HTTP message handler with service discovery.
/// VI: HTTP message handler với service discovery.
/// </summary>
public class ServiceDiscoveryHandler : DelegatingHandler
{
private readonly ConsulServiceDiscoveryClient _discovery;
private readonly string _serviceName;
public ServiceDiscoveryHandler(
ConsulServiceDiscoveryClient discovery,
string serviceName)
{
_discovery = discovery;
_serviceName = serviceName;
}
protected override async Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request,
CancellationToken ct)
{
var serviceUri = await _discovery.GetServiceUriAsync(_serviceName, ct);
if (serviceUri == null)
throw new ServiceNotFoundException(_serviceName);
// EN: Replace the request URI with discovered service
// VI: Thay thế URI request với service được discover
var builder = new UriBuilder(request.RequestUri!)
{
Scheme = serviceUri.Scheme,
Host = serviceUri.Host,
Port = serviceUri.Port
};
request.RequestUri = builder.Uri;
return await base.SendAsync(request, ct);
}
}
```
### Health Check Endpoints
```csharp
/// <summary>
/// EN: Health check endpoints for service discovery.
/// VI: Health check endpoints cho service discovery.
/// </summary>
// Program.cs
builder.Services.AddHealthChecks()
.AddDbContextCheck<AppDbContext>("database")
.AddRedis(builder.Configuration.GetConnectionString("Redis")!, "redis")
.AddRabbitMQ(builder.Configuration.GetConnectionString("RabbitMQ")!, "rabbitmq");
var app = builder.Build();
// EN: Liveness probe - is the service running?
app.MapHealthChecks("/health/live", new HealthCheckOptions
{
Predicate = _ => false, // No checks, just confirm the process is running
ResponseWriter = WriteMinimalResponse
});
// EN: Readiness probe - can the service handle requests?
app.MapHealthChecks("/health/ready", new HealthCheckOptions
{
Predicate = check => check.Tags.Contains("ready"),
ResponseWriter = WriteDetailedResponse
});
// EN: Full health check for monitoring
app.MapHealthChecks("/health", new HealthCheckOptions
{
ResponseWriter = WriteDetailedResponse
});
static Task WriteMinimalResponse(HttpContext context, HealthReport report)
{
context.Response.ContentType = "text/plain";
return context.Response.WriteAsync(report.Status.ToString());
}
static Task WriteDetailedResponse(HttpContext context, HealthReport report)
{
context.Response.ContentType = "application/json";
var response = new
{
status = report.Status.ToString(),
checks = report.Entries.Select(e => new
{
name = e.Key,
status = e.Value.Status.ToString(),
description = e.Value.Description,
duration = e.Value.Duration.TotalMilliseconds
}),
totalDuration = report.TotalDuration.TotalMilliseconds
};
return context.Response.WriteAsJsonAsync(response);
}
```
## Common Mistakes / Lỗi Thường Gặp
### 1. Hardcoded Service URLs
```csharp
// ❌ BAD: Hardcoded URLs
var client = new HttpClient
{
BaseAddress = new Uri("http://10.0.1.50:5001") // What if IP changes?
};
// ✅ GOOD: Use service discovery
var client = new HttpClient
{
BaseAddress = new Uri("http://user-service.goodgo.svc.cluster.local")
};
// Or with Consul
var serviceUri = await _discovery.GetServiceUriAsync("user-service");
```
### 2. No Health Checks
```yaml
# ❌ BAD: No health probes
spec:
containers:
- name: api
image: my-api:latest
# ✅ GOOD: With health probes
spec:
containers:
- name: api
image: my-api:latest
livenessProbe:
httpGet:
path: /health/live
port: 80
readinessProbe:
httpGet:
path: /health/ready
port: 80
```
### 3. Missing Retry on Discovery Failure
```csharp
// ❌ BAD: Single attempt
var service = await _discovery.GetServiceAsync("order-service");
await _client.GetAsync(service.Uri + "/api/orders");
// ✅ GOOD: With retry and fallback
var services = await _discovery.GetHealthyServicesAsync("order-service");
foreach (var service in services)
{
try
{
return await _client.GetAsync(service.Uri + "/api/orders");
}
catch (HttpRequestException)
{
continue; // Try next instance
}
}
throw new ServiceUnavailableException("order-service");
```
## Quick Reference / Tham Chiếu Nhanh
### Kubernetes Service DNS Format
| Type | Format | Example |
|------|--------|---------|
| Same namespace | `<service-name>` | `user-service` |
| Different namespace | `<service>.<namespace>` | `user-service.goodgo` |
| Full FQDN | `<service>.<namespace>.svc.cluster.local` | `user-service.goodgo.svc.cluster.local` |
### Consul Health Check Types
| Type | Use For |
|------|---------|
| HTTP | REST APIs |
| TCP | Databases, caches |
| gRPC | gRPC services |
| Script | Custom checks |
### Health Probe Configuration
| Probe | Purpose | Failure Action |
|-------|---------|----------------|
| Liveness | Is process alive? | Restart container |
| Readiness | Can handle traffic? | Remove from LB |
| Startup | Is starting up? | Wait before checking |
## Resources / Tài Nguyên
- [Detailed Examples](./references/REFERENCE.md) - Full code examples
- [Deployment Kubernetes](../deployment-kubernetes/SKILL.md) - K8s patterns
- [Docker Traefik](../docker-traefik/SKILL.md) - Container networking
- [Inter-service Communication](../inter-service-communication/SKILL.md) - HTTP clients
- [Error Handling](../error-handling-patterns/SKILL.md) - Resilience patterns

View File

@@ -0,0 +1,559 @@
# Service Discovery - Reference Examples
## Complete Implementation Examples
### 1. Complete Consul Integration
```csharp
/// <summary>
/// EN: Complete Consul service discovery setup.
/// VI: Cài đặt Consul service discovery hoàn chỉnh.
/// </summary>
// Extensions/ConsulExtensions.cs
public static class ConsulExtensions
{
public static IServiceCollection AddConsulDiscovery(
this IServiceCollection services,
IConfiguration configuration)
{
var consulConfig = configuration.GetSection("Consul").Get<ConsulConfig>()!;
// EN: Register Consul client
services.AddSingleton<IConsulClient>(new ConsulClient(cfg =>
{
cfg.Address = new Uri(consulConfig.Address);
if (!string.IsNullOrEmpty(consulConfig.Token))
{
cfg.Token = consulConfig.Token;
}
}));
// EN: Register service discovery
services.AddSingleton<IServiceDiscovery, ConsulServiceDiscovery>();
// EN: Register background service for registration
services.AddHostedService<ConsulRegistrationHostedService>();
return services;
}
public static IHttpClientBuilder AddServiceDiscovery(
this IHttpClientBuilder builder,
string serviceName)
{
builder.AddHttpMessageHandler(sp =>
{
var discovery = sp.GetRequiredService<IServiceDiscovery>();
return new ServiceDiscoveryDelegatingHandler(discovery, serviceName);
});
return builder;
}
}
public class ConsulConfig
{
public string Address { get; set; } = "http://localhost:8500";
public string? Token { get; set; }
public string ServiceName { get; set; } = default!;
public string ServiceHost { get; set; } = default!;
public int ServicePort { get; set; }
public string[] Tags { get; set; } = Array.Empty<string>();
}
```
### 2. Service Discovery Interface and Implementation
```csharp
/// <summary>
/// EN: Service discovery abstraction.
/// VI: Abstraction cho service discovery.
/// </summary>
public interface IServiceDiscovery
{
Task<ServiceInstance?> GetServiceAsync(string serviceName, CancellationToken ct = default);
Task<IReadOnlyList<ServiceInstance>> GetServicesAsync(string serviceName, CancellationToken ct = default);
Task RegisterAsync(ServiceRegistration registration, CancellationToken ct = default);
Task DeregisterAsync(string serviceId, CancellationToken ct = default);
}
public record ServiceInstance(
string Id,
string Name,
string Host,
int Port,
IReadOnlyDictionary<string, string> Metadata)
{
public Uri Uri => new($"http://{Host}:{Port}");
}
public record ServiceRegistration(
string Name,
string Host,
int Port,
string[] Tags,
TimeSpan HealthCheckInterval);
/// <summary>
/// EN: Consul implementation of service discovery.
/// VI: Triển khai Consul cho service discovery.
/// </summary>
public class ConsulServiceDiscovery : IServiceDiscovery
{
private readonly IConsulClient _consulClient;
private readonly ILogger<ConsulServiceDiscovery> _logger;
private readonly ConcurrentDictionary<string, DateTime> _lastCacheTime = new();
private readonly ConcurrentDictionary<string, IReadOnlyList<ServiceInstance>> _cache = new();
private readonly TimeSpan _cacheDuration = TimeSpan.FromSeconds(10);
public ConsulServiceDiscovery(
IConsulClient consulClient,
ILogger<ConsulServiceDiscovery> logger)
{
_consulClient = consulClient;
_logger = logger;
}
public async Task<ServiceInstance?> GetServiceAsync(
string serviceName,
CancellationToken ct = default)
{
var services = await GetServicesAsync(serviceName, ct);
if (services.Count == 0)
return null;
// EN: Random selection for load balancing
// VI: Chọn ngẫu nhiên để load balancing
return services[Random.Shared.Next(services.Count)];
}
public async Task<IReadOnlyList<ServiceInstance>> GetServicesAsync(
string serviceName,
CancellationToken ct = default)
{
// EN: Check cache
if (_lastCacheTime.TryGetValue(serviceName, out var lastTime) &&
DateTime.UtcNow - lastTime < _cacheDuration &&
_cache.TryGetValue(serviceName, out var cached))
{
return cached;
}
// EN: Query Consul for healthy services
var result = await _consulClient.Health.Service(
serviceName,
tag: null,
passingOnly: true,
ct);
var instances = result.Response
.Select(s => new ServiceInstance(
s.Service.ID,
s.Service.Service,
s.Service.Address,
s.Service.Port,
s.Service.Meta ?? new Dictionary<string, string>()))
.ToList();
// EN: Update cache
_cache[serviceName] = instances;
_lastCacheTime[serviceName] = DateTime.UtcNow;
_logger.LogDebug(
"Discovered {Count} instances for {ServiceName}",
instances.Count, serviceName);
return instances;
}
public async Task RegisterAsync(
ServiceRegistration registration,
CancellationToken ct = default)
{
var serviceId = $"{registration.Name}-{Guid.NewGuid():N}";
var consulRegistration = new AgentServiceRegistration
{
ID = serviceId,
Name = registration.Name,
Address = registration.Host,
Port = registration.Port,
Tags = registration.Tags,
Check = new AgentServiceCheck
{
HTTP = $"http://{registration.Host}:{registration.Port}/health",
Interval = registration.HealthCheckInterval,
Timeout = TimeSpan.FromSeconds(5),
DeregisterCriticalServiceAfter = TimeSpan.FromMinutes(1)
}
};
await _consulClient.Agent.ServiceRegister(consulRegistration, ct);
_logger.LogInformation(
"Registered service {ServiceName} as {ServiceId}",
registration.Name, serviceId);
}
public async Task DeregisterAsync(string serviceId, CancellationToken ct = default)
{
await _consulClient.Agent.ServiceDeregister(serviceId, ct);
_logger.LogInformation("Deregistered service {ServiceId}", serviceId);
}
}
```
### 3. Service Discovery HTTP Handler
```csharp
/// <summary>
/// EN: HTTP message handler that resolves service URIs via discovery.
/// VI: HTTP message handler giải quyết URIs qua discovery.
/// </summary>
public class ServiceDiscoveryDelegatingHandler : DelegatingHandler
{
private readonly IServiceDiscovery _discovery;
private readonly string _serviceName;
private readonly ILogger<ServiceDiscoveryDelegatingHandler> _logger;
public ServiceDiscoveryDelegatingHandler(
IServiceDiscovery discovery,
string serviceName)
{
_discovery = discovery;
_serviceName = serviceName;
_logger = LoggerFactory.Create(b => b.AddConsole())
.CreateLogger<ServiceDiscoveryDelegatingHandler>();
}
protected override async Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request,
CancellationToken ct)
{
var services = await _discovery.GetServicesAsync(_serviceName, ct);
if (services.Count == 0)
{
throw new ServiceNotFoundException(_serviceName);
}
// EN: Try each healthy instance
// VI: Thử từng instance healthy
var shuffled = services.OrderBy(_ => Random.Shared.Next()).ToList();
Exception? lastException = null;
foreach (var service in shuffled)
{
try
{
// EN: Clone request for retry
using var clonedRequest = await CloneRequestAsync(request);
// EN: Update request URI
var builder = new UriBuilder(clonedRequest.RequestUri!)
{
Scheme = service.Uri.Scheme,
Host = service.Uri.Host,
Port = service.Uri.Port
};
clonedRequest.RequestUri = builder.Uri;
_logger.LogDebug(
"Sending request to {Uri}",
clonedRequest.RequestUri);
return await base.SendAsync(clonedRequest, ct);
}
catch (HttpRequestException ex)
{
_logger.LogWarning(ex,
"Request to {Service} failed, trying next instance",
service.Uri);
lastException = ex;
}
}
throw new ServiceUnavailableException(
_serviceName,
"All service instances failed",
lastException);
}
private static async Task<HttpRequestMessage> CloneRequestAsync(
HttpRequestMessage request)
{
var clone = new HttpRequestMessage(request.Method, request.RequestUri);
foreach (var header in request.Headers)
{
clone.Headers.TryAddWithoutValidation(header.Key, header.Value);
}
if (request.Content != null)
{
var content = await request.Content.ReadAsByteArrayAsync();
clone.Content = new ByteArrayContent(content);
foreach (var header in request.Content.Headers)
{
clone.Content.Headers.TryAddWithoutValidation(header.Key, header.Value);
}
}
return clone;
}
}
public class ServiceNotFoundException : Exception
{
public string ServiceName { get; }
public ServiceNotFoundException(string serviceName)
: base($"No healthy instances found for service: {serviceName}")
{
ServiceName = serviceName;
}
}
public class ServiceUnavailableException : Exception
{
public string ServiceName { get; }
public ServiceUnavailableException(
string serviceName,
string message,
Exception? innerException = null)
: base($"Service {serviceName} unavailable: {message}", innerException)
{
ServiceName = serviceName;
}
}
```
### 4. Kubernetes-Native Service Discovery
```csharp
/// <summary>
/// EN: Service discovery for Kubernetes using DNS.
/// VI: Service discovery cho Kubernetes dùng DNS.
/// </summary>
public class KubernetesServiceDiscovery : IServiceDiscovery
{
private readonly IConfiguration _configuration;
private readonly ILogger<KubernetesServiceDiscovery> _logger;
public KubernetesServiceDiscovery(
IConfiguration configuration,
ILogger<KubernetesServiceDiscovery> logger)
{
_configuration = configuration;
_logger = logger;
}
public Task<ServiceInstance?> GetServiceAsync(
string serviceName,
CancellationToken ct = default)
{
// EN: In Kubernetes, use DNS for service discovery
// VI: Trong Kubernetes, dùng DNS cho service discovery
var namespace_ = _configuration["Kubernetes:Namespace"] ?? "default";
var host = $"{serviceName}.{namespace_}.svc.cluster.local";
var port = GetServicePort(serviceName);
var instance = new ServiceInstance(
$"{serviceName}-k8s",
serviceName,
host,
port,
new Dictionary<string, string>());
return Task.FromResult<ServiceInstance?>(instance);
}
public Task<IReadOnlyList<ServiceInstance>> GetServicesAsync(
string serviceName,
CancellationToken ct = default)
{
// EN: For K8s, we typically use the service DNS which load balances automatically
// VI: Với K8s, ta thường dùng service DNS vốn tự động load balance
var instance = GetServiceAsync(serviceName, ct).Result;
return Task.FromResult<IReadOnlyList<ServiceInstance>>(
instance != null ? new[] { instance } : Array.Empty<ServiceInstance>());
}
public Task RegisterAsync(ServiceRegistration registration, CancellationToken ct = default)
{
// EN: In Kubernetes, registration happens via Pod labels/Service selectors
// VI: Trong Kubernetes, đăng ký qua Pod labels/Service selectors
_logger.LogInformation(
"Kubernetes service registration is automatic via Pod labels");
return Task.CompletedTask;
}
public Task DeregisterAsync(string serviceId, CancellationToken ct = default)
{
// EN: In Kubernetes, deregistration happens when Pod terminates
// VI: Trong Kubernetes, hủy đăng ký khi Pod kết thúc
_logger.LogInformation(
"Kubernetes service deregistration is automatic when Pod terminates");
return Task.CompletedTask;
}
private int GetServicePort(string serviceName)
{
// EN: Get port from configuration or use default
// VI: Lấy port từ config hoặc dùng mặc định
var configKey = $"Services:{serviceName}:Port";
return _configuration.GetValue<int?>(configKey) ?? 80;
}
}
/// <summary>
/// EN: Extension to configure service discovery based on environment.
/// VI: Extension cấu hình service discovery theo môi trường.
/// </summary>
public static class ServiceDiscoveryExtensions
{
public static IServiceCollection AddServiceDiscovery(
this IServiceCollection services,
IConfiguration configuration)
{
var discoveryType = configuration.GetValue<string>("ServiceDiscovery:Type");
switch (discoveryType?.ToLower())
{
case "consul":
services.AddConsulDiscovery(configuration);
break;
case "kubernetes":
default:
services.AddSingleton<IServiceDiscovery, KubernetesServiceDiscovery>();
break;
}
return services;
}
}
```
### 5. Complete Usage Example
```csharp
/// <summary>
/// EN: Complete example using service discovery.
/// VI: Ví dụ hoàn chỉnh sử dụng service discovery.
/// </summary>
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// EN: Add service discovery
builder.Services.AddServiceDiscovery(builder.Configuration);
// EN: Add HTTP clients with discovery
builder.Services.AddHttpClient<IUserServiceClient, UserServiceClient>()
.AddServiceDiscovery("user-service")
.AddStandardResilienceHandler();
builder.Services.AddHttpClient<IOrderServiceClient, OrderServiceClient>()
.AddServiceDiscovery("order-service")
.AddStandardResilienceHandler();
// EN: Add health checks
builder.Services.AddHealthChecks()
.AddCheck<ServiceDiscoveryHealthCheck>("service-discovery");
var app = builder.Build();
app.MapHealthChecks("/health");
app.MapControllers();
app.Run();
// HealthChecks/ServiceDiscoveryHealthCheck.cs
public class ServiceDiscoveryHealthCheck : IHealthCheck
{
private readonly IServiceDiscovery _discovery;
private readonly string[] _requiredServices = { "user-service", "order-service" };
public ServiceDiscoveryHealthCheck(IServiceDiscovery discovery)
{
_discovery = discovery;
}
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken ct = default)
{
var missingServices = new List<string>();
foreach (var service in _requiredServices)
{
var instances = await _discovery.GetServicesAsync(service, ct);
if (instances.Count == 0)
{
missingServices.Add(service);
}
}
if (missingServices.Any())
{
return HealthCheckResult.Unhealthy(
$"Missing services: {string.Join(", ", missingServices)}");
}
return HealthCheckResult.Healthy("All required services available");
}
}
```
## Docker Compose for Local Development
```yaml
version: '3.8'
services:
consul:
image: consul:1.15
ports:
- "8500:8500"
command: agent -dev -ui -client=0.0.0.0
user-service:
build:
context: ./src/UserService
environment:
- ASPNETCORE_ENVIRONMENT=Development
- Consul__Address=http://consul:8500
- Service__Name=user-service
- Service__Host=user-service
- Service__Port=80
depends_on:
- consul
order-service:
build:
context: ./src/OrderService
environment:
- ASPNETCORE_ENVIRONMENT=Development
- Consul__Address=http://consul:8500
- Service__Name=order-service
- Service__Host=order-service
- Service__Port=80
depends_on:
- consul
api-gateway:
build:
context: ./src/ApiGateway
ports:
- "5000:80"
environment:
- Consul__Address=http://consul:8500
depends_on:
- consul
- user-service
- order-service
```

View File

@@ -16,4 +16,9 @@ rm-urlencoded" \
> -d "scope=openid profile email api offline_access" 2>
&1 | jq .
1. Kiểm tra hỗ trợ cho MSSQL, PSQL, MongoDB
1. Kiểm tra hỗ trợ cho MSSQL, PSQL, MongoDB
cd deployments/local
docker compose build wallet-service-net
docker compose up -d wallet-service-net