KPVSaga.TaskCoordinator.DependencyInjection
0.1.0
dotnet add package KPVSaga.TaskCoordinator.DependencyInjection --version 0.1.0
NuGet\Install-Package KPVSaga.TaskCoordinator.DependencyInjection -Version 0.1.0
<PackageReference Include="KPVSaga.TaskCoordinator.DependencyInjection" Version="0.1.0" />
<PackageVersion Include="KPVSaga.TaskCoordinator.DependencyInjection" Version="0.1.0" />
<PackageReference Include="KPVSaga.TaskCoordinator.DependencyInjection" />
paket add KPVSaga.TaskCoordinator.DependencyInjection --version 0.1.0
#r "nuget: KPVSaga.TaskCoordinator.DependencyInjection, 0.1.0"
#:package KPVSaga.TaskCoordinator.DependencyInjection@0.1.0
#addin nuget:?package=KPVSaga.TaskCoordinator.DependencyInjection&version=0.1.0
#tool nuget:?package=KPVSaga.TaskCoordinator.DependencyInjection&version=0.1.0
API Guide
Что обычно использовать
Для обычного приложения достаточно знать 4 вещи:
AddSagaTransport(...)для регистрации в DIIMessageHandlerдля обработки входящих сообщенийITransportClientдля отправки команд, событий и replyReplyToесли нужен request/reply flow
Как подключить
using SagaLibrary.Infrastructure.Transport.DependencyInjection;
builder.Services.AddSagaTransport(transport =>
{
transport.UseRabbitMq(rabbit =>
{
rabbit.HostName = "localhost";
rabbit.Port = 5672;
rabbit.UserName = "saga";
rabbit.Password = "saga123";
rabbit.CommandsExchange = "x.transport.direct";
rabbit.EventsExchange = "x.transport.events";
rabbit.DeadLetterExchange = "x.transport.dlx";
rabbit.PrefetchCount = 8;
});
transport.Configure(options =>
{
options.OrchestratorEndpointName = "orchestrator";
options.MaxDeliveryAttempts = 3;
options.MandatoryPublish = true;
options.PublisherConfirmsEnabled = true;
});
transport.AddEndpoint("orchestrator", ep =>
{
ep.QueueName = "q.orchestrator";
ep.RoutingKey = "orchestrator";
ep.DeadLetterQueueName = "q.orchestrator.dlq";
});
transport.AddEndpoint("payment", ep =>
{
ep.QueueName = "q.payment";
ep.RoutingKey = "payment";
ep.DeadLetterQueueName = "q.payment.dlq";
});
transport.AddHandler<StartSagaHandler>("orchestrator");
transport.AddHandler<ProcessPaymentHandler>("payment");
});
Что делает этот код:
- собирает transport options
- регистрирует RabbitMQ transport в DI
- добавляет hosted service
- автоматически подписывает handlers на startup
Что такое endpoint
endpoint это логическое имя получателя плюс настройки очереди.
Пример:
transport.AddEndpoint("payment", ep =>
{
ep.QueueName = "q.payment";
ep.RoutingKey = "payment";
ep.DeadLetterQueueName = "q.payment.dlq";
});
Здесь:
"payment"это логическое имя endpoint-аQueueNameэто очередь RabbitMQRoutingKeyэто routing key для командDeadLetterQueueNameэто очередь для сообщений, которые больше не надо ретраить
Как написать handler
using SagaLibrary.Infrastructure.Transport.Handlers;
using SagaLibrary.Infrastructure.Transport.Models;
public sealed class StartSagaHandler : IMessageHandler
{
public Task HandleAsync(ConsumeContext context, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
Регистрация:
transport.AddHandler<StartSagaHandler>("orchestrator");
Hosted service сам:
- стартует transport
- резолвит endpoint
- вызывает
SubscribeAsync(...) - создаёт DI scope на сообщение
- вызывает handler
Как отправить команду
using SagaLibrary.Infrastructure.Transport.Client;
using SagaLibrary.Infrastructure.Transport.Models;
public sealed class SagaStarter(ITransportClient client)
{
public Task StartAsync(CancellationToken cancellationToken)
{
var envelope = new Envelope
{
MessageId = Guid.NewGuid(),
CorrelationId = Guid.NewGuid(),
SagaId = Guid.NewGuid(),
ReplyTo = "client",
MessageType = "StartSaga",
Payload = System.Text.Encoding.UTF8.GetBytes("hello")
};
return client.SendAsync(envelope, cancellationToken: cancellationToken);
}
}
Если просто вызвать SendAsync(envelope), команда уйдёт в orchestrator endpoint.
Если нужно отправить в конкретный endpoint:
await client.SendAsync("payment", envelope, cancellationToken: cancellationToken);
Как опубликовать событие
await client.PublishEventAsync(
new Envelope
{
MessageId = Guid.NewGuid(),
MessageType = "PaymentCompleted",
Payload = payloadBytes
},
cancellationToken: cancellationToken);
Как работает ReplyTo
Если отправитель хочет ответ, он заполняет ReplyTo.
Например:
var request = new Envelope
{
MessageType = "StartSaga",
ReplyTo = "client",
Payload = payloadBytes
};
В handler-е ответ выглядит так:
public sealed class StartSagaHandler(ITransportClient client) : IMessageHandler
{
public Task HandleAsync(ConsumeContext context, CancellationToken cancellationToken)
{
var reply = new Envelope
{
MessageId = Guid.Empty,
CorrelationId = context.CorrelationId,
SagaId = context.SagaId,
MessageType = "SagaStarted",
Payload = context.Envelope.Payload
};
return client.ReplyAsync(reply, context.Envelope, cancellationToken);
}
}
Transport сам возьмёт ReplyTo из исходного запроса и отправит reply в нужный endpoint.
Ack / Nack / Reject простыми словами
В ConsumeContext есть manual settlement:
AckAsync()— сообщение обработано успешноNackAsync()— попробуй ещё разRejectAsync()— больше не пробуй
Если handler завершился без ошибки и сам не сделал settlement, transport сделает Ack автоматически.
Если handler падает с исключением:
- пока лимит попыток не исчерпан, transport делает retry
- потом сообщение уходит в DLQ
Минимальный рабочий пример
using SagaLibrary.Infrastructure.Transport.Client;
using SagaLibrary.Infrastructure.Transport.DependencyInjection;
using SagaLibrary.Infrastructure.Transport.Handlers;
using SagaLibrary.Infrastructure.Transport.Models;
builder.Services.AddSagaTransport(transport =>
{
transport.UseRabbitMq(rabbit =>
{
rabbit.HostName = "localhost";
rabbit.Port = 5672;
rabbit.UserName = "saga";
rabbit.Password = "saga123";
});
transport.Configure(options =>
{
options.OrchestratorEndpointName = "orchestrator";
options.MaxDeliveryAttempts = 3;
});
transport.AddEndpoint("orchestrator", ep =>
{
ep.QueueName = "q.orchestrator";
ep.RoutingKey = "orchestrator";
ep.DeadLetterQueueName = "q.orchestrator.dlq";
});
transport.AddEndpoint("client", ep =>
{
ep.QueueName = "q.client";
ep.RoutingKey = "client";
ep.DeadLetterQueueName = "q.client.dlq";
});
transport.AddHandler<StartSagaHandler>("orchestrator");
});
public sealed class StartSagaHandler(ITransportClient client) : IMessageHandler
{
public Task HandleAsync(ConsumeContext context, CancellationToken cancellationToken)
{
return client.ReplyAsync(
new Envelope
{
MessageType = "SagaStarted",
CorrelationId = context.CorrelationId,
SagaId = context.SagaId,
Payload = context.Envelope.Payload
},
context.Envelope,
cancellationToken);
}
}
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- KPVSaga.TaskCoordinator.Abstractions (>= 0.1.0)
- KPVSaga.TaskCoordinator.Core (>= 0.1.0)
- KPVSaga.Transport (>= 0.1.0)
- Microsoft.Extensions.DependencyInjection (>= 10.0.7)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.7)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on KPVSaga.TaskCoordinator.DependencyInjection:
| Package | Downloads |
|---|---|
|
KPVSaga
Umbrella package for KPVSaga saga state machine, RabbitMQ transport, and dependency injection integration. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 0.1.0 | 49 | 6/2/2026 |