Overcoming Event Size Limits with the Conditional Claim-Check Pattern in Event-Driven Architectures

Today, we often build our applications on an event-driven architecture to make them as scalable, resilient, and flexible as possible. While event-driven architecture has clear benefits, it also brings complexities and limitations of its own. In my previous articles, I touched on some of the best practices we can apply to build a resilient and flexible microservice infrastructure.

In this article, I will try to address one of the limitations, which is the event/message size limit.

The Importance of Event Size

In the context of event-driven architecture, the size of events is considerably important. Since the entire system is built on events, the size of an event can have a direct impact on the scalability, performance, and cost of the entire system.

Under normal circumstances, event payloads rarely grow excessively large. However, some domains have different needs.

For example, in the e-commerce domain, when an order is created by the order service, the billing service might be interested in the related event and generate an invoice in PDF format. This PDF invoice may need to be used by different services. For instance, one service might send the invoice to the user, while another service might save the invoice in an archive.

Or we might be using the fat events approach. In this approach, the event payloads can be quite large due to the domain having a significant number of attributes.

In short, in such scenarios, event sizes may vary depending on the domains, and we might need to share these events across different domains. Of course, before making these decisions, we need to thoroughly understand their impact on the overall system. For instance, while we might accept minimal effects on overall performance, we may need to implement alternative solutions in cases where costs could increase.

For example, we might be using a fully managed message broker like Azure Service Bus. Azure Service Bus has a 256 KB message size limit in its standard tier. This limit can be raised to 100 MB with the premium tier. However, to maximize the cost-benefit ratio, it is usually better to first look at how we can solve such problems with best practices, without significantly increasing costs.

Claim-Check Pattern

The claim-check pattern is a messaging pattern that we can use to address the scenarios we mentioned.

This pattern suggests storing large event payloads in external storage instead of carrying them through the message broker, and sending only a token or key (the claim check) that references the stored data. Event consumers then use that reference to retrieve the stored data. This not only keeps the messaging system performant but also helps control potential costs.
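
To make the idea concrete before bringing in any Azure services, here is a minimal in-memory sketch of the pattern. The names “InMemoryPayloadStore”, “Envelope”, and “ClaimCheckSketch” are hypothetical and exist only for illustration. A dictionary stands in for the external storage, and the byte limit is an arbitrary parameter.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for external storage (e.g. a blob container).
public sealed class InMemoryPayloadStore
{
    private readonly Dictionary<string, byte[]> _blobs = new();

    // Stores the payload and returns the claim check (a reference key).
    public string Save(byte[] payload)
    {
        var claimCheck = Guid.NewGuid().ToString("N");
        _blobs[claimCheck] = payload;
        return claimCheck;
    }

    public byte[] Load(string claimCheck) => _blobs[claimCheck];
}

// Hypothetical message envelope: carries either the inline body or a claim check.
public sealed record Envelope(byte[]? Body, string? ClaimCheck);

public static class ClaimCheckSketch
{
    // Sender side: large payloads go to the store, small ones travel inline.
    public static Envelope Wrap(byte[] payload, int limitInBytes, InMemoryPayloadStore store) =>
        payload.Length > limitInBytes
            ? new Envelope(null, store.Save(payload))
            : new Envelope(payload, null);

    // Receiver side: resolve the claim check if present, otherwise use the inline body.
    public static byte[] Unwrap(Envelope envelope, InMemoryPayloadStore store) =>
        envelope.ClaimCheck is not null
            ? store.Load(envelope.ClaimCheck)
            : envelope.Body ?? Array.Empty<byte>();
}
```

The receiver never needs to know which path the sender took. It always calls “Unwrap” and gets the original bytes back.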

Let’s Implement

Let’s take a look at how we can easily implement the claim-check pattern conditionally within our microservices.

Let’s consider an example scenario where we are working within an e-commerce domain, and let’s assume that after an order is created in the system, a large “OrderCreatedEvent” is published. In this example, I will use Azure Service Bus as the message broker and Azure Storage Account as the external storage. Now, let’s start by developing a simple service bus library to handle messaging operations between our services.

First, let’s create a .NET 9 class library project called “TodoEcom.ServiceBus”, and then add the following NuGet packages to it.

  • Azure.Messaging.ServiceBus
  • Azure.Storage.Blobs
  • Microsoft.Extensions.Configuration.Binder
  • Microsoft.Extensions.Http
  • System.Configuration.ConfigurationManager

Then, let’s create a class file named “ServiceBus” and define the “IServiceBus” interface along with the wrapper that we will use to customize the Azure Service Bus messages.

using System.Collections.Concurrent;
using System.Text.Json;
using Azure.Messaging.ServiceBus;
using Azure.Storage.Blobs;
using Microsoft.Extensions.Configuration;

namespace TodoEcom.ServiceBus;

public interface IServiceBus
{
    Task Publish<T>(T @event, string? queueName = null, CancellationToken cancellationToken = default);
    Task<ServiceBusReceivedMessageWrapper<T>> Receive<T>(string? queueName = null, CancellationToken cancellationToken = default);
}

public class ServiceBusReceivedMessageWrapper<T>
{
    private readonly ServiceBusReceivedMessage _receivedMessage;
    private readonly ServiceBusReceiver _receiver;
    public T Data { get; }

    internal ServiceBusReceivedMessageWrapper(ServiceBusReceivedMessage receivedMessage, ServiceBusReceiver receiver, T payload)
    {
        _receivedMessage = receivedMessage;
        _receiver = receiver;
        Data = payload;
    }

    public async Task CompleteAsync()
    {
        await _receiver.CompleteMessageAsync(_receivedMessage);
    }
}

At this point, we are defining an interface that will allow us to simply publish and consume events. Since we will implement the claim-check pattern conditionally within this service bus library, we are creating our own “ServiceBusReceivedMessageWrapper” class by encapsulating the “ServiceBusReceivedMessage” class used by the Azure Service Bus client library. This design will allow us to prepare the “Data” property, where the payload will be stored, either by retrieving it from the Azure Storage Account or directly from Azure Service Bus, depending on the size of the relevant event’s payload. This way, services that use this library will continue to consume their events flawlessly, independent of the underlying operations.

Additionally, the Azure Service Bus client library requires its own message type, the “ServiceBusReceivedMessage” object, to complete the processing of a consumed event. Therefore, we encapsulate the “ServiceBusReceivedMessage” object within the wrapper and expose the completion process through the “CompleteAsync” method to ensure that it can be consistently handled by consumers.

Now, let’s implement the “IServiceBus” interface in the same file, starting with the “Publish” method.

public class AzureServiceBus : IServiceBus
{
    private readonly ServiceBusClient _serviceBusClient;
    private readonly BlobContainerClient? _blobContainerClient;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly ConcurrentDictionary<string, ServiceBusSender> _senderPool;
    private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receiverPool;
    private readonly int _maxEventPayloadSizeLimitInKB;
    private readonly int _claimCheckTokenExpirationInHours;
    private const string IsClaimCheckProperty = "IsClaimCheck";
    private const string ClaimCheckBlobUriProperty = "ClaimCheckBlobUri";
    private readonly bool _enableClaimCheck;
    private readonly string? _defaultQueueName;

    public AzureServiceBus(IConfiguration configuration, IHttpClientFactory httpClientFactory)
    {
        var serviceBusConnectionString = configuration.GetValue<string>("ServiceBus:ConnectionString") ?? throw new KeyNotFoundException("ServiceBus:ConnectionString is not found.");
        _defaultQueueName = configuration.GetValue<string>("ServiceBus:DefaultQueueName");
        _serviceBusClient = new ServiceBusClient(serviceBusConnectionString);

        _httpClientFactory = httpClientFactory;

        _senderPool = new ConcurrentDictionary<string, ServiceBusSender>();
        _receiverPool = new ConcurrentDictionary<string, ServiceBusReceiver>();


        _enableClaimCheck = configuration.GetValue<bool>("ServiceBus:ClaimCheck:EnableClaimCheck");

        if(_enableClaimCheck)
        {
            var blobStorageConnectionString = configuration.GetValue<string>("ServiceBus:ClaimCheck:BlobStorage:ConnectionString") ?? throw new KeyNotFoundException("ServiceBus:ClaimCheck:BlobStorage:ConnectionString is not found.");

            var claimCheckContainerName = configuration.GetValue<string>("ServiceBus:ClaimCheck:BlobStorage:ClaimCheckContainerName") ?? throw new KeyNotFoundException("ServiceBus:ClaimCheck:BlobStorage:ClaimCheckContainerName is not found.");

            _maxEventPayloadSizeLimitInKB = configuration.GetValue<int?>("ServiceBus:ClaimCheck:MaxEventPayloadSizeLimitInKB") ?? throw new KeyNotFoundException("ServiceBus:ClaimCheck:MaxEventPayloadSizeLimitInKB is not found.");

            _claimCheckTokenExpirationInHours = configuration.GetValue<int?>("ServiceBus:ClaimCheck:BlobStorage:ClaimCheckTokenExpirationInHours") ?? throw new KeyNotFoundException("ServiceBus:ClaimCheck:BlobStorage:ClaimCheckTokenExpirationInHours is not found.");

            _blobContainerClient = new BlobContainerClient(blobStorageConnectionString, claimCheckContainerName);
        }
    }

    public async Task Publish<T>(T @event, string? queueName = null, CancellationToken cancellationToken = default)
    {
        if(string.IsNullOrEmpty(queueName))
        {
            queueName = _defaultQueueName!;
        }

        var sender = _senderPool.GetOrAdd(queueName, _serviceBusClient.CreateSender);

        var serializedEvent = JsonSerializer.SerializeToUtf8Bytes(@event);

        ServiceBusMessage message;
        if (_enableClaimCheck && IsEventPayloadSizeExceedsLimit(serializedEvent))
        {
            var blobUri = await UploadPayloadToBlobAsync(serializedEvent, queueName, @event!.GetType().Name);

            message = new ServiceBusMessage
            {
                ApplicationProperties =
                {
                    [IsClaimCheckProperty] = true,
                    [ClaimCheckBlobUriProperty] = blobUri
                }
            };
        }
        else
        {
            message = new ServiceBusMessage(serializedEvent);
        }

        await sender.SendMessageAsync(message, cancellationToken);
    }

    private bool IsEventPayloadSizeExceedsLimit(byte[] serializedEvent)
    {
        // Compare the serialized size in bytes against the configured limit (KB converted to bytes).
        return serializedEvent.Length > _maxEventPayloadSizeLimitInKB * 1024;
    }

    private async Task<string> UploadPayloadToBlobAsync(byte[] serializedEvent, string queueName, string eventType)
    {
        var blobName = $"{queueName}/{eventType}/{Guid.NewGuid()}.json";

        await _blobContainerClient!.CreateIfNotExistsAsync();
        var blobClient = _blobContainerClient!.GetBlobClient(blobName);

        using var stream = new MemoryStream(serializedEvent);
        await blobClient.UploadAsync(stream, overwrite: true);

        var sasUri = blobClient.GenerateSasUri(Azure.Storage.Sas.BlobSasPermissions.Read, DateTimeOffset.UtcNow.AddHours(_claimCheckTokenExpirationInHours));

        return sasUri.ToString();
    }
}

First, we read the necessary configuration values and inject the required services. To allow consumers to decide whether to use the claim-check feature, we bind its control to the “ServiceBus:ClaimCheck:EnableClaimCheck” parameter. Additionally, we define a pooling mechanism to manage the “ServiceBusSender” and “ServiceBusReceiver” objects more efficiently.

The “Publish” method is where the claim-check mechanism is conditionally applied. The event we want to publish is first serialized, and the “IsEventPayloadSizeExceedsLimit” method then checks its size. If the size exceeds the configured limit, the payload is automatically offloaded to Azure Blob Storage.

At this point:

  • In the “UploadPayloadToBlobAsync” method, we handle the process of uploading the event payload to Azure Blob Storage.
  • After that, we generate a SAS URI to allow the payload to be retrieved again from Azure Blob Storage.
  • Then, we include the generated SAS URI in the “ApplicationProperties” section of the event’s metadata.
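
One consequence of using a SAS URI is that the reference stops working once the token expires. A message that waits in the queue longer than “ClaimCheckTokenExpirationInHours” can no longer be resolved. As a rough sketch, a consumer could inspect the “se” (signed expiry) query parameter of the SAS URI before attempting the download. The “SasExpirySketch” helper below is hypothetical and is not part of the library in this article.

```csharp
using System;
using System.Globalization;
using System.Web;

public static class SasExpirySketch
{
    // Returns the expiry carried in the "se" (signed expiry) query parameter,
    // or null when the parameter is missing or cannot be parsed.
    public static DateTimeOffset? GetExpiry(Uri sasUri)
    {
        var expiry = HttpUtility.ParseQueryString(sasUri.Query)["se"];

        return DateTimeOffset.TryParse(expiry, CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal, out var parsed)
            ? parsed
            : null;
    }

    // True only when an expiry is present and it is not after the given instant.
    public static bool IsExpired(Uri sasUri, DateTimeOffset now) =>
        GetExpiry(sasUri) is { } expiry && expiry <= now;
}
```

A check like this only gives an early, readable failure. The storage service remains the authority on whether the token is still valid.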

Now, let’s proceed with the “Receive” method.

public class AzureServiceBus : IServiceBus
{
    // other code...

    public async Task<ServiceBusReceivedMessageWrapper<T>> Receive<T>(string? queueName = null, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrEmpty(queueName))
        {
            queueName = _defaultQueueName!;
        }

        var receiver = _receiverPool.GetOrAdd(queueName, _serviceBusClient.CreateReceiver);

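        // ReceiveMessageAsync returns null when no message arrives within the wait time, so keep polling.
        ServiceBusReceivedMessage? message;
        do
        {
            message = await receiver.ReceiveMessageAsync(cancellationToken: cancellationToken);
        } while (message is null);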

        ServiceBusReceivedMessageWrapper<T> wrappedMessage;

        if (message.ApplicationProperties.TryGetValue(IsClaimCheckProperty, out var isClaimCheck) && (bool)isClaimCheck)
        {
            var blobUri = message.ApplicationProperties[ClaimCheckBlobUriProperty] as string
                  ?? throw new InvalidOperationException($"The claim-check value is not found. Message CorrelationId: {message.CorrelationId}");

            byte[] payload = await DownloadPayloadFromBlobAsync(blobUri);

            var actualEvent = JsonSerializer.Deserialize<T>(payload);

            wrappedMessage = new ServiceBusReceivedMessageWrapper<T>(message, receiver, actualEvent!);
        }
        else
        {
            wrappedMessage = new ServiceBusReceivedMessageWrapper<T>(message, receiver, message.Body.ToObjectFromJson<T>()!);
        }

        return wrappedMessage;
    }

    private async Task<byte[]> DownloadPayloadFromBlobAsync(string blobUri)
    {
        var httpClient = _httpClientFactory.CreateClient();
        using var response = await httpClient.GetAsync(blobUri);

        if (response.IsSuccessStatusCode)
        {
            return await response.Content.ReadAsByteArrayAsync();
        }

        throw new Exception($"Failed to download the claim-check payload from {blobUri}. Status Code: {response.StatusCode}");
    }
}

At this point, after consuming the event, we check whether the claim-check token is present in the metadata. If the claim-check token exists, we retrieve the Blob SAS URI from the metadata and use the “DownloadPayloadFromBlobAsync” method to obtain the event payload. If the claim-check token is not present, we pass the event directly to the consumer via the wrapper as it was consumed. Thus, as we mentioned earlier, the consumers will continue to consume events seamlessly, independent of the underlying claim-check operations.

Now, let’s implement the publisher that will publish the “OrderCreatedEvent” to quickly perform a test. First, create a class library called “TodoEcom.Contracts” and define the “OrderCreatedEvent” which will be shared between the publisher and the consumer, as shown below.

namespace TodoEcom.Contracts.Events;

public class OrderCreatedEvent
{
    public int OrderId { get; set; }
    public int CustomerId { get; set; }
    public required string PaymentMethod { get; set; }
    public DateTime CreatedAt { get; set; }
    public required IReadOnlyList<ProductDTO> Products { get; set; }

    // other attributes such as address, applied discounts, shipping details, transaction details, etc.
}

public class ProductDTO
{
    public int ProductId { get; set; }
    public required string Name { get; set; }
    public int Quantity { get; set; }
    public decimal Price { get; set; }
}

At this point, I didn’t include too many attributes to keep the example simple. However, I believe the main idea has been conveyed.

Now, let’s create a .NET 9 webapi project called “TodoEcom.OrderService.API” and add the “TodoEcom.Contracts” and “TodoEcom.ServiceBus” libraries as references. Then, update the “Program.cs” file as shown below.

using TodoEcom.Contracts.Events;
using TodoEcom.ServiceBus;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddOpenApi();
builder.Services.AddHttpClient();
builder.Services.AddSingleton<IServiceBus, AzureServiceBus>();

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.MapOpenApi();
}

app.UseHttpsRedirection();

app.MapPost("/orders", async (IServiceBus serviceBus) =>
{
    // Perform order operations...

    var orderCreatedEvent = new OrderCreatedEvent
    {
        OrderId = 1,
        CustomerId = 123,
        PaymentMethod = "CreditCard",
        CreatedAt = DateTime.UtcNow,
        Products = Enumerable.Range(1,20).Select(i => new ProductDTO
        {
            ProductId = i,
            Name = $"My product {i}",
            Quantity = 1,
            Price = i,
        }).ToList()
    };

    await serviceBus.Publish(orderCreatedEvent);

})
.WithName("CreateAnOrder");

app.Run();

At this point, after performing the necessary client registration, we publish an example “OrderCreatedEvent” using the service bus library we have implemented. In order to test the claim-check mechanism, we also add a few product details to the event so that the payload limit we set can be exceeded.

Now let’s also update the “appsettings.json” file as shown below.

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "ServiceBus": {
    "ConnectionString": "Endpoint=sb://mypocsbus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHARED_ACCESS_KEY",
    "DefaultQueueName": "orders",
    "ClaimCheck": {
      "EnableClaimCheck": true,
      "MaxEventPayloadSizeLimitInKB": 1,
      "BlobStorage": {
        "ConnectionString": "DefaultEndpointsProtocol=https;AccountName=mypocstorageac;AccountKey=YOUR_ACCOUNT_KEY;EndpointSuffix=core.windows.net",
        "ClaimCheckContainerName": "claim-check",
        "ClaimCheckTokenExpirationInHours": 1
      }
    }
  }
}

At this point, the “EnableClaimCheck” parameter indicates that we will use the claim-check feature, and the “MaxEventPayloadSizeLimitInKB” parameter limits the size of the event we want to publish to Azure Service Bus to 1 KB.
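
To get a feel for how quickly a payload crosses this 1 KB threshold, the sketch below serializes a sample order with a configurable number of products and compares the byte count with the limit. The “OrderSample” and “ProductSample” records are hypothetical stand-ins for the contracts above, and “PayloadSizeSketch” is only an illustration of the same size check the library performs.

```csharp
using System;
using System.Linq;
using System.Text.Json;

// Hypothetical stand-ins for the event contracts used in this article.
public record ProductSample(int ProductId, string Name, int Quantity, decimal Price);
public record OrderSample(int OrderId, int CustomerId, string PaymentMethod, DateTime CreatedAt, ProductSample[] Products);

public static class PayloadSizeSketch
{
    // Serializes a sample order with the given number of products and returns its size in bytes.
    public static int SerializedSizeInBytes(int productCount)
    {
        var order = new OrderSample(1, 123, "CreditCard", DateTime.UtcNow,
            Enumerable.Range(1, productCount)
                .Select(i => new ProductSample(i, $"My product {i}", 1, i))
                .ToArray());

        return JsonSerializer.SerializeToUtf8Bytes(order).Length;
    }

    // Same comparison as the library: the limit is configured in KB, the payload is measured in bytes.
    public static bool ExceedsLimit(int sizeInBytes, int limitInKB) => sizeInBytes > limitInKB * 1024;
}
```

With 20 products, as in the “/orders” endpoint above, the serialized event is larger than 1 KB, so the claim-check path is taken. An order with a single product stays below the limit and is sent inline.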

Now, let’s create a .NET 9 worker project called “TodoEcom.BillingService.Consumer” and add the “TodoEcom.Contracts” and “TodoEcom.ServiceBus” libraries as references. Then, let’s consume the “OrderCreatedEvent”.

using TodoEcom.Contracts.Events;
using TodoEcom.ServiceBus;

namespace TodoEcom.BillingService.Consumer;

public class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;
    private readonly IServiceBus _serviceBus;
    private const string OrdersQueueName = "orders";

    public Worker(ILogger<Worker> logger, IServiceBus serviceBus)
    {
        _logger = logger;
        _serviceBus = serviceBus;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            if (_logger.IsEnabled(LogLevel.Information))
            {
                _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
            }

            var @event = await _serviceBus.Receive<OrderCreatedEvent>(OrdersQueueName, stoppingToken);

            _logger.LogInformation("Event received. OrderId: {OrderId}", @event.Data.OrderId);

            // some operations...

            await @event.CompleteAsync();
        }
    }
}

Let’s also update the “appsettings.json” file as shown below.

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "ServiceBus": {
    "ConnectionString": "Endpoint=sb://mypocsbus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHARED_ACCESS_KEY"
  }
}
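
The worker also needs “IServiceBus” and “IHttpClientFactory” registered in its own “Program.cs”, which is not shown in this article. A minimal sketch, assuming the default worker template and the types defined above, could look like this. It is a wiring fragment that depends on the projects in this article rather than a standalone program.

```csharp
using TodoEcom.BillingService.Consumer;
using TodoEcom.ServiceBus;

var builder = Host.CreateApplicationBuilder(args);

// AzureServiceBus takes IHttpClientFactory to download claim-check payloads.
builder.Services.AddHttpClient();
builder.Services.AddSingleton<IServiceBus, AzureServiceBus>();
builder.Services.AddHostedService<Worker>();

var host = builder.Build();
host.Run();
```

Because the consumer downloads the payload through the SAS URI, it does not need any storage account settings in its configuration.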

We are now ready to test.

Let’s Test

First, let’s run the “TodoEcom.OrderService.API” project and trigger the “/orders” endpoint.

curl -X POST http://localhost:5065/orders

Then, using the “Service Bus Explorer” in the Azure Portal, let’s take a look at the “Custom Properties” section of the message we published to the “orders” queue.

[Screenshot: Service Bus Queue]

At this point, we can see that the event payload we published has been uploaded to Azure Blob Storage, and the corresponding Blob SAS URI has been added to the “ClaimCheckBlobUri” property.

Then, let’s run the “TodoEcom.BillingService.Consumer” project to consume the respective event.

As we can see from the terminal logs, after the event we published was consumed by the service bus library we developed, the payload information was successfully retrieved from the respective storage account and seamlessly delivered to the “TodoEcom.BillingService.Consumer” project.

In this article, we explored how we can use a conditional claim-check mechanism to manage large payloads. With the service bus library we implemented, we aimed to keep the impact on the scalability and efficiency of the system minimal, while allowing services to continue publishing and consuming their events seamlessly, independent of the underlying processes.

You can find the sample code repository here: https://github.com/GokGokalp/claim-check-sample

References

https://learn.microsoft.com/en-us/azure/architecture/patterns/claim-check
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues

Gökhan Gökalp
