RmToolkit.MessageBus 3.0.1

dotnet add package RmToolkit.MessageBus --version 3.0.1
                    
NuGet\Install-Package RmToolkit.MessageBus -Version 3.0.1
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="RmToolkit.MessageBus" Version="3.0.1" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="RmToolkit.MessageBus" Version="3.0.1" />
                    
Directory.Packages.props
<PackageReference Include="RmToolkit.MessageBus" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add RmToolkit.MessageBus --version 3.0.1
                    
#r "nuget: RmToolkit.MessageBus, 3.0.1"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package RmToolkit.MessageBus@3.0.1
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=RmToolkit.MessageBus&version=3.0.1
                    
Install as a Cake Addin
#tool nuget:?package=RmToolkit.MessageBus&version=3.0.1
                    
Install as a Cake Tool

RmToolkit Message Bus & Saga — README

A compact guide for using the RmToolkit message bus and saga orchestration features.


Motivation

Modern modular monolith applications often need clear boundaries between functional modules (e.g., User, Order, Inventory) without fully distributing services.

The RmToolkit Message Bus provides:

  • Message-based communication between modules
  • Saga orchestration with compensation and fault handling
  • Strong modular isolation with local reliability
  • In-memory transport for development and testing

This enables modular architectures to evolve toward microservices later without major redesign, while preserving the simplicity and performance of a single-deployable monolith.


Overview

This repository contains a lightweight message-bus abstraction with support for:

  • In-memory transport (configurable channel capacity)
  • Saga (state machine) orchestration
  • Filters for producers, publishers, senders, and consumers
  • Built-in Result pattern support via RmToolkit.Result (included)
  • Public contracts exposed via RmToolkit.MessageBus.Contracts

Packages

  • RmToolkit.MessageBus
    Provides the message bus, DI extensions, transports, saga orchestration, filters, and runtime components.

  • RmToolkit.MessageBus.Contracts
    Shared message contracts and interfaces (NuGet). Useful for external producers/consumers.

  • RmToolkit.Result
    Included automatically for Result-pattern support.

Install contracts package if needed:

dotnet add package RmToolkit.MessageBus.Contracts

Registration / DI Configuration

Register the bus during application startup:

builder.Services.AddMessageBus(conf =>
{
    conf.UsingInMemory(opts =>
    {
        opts.ChannelCapacity = 1_000;
    });

    conf.ConsumerAssembliesToRegister = [typeof(Program).Assembly];

    conf.AddProducerFilter(typeof(CommonProduceFilter<>), b => b.ForMessage<IntegrationEvent1>());
    conf.AddPublishFilter(typeof(CommonPublishFilter<>));
    conf.AddSendFilter(typeof(CommonSendFilter<>));

    conf.AddConsumerFilter(typeof(ModuleAInboxFilter<>), 
        b => b.ForMessage<ModuleA.IntegrationEvent1>(order: 2));

    conf.AddSagaStateMachine<ThreeStepSagaOrchestrator, ThreeStepState>()
        .UseInMemory();
});

Notes

  • ConsumerAssembliesToRegister controls consumer discovery.
  • Saga persistence is pluggable (InMemory, MemoryCache, DistributedCache, EF Core).
  • In-memory transport is ideal for development and tests.

Filters

Filters allow cross-cutting behavior around message flow.

public sealed class LoggingConsumeFilter<T> : IConsumeFilter<T>
{
    private readonly ILogger<LoggingConsumeFilter<T>> _logger;

    public LoggingConsumeFilter(ILogger<LoggingConsumeFilter<T>> logger)
    {
        _logger = logger;
    }

    public async Task Invoke(IConsumeContext<T> ctx, IPipe<T> next)
    {
        _logger.LogInformation("Handling {Message}", typeof(T).Name);
        await next.Invoke(ctx);
    }
}

Common uses

  • Logging / telemetry
  • Validation / authorization
  • Tracing and correlation
  • Retry orchestration

Consumers

Consumers implement IConsumer<TMessage>.

internal sealed class UserCreatedEventHandler 
    : IConsumer<UserCreated>
{
    private readonly ILogger<UserCreatedEventHandler> _logger;

    public UserCreatedEventHandler(ILogger<UserCreatedEventHandler> logger)
    {
        _logger = logger;
    }

    public async Task Handle(IConsumeContext<UserCreated> context)
    {
        await Task.Delay(500);
        _logger.LogInformation("UserCreated processed: {UserId}", context.Message.UserId);
    }
}

Compensation Consumer Example

public sealed class CompensateStepA 
    : IConsumer<Undo<DoStepA>>
{
    public async Task Handle(IConsumeContext<Undo<DoStepA>> context)
    {
        await context.PublishAsync(
            new Compensated<DoStepA> { CorrelationId = context.Message.CorrelationId }
        );
    }
}

Saga (Three-Step Example)

State

public sealed class ThreeStepState : ISaga
{
    public Guid CorrelationId { get; init; }
    public State CurrentState { get; set; } = State.None;
    public string? ErrorMessage { get; set; }
}

Messages

public record StartWorkflow : EventMessage;
public record DoStepA : CommandMessage;
public record DoStepB : CommandMessage;
public record DoStepC : CommandMessage;

public record StepACompleted : EventMessage;
public record StepBCompleted : EventMessage;
public record StepCCompleted : EventMessage;

Saga Definition (Excerpt)

Initially
    .When(StartWorkflowEvent)
    .Send(ctx => new DoStepA { CorrelationId = ctx.Message.CorrelationId })
    .TransitionTo(StepAInProgress);

Fault & Compensation Handling

  • Consumers may publish Fault<T> messages on failure
  • Sagas can react by:
    • Updating error state
    • Publishing Undo<T> compensation messages
    • Transitioning to a Failed state

Request / Response

Sending Commands

await bus.SendAsync(new CreateUserCommand(1, "ARMIN"));

Requesting a Response

var response = await bus.RequestAsync<GetUserQuery, UserDto>(
    new GetUserQuery(1),
    timeout: TimeSpan.FromMinutes(1),
    cancellationToken: cancellationToken
);

Responding in Consumer

public sealed class GetUserHandler : IConsumer<GetUserQuery>
{
    public async Task Handle(IConsumeContext<GetUserQuery> context)
    {
        await context.RespondAsync(new UserDto(1, "ARMIN"));
    }
}

Best Practices

  • Prefer idempotent consumers
  • Always set CorrelationId for sagas
  • Keep saga state minimal and serializable
  • Use compensation instead of distributed transactions

Troubleshooting

  • Saga not correlating → Check CorrelationId and CorrelateById
  • Consumers not discovered → Verify ConsumerAssembliesToRegister
  • Filters not executing → Ensure filters are registered before bus startup

License

MIT License

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
3.0.1 118 1/27/2026
3.0.0 314 12/7/2025
2.1.0-preview1 168 11/15/2025
2.0.2 255 11/14/2025
2.0.1 286 11/13/2025
Loading failed