In this article, I will try to explain some of the problems, such as error handling and message redelivery, that I encountered during our transition to a messaging architecture. I will also describe the solutions we applied and the ones MassTransit provides for these concerns. By the way, this is my first English blog post, so I apologize for any mistakes.
Imagine we are working on an e-commerce website. When an order is successfully processed, suppose the system publishes an event of type “IPaymentApproved”. More than one consumer can consume the “IPaymentApproved” event; let’s say one of them is responsible for sending an e-mail to the customer with the order details.
The “IPaymentApproved” event is defined as below.
```csharp
namespace ErrorAndRedeliverHandlingSample.Contracts
{
    public interface IPaymentApproved
    {
        string OrderNumber { get; set; }
    }
}
```
The consumer of the “IPaymentApproved” event is temporarily defined as below.
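```csharp
using System.Threading.Tasks;
using ErrorAndRedeliverHandlingSample.Contracts;
using MassTransit;

namespace ErrorAndRedeliverHandlingSample
{
    public class PaymentApprovedConsumer : IConsumer<IPaymentApproved>
    {
        public async Task Consume(ConsumeContext<IPaymentApproved> context)
        {
            // send an e-mail logic...
        }
    }
}
```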
So, what happens next if something goes wrong while a message is being consumed? Such failures may come from coding errors or from incomplete or inconsistent messages. MassTransit provides some solutions for these situations.
With the default bus configuration, exceptions are caught by the MassTransit middleware and the related message is automatically moved to the “{queue_name}_error” queue. The exception details can be found in the headers of the moved message. We should also consider monitoring these “{queue_name}_error” queues with a monitoring tool, and it is wise to add some alerting sensors on them, especially in production.
Sometimes transient exceptions occur. Maybe a web service fails to respond, or a deadlock occurs in a database. In these situations, we can apply a retry policy so that we do not lose the messages.
For example:
```csharp
namespace ErrorAndRedeliverHandlingSample
{
    public class PaymentApprovedConsumer : IConsumer<IPaymentApproved>
    {
        public async Task Consume(ConsumeContext<IPaymentApproved> context)
        {
            // send an e-mail logic...

            throw new Exception("Something's happened during processing...");
        }
    }
}
```
Let’s look at the code block above and imagine that transient exceptions occur because our e-mail provider’s web service does not respond at certain times. Most of the time, a second attempt will succeed.
It is possible to enable the retry function as below:
```csharp
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    // "host" is the RabbitMQ host object returned by an earlier cfg.Host(...) call.
    cfg.ReceiveEndpoint(host, "queue_name", e =>
    {
        e.UseRetry(r => r.Immediate(5));
        e.Consumer(() => new PaymentApprovedConsumer());
    });
});
```
In the “e.UseRetry(r => r.Immediate(5))” line, we configure up to “5” more attempts with the “Immediate” policy. Only if the exception still occurs after those attempts does MassTransit move the message to the error queue.
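As a rough illustration of what an immediate retry policy amounts to, here is a minimal sketch that does not use MassTransit at all. The `ImmediateRetry` class and its `ExecuteAsync` method are hypothetical names made up for this example; MassTransit's real filter is more sophisticated, so treat this only as a model of the behavior described above.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of MassTransit.
public static class ImmediateRetry
{
    // Runs the action once, then retries up to retryLimit more times with no delay.
    public static async Task ExecuteAsync(Func<Task> action, int retryLimit)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await action();
                return;
            }
            catch (Exception) when (attempt < retryLimit)
            {
                // Swallow the exception and loop again. Once the attempts are
                // exhausted the filter is false and the exception propagates,
                // which is the point where the message would go to the error queue.
            }
        }
    }
}
```

With `retryLimit` set to 5, a handler that always fails is invoked six times in total: the first delivery plus five retries.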
Besides “Immediate”, MassTransit offers several other retry policies, such as “Interval”, “Intervals”, “Exponential” and “Incremental”.
I generally prefer the “Incremental” retry policy. With it, you can set the retry count, the initial interval between attempts, and the increment that is added to the interval after each attempt.
As an example, I will use the MetroBus library. It is my simple MassTransit wrapper and covers the most commonly used functions.
```csharp
IBusControl busControl = MetroBusInitializer.Instance
    .UseRabbitMq("rabbitMqUri", "rabbitMqUserName", "rabbitMqPassword")
    .UseIncrementalRetryPolicy(retryLimit: 5, initialIntervalFromMinute: 10, intervalIncrementFromMinute: 10)
    .InitializeConsumer<PaymentApprovedConsumer>("queueName")
    .Build();
```
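To make the incremental schedule concrete, the sketch below computes the wait time before each retry. It assumes the interval simply grows by a fixed increment after every attempt, which is how the policy is described here; `IncrementalRetrySchedule` is a hypothetical helper written for this example and is not part of MassTransit or MetroBus.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of MassTransit or MetroBus.
public static class IncrementalRetrySchedule
{
    // Returns the delay before each retry: initial, initial + increment, initial + 2 * increment, ...
    public static IReadOnlyList<TimeSpan> Build(int retryLimit, TimeSpan initialInterval, TimeSpan intervalIncrement)
    {
        var delays = new List<TimeSpan>(retryLimit);
        var current = initialInterval;

        for (var attempt = 0; attempt < retryLimit; attempt++)
        {
            delays.Add(current);
            current += intervalIncrement;
        }

        return delays;
    }
}
```

Under that assumption, a retry limit of 5 with a 10 minute initial interval and a 10 minute increment gives delays of 10, 20, 30, 40 and 50 minutes.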
In addition, specific exception types can be handled or ignored with retry filters.
For example:
```csharp
cfg.UseRetry(r =>
{
    r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
});
```
More detailed information about filters is available in the MassTransit documentation. Let’s work through some examples of the retry operations.
NOTE: For the consumer examples, I will publish some messages using the “IPaymentApproved” interface.
First, let’s initialize “PaymentApprovedConsumerService” as below:
```csharp
namespace ErrorAndRedeliverHandlingSample
{
    public class PaymentApprovedConsumerService
    {
        private readonly IBusControl _consumerBusControl;
        private readonly string _rabbitMqUri;
        private readonly string _rabbitMqUserName;
        private readonly string _rabbitMqPassword;
        private readonly string _queueName;

        public PaymentApprovedConsumerService()
        {
            _rabbitMqUri = ConfigurationManager.AppSettings["RabbitMqUri"];
            _rabbitMqUserName = ConfigurationManager.AppSettings["RabbitMqUserName"];
            _rabbitMqPassword = ConfigurationManager.AppSettings["RabbitMqPassword"];
            _queueName = ConfigurationManager.AppSettings["FooQueue"];

            // 3 retries, 1 minute initial interval, 1 minute increment.
            _consumerBusControl = MetroBusInitializer.Instance
                .UseRabbitMq(_rabbitMqUri, _rabbitMqUserName, _rabbitMqPassword)
                .UseIncrementalRetryPolicy(3, 1, 1)
                .InitializeConsumer<PaymentApprovedConsumer>(_queueName)
                .Build();
        }

        public void Start()
        {
            _consumerBusControl.Start();
        }

        public void Stop()
        {
            _consumerBusControl.Stop();
        }
    }
}
```
Then change “Program.cs” as below:
```csharp
namespace ErrorAndRedeliverHandlingSample
{
    class Program
    {
        static void Main(string[] args)
        {
            var paymentApprovedConsumerService = new PaymentApprovedConsumerService();
            paymentApprovedConsumerService.Start();
        }
    }
}
```
and let’s start the project!
If we look at the console output, we can see that an exception was thrown while consuming and, as a result, the retry function was triggered.
If we look at the queue on the RabbitMQ management screen, we can see that the related message is waiting in the “unacked” state.
With the incremental retry policy configured above, the retry operation is performed “3” times, starting with a “1” minute interval. After every attempt, “1” additional minute is added to the interval, and retries continue until the retry limit is reached. If the limit is reached and the failure persists, the message is moved to the error queue.
Another important element of exception handling is the Circuit Breaker pattern. It protects a resource from being overloaded when errors keep occurring and the failure threshold is reached.
The flow of this pattern is generally as follows: the circuit starts closed (calls pass through), opens when failures reach the threshold (calls are blocked), and after a waiting period becomes half-open to test whether the resource has recovered.
Let’s imagine that, while consuming the “IPaymentApproved” event, we try to send an e-mail through our e-mail provider, but their web service stops responding for a while and requests time out after 30 seconds. If lots of events arrive at the same time, what happens next? All of the requests will time out and resources will be kept busy unnecessarily, possibly causing cascading failures.
So, what if we use a circuit breaker?
In these situations, the circuit breaker monitors failures for us. When failures reach the threshold rate, the circuit breaker opens automatically and protects the system from repeated failures until the reset interval expires. Once the reset interval expires, consumption resumes gradually. If failures continue, the reset interval starts over and the circuit breaker reopens automatically. If everything is OK, consumption carries on as normal.
It is possible to enable the circuit breaker as follows:
```csharp
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.ReceiveEndpoint(host, "queue_name", e =>
    {
        e.UseCircuitBreaker(cb =>
        {
            cb.TripThreshold = 15;
            cb.ActiveThreshold = 10;
            cb.ResetInterval = TimeSpan.FromMinutes(5);
        });

        e.Consumer(() => new PaymentApprovedConsumer());
    });
});
```
and with MetroBus:
```csharp
IBusControl busControl = MetroBusInitializer.Instance
    .UseRabbitMq("rabbitMqUri", "rabbitMqUserName", "rabbitMqPassword")
    .UseCircuitBreaker(tripThreshold: 15, activeThreshold: 10, resetInterval: 5)
    .InitializeConsumer<PaymentApprovedConsumer>("queueName")
    .Build();
```
More detailed information about the circuit breaker is available in the MassTransit documentation.
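To make the circuit breaker flow more tangible, here is a minimal, framework-independent sketch of the pattern. It is deliberately simplified: it counts consecutive failures instead of tracking a failure rate the way the MassTransit settings above suggest, and the `SimpleCircuitBreaker` class and its members are hypothetical names invented for this example. The current time is passed in explicitly so the behavior is easy to follow and test.

```csharp
using System;

public enum CircuitState { Closed, Open, HalfOpen }

// Hypothetical, simplified breaker; not MassTransit's implementation.
public sealed class SimpleCircuitBreaker
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _resetInterval;
    private int _failures;
    private DateTime _openedAt;

    public CircuitState State { get; private set; } = CircuitState.Closed;

    public SimpleCircuitBreaker(int failureThreshold, TimeSpan resetInterval)
    {
        _failureThreshold = failureThreshold;
        _resetInterval = resetInterval;
    }

    // Returns true when a call may proceed at the given time.
    public bool AllowRequest(DateTime now)
    {
        if (State == CircuitState.Open && now - _openedAt >= _resetInterval)
        {
            // The reset interval has expired: let a trial call through.
            State = CircuitState.HalfOpen;
        }

        return State != CircuitState.Open;
    }

    public void RecordSuccess()
    {
        _failures = 0;
        State = CircuitState.Closed;
    }

    public void RecordFailure(DateTime now)
    {
        _failures++;

        // A failed trial call, or too many failures, (re)opens the circuit
        // and restarts the reset interval.
        if (State == CircuitState.HalfOpen || _failures >= _failureThreshold)
        {
            State = CircuitState.Open;
            _openedAt = now;
        }
    }
}
```

A consumer would call `AllowRequest` before contacting the e-mail provider and then report the outcome with `RecordSuccess` or `RecordFailure`, so that a non-responding provider stops receiving calls until the reset interval has passed.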
Another thing to consider when initializing a new consumer is a rate limiter. Its main purpose is to limit the number of messages we consume in a given time period.
It is possible to enable the rate limiter as follows:
```csharp
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.ReceiveEndpoint(host, "queue_name", e =>
    {
        // At most 20 messages every 5 seconds.
        e.UseRateLimit(20, TimeSpan.FromSeconds(5));
        e.Consumer(() => new PaymentApprovedConsumer());
    });
});
```
and with MetroBus:
```csharp
IBusControl busControl = MetroBusInitializer.Instance
    .UseRabbitMq("rabbitMqUri", "rabbitMqUserName", "rabbitMqPassword")
    .UseRateLimiter(rateLimit: 20, interval: 5000)
    .InitializeConsumer<PaymentApprovedConsumer>("queueName")
    .Build();
```
It can be enabled fluently, as above.
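The idea behind "at most N messages per interval" can be sketched with a simple fixed-window counter. This is only an illustration under my own assumptions: `FixedWindowRateLimiter` is a hypothetical class, and MassTransit's own filter may behave differently, for example by making excess messages wait instead of rejecting them.

```csharp
using System;

// Hypothetical fixed-window limiter; not MassTransit's implementation.
public sealed class FixedWindowRateLimiter
{
    private readonly int _limit;
    private readonly TimeSpan _interval;
    private DateTime _windowStart;
    private int _count;

    public FixedWindowRateLimiter(int limit, TimeSpan interval, DateTime start)
    {
        _limit = limit;
        _interval = interval;
        _windowStart = start;
    }

    // Returns true if a message may be consumed at the given time.
    public bool TryAcquire(DateTime now)
    {
        if (now - _windowStart >= _interval)
        {
            // A new window begins: reset the counter.
            _windowStart = now;
            _count = 0;
        }

        if (_count >= _limit)
        {
            return false;
        }

        _count++;
        return true;
    }
}
```

With a limit of 20 and an interval of 5 seconds, the 21st call inside the same 5 second window returns false, and calls succeed again once a new window starts.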
One of the most important concerns I have faced recently, thanks to some of our business cases, is how to deal with messages that we should not handle yet.
RabbitMQ’s implementation of the AMQP standard has no built-in deferred delivery. However, this feature is available by using MassTransit together with a RabbitMQ community plugin.
Let’s build an example of deferred delivery with the “RabbitMQ Delayed Message Plugin”. I prefer this plugin because deferred messages are handled on the RabbitMQ exchanges, so you don’t need any custom tool. First, we have to download and install the “rabbitmq_delayed_message_exchange” plugin from the RabbitMQ community plugins page.
NOTE: To enable this plugin, run the “rabbitmq-plugins enable rabbitmq_delayed_message_exchange” command on the CLI.
After enabling the plugin, you can initialize the consumer as below:
```csharp
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.UseDelayedExchangeMessageScheduler();
});
```
and with MetroBus:
```csharp
IBusControl busControl = MetroBusInitializer.Instance
    .UseRabbitMq("rabbitMqUri", "rabbitMqUserName", "rabbitMqPassword")
    .UseDelayedExchangeMessageScheduler()
    .InitializeConsumer<PaymentApprovedConsumer>("queueName")
    .Build();
```
This is all that is needed.
Then let’s refactor “PaymentApprovedConsumer” as below:
```csharp
namespace ErrorAndRedeliverHandlingSample
{
    public class PaymentApprovedConsumer : IConsumer<IPaymentApproved>
    {
        public async Task Consume(ConsumeContext<IPaymentApproved> context)
        {
            // Redelivery count read from the message headers (null when the header is absent).
            int? maxAttempts = context.Headers.Get("MT-Redelivery-Count", default(int?));

            if (maxAttempts > 3)
            {
                throw new Exception("Something's happened during processing...");
            }

            Console.WriteLine($"Attempts: {maxAttempts} Order number: {context.Message.OrderNumber}");

            // Ask for the message to be delivered again in one minute.
            await context.Defer(TimeSpan.FromMinutes(1));
        }
    }
}
```
During deferred operations, the most important thing is knowing the redelivery count. It can be found in the message headers under the “MT-Redelivery-Count” key. With this information, we can protect our system from infinite retries.
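The guard against infinite redelivery can also be isolated into a small, testable function. The sketch below is an assumption-laden illustration: `RedeliveryGuard`, `RedeliveryDecision` and the plain dictionary standing in for the message headers are all hypothetical and are not MassTransit types. Only the “MT-Redelivery-Count” key comes from the consumer above.

```csharp
using System;
using System.Collections.Generic;

public enum RedeliveryDecision { Defer, Fail }

// Hypothetical helper; the dictionary stands in for the real message headers.
public static class RedeliveryGuard
{
    public const string RedeliveryCountHeader = "MT-Redelivery-Count";

    public static RedeliveryDecision Decide(IReadOnlyDictionary<string, object> headers, int maxRedeliveries)
    {
        // A missing header is treated as "not redelivered yet".
        var count = headers.TryGetValue(RedeliveryCountHeader, out var raw) && raw != null
            ? Convert.ToInt32(raw)
            : 0;

        // Give up once the limit is reached; otherwise defer again.
        return count >= maxRedeliveries ? RedeliveryDecision.Fail : RedeliveryDecision.Defer;
    }
}
```

Keeping the decision separate from the consumer makes the limit easy to unit test without a running broker.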
Let’s start the consumer and look at the results.
If we look at the console output, we can see the attempt numbers of the deferred operations. Each one-minute deferral was performed by the “context.Defer(TimeSpan.FromMinutes(1))” line, and the message was finally moved to the error queue because the failure persisted.
If we look at the RabbitMQ management screen, we can see that MassTransit has created bindings for the newly created “_delay” queue. This is great, isn’t it?
Anyway, I hope this article helps anyone who needs information about these messaging concerns. If you have any questions, you can leave a comment on this post or send me an e-mail.
https://github.com/GokGokalp/messaging-error-and-redeliver-handling-sample
Comments
Thanks for sharing.
Thank you. :)
Hi, the previous theme was much simpler, more readable and nicer. This one doesn't work at all, it's not elite at all. It has turned into a plain "PHP developer" theme, please switch it back :D
Best regards, your devoted follower.
Let's not say that, Efe :) Let's not forget that WordPress is PHP too.
Thank you for the posts, Gökhan. The way you present many technologies together in such an organized way is very nice :)
Thank you for your kind comment.
Thank you for such an interesting article. However, it would be great if you could extend it with some ideas for custom exception handling outside MassTransit. For instance, if we have two storage services and one of them succeeds while the other fails to store data, then I might need to delete the record stored by the first service.
What do you think would be a good pattern to solve this issue? I am thinking of creating a separate service to sort out this kind of exception, so I encapsulated the exceptions inside a "ValidatedMessage" containing an IsValid bool and a list of exceptions, because if I throw an exception, MassTransit would always try to resend the message, even in cases where it should not.
I am having some trouble figuring out how to forward the ValidatedMessage to my exception handling service and stop consuming it in the current service if IsValid is false.
Thanks for your comment. As you said, we have two storage services and we are trying to add something to both. First, how do we add something to these services? I mean, does the add operation happen at a single point or in separate services? Anyway, this situation is really unavoidable when working with distributed systems. :) So, I don't really know what the most performant way is. I think there are many ways to handle this with "eventual consistency", such as "sagas, state machines", "compensating transactions", etc. MassTransit is also a great framework for distributed systems, and it provides great functionality like "SagaStateMachine". Again, I don't know what the most performant way is, and these patterns do bring some complexity. If you want, you can also check this link: https://gokhan-gokalp.azurewebsites.net/en/messaging-architecture-da-saga-patterni-ile-failure-management/ By the way, maybe we can handle this situation quickly by developing an end-of-day job or an operation checker such as a watchdog.