Expero.Libraries.CSharp.RabbitMQ 1.0.0-beta.27892

Expero.Libraries.CSharp.RabbitMQ

Пакет для более удобной работы с RabbitMQ .

Пакет содержит:

  • Фабрику для работы с RabbitMQ;
  • Абстрактный класс BaseRabbitQueueService, позволяющий просто и быстро реализовать Queue в проекте для Publish и Consume.
  • HealthCheck сервис для RabbitMQ;

Подключение к проекту.

Для подключения пакета RabbitMQ к проекту необходимо добавить сервис на этапе запуска. Для этого в пакете есть 2 метода:

AddExperoRabbitMQ(RabbitConnectionSettings).

AddExperoRabbitMQ(Action<RabbitConnectionSettings>).

После регистрации сервиса через DI будет доступна фабрика IRabbitFactory с переданными настройками подключения, а также интерфейсы IRabbitInitializer и IRabbitMessagesConverter.

  • IRabbitFactory - фабрика для работы с подключением к RabbitMQ.
  • IRabbitInitializer - сервис для конфигурирования очередей и обменников в RabbitMQ.
  • IRabbitMessagesConverter - сервис для упаковки и распаковки сообщений при отпраке в RabbitMQ. Вы можете написать свою реализацию и зарегистрировать её в DI.

Модель RabbitConnectionSettings содержит базовый набор параметров подключения:

  • ConnectionString - строка подключения(на текущий момент поддерживается формат: amqp://user:password@host:port);
  • Prefix - строка, добавляемая перед названием всех очередей данной фабрики, с разделителем _;
  • RoutingKey - строка, ключ маршрутизации для связывания(binding) очередей и точки доступа;
  • ConsumerConcurrency - количество(int) параллельных потоков для фабрики;
  • MessageExpiration - время (TimeSpan) жизни сообщения;
  • RecoveryInterval - время(TimeSpan) между попытками автоматического восстановления;

Подробнее о фабрике: https://www.rabbitmq.com/dotnet-api-guide.html.

Пример регистрации сервиса:

public void AddService(IServiceCollection services)
{
    services.AddExperoRabbitMQ(s =>
    {
        s.ConnectionString = "amqp://rabbit:rabbit@localhost:25678";
        s.Prefix = "portal_local";
        s.RoutingKey = "portal_local";
        s.ConsumerConcurrency = 10;
        s.MessageExpiration = TimeSpan.FromSeconds(1800);
        s.RecoveryInterval = TimeSpan.FromSeconds(300);
    });
}

либо через создание экземпляра RabbitConnectionSettings:

public void AddService(IServiceCollection services)
{
    var settings = new RabbitConnectionSettings
    {
        ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
        Prefix = "portal_local",
        RoutingKey = "portal_local",
        ConsumerConcurrency = 10,
        MessageExpiration = TimeSpan.FromSeconds(1800),
        RecoveryInterval = TimeSpan.FromSeconds(300)
    };

    services.AddExperoRabbitMQ(settings);
}

IRabbitFactory

Фабрика IRabbitFactory доступна через DI, используется для создания соединения с сервером RabbitMQ, а также для создания обменников (Exchange) и очередей (Queue).

Пример использования фабрики:

public class Queue
{
    private readonly IRabbitExchange _exchange;
    private readonly IRabbitQueue _queue;
    
    public Queue(IRabbitFactory factory)
    {
        var exchangeName = new RabbitExchangeName("notifications");
        _exchange = factory.Create(exchangeName);

        var queueName = new RabbitQueueName("sms");
        _queue = factory.Create(queueName);
    }
}

IRabbitExchange

Интерфейс для работы с обменниками (Exchanges).

Пример использования:

public class Publisher
{
    private readonly IRabbitExchange _exchange;

    public Publisher(IRabbitFactory factory)
    {
        var exchangeName = new RabbitExchangeName("notifications");
        _exchange = factory.Create(exchangeName);

        _exchange.Initialize(true);
    }

        
    public void Publish()
    {
        _exchange.Publish(Encoding.UTF8.GetBytes("Hello"));

        _exchange.Publish<string>("Hello", TimeSpan.FromSeconds(300));
    }
}

IRabbitQueue

Интерфейс для работы с очередями (Queues).

Пример использования:

public class Consumer
{
    private readonly IRabbitQueue _queue;

    public Consumer(IRabbitFactory factory)
    {
        var queueName = new RabbitQueueName("sms");
        _queue = factory.Create(queueName);

        _queue.Initialize();;
    }

    public void Consume()
    {
        _queue.Consume<string>(async message  =>
        {
            Console.WriteLine(message);
                
            await Task.Delay(1000);

            return true;
        });
    }
}

IRabbitInitializer

Сервис IRabbitInitializer используется для упрощенного создания и инициализации экземпляров IRabbitExchange и IRabbitQueue. Доступен через DI. Основное предназначение проконфигурировать очереди и обменники перед началом работы.

Пример использования:

public void Initialize(IServiceProvider provider)
{
    var initializer = provider.GetService<IRabbitInitializer>();

    var exchangeName = new RabbitExchangeName("notifications");
    initializer.Initialize(exchangeName);

    var queueName = new RabbitQueueName("sms");
    initializer.Initialize(queueName)
                .BindWith(exchangeName);
}

IRabbitMessagesConverter

Cервис для упаковки и распаковки сообщений при отпраке в RabbitMQ. Вы можете написать свою реализацию и зарегистрировать её в DI. Используется при упаковке и распаковке объектов в сообщения для отправки.

BaseRabbitQueueService

Абстрактный класс BaseRabbitQueueService - используется для простого создания сервисов очередей. В конструкторе принимает фабрику IRabbitFactory, с помощью которой создает экземпляры обменника и очереди. При использовании данного подхода нет необходимости реализовывать Dispose(), т.к. он реализован внутри.

Пример использования:

internal interface IImportQueue : IDisposable
{
    void Publish(ImportTask task);
    void Consume(int consumersQuantity, Func<ImportTaskIdContainer, Task<bool>> action);
}
internal class ImportQueue : BaseRabbitQueueService, IImportQueue
{
    public ImportQueue(IRabbitFactory factory) : base(factory) { }


    #region BaseRabbitQueueService

    protected override RabbitExchangeName ExchangeName => MessagesExchanges.Import;
    protected override RabbitQueueName QueueName => MessagesQueues.Import;

    #endregion BaseRabbitQueueService


    #region IImportQueue

    void IImportQueue.Publish(ImportTask task)
    {
        var container = new ImportTaskIdContainer(task);

        Publish(container, default);
    }

    void IImportQueue.Consume(int consumersQuantity, Func<ImportTaskIdContainer, Task<bool>> action) => Consume(consumersQuantity, action);

    #endregion IImportQueue
}

AddExperoRabbitMQHealthCheck

Для регистрации HealthCheck сервиса в проекте необходимо вызвать AddExperoRabbitMQHealthCheck.

Метод имеет 3 прегруженные версии, можно использовать разные способы описания подключения.

После регистрации HealthCheck через DI будет доступна фабрика IRabbitHealthCheckFactory и сервис IRabbitHealthCheckService.

Пример использования c connectionString:

public static void AddRabbitHealthCheck(IServiceCollection services)
{
    var connectionString = "amqp://rabbit:rabbit@localhost:25678";
    var timeout = TimeSpan.FromSeconds(1000);

    services.AddHealthChecks().AddExperoRabbitMQHealthCheck(
                                    connectionString: connectionString,
                                    name: "RabbitMQ",
                                    tags: new[] { "dependecies", "rabbitmq" },
                                    timeout: timeout);
}

Пример использования c RabbitConnectionSettings:

public static void AddRabbitHealthCheck(IServiceCollection services)
{
    var timeout = TimeSpan.FromSeconds(1000);

    var settings = new RabbitConnectionSettings
    {
        ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
        Prefix = "portal_local",
        RoutingKey = "portal_local",
        ConsumerConcurrency = 10,
        MessageExpiration = TimeSpan.FromSeconds(1800),
        RecoveryInterval = TimeSpan.FromSeconds(300)
    };

    services.AddHealthChecks().AddExperoRabbitMQHealthCheck(
                                    connectionSettings: settings,
                                    name: "RabbitMQ",
                                    tags: new[] { "dependecies", "rabbitmq" },
                                    timeout: timeout);
}

Пример использования c RabbitHealthCheckParameters:

public static void AddRabbitHealthCheck(IServiceCollection services)
{
    var parameters = new RabbitHealthCheckParameters
    {
        ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
        Timeout = TimeSpan.FromSeconds(1000)
    };

    services.AddHealthChecks().AddExperoRabbitMQHealthCheck(
                                    parameters: parameters,
                                    name: "RabbitMQ",
                                    tags: new[] { "dependecies", "rabbitmq" });
}

IRabbitHealthCheckService

Сервис, позволяющий сделать проверку соединения с сервером. Доступен черрез DI после добавления RabbitMQ в HealthCheck.

Возвращает состояние в формате MdsHealthCheckResult.

Пример использования:

public void Check(IServiceProvider provider)
{
    var rabbitHealthCheckService = provider.GetService<IRabbitHealthCheckService>();

    var rabbitHealthCheckParameters = new RabbitHealthCheckParameters()
    {
        ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
        Timeout = TimeSpan.FromSeconds(300)
    };
    
    var result = rabbitHealthCheckService.CheckAsync(rabbitHealthCheckParameters);
}

Showing the top 20 packages that depend on Expero.Libraries.CSharp.RabbitMQ.

Packages Downloads
Expero.Libraries.CSharp.BackgroundTasks
Пакет для работы с фоновыми задачами.
1

Version Downloads Last updated
1.10.0 1 02/28/2026
1.9.0 1 02/28/2026
1.9.0-beta.108004 1 02/28/2026
1.8.3 1 02/28/2026
1.8.3-beta.101952 1 02/28/2026
1.8.2 1 02/28/2026
1.8.2-beta.99132 1 02/28/2026
1.8.1 1 02/28/2026
1.8.0 1 02/28/2026
1.8.0-beta.69246 1 02/28/2026
1.8.0-beta.69156 1 02/28/2026
1.7.0 1 02/28/2026
1.5.0 1 02/28/2026
1.4.0 1 02/28/2026
1.3.0 1 02/28/2026
1.2.0 1 02/28/2026
1.2.0-beta.31636 1 02/28/2026
1.1.0 1 02/28/2026
1.0.1 1 02/28/2026
1.0.1-beta.28644 1 02/28/2026
1.0.0-beta.27892 1 02/28/2026
1.0.0-beta.18771 1 02/28/2026