Skip to content

RabbitMQ ve Publish-Subscribe Patterni ile Fanout Exchange

Merhaba arkadaşlar.

Bir süredir Messaging sistemleri üzerinde yoğun bir şekilde çalışmaktayım. Bu makalede ise RabbitMQ’da Publish-Subscribe pattern’i ile Fanout Exchange‘e değinmeye çalışacağım. Öncelikle tüm bu işlemlere başlamadan RabbitMQ hakkındaki bilgilerimizi tazeleyebilmek adına, buraya tıklayarak RabbitMQ hakkında daha önce yazdığım makalelere bir göz atabilirsiniz.

Dilerseniz konuya girmeden önce biraz messaging sistemlerinden bahsedelim.

Messaging yapıları ile uygulamalar, loosely coupled olarak asenkron bir şekilde birbirleri ile iletişime geçebilmektedirler.

Messaging yapıları temel olarak verinin, bir uygulamadan bir başka uygulamaya aktarılması ile sorumludur. Web Servisler, Windows Servisler veya MVC uygulamaları gibi bir çok platform birbirleri ile iletişime geçme ihtiyacında olabilirler. İşte bu noktada Messaging yapıları bu entegrasyon sürecine odaklanarak, uygulamaların platform bağımsız bir şekilde herhangi bir bilgiyi exchange edebilmelerini sağlamaktadırlar.

Hatırlarsak daha önceki makalemde RabbitMQ üzerinde “Direct Exchange”, “Fanout Exchange” ve “Topic Exchange” gibi farklı exchange tiplerinin bulunduğundan bahsetmiştik. Bu makalemde ise Fanout Exhange tipine değineceğiz ve bir örnek gerçekleştireceğiz.

publish-subscriber

Pub-Sub Message exchange pattern’ini hatırlarsak Publisher tarafından exchange için gönderilen message, queue üzerinde kendisine bağlı olan tüm Subscriber’lara distributed olarak gönderilmekteydi. Bu sefer ki exchange işlemini ise Fanout olarak gerçekleştireceğiz. Fanout exchange tipi isminden de anlaşılabileceği üzere aynı message, farklı consumer’lar tarafından farklı yollarla process edilmeye ihtiyaç duyulduğunda kullanılmaktadır.

fanout-exchange

Bir başka değişle yukarıda bulunan görsel gibi fanout, kendisine bağlı olan tüm queue’lara aynı message’ı iletir.

Konuya bir örnek ile devam edelim. Bir e-ticaret sistemi düşünelim ve sipariş işlemi gerçekleştiğinde Publisher tarafından “foo.billing” ve “foo.shipping” queue’larına bu sipariş bilgilerini gönderelim. Daha önceki RabbitMQ makalesindeki projede kullanmış olduğumuz bazı kodları kullanacağız. Öncelikle yeni bir solution oluşturalım ve “FanoutExMessaging.Common” isminde bir class library ekleyelim. Ekleme işleminin ardından Nuget Package Manager üzerinden “RabbitMQ.Client” paketini yükleyelim.

rabbitmq-client

Ardından daha önceki projede de kullanmış olduğumuz “RabbitMQService” class’ını ekleyelim ve aşağıdaki gibi tanımlayalım.

using RabbitMQ.Client;

namespace FanoutExMessaging.Common
{
    public class RabbitMQService
    {
        // localhost üzerinde kurulu olduğu için host adresi olarak bunu kullanıyorum.
        private readonly string _hostName = "localhost";

        public IConnection GetRabbitMQConnection()
        {
            ConnectionFactory connectionFactory = new ConnectionFactory()
            {
                // RabbitMQ'nun bağlantı kuracağı host'u tanımlıyoruz. Herhangi bir güvenlik önlemi koymak istersek, Management ekranından password adımlarını tanımlayıp factory içerisindeki "UserName" ve "Password" property'lerini set etmemiz yeterlidir.
                HostName = _hostName
            };

            return connectionFactory.CreateConnection();
        }
    }
}

Common katmanı ile işimiz şimdilik bu kadar. Solution üzerine “FanoutExMessaging.Publisher” isminde yeni bir console application ekleyelim ardından Nuget Package Manager üzerinden “RabbitMQ.Client” paketini buraya da dahil edelim ve “FanoutExMessaging.Common” library’sini referans olarak göstererek, Publisher’ı kodlamaya başlayalım.

using System;
using System.Text;
using FanoutExMessaging.Common;
using RabbitMQ.Client;

namespace FanoutExMessaging.Publisher
{
    class Program
    {
        static void Main(string[] args)
        {
            var rabbitMQService = new RabbitMQService();

            using (var connection = rabbitMQService.GetRabbitMQConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare("foo.exchange", ExchangeType.Fanout, true, false, null);

                    channel.QueueDeclare("foo.billing", true, false, false, null);
                    channel.QueueDeclare("foo.shipping", true, false, false, null);

                    channel.QueueBind("foo.billing", "foo.exchange", "");
                    channel.QueueBind("foo.shipping", "foo.exchange", "");

                    var publicationAddress = new PublicationAddress(ExchangeType.Fanout, "foo.exchange", "");

                    channel.BasicPublish(publicationAddress, null,
                        Encoding.UTF8.GetBytes("12345 numaralı sipariş geldi."));
                }
            }

            Console.WriteLine("Sipariş publish işlemi gerçekleştirildi.");
            Console.ReadLine();
        }
    }
}

Burada dikkat etmemiz gereken ilk yer channel üzerinde yeni bir exchange tanımlıyoruz. Tanımlamış olduğumuz bu exchange ile channel’ın “foo.exchange” isminde ve Fanout tipinde olacağını belirtiyoruz. Ardından “foo.billing” ve “foo.shipping” isminde iki adet queue tanımlıyor ve bu queue’ları “foo.exchange” üzerine bind ediyoruz. “PublicationAddress” class’ı ile de message’ın publish yapılacağı adresi ve exchange type’ını tanımlıyoruz.

Tanımlamaların ardından channel üzerinde bulunan “BasicPublish” method’u ile, “12345 numaralı sipariş geldi.” mesajını tüm queue’lara publish yapıyoruz. Dilerseniz Publisher’ı test edebilmek için çalıştıralım ve RabbitMQ Management ekranı üzerinden, Exchanges’in ve Queue’ların oluşup oluşmadığını bir kontrol edelim.

exchanges

Exchanges sekmesine baktığımızda en altta “foo.exchange” isminde “fanout” type’ına sahip bir exchange oluştuğunu görebiliyoruz.

queues

Queues sekmesinde ise “foo.billing” ve “foo.shipping” queue’ları başarılı bir şekilde oluşmuş durumdadır.

Şimdi bu queue’ları process edecek olan Consumer’ları kodlayalım. Öncelikle solution üzerine “FanoutExMessaging.BillingConsumer” isminde yeni bir console application oluşturalım ve içerisine Nuget Package Manager üzerinden “RabbitMQ.Client” paketini dahil edelim ve ardından “FanoutExMessaging.Common” library’sini referans olarak ekleyelim.

BillingConsumer main method’unu aşağıdaki gibi kodlayalım.

using System;
using System.Text;
using FanoutExMessaging.Common;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace FanoutExMessaging.BillingConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var rabbitMQService = new RabbitMQService();

            using (var connection = rabbitMQService.GetRabbitMQConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    var consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);

                        Console.WriteLine("foo.billing üzerinden mesaj alındı: {0}", message);
                    };

                    channel.BasicConsume("foo.billing", false, consumer);
                    Console.ReadLine();
                }
            }
        }
    }
}

Burada ise yaptığımız channel üzerinden “foo.billing” queue’suna bağlanmaktır.

İkinci queue’muz olan “foo.shipping” için “FanoutExMessaging.ShippingConsumer” isminde yeni bir console application daha ekleyelim ardından Nuget Package Manager üzerinden “RabbitMQ.Client” paketini dahil ederek, “FanoutExMessaging.Common” library’sini referans olarak ekleyelim. “foo.billing” de olduğu gibi tüm kodlarımı aynı olacak. Buradaki tek fark ise “foo.billing” queue’su yerine, “foo.shipping” queue’suna bağlanmamız olacak.

ShippingConsumer main method’unu aşağıdaki gibi kodlayalım.

using System;
using System.Text;
using FanoutExMessaging.Common;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace FanoutExMessaging.ShippingConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var rabbitMQService = new RabbitMQService();

            using (var connection = rabbitMQService.GetRabbitMQConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    var consumer = new EventingBasicConsumer(channel);

                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);

                        Console.WriteLine("foo.shipping üzerinden mesaj alındı: {0}", message);
                    };

                    channel.BasicConsume("foo.shipping", false, consumer);
                    Console.ReadLine();
                }
            }
        }
    }
}

Şuan elimizde bir Publisher ve iki adet Consumer bulunmakta. Solution’ın ayarlarından “FanoutExMessaging.BillingConsumer” ve “FanoutExMessaging.ShippingConsumer” ın aynı anda çalışabilmesi için multiple startup’ı seçelim.

Projeyi çalıştırdığımızda, aşağıdaki gibi iki adet console uygulaması açılacaktır.

rabbitmq-test-1

Baktığımızda iki consumer üzerinde birer adet aynı mesaj bulunmaktadır. Bunun sebebi ise Publisher’ı test edebilmek için çalıştırdığımızdan dolayı “foo.billing” ve “foo.shipping” queue’larına test mesajının gönderilmesidir. İki consumer açıkken Publisher’ı tekrardan çalıştıralım ve sonucunu izleyelim.

rabbitmq-test-2

Publisher’ın tekrardan çalıştırılması sonucunda fanout exchange aracılığı ile tüm queue’lara aynı mesaj tekrardan gönderilmiştir. Bu işlemin ardından ise ilgili consumer’lar, kendi queue’larında bulunan mesajı fetch etmişlerdir.

Makalemizi sonlandırırken tekrardan belirtmek gerekirse, eğer aynı işi farklı şekillerde ve farklı consumer’lar tarafından process edilme ihtiyacı duyulursa bu örneğimizde olduğu gibi, fanout exchange type’ı kolay bir şekilde kullanılabilinir.

Takipte kalın…

fanoutexmessagingwithrabbitmq

Published inArchitecturalMessagingRabbitMQ

7 Comments

  1. Emre Tiryaki Emre Tiryaki

    Merhabalar,
    Eline sağlık uğraşmıssınız fakat RabbitMq ‘da native kodlamak yerine masstransit yada nservicebus gibi wrap kütüphanelerini tercih etmek gerekir diye düşünüyorum. Çünkü bugun rabbitmq yarın msmq vs kullanılabilir. Ayrıca yukarıdaki saydığım kütüphaneler fluent kodlamayı sağlar ve native kodlamada satırlar dolusu kod yazmak ufak bir plugin ile tek satırda işleminiz görür . Bu sebepden ötürü günümüzde yukardaki gibi native kodlarımın çok ömrü olmadığını düşünüyorum.

    • Ahmet Pirimoğlu Ahmet Pirimoğlu

      Yazıyı okuduktan sonra ben de aynı soruyu sorayım derken sorulduğunu gördüm, çok teşekkürler 🙂

      RabbitMQ’de işlenemeyen mesajlar, retry ihtiyacı, çözümü gibi konularda araştırma yapıldığında yazılar service bus’a yönlendiriliyor. Burada hangi service bus’ı kullanmalı? NServiceBus ücretli, ama kullanımı yaygın. Alternatif MassTransit var.

      Bu konudaki tecrübelerinizi de paylaşırsanız sevinirim.

  2. Metin Metin

    Merhaba,
    Messaging başlığı altındaki yazılarınızı detaylıca inceliyorum. Hepsi çok başarılı, devamını dilerim. Teşekkürler.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.