RmToolkit.MessageBus
3.0.1
dotnet add package RmToolkit.MessageBus --version 3.0.1
NuGet\Install-Package RmToolkit.MessageBus -Version 3.0.1
<PackageReference Include="RmToolkit.MessageBus" Version="3.0.1" />
<PackageVersion Include="RmToolkit.MessageBus" Version="3.0.1" />
<PackageReference Include="RmToolkit.MessageBus" />
paket add RmToolkit.MessageBus --version 3.0.1
#r "nuget: RmToolkit.MessageBus, 3.0.1"
#:package RmToolkit.MessageBus@3.0.1
#addin nuget:?package=RmToolkit.MessageBus&version=3.0.1
#tool nuget:?package=RmToolkit.MessageBus&version=3.0.1
RmToolkit Message Bus & Saga — README
A compact guide for using the RmToolkit message bus and saga orchestration features.
Motivation
Modern modular monolith applications often need clear boundaries between functional modules (e.g., User, Order, Inventory) without fully distributing services.
The RmToolkit Message Bus provides:
- Message-based communication between modules
- Saga orchestration with compensation and fault handling
- Strong modular isolation with local reliability
- In-memory transport for development and testing
This enables modular architectures to evolve toward microservices later without major redesign, while preserving the simplicity and performance of a single-deployable monolith.
Overview
This repository contains a lightweight message-bus abstraction with support for:
- In-memory transport (configurable channel capacity)
- Saga (state machine) orchestration
- Filters for producers, publishers, senders, and consumers
- Built-in Result pattern support via
RmToolkit.Result(included) - Public contracts exposed via
RmToolkit.MessageBus.Contracts
Packages
RmToolkit.MessageBus
Provides the message bus, DI extensions, transports, saga orchestration, filters, and runtime components.RmToolkit.MessageBus.Contracts
Shared message contracts and interfaces (NuGet). Useful for external producers/consumers.RmToolkit.Result
Included automatically for Result-pattern support.
Install contracts package if needed:
dotnet add package RmToolkit.MessageBus.Contracts
Registration / DI Configuration
Register the bus during application startup:
builder.Services.AddMessageBus(conf =>
{
conf.UsingInMemory(opts =>
{
opts.ChannelCapacity = 1_000;
});
conf.ConsumerAssembliesToRegister = [typeof(Program).Assembly];
conf.AddProducerFilter(typeof(CommonProduceFilter<>), b => b.ForMessage<IntegrationEvent1>());
conf.AddPublishFilter(typeof(CommonPublishFilter<>));
conf.AddSendFilter(typeof(CommonSendFilter<>));
conf.AddConsumerFilter(typeof(ModuleAInboxFilter<>),
b => b.ForMessage<ModuleA.IntegrationEvent1>(order: 2));
conf.AddSagaStateMachine<ThreeStepSagaOrchestrator, ThreeStepState>()
.UseInMemory();
});
Notes
ConsumerAssembliesToRegistercontrols consumer discovery.- Saga persistence is pluggable (
InMemory,MemoryCache,DistributedCache,EF Core). - In-memory transport is ideal for development and tests.
Filters
Filters allow cross-cutting behavior around message flow.
public sealed class LoggingConsumeFilter<T> : IConsumeFilter<T>
{
private readonly ILogger<LoggingConsumeFilter<T>> _logger;
public LoggingConsumeFilter(ILogger<LoggingConsumeFilter<T>> logger)
{
_logger = logger;
}
public async Task Invoke(IConsumeContext<T> ctx, IPipe<T> next)
{
_logger.LogInformation("Handling {Message}", typeof(T).Name);
await next.Invoke(ctx);
}
}
Common uses
- Logging / telemetry
- Validation / authorization
- Tracing and correlation
- Retry orchestration
Consumers
Consumers implement IConsumer<TMessage>.
internal sealed class UserCreatedEventHandler
: IConsumer<UserCreated>
{
private readonly ILogger<UserCreatedEventHandler> _logger;
public UserCreatedEventHandler(ILogger<UserCreatedEventHandler> logger)
{
_logger = logger;
}
public async Task Handle(IConsumeContext<UserCreated> context)
{
await Task.Delay(500);
_logger.LogInformation("UserCreated processed: {UserId}", context.Message.UserId);
}
}
Compensation Consumer Example
public sealed class CompensateStepA
: IConsumer<Undo<DoStepA>>
{
public async Task Handle(IConsumeContext<Undo<DoStepA>> context)
{
await context.PublishAsync(
new Compensated<DoStepA> { CorrelationId = context.Message.CorrelationId }
);
}
}
Saga (Three-Step Example)
State
public sealed class ThreeStepState : ISaga
{
public Guid CorrelationId { get; init; }
public State CurrentState { get; set; } = State.None;
public string? ErrorMessage { get; set; }
}
Messages
public record StartWorkflow : EventMessage;
public record DoStepA : CommandMessage;
public record DoStepB : CommandMessage;
public record DoStepC : CommandMessage;
public record StepACompleted : EventMessage;
public record StepBCompleted : EventMessage;
public record StepCCompleted : EventMessage;
Saga Definition (Excerpt)
Initially
.When(StartWorkflowEvent)
.Send(ctx => new DoStepA { CorrelationId = ctx.Message.CorrelationId })
.TransitionTo(StepAInProgress);
Fault & Compensation Handling
- Consumers may publish
Fault<T>messages on failure - Sagas can react by:
- Updating error state
- Publishing
Undo<T>compensation messages - Transitioning to a
Failedstate
Request / Response
Sending Commands
await bus.SendAsync(new CreateUserCommand(1, "ARMIN"));
Requesting a Response
var response = await bus.RequestAsync<GetUserQuery, UserDto>(
new GetUserQuery(1),
timeout: TimeSpan.FromMinutes(1),
cancellationToken: cancellationToken
);
Responding in Consumer
public sealed class GetUserHandler : IConsumer<GetUserQuery>
{
public async Task Handle(IConsumeContext<GetUserQuery> context)
{
await context.RespondAsync(new UserDto(1, "ARMIN"));
}
}
Best Practices
- Prefer idempotent consumers
- Always set
CorrelationIdfor sagas - Keep saga state minimal and serializable
- Use compensation instead of distributed transactions
Troubleshooting
- Saga not correlating → Check
CorrelationIdandCorrelateById - Consumers not discovered → Verify
ConsumerAssembliesToRegister - Filters not executing → Ensure filters are registered before bus startup
License
MIT License
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.EntityFrameworkCore (>= 10.0.1)
- Microsoft.Extensions.Caching.Abstractions (>= 10.0.1)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.1)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.1)
- RmToolkit.MessageBus.Contracts (>= 3.0.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 3.0.1 | 118 | 1/27/2026 |
| 3.0.0 | 314 | 12/7/2025 |
| 2.1.0-preview1 | 168 | 11/15/2025 |
| 2.0.2 | 255 | 11/14/2025 |
| 2.0.1 | 286 | 11/13/2025 |