diff --git a/.agent/skills/api-aggregation/SKILL.md b/.agent/skills/api-aggregation/SKILL.md new file mode 100644 index 00000000..d66cee6e --- /dev/null +++ b/.agent/skills/api-aggregation/SKILL.md @@ -0,0 +1,477 @@ +--- +name: api-aggregation +description: API Gateway Aggregation và Backend for Frontend (BFF) patterns. Use for response composition, request routing, và client-specific APIs. +compatibility: ".NET 10+, YARP, Ocelot, GraphQL" +metadata: + author: Velik Ho + version: "1.0" +--- + +# API Aggregation & BFF / API Aggregation và BFF Pattern + +Patterns cho API Gateway aggregation và Backend for Frontend trong microservices. + +## When to Use This Skill / Khi Nào Sử Dụng + +Use this skill when: +- Aggregating responses from multiple services / Tổng hợp responses từ nhiều services +- Creating client-specific APIs / Tạo APIs riêng cho từng client +- Reducing chatty client-server communication / Giảm giao tiếp "chatty" +- Implementing API composition / Triển khai API composition + +## Core Concepts / Khái Niệm Cốt Lõi + +### Gateway Patterns Overview + +``` +┌─────────────────────────────────────────────────────────────┐ +│ GATEWAY PATTERNS │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ GATEWAY ROUTING (Reverse Proxy) │ │ +│ │ Client ──▶ Gateway ──▶ Service A │ │ +│ │ (Route) Service B │ │ +│ └──────────────────────────────────────────────────────┘ │ +│ │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ GATEWAY AGGREGATION │ │ +│ │ Client ──▶ Gateway ──┬─▶ Service A ─┐ │ │ +│ │ └─▶ Service B ─┼─▶ Combined │ │ +│ │ └─▶ Service C ─┘ Response │ │ +│ └──────────────────────────────────────────────────────┘ │ +│ │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ BACKEND FOR FRONTEND (BFF) │ │ +│ │ Mobile ──▶ Mobile BFF ──▶ Services │ │ +│ │ Web ──▶ Web BFF ──▶ Services │ │ +│ │ IoT ──▶ IoT BFF ──▶ Services │ │ +│ └──────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### When to Use What / Khi Nào Dùng Gì + +| Pattern | Use Case | Example | +|---------|----------|---------| +| **Gateway Routing** | Simple proxy | Route `/api/users/*` to User Service | +| **Gateway Aggregation** | Combine multiple calls | Product page = Product + Reviews + Stock | +| **BFF** | Client-specific needs | Mobile needs different data than Web | + +### Benefits and Trade-offs + +| Benefit | Trade-off | +|---------|-----------| +| Reduces client complexity | Single point of failure | +| Optimizes for client needs | Additional latency hop | +| Enables caching | Gateway becomes bottleneck | +| Centralizes cross-cutting concerns | More complexity to maintain | + +## Key Patterns / Mẫu Chính + +### YARP Reverse Proxy Configuration + +```csharp +/// +/// EN: Configure YARP reverse proxy for routing. +/// VI: Cấu hình YARP reverse proxy cho routing. +/// + +// Program.cs +builder.Services.AddReverseProxy() + .LoadFromConfig(builder.Configuration.GetSection("ReverseProxy")); + +var app = builder.Build(); +app.MapReverseProxy(); +app.Run(); + +// appsettings.json +{ + "ReverseProxy": { + "Routes": { + "users-route": { + "ClusterId": "users-cluster", + "Match": { + "Path": "/api/users/{**catch-all}" + }, + "Transforms": [ + { "PathRemovePrefix": "/api/users" } + ] + }, + "orders-route": { + "ClusterId": "orders-cluster", + "Match": { + "Path": "/api/orders/{**catch-all}" + } + } + }, + "Clusters": { + "users-cluster": { + "Destinations": { + "user-service": { + "Address": "http://user-service:5001" + } + } + }, + "orders-cluster": { + "Destinations": { + "order-service": { + "Address": "http://order-service:5002" + } + } + } + } + } +} +``` + +### API Aggregation Controller + +```csharp +/// +/// EN: Aggregation controller combining multiple service responses. +/// VI: Controller aggregation kết hợp responses từ nhiều services. +/// +[ApiController] +[Route("api/v1/aggregator")] +public class AggregatorController : ControllerBase +{ + private readonly IProductServiceClient _productClient; + private readonly IReviewServiceClient _reviewClient; + private readonly IInventoryServiceClient _inventoryClient; + private readonly ILogger _logger; + + public AggregatorController( + IProductServiceClient productClient, + IReviewServiceClient reviewClient, + IInventoryServiceClient inventoryClient, + ILogger logger) + { + _productClient = productClient; + _reviewClient = reviewClient; + _inventoryClient = inventoryClient; + _logger = logger; + } + + /// + /// EN: Get product details with reviews and stock info. + /// VI: Lấy chi tiết sản phẩm với reviews và thông tin tồn kho. + /// + [HttpGet("products/{productId}")] + public async Task> GetProductDetails( + Guid productId, + CancellationToken ct) + { + // EN: Make parallel calls to services + // VI: Gọi song song đến các services + var productTask = _productClient.GetProductAsync(productId, ct); + var reviewsTask = _reviewClient.GetReviewsAsync(productId, ct); + var stockTask = _inventoryClient.GetStockAsync(productId, ct); + + await Task.WhenAll(productTask, reviewsTask, stockTask); + + var product = await productTask; + if (product == null) + return NotFound(); + + var reviews = await reviewsTask; + var stock = await stockTask; + + // EN: Aggregate responses + // VI: Tổng hợp responses + return Ok(new ProductDetailsDto + { + Id = product.Id, + Name = product.Name, + Description = product.Description, + Price = product.Price, + ImageUrl = product.ImageUrl, + AverageRating = reviews.AverageRating, + ReviewCount = reviews.TotalCount, + TopReviews = reviews.Items.Take(3).ToList(), + InStock = stock.AvailableQuantity > 0, + AvailableQuantity = stock.AvailableQuantity + }); + } + + /// + /// EN: Get user dashboard data from multiple services. + /// VI: Lấy dữ liệu dashboard user từ nhiều services. + /// + [HttpGet("dashboard")] + [Authorize] + public async Task> GetDashboard(CancellationToken ct) + { + var userId = User.GetUserId(); + + var profileTask = _userClient.GetProfileAsync(userId, ct); + var ordersTask = _orderClient.GetRecentOrdersAsync(userId, 5, ct); + var notificationsTask = _notificationClient.GetUnreadCountAsync(userId, ct); + var walletTask = _walletClient.GetBalanceAsync(userId, ct); + + await Task.WhenAll(profileTask, ordersTask, notificationsTask, walletTask); + + return Ok(new DashboardDto + { + Profile = await profileTask, + RecentOrders = await ordersTask, + UnreadNotifications = await notificationsTask, + WalletBalance = await walletTask + }); + } +} +``` + +### BFF for Mobile + +```csharp +/// +/// EN: Mobile BFF with optimized responses. +/// VI: Mobile BFF với responses được tối ưu. +/// +[ApiController] +[Route("mobile/api/v1")] +public class MobileBffController : ControllerBase +{ + private readonly IMediator _mediator; + + public MobileBffController(IMediator mediator) + { + _mediator = mediator; + } + + /// + /// EN: Mobile-optimized product list. + /// VI: Danh sách sản phẩm tối ưu cho mobile. + /// + [HttpGet("products")] + public async Task> GetProducts( + [FromQuery] int page = 1, + [FromQuery] int pageSize = 10, + CancellationToken ct = default) + { + // EN: Mobile gets smaller images and fewer fields + // VI: Mobile nhận images nhỏ hơn và ít fields hơn + var result = await _mediator.Send(new GetMobileProductsQuery + { + Page = page, + PageSize = pageSize, + ImageSize = "thumbnail" // Only thumbnails for mobile + }, ct); + + return Ok(result); + } + + /// + /// EN: Mobile-optimized checkout. + /// VI: Checkout tối ưu cho mobile. + /// + [HttpPost("checkout")] + [Authorize] + public async Task> Checkout( + MobileCheckoutRequest request, + CancellationToken ct) + { + // EN: Single endpoint handles entire checkout for mobile + // VI: Một endpoint xử lý toàn bộ checkout cho mobile + var result = await _mediator.Send(new MobileCheckoutCommand + { + UserId = User.GetUserId(), + CartId = request.CartId, + PaymentMethodId = request.PaymentMethodId, + ShippingAddressId = request.ShippingAddressId, + UseWalletBalance = request.UseWalletBalance + }, ct); + + return Ok(result); + } +} +``` + +### GraphQL BFF + +```csharp +/// +/// EN: GraphQL query for flexible aggregation. +/// VI: GraphQL query cho aggregation linh hoạt. +/// +public class Query +{ + public async Task GetProduct( + [ID] Guid productId, + ProductDataLoader productLoader, + ReviewDataLoader reviewLoader, + StockDataLoader stockLoader) + { + return await productLoader.LoadAsync(productId); + } +} + +public class ProductType : ObjectType +{ + protected override void Configure(IObjectTypeDescriptor descriptor) + { + descriptor.Field(p => p.Id).ID(); + + descriptor.Field(p => p.Name); + descriptor.Field(p => p.Price); + + // EN: Lazy load reviews only when requested + // VI: Lazy load reviews chỉ khi được yêu cầu + descriptor + .Field("reviews") + .ResolveWith(r => r.GetReviews(default!, default!)) + .Type>(); + + // EN: Lazy load stock only when requested + // VI: Lazy load stock chỉ khi được yêu cầu + descriptor + .Field("stock") + .ResolveWith(r => r.GetStock(default!, default!)) + .Type(); + } +} + +public class ProductResolvers +{ + public async Task> GetReviews( + [Parent] Product product, + ReviewDataLoader loader) + { + return await loader.LoadAsync(product.Id); + } + + public async Task GetStock( + [Parent] Product product, + StockDataLoader loader) + { + return await loader.LoadAsync(product.Id); + } +} +``` + +## Common Mistakes / Lỗi Thường Gặp + +### 1. Sequential Instead of Parallel Calls + +```csharp +// ❌ BAD: Sequential calls +public async Task GetProductDetails(Guid productId) +{ + var product = await _productClient.GetAsync(productId); + var reviews = await _reviewClient.GetAsync(productId); // Waits for product + var stock = await _stockClient.GetAsync(productId); // Waits for reviews + // Total time = product + reviews + stock +} + +// ✅ GOOD: Parallel calls +public async Task GetProductDetails(Guid productId) +{ + var productTask = _productClient.GetAsync(productId); + var reviewsTask = _reviewClient.GetAsync(productId); + var stockTask = _stockClient.GetAsync(productId); + + await Task.WhenAll(productTask, reviewsTask, stockTask); + // Total time = max(product, reviews, stock) +} +``` + +### 2. No Fallback on Partial Failure + +```csharp +// ❌ BAD: Fails entirely if one service fails +public async Task GetDashboard() +{ + var profile = await _userClient.GetProfileAsync(userId); + var orders = await _orderClient.GetOrdersAsync(userId); // If this fails, whole request fails + var notifications = await _notificationClient.GetAsync(userId); +} + +// ✅ GOOD: Graceful degradation +public async Task GetDashboard() +{ + var profileTask = _userClient.GetProfileAsync(userId); + var ordersTask = GetOrdersSafeAsync(userId); + var notificationsTask = GetNotificationsSafeAsync(userId); + + await Task.WhenAll(profileTask, ordersTask, notificationsTask); + + return new DashboardDto + { + Profile = await profileTask, + Orders = await ordersTask ?? Array.Empty(), // Empty if failed + Notifications = await notificationsTask ?? 0 // Zero if failed + }; +} + +private async Task?> GetOrdersSafeAsync(string userId) +{ + try + { + return await _orderClient.GetOrdersAsync(userId); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to get orders for user {UserId}", userId); + return null; // Return null, not throw + } +} +``` + +### 3. Missing Caching + +```csharp +// ❌ BAD: No caching, repeated calls +public async Task GetProductDetails(Guid productId) +{ + var product = await _productClient.GetAsync(productId); // Called every time + return MapToDto(product); +} + +// ✅ GOOD: With caching +public async Task GetProductDetails(Guid productId) +{ + var cacheKey = $"product:{productId}"; + + var cached = await _cache.GetAsync(cacheKey); + if (cached != null) + return cached; + + var product = await _productClient.GetAsync(productId); + var dto = MapToDto(product); + + await _cache.SetAsync(cacheKey, dto, TimeSpan.FromMinutes(5)); + + return dto; +} +``` + +## Quick Reference / Tham Chiếu Nhanh + +### Pattern Selection Guide + +| Need | Pattern | Implementation | +|------|---------|----------------| +| Route requests | Gateway Routing | YARP, Ocelot | +| Combine data | Gateway Aggregation | Custom controller | +| Client-specific API | BFF | Separate project per client | +| Flexible queries | GraphQL | HotChocolate | + +### Performance Checklist + +| Optimization | Applied? | +|-------------|----------| +| Parallel service calls | ✅ | +| Response caching | ✅ | +| Graceful degradation | ✅ | +| Connection pooling | ✅ | +| Timeout configuration | ✅ | + +## Resources / Tài Nguyên + +- [Detailed Examples](./references/REFERENCE.md) - Full code examples +- [API Design](../api-design/SKILL.md) - REST API patterns +- [Redis Caching](../redis-caching/SKILL.md) - Caching strategies +- [Error Handling](../error-handling-patterns/SKILL.md) - Resilience patterns +- [Inter-service Communication](../inter-service-communication/SKILL.md) - HTTP clients diff --git a/.agent/skills/api-aggregation/references/REFERENCE.md b/.agent/skills/api-aggregation/references/REFERENCE.md new file mode 100644 index 00000000..57856eb8 --- /dev/null +++ b/.agent/skills/api-aggregation/references/REFERENCE.md @@ -0,0 +1,551 @@ +# API Aggregation & BFF - Reference Examples + +## Complete Implementation Examples + +### 1. YARP Advanced Configuration + +```csharp +/// +/// EN: Advanced YARP configuration with transforms and load balancing. +/// VI: Cấu hình YARP nâng cao với transforms và load balancing. +/// + +// Program.cs +builder.Services.AddReverseProxy() + .LoadFromConfig(builder.Configuration.GetSection("ReverseProxy")) + .AddTransforms(transforms => + { + // EN: Add correlation ID to all requests + transforms.AddRequestTransform(ctx => + { + var correlationId = ctx.HttpContext.Request.Headers["X-Correlation-Id"] + .FirstOrDefault() ?? Guid.NewGuid().ToString(); + ctx.ProxyRequest.Headers.TryAddWithoutValidation( + "X-Correlation-Id", correlationId); + return ValueTask.CompletedTask; + }); + + // EN: Add timing header to responses + transforms.AddResponseTransform(ctx => + { + ctx.HttpContext.Response.Headers["X-Processed-At"] = + DateTime.UtcNow.ToString("O"); + return ValueTask.CompletedTask; + }); + }); + +// EN: Add health checks for backends +builder.Services.AddHealthChecks() + .AddUrlGroup(new Uri("http://user-service:5001/health"), name: "user-service") + .AddUrlGroup(new Uri("http://order-service:5002/health"), name: "order-service"); + +var app = builder.Build(); + +// EN: Add authentication before proxy +app.UseAuthentication(); +app.UseAuthorization(); + +app.MapReverseProxy(proxyPipeline => +{ + proxyPipeline.UseSessionAffinity(); + proxyPipeline.UseLoadBalancing(); + proxyPipeline.UsePassiveHealthChecks(); +}); +``` + +### 2. Complete Service Clients with Resilience + +```csharp +/// +/// EN: HTTP client configuration with resilience. +/// VI: Cấu hình HTTP client với resilience. +/// +public static class HttpClientExtensions +{ + public static IServiceCollection AddServiceClients( + this IServiceCollection services, + IConfiguration configuration) + { + // EN: Product Service Client + services.AddHttpClient(client => + { + client.BaseAddress = new Uri(configuration["Services:Products:BaseUrl"]!); + client.DefaultRequestHeaders.Add("Accept", "application/json"); + }) + .AddStandardResilienceHandler(); + + // EN: Review Service Client + services.AddHttpClient(client => + { + client.BaseAddress = new Uri(configuration["Services:Reviews:BaseUrl"]!); + }) + .AddStandardResilienceHandler(); + + // EN: Inventory Service Client + services.AddHttpClient(client => + { + client.BaseAddress = new Uri(configuration["Services:Inventory:BaseUrl"]!); + }) + .AddStandardResilienceHandler(); + + return services; + } +} + +/// +/// EN: Product service client implementation. +/// VI: Triển khai client cho product service. +/// +public interface IProductServiceClient +{ + Task GetProductAsync(Guid productId, CancellationToken ct = default); + Task> GetProductsAsync(int page, int pageSize, CancellationToken ct = default); +} + +public class ProductServiceClient : IProductServiceClient +{ + private readonly HttpClient _httpClient; + private readonly ILogger _logger; + + public ProductServiceClient( + HttpClient httpClient, + ILogger logger) + { + _httpClient = httpClient; + _logger = logger; + } + + public async Task GetProductAsync(Guid productId, CancellationToken ct = default) + { + try + { + var response = await _httpClient.GetAsync($"/api/products/{productId}", ct); + + if (response.StatusCode == HttpStatusCode.NotFound) + return null; + + response.EnsureSuccessStatusCode(); + + return await response.Content.ReadFromJsonAsync(ct); + } + catch (HttpRequestException ex) + { + _logger.LogError(ex, "Failed to get product {ProductId}", productId); + throw; + } + } + + public async Task> GetProductsAsync( + int page, + int pageSize, + CancellationToken ct = default) + { + var response = await _httpClient.GetAsync( + $"/api/products?page={page}&pageSize={pageSize}", ct); + + response.EnsureSuccessStatusCode(); + + return await response.Content.ReadFromJsonAsync>(ct) + ?? new PagedResult(); + } +} +``` + +### 3. Aggregation with Timeout and Fallback + +```csharp +/// +/// EN: Aggregator with timeout and fallback handling. +/// VI: Aggregator với xử lý timeout và fallback. +/// +public class ResilientAggregatorService +{ + private readonly IProductServiceClient _productClient; + private readonly IReviewServiceClient _reviewClient; + private readonly IInventoryServiceClient _inventoryClient; + private readonly IDistributedCache _cache; + private readonly ILogger _logger; + + private static readonly TimeSpan AggregationTimeout = TimeSpan.FromSeconds(5); + private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(5); + + public ResilientAggregatorService( + IProductServiceClient productClient, + IReviewServiceClient reviewClient, + IInventoryServiceClient inventoryClient, + IDistributedCache cache, + ILogger logger) + { + _productClient = productClient; + _reviewClient = reviewClient; + _inventoryClient = inventoryClient; + _cache = cache; + _logger = logger; + } + + public async Task GetProductDetailsAsync( + Guid productId, + CancellationToken ct = default) + { + // EN: Try cache first + var cacheKey = $"product-details:{productId}"; + var cached = await _cache.GetStringAsync(cacheKey, ct); + if (!string.IsNullOrEmpty(cached)) + { + return JsonSerializer.Deserialize(cached); + } + + // EN: Create timeout cancellation token + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + timeoutCts.CancelAfter(AggregationTimeout); + + // EN: Fetch product (required) + var product = await _productClient.GetProductAsync(productId, timeoutCts.Token); + if (product == null) + return null; + + // EN: Fetch optional data with fallbacks + var reviewsTask = GetReviewsSafeAsync(productId, timeoutCts.Token); + var stockTask = GetStockSafeAsync(productId, timeoutCts.Token); + + await Task.WhenAll(reviewsTask, stockTask); + + var reviews = await reviewsTask; + var stock = await stockTask; + + var result = new ProductDetailsDto + { + Id = product.Id, + Name = product.Name, + Description = product.Description, + Price = product.Price, + ImageUrl = product.ImageUrl, + + // EN: Reviews with fallback + AverageRating = reviews?.AverageRating ?? 0, + ReviewCount = reviews?.TotalCount ?? 0, + TopReviews = reviews?.Items.Take(3).ToList() ?? new List(), + ReviewsAvailable = reviews != null, + + // EN: Stock with fallback + InStock = stock?.AvailableQuantity > 0 ?? true, // Assume in stock if unknown + AvailableQuantity = stock?.AvailableQuantity, + StockAvailable = stock != null + }; + + // EN: Cache the result + await _cache.SetStringAsync( + cacheKey, + JsonSerializer.Serialize(result), + new DistributedCacheEntryOptions + { + AbsoluteExpirationRelativeToNow = CacheDuration + }, + ct); + + return result; + } + + private async Task GetReviewsSafeAsync( + Guid productId, + CancellationToken ct) + { + try + { + return await _reviewClient.GetReviewSummaryAsync(productId, ct); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Failed to get reviews for product {ProductId}", productId); + return null; + } + } + + private async Task GetStockSafeAsync( + Guid productId, + CancellationToken ct) + { + try + { + return await _inventoryClient.GetStockAsync(productId, ct); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Failed to get stock for product {ProductId}", productId); + return null; + } + } +} +``` + +### 4. Complete BFF Project Structure + +```csharp +/// +/// EN: Mobile BFF project structure and implementation. +/// VI: Cấu trúc và triển khai Mobile BFF project. +/// + +// MobileBff/Program.cs +var builder = WebApplication.CreateBuilder(args); + +// EN: Add services +builder.Services.AddControllers(); +builder.Services.AddMediatR(cfg => + cfg.RegisterServicesFromAssembly(typeof(Program).Assembly)); +builder.Services.AddServiceClients(builder.Configuration); +builder.Services.AddStackExchangeRedisCache(options => +{ + options.Configuration = builder.Configuration.GetConnectionString("Redis"); +}); + +// EN: Add authentication +builder.Services.AddJwtAuthentication(builder.Configuration); + +var app = builder.Build(); + +app.UseAuthentication(); +app.UseAuthorization(); +app.MapControllers(); +app.Run(); + +// MobileBff/Features/Home/GetHomeScreenQuery.cs +public record GetHomeScreenQuery : IRequest +{ + public string UserId { get; init; } = default!; +} + +public record HomeScreenDto +{ + public UserProfileSummaryDto? Profile { get; init; } + public List FeaturedProducts { get; init; } = new(); + public List Categories { get; init; } = new(); + public List ActivePromotions { get; init; } = new(); + public int CartItemCount { get; init; } + public int UnreadNotifications { get; init; } +} + +public class GetHomeScreenQueryHandler : IRequestHandler +{ + private readonly IUserServiceClient _userClient; + private readonly IProductServiceClient _productClient; + private readonly ICategoryServiceClient _categoryClient; + private readonly IPromotionServiceClient _promotionClient; + private readonly ICartServiceClient _cartClient; + private readonly INotificationServiceClient _notificationClient; + private readonly ILogger _logger; + + public GetHomeScreenQueryHandler( + IUserServiceClient userClient, + IProductServiceClient productClient, + ICategoryServiceClient categoryClient, + IPromotionServiceClient promotionClient, + ICartServiceClient cartClient, + INotificationServiceClient notificationClient, + ILogger logger) + { + _userClient = userClient; + _productClient = productClient; + _categoryClient = categoryClient; + _promotionClient = promotionClient; + _cartClient = cartClient; + _notificationClient = notificationClient; + _logger = logger; + } + + public async Task Handle( + GetHomeScreenQuery request, + CancellationToken ct) + { + // EN: All calls in parallel with safe wrappers + var profileTask = SafeCallAsync(() => + _userClient.GetProfileSummaryAsync(request.UserId, ct)); + var featuredTask = SafeCallAsync(() => + _productClient.GetFeaturedAsync(6, ct)); + var categoriesTask = SafeCallAsync(() => + _categoryClient.GetTopCategoriesAsync(8, ct)); + var promotionsTask = SafeCallAsync(() => + _promotionClient.GetActiveAsync(ct)); + var cartCountTask = SafeCallAsync(() => + _cartClient.GetItemCountAsync(request.UserId, ct)); + var notificationCountTask = SafeCallAsync(() => + _notificationClient.GetUnreadCountAsync(request.UserId, ct)); + + await Task.WhenAll( + profileTask, featuredTask, categoriesTask, + promotionsTask, cartCountTask, notificationCountTask); + + return new HomeScreenDto + { + Profile = await profileTask, + FeaturedProducts = (await featuredTask)?.ToList() ?? new(), + Categories = (await categoriesTask)?.ToList() ?? new(), + ActivePromotions = (await promotionsTask)?.ToList() ?? new(), + CartItemCount = await cartCountTask ?? 0, + UnreadNotifications = await notificationCountTask ?? 0 + }; + } + + private async Task SafeCallAsync(Func> call) + { + try + { + return await call(); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Service call failed in home screen aggregation"); + return default; + } + } +} +``` + +### 5. GraphQL BFF with DataLoaders + +```csharp +/// +/// EN: GraphQL BFF with efficient data loading. +/// VI: GraphQL BFF với data loading hiệu quả. +/// + +// GraphQL/Query.cs +public class Query +{ + [UseProjection] + [UseFiltering] + [UseSorting] + public async Task> GetProducts( + [Service] IProductRepository repository) => + repository.GetAll(); + + public async Task GetProduct( + [ID] Guid id, + ProductByIdDataLoader loader) => + await loader.LoadAsync(id); +} + +// GraphQL/DataLoaders/ProductByIdDataLoader.cs +public class ProductByIdDataLoader : BatchDataLoader +{ + private readonly IProductServiceClient _client; + + public ProductByIdDataLoader( + IProductServiceClient client, + IBatchScheduler batchScheduler, + DataLoaderOptions? options = null) + : base(batchScheduler, options) + { + _client = client; + } + + protected override async Task> LoadBatchAsync( + IReadOnlyList keys, + CancellationToken ct) + { + // EN: Batch load all requested products + var products = await _client.GetProductsByIdsAsync(keys, ct); + return products.ToDictionary(p => p.Id); + } +} + +// GraphQL/Types/ProductType.cs +public class ProductType : ObjectType +{ + protected override void Configure(IObjectTypeDescriptor descriptor) + { + descriptor.Field(p => p.Id).Type>(); + descriptor.Field(p => p.Name).Type>(); + descriptor.Field(p => p.Price).Type>(); + + // EN: Reviews field with data loader + descriptor + .Field("reviews") + .Type>>>() + .Resolve(async ctx => + { + var loader = ctx.DataLoader(); + return await loader.LoadAsync(ctx.Parent().Id); + }); + + // EN: Stock field with data loader + descriptor + .Field("stock") + .Type() + .Resolve(async ctx => + { + var loader = ctx.DataLoader(); + return await loader.LoadAsync(ctx.Parent().Id); + }); + + // EN: Related products + descriptor + .Field("relatedProducts") + .Type>() + .Argument("limit", a => a.Type().DefaultValue(4)) + .Resolve(async ctx => + { + var product = ctx.Parent(); + var limit = ctx.ArgumentValue("limit"); + var loader = ctx.DataLoader(); + var related = await loader.LoadAsync(product.CategoryId); + return related.Where(p => p.Id != product.Id).Take(limit); + }); + } +} +``` + +## Docker Compose for API Gateway + +```yaml +version: '3.8' + +services: + api-gateway: + build: + context: ./src/ApiGateway + ports: + - "5000:80" + environment: + - ASPNETCORE_ENVIRONMENT=Development + - ReverseProxy__Clusters__users-cluster__Destinations__user-service__Address=http://user-service:80 + - ReverseProxy__Clusters__orders-cluster__Destinations__order-service__Address=http://order-service:80 + depends_on: + - user-service + - order-service + + mobile-bff: + build: + context: ./src/MobileBff + ports: + - "5001:80" + environment: + - Services__Products__BaseUrl=http://product-service:80 + - Services__Reviews__BaseUrl=http://review-service:80 + - ConnectionStrings__Redis=redis:6379 + + web-bff: + build: + context: ./src/WebBff + ports: + - "5002:80" + + user-service: + build: + context: ./src/UserService + expose: + - "80" + + order-service: + build: + context: ./src/OrderService + expose: + - "80" + + redis: + image: redis:7-alpine + ports: + - "6379:6379" +``` diff --git a/.agent/skills/event-sourcing/SKILL.md b/.agent/skills/event-sourcing/SKILL.md new file mode 100644 index 00000000..ccfcc881 --- /dev/null +++ b/.agent/skills/event-sourcing/SKILL.md @@ -0,0 +1,455 @@ +--- +name: event-sourcing +description: Event Sourcing pattern - lưu trữ thay đổi trạng thái dưới dạng chuỗi sự kiện. Use for audit trails, temporal queries, CQRS integration, và event replay. +compatibility: ".NET 10+, EventStoreDB, Marten, EF Core" +metadata: + author: Velik Ho + version: "1.0" +--- + +# Event Sourcing / Event Sourcing Pattern + +Event Sourcing pattern cho GoodGo microservices - lưu trữ mọi thay đổi trạng thái dưới dạng sự kiện bất biến. + +## When to Use This Skill / Khi Nào Sử Dụng + +Use this skill when: +- Building audit trails / Xây dựng audit trails +- Implementing temporal queries (time travel) / Truy vấn theo thời gian +- Debugging production issues / Debug lỗi production +- CQRS with event-driven projections / CQRS với projections +- Ensuring data consistency / Đảm bảo tính nhất quán dữ liệu + +## Core Concepts / Khái Niệm Cốt Lõi + +### Traditional vs Event Sourcing / Truyền Thống vs Event Sourcing + +``` +┌─────────────────────────────────────────────────────────────┐ +│ TRADITIONAL (State-Based) │ +├─────────────────────────────────────────────────────────────┤ +│ Order Table │ +│ ┌─────────────────────────────────────────────────────┐ │ +│ │ Id: 1, Status: Shipped, Amount: 500, Updated: 10:30 │ │ +│ └─────────────────────────────────────────────────────┘ │ +│ ❌ Lost: Why was it shipped? What was previous status? │ +└─────────────────────────────────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────┐ +│ EVENT SOURCING (Event-Based) │ +├─────────────────────────────────────────────────────────────┤ +│ Event Stream: Order-1 │ +│ ┌─────────────────────────────────────────────────────┐ │ +│ │ 1. OrderCreated { Amount: 500, UserId: "u1" } @10:00│ │ +│ │ 2. OrderPaid { PaymentId: "p1" } @10:15 │ │ +│ │ 3. OrderShipped { TrackingNo: "TN123" } @10:30 │ │ +│ └─────────────────────────────────────────────────────┘ │ +│ ✅ Full history preserved │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Key Components / Thành Phần Chính + +| Component | Purpose | Description | +|-----------|---------|-------------| +| **Event** | Immutable fact | Sự kiện đã xảy ra, không thể thay đổi | +| **Event Stream** | Ordered sequence | Chuỗi events theo thời gian cho mỗi aggregate | +| **Event Store** | Append-only log | Database lưu trữ events | +| **Projection** | Read model | View được xây dựng từ events | +| **Snapshot** | State cache | Cache trạng thái để tối ưu replay | + +### Event Sourcing Flow / Luồng Event Sourcing + +``` +┌─────────┐ ┌─────────────┐ ┌─────────────┐ +│ Command │───▶│ Aggregate │───▶│ Events │ +└─────────┘ │ (Domain) │ │ (Facts) │ + └─────────────┘ └──────┬──────┘ + │ + ┌─────────────────────────┼─────────────────────────┐ + │ │ │ + ▼ ▼ ▼ + ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ + │ Event Store │ │ Projection │ │ Snapshot │ + │ (Write) │ │ (Read) │ │ (Cache) │ + └─────────────┘ └─────────────┘ └─────────────┘ +``` + +## Key Patterns / Mẫu Chính + +### Domain Event Definition / Định Nghĩa Domain Event + +```csharp +/// +/// EN: Base interface for all domain events. +/// VI: Interface cơ sở cho tất cả domain events. +/// +public interface IDomainEvent +{ + Guid EventId { get; } + DateTime OccurredOn { get; } + int Version { get; } +} + +/// +/// EN: Order created event. +/// VI: Event tạo order. +/// +public record OrderCreated : IDomainEvent +{ + public Guid EventId { get; init; } = Guid.NewGuid(); + public DateTime OccurredOn { get; init; } = DateTime.UtcNow; + public int Version { get; init; } + + public Guid OrderId { get; init; } + public string UserId { get; init; } = default!; + public decimal TotalAmount { get; init; } + public Address ShippingAddress { get; init; } = default!; +} + +/// +/// EN: Order item added event. +/// VI: Event thêm item vào order. +/// +public record OrderItemAdded : IDomainEvent +{ + public Guid EventId { get; init; } = Guid.NewGuid(); + public DateTime OccurredOn { get; init; } = DateTime.UtcNow; + public int Version { get; init; } + + public Guid OrderId { get; init; } + public Guid ProductId { get; init; } + public int Quantity { get; init; } + public decimal UnitPrice { get; init; } +} +``` + +### Event-Sourced Aggregate / Aggregate Event-Sourced + +```csharp +/// +/// EN: Base class for event-sourced aggregates. +/// VI: Lớp cơ sở cho aggregate event-sourced. +/// +public abstract class EventSourcedAggregate +{ + private readonly List _uncommittedEvents = new(); + + public Guid Id { get; protected set; } + public int Version { get; protected set; } = -1; + + public IReadOnlyList UncommittedEvents => _uncommittedEvents.AsReadOnly(); + + protected void Apply(IDomainEvent @event) + { + When(@event); + _uncommittedEvents.Add(@event); + Version++; + } + + protected abstract void When(IDomainEvent @event); + + public void Load(IEnumerable history) + { + foreach (var @event in history) + { + When(@event); + Version++; + } + } + + public void ClearUncommittedEvents() => _uncommittedEvents.Clear(); +} + +/// +/// EN: Order aggregate with event sourcing. +/// VI: Order aggregate với event sourcing. +/// +public class Order : EventSourcedAggregate +{ + public string UserId { get; private set; } = default!; + public OrderStatus Status { get; private set; } + public decimal TotalAmount { get; private set; } + private readonly List _items = new(); + public IReadOnlyList Items => _items.AsReadOnly(); + + // EN: For rehydration / VI: Để khôi phục từ events + private Order() { } + + // EN: Factory method for creation / VI: Factory method để tạo mới + public static Order Create(Guid orderId, string userId, Address shippingAddress) + { + var order = new Order(); + order.Apply(new OrderCreated + { + OrderId = orderId, + UserId = userId, + ShippingAddress = shippingAddress, + Version = 0 + }); + return order; + } + + public void AddItem(Guid productId, int quantity, decimal unitPrice) + { + Apply(new OrderItemAdded + { + OrderId = Id, + ProductId = productId, + Quantity = quantity, + UnitPrice = unitPrice, + Version = Version + 1 + }); + } + + protected override void When(IDomainEvent @event) + { + switch (@event) + { + case OrderCreated e: + Id = e.OrderId; + UserId = e.UserId; + Status = OrderStatus.Created; + break; + + case OrderItemAdded e: + _items.Add(new OrderItem(e.ProductId, e.Quantity, e.UnitPrice)); + TotalAmount += e.Quantity * e.UnitPrice; + break; + + case OrderPaid: + Status = OrderStatus.Paid; + break; + } + } +} +``` + +### Event Store Repository / Repository Event Store + +```csharp +/// +/// EN: Repository for event-sourced aggregates. +/// VI: Repository cho aggregate event-sourced. +/// +public interface IEventSourcedRepository where T : EventSourcedAggregate +{ + Task GetByIdAsync(Guid id, CancellationToken ct = default); + Task SaveAsync(T aggregate, CancellationToken ct = default); +} + +/// +/// EN: EF Core implementation of event store. +/// VI: Event store triển khai với EF Core. +/// +public class EfCoreEventStore : IEventSourcedRepository + where T : EventSourcedAggregate, new() +{ + private readonly EventStoreDbContext _context; + private readonly ILogger> _logger; + + public EfCoreEventStore( + EventStoreDbContext context, + ILogger> logger) + { + _context = context; + _logger = logger; + } + + public async Task GetByIdAsync(Guid id, CancellationToken ct = default) + { + var streamName = $"{typeof(T).Name}-{id}"; + + var events = await _context.Events + .Where(e => e.StreamId == streamName) + .OrderBy(e => e.Version) + .ToListAsync(ct); + + if (!events.Any()) + return null; + + var aggregate = new T(); + aggregate.Load(events.Select(e => DeserializeEvent(e))); + + return aggregate; + } + + public async Task SaveAsync(T aggregate, CancellationToken ct = default) + { + var streamName = $"{typeof(T).Name}-{aggregate.Id}"; + var expectedVersion = aggregate.Version - aggregate.UncommittedEvents.Count; + + // EN: Optimistic concurrency check + // VI: Kiểm tra optimistic concurrency + var currentVersion = await _context.Events + .Where(e => e.StreamId == streamName) + .MaxAsync(e => (int?)e.Version, ct) ?? -1; + + if (currentVersion != expectedVersion) + throw new ConcurrencyException( + $"Expected version {expectedVersion} but found {currentVersion}"); + + foreach (var @event in aggregate.UncommittedEvents) + { + _context.Events.Add(new StoredEvent + { + Id = @event.EventId, + StreamId = streamName, + EventType = @event.GetType().AssemblyQualifiedName!, + Data = JsonSerializer.Serialize(@event, @event.GetType()), + Version = @event.Version, + OccurredOn = @event.OccurredOn + }); + } + + await _context.SaveChangesAsync(ct); + aggregate.ClearUncommittedEvents(); + + _logger.LogInformation( + "Saved {Count} events to stream {Stream}", + aggregate.UncommittedEvents.Count, + streamName); + } +} +``` + +### Projections / Projections + +```csharp +/// +/// EN: Projection handler for order read models. +/// VI: Projection handler cho order read models. +/// +public class OrderProjection : IEventHandler, IEventHandler +{ + private readonly ReadDbContext _readDb; + + public OrderProjection(ReadDbContext readDb) + { + _readDb = readDb; + } + + public async Task HandleAsync(OrderCreated @event, CancellationToken ct) + { + var readModel = new OrderReadModel + { + Id = @event.OrderId, + UserId = @event.UserId, + Status = "Created", + TotalAmount = @event.TotalAmount, + CreatedAt = @event.OccurredOn + }; + + _readDb.Orders.Add(readModel); + await _readDb.SaveChangesAsync(ct); + } + + public async Task HandleAsync(OrderPaid @event, CancellationToken ct) + { + var order = await _readDb.Orders.FindAsync(@event.OrderId); + if (order != null) + { + order.Status = "Paid"; + order.PaidAt = @event.OccurredOn; + await _readDb.SaveChangesAsync(ct); + } + } +} +``` + +## Common Mistakes / Lỗi Thường Gặp + +### 1. Mutable Events + +```csharp +// ❌ BAD: Mutable event +public class OrderCreated +{ + public string Status { get; set; } // Mutable! +} + +// ✅ GOOD: Immutable record +public record OrderCreated +{ + public string Status { get; init; } // Immutable +} +``` + +### 2. Storing Derived Data in Events + +```csharp +// ❌ BAD: Storing computed values +public record OrderItemAdded +{ + public decimal UnitPrice { get; init; } + public int Quantity { get; init; } + public decimal TotalPrice { get; init; } // Derived! +} + +// ✅ GOOD: Store only facts, compute when needed +public record OrderItemAdded +{ + public decimal UnitPrice { get; init; } + public int Quantity { get; init; } + // TotalPrice computed in aggregate +} +``` + +### 3. Large Event Streams Without Snapshots + +```csharp +// ❌ BAD: Loading thousands of events +var order = await _repo.GetByIdAsync(orderId); // Slow! + +// ✅ GOOD: Use snapshots for performance +public async Task GetByIdAsync(Guid id, CancellationToken ct) +{ + // Load from snapshot if exists + var snapshot = await _snapshotStore.GetLatestAsync(id, ct); + var fromVersion = snapshot?.Version ?? 0; + + // Load only events after snapshot + var events = await _eventStore.GetEventsAsync(id, fromVersion, ct); + + var aggregate = snapshot ?? new Order(); + aggregate.Load(events); + return aggregate; +} +``` + +## Quick Reference / Tham Chiếu Nhanh + +### When to Use Event Sourcing + +| Scenario | Recommendation | +|----------|---------------| +| Simple CRUD | ❌ Overkill | +| Complex domains | ✅ Use ES | +| Audit requirements | ✅ Use ES | +| Temporal queries | ✅ Use ES | +| High write volume | ⚠️ Consider carefully | + +### Event Naming Conventions + +| Type | Convention | Example | +|------|-----------|---------| +| Create | `{Entity}Created` | `OrderCreated` | +| Update | `{Entity}{Property}Changed` | `OrderStatusChanged` | +| Delete | `{Entity}Deleted` | `OrderDeleted` | +| Action | `{Entity}{Action}` | `OrderShipped` | + +### Snapshot Strategy + +| Trigger | When to Snapshot | +|---------|-----------------| +| Version-based | Every N events (e.g., 100) | +| Time-based | Every N minutes | +| Size-based | When aggregate size > threshold | + +## Resources / Tài Nguyên + +- [Detailed Examples](./references/REFERENCE.md) - Full code examples +- [CQRS MediatR](../cqrs-mediatr/SKILL.md) - CQRS patterns +- [Outbox Pattern](../outbox-pattern/SKILL.md) - Reliable event publishing +- [Repository Pattern](../repository-pattern/SKILL.md) - Data access patterns +- [Domain Driven Design](../domain-driven-design/SKILL.md) - DDD patterns diff --git a/.agent/skills/event-sourcing/references/REFERENCE.md b/.agent/skills/event-sourcing/references/REFERENCE.md new file mode 100644 index 00000000..080dfef6 --- /dev/null +++ b/.agent/skills/event-sourcing/references/REFERENCE.md @@ -0,0 +1,525 @@ +# Event Sourcing - Reference Examples + +## Complete Implementation Examples + +### 1. Event Store DbContext + +```csharp +/// +/// EN: DbContext for storing events. +/// VI: DbContext để lưu events. +/// +public class EventStoreDbContext : DbContext +{ + public DbSet Events => Set(); + public DbSet Snapshots => Set(); + + public EventStoreDbContext(DbContextOptions options) + : base(options) { } + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + modelBuilder.Entity(entity => + { + entity.ToTable("EventStore"); + entity.HasKey(e => e.Id); + + entity.Property(e => e.StreamId) + .HasMaxLength(200) + .IsRequired(); + + entity.Property(e => e.EventType) + .HasMaxLength(500) + .IsRequired(); + + entity.Property(e => e.Data) + .HasColumnType("jsonb") + .IsRequired(); + + entity.HasIndex(e => new { e.StreamId, e.Version }) + .IsUnique(); + }); + + modelBuilder.Entity(entity => + { + entity.ToTable("Snapshots"); + entity.HasKey(e => e.Id); + + entity.Property(e => e.AggregateId) + .IsRequired(); + + entity.Property(e => e.Data) + .HasColumnType("jsonb") + .IsRequired(); + + entity.HasIndex(e => new { e.AggregateType, e.AggregateId, e.Version }) + .IsUnique(); + }); + } +} + +/// +/// EN: Stored event entity. +/// VI: Entity lưu event. +/// +public class StoredEvent +{ + public Guid Id { get; set; } + public string StreamId { get; set; } = default!; + public string EventType { get; set; } = default!; + public string Data { get; set; } = default!; + public int Version { get; set; } + public DateTime OccurredOn { get; set; } +} + +/// +/// EN: Snapshot entity. +/// VI: Entity snapshot. +/// +public class Snapshot +{ + public Guid Id { get; set; } + public string AggregateType { get; set; } = default!; + public Guid AggregateId { get; set; } + public string Data { get; set; } = default!; + public int Version { get; set; } + public DateTime CreatedAt { get; set; } +} +``` + +### 2. Complete Order Aggregate + +```csharp +/// +/// EN: Complete order aggregate with all events. +/// VI: Order aggregate hoàn chỉnh với tất cả events. +/// +public class Order : EventSourcedAggregate +{ + public string UserId { get; private set; } = default!; + public OrderStatus Status { get; private set; } + public decimal TotalAmount { get; private set; } + public Address? ShippingAddress { get; private set; } + public string? PaymentId { get; private set; } + public string? TrackingNumber { get; private set; } + + private readonly List _items = new(); + public IReadOnlyList Items => _items.AsReadOnly(); + + private Order() { } + + public static Order Create(Guid orderId, string userId, Address shippingAddress) + { + if (string.IsNullOrEmpty(userId)) + throw new ArgumentException("UserId is required"); + + var order = new Order(); + order.Apply(new OrderCreated + { + OrderId = orderId, + UserId = userId, + ShippingAddress = shippingAddress, + Version = 0 + }); + return order; + } + + public void AddItem(Guid productId, int quantity, decimal unitPrice) + { + if (Status != OrderStatus.Created) + throw new InvalidOperationException("Cannot add items to non-draft order"); + + if (quantity <= 0) + throw new ArgumentException("Quantity must be positive"); + + Apply(new OrderItemAdded + { + OrderId = Id, + ProductId = productId, + Quantity = quantity, + UnitPrice = unitPrice, + Version = Version + 1 + }); + } + + public void MarkAsPaid(string paymentId) + { + if (Status != OrderStatus.Created) + throw new InvalidOperationException("Order is not in Created status"); + + Apply(new OrderPaid + { + OrderId = Id, + PaymentId = paymentId, + Version = Version + 1 + }); + } + + public void Ship(string trackingNumber) + { + if (Status != OrderStatus.Paid) + throw new InvalidOperationException("Order must be paid before shipping"); + + Apply(new OrderShipped + { + OrderId = Id, + TrackingNumber = trackingNumber, + Version = Version + 1 + }); + } + + public void Cancel(string reason) + { + if (Status is OrderStatus.Shipped or OrderStatus.Delivered) + throw new InvalidOperationException("Cannot cancel shipped/delivered order"); + + Apply(new OrderCancelled + { + OrderId = Id, + Reason = reason, + Version = Version + 1 + }); + } + + protected override void When(IDomainEvent @event) + { + switch (@event) + { + case OrderCreated e: + Id = e.OrderId; + UserId = e.UserId; + ShippingAddress = e.ShippingAddress; + Status = OrderStatus.Created; + break; + + case OrderItemAdded e: + _items.Add(new OrderItem(e.ProductId, e.Quantity, e.UnitPrice)); + TotalAmount += e.Quantity * e.UnitPrice; + break; + + case OrderPaid e: + Status = OrderStatus.Paid; + PaymentId = e.PaymentId; + break; + + case OrderShipped e: + Status = OrderStatus.Shipped; + TrackingNumber = e.TrackingNumber; + break; + + case OrderCancelled: + Status = OrderStatus.Cancelled; + break; + } + } +} +``` + +### 3. Snapshot Service + +```csharp +/// +/// EN: Service for managing aggregate snapshots. +/// VI: Service quản lý snapshots của aggregates. +/// +public interface ISnapshotService +{ + Task GetLatestSnapshotAsync(Guid aggregateId, CancellationToken ct = default) + where T : EventSourcedAggregate; + Task SaveSnapshotAsync(T aggregate, CancellationToken ct = default) + where T : EventSourcedAggregate; +} + +public class SnapshotService : ISnapshotService +{ + private readonly EventStoreDbContext _context; + private readonly ILogger _logger; + private const int SnapshotThreshold = 100; + + public SnapshotService( + EventStoreDbContext context, + ILogger logger) + { + _context = context; + _logger = logger; + } + + public async Task GetLatestSnapshotAsync( + Guid aggregateId, + CancellationToken ct = default) where T : EventSourcedAggregate + { + var snapshot = await _context.Snapshots + .Where(s => s.AggregateType == typeof(T).Name + && s.AggregateId == aggregateId) + .OrderByDescending(s => s.Version) + .FirstOrDefaultAsync(ct); + + if (snapshot == null) + return null; + + return JsonSerializer.Deserialize(snapshot.Data); + } + + public async Task SaveSnapshotAsync( + T aggregate, + CancellationToken ct = default) where T : EventSourcedAggregate + { + // EN: Only snapshot if threshold reached + // VI: Chỉ snapshot nếu đạt ngưỡng + if (aggregate.Version % SnapshotThreshold != 0) + return; + + var snapshot = new Snapshot + { + Id = Guid.NewGuid(), + AggregateType = typeof(T).Name, + AggregateId = aggregate.Id, + Data = JsonSerializer.Serialize(aggregate), + Version = aggregate.Version, + CreatedAt = DateTime.UtcNow + }; + + _context.Snapshots.Add(snapshot); + await _context.SaveChangesAsync(ct); + + _logger.LogInformation( + "Created snapshot for {AggregateType} {AggregateId} at version {Version}", + typeof(T).Name, + aggregate.Id, + aggregate.Version); + } +} +``` + +### 4. Projection Dispatcher + +```csharp +/// +/// EN: Dispatches events to projections. +/// VI: Dispatch events đến projections. +/// +public interface IProjectionDispatcher +{ + Task DispatchAsync(IDomainEvent @event, CancellationToken ct = default); +} + +public class ProjectionDispatcher : IProjectionDispatcher +{ + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + + public ProjectionDispatcher( + IServiceProvider serviceProvider, + ILogger logger) + { + _serviceProvider = serviceProvider; + _logger = logger; + } + + public async Task DispatchAsync(IDomainEvent @event, CancellationToken ct = default) + { + var eventType = @event.GetType(); + var handlerType = typeof(IEventHandler<>).MakeGenericType(eventType); + + var handlers = _serviceProvider.GetServices(handlerType); + + foreach (var handler in handlers) + { + try + { + var method = handlerType.GetMethod("HandleAsync"); + await (Task)method!.Invoke(handler, new object[] { @event, ct })!; + + _logger.LogDebug( + "Projected {EventType} to {HandlerType}", + eventType.Name, + handler!.GetType().Name); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to project {EventType} to {HandlerType}", + eventType.Name, + handler!.GetType().Name); + throw; + } + } + } +} +``` + +### 5. Command Handler with Event Sourcing + +```csharp +/// +/// EN: Command handler using event sourcing. +/// VI: Command handler sử dụng event sourcing. +/// +public class CreateOrderCommandHandler : IRequestHandler +{ + private readonly IEventSourcedRepository _repository; + private readonly IProjectionDispatcher _projections; + private readonly ISnapshotService _snapshots; + private readonly ILogger _logger; + + public CreateOrderCommandHandler( + IEventSourcedRepository repository, + IProjectionDispatcher projections, + ISnapshotService snapshots, + ILogger logger) + { + _repository = repository; + _projections = projections; + _snapshots = snapshots; + _logger = logger; + } + + public async Task Handle( + CreateOrderCommand request, + CancellationToken ct) + { + var orderId = Guid.NewGuid(); + var order = Order.Create(orderId, request.UserId, request.ShippingAddress); + + foreach (var item in request.Items) + { + order.AddItem(item.ProductId, item.Quantity, item.UnitPrice); + } + + // EN: Save events to event store + // VI: Lưu events vào event store + await _repository.SaveAsync(order, ct); + + // EN: Update projections + // VI: Cập nhật projections + foreach (var @event in order.UncommittedEvents) + { + await _projections.DispatchAsync(@event, ct); + } + + // EN: Save snapshot if threshold reached + // VI: Tạo snapshot nếu đạt ngưỡng + await _snapshots.SaveSnapshotAsync(order, ct); + + _logger.LogInformation("Order created: {OrderId}", orderId); + + return new OrderResult(orderId); + } +} +``` + +### 6. DI Registration + +```csharp +/// +/// EN: Register event sourcing services. +/// VI: Đăng ký event sourcing services. +/// +public static class EventSourcingServiceExtensions +{ + public static IServiceCollection AddEventSourcing( + this IServiceCollection services, + IConfiguration configuration) + { + // EN: Register Event Store DbContext + services.AddDbContext(options => + options.UseNpgsql(configuration.GetConnectionString("EventStore"))); + + // EN: Register repositories + services.AddScoped(typeof(IEventSourcedRepository<>), typeof(EfCoreEventStore<>)); + + // EN: Register snapshot service + services.AddScoped(); + + // EN: Register projection dispatcher + services.AddScoped(); + + // EN: Register projections + services.AddScoped, OrderProjection>(); + services.AddScoped, OrderProjection>(); + services.AddScoped, OrderProjection>(); + + return services; + } +} +``` + +### 7. Temporal Query Example + +```csharp +/// +/// EN: Get order state at specific point in time. +/// VI: Lấy trạng thái order tại thời điểm cụ thể. +/// +public class GetOrderAtTimeQueryHandler + : IRequestHandler +{ + private readonly EventStoreDbContext _context; + + public GetOrderAtTimeQueryHandler(EventStoreDbContext context) + { + _context = context; + } + + public async Task Handle( + GetOrderAtTimeQuery request, + CancellationToken ct) + { + var streamId = $"Order-{request.OrderId}"; + + // EN: Get events up to the specified time + // VI: Lấy events đến thời điểm chỉ định + var events = await _context.Events + .Where(e => e.StreamId == streamId && e.OccurredOn <= request.AsOfTime) + .OrderBy(e => e.Version) + .ToListAsync(ct); + + if (!events.Any()) + return null; + + // EN: Replay events to reconstruct state + // VI: Phát lại events để khôi phục trạng thái + var order = new Order(); + order.Load(events.Select(DeserializeEvent)); + + return new OrderStateDto + { + OrderId = order.Id, + Status = order.Status.ToString(), + TotalAmount = order.TotalAmount, + ItemCount = order.Items.Count, + AsOfTime = request.AsOfTime + }; + } +} +``` + +## Database Migration + +```sql +-- EN: Create EventStore table / VI: Tạo bảng EventStore +CREATE TABLE "EventStore" ( + "Id" uuid PRIMARY KEY, + "StreamId" varchar(200) NOT NULL, + "EventType" varchar(500) NOT NULL, + "Data" jsonb NOT NULL, + "Version" integer NOT NULL, + "OccurredOn" timestamp with time zone NOT NULL +); + +CREATE UNIQUE INDEX "IX_EventStore_StreamId_Version" + ON "EventStore" ("StreamId", "Version"); + +-- EN: Create Snapshots table / VI: Tạo bảng Snapshots +CREATE TABLE "Snapshots" ( + "Id" uuid PRIMARY KEY, + "AggregateType" varchar(200) NOT NULL, + "AggregateId" uuid NOT NULL, + "Data" jsonb NOT NULL, + "Version" integer NOT NULL, + "CreatedAt" timestamp with time zone NOT NULL +); + +CREATE UNIQUE INDEX "IX_Snapshots_Type_Id_Version" + ON "Snapshots" ("AggregateType", "AggregateId", "Version"); +``` diff --git a/.agent/skills/outbox-pattern/SKILL.md b/.agent/skills/outbox-pattern/SKILL.md new file mode 100644 index 00000000..4d598a53 --- /dev/null +++ b/.agent/skills/outbox-pattern/SKILL.md @@ -0,0 +1,422 @@ +--- +name: outbox-pattern +description: Transactional Outbox Pattern - đảm bảo atomicity khi publish events. Use for reliable messaging, at-least-once delivery, và event consistency. +compatibility: ".NET 10+, EF Core, MassTransit, PostgreSQL" +metadata: + author: Velik Ho + version: "1.0" +--- + +# Outbox Pattern / Transactional Outbox + +Pattern đảm bảo tính nhất quán khi publish events từ microservices. + +## When to Use This Skill / Khi Nào Sử Dụng + +Use this skill when: +- Publishing integration events reliably / Publish integration events tin cậy +- Ensuring at-least-once delivery / Đảm bảo gửi ít nhất một lần +- Avoiding dual-write problem / Tránh vấn đề dual-write +- Distributed transactions not feasible / Không khả thi dùng distributed transactions + +## Core Concepts / Khái Niệm Cốt Lõi + +### The Dual-Write Problem / Vấn Đề Dual-Write + +``` +┌─────────────────────────────────────────────────────────────┐ +│ ❌ DUAL-WRITE PROBLEM │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ Service A │ +│ ┌─────────────────────────────────────────────┐ │ +│ │ 1. await _db.SaveChangesAsync(); ✅ │ │ +│ │ 2. await _bus.PublishAsync(event); ❌ FAIL! │ │ +│ └─────────────────────────────────────────────┘ │ +│ │ +│ Result: DB updated BUT message NOT sent = INCONSISTENCY │ +│ │ +└─────────────────────────────────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────┐ +│ ✅ OUTBOX PATTERN │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ Service A │ +│ ┌─────────────────────────────────────────────┐ │ +│ │ BEGIN TRANSACTION │ │ +│ │ 1. INSERT INTO Orders ... │ │ +│ │ 2. INSERT INTO Outbox (event) │ │ +│ │ COMMIT │ │ +│ └─────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────────────────┐ │ +│ │ Outbox Processor (Background) │ │ +│ │ 1. SELECT * FROM Outbox WHERE Sent = false │ │ +│ │ 2. Publish to Message Bus │ │ +│ │ 3. UPDATE Outbox SET Sent = true │ │ +│ └─────────────────────────────────────────────┘ │ +│ │ +│ Result: ATOMIC - Both or Neither │ +│ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Outbox Flow / Luồng Outbox + +``` +┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ +│ Command │───▶│ Handler │───▶│ DB + Outbox│───▶│ Processor │ +│ (API) │ │ (Domain) │ │ (Same Tx) │ │ (Background)│ +└─────────────┘ └─────────────┘ └─────────────┘ └──────┬──────┘ + │ + ▼ + ┌─────────────┐ + │ Message Bus │ + │ (RabbitMQ) │ + └─────────────┘ +``` + +### Key Components / Thành Phần Chính + +| Component | Purpose | Implementation | +|-----------|---------|----------------| +| **Outbox Table** | Store pending events | EF Core entity | +| **Outbox Processor** | Publish pending events | BackgroundService | +| **Idempotency** | Prevent duplicates | Event ID tracking | + +## Key Patterns / Mẫu Chính + +### Outbox Entity / Entity Outbox + +```csharp +/// +/// EN: Outbox message entity for transactional messaging. +/// VI: Entity outbox message cho transactional messaging. +/// +public class OutboxMessage +{ + public Guid Id { get; set; } + public string EventType { get; set; } = default!; + public string Payload { get; set; } = default!; + public DateTime CreatedAt { get; set; } + public DateTime? ProcessedAt { get; set; } + public bool IsProcessed { get; set; } + public int RetryCount { get; set; } + public string? Error { get; set; } +} +``` + +### DbContext Configuration / Cấu Hình DbContext + +```csharp +/// +/// EN: DbContext with outbox support. +/// VI: DbContext với hỗ trợ outbox. +/// +public class OrderDbContext : DbContext +{ + public DbSet Orders => Set(); + public DbSet OutboxMessages => Set(); + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + modelBuilder.Entity(entity => + { + entity.ToTable("OutboxMessages"); + entity.HasKey(e => e.Id); + + entity.Property(e => e.EventType) + .HasMaxLength(500) + .IsRequired(); + + entity.Property(e => e.Payload) + .HasColumnType("jsonb") + .IsRequired(); + + entity.HasIndex(e => new { e.IsProcessed, e.CreatedAt }) + .HasFilter("\"IsProcessed\" = false"); + }); + } +} +``` + +### Unit of Work with Outbox / Unit of Work với Outbox + +```csharp +/// +/// EN: Unit of Work that saves domain events to outbox. +/// VI: Unit of Work lưu domain events vào outbox. +/// +public interface IUnitOfWork +{ + Task SaveChangesAsync(CancellationToken ct = default); +} + +public class UnitOfWork : IUnitOfWork +{ + private readonly OrderDbContext _context; + private readonly ILogger _logger; + + public UnitOfWork(OrderDbContext context, ILogger logger) + { + _context = context; + _logger = logger; + } + + public async Task SaveChangesAsync(CancellationToken ct = default) + { + // EN: Convert domain events to outbox messages + // VI: Chuyển domain events thành outbox messages + var aggregates = _context.ChangeTracker + .Entries() + .Where(e => e.Entity.DomainEvents.Any()) + .Select(e => e.Entity) + .ToList(); + + foreach (var aggregate in aggregates) + { + foreach (var domainEvent in aggregate.DomainEvents) + { + var outboxMessage = new OutboxMessage + { + Id = Guid.NewGuid(), + EventType = domainEvent.GetType().AssemblyQualifiedName!, + Payload = JsonSerializer.Serialize(domainEvent, domainEvent.GetType()), + CreatedAt = DateTime.UtcNow, + IsProcessed = false + }; + + _context.OutboxMessages.Add(outboxMessage); + } + + aggregate.ClearDomainEvents(); + } + + var result = await _context.SaveChangesAsync(ct); + + _logger.LogDebug("Saved {Count} changes with outbox messages", result); + + return result; + } +} +``` + +### Outbox Processor / Outbox Processor + +```csharp +/// +/// EN: Background service that processes outbox messages. +/// VI: Background service xử lý outbox messages. +/// +public class OutboxProcessor : BackgroundService +{ + private readonly IServiceScopeFactory _scopeFactory; + private readonly ILogger _logger; + private readonly TimeSpan _pollingInterval = TimeSpan.FromSeconds(5); + private const int BatchSize = 100; + private const int MaxRetries = 5; + + public OutboxProcessor( + IServiceScopeFactory scopeFactory, + ILogger logger) + { + _scopeFactory = scopeFactory; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken ct) + { + _logger.LogInformation("Outbox Processor started"); + + while (!ct.IsCancellationRequested) + { + try + { + await ProcessPendingMessagesAsync(ct); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing outbox messages"); + } + + await Task.Delay(_pollingInterval, ct); + } + } + + private async Task ProcessPendingMessagesAsync(CancellationToken ct) + { + using var scope = _scopeFactory.CreateScope(); + var context = scope.ServiceProvider.GetRequiredService(); + var publishEndpoint = scope.ServiceProvider.GetRequiredService(); + + var messages = await context.OutboxMessages + .Where(m => !m.IsProcessed && m.RetryCount < MaxRetries) + .OrderBy(m => m.CreatedAt) + .Take(BatchSize) + .ToListAsync(ct); + + foreach (var message in messages) + { + try + { + var eventType = Type.GetType(message.EventType); + if (eventType == null) + { + _logger.LogWarning("Unknown event type: {EventType}", message.EventType); + continue; + } + + var @event = JsonSerializer.Deserialize(message.Payload, eventType); + await publishEndpoint.Publish(@event!, eventType, ct); + + message.IsProcessed = true; + message.ProcessedAt = DateTime.UtcNow; + + _logger.LogDebug("Published outbox message {Id}", message.Id); + } + catch (Exception ex) + { + message.RetryCount++; + message.Error = ex.Message; + _logger.LogWarning(ex, "Failed to publish message {Id}, retry {Retry}", + message.Id, message.RetryCount); + } + } + + await context.SaveChangesAsync(ct); + } +} +``` + +### Command Handler with Outbox / Command Handler với Outbox + +```csharp +/// +/// EN: Command handler that raises domain events saved to outbox. +/// VI: Command handler phát domain events được lưu vào outbox. +/// +public class CreateOrderCommandHandler : IRequestHandler +{ + private readonly IOrderRepository _repository; + private readonly ILogger _logger; + + public CreateOrderCommandHandler( + IOrderRepository repository, + ILogger logger) + { + _repository = repository; + _logger = logger; + } + + public async Task Handle(CreateOrderCommand request, CancellationToken ct) + { + var order = new Order(request.UserId, request.ShippingAddress); + + foreach (var item in request.Items) + { + order.AddItem(item.ProductId, item.Quantity, item.UnitPrice); + } + + // EN: Domain events are raised inside aggregate + // VI: Domain events được phát trong aggregate + + await _repository.AddAsync(order, ct); + + // EN: SaveChanges converts domain events to outbox messages + // VI: SaveChanges chuyển domain events thành outbox messages + await _repository.UnitOfWork.SaveChangesAsync(ct); + + _logger.LogInformation("Order created: {OrderId}", order.Id); + + return new OrderResult(order.Id); + } +} +``` + +## Common Mistakes / Lỗi Thường Gặp + +### 1. Publishing Before Saving + +```csharp +// ❌ BAD: Publish before DB commit +await _publishEndpoint.Publish(new OrderCreatedEvent(order.Id)); +await _dbContext.Orders.AddAsync(order); +await _dbContext.SaveChangesAsync(); +// If SaveChanges fails, message is already sent! + +// ✅ GOOD: Use outbox pattern +order.AddDomainEvent(new OrderCreatedEvent(order.Id)); +await _dbContext.Orders.AddAsync(order); +await _dbContext.SaveChangesAsync(); // Saves both order AND outbox in same transaction +``` + +### 2. No Idempotency in Consumers + +```csharp +// ❌ BAD: No idempotency check +public async Task Consume(ConsumeContext context) +{ + await _service.ProcessOrder(context.Message.OrderId); + // May process duplicate messages! +} + +// ✅ GOOD: Idempotent consumer +public async Task Consume(ConsumeContext context) +{ + var eventId = context.MessageId; + if (await _processedMessages.ExistsAsync(eventId)) + return; // Already processed + + await _service.ProcessOrder(context.Message.OrderId); + await _processedMessages.MarkAsync(eventId); +} +``` + +### 3. No Retry Limit + +```csharp +// ❌ BAD: Infinite retries +var messages = await context.OutboxMessages + .Where(m => !m.IsProcessed) // Will keep retrying failed messages forever + .ToListAsync(); + +// ✅ GOOD: Retry limit with dead letter +var messages = await context.OutboxMessages + .Where(m => !m.IsProcessed && m.RetryCount < MaxRetries) + .ToListAsync(); + +// Move failed messages to dead letter after max retries +``` + +## Quick Reference / Tham Chiếu Nhanh + +### Outbox Table Schema + +| Column | Type | Purpose | +|--------|------|---------| +| Id | UUID | Primary key | +| EventType | VARCHAR(500) | Full type name for deserialization | +| Payload | JSONB | Serialized event data | +| CreatedAt | TIMESTAMP | When event was created | +| ProcessedAt | TIMESTAMP | When event was published | +| IsProcessed | BOOLEAN | Processing status | +| RetryCount | INT | Failed attempts | +| Error | TEXT | Last error message | + +### Delivery Guarantees + +| Pattern | Guarantee | Trade-off | +|---------|-----------|-----------| +| Direct publish | Best-effort | May lose messages | +| Outbox | At-least-once | May duplicate messages | +| Inbox + Outbox | Exactly-once | More complexity | + +## Resources / Tài Nguyên + +- [Detailed Examples](./references/REFERENCE.md) - Full code examples +- [Event Sourcing](../event-sourcing/SKILL.md) - Event-based persistence +- [Inter-service Communication](../inter-service-communication/SKILL.md) - MassTransit +- [SAGA Pattern](../saga-pattern/SKILL.md) - Distributed transactions +- [Repository Pattern](../repository-pattern/SKILL.md) - Unit of Work diff --git a/.agent/skills/outbox-pattern/references/REFERENCE.md b/.agent/skills/outbox-pattern/references/REFERENCE.md new file mode 100644 index 00000000..d5d9e060 --- /dev/null +++ b/.agent/skills/outbox-pattern/references/REFERENCE.md @@ -0,0 +1,509 @@ +# Outbox Pattern - Reference Examples + +## Complete Implementation Examples + +### 1. Complete Outbox Entity Configuration + +```csharp +/// +/// EN: EF Core configuration for OutboxMessage. +/// VI: Cấu hình EF Core cho OutboxMessage. +/// +public class OutboxMessageConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.ToTable("OutboxMessages"); + + builder.HasKey(e => e.Id); + + builder.Property(e => e.EventType) + .HasMaxLength(500) + .IsRequired(); + + builder.Property(e => e.Payload) + .HasColumnType("jsonb") + .IsRequired(); + + builder.Property(e => e.CreatedAt) + .IsRequired(); + + builder.Property(e => e.Error) + .HasMaxLength(2000); + + // EN: Index for fast retrieval of unprocessed messages + // VI: Index để truy vấn nhanh messages chưa xử lý + builder.HasIndex(e => new { e.IsProcessed, e.CreatedAt }) + .HasFilter("\"IsProcessed\" = false") + .HasDatabaseName("IX_OutboxMessages_Pending"); + + // EN: Index for retry tracking + // VI: Index để theo dõi retry + builder.HasIndex(e => e.RetryCount) + .HasFilter("\"IsProcessed\" = false AND \"RetryCount\" >= 3") + .HasDatabaseName("IX_OutboxMessages_FailedRetries"); + } +} +``` + +### 2. Inbox Table for Idempotency + +```csharp +/// +/// EN: Inbox message for tracking processed events. +/// VI: Inbox message để theo dõi events đã xử lý. +/// +public class InboxMessage +{ + public Guid Id { get; set; } // Same as incoming MessageId + public string ConsumerType { get; set; } = default!; + public DateTime ProcessedAt { get; set; } +} + +/// +/// EN: Inbox repository for idempotency. +/// VI: Repository inbox cho idempotency. +/// +public class InboxRepository : IInboxRepository +{ + private readonly AppDbContext _context; + + public InboxRepository(AppDbContext context) + { + _context = context; + } + + public async Task ExistsAsync(Guid messageId, string consumerType, CancellationToken ct) + { + return await _context.InboxMessages + .AnyAsync(m => m.Id == messageId && m.ConsumerType == consumerType, ct); + } + + public async Task MarkAsProcessedAsync(Guid messageId, string consumerType, CancellationToken ct) + { + _context.InboxMessages.Add(new InboxMessage + { + Id = messageId, + ConsumerType = consumerType, + ProcessedAt = DateTime.UtcNow + }); + await _context.SaveChangesAsync(ct); + } +} +``` + +### 3. Idempotent Consumer Base Class + +```csharp +/// +/// EN: Base class for idempotent message consumers. +/// VI: Lớp cơ sở cho consumer idempotent. +/// +public abstract class IdempotentConsumer : IConsumer + where TMessage : class +{ + private readonly IInboxRepository _inbox; + private readonly ILogger _logger; + + protected IdempotentConsumer( + IInboxRepository inbox, + ILogger logger) + { + _inbox = inbox; + _logger = logger; + } + + public async Task Consume(ConsumeContext context) + { + var messageId = context.MessageId ?? Guid.NewGuid(); + var consumerType = GetType().Name; + + // EN: Check if already processed + // VI: Kiểm tra đã xử lý chưa + if (await _inbox.ExistsAsync(messageId, consumerType, context.CancellationToken)) + { + _logger.LogInformation( + "Message {MessageId} already processed by {Consumer}, skipping", + messageId, consumerType); + return; + } + + try + { + await HandleAsync(context.Message, context.CancellationToken); + + // EN: Mark as processed + // VI: Đánh dấu đã xử lý + await _inbox.MarkAsProcessedAsync(messageId, consumerType, context.CancellationToken); + + _logger.LogInformation( + "Processed message {MessageId} by {Consumer}", + messageId, consumerType); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to process message {MessageId} by {Consumer}", + messageId, consumerType); + throw; + } + } + + protected abstract Task HandleAsync(TMessage message, CancellationToken ct); +} + +/// +/// EN: Example idempotent consumer. +/// VI: Ví dụ consumer idempotent. +/// +public class OrderCreatedConsumer : IdempotentConsumer +{ + private readonly IInventoryService _inventory; + + public OrderCreatedConsumer( + IInboxRepository inbox, + IInventoryService inventory, + ILogger logger) + : base(inbox, logger) + { + _inventory = inventory; + } + + protected override async Task HandleAsync( + OrderCreatedIntegrationEvent message, + CancellationToken ct) + { + foreach (var item in message.Items) + { + await _inventory.ReserveAsync( + item.ProductId, + item.Quantity, + message.OrderId, + ct); + } + } +} +``` + +### 4. Enhanced Outbox Processor with Parallel Processing + +```csharp +/// +/// EN: Enhanced outbox processor with parallel processing. +/// VI: Outbox processor nâng cao với xử lý song song. +/// +public class EnhancedOutboxProcessor : BackgroundService +{ + private readonly IServiceScopeFactory _scopeFactory; + private readonly ILogger _logger; + private readonly OutboxOptions _options; + + public EnhancedOutboxProcessor( + IServiceScopeFactory scopeFactory, + IOptions options, + ILogger logger) + { + _scopeFactory = scopeFactory; + _options = options.Value; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken ct) + { + _logger.LogInformation( + "Outbox Processor started with {Interval}s interval, batch size {BatchSize}", + _options.PollingIntervalSeconds, + _options.BatchSize); + + using var timer = new PeriodicTimer( + TimeSpan.FromSeconds(_options.PollingIntervalSeconds)); + + while (await timer.WaitForNextTickAsync(ct)) + { + try + { + var processedCount = await ProcessBatchAsync(ct); + + if (processedCount > 0) + _logger.LogInformation("Processed {Count} outbox messages", processedCount); + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + _logger.LogError(ex, "Error processing outbox batch"); + } + } + } + + private async Task ProcessBatchAsync(CancellationToken ct) + { + using var scope = _scopeFactory.CreateScope(); + var context = scope.ServiceProvider.GetRequiredService(); + var publishEndpoint = scope.ServiceProvider.GetRequiredService(); + + // EN: Use SKIP LOCKED for concurrent processing + // VI: Dùng SKIP LOCKED cho xử lý đồng thời + var messages = await context.OutboxMessages + .FromSqlRaw(@" + SELECT * FROM ""OutboxMessages"" + WHERE ""IsProcessed"" = false + AND ""RetryCount"" < @p0 + ORDER BY ""CreatedAt"" + LIMIT @p1 + FOR UPDATE SKIP LOCKED", + _options.MaxRetries, + _options.BatchSize) + .ToListAsync(ct); + + if (!messages.Any()) + return 0; + + // EN: Process in parallel with semaphore + // VI: Xử lý song song với semaphore + using var semaphore = new SemaphoreSlim(_options.MaxParallelism); + var tasks = messages.Select(async message => + { + await semaphore.WaitAsync(ct); + try + { + await ProcessMessageAsync(message, publishEndpoint, ct); + } + finally + { + semaphore.Release(); + } + }); + + await Task.WhenAll(tasks); + await context.SaveChangesAsync(ct); + + return messages.Count; + } + + private async Task ProcessMessageAsync( + OutboxMessage message, + IPublishEndpoint publishEndpoint, + CancellationToken ct) + { + try + { + var eventType = Type.GetType(message.EventType); + if (eventType == null) + { + message.Error = $"Unknown event type: {message.EventType}"; + message.IsProcessed = true; // Dead letter + return; + } + + var @event = JsonSerializer.Deserialize(message.Payload, eventType); + await publishEndpoint.Publish(@event!, eventType, ct); + + message.IsProcessed = true; + message.ProcessedAt = DateTime.UtcNow; + message.Error = null; + } + catch (Exception ex) + { + message.RetryCount++; + message.Error = ex.Message.Length > 2000 + ? ex.Message[..2000] + : ex.Message; + + _logger.LogWarning(ex, + "Failed to publish message {Id}, retry {Retry}/{Max}", + message.Id, message.RetryCount, _options.MaxRetries); + } + } +} + +public class OutboxOptions +{ + public int PollingIntervalSeconds { get; set; } = 5; + public int BatchSize { get; set; } = 100; + public int MaxRetries { get; set; } = 5; + public int MaxParallelism { get; set; } = 4; +} +``` + +### 5. MassTransit Outbox Integration + +```csharp +/// +/// EN: Configure MassTransit with built-in outbox. +/// VI: Cấu hình MassTransit với outbox tích hợp. +/// +public static class MassTransitOutboxExtensions +{ + public static IServiceCollection AddMassTransitWithOutbox( + this IServiceCollection services, + IConfiguration configuration) + { + services.AddMassTransit(x => + { + x.AddConsumer(); + x.AddConsumer(); + + // EN: Add Entity Framework Outbox + // VI: Thêm Entity Framework Outbox + x.AddEntityFrameworkOutbox(o => + { + o.UsePostgres(); + o.UseBusOutbox(); + + // EN: Configure delivery service + // VI: Cấu hình delivery service + o.QueryDelay = TimeSpan.FromSeconds(5); + o.QueryMessageLimit = 100; + o.QueryTimeout = TimeSpan.FromSeconds(30); + }); + + x.UsingRabbitMq((context, cfg) => + { + cfg.Host(configuration["RabbitMQ:Host"], "/", h => + { + h.Username(configuration["RabbitMQ:Username"]!); + h.Password(configuration["RabbitMQ:Password"]!); + }); + + cfg.ConfigureEndpoints(context); + }); + }); + + return services; + } +} + +/// +/// EN: Using MassTransit outbox in handler. +/// VI: Sử dụng MassTransit outbox trong handler. +/// +public class CreateOrderHandler : IRequestHandler +{ + private readonly AppDbContext _context; + private readonly IPublishEndpoint _publishEndpoint; + + public CreateOrderHandler( + AppDbContext context, + IPublishEndpoint publishEndpoint) + { + _context = context; + _publishEndpoint = publishEndpoint; + } + + public async Task Handle(CreateOrderCommand request, CancellationToken ct) + { + var order = new Order(request.UserId, request.Address); + + _context.Orders.Add(order); + + // EN: MassTransit automatically saves to outbox table + // VI: MassTransit tự động lưu vào outbox table + await _publishEndpoint.Publish(new OrderCreatedIntegrationEvent + { + OrderId = order.Id, + UserId = order.UserId + }, ct); + + // EN: Both order and outbox message saved in same transaction + // VI: Cả order và outbox message được lưu trong cùng transaction + await _context.SaveChangesAsync(ct); + + return new OrderResult(order.Id); + } +} +``` + +### 6. Cleanup Job for Processed Messages + +```csharp +/// +/// EN: Background job to clean up processed outbox messages. +/// VI: Job background để dọn dẹp outbox messages đã xử lý. +/// +public class OutboxCleanupJob : BackgroundService +{ + private readonly IServiceScopeFactory _scopeFactory; + private readonly ILogger _logger; + private readonly TimeSpan _cleanupInterval = TimeSpan.FromHours(1); + private readonly TimeSpan _retentionPeriod = TimeSpan.FromDays(7); + + public OutboxCleanupJob( + IServiceScopeFactory scopeFactory, + ILogger logger) + { + _scopeFactory = scopeFactory; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + await Task.Delay(_cleanupInterval, ct); + + try + { + await CleanupAsync(ct); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during outbox cleanup"); + } + } + } + + private async Task CleanupAsync(CancellationToken ct) + { + using var scope = _scopeFactory.CreateScope(); + var context = scope.ServiceProvider.GetRequiredService(); + + var cutoffDate = DateTime.UtcNow - _retentionPeriod; + + var deletedCount = await context.OutboxMessages + .Where(m => m.IsProcessed && m.ProcessedAt < cutoffDate) + .ExecuteDeleteAsync(ct); + + if (deletedCount > 0) + _logger.LogInformation("Cleaned up {Count} processed outbox messages", deletedCount); + + // EN: Clean up inbox messages too + // VI: Dọn dẹp inbox messages + var inboxDeletedCount = await context.InboxMessages + .Where(m => m.ProcessedAt < cutoffDate) + .ExecuteDeleteAsync(ct); + + if (inboxDeletedCount > 0) + _logger.LogInformation("Cleaned up {Count} inbox messages", inboxDeletedCount); + } +} +``` + +## Database Migrations + +```sql +-- EN: Create OutboxMessages table / VI: Tạo bảng OutboxMessages +CREATE TABLE "OutboxMessages" ( + "Id" uuid PRIMARY KEY, + "EventType" varchar(500) NOT NULL, + "Payload" jsonb NOT NULL, + "CreatedAt" timestamp with time zone NOT NULL, + "ProcessedAt" timestamp with time zone, + "IsProcessed" boolean NOT NULL DEFAULT false, + "RetryCount" integer NOT NULL DEFAULT 0, + "Error" varchar(2000) +); + +CREATE INDEX "IX_OutboxMessages_Pending" + ON "OutboxMessages" ("IsProcessed", "CreatedAt") + WHERE "IsProcessed" = false; + +-- EN: Create InboxMessages table / VI: Tạo bảng InboxMessages +CREATE TABLE "InboxMessages" ( + "Id" uuid NOT NULL, + "ConsumerType" varchar(500) NOT NULL, + "ProcessedAt" timestamp with time zone NOT NULL, + PRIMARY KEY ("Id", "ConsumerType") +); + +CREATE INDEX "IX_InboxMessages_ProcessedAt" + ON "InboxMessages" ("ProcessedAt"); +``` diff --git a/.agent/skills/saga-pattern/SKILL.md b/.agent/skills/saga-pattern/SKILL.md new file mode 100644 index 00000000..d458b09a --- /dev/null +++ b/.agent/skills/saga-pattern/SKILL.md @@ -0,0 +1,519 @@ +--- +name: saga-pattern +description: SAGA Pattern - quản lý distributed transactions qua chuỗi local transactions với compensating actions. Use for checkout flows, order processing, và multi-service orchestration. +compatibility: ".NET 10+, MassTransit, RabbitMQ, EF Core" +metadata: + author: Velik Ho + version: "1.0" +--- + +# SAGA Pattern / Mẫu SAGA + +Pattern quản lý distributed transactions trong microservices bằng chuỗi local transactions. + +## When to Use This Skill / Khi Nào Sử Dụng + +Use this skill when: +- Multi-service transactions required / Cần transactions đa dịch vụ +- Distributed transaction (2PC) not feasible / 2PC không khả thi +- Long-running business processes / Quy trình nghiệp vụ chạy lâu +- Need compensating transactions / Cần transactions bù trừ + +## Core Concepts / Khái Niệm Cốt Lõi + +### Why SAGA? / Tại Sao Dùng SAGA? + +``` +┌─────────────────────────────────────────────────────────────┐ +│ ❌ DISTRIBUTED TRANSACTION (2PC) - PROBLEMS │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ ┌───────┐ ┌───────┐ ┌───────┐ │ +│ │Order │────▶│Payment│────▶│Inventory│ │ +│ │ DB │ │ DB │ │ DB │ │ +│ └───────┘ └───────┘ └───────┘ │ +│ │ │ │ │ +│ └─────────────┴─────────────┘ │ +│ Transaction Coordinator │ +│ │ +│ Problems: │ +│ • Locks resources across services │ +│ • Single point of failure │ +│ • Poor performance │ +│ • Not always supported (NoSQL, external APIs) │ +│ │ +└─────────────────────────────────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────┐ +│ ✅ SAGA - LOCAL TRANSACTIONS │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ Step 1 Step 2 Step 3 │ +│ ┌───────┐ ┌───────┐ ┌───────┐ │ +│ │Create │─────▶│Reserve│─────▶│Charge │ │ +│ │Order │ │Stock │ │Payment│ │ +│ └───────┘ └───────┘ └───────┘ │ +│ │ │ │ │ +│ ▼ ▼ ▼ │ +│ [Local Tx] [Local Tx] [Local Tx] │ +│ │ +│ If Step 3 fails: │ +│ ┌───────┐ ┌───────┐ │ +│ │Release│◀─────│Cancel │ │ +│ │Stock │ │Order │ │ +│ └───────┘ └───────┘ │ +│ Compensate Compensate │ +│ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Choreography vs Orchestration + +``` +┌─────────────────────────────────────────────────────────────┐ +│ CHOREOGRAPHY │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ Order Event Bus Inventory Payment │ +│ │ │ │ │ │ +│ │──Create──▶│ │ │ │ +│ │ │──OrderCreated─▶│ │ │ +│ │ │ │──Reserve──▶│ │ +│ │ │ │ │──Charge────▶ │ +│ │ │◀─────PaymentProcessed──────│ │ +│ │ +│ ✅ Loose coupling, simple services │ +│ ❌ Hard to track flow, complex rollback │ +│ │ +└─────────────────────────────────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────┐ +│ ORCHESTRATION │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────┐ │ +│ │ SAGA │ │ +│ │ Orchestrator│ │ +│ └─────┬───────┘ │ +│ ┌──────────────┼──────────────┐ │ +│ ▼ ▼ ▼ │ +│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ +│ │ Order │ │Inventory│ │ Payment │ │ +│ │ Service │ │ Service │ │ Service │ │ +│ └─────────┘ └─────────┘ └─────────┘ │ +│ │ +│ ✅ Clear flow, centralized error handling │ +│ ❌ More coupling, orchestrator complexity │ +│ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### SAGA Components / Thành Phần SAGA + +| Component | Purpose | Example | +|-----------|---------|---------| +| **Saga State Machine** | Tracks saga state | OrderSaga | +| **Saga Step** | Individual transaction | ReserveInventory | +| **Compensating Action** | Undo step on failure | ReleaseInventory | +| **Saga Data** | Shared state across steps | OrderId, Items | + +## Key Patterns / Mẫu Chính + +### Saga State Machine with MassTransit + +```csharp +/// +/// EN: Order saga state tracking. +/// VI: Theo dõi state của Order saga. +/// +public class OrderSagaState : SagaStateMachineInstance +{ + public Guid CorrelationId { get; set; } + public string CurrentState { get; set; } = default!; + + // EN: Saga data / VI: Dữ liệu saga + public Guid OrderId { get; set; } + public string UserId { get; set; } = default!; + public decimal TotalAmount { get; set; } + public string? PaymentId { get; set; } + public string? FailureReason { get; set; } +} + +/// +/// EN: Order saga state machine definition. +/// VI: Định nghĩa state machine cho Order saga. +/// +public class OrderSaga : MassTransitStateMachine +{ + // EN: States / VI: Các trạng thái + public State OrderCreated { get; private set; } = null!; + public State InventoryReserved { get; private set; } = null!; + public State PaymentProcessed { get; private set; } = null!; + public State Completed { get; private set; } = null!; + public State Failed { get; private set; } = null!; + + // EN: Events / VI: Các sự kiện + public Event OrderSubmittedEvent { get; private set; } = null!; + public Event InventoryReservedEvent { get; private set; } = null!; + public Event InventoryFailedEvent { get; private set; } = null!; + public Event PaymentProcessedEvent { get; private set; } = null!; + public Event PaymentFailedEvent { get; private set; } = null!; + + public OrderSaga() + { + InstanceState(x => x.CurrentState); + + // EN: Define events / VI: Định nghĩa events + Event(() => OrderSubmittedEvent, x => x.CorrelateById(m => m.Message.OrderId)); + Event(() => InventoryReservedEvent, x => x.CorrelateById(m => m.Message.OrderId)); + Event(() => InventoryFailedEvent, x => x.CorrelateById(m => m.Message.OrderId)); + Event(() => PaymentProcessedEvent, x => x.CorrelateById(m => m.Message.OrderId)); + Event(() => PaymentFailedEvent, x => x.CorrelateById(m => m.Message.OrderId)); + + // EN: Define state transitions / VI: Định nghĩa chuyển trạng thái + Initially( + When(OrderSubmittedEvent) + .Then(context => + { + context.Saga.OrderId = context.Message.OrderId; + context.Saga.UserId = context.Message.UserId; + context.Saga.TotalAmount = context.Message.TotalAmount; + }) + .Publish(context => new ReserveInventory + { + OrderId = context.Saga.OrderId, + Items = context.Message.Items + }) + .TransitionTo(OrderCreated) + ); + + During(OrderCreated, + When(InventoryReservedEvent) + .Publish(context => new ProcessPayment + { + OrderId = context.Saga.OrderId, + UserId = context.Saga.UserId, + Amount = context.Saga.TotalAmount + }) + .TransitionTo(InventoryReserved), + + When(InventoryFailedEvent) + .Then(context => context.Saga.FailureReason = context.Message.Reason) + .Publish(context => new CancelOrder + { + OrderId = context.Saga.OrderId, + Reason = context.Message.Reason + }) + .TransitionTo(Failed) + ); + + During(InventoryReserved, + When(PaymentProcessedEvent) + .Then(context => context.Saga.PaymentId = context.Message.PaymentId) + .Publish(context => new CompleteOrder + { + OrderId = context.Saga.OrderId, + PaymentId = context.Message.PaymentId + }) + .TransitionTo(Completed) + .Finalize(), + + When(PaymentFailedEvent) + .Then(context => context.Saga.FailureReason = context.Message.Reason) + // EN: Compensating action - release inventory + // VI: Action bù trừ - giải phóng inventory + .Publish(context => new ReleaseInventory + { + OrderId = context.Saga.OrderId + }) + .Publish(context => new CancelOrder + { + OrderId = context.Saga.OrderId, + Reason = context.Message.Reason + }) + .TransitionTo(Failed) + ); + + SetCompletedWhenFinalized(); + } +} +``` + +### Saga Events / Events của SAGA + +```csharp +/// +/// EN: Events for order saga. +/// VI: Events cho order saga. +/// + +// EN: Trigger events / VI: Events kích hoạt +public record OrderSubmitted +{ + public Guid OrderId { get; init; } + public string UserId { get; init; } = default!; + public decimal TotalAmount { get; init; } + public List Items { get; init; } = new(); +} + +// EN: Command messages / VI: Command messages +public record ReserveInventory +{ + public Guid OrderId { get; init; } + public List Items { get; init; } = new(); +} + +public record ProcessPayment +{ + public Guid OrderId { get; init; } + public string UserId { get; init; } = default!; + public decimal Amount { get; init; } +} + +public record ReleaseInventory // Compensating +{ + public Guid OrderId { get; init; } +} + +public record CancelOrder // Compensating +{ + public Guid OrderId { get; init; } + public string Reason { get; init; } = default!; +} + +public record CompleteOrder +{ + public Guid OrderId { get; init; } + public string PaymentId { get; init; } = default!; +} + +// EN: Response events / VI: Events phản hồi +public record InventoryReserved +{ + public Guid OrderId { get; init; } +} + +public record InventoryReservationFailed +{ + public Guid OrderId { get; init; } + public string Reason { get; init; } = default!; +} + +public record PaymentProcessed +{ + public Guid OrderId { get; init; } + public string PaymentId { get; init; } = default!; +} + +public record PaymentFailed +{ + public Guid OrderId { get; init; } + public string Reason { get; init; } = default!; +} +``` + +### Saga Consumer / Consumer của SAGA + +```csharp +/// +/// EN: Inventory service consumer for saga. +/// VI: Consumer của Inventory service cho saga. +/// +public class ReserveInventoryConsumer : IConsumer +{ + private readonly IInventoryRepository _repository; + private readonly ILogger _logger; + + public ReserveInventoryConsumer( + IInventoryRepository repository, + ILogger logger) + { + _repository = repository; + _logger = logger; + } + + public async Task Consume(ConsumeContext context) + { + var message = context.Message; + + try + { + foreach (var item in message.Items) + { + var success = await _repository.ReserveAsync( + item.ProductId, + item.Quantity, + message.OrderId, + context.CancellationToken); + + if (!success) + { + // EN: Not enough stock, publish failure + // VI: Không đủ hàng, publish failure + await context.Publish(new InventoryReservationFailed + { + OrderId = message.OrderId, + Reason = $"Insufficient stock for product {item.ProductId}" + }); + return; + } + } + + // EN: All items reserved successfully + // VI: Tất cả items đã được reserve thành công + await context.Publish(new InventoryReserved + { + OrderId = message.OrderId + }); + + _logger.LogInformation("Inventory reserved for order {OrderId}", message.OrderId); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to reserve inventory for order {OrderId}", message.OrderId); + + await context.Publish(new InventoryReservationFailed + { + OrderId = message.OrderId, + Reason = ex.Message + }); + } + } +} + +/// +/// EN: Compensating consumer to release inventory. +/// VI: Consumer bù trừ để giải phóng inventory. +/// +public class ReleaseInventoryConsumer : IConsumer +{ + private readonly IInventoryRepository _repository; + private readonly ILogger _logger; + + public ReleaseInventoryConsumer( + IInventoryRepository repository, + ILogger logger) + { + _repository = repository; + _logger = logger; + } + + public async Task Consume(ConsumeContext context) + { + await _repository.ReleaseReservationAsync( + context.Message.OrderId, + context.CancellationToken); + + _logger.LogInformation( + "Released inventory reservation for order {OrderId}", + context.Message.OrderId); + } +} +``` + +## Common Mistakes / Lỗi Thường Gặp + +### 1. Missing Compensating Actions + +```csharp +// ❌ BAD: No compensation handling +During(InventoryReserved, + When(PaymentFailedEvent) + .TransitionTo(Failed) // Inventory still reserved! +); + +// ✅ GOOD: Proper compensation +During(InventoryReserved, + When(PaymentFailedEvent) + .Publish(context => new ReleaseInventory + { + OrderId = context.Saga.OrderId + }) + .TransitionTo(Failed) +); +``` + +### 2. Non-Idempotent Compensations + +```csharp +// ❌ BAD: Can fail if called twice +public async Task Consume(ConsumeContext context) +{ + var reservation = await _repo.GetReservationAsync(context.Message.OrderId); + await _repo.DeleteAsync(reservation); // Throws if already deleted +} + +// ✅ GOOD: Idempotent compensation +public async Task Consume(ConsumeContext context) +{ + var reservation = await _repo.GetReservationAsync(context.Message.OrderId); + if (reservation != null) + { + await _repo.DeleteAsync(reservation); + } + // Safe to call multiple times +} +``` + +### 3. No Timeout Handling + +```csharp +// ❌ BAD: Saga can hang forever +public OrderSaga() +{ + During(OrderCreated, + When(InventoryReservedEvent) + .TransitionTo(InventoryReserved) + ); + // What if InventoryReservedEvent never arrives? +} + +// ✅ GOOD: Add timeout handling +public OrderSaga() +{ + Schedule(() => ReservationTimeout, x => x.ReservationTimeoutToken, + s => s.Delay = TimeSpan.FromMinutes(5)); + + During(OrderCreated, + When(InventoryReservedEvent) + .Unschedule(ReservationTimeout) + .TransitionTo(InventoryReserved), + + When(ReservationTimeout.Received) + .Then(ctx => ctx.Saga.FailureReason = "Reservation timeout") + .Publish(ctx => new CancelOrder { OrderId = ctx.Saga.OrderId }) + .TransitionTo(Failed) + ); +} +``` + +## Quick Reference / Tham Chiếu Nhanh + +### Choreography vs Orchestration + +| Aspect | Choreography | Orchestration | +|--------|-------------|---------------| +| Coupling | Low | Higher | +| Complexity | Distributed | Centralized | +| Debugging | Hard | Easier | +| Single point of failure | No | Yes (orchestrator) | +| Best for | Simple flows | Complex flows | + +### SAGA Design Checklist + +| Step | Required | +|------|----------| +| Define all saga steps | ✅ | +| Define compensating action for each step | ✅ | +| Make compensations idempotent | ✅ | +| Add timeout handling | ✅ | +| Log state transitions | ✅ | +| Design for partial failures | ✅ | + +## Resources / Tài Nguyên + +- [Detailed Examples](./references/REFERENCE.md) - Full code examples +- [Outbox Pattern](../outbox-pattern/SKILL.md) - Reliable event publishing +- [Inter-service Communication](../inter-service-communication/SKILL.md) - MassTransit +- [Event Sourcing](../event-sourcing/SKILL.md) - Event-based persistence +- [Error Handling](../error-handling-patterns/SKILL.md) - Resilience patterns diff --git a/.agent/skills/saga-pattern/references/REFERENCE.md b/.agent/skills/saga-pattern/references/REFERENCE.md new file mode 100644 index 00000000..d2b2d07e --- /dev/null +++ b/.agent/skills/saga-pattern/references/REFERENCE.md @@ -0,0 +1,513 @@ +# SAGA Pattern - Reference Examples + +## Complete Implementation Examples + +### 1. Complete MassTransit Configuration + +```csharp +/// +/// EN: Configure MassTransit with saga support. +/// VI: Cấu hình MassTransit với hỗ trợ saga. +/// +public static class SagaServiceExtensions +{ + public static IServiceCollection AddSagaSupport( + this IServiceCollection services, + IConfiguration configuration) + { + services.AddMassTransit(x => + { + // EN: Add saga state machine + // VI: Thêm saga state machine + x.AddSagaStateMachine() + .EntityFrameworkRepository(r => + { + r.ConcurrencyMode = ConcurrencyMode.Pessimistic; + r.AddDbContext((provider, builder) => + { + builder.UseNpgsql(configuration.GetConnectionString("SagaDb")); + }); + }); + + // EN: Add consumers + x.AddConsumer(); + x.AddConsumer(); + x.AddConsumer(); + x.AddConsumer(); + x.AddConsumer(); + x.AddConsumer(); + + x.UsingRabbitMq((context, cfg) => + { + cfg.Host(configuration["RabbitMQ:Host"], "/", h => + { + h.Username(configuration["RabbitMQ:Username"]!); + h.Password(configuration["RabbitMQ:Password"]!); + }); + + // EN: Configure retry policy + cfg.UseMessageRetry(r => r.Intervals( + TimeSpan.FromSeconds(1), + TimeSpan.FromSeconds(5), + TimeSpan.FromSeconds(15))); + + cfg.ConfigureEndpoints(context); + }); + }); + + return services; + } +} +``` + +### 2. Saga DbContext + +```csharp +/// +/// EN: DbContext for saga state persistence. +/// VI: DbContext để lưu trữ saga state. +/// +public class SagaDbContext : DbContext +{ + public DbSet OrderSagas => Set(); + + public SagaDbContext(DbContextOptions options) + : base(options) { } + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + modelBuilder.Entity(entity => + { + entity.ToTable("OrderSagas"); + + entity.HasKey(x => x.CorrelationId); + + entity.Property(x => x.CurrentState) + .HasMaxLength(100) + .IsRequired(); + + entity.Property(x => x.UserId) + .HasMaxLength(100); + + entity.Property(x => x.PaymentId) + .HasMaxLength(100); + + entity.Property(x => x.FailureReason) + .HasMaxLength(500); + + // EN: Optimistic concurrency + entity.Property(x => x.RowVersion) + .IsRowVersion(); + + entity.HasIndex(x => x.OrderId); + entity.HasIndex(x => x.CurrentState); + }); + } +} + +/// +/// EN: Extended saga state with row version. +/// VI: Saga state mở rộng với row version. +/// +public class OrderSagaState : SagaStateMachineInstance +{ + public Guid CorrelationId { get; set; } + public string CurrentState { get; set; } = default!; + + public Guid OrderId { get; set; } + public string UserId { get; set; } = default!; + public decimal TotalAmount { get; set; } + public string? PaymentId { get; set; } + public string? FailureReason { get; set; } + + // EN: For timeout scheduling + public Guid? ReservationTimeoutToken { get; set; } + public Guid? PaymentTimeoutToken { get; set; } + + // EN: Optimistic concurrency + public byte[] RowVersion { get; set; } = null!; +} +``` + +### 3. Complete Saga with Timeouts + +```csharp +/// +/// EN: Complete order saga with timeout handling. +/// VI: Order saga hoàn chỉnh với xử lý timeout. +/// +public class OrderSagaWithTimeouts : MassTransitStateMachine +{ + public State OrderCreated { get; private set; } = null!; + public State InventoryReserved { get; private set; } = null!; + public State PaymentProcessing { get; private set; } = null!; + public State Completed { get; private set; } = null!; + public State Failed { get; private set; } = null!; + public State Compensating { get; private set; } = null!; + + public Event OrderSubmittedEvent { get; private set; } = null!; + public Event InventoryReservedEvent { get; private set; } = null!; + public Event InventoryFailedEvent { get; private set; } = null!; + public Event PaymentProcessedEvent { get; private set; } = null!; + public Event PaymentFailedEvent { get; private set; } = null!; + public Event InventoryReleasedEvent { get; private set; } = null!; + + // EN: Timeout schedules / VI: Lịch timeout + public Schedule ReservationTimeoutSchedule { get; private set; } = null!; + public Schedule PaymentTimeoutSchedule { get; private set; } = null!; + + public OrderSagaWithTimeouts() + { + InstanceState(x => x.CurrentState); + + // EN: Configure events + ConfigureEvents(); + + // EN: Configure timeouts + ConfigureTimeouts(); + + // EN: Configure state machine + ConfigureStateMachine(); + + SetCompletedWhenFinalized(); + } + + private void ConfigureEvents() + { + Event(() => OrderSubmittedEvent, x => x.CorrelateById(m => m.Message.OrderId)); + Event(() => InventoryReservedEvent, x => x.CorrelateById(m => m.Message.OrderId)); + Event(() => InventoryFailedEvent, x => x.CorrelateById(m => m.Message.OrderId)); + Event(() => PaymentProcessedEvent, x => x.CorrelateById(m => m.Message.OrderId)); + Event(() => PaymentFailedEvent, x => x.CorrelateById(m => m.Message.OrderId)); + Event(() => InventoryReleasedEvent, x => x.CorrelateById(m => m.Message.OrderId)); + } + + private void ConfigureTimeouts() + { + Schedule(() => ReservationTimeoutSchedule, + x => x.ReservationTimeoutToken, + s => s.Delay = TimeSpan.FromMinutes(5)); + + Schedule(() => PaymentTimeoutSchedule, + x => x.PaymentTimeoutToken, + s => s.Delay = TimeSpan.FromMinutes(10)); + } + + private void ConfigureStateMachine() + { + Initially( + When(OrderSubmittedEvent) + .Then(InitializeSagaData) + .Schedule(ReservationTimeoutSchedule, ctx => new ReservationTimeout + { + OrderId = ctx.Saga.OrderId + }) + .Publish(ctx => new ReserveInventory + { + OrderId = ctx.Saga.OrderId, + Items = ctx.Message.Items + }) + .TransitionTo(OrderCreated) + ); + + During(OrderCreated, + When(InventoryReservedEvent) + .Unschedule(ReservationTimeoutSchedule) + .Schedule(PaymentTimeoutSchedule, ctx => new PaymentTimeout + { + OrderId = ctx.Saga.OrderId + }) + .Publish(ctx => new ProcessPayment + { + OrderId = ctx.Saga.OrderId, + UserId = ctx.Saga.UserId, + Amount = ctx.Saga.TotalAmount + }) + .TransitionTo(InventoryReserved), + + When(InventoryFailedEvent) + .Unschedule(ReservationTimeoutSchedule) + .Then(ctx => ctx.Saga.FailureReason = ctx.Message.Reason) + .Publish(ctx => new CancelOrder + { + OrderId = ctx.Saga.OrderId, + Reason = ctx.Message.Reason + }) + .TransitionTo(Failed), + + When(ReservationTimeoutSchedule.Received) + .Then(ctx => ctx.Saga.FailureReason = "Inventory reservation timeout") + .Publish(ctx => new CancelOrder + { + OrderId = ctx.Saga.OrderId, + Reason = "Timeout waiting for inventory" + }) + .TransitionTo(Failed) + ); + + During(InventoryReserved, + When(PaymentProcessedEvent) + .Unschedule(PaymentTimeoutSchedule) + .Then(ctx => ctx.Saga.PaymentId = ctx.Message.PaymentId) + .Publish(ctx => new CompleteOrder + { + OrderId = ctx.Saga.OrderId, + PaymentId = ctx.Message.PaymentId + }) + .TransitionTo(Completed) + .Finalize(), + + When(PaymentFailedEvent) + .Unschedule(PaymentTimeoutSchedule) + .Then(ctx => ctx.Saga.FailureReason = ctx.Message.Reason) + .Publish(ctx => new ReleaseInventory + { + OrderId = ctx.Saga.OrderId + }) + .TransitionTo(Compensating), + + When(PaymentTimeoutSchedule.Received) + .Then(ctx => ctx.Saga.FailureReason = "Payment processing timeout") + .Publish(ctx => new ReleaseInventory + { + OrderId = ctx.Saga.OrderId + }) + .TransitionTo(Compensating) + ); + + During(Compensating, + When(InventoryReleasedEvent) + .Publish(ctx => new CancelOrder + { + OrderId = ctx.Saga.OrderId, + Reason = ctx.Saga.FailureReason ?? "Unknown error" + }) + .TransitionTo(Failed) + ); + } + + private static void InitializeSagaData(BehaviorContext ctx) + { + ctx.Saga.OrderId = ctx.Message.OrderId; + ctx.Saga.UserId = ctx.Message.UserId; + ctx.Saga.TotalAmount = ctx.Message.TotalAmount; + } +} + +public record ReservationTimeout { public Guid OrderId { get; init; } } +public record PaymentTimeout { public Guid OrderId { get; init; } } +``` + +### 4. Choreography-Style Saga + +```csharp +/// +/// EN: Choreography-style saga using domain events. +/// VI: Saga kiểu choreography dùng domain events. +/// + +// EN: Order Service publishes event after creation +public class CreateOrderCommandHandler : IRequestHandler +{ + private readonly IOrderRepository _repository; + private readonly IPublishEndpoint _publishEndpoint; + + public async Task Handle(CreateOrderCommand request, CancellationToken ct) + { + var order = new Order(request.UserId, request.Address); + foreach (var item in request.Items) + order.AddItem(item.ProductId, item.Quantity, item.UnitPrice); + + await _repository.AddAsync(order, ct); + await _repository.UnitOfWork.SaveChangesAsync(ct); + + // EN: Publish event for choreography + // VI: Publish event cho choreography + await _publishEndpoint.Publish(new OrderCreatedIntegrationEvent + { + OrderId = order.Id, + UserId = order.UserId, + TotalAmount = order.TotalAmount, + Items = order.Items.Select(i => new OrderItemInfo( + i.ProductId, i.Quantity, i.UnitPrice)).ToList() + }, ct); + + return new OrderResult(order.Id); + } +} + +// EN: Inventory Service reacts to OrderCreated +public class OrderCreatedInventoryConsumer : IConsumer +{ + private readonly IInventoryService _inventory; + private readonly IPublishEndpoint _publishEndpoint; + + public async Task Consume(ConsumeContext context) + { + try + { + foreach (var item in context.Message.Items) + { + await _inventory.ReserveAsync( + item.ProductId, + item.Quantity, + context.Message.OrderId); + } + + // EN: Notify success + await context.Publish(new InventoryReservedIntegrationEvent + { + OrderId = context.Message.OrderId + }); + } + catch (InsufficientStockException ex) + { + // EN: Notify failure + await context.Publish(new InventoryReservationFailedIntegrationEvent + { + OrderId = context.Message.OrderId, + Reason = ex.Message + }); + } + } +} + +// EN: Payment Service reacts to InventoryReserved +public class InventoryReservedPaymentConsumer : IConsumer +{ + private readonly IPaymentService _payment; + private readonly IOrderRepository _orders; + private readonly IPublishEndpoint _publishEndpoint; + + public async Task Consume(ConsumeContext context) + { + var order = await _orders.GetByIdAsync(context.Message.OrderId); + + try + { + var paymentId = await _payment.ChargeAsync( + order!.UserId, + order.TotalAmount); + + await context.Publish(new PaymentProcessedIntegrationEvent + { + OrderId = context.Message.OrderId, + PaymentId = paymentId + }); + } + catch (PaymentFailedException ex) + { + await context.Publish(new PaymentFailedIntegrationEvent + { + OrderId = context.Message.OrderId, + Reason = ex.Message + }); + } + } +} + +// EN: Inventory Service compensates on payment failure +public class PaymentFailedInventoryConsumer : IConsumer +{ + private readonly IInventoryService _inventory; + + public async Task Consume(ConsumeContext context) + { + // EN: Compensating action + await _inventory.ReleaseReservationAsync(context.Message.OrderId); + } +} +``` + +### 5. Saga Monitoring and Querying + +```csharp +/// +/// EN: Query service for saga status. +/// VI: Service truy vấn trạng thái saga. +/// +public class SagaQueryService +{ + private readonly SagaDbContext _context; + + public SagaQueryService(SagaDbContext context) + { + _context = context; + } + + public async Task GetOrderSagaStatusAsync( + Guid orderId, + CancellationToken ct = default) + { + var saga = await _context.OrderSagas + .FirstOrDefaultAsync(s => s.OrderId == orderId, ct); + + if (saga == null) + return null; + + return new OrderSagaStatusDto + { + OrderId = saga.OrderId, + CurrentState = saga.CurrentState, + PaymentId = saga.PaymentId, + FailureReason = saga.FailureReason, + IsCompleted = saga.CurrentState == "Completed", + IsFailed = saga.CurrentState == "Failed" + }; + } + + public async Task> GetStuckSagasAsync( + TimeSpan maxAge, + CancellationToken ct = default) + { + var cutoff = DateTime.UtcNow - maxAge; + + return await _context.OrderSagas + .Where(s => s.CurrentState != "Completed" + && s.CurrentState != "Failed" + && s.CreatedAt < cutoff) + .Select(s => new OrderSagaStatusDto + { + OrderId = s.OrderId, + CurrentState = s.CurrentState, + FailureReason = "Potentially stuck" + }) + .ToListAsync(ct); + } +} + +public record OrderSagaStatusDto +{ + public Guid OrderId { get; init; } + public string CurrentState { get; init; } = default!; + public string? PaymentId { get; init; } + public string? FailureReason { get; init; } + public bool IsCompleted { get; init; } + public bool IsFailed { get; init; } +} +``` + +## Database Migrations + +```sql +-- EN: Create OrderSagas table / VI: Tạo bảng OrderSagas +CREATE TABLE "OrderSagas" ( + "CorrelationId" uuid PRIMARY KEY, + "CurrentState" varchar(100) NOT NULL, + "OrderId" uuid NOT NULL, + "UserId" varchar(100), + "TotalAmount" decimal(18,2) NOT NULL DEFAULT 0, + "PaymentId" varchar(100), + "FailureReason" varchar(500), + "ReservationTimeoutToken" uuid, + "PaymentTimeoutToken" uuid, + "RowVersion" bytea NOT NULL, + "CreatedAt" timestamp with time zone NOT NULL DEFAULT NOW() +); + +CREATE INDEX "IX_OrderSagas_OrderId" ON "OrderSagas" ("OrderId"); +CREATE INDEX "IX_OrderSagas_CurrentState" ON "OrderSagas" ("CurrentState"); +CREATE INDEX "IX_OrderSagas_StuckSagas" + ON "OrderSagas" ("CurrentState", "CreatedAt") + WHERE "CurrentState" NOT IN ('Completed', 'Failed'); +``` diff --git a/.agent/skills/service-discovery/SKILL.md b/.agent/skills/service-discovery/SKILL.md new file mode 100644 index 00000000..7ef36eb7 --- /dev/null +++ b/.agent/skills/service-discovery/SKILL.md @@ -0,0 +1,518 @@ +--- +name: service-discovery +description: Service Discovery và Service Registry patterns. Use for dynamic service location, health checking, và load balancing trong microservices. +compatibility: ".NET 10+, Consul, Kubernetes, DNS" +metadata: + author: Velik Ho + version: "1.0" +--- + +# Service Discovery / Service Discovery Pattern + +Patterns cho service discovery và dynamic service location trong microservices. + +## When to Use This Skill / Khi Nào Sử Dụng + +Use this skill when: +- Services need to find each other dynamically / Services cần tìm nhau động +- Deploying to Kubernetes / Triển khai trên Kubernetes +- Implementing client-side load balancing / Triển khai load balancing phía client +- Health checking service instances / Kiểm tra health của service instances + +## Core Concepts / Khái Niệm Cốt Lõi + +### Service Discovery Patterns + +``` +┌─────────────────────────────────────────────────────────────┐ +│ SERVICE DISCOVERY PATTERNS │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ CLIENT-SIDE DISCOVERY │ │ +│ │ │ │ +│ │ Client ──Query──▶ Registry ──▶ Service List │ │ +│ │ │ │ │ +│ │ └──Direct Call──▶ Service Instance │ │ +│ │ │ │ +│ │ Pros: Simpler, fewer hops │ │ +│ │ Cons: Client needs discovery logic │ │ +│ └──────────────────────────────────────────────────────┘ │ +│ │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ SERVER-SIDE DISCOVERY │ │ +│ │ │ │ +│ │ Client ──▶ Load Balancer ──Query──▶ Registry │ │ +│ │ │ │ │ +│ │ └──▶ Service Instance │ │ +│ │ │ │ +│ │ Pros: Client stays simple │ │ +│ │ Cons: Extra network hop │ │ +│ └──────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Service Registry / Registry dịch vụ + +``` +┌─────────────────────────────────────────────────────────────┐ +│ SERVICE REGISTRY │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────────────────────────────────────┐ │ +│ │ Service: user-service │ │ +│ │ Instances: │ │ +│ │ - host: 10.0.1.10, port: 5001, health: ✅ │ │ +│ │ - host: 10.0.1.11, port: 5001, health: ✅ │ │ +│ │ - host: 10.0.1.12, port: 5001, health: ❌ │ │ +│ └─────────────────────────────────────────────────────┘ │ +│ │ +│ ┌─────────────────────────────────────────────────────┐ │ +│ │ Service: order-service │ │ +│ │ Instances: │ │ +│ │ - host: 10.0.2.10, port: 5002, health: ✅ │ │ +│ │ - host: 10.0.2.11, port: 5002, health: ✅ │ │ +│ └─────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Discovery Options Comparison + +| Option | Type | Complexity | Best For | +|--------|------|------------|----------| +| **Kubernetes DNS** | Server-side | Low | K8s deployments | +| **Consul** | Client-side | Medium | Multi-platform | +| **Eureka** | Client-side | Medium | Spring ecosystem | +| **DNS + Load Balancer** | Server-side | Low | Simple setups | + +## Key Patterns / Mẫu Chính + +### Kubernetes Service Discovery + +```yaml +# EN: Kubernetes Service for internal discovery +# VI: Kubernetes Service cho discovery nội bộ + +apiVersion: v1 +kind: Service +metadata: + name: user-service + namespace: goodgo +spec: + selector: + app: user-service + ports: + - port: 80 + targetPort: 5001 + type: ClusterIP + +--- +# EN: Deployment with health probes +apiVersion: apps/v1 +kind: Deployment +metadata: + name: user-service + namespace: goodgo +spec: + replicas: 3 + selector: + matchLabels: + app: user-service + template: + metadata: + labels: + app: user-service + spec: + containers: + - name: user-service + image: goodgo/user-service:latest + ports: + - containerPort: 5001 + livenessProbe: + httpGet: + path: /health/live + port: 5001 + initialDelaySeconds: 10 + periodSeconds: 10 + readinessProbe: + httpGet: + path: /health/ready + port: 5001 + initialDelaySeconds: 5 + periodSeconds: 5 +``` + +### Using Kubernetes DNS in .NET + +```csharp +/// +/// EN: HTTP client using Kubernetes DNS. +/// VI: HTTP client sử dụng Kubernetes DNS. +/// + +// Program.cs +builder.Services.AddHttpClient(client => +{ + // EN: Use Kubernetes service DNS name + // VI: Sử dụng Kubernetes service DNS name + client.BaseAddress = new Uri("http://user-service.goodgo.svc.cluster.local"); +}) +.AddStandardResilienceHandler(); + +// appsettings.json for different environments +{ + "Services": { + "UserService": { + "BaseUrl": "http://user-service.goodgo.svc.cluster.local" // K8s + // "BaseUrl": "http://localhost:5001" // Local dev + } + } +} +``` + +### Consul Service Registration + +```csharp +/// +/// EN: Register service with Consul. +/// VI: Đăng ký service với Consul. +/// +public static class ConsulServiceExtensions +{ + public static IServiceCollection AddConsulServiceDiscovery( + this IServiceCollection services, + IConfiguration configuration) + { + services.AddSingleton(sp => + { + var consulAddress = configuration["Consul:Address"]; + return new ConsulClient(cfg => + { + cfg.Address = new Uri(consulAddress!); + }); + }); + + services.AddHostedService(); + + return services; + } +} + +/// +/// EN: Background service for Consul registration. +/// VI: Background service cho Consul registration. +/// +public class ConsulRegistrationService : IHostedService +{ + private readonly IConsulClient _consulClient; + private readonly IConfiguration _configuration; + private readonly ILogger _logger; + private string? _registrationId; + + public ConsulRegistrationService( + IConsulClient consulClient, + IConfiguration configuration, + ILogger logger) + { + _consulClient = consulClient; + _configuration = configuration; + _logger = logger; + } + + public async Task StartAsync(CancellationToken ct) + { + var serviceName = _configuration["Service:Name"]!; + var serviceHost = _configuration["Service:Host"]!; + var servicePort = int.Parse(_configuration["Service:Port"]!); + + _registrationId = $"{serviceName}-{Guid.NewGuid():N}"; + + var registration = new AgentServiceRegistration + { + ID = _registrationId, + Name = serviceName, + Address = serviceHost, + Port = servicePort, + Tags = new[] { "api", "v1" }, + Check = new AgentServiceCheck + { + HTTP = $"http://{serviceHost}:{servicePort}/health", + Interval = TimeSpan.FromSeconds(10), + Timeout = TimeSpan.FromSeconds(5), + DeregisterCriticalServiceAfter = TimeSpan.FromMinutes(1) + } + }; + + await _consulClient.Agent.ServiceRegister(registration, ct); + + _logger.LogInformation( + "Registered service {ServiceName} with Consul as {RegistrationId}", + serviceName, _registrationId); + } + + public async Task StopAsync(CancellationToken ct) + { + if (_registrationId != null) + { + await _consulClient.Agent.ServiceDeregister(_registrationId, ct); + _logger.LogInformation("Deregistered service {RegistrationId}", _registrationId); + } + } +} +``` + +### Client-Side Load Balancing with Consul + +```csharp +/// +/// EN: HTTP client with Consul-based service discovery. +/// VI: HTTP client với service discovery dựa trên Consul. +/// +public class ConsulServiceDiscoveryClient +{ + private readonly IConsulClient _consulClient; + private readonly ILogger _logger; + + public ConsulServiceDiscoveryClient( + IConsulClient consulClient, + ILogger logger) + { + _consulClient = consulClient; + _logger = logger; + } + + public async Task GetServiceUriAsync( + string serviceName, + CancellationToken ct = default) + { + var services = await _consulClient.Health.Service( + serviceName, + tag: null, + passingOnly: true, + ct); + + if (services.Response.Length == 0) + { + _logger.LogWarning("No healthy instances found for {ServiceName}", serviceName); + return null; + } + + // EN: Simple round-robin load balancing + // VI: Load balancing round-robin đơn giản + var service = services.Response[Random.Shared.Next(services.Response.Length)]; + var uri = new Uri($"http://{service.Service.Address}:{service.Service.Port}"); + + _logger.LogDebug( + "Resolved {ServiceName} to {Uri}", + serviceName, uri); + + return uri; + } +} + +/// +/// EN: HTTP message handler with service discovery. +/// VI: HTTP message handler với service discovery. +/// +public class ServiceDiscoveryHandler : DelegatingHandler +{ + private readonly ConsulServiceDiscoveryClient _discovery; + private readonly string _serviceName; + + public ServiceDiscoveryHandler( + ConsulServiceDiscoveryClient discovery, + string serviceName) + { + _discovery = discovery; + _serviceName = serviceName; + } + + protected override async Task SendAsync( + HttpRequestMessage request, + CancellationToken ct) + { + var serviceUri = await _discovery.GetServiceUriAsync(_serviceName, ct); + + if (serviceUri == null) + throw new ServiceNotFoundException(_serviceName); + + // EN: Replace the request URI with discovered service + // VI: Thay thế URI request với service được discover + var builder = new UriBuilder(request.RequestUri!) + { + Scheme = serviceUri.Scheme, + Host = serviceUri.Host, + Port = serviceUri.Port + }; + request.RequestUri = builder.Uri; + + return await base.SendAsync(request, ct); + } +} +``` + +### Health Check Endpoints + +```csharp +/// +/// EN: Health check endpoints for service discovery. +/// VI: Health check endpoints cho service discovery. +/// + +// Program.cs +builder.Services.AddHealthChecks() + .AddDbContextCheck("database") + .AddRedis(builder.Configuration.GetConnectionString("Redis")!, "redis") + .AddRabbitMQ(builder.Configuration.GetConnectionString("RabbitMQ")!, "rabbitmq"); + +var app = builder.Build(); + +// EN: Liveness probe - is the service running? +app.MapHealthChecks("/health/live", new HealthCheckOptions +{ + Predicate = _ => false, // No checks, just confirm the process is running + ResponseWriter = WriteMinimalResponse +}); + +// EN: Readiness probe - can the service handle requests? +app.MapHealthChecks("/health/ready", new HealthCheckOptions +{ + Predicate = check => check.Tags.Contains("ready"), + ResponseWriter = WriteDetailedResponse +}); + +// EN: Full health check for monitoring +app.MapHealthChecks("/health", new HealthCheckOptions +{ + ResponseWriter = WriteDetailedResponse +}); + +static Task WriteMinimalResponse(HttpContext context, HealthReport report) +{ + context.Response.ContentType = "text/plain"; + return context.Response.WriteAsync(report.Status.ToString()); +} + +static Task WriteDetailedResponse(HttpContext context, HealthReport report) +{ + context.Response.ContentType = "application/json"; + var response = new + { + status = report.Status.ToString(), + checks = report.Entries.Select(e => new + { + name = e.Key, + status = e.Value.Status.ToString(), + description = e.Value.Description, + duration = e.Value.Duration.TotalMilliseconds + }), + totalDuration = report.TotalDuration.TotalMilliseconds + }; + return context.Response.WriteAsJsonAsync(response); +} +``` + +## Common Mistakes / Lỗi Thường Gặp + +### 1. Hardcoded Service URLs + +```csharp +// ❌ BAD: Hardcoded URLs +var client = new HttpClient +{ + BaseAddress = new Uri("http://10.0.1.50:5001") // What if IP changes? +}; + +// ✅ GOOD: Use service discovery +var client = new HttpClient +{ + BaseAddress = new Uri("http://user-service.goodgo.svc.cluster.local") +}; + +// Or with Consul +var serviceUri = await _discovery.GetServiceUriAsync("user-service"); +``` + +### 2. No Health Checks + +```yaml +# ❌ BAD: No health probes +spec: + containers: + - name: api + image: my-api:latest + +# ✅ GOOD: With health probes +spec: + containers: + - name: api + image: my-api:latest + livenessProbe: + httpGet: + path: /health/live + port: 80 + readinessProbe: + httpGet: + path: /health/ready + port: 80 +``` + +### 3. Missing Retry on Discovery Failure + +```csharp +// ❌ BAD: Single attempt +var service = await _discovery.GetServiceAsync("order-service"); +await _client.GetAsync(service.Uri + "/api/orders"); + +// ✅ GOOD: With retry and fallback +var services = await _discovery.GetHealthyServicesAsync("order-service"); +foreach (var service in services) +{ + try + { + return await _client.GetAsync(service.Uri + "/api/orders"); + } + catch (HttpRequestException) + { + continue; // Try next instance + } +} +throw new ServiceUnavailableException("order-service"); +``` + +## Quick Reference / Tham Chiếu Nhanh + +### Kubernetes Service DNS Format + +| Type | Format | Example | +|------|--------|---------| +| Same namespace | `` | `user-service` | +| Different namespace | `.` | `user-service.goodgo` | +| Full FQDN | `..svc.cluster.local` | `user-service.goodgo.svc.cluster.local` | + +### Consul Health Check Types + +| Type | Use For | +|------|---------| +| HTTP | REST APIs | +| TCP | Databases, caches | +| gRPC | gRPC services | +| Script | Custom checks | + +### Health Probe Configuration + +| Probe | Purpose | Failure Action | +|-------|---------|----------------| +| Liveness | Is process alive? | Restart container | +| Readiness | Can handle traffic? | Remove from LB | +| Startup | Is starting up? | Wait before checking | + +## Resources / Tài Nguyên + +- [Detailed Examples](./references/REFERENCE.md) - Full code examples +- [Deployment Kubernetes](../deployment-kubernetes/SKILL.md) - K8s patterns +- [Docker Traefik](../docker-traefik/SKILL.md) - Container networking +- [Inter-service Communication](../inter-service-communication/SKILL.md) - HTTP clients +- [Error Handling](../error-handling-patterns/SKILL.md) - Resilience patterns diff --git a/.agent/skills/service-discovery/references/REFERENCE.md b/.agent/skills/service-discovery/references/REFERENCE.md new file mode 100644 index 00000000..299c1d12 --- /dev/null +++ b/.agent/skills/service-discovery/references/REFERENCE.md @@ -0,0 +1,559 @@ +# Service Discovery - Reference Examples + +## Complete Implementation Examples + +### 1. Complete Consul Integration + +```csharp +/// +/// EN: Complete Consul service discovery setup. +/// VI: Cài đặt Consul service discovery hoàn chỉnh. +/// + +// Extensions/ConsulExtensions.cs +public static class ConsulExtensions +{ + public static IServiceCollection AddConsulDiscovery( + this IServiceCollection services, + IConfiguration configuration) + { + var consulConfig = configuration.GetSection("Consul").Get()!; + + // EN: Register Consul client + services.AddSingleton(new ConsulClient(cfg => + { + cfg.Address = new Uri(consulConfig.Address); + if (!string.IsNullOrEmpty(consulConfig.Token)) + { + cfg.Token = consulConfig.Token; + } + })); + + // EN: Register service discovery + services.AddSingleton(); + + // EN: Register background service for registration + services.AddHostedService(); + + return services; + } + + public static IHttpClientBuilder AddServiceDiscovery( + this IHttpClientBuilder builder, + string serviceName) + { + builder.AddHttpMessageHandler(sp => + { + var discovery = sp.GetRequiredService(); + return new ServiceDiscoveryDelegatingHandler(discovery, serviceName); + }); + + return builder; + } +} + +public class ConsulConfig +{ + public string Address { get; set; } = "http://localhost:8500"; + public string? Token { get; set; } + public string ServiceName { get; set; } = default!; + public string ServiceHost { get; set; } = default!; + public int ServicePort { get; set; } + public string[] Tags { get; set; } = Array.Empty(); +} +``` + +### 2. Service Discovery Interface and Implementation + +```csharp +/// +/// EN: Service discovery abstraction. +/// VI: Abstraction cho service discovery. +/// +public interface IServiceDiscovery +{ + Task GetServiceAsync(string serviceName, CancellationToken ct = default); + Task> GetServicesAsync(string serviceName, CancellationToken ct = default); + Task RegisterAsync(ServiceRegistration registration, CancellationToken ct = default); + Task DeregisterAsync(string serviceId, CancellationToken ct = default); +} + +public record ServiceInstance( + string Id, + string Name, + string Host, + int Port, + IReadOnlyDictionary Metadata) +{ + public Uri Uri => new($"http://{Host}:{Port}"); +} + +public record ServiceRegistration( + string Name, + string Host, + int Port, + string[] Tags, + TimeSpan HealthCheckInterval); + +/// +/// EN: Consul implementation of service discovery. +/// VI: Triển khai Consul cho service discovery. +/// +public class ConsulServiceDiscovery : IServiceDiscovery +{ + private readonly IConsulClient _consulClient; + private readonly ILogger _logger; + private readonly ConcurrentDictionary _lastCacheTime = new(); + private readonly ConcurrentDictionary> _cache = new(); + private readonly TimeSpan _cacheDuration = TimeSpan.FromSeconds(10); + + public ConsulServiceDiscovery( + IConsulClient consulClient, + ILogger logger) + { + _consulClient = consulClient; + _logger = logger; + } + + public async Task GetServiceAsync( + string serviceName, + CancellationToken ct = default) + { + var services = await GetServicesAsync(serviceName, ct); + + if (services.Count == 0) + return null; + + // EN: Random selection for load balancing + // VI: Chọn ngẫu nhiên để load balancing + return services[Random.Shared.Next(services.Count)]; + } + + public async Task> GetServicesAsync( + string serviceName, + CancellationToken ct = default) + { + // EN: Check cache + if (_lastCacheTime.TryGetValue(serviceName, out var lastTime) && + DateTime.UtcNow - lastTime < _cacheDuration && + _cache.TryGetValue(serviceName, out var cached)) + { + return cached; + } + + // EN: Query Consul for healthy services + var result = await _consulClient.Health.Service( + serviceName, + tag: null, + passingOnly: true, + ct); + + var instances = result.Response + .Select(s => new ServiceInstance( + s.Service.ID, + s.Service.Service, + s.Service.Address, + s.Service.Port, + s.Service.Meta ?? new Dictionary())) + .ToList(); + + // EN: Update cache + _cache[serviceName] = instances; + _lastCacheTime[serviceName] = DateTime.UtcNow; + + _logger.LogDebug( + "Discovered {Count} instances for {ServiceName}", + instances.Count, serviceName); + + return instances; + } + + public async Task RegisterAsync( + ServiceRegistration registration, + CancellationToken ct = default) + { + var serviceId = $"{registration.Name}-{Guid.NewGuid():N}"; + + var consulRegistration = new AgentServiceRegistration + { + ID = serviceId, + Name = registration.Name, + Address = registration.Host, + Port = registration.Port, + Tags = registration.Tags, + Check = new AgentServiceCheck + { + HTTP = $"http://{registration.Host}:{registration.Port}/health", + Interval = registration.HealthCheckInterval, + Timeout = TimeSpan.FromSeconds(5), + DeregisterCriticalServiceAfter = TimeSpan.FromMinutes(1) + } + }; + + await _consulClient.Agent.ServiceRegister(consulRegistration, ct); + + _logger.LogInformation( + "Registered service {ServiceName} as {ServiceId}", + registration.Name, serviceId); + } + + public async Task DeregisterAsync(string serviceId, CancellationToken ct = default) + { + await _consulClient.Agent.ServiceDeregister(serviceId, ct); + _logger.LogInformation("Deregistered service {ServiceId}", serviceId); + } +} +``` + +### 3. Service Discovery HTTP Handler + +```csharp +/// +/// EN: HTTP message handler that resolves service URIs via discovery. +/// VI: HTTP message handler giải quyết URIs qua discovery. +/// +public class ServiceDiscoveryDelegatingHandler : DelegatingHandler +{ + private readonly IServiceDiscovery _discovery; + private readonly string _serviceName; + private readonly ILogger _logger; + + public ServiceDiscoveryDelegatingHandler( + IServiceDiscovery discovery, + string serviceName) + { + _discovery = discovery; + _serviceName = serviceName; + _logger = LoggerFactory.Create(b => b.AddConsole()) + .CreateLogger(); + } + + protected override async Task SendAsync( + HttpRequestMessage request, + CancellationToken ct) + { + var services = await _discovery.GetServicesAsync(_serviceName, ct); + + if (services.Count == 0) + { + throw new ServiceNotFoundException(_serviceName); + } + + // EN: Try each healthy instance + // VI: Thử từng instance healthy + var shuffled = services.OrderBy(_ => Random.Shared.Next()).ToList(); + Exception? lastException = null; + + foreach (var service in shuffled) + { + try + { + // EN: Clone request for retry + using var clonedRequest = await CloneRequestAsync(request); + + // EN: Update request URI + var builder = new UriBuilder(clonedRequest.RequestUri!) + { + Scheme = service.Uri.Scheme, + Host = service.Uri.Host, + Port = service.Uri.Port + }; + clonedRequest.RequestUri = builder.Uri; + + _logger.LogDebug( + "Sending request to {Uri}", + clonedRequest.RequestUri); + + return await base.SendAsync(clonedRequest, ct); + } + catch (HttpRequestException ex) + { + _logger.LogWarning(ex, + "Request to {Service} failed, trying next instance", + service.Uri); + lastException = ex; + } + } + + throw new ServiceUnavailableException( + _serviceName, + "All service instances failed", + lastException); + } + + private static async Task CloneRequestAsync( + HttpRequestMessage request) + { + var clone = new HttpRequestMessage(request.Method, request.RequestUri); + + foreach (var header in request.Headers) + { + clone.Headers.TryAddWithoutValidation(header.Key, header.Value); + } + + if (request.Content != null) + { + var content = await request.Content.ReadAsByteArrayAsync(); + clone.Content = new ByteArrayContent(content); + + foreach (var header in request.Content.Headers) + { + clone.Content.Headers.TryAddWithoutValidation(header.Key, header.Value); + } + } + + return clone; + } +} + +public class ServiceNotFoundException : Exception +{ + public string ServiceName { get; } + + public ServiceNotFoundException(string serviceName) + : base($"No healthy instances found for service: {serviceName}") + { + ServiceName = serviceName; + } +} + +public class ServiceUnavailableException : Exception +{ + public string ServiceName { get; } + + public ServiceUnavailableException( + string serviceName, + string message, + Exception? innerException = null) + : base($"Service {serviceName} unavailable: {message}", innerException) + { + ServiceName = serviceName; + } +} +``` + +### 4. Kubernetes-Native Service Discovery + +```csharp +/// +/// EN: Service discovery for Kubernetes using DNS. +/// VI: Service discovery cho Kubernetes dùng DNS. +/// +public class KubernetesServiceDiscovery : IServiceDiscovery +{ + private readonly IConfiguration _configuration; + private readonly ILogger _logger; + + public KubernetesServiceDiscovery( + IConfiguration configuration, + ILogger logger) + { + _configuration = configuration; + _logger = logger; + } + + public Task GetServiceAsync( + string serviceName, + CancellationToken ct = default) + { + // EN: In Kubernetes, use DNS for service discovery + // VI: Trong Kubernetes, dùng DNS cho service discovery + var namespace_ = _configuration["Kubernetes:Namespace"] ?? "default"; + var host = $"{serviceName}.{namespace_}.svc.cluster.local"; + var port = GetServicePort(serviceName); + + var instance = new ServiceInstance( + $"{serviceName}-k8s", + serviceName, + host, + port, + new Dictionary()); + + return Task.FromResult(instance); + } + + public Task> GetServicesAsync( + string serviceName, + CancellationToken ct = default) + { + // EN: For K8s, we typically use the service DNS which load balances automatically + // VI: Với K8s, ta thường dùng service DNS vốn tự động load balance + var instance = GetServiceAsync(serviceName, ct).Result; + return Task.FromResult>( + instance != null ? new[] { instance } : Array.Empty()); + } + + public Task RegisterAsync(ServiceRegistration registration, CancellationToken ct = default) + { + // EN: In Kubernetes, registration happens via Pod labels/Service selectors + // VI: Trong Kubernetes, đăng ký qua Pod labels/Service selectors + _logger.LogInformation( + "Kubernetes service registration is automatic via Pod labels"); + return Task.CompletedTask; + } + + public Task DeregisterAsync(string serviceId, CancellationToken ct = default) + { + // EN: In Kubernetes, deregistration happens when Pod terminates + // VI: Trong Kubernetes, hủy đăng ký khi Pod kết thúc + _logger.LogInformation( + "Kubernetes service deregistration is automatic when Pod terminates"); + return Task.CompletedTask; + } + + private int GetServicePort(string serviceName) + { + // EN: Get port from configuration or use default + // VI: Lấy port từ config hoặc dùng mặc định + var configKey = $"Services:{serviceName}:Port"; + return _configuration.GetValue(configKey) ?? 80; + } +} + +/// +/// EN: Extension to configure service discovery based on environment. +/// VI: Extension cấu hình service discovery theo môi trường. +/// +public static class ServiceDiscoveryExtensions +{ + public static IServiceCollection AddServiceDiscovery( + this IServiceCollection services, + IConfiguration configuration) + { + var discoveryType = configuration.GetValue("ServiceDiscovery:Type"); + + switch (discoveryType?.ToLower()) + { + case "consul": + services.AddConsulDiscovery(configuration); + break; + + case "kubernetes": + default: + services.AddSingleton(); + break; + } + + return services; + } +} +``` + +### 5. Complete Usage Example + +```csharp +/// +/// EN: Complete example using service discovery. +/// VI: Ví dụ hoàn chỉnh sử dụng service discovery. +/// + +// Program.cs +var builder = WebApplication.CreateBuilder(args); + +// EN: Add service discovery +builder.Services.AddServiceDiscovery(builder.Configuration); + +// EN: Add HTTP clients with discovery +builder.Services.AddHttpClient() + .AddServiceDiscovery("user-service") + .AddStandardResilienceHandler(); + +builder.Services.AddHttpClient() + .AddServiceDiscovery("order-service") + .AddStandardResilienceHandler(); + +// EN: Add health checks +builder.Services.AddHealthChecks() + .AddCheck("service-discovery"); + +var app = builder.Build(); + +app.MapHealthChecks("/health"); +app.MapControllers(); +app.Run(); + +// HealthChecks/ServiceDiscoveryHealthCheck.cs +public class ServiceDiscoveryHealthCheck : IHealthCheck +{ + private readonly IServiceDiscovery _discovery; + private readonly string[] _requiredServices = { "user-service", "order-service" }; + + public ServiceDiscoveryHealthCheck(IServiceDiscovery discovery) + { + _discovery = discovery; + } + + public async Task CheckHealthAsync( + HealthCheckContext context, + CancellationToken ct = default) + { + var missingServices = new List(); + + foreach (var service in _requiredServices) + { + var instances = await _discovery.GetServicesAsync(service, ct); + if (instances.Count == 0) + { + missingServices.Add(service); + } + } + + if (missingServices.Any()) + { + return HealthCheckResult.Unhealthy( + $"Missing services: {string.Join(", ", missingServices)}"); + } + + return HealthCheckResult.Healthy("All required services available"); + } +} +``` + +## Docker Compose for Local Development + +```yaml +version: '3.8' + +services: + consul: + image: consul:1.15 + ports: + - "8500:8500" + command: agent -dev -ui -client=0.0.0.0 + + user-service: + build: + context: ./src/UserService + environment: + - ASPNETCORE_ENVIRONMENT=Development + - Consul__Address=http://consul:8500 + - Service__Name=user-service + - Service__Host=user-service + - Service__Port=80 + depends_on: + - consul + + order-service: + build: + context: ./src/OrderService + environment: + - ASPNETCORE_ENVIRONMENT=Development + - Consul__Address=http://consul:8500 + - Service__Name=order-service + - Service__Host=order-service + - Service__Port=80 + depends_on: + - consul + + api-gateway: + build: + context: ./src/ApiGateway + ports: + - "5000:80" + environment: + - Consul__Address=http://consul:8500 + depends_on: + - consul + - user-service + - order-service +``` diff --git a/note.md b/note.md index 6e39a7f0..d8249abb 100644 --- a/note.md +++ b/note.md @@ -16,4 +16,9 @@ rm-urlencoded" \ > -d "scope=openid profile email api offline_access" 2> &1 | jq . -1. Kiểm tra hỗ trợ cho MSSQL, PSQL, MongoDB \ No newline at end of file +1. Kiểm tra hỗ trợ cho MSSQL, PSQL, MongoDB + + +cd deployments/local +docker compose build wallet-service-net +docker compose up -d wallet-service-net