Expero.Libraries.CSharp.RabbitMQ 1.8.3-beta.101952

Expero.Libraries.CSharp.RabbitMQ

Пакет для более удобной работы с RabbitMQ .

Пакет содержит:

  • Контекст IRabbitContext для работы с RabbitMQ;
  • Абстрактный класс BaseRabbitQueueService, позволяющий просто и быстро реализовать Queue в проекте для Publish и Consume.
  • HealthCheck сервис для RabbitMQ;

Подробнее о клиенте: https://www.rabbitmq.com/dotnet-api-guide.html.

Подключение к проекту.

Для подключения пакета RabbitMQ к проекту необходимо добавить сервис на этапе запуска.

Если нам надо получить базовый сервис с параметрами по умолчанию, для этого в пакете есть 4 метода:

AddExperoRabbitMQ(RabbitConnectionSettings).

AddExperoRabbitMQ(string, RabbitConnectionSettings).

AddExperoRabbitMQ(Action<RabbitConnectionSettings>).

AddExperoRabbitMQ(string, Action<RabbitConnectionSettings>).

После регистрации сервиса через DI будут доступены следующие сервисы.

  • IRabbitContext - контекст для работы с подключением к RabbitMQ.
  • IRabbitInitializer - сервис для конфигурирования очередей и обменников в RabbitMQ.
  • IRabbitMessagesConverter - сервис для упаковки и распаковки сообщений при отпраке в RabbitMQ. Вы можете написать свою реализацию и зарегистрировать её в DI.
  • IRabbitContextFactory - сервис для инстанцирования именнованного контекста для работы с RabbitMQ.

Важно понимать, что если вы регистрируете именованное подключение, тогда у вас НЕ будет доступен IRabbitContext по умолчанию в DI.

Модель RabbitConnectionSettings содержит базовый набор параметров подключения:

  • ConnectionString - строка подключения(на текущий момент поддерживается формат: amqp://user:password@host:port);
  • VirtualHost - строка, виртутальный хост, по умолчанию: /;
  • Prefix - строка, добавляемая перед названием всех очередей, с разделителем, указанным в параметре "Delimiter";
  • Delimiter - строка, разделитель в наименовании очередей, ставится после префикса, по умолчанию: _;
  • RoutingKey - строка, ключ маршрутизации для связывания(binding) очередей и точки доступа;
  • ConsumerConcurrency - количество(int) параллельных потоков;
  • MessageExpiration - время (TimeSpan) жизни сообщения;
  • RecoveryInterval - время(TimeSpan) между попытками автоматического восстановления;

В случае, если строке подключения указаны несколько хостов(кластер) - сервис будет пытаться подключаться к ним по очереди, пока не установит подключение.

Шаблон строки подключения для кластера:

amqp://user:password@host1:port1,host2:port2,host3:port3....

В строке подключения нельзя использовать пробелы, user и password общие для всех hosts.

Пример строки подключения, для кластера:

const сonnectionString = "amqp://rabbit:rabbit@localhost:25678,192.168.0.1:25679,dev.phrm.tech.ru:25690";

Пример регистрации сервиса:

public void AddService(IServiceCollection services)
{
    services.AddExperoRabbitMQ(s =>
    {
        s.ConnectionString = "amqp://rabbit:rabbit@localhost:25678";
        s.Prefix = "portal_local";
        s.RoutingKey = "portal_local";
        s.ConsumerConcurrency = 10;
        s.MessageExpiration = TimeSpan.FromMinutes(30);
        s.RecoveryInterval = TimeSpan.FromMinutes(5);
    });
}

либо через создание экземпляра RabbitConnectionSettings:

public void AddService(IServiceCollection services)
{
    var settings = new RabbitConnectionSettings
    {
        ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
        Prefix = "portal_local",
        RoutingKey = "portal_local",
        ConsumerConcurrency = 10,
        MessageExpiration = TimeSpan.FromMinutes(30),
        RecoveryInterval = TimeSpan.FromMinutes(5)
    };

    services.AddExperoRabbitMQ(settings);
}

IRabbitContext

Контекст IRabbitContext доступна через DI, используется для создания соединения с сервером RabbitMQ, а также для создания обменников (Exchange) и очередей (Queue).

Пример использования контекста:

public class Queue
{
    private readonly IRabbitExchange _exchange;
    private readonly IRabbitQueue _queue;
    
    public Queue(IRabbitContext сontext)
    {
        var exchangeName = new RabbitExchangeName("notifications");
        _exchange = сontext.Create(exchangeName);

        var queueName = new RabbitQueueName("sms");
        _queue = сontext.Create(queueName);
    }
}

IRabbitExchange

Интерфейс для работы с обменниками (Exchanges).

Пример использования:

public class Publisher
{
    private readonly IRabbitExchange _exchange;

    public Publisher(IRabbitContext сontext)
    {
        var exchangeName = new RabbitExchangeName("notifications");
        _exchange = сontext.Create(exchangeName);

        _exchange.Initialize(true);
    }

        
    public void Publish()
    {
        _exchange.Publish(Encoding.UTF8.GetBytes("Hello"));

        _exchange.Publish<string>("Hello", TimeSpan.FromMinutes(5));

        _exchange.Publish<string>("Hello", TimeSpan.FromMinutes(5), TimeSpan.FromDays(1));
    }
}

IRabbitQueue

Интерфейс для работы с очередями (Queues).

Пример использования:

public class Consumer
{
    private readonly IRabbitQueue _queue;

    public Consumer(IRabbitContext сontext)
    {
        var queueName = new RabbitQueueName("sms");
        _queue = сontext.Create(queueName);

        _queue.Initialize();
    }

    public void Consume()
    {
        _queue.Consume<string>(async message  =>
        {
            Console.WriteLine(message);
                
            await Task.Delay(1000);

            return true;
        });
    }
}

IRabbitInitializer

Сервис IRabbitInitializer используется для упрощенного создания и инициализации экземпляров IRabbitExchange и IRabbitQueue. Доступен через DI. Основное предназначение проконфигурировать очереди и обменники перед началом работы.

Пример использования:

public void Initialize(IServiceProvider provider)
{
    var initializer = provider.GetService<IRabbitInitializer>();

    var exchangeName = new RabbitExchangeName("notifications");
    initializer.Initialize(exchangeName, true);

    var queueName = new RabbitQueueName("sms");
    initializer.Initialize(queueName)
                .BindWith(exchangeName);
}

IRabbitMessagesConverter

Cервис для упаковки и распаковки сообщений при отпраке в RabbitMQ. Вы можете написать свою реализацию и зарегистрировать её в DI. Используется при упаковке и распаковке объектов в сообщения для отправки.

BaseRabbitQueueService

Абстрактный класс BaseRabbitQueueService - используется для простого создания сервисов очередей. В конструкторе принимает IRabbitContext, с помощью которой создает экземпляры обменника и очереди. При использовании данного подхода нет необходимости реализовывать Dispose(), т.к. он реализован внутри.

Пример использования:

internal interface IImportQueue : IDisposable
{
    void Publish(ImportTask task);
    void Consume(int consumersQuantity, Func<ImportTaskIdContainer, Task<RabbitMessageProcessingResult>> action);
}
internal class ImportQueue : BaseRabbitQueueService, IImportQueue
{
    public ImportQueue(IRabbitContext сontext) : base(сontext) { }


    #region BaseRabbitQueueService

    protected override RabbitExchangeName ExchangeName => MessagesExchanges.Import;
    protected override RabbitQueueName QueueName => MessagesQueues.Import;

    #endregion BaseRabbitQueueService


    #region IImportQueue

    void IImportQueue.Publish(ImportTask task)
    {
        var container = new ImportTaskIdContainer(task);

        Publish(container, default);
    }

    void IImportQueue.Consume(int consumersQuantity, Func<ImportTaskIdContainer, Task<RabbitMessageProcessingResult>> action) => Consume(consumersQuantity, action);

    #endregion IImportQueue
}

IRabbitContextFactory:

Если нам надо получить именованный сервис, то используем фабрику IRabbitContextFactory и имя:

IRabbitContextFactory.Create(string).

Для получения экземплра сервиса IRabbitContext через DI обязательно нужно зарегистрировать сервис, используя AddExperoRabbitMQ без указания имени. Фабрика доступна через DI, при регистрации сервиса с указанием имени (AddExperoRabbitMQ("Name", ...).


internal class YourClass : IYourClass
{
    private readonly IRabbitContext _context;

    public YourClass(IRabbitContextFactory factory)
    {
        _context = factory.Create("Name");
    }

    /*...*/
}

AddExperoRabbitMQHealthCheck

Для регистрации HealthCheck сервиса в проекте необходимо вызвать AddExperoRabbitMQHealthCheck.

Метод имеет 3 прегруженные версии, можно использовать разные способы описания подключения.

После регистрации HealthCheck через DI будет доступна IRabbitHealthCheckFactory и сервис IRabbitHealthCheckService.

Пример использования c connectionString:

public static void AddRabbitHealthCheck(IServiceCollection services)
{
    var connectionString = "amqp://rabbit:rabbit@localhost:25678";
    var timeout = TimeSpan.FromSeconds(1000);

    services.AddHealthChecks().AddExperoRabbitMQHealthCheck(
                                    connectionString: connectionString,
                                    policy = MdsHealthCheckPolicy.Any,
                                    name: "RabbitMQ",
                                    tags: new[] { "dependecies", "rabbitmq" },
                                    timeout: timeout);
}

Пример использования c RabbitConnectionSettings:

public static void AddRabbitHealthCheck(IServiceCollection services)
{
    var timeout = TimeSpan.FromSeconds(300);

    var settings = new RabbitConnectionSettings
    {
        ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
        Prefix = "portal_local",
        RoutingKey = "portal_local",
        ConsumerConcurrency = 10,
        MessageExpiration = TimeSpan.FromMinutes(30),
        RecoveryInterval = TimeSpan.FromMinutes(5)
    };

    services.AddHealthChecks().AddExperoRabbitMQHealthCheck(
                                    connectionSettings: settings,
                                    policy = MdsHealthCheckPolicy.Any,
                                    name: "RabbitMQ",
                                    tags: new[] { "dependecies", "rabbitmq" },
                                    timeout: timeout);
}

Пример использования c RabbitHealthCheckParameters:

public static void AddRabbitHealthCheck(IServiceCollection services)
{
    var parameters = new RabbitHealthCheckParameters
    {
        ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
        Timeout = TimeSpan.FromMinutes(5),
        Policy = MdsHealthCheckPolicy.Any
    };

    services.AddHealthChecks().AddExperoRabbitMQHealthCheck(
                                    parameters: parameters,
                                    name: "RabbitMQ",
                                    tags: new[] { "dependecies", "rabbitmq" });
}

IRabbitHealthCheckService

Сервис, позволяющий сделать проверку соединения с сервером. Доступен черрез DI после добавления RabbitMQ в HealthCheck.

Возвращает состояние в формате MdsHealthCheckResult.

Пример использования:

public void Check(IServiceProvider provider)
{
    var service = provider.GetService<IRabbitHealthCheckService>();

    var parameters = new RabbitHealthCheckParameters()
    {
        ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
        Timeout = TimeSpan.FromMinutes(5),
        Policy = MdsHealthCheckPolicy.All
    };
    
    var result = service.CheckAsync(parameters);
}

Showing the top 20 packages that depend on Expero.Libraries.CSharp.RabbitMQ.

Packages Downloads
Expero.Libraries.CSharp.BackgroundTasks
Пакет для работы с фоновыми задачами.
1

Version Downloads Last updated
1.10.0 1 02/28/2026
1.9.0 1 02/28/2026
1.9.0-beta.108004 1 02/28/2026
1.8.3 1 02/28/2026
1.8.3-beta.101952 1 02/28/2026
1.8.2 1 02/28/2026
1.8.2-beta.99132 1 02/28/2026
1.8.1 1 02/28/2026
1.8.0 1 02/28/2026
1.8.0-beta.69246 1 02/28/2026
1.8.0-beta.69156 1 02/28/2026
1.7.0 1 02/28/2026
1.5.0 1 02/28/2026
1.4.0 1 02/28/2026
1.3.0 1 02/28/2026
1.2.0 1 02/28/2026
1.2.0-beta.31636 1 02/28/2026
1.1.0 1 02/28/2026
1.0.1 1 02/28/2026
1.0.1-beta.28644 1 02/28/2026
1.0.0-beta.27892 1 02/28/2026
1.0.0-beta.18771 1 02/28/2026