KPVSaga.TaskCoordinator.DependencyInjection 0.1.0

dotnet add package KPVSaga.TaskCoordinator.DependencyInjection --version 0.1.0
                    
NuGet\Install-Package KPVSaga.TaskCoordinator.DependencyInjection -Version 0.1.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="KPVSaga.TaskCoordinator.DependencyInjection" Version="0.1.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="KPVSaga.TaskCoordinator.DependencyInjection" Version="0.1.0" />
                    
Directory.Packages.props
<PackageReference Include="KPVSaga.TaskCoordinator.DependencyInjection" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add KPVSaga.TaskCoordinator.DependencyInjection --version 0.1.0
                    
#r "nuget: KPVSaga.TaskCoordinator.DependencyInjection, 0.1.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package KPVSaga.TaskCoordinator.DependencyInjection@0.1.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=KPVSaga.TaskCoordinator.DependencyInjection&version=0.1.0
                    
Install as a Cake Addin
#tool nuget:?package=KPVSaga.TaskCoordinator.DependencyInjection&version=0.1.0
                    
Install as a Cake Tool

API Guide

Что обычно использовать

Для обычного приложения достаточно знать 4 вещи:

  • AddSagaTransport(...) для регистрации в DI
  • IMessageHandler для обработки входящих сообщений
  • ITransportClient для отправки команд, событий и reply
  • ReplyTo если нужен request/reply flow

Как подключить

using SagaLibrary.Infrastructure.Transport.DependencyInjection;

builder.Services.AddSagaTransport(transport =>
{
    transport.UseRabbitMq(rabbit =>
    {
        rabbit.HostName = "localhost";
        rabbit.Port = 5672;
        rabbit.UserName = "saga";
        rabbit.Password = "saga123";
        rabbit.CommandsExchange = "x.transport.direct";
        rabbit.EventsExchange = "x.transport.events";
        rabbit.DeadLetterExchange = "x.transport.dlx";
        rabbit.PrefetchCount = 8;
    });

    transport.Configure(options =>
    {
        options.OrchestratorEndpointName = "orchestrator";
        options.MaxDeliveryAttempts = 3;
        options.MandatoryPublish = true;
        options.PublisherConfirmsEnabled = true;
    });

    transport.AddEndpoint("orchestrator", ep =>
    {
        ep.QueueName = "q.orchestrator";
        ep.RoutingKey = "orchestrator";
        ep.DeadLetterQueueName = "q.orchestrator.dlq";
    });

    transport.AddEndpoint("payment", ep =>
    {
        ep.QueueName = "q.payment";
        ep.RoutingKey = "payment";
        ep.DeadLetterQueueName = "q.payment.dlq";
    });

    transport.AddHandler<StartSagaHandler>("orchestrator");
    transport.AddHandler<ProcessPaymentHandler>("payment");
});

Что делает этот код:

  • собирает transport options
  • регистрирует RabbitMQ transport в DI
  • добавляет hosted service
  • автоматически подписывает handlers на startup

Что такое endpoint

endpoint это логическое имя получателя плюс настройки очереди.

Пример:

transport.AddEndpoint("payment", ep =>
{
    ep.QueueName = "q.payment";
    ep.RoutingKey = "payment";
    ep.DeadLetterQueueName = "q.payment.dlq";
});

Здесь:

  • "payment" это логическое имя endpoint-а
  • QueueName это очередь RabbitMQ
  • RoutingKey это routing key для команд
  • DeadLetterQueueName это очередь для сообщений, которые больше не надо ретраить

Как написать handler

using SagaLibrary.Infrastructure.Transport.Handlers;
using SagaLibrary.Infrastructure.Transport.Models;

public sealed class StartSagaHandler : IMessageHandler
{
    public Task HandleAsync(ConsumeContext context, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}

Регистрация:

transport.AddHandler<StartSagaHandler>("orchestrator");

Hosted service сам:

  • стартует transport
  • резолвит endpoint
  • вызывает SubscribeAsync(...)
  • создаёт DI scope на сообщение
  • вызывает handler

Как отправить команду

using SagaLibrary.Infrastructure.Transport.Client;
using SagaLibrary.Infrastructure.Transport.Models;

public sealed class SagaStarter(ITransportClient client)
{
    public Task StartAsync(CancellationToken cancellationToken)
    {
        var envelope = new Envelope
        {
            MessageId = Guid.NewGuid(),
            CorrelationId = Guid.NewGuid(),
            SagaId = Guid.NewGuid(),
            ReplyTo = "client",
            MessageType = "StartSaga",
            Payload = System.Text.Encoding.UTF8.GetBytes("hello")
        };

        return client.SendAsync(envelope, cancellationToken: cancellationToken);
    }
}

Если просто вызвать SendAsync(envelope), команда уйдёт в orchestrator endpoint.

Если нужно отправить в конкретный endpoint:

await client.SendAsync("payment", envelope, cancellationToken: cancellationToken);

Как опубликовать событие

await client.PublishEventAsync(
    new Envelope
    {
        MessageId = Guid.NewGuid(),
        MessageType = "PaymentCompleted",
        Payload = payloadBytes
    },
    cancellationToken: cancellationToken);

Как работает ReplyTo

Если отправитель хочет ответ, он заполняет ReplyTo.

Например:

var request = new Envelope
{
    MessageType = "StartSaga",
    ReplyTo = "client",
    Payload = payloadBytes
};

В handler-е ответ выглядит так:

public sealed class StartSagaHandler(ITransportClient client) : IMessageHandler
{
    public Task HandleAsync(ConsumeContext context, CancellationToken cancellationToken)
    {
        var reply = new Envelope
        {
            MessageId = Guid.Empty,
            CorrelationId = context.CorrelationId,
            SagaId = context.SagaId,
            MessageType = "SagaStarted",
            Payload = context.Envelope.Payload
        };

        return client.ReplyAsync(reply, context.Envelope, cancellationToken);
    }
}

Transport сам возьмёт ReplyTo из исходного запроса и отправит reply в нужный endpoint.

Ack / Nack / Reject простыми словами

В ConsumeContext есть manual settlement:

  • AckAsync() — сообщение обработано успешно
  • NackAsync() — попробуй ещё раз
  • RejectAsync() — больше не пробуй

Если handler завершился без ошибки и сам не сделал settlement, transport сделает Ack автоматически.

Если handler падает с исключением:

  • пока лимит попыток не исчерпан, transport делает retry
  • потом сообщение уходит в DLQ

Минимальный рабочий пример

using SagaLibrary.Infrastructure.Transport.Client;
using SagaLibrary.Infrastructure.Transport.DependencyInjection;
using SagaLibrary.Infrastructure.Transport.Handlers;
using SagaLibrary.Infrastructure.Transport.Models;

builder.Services.AddSagaTransport(transport =>
{
    transport.UseRabbitMq(rabbit =>
    {
        rabbit.HostName = "localhost";
        rabbit.Port = 5672;
        rabbit.UserName = "saga";
        rabbit.Password = "saga123";
    });

    transport.Configure(options =>
    {
        options.OrchestratorEndpointName = "orchestrator";
        options.MaxDeliveryAttempts = 3;
    });

    transport.AddEndpoint("orchestrator", ep =>
    {
        ep.QueueName = "q.orchestrator";
        ep.RoutingKey = "orchestrator";
        ep.DeadLetterQueueName = "q.orchestrator.dlq";
    });

    transport.AddEndpoint("client", ep =>
    {
        ep.QueueName = "q.client";
        ep.RoutingKey = "client";
        ep.DeadLetterQueueName = "q.client.dlq";
    });

    transport.AddHandler<StartSagaHandler>("orchestrator");
});

public sealed class StartSagaHandler(ITransportClient client) : IMessageHandler
{
    public Task HandleAsync(ConsumeContext context, CancellationToken cancellationToken)
    {
        return client.ReplyAsync(
            new Envelope
            {
                MessageType = "SagaStarted",
                CorrelationId = context.CorrelationId,
                SagaId = context.SagaId,
                Payload = context.Envelope.Payload
            },
            context.Envelope,
            cancellationToken);
    }
}
Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on KPVSaga.TaskCoordinator.DependencyInjection:

Package Downloads
KPVSaga

Umbrella package for KPVSaga saga state machine, RabbitMQ transport, and dependency injection integration.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.1.0 49 6/2/2026