Expero.Libraries.CSharp.BackgroundTasks 2.2.0

Expero.Libraries.CSharp.BackgroundTasks

Пакет для упрощения работы с фоновыми задачами.

Пакет содержит:

  • Реализации интерфейса IBackgroundTaskPlanner, содержащий логику планирования задачи для выполнения в фоне;

  • Интерфейсы IBackgroundTaskExecutor, IBackgroundTaskHandler и IBackgroundTaskWithAccessorHandler, позволяющие определить обработку для запланированных задач;

  • Реализацию интерфейса IBackgroundTaskDataRepository, содержащий логику работы с дополнительными данными по задаче.

  • Реализация интерфейса IBackgroundTaskDataAccessor, содержащий логику получения типизированных дополнительных данных по задаче, когда модель может варьироваться в зависимости от содержимого задачи.

Подключение к проекту.

Пакет является агрегацией ниже приведённых пакетов и требует наличия настройки в соответствии с их требованиями:

  • Expero.Libraries.CSharp.Logging

  • Expero.Libraries.CSharp.RabbitMQ

  • Mds.Libraries.CSharp.Db.Mongo

  • Mds.Libraries.CSharp.Db.Redis

Для подключения процесса обработки фоновой задачи в пакете есть 2 метода:

AddBackgroundProcess<TTask>(Action<BackgroundTaskSettings<TTask>> buildAction).

AddBackgroundProcess<TTask, TData>(Action<BackgroundTaskSettings<TTask>> buildAction).

Модель задачи TTask должна наследоваться от BackgroundTask, а данные для неё TData от BackgroundTaskData. Использование TData необязательно. Система подразумевает работу только через TTask.

На одну модель задачи TTask возможно подключение только одного процесса обработки.

После регистрации фонового процесса для задачи через DI будут доступны следующие сервисы:

  • IBackgroundTaskPlanner - сервис планирования задач.

  • IBackgroundTaskDataRepository - репозиторий для работы с данными к задаче.

Для корректной обработки задач необходимо реализовать IBackgroundTaskExecutor или же подключить дефолтный. Дефолтный исполнитель использует все зарегистрированные инстансы IBackgroundTaskHandler.

Дефолтный IBackgroundTaskExecutor, зарегистрированный через метод AddExecutorWithAccessor, использует все зарегистрированные инстансы IBackgroundTaskWithAccessorHandler.

Если вам необходимо фильтровать обработчики или же у вас специфическая механика для получения TData, тогда вам необходимо реализовать и подключить свой IBackgroundTaskExecutor.

Важно, обратите внимание, что IBackgroundTaskExecutor является дженериком только от TTask. Это сделано для того, чтобы вы в своей реализации могли сами выбирать механику работы с TData.

Данные сервисы будут является дженериками от задачи (<TTask>) или задачи и данных (<TTask, TData>) в соответствии с подключением.

BackgroundProcessSettings

Модель содержит набор настроек для работы с задачами:

  • ExchangeName - название обменника (RabbitExchangeName) в RabbitMQ для планирования задач;

  • QueueName - название основной очереди (RabbitQueueName) в RabbitMQ для запланированных задач;

  • AdditionalQueueNames - список названий дополнительных очередей (RabbitQueueName) в RabbitMQ для запланированных задач. Позволяет выполнить смену очередей, без потери данных;

  • Delay - задержка (TimeSpan?) на передачу задачи в обработку, по умолчанию null;

  • DeadLetterExchangeName - наименование обменника (RabbitExchangeName) в RabbitMQ для задач, которые не получается обработать (DLQ). Не должен совпадать с именем обменника ExchangeName;

  • DeadLetterMessageExpiration - время жизни (TimeSpan?) сообщений о задачах, которые не получается обработать (DLQ);

  • IsExecutionEnabled - флаг(bool), указывающий, что необходимо запустить обработку задач;

  • IsSavingResultEnabled - флаг(bool), указывающий, что необходимо сохранять результаты обработки задач;

  • ConsumersQuantity - количество (int) параллельных потоков;

  • MaxAttemptsBeforeFail - количество (int) повторных попыток выполнить задачу;

  • RetryPolicy - Настройка политики (BackgroundTaskRetryPolicy) обработки задачи при BackgroundTaskProcessingResult.NeedRetry. По умолчанию задача попадает в конец очереди, пока не исчерпаются все попытки из MaxAttemptsBeforeFail;

  • FailPolicy - Настройка политики (BackgroundTaskFailPolicy) обработки задачи при BackgroundTaskProcessingResult.Fail. По умолчанию задача попадает в конец очереди, пока не исчерпаются все попытки из MaxAttemptsBeforeFail;

  • CriticalFailPolicy - Настройка политики (BackgroundTaskCriticalFailPolicy) обработки задачи при критичных проблемах с обработкой. Обычно это проблемы с инфраструктурой или рабочим окружением. По умолчанию такие задачи игнорируются;

  • LockLifetime - время (TimeSpan) блокировки задачи во время выполнения;

Пример регистрации обработки задачи

public void AddService(IServiceCollection services)
{
    services.AddBackgroundProcess<BackgroundTask>(s =>
            {
                s.ExchangeName = new ("e1_exchange");
                s.QueueName = new ("q1_queue");
                s.AdditionalQueueNames = new List<RabbitQueueName>()
                {
                    new ("q2_queue")
                }
                s.Delay = TimeSpan.FromSeconds(5);
                s.DeadLetterExchangeName = new ("exchange/dead/letter");
                s.DeadLetterMessageExpiration = TimeSpan.FromDays(1);
                s.ConsumersQuantity = 1;
                s.IsExecutionEnabled = true;
                s.IsSavingResultEnabled = true;

                s.MaxAttemptsBeforeFail = 1; // можно не задавать, по дефолту 1
                s.RetryPolicy = BackgroundTaskRetryPolicy.MoveToEnd;
                s.FailPolicy = BackgroundTaskFailPolicy.MoveToEnd;
                s.CriticalFailPolicy = BackgroundTaskCriticalFailPolicy.Ignore;
                s.LockLifetime = TimeSpan.FromSeconds(15); // можно не задавать, по дефолту 15 сек
            })
            .AddDefaultExecutor() // можно подключать кастомный исполнитель
            .AddHandler<BackgroungTaskHandler>(); // обработчики можно регистрировать отдельно по интерфейсу IBackgroundTaskHandler
}

Также возможно подключение исполнителя и обработчика, работающих через акссесор данных (IBackgroundTaskDataAccessor). Это может быть необходимо, когда данные к задаче могут иметь различные типы данных, а за получение дополнительных данных корректного типа несёт обработчик задачи.

public void AddService(IServiceCollection services)
{
    services.AddBackgroundProcess<BackgroundTask, BackgroundTaskData>(s =>
            {
                // ...
            })
            .AddExecutorWithAccessor() // можно подключать кастомный исполнитель
            .AddHandler<BackgroungTaskHandler>(); // обработчики можно регистрировать отдельно по интерфейсу IBackgroundTaskWithAccessorHandler
}

Пример запуска обработки фоновой задачи, при IsExecutionEnabled = false

Иногда необходимо, чтобы добавлять задачи на обработку можно было из всех сервисов, а вот обрабатывать только в одном. Например, если мы хотим вынести сервис, который будет заниматься только фоновой обработкой.

В такой случае при регистрации обработки задач необходимо задать:

public void AddService(IServiceCollection services)
{
    // ...
    
    services.AddBackgroundProcess<BackgroundTask>(s =>
            {
                // ...
                s.IsExecutionEnabled = false;
                // ...
            })

    // ...
}

При такой конфигурации задачи обрабатываться не будут. Но если мы в другой точки запуска добавим регистрацию принудительного запуска, тогда задачи будут обрабатываться.

public void AddService(IServiceCollection services)
{
    services.AddForcedRunner<YourTask>();
}

IBackgroundTaskPlanner

Сервис IBackgroundTaskPlanner используется для планирования задачи необходимого типа в очередь указанную при подключении.

Пример использования

internal sealed class YourService
{
    private readonly IBackgroundTaskPlanner<YourTask> _planner;

    public YourService(IBackgroundTaskPlanner<YourTask> planner)
    {
        _planner = planner;
    }

    /*...*/

    private async Task<YourTask> PlanAsync()
    {
        var @new = new YourTask();

        return await _planner.Publish(@new);
    }
}

IBackgroundTaskExecutor

Интерфейс IBackgroundTaskExecutor используется для выполнения задачи.

В пакете уже есть дефолтные реализации для задачи без дополнительных данных и с ними. Дефолтные IBackgroundTaskExecutor перебирают все зарегистрированные в DI обработчики для задачи (IBackgroundTaskHandler), за исключением IBackgroundTaskExecutor с аксессором (требует IBackgroundTaskWithAccessorHandler).

Без наличия зарегистрированной в DI реализации интерфейса IBackgroundTaskExecutor задачи не будут выполняться.

В результате обработки задачи исполнитель должен вернуть результат обработки задачи.

Есть три варианта:

  • BackgroundTaskProcessingResult.Success - задача успешно обработана.
  • BackgroundTaskProcessingResult.NeedRetry - задачу нельзя обработать сейчас, нужно попробовать позднее.
  • BackgroundTaskProcessingResult.Fail - задачу нельзя обработать вообще.

Пример использования собственного исполнителя

internal sealed class YourExecutor: IBackgroundTaskExecutor<YourTask>
{
    private readonly IEnumerable<IBackgroundTaskHandler<YourTask>> _handlers;

    public YourExecutor(IEnumerable<IBackgroundTaskHandler<YourTask>> handlers)
    {
        _handlers = handlers;
    }


    async Task<RabbitMessageProcessingResult> IBackgroundTaskExecutor<YourTask>.ExecuteAsync(YourTask task)
    {
        return await ProcessTaskAsync(task, _handlers);
    }

     /*...*/
}

С аксессором (IBackgroundTaskDataAccessor):

internal sealed class YourExecutorWithAccessor: IBackgroundTaskExecutor<YourTask>
{
    private readonly IEnumerable<IBackgroundTaskHandler<YourTask>> _handlers;
    private readonly IBackgroundTaskDataRepository<YourTask> _repository;

    public YourExecutor(IEnumerable<IBackgroundTaskHandler<YourTask>> handlers, IBackgroundTaskDataRepository<YourTask> repository)
    {
        _handlers = handlers;
        _repository = repository;
    }


    #region  IBackgroundTaskExecutor

    async Task<RabbitMessageProcessingResult> IBackgroundTaskExecutor<YourTask>.ExecuteAsync(YourTask task)
    {
        // Формирование IBackgroundTaskExecutor через IBackgroundTaskDataRepository
        var accessor = _repository.BuildAccessor<YourTaskData>(task);

        return await ProcessTaskAsync(task, accessor, _handlers);
    }

    #endregion IBackgroundTaskExecutor


     /*...*/
}

## IBackgroundTaskHandler

Интерфейс `IBackgroundTaskHandler` используется для обработки задачи.

Регистрация в DI реализаций обработчиков обязательна в случае использования дефолтного `IBackgroundTaskExecutor`.
Обработчики необходимо регистрировать дженериками от задачи и данных аналогично подключению процесса обработки задачи. 

При использовании кастомного `IBackgroundTaskExecutor` возможно использование отличного интерфейса обработчика.


### Пример использования без данных

```csharp
internal sealed class YourHandler: IBackgroundTaskHandler<YourTask>
{
    /*...*/

    async Task<RabbitMessageProcessingResult> IBackgroundTaskHandler<YourTask>.ExecuteAsync(YourTask task)
    {
        return await ProcessTaskAsync(task);
    }

     /*...*/
}

Пример использования с дополнительными данными

internal sealed class YourHandler: IBackgroundTaskHandler<YourTask, YourTaskData>
{
    /*...*/

    async Task<RabbitMessageProcessingResult> IBackgroundTaskHandler<YourTask>.ExecuteAsync(YourTask task, YourTaskData data)
    {
        return await ProcessTaskAsync(task, data);
    }

     /*...*/
}

IBackgroundTaskWithAccessorHandler

Интерфейс IBackgroundTaskWithAccessorHandler используется для обработки задачи с дополнительными данными и исполнителем с аксессором.

Регистрация в DI реализаций обработчиков обязательна, если используется дефолтный IBackgroundTaskExecutor с аксессором. Обработчики необходимо регистрировать дженериками от задачи и данных аналогично подключению процесса обработки задачи.

При использовании кастомного IBackgroundTaskExecutor возможно использование отличного интерфейса обработчика.

Пример использования

internal sealed class YourHandlerWithAccessor: IBackgroundTaskWithAccessorHandler<YourTask, YourTaskData>
{
    /*...*/


    #region  IBackgroundTaskWithAccessorHandler

    async Task<RabbitMessageProcessingResult> IBackgroundTaskWithAccessorHandler<YourTask, YourTaskData>.ExecuteAsync(YourTask task, IBackgroundTaskDataAccessor<YourTaskData> accessor)
    {
        var data = accessor.Get<YourInheritedTaskData>();

        return await ProcessTaskAsync(task, data);
    }

    #endregion IBackgroundTaskWithAccessorHandler


     /*...*/
}

Репозитории

При планировании задачи данные берутся непосредственно из очереди, но есть возможность сохранения результатов в MongoDB.

При указании параметра IsSavingResultEnabled = true попытки обработки задач будут сохраняться в коллекцию BackgroundTaskProcessingAttempt<TTask>, где TTask - тип задачи. Для удобства инициализаций таких коллекций в MongoDB пакет содержит расширения для IMongoCrudFactory.

Пример инициализации коллекций для сохранения результатов выполнения задачи

var factory = scope.ServiceProvider.GetService<IMongoCrudFactory>();
var indexBuilder = scope.ServiceProvider.GetService<IMongoIndexBuiler>();

var ttl = TimeSpan.FromDays(5);

// Инициализация коллекции с попытками обработки задач
await factory.InitializeBackgroundTaskAttemptsCollectionAsync<YourTask>(indexBuilder, ttl);

Стандартные операции

Схема замены используемых очередей

При необходимости изменить очередь (например, при переименовании), без изменения логики обработки задач, стоит пользоваться следующей схемой релизов. Она обеспечивает отсутствие потери данных при переходе и адаптирована под логику Blue-Green канареечных релизов.

Для выполнения схемы релизов в BackgroundProcessSettings есть свойство AdditionalQueueNames, которое позволяет указать дополнительные очереди, из которых будут разбираться сообщения.

Для обсепечения схемы релизов потребуется выполнять корректировки подключения фонового процесса и конфигурации очередей (выполняется в проекте использующем либу). Далее будут приведены только критичные для релизов настройки.


Релиз 1

  • Добавляем новый обменник e_new и новую очередь q_new.
    • Не забываем забиндить их в миграторе проекта.
  • Добавляем новую очередь q_new в AdditionalQueueNames.
  • Задачи планируем в старый обменник e_old.

Этот шаг нужен, чтобы корректно отработал Rollback в случае отката.

Конфигурация будет выглядеть так:

public void AddService(IServiceCollection services)
{
    services.AddBackgroundProcess<BackgroundTask>(s =>
            {
                s.ExchangeName = new ("e_old");
                s.QueueName = new ("q_old");
                s.AdditionalQueueNames = new List<RabbitQueueName>()
                {
                    new ("q_new")
                }
                ...
            })
            .AddDefaultExecutor()
            .AddHandler<BackgroungTaskHandler>(); 
}

Релиз 2

  • Начинаем планировать в новый обменник e_new.
  • Теперь новая очередь q_new становится основной очередью, а старая очередь q_old переходит в AdditionalQueueNames.

Конфигурация будет выглядеть так:

public void AddService(IServiceCollection services)
{
    services.AddBackgroundProcess<BackgroundTask>(s =>
            {
                s.ExchangeName = new ("e_new");
                s.QueueName = new ("q_new");
                s.AdditionalQueueNames = new List<RabbitQueueName>()
                {
                    new ("q_old")
                }
                ...
            })
            .AddDefaultExecutor()
            .AddHandler<BackgroungTaskHandler>(); 
}

Релиз 3

  • Обязательно убеждаемся, что сообщения из старой очереди q_old разобраны.
  • Убираем старую очередь q_old из AdditionalQueueNames.
    • При желании можем удалить старый обменник e_new и старую очередь q_old.

Конфигурация будет выглядеть так:

public void AddService(IServiceCollection services)
{
    services.AddBackgroundProcess<BackgroundTask>(s =>
            {
                s.ExchangeName = new ("e_new");
                s.QueueName = new ("q_new");
                ...
            })
            .AddDefaultExecutor()
            .AddHandler<BackgroungTaskHandler>(); 
}

No packages depend on Expero.Libraries.CSharp.BackgroundTasks.

Version Downloads Last updated
4.0.0 1 02/28/2026
3.1.0 1 02/28/2026
3.0.0 1 02/28/2026
3.0.0-beta.146970 1 02/28/2026
2.4.3 1 02/28/2026
2.4.1 1 02/28/2026
2.4.1-beta.115992 1 02/28/2026
2.4.0 1 02/28/2026
2.4.0-beta.108300 1 02/28/2026
2.3.0 1 02/28/2026
2.3.0-beta.108065 1 02/28/2026
2.2.0 1 02/28/2026
2.2.0-beta.105359 1 02/28/2026
2.2.0-beta.105249 1 02/28/2026
2.2.0-beta.105248 1 02/28/2026
2.1.0 1 02/28/2026
2.1.0-beta.103434 1 02/28/2026
2.0.1 1 02/28/2026
2.0.0 1 02/28/2026
2.0.0-beta.88735 1 02/28/2026
1.11.0 1 02/28/2026
1.10.0 1 02/28/2026
1.10.0-beta.88734 1 02/28/2026
1.9.1-beta.93210 1 02/28/2026
1.9.0 1 02/28/2026
1.8.0 1 02/28/2026
1.7.2 1 02/28/2026
1.7.1 1 02/28/2026
1.6.0 1 02/28/2026
1.6.0-beta.80579 1 02/28/2026
1.5.0 1 02/28/2026
1.4.0 1 02/28/2026
1.4.0-beta.69171 1 02/28/2026
1.3.0 1 02/28/2026
1.2.0 1 02/28/2026
1.1.0 1 02/28/2026
1.0.0 1 02/28/2026