As we know, adopting a microservice architecture has many benefits, but it also brings challenges, especially around data consistency. In the monolithic world, we can use ACID database transactions to perform multiple operations consistently. In the microservice world, however, a business operation is spread across different distributed services, so things are not that easy, and we usually have to settle for eventual consistency provided through events.
We also rely on design patterns such as Saga to carry out long-running business operations consistently. This means that each microservice does its own work in its own database and then publishes an event so that other microservices can do theirs, or that the business operation is coordinated step by step by an orchestrator.
I have covered transaction management before in different articles and scenarios. For example, in the “Implementation of Choreography-based Saga in .NET Microservices” article, I showed how we can implement the saga pattern in a choreographed way. I also mentioned how important resiliency is for events and that design patterns such as the outbox can help with it. So we can consider this article a continuation of the saga article, and I will use the same example scenario here. If you haven’t read that article, it would be helpful to take a quick look at it first.
In the distributed world, when it comes to the integrity of business processes or to having consistent and reliable data at different points, publishing events in a resilient way becomes important.
For example, the above scenario (the one I used in the saga article) is an asynchronous payment process. After each service performs its own operation, it publishes an event to the message queue and triggers the next step.
What if a service cannot publish its event, for whatever reason, after it has performed its own operation in its own database? For example, let’s assume that “StocksReservedEvent” is published and the payment operations are performed by “PaymentService”, but “PaymentCompletedEvent” couldn’t be published. Of course, we can handle transient network errors with various retry mechanisms. But in other scenarios, such as when the message broker is down, we will lose the relevant events. Inconsistent data, especially in an area as important as this payment process, is something none of us wants.
In short, if we are working on important business processes, it is essential to keep the data consistent and accurate, and therefore the relevant events should be recorded atomically with the business change that produces them.
Outbox Pattern
In short, the outbox pattern gives us a way to publish our events reliably. Instead of publishing events directly, we save them in an outbox table in the database of the relevant microservice, atomically and as part of the relevant business operation, and then have a separate service publish them. Thus, even if the message broker goes down, we will not lose the state of the relevant data and events, and eventual consistency can work in a reliable and durable way.
NOTE: Since the outbox pattern works with “at-least-once” delivery, duplicates may occur when we publish events from the outbox table. Therefore, so that consumers can handle duplicates, we need to include an identifier in each event and make our events and their consumers as idempotent as possible.
There are different ways to implement this pattern depending on the tech stack. For example, if we are using Kafka as the message broker, we can do log-based change data capture with Debezium and stream the relevant changes to Kafka. If we are using NServiceBus, MassTransit or CAP as a service bus with a different broker, we can take advantage of the outbox feature they offer, or we can implement it manually.
As part of this article, we will manually implement the outbox pattern on the asynchronous payment process example scenario that I used in the saga article.
Let’s Implement
We will implement the outbox pattern manually because, in the real world, we may not be using a service bus that supports this feature, or we may be using a different message broker.
I faced a similar need on a private project that I was working on last year. Unfortunately, our customers couldn’t access the online services they had purchased because some important events were never published. Since we were using RabbitMQ as the message broker and EasyNetQ as the service bus, I needed to implement the outbox pattern manually on the payment processing flow.
First, let’s get the sample project here. Let’s say we want to publish the PaymentCompletedEvent atomically after completing the payment operations in the Payment Service microservice. Until now, we have been publishing the relevant event directly after the payment operations were completed, as shown below.
    public class StocksReservedEventConsumer : IConsumeAsync<StocksReservedEvent>
    {
        private readonly IPaymentService _paymentService;
        private readonly IBus _bus;

        public StocksReservedEventConsumer(IPaymentService paymentService, IBus bus)
        {
            _paymentService = paymentService;
            _bus = bus;
        }

        public async Task ConsumeAsync(StocksReservedEvent message, CancellationToken cancellationToken = default)
        {
            Tuple<bool, string> isPaymentCompleted = await _paymentService.DoPaymentAsync(message.WalletId, message.UserId, message.TotalAmount);

            if (isPaymentCompleted.Item1)
            {
                await _bus.PubSub.PublishAsync(new PaymentCompletedEvent
                {
                    OrderId = message.OrderId
                });
            }
            else
            {
                await _bus.PubSub.PublishAsync(new PaymentRejectedEvent
                {
                    OrderId = message.OrderId,
                    Reason = isPaymentCompleted.Item2
                });
            }
        }
    }
First, let’s create a class library called “PaymentService.Infra” (.NET 5.0), and then include the following packages via NuGet.
    Microsoft.EntityFrameworkCore - 5.0.0
    Microsoft.EntityFrameworkCore.SqlServer - 5.0.0
Now let’s create a folder named “Models” and define in it the model for the outbox table, where we will store our events, along with the matching table as below.
    using System;

    namespace PaymentService.Infra.Models
    {
        public class OutboxMessage
        {
            public OutboxMessage()
            {
                EventDate = DateTime.UtcNow;
            }

            public int Id { get; set; }
            public string EventType { get; set; }
            public string EventPayload { get; set; }
            public DateTime EventDate { get; set; }
            public bool IsSent { get; set; }
            public DateTime? SentDate { get; set; }
        }
    }

    CREATE TABLE OutboxMessages (
        Id int IDENTITY PRIMARY KEY,
        EventType nvarchar(255) NOT NULL,
        EventPayload nvarchar(max) NOT NULL,
        EventDate datetime NOT NULL,
        IsSent bit NOT NULL,
        SentDate datetime
    );
While developing the sample project before, we skipped some points in order to focus on the main subject and we didn’t implement the “DoPaymentAsync” method that performs payment operations. So now let’s include a simple “Payment” model into the project as follows.
    using System;

    namespace PaymentService.Infra.Models
    {
        public class Payment
        {
            public Payment()
            {
                PaymentDate = DateTime.UtcNow;
            }

            public int Id { get; set; }
            public int UserId { get; set; }
            public int OrderId { get; set; }
            public int WalletId { get; set; }
            public decimal TotalAmount { get; set; }
            public bool IsPaid { get; set; }
            public DateTime PaymentDate { get; set; }
        }
    }

    CREATE TABLE Payments (
        Id int IDENTITY PRIMARY KEY,
        UserId int NOT NULL,
        OrderId int NOT NULL,
        WalletId int NOT NULL,
        TotalAmount decimal(18,4) NOT NULL,
        IsPaid bit NOT NULL,
        PaymentDate datetime NOT NULL
    );
Then, let’s create a class called “AppDbContext” and define the dbcontext in it.
    using Microsoft.EntityFrameworkCore;
    using PaymentService.Infra.Models; // the models live in the Infra library

    namespace PaymentService.Infra
    {
        public class AppDbContext : DbContext
        {
            public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)
            {
            }

            public DbSet<Payment> Payments { get; set; }
            public DbSet<OutboxMessage> OutboxEvents { get; set; }

            protected override void OnModelCreating(ModelBuilder builder)
            {
                builder.Entity<Payment>(entity =>
                {
                    entity.HasKey(k => k.Id);
                    entity.Property(p => p.TotalAmount).HasPrecision(18, 4);
                    entity.ToTable("Payments");
                });

                builder.Entity<OutboxMessage>(entity =>
                {
                    entity.HasKey(k => k.Id);
                    entity.ToTable("OutboxMessages");
                });

                base.OnModelCreating(builder);
            }
        }
    }
Now let’s add the “PaymentService.Infra” class library as a reference to the Payment Service project and update the signature of the “DoPaymentAsync” method in the “IPaymentService” interface as follows.
    using System.Threading.Tasks;

    namespace PaymentService.Services
    {
        public interface IPaymentService
        {
            Task DoPaymentAsync(int orderId, int walletId, int userId, decimal totalAmount);
        }
    }
Then, we can implement the outbox pattern in the “PaymentService” class as follows.
    using System.Threading.Tasks;
    using Newtonsoft.Json;
    using PaymentService.Infra;
    using PaymentService.Infra.Models;
    using Shared.Contracts;

    namespace PaymentService.Services
    {
        public class PaymentService : IPaymentService
        {
            private readonly AppDbContext _appDbContext;

            public PaymentService(AppDbContext appDbContext)
            {
                _appDbContext = appDbContext;
            }

            public async Task DoPaymentAsync(int orderId, int walletId, int userId, decimal totalAmount)
            {
                // after payment operation is done...
                var isPaid = true;

                var payment = new Payment
                {
                    OrderId = orderId,
                    WalletId = walletId,
                    UserId = userId,
                    TotalAmount = totalAmount,
                    IsPaid = isPaid
                };

                await _appDbContext.Payments.AddAsync(payment);

                object paymentResultEvent;
                if (isPaid)
                {
                    paymentResultEvent = new PaymentCompletedEvent
                    {
                        OrderId = orderId
                    };
                }
                else
                {
                    paymentResultEvent = new PaymentRejectedEvent
                    {
                        OrderId = orderId,
                        Reason = ""
                    };
                }

                var outboxMessage = new OutboxMessage
                {
                    EventPayload = JsonConvert.SerializeObject(paymentResultEvent),
                    EventType = paymentResultEvent.GetType().AssemblyQualifiedName
                };

                await _appDbContext.OutboxEvents.AddAsync(outboxMessage);

                await _appDbContext.SaveChangesAsync();
            }
        }
    }
As we can see, after the payment operation is completed, we save the “Payment” record and store the related “PaymentCompletedEvent” in the outbox table as part of the same business operation. Both rows are written by a single “SaveChangesAsync” call, so they are committed in the same database transaction: either both are saved or neither is. In this way, the relevant event survives broker outages, and other domains can eventually reach a consistent state as well.
Now let’s go to the “StocksReservedEventConsumer” class where we consume the “StocksReservedEvent” and refactor it as follows.
    using System.Threading;
    using System.Threading.Tasks;
    using EasyNetQ.AutoSubscribe;
    using PaymentService.Services;
    using Shared.Contracts;

    namespace PaymentService.Consumers
    {
        public class StocksReservedEventConsumer : IConsumeAsync<StocksReservedEvent>
        {
            private readonly IPaymentService _paymentService;

            public StocksReservedEventConsumer(IPaymentService paymentService)
            {
                _paymentService = paymentService;
            }

            public async Task ConsumeAsync(StocksReservedEvent message, CancellationToken cancellationToken = default)
            {
                // The consumer no longer publishes anything itself;
                // the result event is written to the outbox table inside DoPaymentAsync.
                await _paymentService.DoPaymentAsync(message.OrderId, message.WalletId, message.UserId, message.TotalAmount);
            }
        }
    }
Then, let’s add the connection string of the sample “PaymentDB”, which you will create for testing purposes, to the “appsettings.json” file and register the DbContext in the “Program.cs” class.

    "ConnectionStrings": {
      "PaymentDB": "Server=127.0.0.1;Database=PaymentDB;Trusted_Connection=true;MultipleActiveResultSets=true"
    }

    services.AddDbContext<AppDbContext>(x =>
        x.UseSqlServer(hostContext.Configuration.GetConnectionString("PaymentDB")));
Thus, we have completed the first part of the outbox pattern.
Let’s run the Order Service, Stock Service and Payment Service to do a quick test, then create a test order through the Swagger UI of the Order Service.
As we can see, together with the “Payment” record, the related event is also inserted in the “OutboxMessages” table. Now we need a background service which will poll the table and publish the events.
For this, let’s create a separate console application called “PaymentServiceOutboxWorker” in the solution and then add “PaymentService.Infra” and “Shared.Contracts” class libraries as a reference. Also, we need to include the following packages via NuGet.
    Microsoft.Extensions.Configuration.Json - 5.0.0
    Microsoft.Extensions.DependencyInjection - 5.0.2
    Microsoft.Extensions.Hosting - 5.0.0
    EasyNetQ - 6.3.1
Now let’s define a class called “Worker” in the project and have the logic as follows.
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using EasyNetQ;
    using Microsoft.EntityFrameworkCore;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Newtonsoft.Json;
    using PaymentService.Infra;
    using PaymentService.Infra.Models;

    namespace PaymentServiceOutboxWorker
    {
        public class Worker : BackgroundService
        {
            private readonly IServiceScopeFactory _scopeFactory;

            public Worker(IServiceScopeFactory scopeFactory)
            {
                _scopeFactory = scopeFactory;
            }

            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    await PublishOutboxMessages(stoppingToken);
                }
            }

            private async Task PublishOutboxMessages(CancellationToken stoppingToken)
            {
                try
                {
                    // The DbContext is scoped, so each polling round gets its own scope.
                    using var scope = _scopeFactory.CreateScope();
                    await using var appDbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
                    IBus bus = scope.ServiceProvider.GetRequiredService<IBus>();

                    // The DbSet is named OutboxEvents in AppDbContext.
                    List<OutboxMessage> messages = await appDbContext.OutboxEvents
                        .Where(om => !om.IsSent)
                        .ToListAsync(stoppingToken);

                    foreach (OutboxMessage outboxMessage in messages)
                    {
                        try
                        {
                            // Rebuild the original event from the stored type name and JSON payload.
                            var messageType = Type.GetType(outboxMessage.EventType);
                            var message = JsonConvert.DeserializeObject(outboxMessage.EventPayload, messageType!);

                            await bus.PubSub.PublishAsync(message, messageType);

                            // Mark the row as sent only after publishing did not throw.
                            outboxMessage.IsSent = true;
                            outboxMessage.SentDate = DateTime.UtcNow;

                            appDbContext.OutboxEvents.Update(outboxMessage);
                            await appDbContext.SaveChangesAsync();
                        }
                        catch (Exception e)
                        {
                            // The row stays unsent and is retried on the next poll.
                            Console.WriteLine(e);
                        }
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                }

                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
        }
    }
Here we simply define a long-running background service using the “BackgroundService” base class. This service, which will run every 5 seconds, will poll the outbox table and perform the publishing operations of the unpublished events.
Thus, we can make sure that the events that matter to us reach their destination without getting lost, so that the relevant business process can continue.
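The worker depends on being able to rebuild an event from the two values stored in the outbox row: the type name and the JSON payload. Here is a minimal sketch of that round trip in isolation. To stay dependency-free it uses System.Text.Json instead of the Newtonsoft.Json used in the sample project, and `OutboxSerializer` as well as the local `PaymentCompletedEvent` class are hypothetical stand-ins, not types from the sample.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-in for the event contract from Shared.Contracts.
public class PaymentCompletedEvent
{
    public int OrderId { get; set; }
}

// Hypothetical helper that mirrors the EventType and EventPayload columns.
public static class OutboxSerializer
{
    // Writer side: capture the concrete type name and the JSON payload.
    public static (string EventType, string EventPayload) ToRow(object @event)
    {
        Type type = @event.GetType();
        return (type.AssemblyQualifiedName, JsonSerializer.Serialize(@event, type));
    }

    // Worker side: resolve the type again and rebuild the event.
    public static object FromRow(string eventType, string eventPayload)
    {
        // Type.GetType returns null when the type cannot be resolved,
        // for example after the contract class was renamed or moved.
        Type type = Type.GetType(eventType)
            ?? throw new InvalidOperationException($"Unknown event type: {eventType}");

        return JsonSerializer.Deserialize(eventPayload, type)
            ?? throw new InvalidOperationException("Payload deserialized to null.");
    }
}
```

One consequence of storing the assembly-qualified name is that old rows can no longer be resolved once a contract type is renamed or moved to another assembly, so failing loudly on an unknown type is probably safer than publishing a null message.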
Now we need to do the necessary configuration and injection operations by updating the “Program.cs” class as follows.
    using EasyNetQ;
    using Microsoft.EntityFrameworkCore;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using PaymentService.Infra;

    namespace PaymentServiceOutboxWorker
    {
        class Program
        {
            static void Main(string[] args)
            {
                CreateHostBuilder(args).Build().Run();
            }

            static IHostBuilder CreateHostBuilder(string[] args) =>
                Host.CreateDefaultBuilder(args)
                    .ConfigureServices((hostContext, services) =>
                    {
                        services.AddDbContext<AppDbContext>(x =>
                            x.UseSqlServer(hostContext.Configuration.GetConnectionString("PaymentDB")));

                        var bus = RabbitHutch.CreateBus(hostContext.Configuration["RabbitMQ:ConnectionString"]);
                        services.AddSingleton<IBus>(bus);

                        services.AddHostedService<Worker>();
                    });
        }
    }
Then we can define the “appsettings.json” file.

    {
      "Logging": {
        "LogLevel": {
          "Default": "Information",
          "Microsoft": "Warning",
          "Microsoft.Hosting.Lifetime": "Information"
        }
      },
      "RabbitMQ": {
        "ConnectionString": "host=localhost;username=guest;password=guest"
      },
      "ConnectionStrings": {
        "PaymentDB": "Server=127.0.0.1;Database=PaymentDB;Trusted_Connection=true;MultipleActiveResultSets=true"
      }
    }
That’s all. When we run the “PaymentServiceOutboxWorker” service, we can see that the events will be published successfully to the relevant queue.
Thoughts
We have mentioned that this pattern works with the “at-least-once” delivery approach (as we can also see from the code above). In other words, we may encounter scenarios where the events are published more than once.
So while we try to protect data integrity with the outbox pattern, we may introduce duplicates, which can leave the system and the related business processes with inconsistent data again. For this reason, we need to make our events and their consumers as idempotent as possible.
For example, we can include an identifier in each event and keep a record of every processed event in the database of the relevant consumer. Then, before processing any event, we can check that it has not been processed before. With this approach, which is called the inbox pattern, delivery is still at-least-once, but each event takes effect only once, so in practice we get exactly-once processing.
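The duplicate check described above can be sketched as follows. This is a minimal in-memory illustration under stated assumptions, not code from the sample project: the `EventId` property, `InMemoryInbox` and `OrderCompletionHandler` are hypothetical names, and a real consumer would keep the processed identifiers in a table with a unique key, written in the same transaction as its own business change.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event shape. EventId is assumed to be assigned once, when the
// event is written to the outbox, so every re-publish carries the same value.
public class PaymentCompletedEvent
{
    public Guid EventId { get; set; }
    public int OrderId { get; set; }
}

// Hypothetical in-memory inbox that remembers which events were handled.
public class InMemoryInbox
{
    private readonly HashSet<Guid> _processed = new HashSet<Guid>();

    // Returns true only the first time a given identifier is seen.
    public bool TryMarkProcessed(Guid eventId) => _processed.Add(eventId);
}

// Hypothetical consumer-side handler guarded by the inbox.
public class OrderCompletionHandler
{
    private readonly InMemoryInbox _inbox;

    public OrderCompletionHandler(InMemoryInbox inbox)
    {
        _inbox = inbox;
    }

    public int CompletedOrders { get; private set; }

    // Returns true if the event was processed, false if it was skipped as a duplicate.
    public bool Handle(PaymentCompletedEvent message)
    {
        if (!_inbox.TryMarkProcessed(message.EventId))
        {
            return false;
        }

        CompletedOrders++; // stands in for the real business operation
        return true;
    }
}
```

Handling the same event instance twice leaves `CompletedOrders` at 1, because the second call is rejected by the inbox before the business operation runs.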
In addition, in cases where event ordering is needed, we can sort and process the events according to the parameters we will determine.
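As one possible reading of “the parameters we will determine”, the sketch below sorts a received batch by a sequence number within each order. The `SequencedEvent` envelope and its `Sequence` property are assumptions made for illustration (the value could, for instance, be taken from the identity column of the outbox table); the sample project does not define them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical envelope carrying an ordering key assigned by the publisher.
public class SequencedEvent
{
    public long Sequence { get; set; }
    public int OrderId { get; set; }
    public string Name { get; set; }
}

public static class EventOrdering
{
    // Keeps the events of one order together and sorts them by sequence number.
    public static IEnumerable<SequencedEvent> InProcessingOrder(IEnumerable<SequencedEvent> batch)
    {
        return batch
            .OrderBy(e => e.OrderId)
            .ThenBy(e => e.Sequence);
    }
}
```

This only orders the events that are already in the batch. Detecting a gap, that is, an earlier event that has not arrived yet, would need additional bookkeeping on the consumer side.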
Final
Unfortunately, solving communication and data integrity problems in the microservice world is sometimes very challenging and needs to be done carefully. Sometimes the failure of a single event causes other business processes to fail as well, because the data within the system becomes inconsistent. For this reason, it is important that critical events are sent atomically and processed by idempotent consumers. Although the outbox and inbox patterns look like simple solutions that we can apply everywhere, they can become overkill if we adopt them where they are not needed. Therefore, it is best to decide what is right for our business use cases before applying them.
References
https://microservices.io/patterns/data/transactional-outbox.html
A really rich article, well done <3
Thank you very much for your kind comment.
Well done, this is a very useful post. I have a small question. The worker service reads the unprocessed messages from the outbox and publishes the events, and after publishing it updates the related record as processed. The main goal of the outbox was to solve the lost-event problem, but doesn’t the same problem come up here again? For example, the worker published the event and then marked it as processed, but perhaps the event got lost again, as in the earlier scenarios. In that case, the job that runs every 5 seconds won’t handle this lost event on its next run, because we marked it as processed after sending it. Am I missing something here?
Hi, thank you for your valuable comment. Sorry, I have only just seen the comments. Actually, the situation on the worker side is a bit different from the publisher side. On the worker side, we mark the event as published only if I get the ack from the service bus library, that is, only if I am sure the message went out. Please don’t get hung up on my simple code implementation. If the event can’t be published, if we get an error or don’t receive the ack, and so on, the event will still be sitting in the outbox table and will be retried until it is sent.
In one of the diagrams above you have put a lot of load on the client, and everything fans out from a single channel. Wouldn’t the asynchronous diagram slow down under a heavy operation?
Hi Mert, sorry for the late reply. I couldn’t understand what you meant by putting load on it. Could you give a more detailed example?