RabbitMQ ve Publish-Subscribe Patterni ile Fanout Exchange

Merhaba arkadaşlar.

Bir süredir Messaging sistemleri üzerinde yoğun bir şekilde çalışmaktayım. Bu makalede ise RabbitMQ’da Publish-Subscribe pattern’i ile Fanout Exchange‘e değinmeye çalışacağım. Öncelikle tüm bu işlemlere başlamadan RabbitMQ hakkındaki bilgilerimizi tazeleyebilmek adına, buraya tıklayarak RabbitMQ hakkında daha önce yazdığım makalelere bir göz atabilirsiniz.

Dilerseniz konuya girmeden önce biraz messaging sistemlerinden bahsedelim.

Messaging yapıları ile uygulamalar, loosely coupled olarak asenkron bir şekilde birbirleri ile iletişime geçebilmektedirler.

Messaging yapıları temel olarak verinin, bir uygulamadan bir başka uygulamaya aktarılması ile sorumludur. Web Servisler, Windows Servisler veya MVC uygulamaları gibi bir çok platform birbirleri ile iletişime geçme ihtiyacında olabilirler. İşte bu noktada Messaging yapıları bu entegrasyon sürecine odaklanarak, uygulamaların platform bağımsız bir şekilde herhangi bir bilgiyi exchange edebilmelerini sağlamaktadırlar.

Hatırlarsak daha önceki makalemde RabbitMQ üzerinde “Direct Exchange”, “Fanout Exchange” ve “Topic Exchange” gibi farklı exchange tiplerinin bulunduğundan bahsetmiştik. Bu makalemde ise Fanout Exhange tipine değineceğiz ve bir örnek gerçekleştireceğiz.

Pub-Sub Message exchange pattern’ini hatırlarsak Publisher tarafından exchange için gönderilen message, queue üzerinde kendisine bağlı olan tüm Subscriber’lara distributed olarak gönderilmekteydi. Bu sefer ki exchange işlemini ise Fanout olarak gerçekleştireceğiz. Fanout exchange tipi isminden de anlaşılabileceği üzere aynı message, farklı consumer’lar tarafından farklı yollarla process edilmeye ihtiyaç duyulduğunda kullanılmaktadır.

Bir başka değişle yukarıda bulunan görsel gibi fanout, kendisine bağlı olan tüm queue’lara aynı message’ı iletir.

Konuya bir örnek ile devam edelim. Bir e-ticaret sistemi düşünelim ve sipariş işlemi gerçekleştiğinde Publisher tarafından “foo.billing” ve “foo.shipping” queue’larına bu sipariş bilgilerini gönderelim. Daha önceki RabbitMQ makalesindeki projede kullanmış olduğumuz bazı kodları kullanacağız. Öncelikle yeni bir solution oluşturalım ve “FanoutExMessaging.Common” isminde bir class library ekleyelim. Ekleme işleminin ardından Nuget Package Manager üzerinden “RabbitMQ.Client” paketini yükleyelim.

Ardından daha önceki projede de kullanmış olduğumuz “RabbitMQService” class’ını ekleyelim ve aşağıdaki gibi tanımlayalım.

using RabbitMQ.Client;

namespace FanoutExMessaging.Common
{
    public class RabbitMQService
    {
        // localhost üzerinde kurulu olduğu için host adresi olarak bunu kullanıyorum.
        private readonly string _hostName = "localhost";

        public IConnection GetRabbitMQConnection()
        {
            ConnectionFactory connectionFactory = new ConnectionFactory()
            {
                // RabbitMQ'nun bağlantı kuracağı host'u tanımlıyoruz. Herhangi bir güvenlik önlemi koymak istersek, Management ekranından password adımlarını tanımlayıp factory içerisindeki "UserName" ve "Password" property'lerini set etmemiz yeterlidir.
                HostName = _hostName
            };

            return connectionFactory.CreateConnection();
        }
    }
}

Common katmanı ile işimiz şimdilik bu kadar. Solution üzerine “FanoutExMessaging.Publisher” isminde yeni bir console application ekleyelim ardından Nuget Package Manager üzerinden “RabbitMQ.Client” paketini buraya da dahil edelim ve “FanoutExMessaging.Common” library’sini referans olarak göstererek, Publisher’ı kodlamaya başlayalım.

using System;
using System.Text;
using FanoutExMessaging.Common;
using RabbitMQ.Client;

namespace FanoutExMessaging.Publisher
{
    class Program
    {
        static void Main(string[] args)
        {
            var rabbitMQService = new RabbitMQService();

            using (var connection = rabbitMQService.GetRabbitMQConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare("foo.exchange", ExchangeType.Fanout, true, false, null);

                    channel.QueueDeclare("foo.billing", true, false, false, null);
                    channel.QueueDeclare("foo.shipping", true, false, false, null);

                    channel.QueueBind("foo.billing", "foo.exchange", "");
                    channel.QueueBind("foo.shipping", "foo.exchange", "");

                    var publicationAddress = new PublicationAddress(ExchangeType.Fanout, "foo.exchange", "");

                    channel.BasicPublish(publicationAddress, null,
                        Encoding.UTF8.GetBytes("12345 numaralı sipariş geldi."));
                }
            }

            Console.WriteLine("Sipariş publish işlemi gerçekleştirildi.");
            Console.ReadLine();
        }
    }
}

Burada dikkat etmemiz gereken ilk yer channel üzerinde yeni bir exchange tanımlıyoruz. Tanımlamış olduğumuz bu exchange ile channel’ın “foo.exchange” isminde ve Fanout tipinde olacağını belirtiyoruz. Ardından “foo.billing” ve “foo.shipping” isminde iki adet queue tanımlıyor ve bu queue’ları “foo.exchange” üzerine bind ediyoruz. “PublicationAddress” class’ı ile de message’ın publish yapılacağı adresi ve exchange type’ını tanımlıyoruz.

Tanımlamaların ardından channel üzerinde bulunan “BasicPublish” method’u ile, “12345 numaralı sipariş geldi.” mesajını tüm queue’lara publish yapıyoruz. Dilerseniz Publisher’ı test edebilmek için çalıştıralım ve RabbitMQ Management ekranı üzerinden, Exchanges’in ve Queue’ların oluşup oluşmadığını bir kontrol edelim.

Exchanges sekmesine baktığımızda en altta “foo.exchange” isminde “fanout” type’ına sahip bir exchange oluştuğunu görebiliyoruz.

Queues sekmesinde ise “foo.billing” ve “foo.shipping” queue’ları başarılı bir şekilde oluşmuş durumdadır.

Şimdi bu queue’ları process edecek olan Consumer’ları kodlayalım. Öncelikle solution üzerine “FanoutExMessaging.BillingConsumer” isminde yeni bir console application oluşturalım ve içerisine Nuget Package Manager üzerinden “RabbitMQ.Client” paketini dahil edelim ve ardından “FanoutExMessaging.Common” library’sini referans olarak ekleyelim.

BillingConsumer main method’unu aşağıdaki gibi kodlayalım.

using System;
using System.Text;
using FanoutExMessaging.Common;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace FanoutExMessaging.BillingConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var rabbitMQService = new RabbitMQService();

            using (var connection = rabbitMQService.GetRabbitMQConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    var consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);

                        Console.WriteLine("foo.billing üzerinden mesaj alındı: {0}", message);
                    };

                    channel.BasicConsume("foo.billing", false, consumer);
                    Console.ReadLine();
                }
            }
        }
    }
}

Burada ise yaptığımız channel üzerinden “foo.billing” queue’suna bağlanmaktır.

İkinci queue’muz olan “foo.shipping” için “FanoutExMessaging.ShippingConsumer” isminde yeni bir console application daha ekleyelim ardından Nuget Package Manager üzerinden “RabbitMQ.Client” paketini dahil ederek, “FanoutExMessaging.Common” library’sini referans olarak ekleyelim. “foo.billing” de olduğu gibi tüm kodlarımı aynı olacak. Buradaki tek fark ise “foo.billing” queue’su yerine, “foo.shipping” queue’suna bağlanmamız olacak.

ShippingConsumer main method’unu aşağıdaki gibi kodlayalım.

using System;
using System.Text;
using FanoutExMessaging.Common;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace FanoutExMessaging.ShippingConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var rabbitMQService = new RabbitMQService();

            using (var connection = rabbitMQService.GetRabbitMQConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    var consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);

                        Console.WriteLine("foo.shipping üzerinden mesaj alındı: {0}", message);
                    };

                    channel.BasicConsume("foo.shipping", false, consumer);
                    Console.ReadLine();
                }
            }
        }
    }
}

Şuan elimizde bir Publisher ve iki adet Consumer bulunmakta. Solution’ın ayarlarından “FanoutExMessaging.BillingConsumer” ve “FanoutExMessaging.ShippingConsumer” ın aynı anda çalışabilmesi için multiple startup’ı seçelim.

Projeyi çalıştırdığımızda, aşağıdaki gibi iki adet console uygulaması açılacaktır.

Baktığımızda iki consumer üzerinde birer adet aynı mesaj bulunmaktadır. Bunun sebebi ise Publisher’ı test edebilmek için çalıştırdığımızdan dolayı “foo.billing” ve “foo.shipping” queue’larına test mesajının gönderilmesidir. İki consumer açıkken Publisher’ı tekrardan çalıştıralım ve sonucunu izleyelim.

Publisher’ın tekrardan çalıştırılması sonucunda fanout exchange aracılığı ile tüm queue’lara aynı mesaj tekrardan gönderilmiştir. Bu işlemin ardından ise ilgili consumer’lar, kendi queue’larında bulunan mesajı fetch etmişlerdir.

Makalemizi sonlandırırken tekrardan belirtmek gerekirse, eğer aynı işi farklı şekillerde ve farklı consumer’lar tarafından process edilme ihtiyacı duyulursa bu örneğimizde olduğu gibi, fanout exchange type’ı kolay bir şekilde kullanılabilinir.

Takipte kalın…

fanoutexmessagingwithrabbitmq

Gökhan Gökalp

View Comments

  • Merhabalar,
    Eline sağlık uğraşmıssınız fakat RabbitMq 'da native kodlamak yerine masstransit yada nservicebus gibi wrap kütüphanelerini tercih etmek gerekir diye düşünüyorum. Çünkü bugun rabbitmq yarın msmq vs kullanılabilir. Ayrıca yukarıdaki saydığım kütüphaneler fluent kodlamayı sağlar ve native kodlamada satırlar dolusu kod yazmak ufak bir plugin ile tek satırda işleminiz görür . Bu sebepden ötürü günümüzde yukardaki gibi native kodlarımın çok ömrü olmadığını düşünüyorum.

    • Merhaba, teşekkür ederim değerli yorumunuz için. Eğer enterprise düzeyde bir messaging yapısı kuruluyor, reliable söz konusu ise evet kesinlikle NServiceBus gibi Servis Bus framework'lerini tercih ederim/edilmelidir. (şahsi fikrim) Fakat sadece basic düzeyde bir MQ işlemleri söz konusu ise hight level bir abstraction'a ihtiyaç duyulmayada bilinir (bence). Buda business ile alakalı bir şey. Asıl konuya da gelecek olursak makalenin amacında ise başlatmış olduğum zaman zaman RabbitMQ serisine devam etmek ve burada pub/sub nasıl uygulanabilinir'i göstermek ki buda pure olarak olmalı diye düşündüğüm için bu şekilde. İyi günler dilerim.

    • Yazıyı okuduktan sonra ben de aynı soruyu sorayım derken sorulduğunu gördüm, çok teşekkürler :)

      RabbitMQ'de işlenemeyen mesajlar, retry ihtiyacı, çözümü gibi konularda araştırma yapıldığında yazılar service bus'a yönlendiriliyor. Burada hangi service bus'ı kullanmalı? NServiceBus ücretli, ama kullanımı yaygın. Alternatif MassTransit var.

      Bu konudaki tecrübelerinizi de paylaşırsanız sevinirim.

      • Merhaba, retry ihtiyaçları business'ınızın vereceği rule'lara göre değişir aslında. Hata tiplerini kategorize edebilir ve bu şekilde retry edilmesine ihtiyaç duyulanları belirleyebilirsiniz. Uygulamanızın error handling kısmında bir hata oluştuğunda belirlediğiniz rule'lara göre o mesajı tekrardan kuyruğa koyabilirsiniz, gibi yöntemler var. Kanımca messaging yapılarının en zor kısımları da diyebilirim :) NServiceBus evet ücretli, alıp almama kararı size kalmış :) MassTransit ise genelde gördüğüm kadarı ile en çok tercih edilen BUS'lar arasında. Hangisi diye sorarsan açıkcası bu konu hakkında yazılmış bir çok karşılaştırmalar mevcut. Şuraya bir bakmanı tavsiye edebilirim. :)

  • Merhaba,
    Messaging başlığı altındaki yazılarınızı detaylıca inceliyorum. Hepsi çok başarılı, devamını dilerim. Teşekkürler.

Recent Posts

Overcoming Event Size Limits with the Conditional Claim-Check Pattern in Event-Driven Architectures

{:en}In today’s technological age, we typically build our application solutions on event-driven architecture in order…

3 months ago

Securing the Supply Chain of Containerized Applications to Reduce Security Risks (Policy Enforcement-Automated Governance with OPA Gatekeeper and Ratify) – Part 2

{:tr} Makalenin ilk bölümünde, Software Supply Chain güvenliğinin öneminden ve containerized uygulamaların güvenlik risklerini azaltabilmek…

8 months ago

Securing the Supply Chain of Containerized Applications to Reduce Security Risks (Security Scanning, SBOMs, Signing&Verifying Artifacts) – Part 1

{:tr}Bildiğimiz gibi modern yazılım geliştirme ortamında containerization'ın benimsenmesi, uygulamaların oluşturulma ve dağıtılma şekillerini oldukça değiştirdi.…

10 months ago

Delegating Identity & Access Management to Azure AD B2C and Integrating with .NET

{:tr}Bildiğimiz gibi bir ürün geliştirirken olabildiğince farklı cloud çözümlerinden faydalanmak, harcanacak zaman ve karmaşıklığın yanı…

1 year ago

How to Order Events in Microservices by Using Azure Service Bus (FIFO Consumers)

{:tr}Bazen bazı senaryolar vardır karmaşıklığını veya eksi yanlarını bildiğimiz halde implemente etmekten kaçamadığımız veya implemente…

2 years ago

Providing Atomicity for Eventual Consistency with Outbox Pattern in .NET Microservices

{:tr}Bildiğimiz gibi microservice architecture'ına adapte olmanın bir çok artı noktası olduğu gibi, maalesef getirdiği bazı…

2 years ago