# Outbox Pattern - Reference Examples

## Complete Implementation Examples

### 1. Complete Outbox Entity Configuration

```csharp
/// <summary>
/// EF Core configuration for <see cref="OutboxMessage"/>.
/// </summary>
public class OutboxMessageConfiguration : IEntityTypeConfiguration<OutboxMessage>
{
    /// <summary>
    /// Maps <see cref="OutboxMessage"/> to the "OutboxMessages" table and declares
    /// its column constraints and filtered indexes.
    /// </summary>
    /// <param name="builder">Builder for the <see cref="OutboxMessage"/> entity type.</param>
    public void Configure(EntityTypeBuilder<OutboxMessage> builder)
    {
        builder.ToTable("OutboxMessages");

        builder.HasKey(e => e.Id);

        builder.Property(e => e.EventType)
            .HasMaxLength(500)
            .IsRequired();

        builder.Property(e => e.Payload)
            .HasColumnType("jsonb")
            .IsRequired();

        builder.Property(e => e.CreatedAt)
            .IsRequired();

        builder.Property(e => e.Error)
            .HasMaxLength(2000);

        // Index for fast retrieval of unprocessed messages.
        builder.HasIndex(e => new { e.IsProcessed, e.CreatedAt })
            .HasFilter("\"IsProcessed\" = false")
            .HasDatabaseName("IX_OutboxMessages_Pending");

        // Index for retry tracking. The threshold (3) is fixed in the filter text.
        builder.HasIndex(e => e.RetryCount)
            .HasFilter("\"IsProcessed\" = false AND \"RetryCount\" >= 3")
            .HasDatabaseName("IX_OutboxMessages_FailedRetries");
    }
}
```

### 2. Inbox Table for Idempotency

```csharp
/// <summary>
/// Inbox message for tracking processed events.
/// </summary>
public class InboxMessage
{
    public Guid Id { get; set; } // Same as incoming MessageId

    public string ConsumerType { get; set; } = default!;

    public DateTime ProcessedAt { get; set; }
}

/// <summary>
/// Inbox repository for idempotency.
/// </summary>
public class InboxRepository : IInboxRepository
{
    private readonly AppDbContext _context;

    public InboxRepository(AppDbContext context)
    {
        _context = context;
    }

    /// <summary>
    /// Checks whether an inbox row exists for the given message and consumer.
    /// </summary>
    /// <param name="messageId">Id of the incoming message.</param>
    /// <param name="consumerType">Name of the consumer that handles the message.</param>
    /// <param name="ct">Cancellation token passed to the query.</param>
    /// <returns><c>true</c> when a matching row exists; otherwise <c>false</c>.</returns>
    public async Task<bool> ExistsAsync(Guid messageId, string consumerType, CancellationToken ct)
    {
        return await _context.InboxMessages
            .AnyAsync(m => m.Id == messageId && m.ConsumerType == consumerType, ct);
    }

    /// <summary>
    /// Inserts an inbox row for the given message and consumer and saves it immediately.
    /// </summary>
    /// <param name="messageId">Id of the handled message.</param>
    /// <param name="consumerType">Name of the consumer that handled the message.</param>
    /// <param name="ct">Cancellation token passed to the save.</param>
    public async Task MarkAsProcessedAsync(Guid messageId, string consumerType, CancellationToken ct)
    {
        _context.InboxMessages.Add(new InboxMessage
        {
            Id = messageId,
            ConsumerType = consumerType,
            ProcessedAt = DateTime.UtcNow
        });

        await _context.SaveChangesAsync(ct);
    }
}
```

### 3. Idempotent Consumer Base Class

```csharp
/// <summary>
/// Base class for idempotent message consumers.
/// </summary>
/// <typeparam name="TMessage">Type of the consumed message.</typeparam>
public abstract class IdempotentConsumer<TMessage> : IConsumer<TMessage>
    where TMessage : class
{
    private readonly IInboxRepository _inbox;
    private readonly ILogger _logger;

    protected IdempotentConsumer(
        IInboxRepository inbox,
        ILogger logger)
    {
        _inbox = inbox;
        _logger = logger;
    }

    /// <summary>
    /// Skips messages already recorded in the inbox for this consumer type; otherwise
    /// calls <see cref="HandleAsync"/> and then records the message as processed.
    /// </summary>
    /// <param name="context">Consume context carrying the message and its MessageId.</param>
    public async Task Consume(ConsumeContext<TMessage> context)
    {
        var consumerType = GetType().Name;

        // Without a MessageId there is nothing stable to deduplicate on. A random
        // fallback id would never match an existing inbox row and would only add
        // rows that can never be matched, so handle the message without inbox tracking.
        if (context.MessageId is not Guid messageId)
        {
            _logger.LogWarning(
                "Message without MessageId received by {Consumer}, processing without deduplication",
                consumerType);

            await HandleAsync(context.Message, context.CancellationToken);
            return;
        }

        // Check if already processed.
        if (await _inbox.ExistsAsync(messageId, consumerType, context.CancellationToken))
        {
            _logger.LogInformation(
                "Message {MessageId} already processed by {Consumer}, skipping",
                messageId, consumerType);
            return;
        }

        try
        {
            await HandleAsync(context.Message, context.CancellationToken);

            // Mark as processed. This is a separate step after the handler has run.
            await _inbox.MarkAsProcessedAsync(messageId, consumerType, context.CancellationToken);

            _logger.LogInformation(
                "Processed message {MessageId} by {Consumer}",
                messageId, consumerType);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to process message {MessageId} by {Consumer}",
                messageId, consumerType);
            throw;
        }
    }

    /// <summary>
    /// Handles a message that has not been recorded as processed.
    /// </summary>
    /// <param name="message">The message to handle.</param>
    /// <param name="ct">Cancellation token taken from the consume context.</param>
    protected abstract Task HandleAsync(TMessage message, CancellationToken ct);
}

/// <summary>
/// Example idempotent consumer.
/// </summary>
public class OrderCreatedConsumer : IdempotentConsumer<OrderCreatedIntegrationEvent>
{
    private readonly IInventoryService _inventory;

    public OrderCreatedConsumer(
        IInboxRepository inbox,
        IInventoryService inventory,
        ILogger<OrderCreatedConsumer> logger)
        : base(inbox, logger)
    {
        _inventory = inventory;
    }

    /// <summary>
    /// Calls <c>ReserveAsync</c> on the inventory service once per item in the order.
    /// </summary>
    protected override async Task HandleAsync(
        OrderCreatedIntegrationEvent message,
        CancellationToken ct)
    {
        foreach (var item in message.Items)
        {
            await _inventory.ReserveAsync(
                item.ProductId,
                item.Quantity,
                message.OrderId,
                ct);
        }
    }
}
```

### 4. Enhanced Outbox Processor with Parallel Processing

```csharp
/// <summary>
/// Enhanced outbox processor with parallel processing.
/// </summary>
public class EnhancedOutboxProcessor : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<EnhancedOutboxProcessor> _logger;
    private readonly OutboxOptions _options;

    public EnhancedOutboxProcessor(
        IServiceScopeFactory scopeFactory,
        IOptions<OutboxOptions> options,
        ILogger<EnhancedOutboxProcessor> logger)
    {
        _scopeFactory = scopeFactory;
        _options = options.Value;
        _logger = logger;
    }

    /// <summary>
    /// Polls the outbox on a fixed interval until <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <param name="ct">Token signalled when the service is stopping.</param>
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        _logger.LogInformation(
            "Outbox Processor started with {Interval}s interval, batch size {BatchSize}",
            _options.PollingIntervalSeconds, _options.BatchSize);

        using var timer = new PeriodicTimer(
            TimeSpan.FromSeconds(_options.PollingIntervalSeconds));

        while (await timer.WaitForNextTickAsync(ct))
        {
            try
            {
                var processedCount = await ProcessBatchAsync(ct);

                if (processedCount > 0)
                {
                    _logger.LogInformation("Processed {Count} outbox messages", processedCount);
                }
            }
            // Only let exceptions escape while stopping. A cancellation exception raised
            // for any other reason is logged like other failures so the loop keeps polling.
            catch (Exception ex) when (!ct.IsCancellationRequested)
            {
                _logger.LogError(ex, "Error processing outbox batch");
            }
        }
    }

    /// <summary>
    /// Claims one batch of pending messages, publishes them with bounded parallelism,
    /// and saves the resulting state changes.
    /// </summary>
    /// <param name="ct">Cancellation token for database and publish calls.</param>
    /// <returns>Number of messages in the claimed batch (0 when nothing is pending).</returns>
    private async Task<int> ProcessBatchAsync(CancellationToken ct)
    {
        using var scope = _scopeFactory.CreateScope();
        var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var publishEndpoint = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();

        // Row locks taken by FOR UPDATE last only until the surrounding transaction ends.
        // Keep one transaction open across select, publish and save so that
        // SKIP LOCKED really keeps other processor instances off these rows.
        await using var transaction = await context.Database.BeginTransactionAsync(ct);

        // Use SKIP LOCKED for concurrent processing.
        var messages = await context.OutboxMessages
            .FromSqlRaw(@"
                SELECT * FROM ""OutboxMessages""
                WHERE ""IsProcessed"" = false
                  AND ""RetryCount"" < {0}
                ORDER BY ""CreatedAt""
                LIMIT {1}
                FOR UPDATE SKIP LOCKED",
                _options.MaxRetries, _options.BatchSize)
            .ToListAsync(ct);

        if (messages.Count == 0)
        {
            // Disposing the uncommitted transaction releases it.
            return 0;
        }

        // Process in parallel with a semaphore. The count is clamped to at least 1:
        // a semaphore created with 0 permits would block every worker forever.
        using var semaphore = new SemaphoreSlim(Math.Max(1, _options.MaxParallelism));

        var tasks = messages.Select(async message =>
        {
            await semaphore.WaitAsync(ct);
            try
            {
                await ProcessMessageAsync(message, publishEndpoint, ct);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);

        await context.SaveChangesAsync(ct);
        await transaction.CommitAsync(ct);

        return messages.Count;
    }

    /// <summary>
    /// Publishes a single outbox message and updates its state in memory.
    /// Failures are recorded on the message (retry count and error text) instead of being rethrown.
    /// </summary>
    /// <param name="message">The outbox row to publish.</param>
    /// <param name="publishEndpoint">Endpoint used to publish the deserialized event.</param>
    /// <param name="ct">Cancellation token for the publish call.</param>
    private async Task ProcessMessageAsync(
        OutboxMessage message,
        IPublishEndpoint publishEndpoint,
        CancellationToken ct)
    {
        try
        {
            var eventType = Type.GetType(message.EventType);

            if (eventType == null)
            {
                // Dead letter: flagged as processed so it is not picked up again.
                // ProcessedAt is not set on this path.
                message.Error = $"Unknown event type: {message.EventType}";
                message.IsProcessed = true;
                return;
            }

            var @event = JsonSerializer.Deserialize(message.Payload, eventType);

            await publishEndpoint.Publish(@event!, eventType, ct);

            message.IsProcessed = true;
            message.ProcessedAt = DateTime.UtcNow;
            message.Error = null;
        }
        catch (Exception ex)
        {
            message.RetryCount++;

            // The Error column is limited to 2000 characters.
            message.Error = ex.Message.Length > 2000
                ? ex.Message[..2000]
                : ex.Message;

            _logger.LogWarning(ex,
                "Failed to publish message {Id}, retry {Retry}/{Max}",
                message.Id, message.RetryCount, _options.MaxRetries);
        }
    }
}

/// <summary>
/// Settings for <see cref="EnhancedOutboxProcessor"/>.
/// </summary>
public class OutboxOptions
{
    /// <summary>Seconds between polling ticks.</summary>
    public int PollingIntervalSeconds { get; set; } = 5;

    /// <summary>Maximum number of messages claimed per batch.</summary>
    public int BatchSize { get; set; } = 100;

    /// <summary>Messages whose retry count has reached this value are no longer selected.</summary>
    public int MaxRetries { get; set; } = 5;

    /// <summary>Maximum number of messages published concurrently within a batch.</summary>
    public int MaxParallelism { get; set; } = 4;
}
```

### 5. MassTransit Outbox Integration

```csharp
/// <summary>
/// Configure MassTransit with built-in outbox.
/// </summary>
public static class MassTransitOutboxExtensions
{
    /// <summary>
    /// Registers MassTransit with the Entity Framework outbox and a RabbitMQ transport.
    /// </summary>
    /// <param name="services">Service collection to register into.</param>
    /// <param name="configuration">
    /// Configuration providing "RabbitMQ:Host", "RabbitMQ:Username" and "RabbitMQ:Password".
    /// </param>
    /// <returns>The same <paramref name="services"/> instance.</returns>
    public static IServiceCollection AddMassTransitWithOutbox(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<OrderCreatedConsumer>();
            // NOTE(review): a second AddConsumer<...>() call appeared here, but its type
            // argument was lost from this document. Register further consumers the same way.

            // Add Entity Framework Outbox.
            x.AddEntityFrameworkOutbox<AppDbContext>(o =>
            {
                o.UsePostgres();
                o.UseBusOutbox();

                // Configure delivery service.
                o.QueryDelay = TimeSpan.FromSeconds(5);
                o.QueryMessageLimit = 100;
                o.QueryTimeout = TimeSpan.FromSeconds(30);
            });

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(configuration["RabbitMQ:Host"], "/", h =>
                {
                    h.Username(configuration["RabbitMQ:Username"]!);
                    h.Password(configuration["RabbitMQ:Password"]!);
                });

                cfg.ConfigureEndpoints(context);
            });
        });

        return services;
    }
}

/// <summary>
/// Using MassTransit outbox in handler.
/// </summary>
public class CreateOrderHandler : IRequestHandler<CreateOrderCommand, OrderResult>
{
    private readonly AppDbContext _context;
    private readonly IPublishEndpoint _publishEndpoint;

    public CreateOrderHandler(
        AppDbContext context,
        IPublishEndpoint publishEndpoint)
    {
        _context = context;
        _publishEndpoint = publishEndpoint;
    }

    /// <summary>
    /// Creates an order, publishes <see cref="OrderCreatedIntegrationEvent"/>, and saves.
    /// </summary>
    /// <param name="request">Command carrying the user id and address for the new order.</param>
    /// <param name="ct">Cancellation token for publish and save.</param>
    /// <returns>Result containing the id of the created order.</returns>
    public async Task<OrderResult> Handle(CreateOrderCommand request, CancellationToken ct)
    {
        var order = new Order(request.UserId, request.Address);
        _context.Orders.Add(order);

        // MassTransit automatically saves to outbox table.
        await _publishEndpoint.Publish(new OrderCreatedIntegrationEvent
        {
            OrderId = order.Id,
            UserId = order.UserId
        }, ct);

        // Both order and outbox message saved in same transaction.
        await _context.SaveChangesAsync(ct);

        return new OrderResult(order.Id);
    }
}
```

### 6. Cleanup Job for Processed Messages

```csharp
/// <summary>
/// Background job to clean up processed outbox messages.
/// </summary>
public class OutboxCleanupJob : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<OutboxCleanupJob> _logger;
    private readonly TimeSpan _cleanupInterval = TimeSpan.FromHours(1);
    private readonly TimeSpan _retentionPeriod = TimeSpan.FromDays(7);

    public OutboxCleanupJob(
        IServiceScopeFactory scopeFactory,
        ILogger<OutboxCleanupJob> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    /// <summary>
    /// Waits one cleanup interval, runs a cleanup pass, and repeats until
    /// <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <param name="ct">Token signalled when the service is stopping.</param>
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            await Task.Delay(_cleanupInterval, ct);

            try
            {
                await CleanupAsync(ct);
            }
            // Cancellation during shutdown is not a cleanup failure, so it is not
            // logged as an error. It propagates like the Task.Delay cancellation above.
            catch (Exception ex) when (!ct.IsCancellationRequested)
            {
                _logger.LogError(ex, "Error during outbox cleanup");
            }
        }
    }

    /// <summary>
    /// Deletes outbox and inbox rows older than the retention period.
    /// </summary>
    /// <param name="ct">Cancellation token for the delete commands.</param>
    private async Task CleanupAsync(CancellationToken ct)
    {
        using var scope = _scopeFactory.CreateScope();
        var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();

        var cutoffDate = DateTime.UtcNow - _retentionPeriod;

        // Only rows with a ProcessedAt value older than the cutoff match this filter.
        var deletedCount = await context.OutboxMessages
            .Where(m => m.IsProcessed && m.ProcessedAt < cutoffDate)
            .ExecuteDeleteAsync(ct);

        if (deletedCount > 0)
        {
            _logger.LogInformation("Cleaned up {Count} processed outbox messages", deletedCount);
        }

        // Clean up inbox messages too.
        var inboxDeletedCount = await context.InboxMessages
            .Where(m => m.ProcessedAt < cutoffDate)
            .ExecuteDeleteAsync(ct);

        if (inboxDeletedCount > 0)
        {
            _logger.LogInformation("Cleaned up {Count} inbox messages", inboxDeletedCount);
        }
    }
}
```

## Database Migrations

```sql
-- Create OutboxMessages table
CREATE TABLE "OutboxMessages" (
    "Id" uuid PRIMARY KEY,
    "EventType" varchar(500) NOT NULL,
    "Payload" jsonb NOT NULL,
    "CreatedAt" timestamp with time zone NOT NULL,
    "ProcessedAt" timestamp with time zone,
    "IsProcessed" boolean NOT NULL DEFAULT false,
    "RetryCount" integer NOT NULL DEFAULT 0,
    "Error" varchar(2000)
);

CREATE INDEX "IX_OutboxMessages_Pending"
    ON "OutboxMessages" ("IsProcessed", "CreatedAt")
    WHERE "IsProcessed" = false;

-- Matches the filtered retry index declared in OutboxMessageConfiguration
CREATE INDEX "IX_OutboxMessages_FailedRetries"
    ON "OutboxMessages" ("RetryCount")
    WHERE "IsProcessed" = false AND "RetryCount" >= 3;

-- Create InboxMessages table
CREATE TABLE "InboxMessages" (
    "Id" uuid NOT NULL,
    "ConsumerType" varchar(500) NOT NULL,
    "ProcessedAt" timestamp with time zone NOT NULL,
    PRIMARY KEY ("Id", "ConsumerType")
);

CREATE INDEX "IX_InboxMessages_ProcessedAt"
    ON "InboxMessages" ("ProcessedAt");
```