MassTransit Saga State Machine ile Model Workflow’u Oluşturmak

Merhaba arkadaşlar.

Bir süredir gerek yoğun iş temposu gerekse de sosyal hayatımdaki bazı yoğunluklardan dolayı, yeni bir makale yazmaya fırsat bulamamıştım. Sizlerde fark ederseniz bir süredir makale konularımı microservice ve messaging yapıları üzerine yoğunlaştırdım.

Bu makale içeriğinde ise geliştiriyor olduğumuz microservice ve messaging yapılarında, long-running business process işlemlerinde consistency’i MassTransit Saga State Machine implementasyonu ile nasıl sağlarız ve nasıl bir model workflow’u oluştururuz gibi konulara, araştırmalarım sırasında edinebildiğim bilgiler doğrultusunda değinmeye çalışacağım.

Dilerseniz öncelikle Saga kavramını biraz açalım.

Nedir Bu Saga?

Saga pattern’ı, birden çok sistem ile ilgili bir collaborating söz konusu olduğunda ve herhangi bir failure anında backtrack veya corrective bir aksiyon alınabilmesini sağlayan bir patttern’dır.

Farklı bir tanımla ise long-running business process’lerde consistency’i sağlayabilmek için distributed transaction kullanım mekanizması olarak da tanımlayabiliriz. Saga kavramı, Hector Garcaa-Molrna ve Kenneth Salem tarafından Sagas isimli akademik çalışması ile ele alınmıştır.

Bir önceki “MassTransit kullanarak RabbitMQ ile Messaging Altyapısı Oluşturma” makalesinde gerçekleştirdiğimiz örnek senaryoyu hatırlarsak eğer:

Yukarıdaki gibi basit bir e-commerce sistemindeki sipariş işlemini örnek vermiştik.

Burada yer alan “OrderService” ile “IOrderCommand” tipindeki message’ları consume ederek, farklı işlemler ve event’lar gerçekleştirmiştik. Sonrasında ise sistemin “XCommand” message’ına göre işlem yapacak olan “XService” lere sahip olabileceğinden de bahsetmiştik. Gelin buraya order işlemi ile ilgili bir kaç yeni adım daha ekleyelim.

Order senaryosuna “BillingService” ve “FraudService” ide dahil ettik.

Order işleminin tamamlanabilmesi için event’lar sequentially veya parallel olarak distributed bir şekilde bu service’ler arasında işlenmektedir. Peki bunar gibi -n kadar daha service’i sisteme dahil ettiğimizi düşünelim. Peki şimdi birbirleri ile related olan bu service’ler aralarındaki message flow’u, nasıl manage edilebilecek? Bunlara ek olarak birde gerçekten uygulamaları microservice olarak geliştiriyorsak ve order işlemi gibi belli step’lerin sonucunda bir artifact oluşturan süreçlere de sahipsek, transaction bütünlüğü reliable bir şekilde nasıl sağlanacak? Çünkü bazı event’lar başarılı bir şekilde process edilebilirken bazılarının ise fail olabilme durumu mevcut olacaktır. Bu tarz işlemler ise tutarsız(inconsistent) sonuçlar doğuracaktır.

İşte bu tarz süreçler için MassTransit framework’ü içerisindeki Saga State Machine ile bir workflow oluşturacağız. Bu implementasyon ile temelinde birden çok iş akışlarının birden çok sisteme dağılmasında, herhangi bir iş aşamasında meydana gelebilecek olan aksi bir eyleme karşı önlem(compensation) alınabilmesini sağlayacak yapılar yazabilmek mümkündür.

Peki Saga bunu nasıl sağlıyor?

Yukarıdaki resimde order işlemi gibi long lived business process’in olduğunu düşünelim ve burada, transaction T4 durumundan C4 durumuna geçerken bir hata alındığı için tekrardan T3 durumundan T4 durumuna bir compensation sağlandığını görüyoruz.

Implementasyon kısmına başlamadan önce, sizlere bir kaç tamamlayıcı kavramdan bahsetmek istiyorum. Öncelikle model workflow’u oluşturmak, bir saga değildir.  Workflow state’den bağımsızdır ve State Machine’in aksine activity‘ler ile ilgilenir. Workflow üzerindeki transition’lar ise, herhangi bir action tamamlandığında gerçekleşir. Workflow, State Machine ile beraber tanımlanabilir ve iş akışları belirli state’ler doğrultusunda ilerletilebilinir.

State Machine Nedir?

State Machine, bir object’in bir state’den bir diğer state’e  transition’ı ile ilgilenir. State Machine bu işlemi ise saga object’ine gelen bir request’in state’ini, bir başka yerde correlation id ile beraber persist ederek handle eder. Sonrasında ise o correlation id’ye bağlı olarak gerçekleşen event’lar ile object state’ini yönetir.

Bu yapı ise long-running business process’lerde state koordinasyonunu merkezileştirmeyi ve tutarlılığını sağlamaktadır. Açıkçası biraz karmaşık bir konu fakat ilgili business’a bağlı olarak uygulandığında, taşlar yerlerine kendiliğinden oturuyor.

MassTransit Saga State Machine ile Model Workflow’u Implementasyonu

Gerçekleştirecek olduğumuz örnekte daha önce de bahsettiğim gibi bir önceki “MassTransit kullanarak RabbitMQ ile Messaging Altyapısı Oluşturma” makale konusu üzerinden devam edeceğiz. Bu örnek üzerinde bir kaç noktayı refactor edeceğiz ve ardından MassTransit Saga implementasyonunu ekstra bir “BillingService” i daha ekleyerek, bir model workflow’u oluşturacağız.

Refactor edeceğimiz ilk nokta hatırlarsak ilk message’ı produce ettiğimiz nokta “LightMessagingCore.Boilerplate.OrderUI” projesi idi.

public class OrderController : Controller
{
    private readonly ISendEndpoint _bus;

    public OrderController()
    {
        var busControl = BusConfigurator.Instance.ConfigureBus();
        var sendToUri = new Uri($"{MqConstants.RabbitMQUri}{ConfigurationManager.AppSettings["OrderQueueName"]}");

        _bus = busControl.GetSendEndpoint(sendToUri).Result;
    }

    // GET: Order
    public ActionResult Index(OrderModel orderModel)
    {
        if (orderModel.OrderId > 0)
            CreateOrder(orderModel);

        return View();
    }

    private void CreateOrder(OrderModel orderModel)
    {
        _bus.Send(orderModel).Wait();
    }
}

Burada “OrderController” içerisinde initialize ettiğimiz bus üzerinden “OrderQueue” ya bir “IOrderCommand” produce ediyorduk.

Bu noktada artık ilgili command’ı direkt olarak “OrderQueue” ya produce etmek yerine, “SagaQueue” ya produce edeceğiz. Bunun için “sendToUri” kısmını aşağıdaki gibi güncelleyelim.

var sendToUri = new Uri($"{MqConstants.RabbitMQUri}{ConfigurationManager.AppSettings["SagaQueueName"]}");

Çünkü bir Saga State Machine tanımlayarak, bir model workflow’u oluşturacağız. Ilk produce olan command ile birlikte bir state oluşturacağız ve correlation’ı sağlayarak ilgili process’leri MassTransit State Machine’i üzerinden devam ettireceğiz. Bu yüzden command’ın ilk produce olacağı nokta, Saga Queue’su olacaktır.

URI’ı güncelledikten sonra “Web.config” e gelerek, “SagaQueueName” key’ini aşağıdaki gibi tanımlayalım.

  <appSettings>
    <add key="RabbitMQUri" value="rabbitmq://localhost/" />
    <add key="RabbitMQUserName" value="guest" />
    <add key="RabbitMQPassword" value="guest" />
    <add key="SagaQueueName" value="lightmessagingcore.boilerplate.saga"/>
  </appSettings>

“LightMessagingCore.Boilerplate.OrderUI” projesi içerisinde yapacaklarımız bu kadar.

Sagas implementasyonuna geçmeden önce ihtiyaç duyacağımız messaging contract’larını, “LightMessagingCore.Boilerplate.Messaging” projesi altında aşağıdaki gibi tanımlayalım.

using System;

namespace LightMessagingCore.Boilerplate.Messaging
{
    public interface IOrderReceivedEvent
    {
        Guid CorrelationId { get; }
        int OrderId { get; }
        string OrderCode { get; }
    }
}

“IOrderReceivedEvent” ını “OrderService” içerisinde, order sagas üzerinden bir message produce edildiğinde consume edebilmek için kullanıyor olacağız.

using System;

namespace LightMessagingCore.Boilerplate.Messaging
{
    public interface IOrderProcessedEvent
    {
        Guid CorrelationId { get; set; }
        int OrderId { get; set; }
    }
}

“IOrderProcessedEvent” ını ise “OrderService” içerisinde consume edilen message’ı business requirements doğrultusunda handle ettikten sonra, faturalandırılma işlemleri için “BillingService” e bir event fırlatabilmek için kullanacağız.

“LightMessagingCore.Boilerplate.OrderService” projesine gidelim ve “OrderReceivedConsumer” class’ını aşağıdaki gibi refactor edelim.

using System;
using System.Threading.Tasks;
using LightMessagingCore.Boilerplate.Messaging;
using MassTransit;

namespace LightMessagingCore.Boilerplate.OrderService
{
    public class OrderReceivedConsumer : IConsumer
    {
        public async Task Consume(ConsumeContext context)
        {
            var orderCommand = context.Message;

            await Console.Out.WriteLineAsync($"Order code: {orderCommand.OrderCode} Order id: {orderCommand.OrderId} is received.");

            //do something..

            await context.Publish(
                new { CorrelationId = context.Message.CorrelationId, OrderId = orderCommand.OrderId });
        }
    }
}

Bir önceki örneğimizden farkı ise artık “IOrderCommand” yerine “IOrderReceivedEvent” larını consume ediyoruz ve ilgili business işlemleri gerçekleştirdikten sonra, geriye bir “IOrderProcessedEvent” ı publish ediyoruz.

Faturalandırma işlemleri için ise business’a “BillingService” i dahil edeceğimizi söylemiştik. Solution üzerine “LightMessagingCore.Boilerplate.BillingService” isminde yeni bir console application daha ekleyelim. Ekleme işleminin ardından “System.Configuration”, “LightMessagingCore.Boilerplate.Common” ve “LightMessagingCore.Boilerplate.Messaging” library’lerini referans olarak ekleyelim. Son olarak da Nuget Package Manager üzerinden “MassTransit.RabbitMQ” paketini kuralım. Artık consumer kısmını kodlamak için hazır durumdayız.

“OrderProcessedConsumer” isminde bir class ekleyelim ve aşağıdaki gibi kodlayalım.

using System;
using System.Threading.Tasks;
using LightMessagingCore.Boilerplate.Messaging;
using MassTransit;

namespace LightMessagingCore.Boilerplate.BillingService
{
    public class OrderProcessedConsumer : IConsumer
    {
        public async Task Consume(ConsumeContext context)
        {
            var orderCommand = context.Message;

            await Console.Out.WriteLineAsync($"Order id: {orderCommand.OrderId} is being billed.");
        }
    }
}

Bu kısımda daha önceki örnekte de olduğu gibi, yeni oluşturmuş olduğumuz “IOrderProcessedEvent” ını consume ediyoruz ve ilgili business işlemlerini burada gerçekleştiriyoruz. Oluşturduğumuz consumer’ı “Program.cs” de aşağıdaki gibi initialize edelim.

using System;
using System.Configuration;
using LightMessagingCore.Boilerplate.Common;
using MassTransit;

namespace LightMessagingCore.Boilerplate.BillingService
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.Title = "BillingService";

            var bus = BusConfigurator.Instance
                .ConfigureBus((cfg, host) =>
                {
                    cfg.ReceiveEndpoint(host, ConfigurationManager.AppSettings["OrderQueueName"], e =>
                    {
                        e.Consumer();
                    });
                });

            bus.StartAsync();

            Console.WriteLine("Listening order processed event..");
            Console.ReadLine();
        }
    }
}

Console application üzerinden host olacak olan bus, “OrderQueueName” üzerinden gelecek olan receive endpoint’e göre consume işlemlerini “OrderProcessedConsumer” üzerinden gerçekleştirecektir.

“App.config” bilgisini de aşağıdaki gerçekleştirelim.

  <appSettings>
    <add key="RabbitMQUri" value="rabbitmq://localhost/" />
    <add key="RabbitMQUserName" value="guest" />
    <add key="RabbitMQPassword" value="guest" />
    <add key="OrderQueueName" value="lightmessagingcore.boilerplate.order" />
  </appSettings>

“BillingService” ide “OrderService” den sonraki business step’i olarak sisteme dahil ettik.

Şimdi saga state machine ile birlikte workflow kısmını oluşturmaya başlayabiliriz.

Solution’a “LightMessagingCore.Boilerplate.Saga” isminde yeni bir console application daha ekleyelim. Ekleme işleminin ardından “System.Configuration”, “LightMessagingCore.Boilerplate.Common” ve “LightMessagingCore.Boilerplate.Messaging” library’lerini referans olarak ekleyelim. Referans ekleme işlemini tamamladıktan sonra Nuget Package Manager üzerinden “MassTransit.RabbitMQ” ve “MassTransit.Automatonymous” paketlerini kuralım.

Tüm işlemleri tamamladıktan sonra şimdi “OrderSagaState” isminde yeni bir class oluşturalım ve “SagaStateMachineInstance” interface’ini implemente edelim.

using Automatonymous;
using System;

namespace LightMessagingCore.Boilerplate.Saga
{
    public class OrderSagaState : SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }
        public State CurrentState { get; set; }

        public int OrderId { get; set; }
        public string OrderCode { get; set; }
    }
}

Bu class order’ın state’inin tutulacağı class dır. “CorrelationId” property’sine message’ları business flow’u sırasında correlate edebilmek için ihtiyaç duymaktadır Saga State Machine. “CurrentState” property’si ile ise mevcut state tutulmaktadır ve type’ını “Automatonymous” namespace’i altından almaktadır. Geriye kalan “OrderId” ve “OrderCode” gibi property’ler ise, state’ini tutacağımız message’ın property’leridir.

“OrderController” ı refactor ederken artık message’ları direkt olarak “SagaQueue” ya produce edeceğimizden ve initialize işlemini saga tarafından gerçekleştireceğimizi söylemiştik. Saga içerisinden ilgili message command’ı için state ve correlation işlemlerini gerçekleştirdikten sonra order business’ının gerçekleşebilmesi için “OrderService” e bir “IOrderReceivedEvent” ı publish edeceğimizi söylemiştik. Bunun için yine “LightMessagingCore.Boilerplate.Saga” projesi içerisine “Messages” klasörü ekleyelim ve içerisine “OrderReceivedEvent” isminde bir event tanımlayalım ve “IOrderReceivedEvent” interface’ini implemente edelim.

using System;
using LightMessagingCore.Boilerplate.Messaging;

namespace LightMessagingCore.Boilerplate.Saga.Messages
{
    public class OrderReceivedEvent : IOrderReceivedEvent
    {
        private readonly OrderSagaState _orderSagaState;

        public OrderReceivedEvent(OrderSagaState orderSagaState)
        {
            _orderSagaState = orderSagaState;
        }

        public Guid CorrelationId => _orderSagaState.CorrelationId;

        public string OrderCode => _orderSagaState.OrderCode;

        public int OrderId => _orderSagaState.OrderId;
    }
}

Bu event içerisinde constructor injection aracılığı ile bir “orderSagaState” alıyoruz ve order business’ının gerçekleşebilmesi için “OrderService” e publish edeceğimiz property’leri tanımlıyoruz.

“OrderSaga” yı oluşturmak için artık hazırız. Burada önce order’ın business gereği sahip olabileceği state’leri tanımlayacağız ve ardından gerçekleşebilecek event’ları tanımlayıp, bir workflow oluşturacağız. Öncelikle “OrderSaga” isminde bir class ekleyelim ve “MassTransitStateMachine” class’ını aşağıdaki gibi implemente edelim.

using Automatonymous;
using System;
using LightMessagingCore.Boilerplate.Messaging;
using LightMessagingCore.Boilerplate.Saga.Messages;

namespace LightMessagingCore.Boilerplate.Saga
{
    public class OrderSaga : MassTransitStateMachine
    {
        public State Received { get; set; }
        public State Processed { get; set; }

        public Event OrderCommand { get; set; }
        public Event OrderProcessed { get; set; }
    }
}

“MassTransitStateMachine” class’ına implemente ederken type olarak state tutmak için oluşturmuş olduğumuz “OrderSagaState” i geçiyoruz.

Örnek senaryomuz gereğince order, ilk “OrderService” e düştüğünde bir “Received” statüsüne ve ilgili order business’ı gerçekleştirildikten sonra da bir “Processed” statüsüne sahip olabilir. Bu yüzden “State” type’ına sahip, “Received” ve “Processed” property’lerini tanımlıyoruz. Event olarak ise ilk message “IOrderCommand” interface’i ile “OrderUI” tarafından produce edildiğinde, şuan oluşturuyor olduğumuz saga tarafından consume edilecektir. Bu yüzden event olarak bir “OrderCommand” bulunmaktadır. “OrderService” tarafından ilgili işlemler gerçekleştirildikten sonra publish edilen bir diğer event olarak da “IOrderProcessedEvent” bulunmaktadır. Bunun için birde “OrderProcessed” event’ı tanımlıyoruz.

Şimdi “OrderSaga” nın constructor’ı içerisinde initial state’i tanımlayarak, state machine üzerine event’ları aşağıdaki gibi tanımlayalım.

    public class OrderSaga : MassTransitStateMachine
    {
        public State Received { get; set; }
        public State Processed { get; set; }

        public Event OrderCommand { get; set; }
        public Event OrderProcessed { get; set; }

        public OrderSaga()
        {
            InstanceState(s => s.CurrentState);

            Event(() => OrderCommand,
                cec =>
                        cec.CorrelateBy(state => state.OrderCode, context => context.Message.OrderCode)
                        .SelectId(selector => Guid.NewGuid()));

            Event(() => OrderProcessed, cec => cec.CorrelateById(selector =>
                        selector.Message.CorrelationId));
        }
}

“InstanceState” method’u ile “OrderSagaState” i, state property’si üzerinden initialize ediyoruz. Ardından “Event” method’u ile içerisinde sıralaması önemli olarak ilk gelecek olan event’in “OrderCommand” olacağını ve bu message flow’unu, state’in “OrderCode” property’si üzerinden(unique olması önemli) correlate edileceğini söylüyoruz. Buradaki “state” state’ini tutmak için oluşturmuş olduğumuz yeni “OrderSagaState” objesidir ve “context” expression’ı ise o an consume edilen “IOrderCommand” event’ıdır.

“SelectId” method’u ile ise correlation işlemleri için bir id seçimi yapıyoruz. Bu işlem için eğer kullanabileceğiniz unique bir correlation id’niz yok ise, yeni bir guid oluşturup gerçekleştirmek mümkündür. Diğer stepler arasında workflow tarafından bu correlation id otomatik olarak set edilecektir. Bir diğer event olan “OrderProcessed” ı tanımlıyoruz ve correlation işleminin “Message” üzerindeki “CorrelationId” den sağlanacağını belirtiyoruz.

Gerekli event tanımlamalarını da tamamladıktan sonra, şimdi workflow için activity’leri tanımlamaya başlayabiliriz.

using Automatonymous;
using System;
using LightMessagingCore.Boilerplate.Messaging;
using LightMessagingCore.Boilerplate.Saga.Messages;

namespace LightMessagingCore.Boilerplate.Saga
{
    public class OrderSaga : MassTransitStateMachine
    {
        public State Received { get; set; }
        public State Processed { get; set; }

        public Event OrderCommand { get; set; }
        public Event OrderProcessed { get; set; }

        public OrderSaga()
        {
            InstanceState(s => s.CurrentState);

            Event(() => OrderCommand,
                cec =>
                        cec.CorrelateBy(state => state.OrderCode, context => context.Message.OrderCode)
                        .SelectId(selector => Guid.NewGuid()));

            Event(() => OrderProcessed, cec => cec.CorrelateById(selector =>
                        selector.Message.CorrelationId));

            Initially(
                When(OrderCommand)
                    .Then(context =>
                    {
                        context.Instance.OrderCode = context.Data.OrderCode;
                        context.Instance.OrderId = context.Data.OrderId;
                    })
                    .ThenAsync(
                        context => Console.Out.WriteLineAsync($"{context.Data.OrderId} order id is received..")
                    )
                    .TransitionTo(Received)
                    .Publish(context => new OrderReceivedEvent(context.Instance))
                );


            During(Received,
                When(OrderProcessed)
                .ThenAsync(
                    context => Console.Out.WriteLineAsync($"{context.Data.OrderId} order id is processed.."))
                .Finalize()
                );

            SetCompletedWhenFinalized();
        }
    }
}

“Initially” method’u ile ilk başta ne işlemi gerçekleştireceğimizi tanımlayacağız. Bunun için fluent bir şekilde “When” ve “Then” method’larını kullanıyoruz. Yani bir “OrderCommand” geldiğinde sonra “context.Instance” a, consume edilen data üzerindeki değeri alması gerektiğini söylüyoruz. Bu noktada “context.Instance” state’i tuttuğumuz “OrderSagaState” objesi oluyor. “ThenAsync” method’u ile yapmak istediğiniz farklı business işlemleri var ise async bir şekilde burada gerçekleştirebilmek mümkündür.

Bunlara ek olarak fluently bir şekilde kullanabileceğiniz, bazı harika method’lar da mevcuttur. Örnek vermek gerekirse conditional işlemler gerçekleştirebilmek için “If” method’u, saga kısmına compensation chain oluşturabilmek için “Catch” method’u gibi bazı method’lar mevcuttur. Bunlara ek olarak state bağlımlı scheduled işlemler de gerçekleştirebilmek için “Schedule” method’uda vardır.

.TransitionTo(Received)
.Publish(context => new OrderReceivedEvent(context.Instance))

“Initially” method’unda yukarıdaki son iki satırda da, “TransitionTo” method’u ile state’in artık “Received” olduğunu ve ardından “Publish” method’u ile işlenmek üzere order command’ı bir “OrderReceivedEvent” olarak queue’ya fırlatıyoruz.

Artık order saga’nın ilk başta ne yapması gerektiği hazır durumdadır. Artık workflow üzerine diğer activity’leri de tanımlamaya başlayabiliriz. “During” method’u ile objenin belirtilen state’e sahip olduğunda gerçekleşmesini istediğimiz işlemleri tanımlıyoruz. Biz ise örnek senaryomuz gereği obje “Received” state’ine sahipse ve “BillingService” için “OrderProcessed” event’ı yakalandıysa, console ekranına ilgili işlem bilgisini yazmasını söylüyoruz. Yani bu kısımda “BillingService” devreye girip çalışmaya başlamasıdır. ve “Finalize” method’u ile ilgili state’i tamamlıyoruz.

SetCompletedWhenFinalized();

Tüm bu işlemlerin sonunda artık yukarıdaki satır ile, ilgili saga state instance’ının sonlandığını söylüyoruz ve MassTransit tarafından artık bu instance bilgileri repository üzerinden otomatik olarak silinecektir. (bu kısma birazdan değineceğim)

Artık bir order workflow’una ve state machine’ine sahibiz. Örnek senaryomuz gereği her bir servis ilgili işlemini process ederken, saga console’u üzerinden gelen bilgilerini görebiliyor olacağız.

Şimdi “Program.cs” e gelelim ve burada saga host’unu aşağıdaki gibi initialize edelim.

using System;
using System.Configuration;
using Automatonymous;
using LightMessagingCore.Boilerplate.Common;
using MassTransit.Saga;

namespace LightMessagingCore.Boilerplate.Saga
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.Title = "Saga";
            var orderSaga = new OrderSaga();
            var repo = new InMemorySagaRepository();

            var bus = BusConfigurator.Instance
                .ConfigureBus((cfg, host) =>
                {
                    cfg.ReceiveEndpoint(host, ConfigurationManager.AppSettings["SagaQueueName"], e =>
                    {
                        e.StateMachineSaga(orderSaga, repo);
                    });
                });

            bus.StartAsync();

            Console.WriteLine("Order saga started..");
            Console.ReadLine();
        }
    }
}

Burada öncelikle az önce yukarıda bahsetmiş olduğum repository kavramından bahsetmek istiyorum. MassTransit ilgili state’leri repository’de tutmaktadır. Ben örnek gereği “InMemorySagaRepository” ile state işlemlerimi isminden de anlaşılabileceği gibi in memory olarak gerçekleştirdim. Bu işlemleri database üzerinde entityframework repository’si ile de gerçekleştirebilmek mümkündür. Bu işlemlerin ardından diğer host’lardan tek farkı ise, “Consumer” method’u yerine “StateMachineSaga” method’unu kullanmamızdır.

Test işlemlerinden önce”App.config” bilgilerini aşağıdaki gibi güncelleyelim.

  <appSettings>
    <add key="RabbitMQUri" value="rabbitmq://localhost/" />
    <add key="RabbitMQUserName" value="guest" />
    <add key="RabbitMQPassword" value="guest" />
    <add key="SagaQueueName" value="lightmessagingcore.boilerplate.saga" />
  </appSettings>

Dilerseniz şimdi solution’ın multiple startup project kısmına “LightMessagingCore.Boilerplate.BillingService” ve “LightMessagingCore.Boilerplate.Saga” yı da dahil edelim ve start’a basalım.

Açılan “OrderUI” üzerinden yukarıdaki bilgileri girelim ve “Create” butonuna basalım.

Dikkat ederseniz “IOrderCommand” message’ı ilk olarak “Saga” ya geldi ve “IOrderReceivedEvent” ı olarak publish edildi. Ardından “OrderService” publish edilen event’ı handle ettikten sonra “IOrderProcessedEvent” ını publish etti. “BillingService” de ilgili event’ı consume ederek, handle işlemini gerçekleştirdi.

Her bir step, state’ler doğrultusunda gördüğümüz gibi saga workflow’u üzerinden geçerek gerçekleşti. Sizlerde ihtiyaçlarınız doğrultusunda workflow üzerinde activity’leri tanımlarken, gerçekleştirmek istediğiniz eylemleri de tanımlayabilirsiniz.

Bir sürelik aradan sonra umarım faydalı bir makale içeriği olmuştur. Örnek uygulamanın tümüne aşağıdan erişebilirsiniz.

https://github.com/GokGokalp/lightmessagingcore-boilerplate-with-saga

Referanslar:

http://automatonymous.readthedocs.io/en/latest/overview/saga.html?highlight=StateMachineSaga
http://vasters.com/archive/Sagas.html

https://medium.com/@roman01la/confusion-about-saga-pattern-bbaac56e622#.npnnb9una
Gökhan Gökalp

View Comments

  • Merhaba Hocam,

    Ne yaptıysam saga işleminin sonucunu respond ile web api içerisine alamadım. İstediğim şu saga workflow çalışacak sonucu isteği yapan web apiye gönderecek o da web apiye istek yapan client e gönderecek.

    Bu konuda bir önerin var mıdır ?

    • Merhaba, teşekkür ederim öncelikle ilginiz için. Kusura bakmayın henüz yeni vakit bulabildim bloğuma bakmak için. İsteğiniz saga ile oluşturmuş olduğunuz bir flow'da, sync olarak geriye bir response mu dönmek istiyorsunuz? Eğer doğru anladı isem, saga ile farklı servis'lerinizi bir flow'a soktuğunuzda, sync olarak bir cevap geriye dönmek yerine, çalışan servis'ten bir event fırlatıp o event sonucunda bir şeyler yapmak async olarak daha doğru olmaz mı acaba? Çünkü saga flow'unda yaptığı sizin farklı servislerinizi bir sıraya sokarak, ilgili event'lar doğrultusunda işlemleri sırası ile ve oluşabilecek fail durumlarında compensating işlemlerini gerçekleştirmektir. Nasıl bir senaryoda öyle bir ihtiyacınız oldu acaba?

      • Merhaba cevap için teşekkürler, amaç şu aslında;

        Web api' ye client istek yapıyor. İsteğin sonucu ne oldu konusunda client e bir cevap dönmüyoruz. İşlemleri sıraya sokuyoruz buraya kadar sorun yok ama son işlemde finalize yaptığımızda bu workflow işi bitirdi diye dönmemiz gerekiyor ki işlemin bittiğini client anlasın.

        Aslında sorunu çözdüm. Initilize da gelen isteğin ilk context ini tutup işlem bittiğinde respond ile mesaj gönderdiğinizde workflow a ilk isteği yapan client e sonuç gönderebiliyoruz.

        • Merhaba tekrardan Yalçın bey. Anladığım kadarı ile async bir işlem gerçekleştirmek istiyorsunuz ve arkada bazı flow'lara sahipsiniz. Bunları ordered bir şekilde çalıştırmak istiyorsunuz bazı state'lere göre. Fakat takıldığım nokta, 1.cisi bu client kim? Çünkü, hem async işletmek istiyorsunuz hem de client'a geri haber vermek istiyorsunuz? Eğer, client'ın bundan haberdar olması gerekiyor ve buna göre belirli başlı işlemler de yapılacaksa, işlem bittiğinde bir event fırlatın ve bu event'i dinleyen ilgili consumer'lar gereken işi yapsın (eğer bu client third party başka bir yer değilse).

          Saygılarımla

  • Merhaba , Aynı console uygulaması içerisinde birden fazla consumer tanımlamak istiyorum fakat ilk consumer düştükten sonra diğer mesajı publish edince herhangi bir aksiyon alınmıyor.

    • Merhaba, consumer düştükten kastınız nedir? Consumer down mı oluyor? yoksa message'ı consume mü ediyor?

      • Merhaba , aslında kendimi tam anlatamamışım.

        Ben yapıyı biraz daha düzeltiyim tekbir console uygulaması üstünden ilerletiyimd derken consumerları tanımlamayı unutmuşum :) Fakat burda da şöyle birşey yaptım aslında ilgili consumerları tanımlarken ortak bir RequestConsumer classı tanımlayıp içerisinde publish ettiğim mesajları şu şekilde implement ettim ; ardından da tek bir consumer class üstünden gerekli eventları fırlattım.

        public class OrderRequestConsumer :
        IConsumer,
        IConsumer,
        IConsumer

        Bu arada makaleniz için çok teşekkür ederim anlamama çok yardımcı oldu :)

  • Merhaba.Ben aynen uyquladim bir sorunla karşılaşdım.Saga order id is received edir.Ancaq
    order id is processed etmir.Buna gorede OrderService ve BillingService consumer yapamiyor.Ne ola bilir acaba?

Recent Posts

Securing the Supply Chain of Containerized Applications to Reduce Security Risks (Policy Enforcement-Automated Governance with OPA Gatekeeper and Ratify) – Part 2

{:tr} Makalenin ilk bölümünde, Software Supply Chain güvenliğinin öneminden ve containerized uygulamaların güvenlik risklerini azaltabilmek…

6 months ago

Securing the Supply Chain of Containerized Applications to Reduce Security Risks (Security Scanning, SBOMs, Signing&Verifying Artifacts) – Part 1

{:tr}Bildiğimiz gibi modern yazılım geliştirme ortamında containerization'ın benimsenmesi, uygulamaların oluşturulma ve dağıtılma şekillerini oldukça değiştirdi.…

8 months ago

Delegating Identity & Access Management to Azure AD B2C and Integrating with .NET

{:tr}Bildiğimiz gibi bir ürün geliştirirken olabildiğince farklı cloud çözümlerinden faydalanmak, harcanacak zaman ve karmaşıklığın yanı…

1 year ago

How to Order Events in Microservices by Using Azure Service Bus (FIFO Consumers)

{:tr}Bazen bazı senaryolar vardır karmaşıklığını veya eksi yanlarını bildiğimiz halde implemente etmekten kaçamadığımız veya implemente…

2 years ago

Providing Atomicity for Eventual Consistency with Outbox Pattern in .NET Microservices

{:tr}Bildiğimiz gibi microservice architecture'ına adapte olmanın bir çok artı noktası olduğu gibi, maalesef getirdiği bazı…

2 years ago

Building Microservices by Using Dapr and .NET with Minimum Effort – 02 (Azure Container Apps)

{:tr}Bir önceki makale serisinde Dapr projesinden ve faydalarından bahsedip, local ortamda self-hosted mode olarak .NET…

2 years ago