Expero.Libraries.CSharp.RabbitMQ 1.8.1
Expero.Libraries.CSharp.RabbitMQ
Пакет для более удобной работы с RabbitMQ .
Пакет содержит:
- Контекст
IRabbitContextдля работы с RabbitMQ; - Абстрактный класс
BaseRabbitQueueService, позволяющий просто и быстро реализовать Queue в проекте дляPublishиConsume. HealthCheckсервис для RabbitMQ;
Подробнее о клиенте: https://www.rabbitmq.com/dotnet-api-guide.html.
Подключение к проекту.
Для подключения пакета RabbitMQ к проекту необходимо добавить сервис на этапе запуска.
Если нам надо получить базовый сервис с параметрами по умолчанию, для этого в пакете есть 4 метода:
AddExperoRabbitMQ(RabbitConnectionSettings).
AddExperoRabbitMQ(string, RabbitConnectionSettings).
AddExperoRabbitMQ(Action<RabbitConnectionSettings>).
AddExperoRabbitMQ(string, Action<RabbitConnectionSettings>).
После регистрации сервиса через DI будут доступены следующие сервисы.
IRabbitContext- контекст для работы с подключением к RabbitMQ.IRabbitInitializer- сервис для конфигурирования очередей и обменников в RabbitMQ.IRabbitMessagesConverter- сервис для упаковки и распаковки сообщений при отпраке в RabbitMQ. Вы можете написать свою реализацию и зарегистрировать её в DI.IRabbitContextFactory- сервис для инстанцирования именнованного контекста для работы с RabbitMQ.
Важно понимать, что если вы регистрируете именованное подключение, тогда у вас НЕ будет доступен IRabbitContext по умолчанию в DI.
Модель RabbitConnectionSettings содержит базовый набор параметров подключения:
ConnectionString- строка подключения(на текущий момент поддерживается формат:amqp://user:password@host:port);VirtualHost- строка, виртутальный хост, по умолчанию:/;Prefix- строка, добавляемая перед названием всех очередей, с разделителем, указанным в параметре "Delimiter";Delimiter- строка, разделитель в наименовании очередей, ставится после префикса, по умолчанию:_;RoutingKey- строка, ключ маршрутизации для связывания(binding) очередей и точки доступа;ConsumerConcurrency- количество(int) параллельных потоков;MessageExpiration- время (TimeSpan) жизни сообщения;RecoveryInterval- время(TimeSpan) между попытками автоматического восстановления;
В случае, если строке подключения указаны несколько хостов(кластер) - сервис будет пытаться подключаться к ним по очереди, пока не установит подключение.
Шаблон строки подключения для кластера:
amqp://user:password@host1:port1,host2:port2,host3:port3....
В строке подключения нельзя использовать пробелы, user и password общие для всех hosts.
Пример строки подключения, для кластера:
const сonnectionString = "amqp://rabbit:rabbit@localhost:25678,192.168.0.1:25679,dev.phrm.tech.ru:25690";
Пример регистрации сервиса:
public void AddService(IServiceCollection services)
{
services.AddExperoRabbitMQ(s =>
{
s.ConnectionString = "amqp://rabbit:rabbit@localhost:25678";
s.Prefix = "portal_local";
s.RoutingKey = "portal_local";
s.ConsumerConcurrency = 10;
s.MessageExpiration = TimeSpan.FromMinutes(30);
s.RecoveryInterval = TimeSpan.FromMinutes(5);
});
}
либо через создание экземпляра RabbitConnectionSettings:
public void AddService(IServiceCollection services)
{
var settings = new RabbitConnectionSettings
{
ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
Prefix = "portal_local",
RoutingKey = "portal_local",
ConsumerConcurrency = 10,
MessageExpiration = TimeSpan.FromMinutes(30),
RecoveryInterval = TimeSpan.FromMinutes(5)
};
services.AddExperoRabbitMQ(settings);
}
IRabbitContext
Контекст IRabbitContext доступна через DI, используется для создания соединения с сервером RabbitMQ, а также для создания обменников (Exchange) и очередей (Queue).
Пример использования контекста:
public class Queue
{
private readonly IRabbitExchange _exchange;
private readonly IRabbitQueue _queue;
public Queue(IRabbitContext сontext)
{
var exchangeName = new RabbitExchangeName("notifications");
_exchange = сontext.Create(exchangeName);
var queueName = new RabbitQueueName("sms");
_queue = сontext.Create(queueName);
}
}
IRabbitExchange
Интерфейс для работы с обменниками (Exchanges).
Пример использования:
public class Publisher
{
private readonly IRabbitExchange _exchange;
public Publisher(IRabbitContext сontext)
{
var exchangeName = new RabbitExchangeName("notifications");
_exchange = сontext.Create(exchangeName);
_exchange.Initialize(true);
}
public void Publish()
{
_exchange.Publish(Encoding.UTF8.GetBytes("Hello"));
_exchange.Publish<string>("Hello", TimeSpan.FromMinutes(5));
_exchange.Publish<string>("Hello", TimeSpan.FromMinutes(5), TimeSpan.FromDays(1));
}
}
IRabbitQueue
Интерфейс для работы с очередями (Queues).
Пример использования:
public class Consumer
{
private readonly IRabbitQueue _queue;
public Consumer(IRabbitContext сontext)
{
var queueName = new RabbitQueueName("sms");
_queue = сontext.Create(queueName);
_queue.Initialize();
}
public void Consume()
{
_queue.Consume<string>(async message =>
{
Console.WriteLine(message);
await Task.Delay(1000);
return true;
});
}
}
IRabbitInitializer
Сервис IRabbitInitializer используется для упрощенного создания и инициализации экземпляров IRabbitExchange и IRabbitQueue. Доступен через DI. Основное предназначение проконфигурировать очереди и обменники перед началом работы.
Пример использования:
public void Initialize(IServiceProvider provider)
{
var initializer = provider.GetService<IRabbitInitializer>();
var exchangeName = new RabbitExchangeName("notifications");
initializer.Initialize(exchangeName, true);
var queueName = new RabbitQueueName("sms");
initializer.Initialize(queueName)
.BindWith(exchangeName);
}
IRabbitMessagesConverter
Cервис для упаковки и распаковки сообщений при отпраке в RabbitMQ. Вы можете написать свою реализацию и зарегистрировать её в DI. Используется при упаковке и распаковке объектов в сообщения для отправки.
BaseRabbitQueueService
Абстрактный класс BaseRabbitQueueService - используется для простого создания сервисов очередей.
В конструкторе принимает IRabbitContext, с помощью которой создает экземпляры обменника и очереди.
При использовании данного подхода нет необходимости реализовывать Dispose(), т.к. он реализован внутри.
Пример использования:
internal interface IImportQueue : IDisposable
{
void Publish(ImportTask task);
void Consume(int consumersQuantity, Func<ImportTaskIdContainer, Task<RabbitMessageProcessingResult>> action);
}
internal class ImportQueue : BaseRabbitQueueService, IImportQueue
{
public ImportQueue(IRabbitContext сontext) : base(сontext) { }
#region BaseRabbitQueueService
protected override RabbitExchangeName ExchangeName => MessagesExchanges.Import;
protected override RabbitQueueName QueueName => MessagesQueues.Import;
#endregion BaseRabbitQueueService
#region IImportQueue
void IImportQueue.Publish(ImportTask task)
{
var container = new ImportTaskIdContainer(task);
Publish(container, default);
}
void IImportQueue.Consume(int consumersQuantity, Func<ImportTaskIdContainer, Task<RabbitMessageProcessingResult>> action) => Consume(consumersQuantity, action);
#endregion IImportQueue
}
IRabbitContextFactory:
Если нам надо получить именованный сервис, то используем фабрику IRabbitContextFactory и имя:
IRabbitContextFactory.Create(string).
Для получения экземплра сервиса IRabbitContext через DI обязательно нужно зарегистрировать сервис, используя AddExperoRabbitMQ без указания имени. Фабрика доступна через DI, при регистрации сервиса с указанием имени (AddExperoRabbitMQ("Name", ...).
internal class YourClass : IYourClass
{
private readonly IRabbitContext _context;
public YourClass(IRabbitContextFactory factory)
{
_context = factory.Create("Name");
}
/*...*/
}
AddExperoRabbitMQHealthCheck
Для регистрации HealthCheck сервиса в проекте необходимо вызвать AddExperoRabbitMQHealthCheck.
Метод имеет 3 прегруженные версии, можно использовать разные способы описания подключения.
После регистрации HealthCheck через DI будет доступна IRabbitHealthCheckFactory и сервис IRabbitHealthCheckService.
Пример использования c connectionString:
public static void AddRabbitHealthCheck(IServiceCollection services)
{
var connectionString = "amqp://rabbit:rabbit@localhost:25678";
var timeout = TimeSpan.FromSeconds(1000);
services.AddHealthChecks().AddExperoRabbitMQHealthCheck(
connectionString: connectionString,
policy = MdsHealthCheckPolicy.Any,
name: "RabbitMQ",
tags: new[] { "dependecies", "rabbitmq" },
timeout: timeout);
}
Пример использования c RabbitConnectionSettings:
public static void AddRabbitHealthCheck(IServiceCollection services)
{
var timeout = TimeSpan.FromSeconds(300);
var settings = new RabbitConnectionSettings
{
ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
Prefix = "portal_local",
RoutingKey = "portal_local",
ConsumerConcurrency = 10,
MessageExpiration = TimeSpan.FromMinutes(30),
RecoveryInterval = TimeSpan.FromMinutes(5)
};
services.AddHealthChecks().AddExperoRabbitMQHealthCheck(
connectionSettings: settings,
policy = MdsHealthCheckPolicy.Any,
name: "RabbitMQ",
tags: new[] { "dependecies", "rabbitmq" },
timeout: timeout);
}
Пример использования c RabbitHealthCheckParameters:
public static void AddRabbitHealthCheck(IServiceCollection services)
{
var parameters = new RabbitHealthCheckParameters
{
ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
Timeout = TimeSpan.FromMinutes(5),
Policy = MdsHealthCheckPolicy.Any
};
services.AddHealthChecks().AddExperoRabbitMQHealthCheck(
parameters: parameters,
name: "RabbitMQ",
tags: new[] { "dependecies", "rabbitmq" });
}
IRabbitHealthCheckService
Сервис, позволяющий сделать проверку соединения с сервером. Доступен черрез DI после добавления RabbitMQ в HealthCheck.
Возвращает состояние в формате MdsHealthCheckResult.
Пример использования:
public void Check(IServiceProvider provider)
{
var service = provider.GetService<IRabbitHealthCheckService>();
var parameters = new RabbitHealthCheckParameters()
{
ConnectionString = "amqp://rabbit:rabbit@localhost:25678",
Timeout = TimeSpan.FromMinutes(5),
Policy = MdsHealthCheckPolicy.All
};
var result = service.CheckAsync(parameters);
}
Showing the top 20 packages that depend on Expero.Libraries.CSharp.RabbitMQ.
| Packages | Downloads |
|---|---|
|
Expero.Libraries.CSharp.BackgroundTasks
Пакет для работы с фоновыми задачами.
|
1 |
.NET 7.0
- Mds.Libraries.CSharp.Abstractions (>= 2.0.2)
- Mds.Libraries.CSharp.Extensions (>= 2.0.2)
- Mds.Libraries.CSharp.Server.Health.Tools (>= 2.2.4)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 7.0.0)
- Microsoft.Extensions.Diagnostics.HealthChecks (>= 7.0.13)
- Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions (>= 7.0.13)
- Newtonsoft.Json (>= 13.0.3)
- RabbitMQ.Client (>= 6.6.0)
| Version | Downloads | Last updated |
|---|---|---|
| 1.10.0 | 1 | 02/28/2026 |
| 1.9.0 | 1 | 02/28/2026 |
| 1.9.0-beta.108004 | 1 | 02/28/2026 |
| 1.8.3 | 1 | 02/28/2026 |
| 1.8.3-beta.101952 | 1 | 02/28/2026 |
| 1.8.2 | 1 | 02/28/2026 |
| 1.8.2-beta.99132 | 1 | 02/28/2026 |
| 1.8.1 | 1 | 02/28/2026 |
| 1.8.0 | 1 | 02/28/2026 |
| 1.8.0-beta.69246 | 1 | 02/28/2026 |
| 1.8.0-beta.69156 | 1 | 02/28/2026 |
| 1.7.0 | 1 | 02/28/2026 |
| 1.5.0 | 1 | 02/28/2026 |
| 1.4.0 | 1 | 02/28/2026 |
| 1.3.0 | 1 | 02/28/2026 |
| 1.2.0 | 1 | 02/28/2026 |
| 1.2.0-beta.31636 | 1 | 02/28/2026 |
| 1.1.0 | 1 | 02/28/2026 |
| 1.0.1 | 1 | 02/28/2026 |
| 1.0.1-beta.28644 | 1 | 02/28/2026 |
| 1.0.0-beta.27892 | 1 | 02/28/2026 |
| 1.0.0-beta.18771 | 1 | 02/28/2026 |