How to Order Events in Microservices by Using Azure Service Bus (FIFO Consumers)

Some scenarios can’t be avoided: we have to implement them even though we know their complexity and drawbacks. I think the need to process related events/messages in the order they were published, in a distributed environment, is one of them. For example, we may be doing batch processing and need to process each item in sequence, or we may be working in the fulfillment domain of an e-commerce company and need to process related events in sequence so that order statuses are updated correctly.

Many message brokers work on the FIFO principle and deliver messages in FIFO order, so with a single publisher and a single subscriber we can consume related events in the order they were published. That holds only as long as we ignore the network and application errors that may occur during event processing. For example, a transient error may cause some events to be re-queued, which breaks the order, or the broker may never receive the ack for a processed event and make it available again, which causes duplicates. As an alternative, we could include a timestamp in the events and process them only after the necessary checks, but then we have to deal with other problems such as race conditions. Technically, we can handle these problems with extra logic that keeps the processing order safe. But as we have seen, although event ordering looks easy to achieve with standard FIFO queues, things get complicated because there are no guarantees on the subscriber side.

Moreover, because of the single-subscriber requirement, we give up important properties such as scalability and decoupling, and we sacrifice overall throughput.

Therefore, while designing our system, architecture or event payloads, it is very important to aim for a design that doesn’t need event ordering at all, or at least minimizes it, for the health of our system.

So, what do we need to do when we need event ordering and have to implement it? In this article, I will talk about how we can perform this operation in the most efficient way by using Azure Service Bus. Therefore, I will assume that we are using Azure Service Bus as a message broker, and we will look at how the “sessions” feature of Azure Service Bus can help us in this regard.

Azure Service Bus – Sessions

We have mentioned that normally if we use a message broker that provides FIFO event delivery guarantee, we can consume some relevant events as they are published by having a single publisher and subscriber. But we have also mentioned how fragile and complex this approach can be since there are no guarantees on the subscriber side. So, how can we provide this FIFO event delivery guarantee for the subscriber side in the easiest way?

This is where the sessions feature of Azure Service Bus comes into play. By using sessions, we can tag the events we choose with a “SessionId” so that the tagged events are processed as a group and in the order they were published. In addition, we can do this without sacrificing much scalability.

So, How?

Ref: https://devblogs.microsoft.com/premier-developer/wp-content/uploads/sites/31/2019/09/word-image.png

First of all, in order to use this feature, we must be using either the “Standard” or the “Premium” tier of Azure Service Bus. When creating a “queue” or “subscription” on Azure Service Bus, we also need to enable the message sessions feature. The message sessions documentation listed under References shows how to enable it.

NOTE: In queues where the sessions feature is enabled, every event must carry a session id.
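The queue can also be created from code with sessions enabled. The following is a minimal sketch, assuming the connection string carries Manage rights on the namespace. The lock duration and delivery count values are illustrative choices, not recommendations from this article.

```csharp
using System;
using Azure.Messaging.ServiceBus.Administration;

// Assumption: this connection string has Manage rights on the namespace.
var connectionString = "Endpoint=sb://YOUR_SERVICE_BUS.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHAREDACCESSKEY";
var queueName = "order.status.queue";

var adminClient = new ServiceBusAdministrationClient(connectionString);

// Create the queue only if it does not exist yet.
bool queueExists = await adminClient.QueueExistsAsync(queueName);
if (!queueExists)
{
    var queueOptions = new CreateQueueOptions(queueName)
    {
        // Turns on message sessions for this queue.
        RequiresSession = true,
        // Illustrative values; tune them for your own workload.
        LockDuration = TimeSpan.FromSeconds(30),
        MaxDeliveryCount = 10
    };

    await adminClient.CreateQueueAsync(queueOptions);
    Console.WriteLine($"Created session-enabled queue '{queueName}'.");
}
```

The administration client ships in the same “Azure.Messaging.ServiceBus” NuGet package that the rest of this article uses.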

To briefly summarise the logic of sessions: events are published with a session id, for example an order id. A subscriber then receives from the queue an event whose session is not locked yet and takes a lock on that event’s session id. From then on, all events in the queue with the same session id can be processed only by the subscriber that holds the lock, one by one and in the order in which they were published. At this point we also don’t need to have a single subscriber. Of course, we sacrifice parallelism when processing events within one session, but events that belong to different sessions can still be processed in parallel.
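To make these semantics concrete, here is a toy in-memory model. It is only an illustration and not how Azure Service Bus is implemented: the “SessionModel” class and its “ProcessAsync” method are hypothetical names, and one task per session stands in for the subscriber that holds the session lock.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Toy model of session semantics (hypothetical helper, not part of any SDK).
public static class SessionModel
{
    public static async Task<IDictionary<string, List<string>>> ProcessAsync(
        IEnumerable<(string SessionId, string Body)> queue)
    {
        var processed = new ConcurrentDictionary<string, List<string>>();

        // Group messages by session id; GroupBy keeps arrival order inside each group.
        var sessions = queue.GroupBy(message => message.SessionId).ToList();

        // One worker per session plays the role of the lock holder.
        // Different sessions run in parallel with each other.
        var workers = sessions.Select(session => Task.Run(() =>
        {
            var handled = new List<string>();
            foreach (var message in session)
            {
                // Within a session: strictly one by one, in publish order.
                handled.Add(message.Body);
            }
            processed[session.Key] = handled;
        })).ToList();

        await Task.WhenAll(workers);
        return processed;
    }
}
```

Feeding it the messages A1, A2 (session A) and B1 (session B) yields A1 followed by A2 for session A, while B1 is handled independently of them.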

Let’s Demonstrate It

Let’s perform a simple example to understand the topic better. Let’s say we work in an e-commerce company and we want to show the status changes of an order until it is delivered to the customer.

First, let’s create a .NET 7 class library project called Contracts and define an event called “OrderStatusChangedEvent” as follows.

namespace Contracts;

public class OrderStatusChangedEvent
{
    public OrderStatusChangedEvent(string orderId, string status, DateTime changeDate)
    {
        OrderId = orderId;
        Status = status;
        ChangeDate = changeDate;
    }

    public string OrderId { get; set; }
    public string Status { get; set; }
    public DateTime ChangeDate { get; set; }
}

Let’s assume that we publish the status changes of an order with this event and we have the statuses as below.

  • InPreparation
  • Shipped
  • DeliveryAttemptFailed
  • DeliveredToPickupPoint
  • Completed

Let’s Publish Events

Now let’s create a basic publisher where we will publish the related events. For this, let’s create a .NET 7 console application called Publisher and include the “Azure.Messaging.ServiceBus” package via NuGet. Then let’s add the Contracts project as a reference.

Now let’s edit the “Program.cs” file as follows.

using Azure.Messaging.ServiceBus;
using Contracts;

var connectionString = "Endpoint=sb://YOUR_SERVICE_BUS.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHAREDACCESSKEY";
var queueName = "order.status.queue";

await using var serviceBusClient = new ServiceBusClient(connectionString);
ServiceBusSender serviceBusSender = serviceBusClient.CreateSender(queueName);

var orderId = Guid.NewGuid().ToString();

// ------------------- Order status: In Preparation -------------------
var orderStatusChangedToInPreparationEvent = new OrderStatusChangedEvent(
    orderId: orderId,
    status: "InPreparation",
    changeDate: DateTime.UtcNow);

var orderStatusChangedToInPreparationMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(orderStatusChangedToInPreparationEvent))
{
    SessionId = orderId
};
await serviceBusSender.SendMessageAsync(orderStatusChangedToInPreparationMessage);


// ------------------- Order status: Shipped -------------------
var orderStatusChangedToShippedEvent = new OrderStatusChangedEvent(
    orderId: orderId,
    status: "Shipped",
    changeDate: DateTime.UtcNow);

var orderStatusChangedToShippedMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(orderStatusChangedToShippedEvent))
{
    SessionId = orderId
};
await serviceBusSender.SendMessageAsync(orderStatusChangedToShippedMessage);


// ------------------- Order status: Delivery Attempt Failed -------------------
var orderStatusChangedToDeliveryAttemptFailedEvent = new OrderStatusChangedEvent(
    orderId: orderId,
    status: "DeliveryAttemptFailed",
    changeDate: DateTime.UtcNow);

var orderStatusChangedToDeliveryAttemptFailedMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(orderStatusChangedToDeliveryAttemptFailedEvent))
{
    SessionId = orderId
};
await serviceBusSender.SendMessageAsync(orderStatusChangedToDeliveryAttemptFailedMessage);


// ------------------- Order status: Delivered to Pickup Point -------------------
var orderStatusChangedToDeliveredToPickupPointEvent = new OrderStatusChangedEvent(
    orderId: orderId,
    status: "DeliveredToPickupPoint",
    changeDate: DateTime.UtcNow);

var orderStatusChangedToDeliveredToPickupPointMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(orderStatusChangedToDeliveredToPickupPointEvent))
{
    SessionId = orderId
};
await serviceBusSender.SendMessageAsync(orderStatusChangedToDeliveredToPickupPointMessage);


// ------------------- Order status: Completed -------------------
var orderStatusChangedToCompletedEvent = new OrderStatusChangedEvent(
    orderId: orderId,
    status: "Completed",
    changeDate: DateTime.UtcNow);

var orderStatusChangedToCompletedMessage = new ServiceBusMessage(BinaryData.FromObjectAsJson(orderStatusChangedToCompletedEvent))
{
    SessionId = orderId,
};
orderStatusChangedToCompletedMessage.ApplicationProperties.Add("IsLastItem", true);

await serviceBusSender.SendMessageAsync(orderStatusChangedToCompletedMessage);

Here, we simply assume that we publish the status changes of an order in the sequence in which they occur.

Technically speaking, we initialize the “ServiceBusClient” and then the “ServiceBusSender” objects by specifying the queue information. Then, we group the events under a session id so that the subscriber side processes them in the order they were published. As the session id, we have used the id of the order.
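The same publishing flow can be written more compactly with a loop. This is a sketch of an equivalent publisher rather than a replacement for the listing above. It sends the messages one at a time, as the original does, and marks only the final one with “IsLastItem”.

```csharp
using System;
using Azure.Messaging.ServiceBus;
using Contracts;

var connectionString = "Endpoint=sb://YOUR_SERVICE_BUS.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHAREDACCESSKEY";
var queueName = "order.status.queue";

await using var serviceBusClient = new ServiceBusClient(connectionString);
await using ServiceBusSender serviceBusSender = serviceBusClient.CreateSender(queueName);

var orderId = Guid.NewGuid().ToString();

// Statuses in the sequence in which they should be processed.
string[] statuses =
{
    "InPreparation", "Shipped", "DeliveryAttemptFailed", "DeliveredToPickupPoint", "Completed"
};

for (var i = 0; i < statuses.Length; i++)
{
    var orderStatusChangedEvent = new OrderStatusChangedEvent(orderId, statuses[i], DateTime.UtcNow);

    var message = new ServiceBusMessage(BinaryData.FromObjectAsJson(orderStatusChangedEvent))
    {
        // All events of one order share the same session.
        SessionId = orderId
    };

    // Mark the final event so the subscriber knows when to release the session.
    if (i == statuses.Length - 1)
    {
        message.ApplicationProperties.Add("IsLastItem", true);
    }

    // Await each send before the next one so the publish order is explicit.
    await serviceBusSender.SendMessageAsync(message);
}
```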

The important point here is to signal when the session ends. We can do that with a property added to the last event we publish. After a subscriber has accepted a session and processed all of its events, it should release the session; otherwise it keeps holding the session until no new event has arrived for the session idle timeout. So, in our example, the subscriber releases the session when it sees the “IsLastItem” property that we have added to the last event.

Of course, a subscriber can process multiple sessions in parallel, but every session that we don’t release counts against the maximum number of sessions the subscriber can process in parallel.

Let’s Process

Now let’s take a look at the subscriber side. For this, let’s create a .NET 7 console application called Subscriber and include the “Azure.Messaging.ServiceBus” package via NuGet. Then let’s add the Contracts project as a reference again.

Then let’s edit the “Program.cs” file as follows.

using Azure.Messaging.ServiceBus;
using Contracts;

var connectionString = "Endpoint=sb://YOUR_SERVICE_BUS.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHAREDACCESSKEY";
var queueName = "order.status.queue";

await using var serviceBusClient = new ServiceBusClient(connectionString);
await using ServiceBusSessionProcessor sessionProcessor = serviceBusClient.CreateSessionProcessor(queueName);

sessionProcessor.ProcessMessageAsync += ProcessMessages;
sessionProcessor.ProcessErrorAsync += ProcessErrors;

Task ProcessMessages (ProcessSessionMessageEventArgs args)
{
    var orderStatusChangedEvent = args.Message.Body.ToObjectFromJson<OrderStatusChangedEvent>();

    Console.WriteLine($"Session ID: {args.SessionId}\nOrder ID: {orderStatusChangedEvent?.OrderId}\nStatus: {orderStatusChangedEvent?.Status}\nChanged Date: {orderStatusChangedEvent?.ChangeDate}");
    Console.WriteLine("------------------------------");

    var appProperties = args.Message.ApplicationProperties;
    if (appProperties != null && appProperties.TryGetValue("IsLastItem", out var isLastItem))
    {
        if ((bool)isLastItem)
        {
            Console.WriteLine("Session closed");
            args.ReleaseSession();
        }
    }

    return Task.CompletedTask;
}

Task ProcessErrors(ProcessErrorEventArgs args)
{
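    // Log where the error came from and what it was, instead of a fixed message.
    Console.WriteLine($"There is an error! Source: {args.ErrorSource}, Exception: {args.Exception.Message}");
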
    return Task.CompletedTask;
}

await sessionProcessor.StartProcessingAsync();
Console.ReadKey();

Here, we initialize the “ServiceBusSessionProcessor” object instead of “ServiceBusSender”. Then we register the “ProcessMessageAsync” and “ProcessErrorAsync” handlers to process the events.

At this point, the subscriber gets the first available event in “order.status.queue” and takes a lock on its session id. During the session, it receives the events in the order they were published and processes them. Thanks to the lock held by the subscriber, the events of that session can’t be taken by other subscribers, so they are processed one by one with their order preserved.

The default receive mode is “PeekLock”. Thus, an event isn’t deleted from the queue until it has been processed successfully, and by default the processor sends the ack (completes the event) automatically once the handler returns without throwing. If we want to control this ourselves, it is sufficient to set the “AutoCompleteMessages” property to “false” as follows when creating the “ServiceBusSessionProcessor” object.

var options = new ServiceBusSessionProcessorOptions()
{
    AutoCompleteMessages = false
};
await using ServiceBusSessionProcessor sessionProcessor = serviceBusClient.CreateSessionProcessor(queueName, options);

Then, after processing the relevant event, we can inform the broker that we have processed the event as follows.

await args.CompleteMessageAsync(args.Message);

In the “ReceiveAndDelete” receive mode, an event is deleted from the queue as soon as the subscriber receives it. Frankly speaking, it is a risky approach, as the event is lost if any error occurs during processing.

Also, we can set the lock duration while creating the queue. If processing an event generally takes long, it is to our advantage to extend or renew the lock to be safe. We can renew it by calling the “args.RenewSessionLockAsync();” method during processing. Otherwise, the lock expires and the session becomes available to other subscribers.
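The processor exposes a few options related to session locks and concurrency. The following sketch shows them together. All values are illustrative assumptions and should be tuned against the real processing times of your events.

```csharp
using System;
using Azure.Messaging.ServiceBus;

var connectionString = "Endpoint=sb://YOUR_SERVICE_BUS.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHAREDACCESSKEY";
var queueName = "order.status.queue";

await using var serviceBusClient = new ServiceBusClient(connectionString);

var options = new ServiceBusSessionProcessorOptions
{
    // How many sessions this subscriber may hold and process at the same time.
    MaxConcurrentSessions = 4,
    // Keep one call per session so events of a session are handled one by one.
    MaxConcurrentCallsPerSession = 1,
    // How long to wait for the next event of a session before giving the session up.
    SessionIdleTimeout = TimeSpan.FromSeconds(10),
    // Upper bound for how long the processor keeps renewing the session lock automatically.
    MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10)
};

await using ServiceBusSessionProcessor sessionProcessor =
    serviceBusClient.CreateSessionProcessor(queueName, options);

Console.WriteLine($"Processing up to {options.MaxConcurrentSessions} sessions in parallel.");
```

A short idle timeout frees sessions quickly when no explicit end marker such as “IsLastItem” arrives, at the cost of re-acquiring the session if more events show up later.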

In addition, since sessions are managed at the broker level, when a subscriber goes down due to an error, one of the other subscribers can accept the session and continue processing its events from where the previous one left off, preserving the same order.

Likewise, if a transient error occurs while processing an event, the event is made available again and redelivered with its order preserved. The point we need to be careful of here is that when the event reaches the queue’s maximum delivery count (“MaxDeliveryCount”, set when creating the queue), it is moved to the dead-letter queue. Separately, the client’s own retry policy, which governs how failed calls to the broker are retried, can be set when initializing the “ServiceBusClient” as follows.

var clientOptions = new ServiceBusClientOptions
{
    RetryOptions = new ServiceBusRetryOptions()
    {
        MaxRetries = 10,
        MaxDelay = TimeSpan.FromMinutes(1)
    }
};

await using var serviceBusClient = new ServiceBusClient(connectionString, clientOptions);

In the “ProcessMessages” method, in order to release the session, we read the event’s application properties as shown earlier, check the “IsLastItem” property that we defined and release the session.

In the “ProcessErrors” method, we can perform whatever operations we want, such as logging, when an error occurs while processing an event. The event then becomes available for redelivery.
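Error handling can also be made explicit inside the message handler. The sketch below assumes “AutoCompleteMessages” is set to “false”. The “InvalidPayload” reason text is a hypothetical label, and the settlement policy is only one possible choice.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Contracts;

var connectionString = "Endpoint=sb://YOUR_SERVICE_BUS.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHAREDACCESSKEY";
var queueName = "order.status.queue";

await using var serviceBusClient = new ServiceBusClient(connectionString);

// We settle every event ourselves instead of relying on auto-complete.
var options = new ServiceBusSessionProcessorOptions { AutoCompleteMessages = false };
await using ServiceBusSessionProcessor sessionProcessor =
    serviceBusClient.CreateSessionProcessor(queueName, options);

sessionProcessor.ProcessMessageAsync += ProcessMessages;
sessionProcessor.ProcessErrorAsync += ProcessErrors;

await sessionProcessor.StartProcessingAsync();
Console.ReadKey();
await sessionProcessor.StopProcessingAsync();

async Task ProcessMessages(ProcessSessionMessageEventArgs args)
{
    try
    {
        var orderStatusChangedEvent = args.Message.Body.ToObjectFromJson<OrderStatusChangedEvent>();
        if (orderStatusChangedEvent is null)
        {
            // Retrying cannot fix an empty payload, so park it in the dead-letter queue.
            await args.DeadLetterMessageAsync(args.Message, "InvalidPayload", "Body deserialized to null.");
            return;
        }

        Console.WriteLine($"Order {orderStatusChangedEvent.OrderId} is now {orderStatusChangedEvent.Status}");

        // Ack: the broker removes the event from the queue.
        await args.CompleteMessageAsync(args.Message);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Attempt {args.Message.DeliveryCount} failed: {ex.Message}");

        // Give the event back to the broker; it is redelivered until the
        // queue's maximum delivery count is reached, then dead-lettered.
        await args.AbandonMessageAsync(args.Message);
    }
}

Task ProcessErrors(ProcessErrorEventArgs args)
{
    Console.WriteLine($"Error from {args.ErrorSource} on {args.EntityPath}: {args.Exception.Message}");
    return Task.CompletedTask;
}
```

Abandoning explicitly makes the event available again right away instead of waiting for the lock to expire, which keeps the rest of the session from stalling longer than necessary.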

In addition to these, we can make a subscriber work for specific session ids. For this, while initializing the “ServiceBusSessionProcessor” object, it will be enough to set the session ids as follows.

var options = new ServiceBusSessionProcessorOptions()
{
    SessionIds = { "my-x-sessions", "my-y-sessions" }
};

If we want, while processing the events, we can also keep state information for later use in the relevant session.

async Task ProcessMessages(ProcessSessionMessageEventArgs args)
{
    await args.SetSessionStateAsync(BinaryData.FromString("some state"));
    string someState = (await args.GetSessionStateAsync()).ToString();
}
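One possible use of session state is to skip duplicate deliveries. The sketch below stores the last processed status in the session state and ignores an event that repeats it. It assumes the same status never legitimately occurs twice in a row for an order, which may not hold in every domain.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Contracts;

var connectionString = "Endpoint=sb://YOUR_SERVICE_BUS.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHAREDACCESSKEY";
var queueName = "order.status.queue";

await using var serviceBusClient = new ServiceBusClient(connectionString);
await using ServiceBusSessionProcessor sessionProcessor = serviceBusClient.CreateSessionProcessor(queueName);

sessionProcessor.ProcessMessageAsync += ProcessMessages;
sessionProcessor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error: {args.Exception.Message}");
    return Task.CompletedTask;
};

await sessionProcessor.StartProcessingAsync();
Console.ReadKey();
await sessionProcessor.StopProcessingAsync();

async Task ProcessMessages(ProcessSessionMessageEventArgs args)
{
    var orderStatusChangedEvent = args.Message.Body.ToObjectFromJson<OrderStatusChangedEvent>();
    if (orderStatusChangedEvent is null)
    {
        return;
    }

    // The state is empty until the first event of the session has been handled.
    var sessionState = await args.GetSessionStateAsync();
    string lastStatus = sessionState?.ToString() ?? string.Empty;

    if (lastStatus == orderStatusChangedEvent.Status)
    {
        // Same status as the one already recorded: treat it as a duplicate delivery.
        Console.WriteLine($"Skipping duplicate status '{lastStatus}' for session {args.SessionId}");
        return;
    }

    Console.WriteLine($"Order {orderStatusChangedEvent.OrderId}: '{lastStatus}' -> '{orderStatusChangedEvent.Status}'");

    // Remember what we processed so that a redelivery can be recognised.
    await args.SetSessionStateAsync(BinaryData.FromString(orderStatusChangedEvent.Status));
}
```

Because the state lives in the broker alongside the session, another subscriber that takes over the session after a crash sees the same last status.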

Now let’s run 2 subscribers and 1 publisher to perform a quick test.

As we can see, the subscriber on the right side, which accepted the session first, processed all the events of that session in the order they were published and then closed the session.

Well

As I said at the beginning of the article, unfortunately, we sometimes have to implement some logic in a way we don’t want. For exactly this reason, years ago, we had a few subscribers that had to run as a single instance, and we maintained some complex logic to preserve the event processing order.

It will be very beneficial for us to design our events/system in a way that doesn’t require such a scenario. Of course, it will vary according to the complexity of the business and its needs.

Within the scope of this article, I have tried to show how we can perform FIFO ordering and processing with minimum effort on the subscriber side by using Azure Service Bus and its sessions feature. In addition, as I have mentioned, we shouldn’t forget that settings such as “session lock duration”, “timeouts” and “retry” have to be configured carefully when processing events on the subscriber side.

References

https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sessions

Gökhan Gökalp

Comments

  • Thanks Gokhan for the article. As always, all good, all clear. It's an interesting feature, but one important question is still open for me. Let's say we have two consumers, C1 and C2, consuming messages from a queue. The queue contains some messages of sessions A and B in the following order: A1, A2, B1. Let's say consumer C1 takes message A1 and locks session A. The next message in the queue is A2 (which also belongs to session A). What is consumer C2 going to do in this case? Is it blocked and idle because session A is locked, or will it somehow skip message A2 and start processing message B1?

    • Thanks Vitalii, I'm glad that you found the article clear. Yes indeed, your second consumer, C2, won't stay idle. That's the good part of Azure Service Bus. C2 will grab another event from the same queue that belongs to a different session, one that hasn't been locked yet. In short, it allows us to process different sessions in parallel even if they are in the same queue.

  • Thanks for the awesome article. I was really looking to solve this scenario. Assume I have to combine the output from multiple services using a unique Session ID. How do I guarantee that the operation is performed only if all the services have completed processing the messages corresponding to the Session ID?

    How do we handle exception and waiting time?

    How to handle the scenario if message failure?
