ApacheTech.Common.Mediator.Hosting 1.0.0

ApacheTech Mediator Library

A lightweight, feature-rich mediator library for .NET 8+ applications. It implements the mediator pattern with support for commands, queries, transactions, notifications, and sagas, all with built-in resilience, filtering, and distributed tracing.

Features

Feature        Description
Commands       Fire-and-forget operations with filter pipeline
Queries        Request/response operations with caching support
Transactions   Commands with automatic rollback on failure
Notifications  Pub/sub with hybrid handlers and callback subscriptions
Sagas          Multi-step workflows with per-step compensation
Resilience     Built-in retry, timeout, and circuit-breaker patterns
Tracing        OpenTelemetry-compatible ActivitySource integration
Filters        Prefix, postfix, and wrap filters for cross-cutting concerns

Installation

# Install the core mediator package
dotnet add package ApacheTech.Common.Mediator

# Install the hosting package for DI and configuration, using ApacheTech.Common.DependencyInjection
dotnet add package ApacheTech.Common.Mediator.Hosting

# Install the hosting package for DI and configuration, using Microsoft.Extensions.DependencyInjection
dotnet add package ApacheTech.Common.Mediator.Hosting.MEDI

Quick Start

1. Register the Mediator

using ApacheTech.Common.Mediator.Hosting;

var services = new ServiceCollection();

services.AddMediator(options =>
{
    options.AssemblyMarkers = [typeof(Program)];  // Scan this assembly for handlers
    options.PipelineOptions = p =>
    {
        p.Timeout = TimeSpan.FromSeconds(30);
        p.MaxRetryAttempts = 3;
    };
});

var serviceProvider = services.BuildServiceProvider();

2. Use the Mediator

var mediator = serviceProvider.GetRequiredService<IMediator>();

// Execute a command
await mediator.ExecuteAsync(new CreateUserCommand { Name = "John", Email = "john@example.com" });

// Send a query
var user = await mediator.QueryAsync<GetUserQuery, User>(new GetUserQuery { Id = 1 });

// Publish a notification
await mediator.PublishAsync(new UserCreatedNotification { UserId = "123", Email = "john@example.com" });

Commands

Commands represent fire-and-forget operations that mutate state. They do not return a value.

Define a Command

using ApacheTech.Common.Mediator.Commands;

public class CreateUserCommand : ICommand
{
    public required string Name { get; set; }
    public required string Email { get; set; }
    public Guid? CreatedUserId { get; set; }  // Set by handler
}

Create a Handler

using ApacheTech.Common.Mediator.Commands.Handlers;

public class CreateUserCommandHandler : CommandHandlerBase<CreateUserCommand>
{
    private readonly IUserRepository _repository;

    public CreateUserCommandHandler(IUserRepository repository)
    {
        _repository = repository;
    }

    public override async Task HandleAsync(CreateUserCommand command, CancellationToken ct)
    {
        var user = await _repository.CreateAsync(command.Name, command.Email, ct);
        command.CreatedUserId = user.Id;
    }
}

Execute the Command

var command = new CreateUserCommand
{
    Name = "John",
    Email = "john@example.com"
};

// Using IMediator
await mediator.ExecuteAsync(command);

// Or using ICommandProcessor directly
var commandProcessor = serviceProvider.GetRequiredService<ICommandProcessor>();
await commandProcessor.ExecuteAsync(command);

Queries

Queries represent request/response operations that return data without side effects.

Define a Query

using ApacheTech.Common.Mediator.Queries;

public class GetUserQuery : IQuery<User>
{
    public required int Id { get; set; }
}

public class User
{
    public int Id { get; set; }
    public string Name { get; set; } = string.Empty;
    public string Email { get; set; } = string.Empty;
}

Create a Handler

using ApacheTech.Common.Mediator.Queries.Handlers;

public class GetUserQueryHandler : QueryHandlerBase<GetUserQuery, User>
{
    private readonly IUserRepository _repository;

    public GetUserQueryHandler(IUserRepository repository)
    {
        _repository = repository;
    }

    public override async Task<User> HandleAsync(GetUserQuery query, CancellationToken ct)
    {
        return await _repository.GetByIdAsync(query.Id, ct) 
            ?? throw new NotFoundException($"User {query.Id} not found");
    }
}

Send the Query

var user = await mediator.QueryAsync<GetUserQuery, User>(
    new GetUserQuery { Id = 42 }
);

Transactions

Transactions are commands with automatic rollback capability. If the handler or any filter throws, the RollbackAsync method is called.
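The rollback behaviour described above can be pictured with a small standalone sketch. It does not use the library: `RollbackSketch` and its delegates are hypothetical names, and the choice to rethrow the original exception after rolling back is an assumption rather than documented behaviour.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();
try
{
    await RollbackSketch.RunAsync(
        handle: async () =>
        {
            log.Add("debit");
            await Task.Yield();
            throw new InvalidOperationException("credit failed");
        },
        rollback: () =>
        {
            log.Add("rollback");
            return Task.CompletedTask;
        });
}
catch (InvalidOperationException ex)
{
    log.Add($"caller saw: {ex.Message}");
}

Console.WriteLine(string.Join(" -> ", log));
// debit -> rollback -> caller saw: credit failed

// Hypothetical helper, not part of ApacheTech.Common.Mediator.
public static class RollbackSketch
{
    // Runs the handler; if it throws, runs the rollback and rethrows (assumed behaviour).
    public static async Task RunAsync(Func<Task> handle, Func<Task> rollback)
    {
        try
        {
            await handle();
        }
        catch
        {
            await rollback();
            throw;
        }
    }
}
```

The rollback only runs on failure, so it should be written to tolerate a handler that failed part-way through.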

Define a Transaction

using ApacheTech.Common.Mediator.Transactions;

public class TransferFundsTransaction : TransactionBase
{
    public required string FromAccount { get; set; }
    public required string ToAccount { get; set; }
    public required decimal Amount { get; set; }
}

Create a Handler with Rollback

using ApacheTech.Common.Mediator.Transactions.Handlers;

public class TransferFundsHandler : TransactionHandlerBase<TransferFundsTransaction>
{
    private readonly IAccountService _accounts;
    private decimal _debitedAmount;

    public TransferFundsHandler(IAccountService accounts)
    {
        _accounts = accounts;
    }

    public override async Task HandleAsync(TransferFundsTransaction tx, CancellationToken ct)
    {
        // Debit source account
        await _accounts.DebitAsync(tx.FromAccount, tx.Amount, ct);
        _debitedAmount = tx.Amount;

        // Credit destination account (might throw!)
        await _accounts.CreditAsync(tx.ToAccount, tx.Amount, ct);
    }

    public override async Task RollbackAsync(TransferFundsTransaction tx, CancellationToken ct)
    {
        // Reverse the debit if we debited
        if (_debitedAmount > 0)
        {
            await _accounts.CreditAsync(tx.FromAccount, _debitedAmount, ct);
        }
    }
}

Execute the Transaction

await mediator.TransactAsync(new TransferFundsTransaction
{
    FromAccount = "ACC-001",
    ToAccount = "ACC-002",
    Amount = 100.00m
});

// If CreditAsync throws, RollbackAsync is called automatically

Notifications

Notifications implement the pub/sub pattern. Multiple handlers and subscribers can react to a single notification.
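As a rough illustration of the pub/sub shape, the sketch below shows a callback subscription that hands back an `IDisposable` token. `DemoHub<T>` is a hypothetical stand-in written for this example; it is not the library's publisher and it is not thread-safe.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var hub = new DemoHub<string>();
var received = new List<string>();

var subscriptionA = hub.Subscribe(msg => { received.Add($"A:{msg}"); return Task.CompletedTask; });
var subscriptionB = hub.Subscribe(msg => { received.Add($"B:{msg}"); return Task.CompletedTask; });

await hub.PublishAsync("created");   // both subscribers are invoked
subscriptionA.Dispose();             // A stops listening
await hub.PublishAsync("updated");   // only B is invoked

Console.WriteLine(string.Join(", ", received));
// A:created, B:created, B:updated

// Hypothetical in-memory hub, not part of ApacheTech.Common.Mediator.
public sealed class DemoHub<T>
{
    private readonly List<Func<T, Task>> _callbacks = new();

    // Registers a callback and returns a token that removes it when disposed.
    public IDisposable Subscribe(Func<T, Task> callback)
    {
        _callbacks.Add(callback);
        return new Unsubscriber(() => _callbacks.Remove(callback));
    }

    // Invokes every registered callback in registration order.
    public async Task PublishAsync(T message)
    {
        foreach (var callback in _callbacks.ToArray())
        {
            await callback(message);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _dispose;
        public Unsubscriber(Action dispose) => _dispose = dispose;
        public void Dispose() => _dispose();
    }
}
```

Returning a disposable token ties the subscription lifetime to its owner, which is the same pattern the `CacheService` example relies on.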

Define a Notification

using ApacheTech.Common.Mediator.Notifications;

public class UserCreatedNotification : NotificationBase
{
    public required string UserId { get; set; }
    public required string Email { get; set; }
}

Option 1: Standalone Handler (Auto-Discovered)

using ApacheTech.Common.Mediator.Notifications.Handlers;

public class SendWelcomeEmailHandler : NotificationHandlerBase<UserCreatedNotification>
{
    private readonly IEmailService _email;

    public SendWelcomeEmailHandler(IEmailService email)
    {
        _email = email;
    }

    public override async Task HandleAsync(UserCreatedNotification notification, CancellationToken ct)
    {
        await _email.SendWelcomeAsync(notification.Email, ct);
    }
}

Option 2: Callback Subscription (For Existing Instances)

public class CacheService : IDisposable
{
    private readonly IDisposable _subscription;
    private readonly Dictionary<string, User> _cache = new();

    public CacheService(INotificationSubscriptions subscriptions)
    {
        // Subscribe with a callback
        _subscription = subscriptions.Subscribe<UserCreatedNotification>(OnUserCreatedAsync);
    }

    private Task OnUserCreatedAsync(UserCreatedNotification notification, CancellationToken ct)
    {
        // User.Id is an int, so the string UserId is used only as the cache key
        _cache[notification.UserId] = new User { Email = notification.Email };
        return Task.CompletedTask;
    }

    public void Dispose() => _subscription.Dispose();
}

Publish a Notification

await mediator.PublishAsync(new UserCreatedNotification
{
    UserId = "user-123",
    Email = "user@example.com"
});

// Both the standalone handler AND any callback subscriptions are invoked

Sagas

Sagas are multi-step workflows where each step has a compensating action. If any step fails, all previously completed steps are compensated in reverse order.
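The reverse-order compensation described above can be sketched without the library by pushing completed steps onto a stack. `DemoStep` and `DemoSagaRunner` are hypothetical names for this illustration only; the real orchestrator's internals are not shown in this document.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var steps = new[]
{
    new DemoStep("CreateUser", () => Task.CompletedTask, () => Task.CompletedTask),
    new DemoStep("CreateProfile", () => Task.CompletedTask, () => Task.CompletedTask),
    new DemoStep("SendWelcomeEmail",
        () => throw new InvalidOperationException("SMTP unavailable"),
        () => Task.CompletedTask),
};

var undone = await DemoSagaRunner.RunAsync(steps);
Console.WriteLine(string.Join(", ", undone));
// CreateProfile, CreateUser

// Hypothetical stand-in for a saga step; not the library's SagaStepBase<T>.
public sealed record DemoStep(string Name, Func<Task> Execute, Func<Task> Compensate);

public static class DemoSagaRunner
{
    // Runs steps in order. On the first failure, compensates the steps that
    // completed, newest first, and returns their names in the order undone.
    public static async Task<IReadOnlyList<string>> RunAsync(IEnumerable<DemoStep> steps)
    {
        var completed = new Stack<DemoStep>();
        var compensated = new List<string>();
        try
        {
            foreach (var step in steps)
            {
                await step.Execute();
                completed.Push(step);
            }
        }
        catch (Exception)
        {
            while (completed.Count > 0)
            {
                var step = completed.Pop();
                await step.Compensate();
                compensated.Add(step.Name);
            }
        }
        return compensated;
    }
}
```

The failing step itself is not compensated in this sketch, because it never completed; only the steps before it are undone.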

Define a Saga

using ApacheTech.Common.Mediator.Sagas;

public class UserOnboardingSaga : SagaBase
{
    public required string Email { get; set; }
    public required string Username { get; set; }

    // Set by steps during execution
    public Guid? UserId { get; set; }
    public Guid? ProfileId { get; set; }
}

Option 1: Standalone Step Classes (Auto-Discovered)

using ApacheTech.Common.Mediator.Sagas.Steps;

public class CreateUserAccountStep : SagaStepBase<UserOnboardingSaga>
{
    private readonly IUserRepository _users;

    public CreateUserAccountStep(IUserRepository users)
    {
        _users = users;
    }

    public override int Order => 1;  // Executes first

    public override async Task ExecuteAsync(
        UserOnboardingSaga saga, 
        SagaContext context, 
        CancellationToken ct)
    {
        var user = await _users.CreateAsync(saga.Username, saga.Email, ct);
        saga.UserId = user.Id;
        context.Set("UserId", user.Id);  // Share with other steps
    }

    public override async Task CompensateAsync(
        UserOnboardingSaga saga, 
        SagaContext context, 
        CancellationToken ct)
    {
        if (saga.UserId.HasValue)
        {
            await _users.DeleteAsync(saga.UserId.Value, ct);
            saga.UserId = null;
        }
    }
}

public class CreateUserProfileStep : SagaStepBase<UserOnboardingSaga>
{
    public override int Order => 2;  // Executes second

    public override async Task ExecuteAsync(
        UserOnboardingSaga saga, 
        SagaContext context, 
        CancellationToken ct)
    {
        var userId = context.GetRequired<Guid>("UserId");
        // Create profile...
    }

    public override Task CompensateAsync(
        UserOnboardingSaga saga, 
        SagaContext context, 
        CancellationToken ct)
    {
        // Delete profile...
        return Task.CompletedTask;
    }
}

Option 2: Fluent Builder (Inline Definition)

var sagaBuilder = serviceProvider.GetRequiredService<ISagaBuilderFactory>();

sagaBuilder.For<UserOnboardingSaga>()
    .AddStep("CreateUser",
        execute: async (saga, ctx, ct) =>
        {
            saga.UserId = Guid.NewGuid();
            ctx.Set("UserId", saga.UserId.Value);
        },
        compensate: async (saga, ctx, ct) =>
        {
            saga.UserId = null;
        })
    .AddStep("CreateProfile",
        execute: async (saga, ctx, ct) =>
        {
            var userId = ctx.GetRequired<Guid>("UserId");
            saga.ProfileId = Guid.NewGuid();
        },
        compensate: async (saga, ctx, ct) =>
        {
            saga.ProfileId = null;
        })
    .AddStep("SendWelcomeEmail",
        execute: async (saga, ctx, ct) =>
        {
            // Send email (no compensation needed)
        })
    .Build();

Execute the Saga

var result = await mediator.OrchestrateAsync(new UserOnboardingSaga
{
    Email = "user@example.com",
    Username = "newuser"
});

if (result.IsSuccess)
{
    Console.WriteLine($"Saga completed! User ID: {result.Saga.UserId}");
}
else if (result.IsCompensated)
{
    Console.WriteLine($"Saga failed at step '{result.FailedStep}': {result.Exception?.Message}");
    Console.WriteLine($"Compensated steps: {string.Join(", ", result.CompletedSteps)}");
}

Saga Context

The SagaContext allows steps to share data:

// In step 1
context.Set("OrderId", orderId);
context.Set("Customer", customer);

// In step 2
var orderId = context.GetRequired<Guid>("OrderId");
var customer = context.Get<Customer>("Customer");  // Returns null if not found

Pipeline Options

Configure timeout and retry behavior globally or per-request.

Global Configuration

services.AddMediator(options =>
{
    options.PipelineOptions = p =>
    {
        p.Timeout = TimeSpan.FromSeconds(30);
        p.MaxRetryAttempts = 3;
        p.RetryDelay = TimeSpan.FromMilliseconds(500);
        p.UseExponentialBackoff = true;
        p.IsTransientException = ex => ex is HttpRequestException or TimeoutException;
    };
});

Per-Request Override

var customOptions = new PipelineOptions
{
    Timeout = TimeSpan.FromMinutes(5),
    MaxRetryAttempts = 5
};

await mediator.ExecuteAsync(command, customOptions);
await mediator.QueryAsync<TQuery, TResponse>(query, customOptions);
await mediator.TransactAsync(transaction, customOptions);

Pipeline Options Properties

Property               Default     Description
Timeout                30 seconds  Maximum execution time before TimeoutException
MaxRetryAttempts       0           Number of retry attempts for transient failures
RetryDelay             100ms       Delay between retry attempts
UseExponentialBackoff  false       Double delay on each retry
IsTransientException   null        Predicate to identify retryable exceptions
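To make the interaction between RetryDelay, MaxRetryAttempts, and UseExponentialBackoff concrete, the sketch below computes the wait before each retry. `BackoffSketch` is a hypothetical helper, and it assumes the first retry waits exactly RetryDelay with the delay doubling afterwards; the library's exact schedule is not stated in this document.

```csharp
using System;
using System.Collections.Generic;

var delays = BackoffSketch.Delays(
    retryDelay: TimeSpan.FromMilliseconds(500),
    maxRetryAttempts: 3,
    useExponentialBackoff: true);

foreach (var delay in delays)
{
    Console.WriteLine(delay.TotalMilliseconds);
}
// 500
// 1000
// 2000

// Hypothetical helper, not part of ApacheTech.Common.Mediator.
public static class BackoffSketch
{
    // Returns the wait before each retry attempt, in order.
    public static IReadOnlyList<TimeSpan> Delays(
        TimeSpan retryDelay, int maxRetryAttempts, bool useExponentialBackoff)
    {
        var result = new List<TimeSpan>(maxRetryAttempts);
        var current = retryDelay;
        for (var attempt = 0; attempt < maxRetryAttempts; attempt++)
        {
            result.Add(current);
            if (useExponentialBackoff)
            {
                current = TimeSpan.FromTicks(current.Ticks * 2); // double for the next attempt
            }
        }
        return result;
    }
}
```

Under these assumptions, three retries with a 500 ms base delay add up to 3.5 seconds of waiting, which is worth comparing against the configured Timeout.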

Filters

Filters provide cross-cutting concerns like logging, validation, and performance monitoring.

Filter Types

Type     When It Runs
Prefix   Before the handler
Postfix  After the handler
Wrap     Surrounds the entire execution
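One way to picture how the three filter types fit together is to compose plain delegates. This is a sketch under the assumption that a wrap filter surrounds the prefix filters, the handler, and the postfix filters; `FilterPipelineSketch` is a hypothetical helper, and the library's actual ordering rules may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var trace = new List<string>();

await FilterPipelineSketch.RunAsync(
    prefix: () => { trace.Add("prefix"); return Task.CompletedTask; },
    handler: () => { trace.Add("handler"); return Task.CompletedTask; },
    postfix: () => { trace.Add("postfix"); return Task.CompletedTask; },
    wrap: async next =>
    {
        trace.Add("wrap:before");
        await next();
        trace.Add("wrap:after");
    });

Console.WriteLine(string.Join(" -> ", trace));
// wrap:before -> prefix -> handler -> postfix -> wrap:after

// Hypothetical helper, not part of ApacheTech.Common.Mediator.
public static class FilterPipelineSketch
{
    // Runs prefix, handler, and postfix in sequence, all inside the wrap delegate.
    public static Task RunAsync(
        Func<Task> prefix,
        Func<Task> handler,
        Func<Task> postfix,
        Func<Func<Task>, Task> wrap)
    {
        async Task Inner()
        {
            await prefix();
            await handler();
            await postfix();
        }

        return wrap(Inner);
    }
}
```

A wrap delegate that never calls `next` skips everything inside it, which is the basic idea behind short-circuiting a pipeline.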

Define a Filter

using System.Diagnostics;  // Stopwatch
using ApacheTech.Common.Mediator.Commands.Filters;

public class LoggingFilter : ICommandFilter<MyCommand>
{
    public CommandFilterType Type => CommandFilterType.Wrap;
    public int Order => 1;

    public async Task HandleAsync(
        MyCommand command,
        CommandFilterDelegate next,
        CancellationToken ct)
    {
        Console.WriteLine($"Starting {command.GetType().Name}");
        var sw = Stopwatch.StartNew();

        await next();  // Execute the handler (and inner filters)

        Console.WriteLine($"Completed in {sw.ElapsedMilliseconds}ms");
    }
}

Apply Filters via Attribute

[CommandFilter<LoggingFilter>]
[CommandFilter<ValidationFilter>]
public class CreateUserCommand : ICommand
{
    public required string Name { get; set; }
}

Short-Circuit Execution

Filters can short-circuit the pipeline:

public class CachingFilter : IQueryFilter<GetUserQuery, User>
{
    // IUserCache is a hypothetical application-level cache; substitute your own.
    private readonly IUserCache _cache;

    public CachingFilter(IUserCache cache) => _cache = cache;

    public QueryFilterType Type => QueryFilterType.Prefix;

    public async Task<User> HandleAsync(
        GetUserQuery query,
        QueryFilterDelegate<User> next,
        CancellationToken ct)
    {
        var cached = _cache.Get(query.Id);
        if (cached is not null)
        {
            throw new ShortCircuitException<User>(cached);  // Skip handler
        }

        return await next();
    }
}

Unified Mediator Facade

The IMediator interface provides a single entry point for all operations:

public interface IMediator
{
    // Commands
    Task ExecuteAsync<TCommand>(TCommand command, CancellationToken ct = default);
    Task ExecuteAsync<TCommand>(TCommand command, PipelineOptions options, CancellationToken ct = default);

    // Queries
    Task<TResponse> QueryAsync<TQuery, TResponse>(TQuery query, CancellationToken ct = default);
    Task<TResponse> QueryAsync<TQuery, TResponse>(TQuery query, PipelineOptions options, CancellationToken ct = default);

    // Transactions
    Task TransactAsync<TTransaction>(TTransaction transaction, CancellationToken ct = default);
    Task TransactAsync<TTransaction>(TTransaction transaction, PipelineOptions options, CancellationToken ct = default);

    // Notifications
    Task PublishAsync<TNotification>(TNotification notification, CancellationToken ct = default);
    IDisposable Subscribe<TNotification>(Func<TNotification, CancellationToken, Task> callback);

    // Sagas
    Task<SagaResult<TSaga>> OrchestrateAsync<TSaga>(TSaga saga, CancellationToken ct = default);
}

Inject Either the Facade or Individual Processors

// Single injection (recommended)
public class MyService
{
    private readonly IMediator _mediator;
    public MyService(IMediator mediator) => _mediator = mediator;
}

// Or inject specific processors
public class MyService
{
    private readonly ICommandProcessor _commands;
    private readonly IQueryProcessor _queries;

    public MyService(ICommandProcessor commands, IQueryProcessor queries)
    {
        _commands = commands;
        _queries = queries;
    }
}

Configuration

MediatorOptions

services.AddMediator(options =>
{
    // Handler lifetime (Scoped, Singleton, Transient)
    options.DefaultLifetime = ServiceLifetime.Scoped;

    // Assemblies to scan for handlers
    options.AssemblyMarkers = [typeof(Program), typeof(MyHandler)];
    // Or specify assemblies directly
    options.Assemblies = [typeof(MyHandler).Assembly];

    // Custom logger
    options.Logger = new MyLogger();

    // Pipeline defaults
    options.PipelineOptions = p =>
    {
        p.Timeout = TimeSpan.FromSeconds(30);
        p.MaxRetryAttempts = 3;
    };
});

Custom Logger

using System.Diagnostics;  // Debug

public class MyLogger : IMediatorLogger
{
    public void LogDebug(string message) => Debug.WriteLine(message);
    public void LogInfo(string message) => Console.WriteLine($"[INFO] {message}");
    // Include the exception (when present) so failure details are not lost
    public void LogError(Exception? ex, string message) => Console.Error.WriteLine($"[ERROR] {message} {ex}");
}

API Reference

Processors

Interface                   Description
IMediator                   Unified facade for all operations
ICommandProcessor           Executes commands
IQueryProcessor             Sends queries and returns responses
ITransactionProcessor       Executes transactions with rollback
INotificationPublisher      Publishes notifications
INotificationSubscriptions  Manages callback subscriptions
ISagaOrchestrator           Executes sagas with compensation
ISagaBuilderFactory         Creates fluent saga builders

Base Classes

Class                       Description
CommandHandlerBase<T>       Base class for command handlers
QueryHandlerBase<T, R>      Base class for query handlers
TransactionHandlerBase<T>   Base class for transaction handlers
NotificationHandlerBase<T>  Base class for notification handlers
SagaStepBase<T>             Base class for saga steps
SagaBase                    Base class for saga definitions
NotificationBase            Base class for notifications with correlation ID
TransactionBase             Base class for transactions with state tracking

Result Types

Type           Description
SagaResult<T>  Result of saga execution with state, completed steps, and exception
SagaState      Enum: Pending, Running, Completed, Failed, Compensating, Compensated
SagaContext    Shared context for passing data between saga steps

Distributed Tracing

The library includes built-in ActivitySource for OpenTelemetry compatibility:

// Activities are automatically created for:
// - Command.{CommandType}
// - Query.{QueryType}
// - Transaction.{TransactionType}
// - Notification.{NotificationType}
// - Saga.{SagaType}
// - Saga.Step.{StepName}
// - Saga.Compensate.{StepName}

Configure your tracing provider to listen to "ApacheTech.Common.Mediator":

services.AddOpenTelemetry()
    .WithTracing(builder => builder
        .AddSource("ApacheTech.Common.Mediator")
        .AddConsoleExporter());

Version Downloads Last Updated
1.0.0 101 4/11/2026