ApacheTech.Common.Mediator
1.0.0
dotnet add package ApacheTech.Common.Mediator --version 1.0.0
NuGet\Install-Package ApacheTech.Common.Mediator -Version 1.0.0
<PackageReference Include="ApacheTech.Common.Mediator" Version="1.0.0" />
<PackageVersion Include="ApacheTech.Common.Mediator" Version="1.0.0" />
<PackageReference Include="ApacheTech.Common.Mediator" />
paket add ApacheTech.Common.Mediator --version 1.0.0
#r "nuget: ApacheTech.Common.Mediator, 1.0.0"
#:package ApacheTech.Common.Mediator@1.0.0
#addin nuget:?package=ApacheTech.Common.Mediator&version=1.0.0
#tool nuget:?package=ApacheTech.Common.Mediator&version=1.0.0
ApacheTech Mediator Library
A lightweight, feature-rich mediator library for .NET 8+ applications. Implements the mediator pattern with support for commands, queries, transactions, notifications, and sagas—all with built-in resilience, filtering, and distributed tracing.
Features
| Feature | Description |
|---|---|
| Commands | Fire-and-forget operations with filter pipeline |
| Queries | Request/response operations with caching support |
| Transactions | Commands with automatic rollback on failure |
| Notifications | Pub/sub with hybrid handlers and callback subscriptions |
| Sagas | Multi-step workflows with per-step compensation |
| Resilience | Built-in retry, timeout, and circuit-breaker patterns |
| Tracing | OpenTelemetry-compatible ActivitySource integration |
| Filters | Prefix, postfix, and wrap filters for cross-cutting concerns |
Table of Contents
- Installation
- Quick Start
- Commands
- Queries
- Transactions
- Notifications
- Sagas
- Pipeline Options
- Filters
- Unified Mediator Facade
- Configuration
- API Reference
Installation
# Install the core mediator package
dotnet add package ApacheTech.Common.Mediator
# Install the hosting package for DI and configuration, using ApacheTech.Common.DependencyInjection
dotnet add package ApacheTech.Common.Mediator.Hosting
# Install the hosting package for DI and configuration, using Microsoft.Extensions.DependencyInjection
dotnet add package ApacheTech.Common.Mediator.Hosting.MEDI
Quick Start
1. Register the Mediator
using ApacheTech.Common.Mediator.Hosting;
var services = new ServiceCollection();
services.AddMediator(options =>
{
options.AssemblyMarkers = [typeof(Program)]; // Scan this assembly for handlers
options.PipelineOptions = p =>
{
p.Timeout = TimeSpan.FromSeconds(30);
p.MaxRetryAttempts = 3;
};
});
var serviceProvider = services.BuildServiceProvider();
2. Use the Mediator
var mediator = serviceProvider.GetRequiredService<IMediator>();
// Execute a command
await mediator.ExecuteAsync(new CreateUserCommand { Name = "John" });
// Send a query
var user = await mediator.QueryAsync<GetUserQuery, User>(new GetUserQuery { Id = 1 });
// Publish a notification
await mediator.PublishAsync(new UserCreatedNotification { UserId = "123" });
Commands
Commands represent fire-and-forget operations that mutate state. They do not return a value.
Define a Command
using ApacheTech.Common.Mediator.Commands;
public class CreateUserCommand : ICommand
{
public required string Name { get; set; }
public required string Email { get; set; }
public Guid? CreatedUserId { get; set; } // Set by handler
}
Create a Handler
using ApacheTech.Common.Mediator.Commands.Handlers;
public class CreateUserCommandHandler : CommandHandlerBase<CreateUserCommand>
{
private readonly IUserRepository _repository;
public CreateUserCommandHandler(IUserRepository repository)
{
_repository = repository;
}
public override async Task HandleAsync(CreateUserCommand command, CancellationToken ct)
{
var user = await _repository.CreateAsync(command.Name, command.Email, ct);
command.CreatedUserId = user.Id;
}
}
Execute the Command
// Using IMediator
await mediator.ExecuteAsync(new CreateUserCommand
{
Name = "John",
Email = "john@example.com"
});
// Or using ICommandProcessor directly
var commandProcessor = serviceProvider.GetRequiredService<ICommandProcessor>();
await commandProcessor.ExecuteAsync(command);
Queries
Queries represent request/response operations that return data without side effects.
Define a Query
using ApacheTech.Common.Mediator.Queries;
public class GetUserQuery : IQuery<User>
{
public required int Id { get; set; }
}
public class User
{
public int Id { get; set; }
public string Name { get; set; } = string.Empty;
public string Email { get; set; } = string.Empty;
}
Create a Handler
using ApacheTech.Common.Mediator.Queries.Handlers;
public class GetUserQueryHandler : QueryHandlerBase<GetUserQuery, User>
{
private readonly IUserRepository _repository;
public GetUserQueryHandler(IUserRepository repository)
{
_repository = repository;
}
public override async Task<User> HandleAsync(GetUserQuery query, CancellationToken ct)
{
return await _repository.GetByIdAsync(query.Id, ct)
?? throw new NotFoundException($"User {query.Id} not found");
}
}
Send the Query
var user = await mediator.QueryAsync<GetUserQuery, User>(
new GetUserQuery { Id = 42 }
);
Transactions
Transactions are commands with automatic rollback capability. If the handler or any filter throws, the RollbackAsync method is called.
Define a Transaction
using ApacheTech.Common.Mediator.Transactions;
public class TransferFundsTransaction : TransactionBase
{
public required string FromAccount { get; set; }
public required string ToAccount { get; set; }
public required decimal Amount { get; set; }
}
Create a Handler with Rollback
using ApacheTech.Common.Mediator.Transactions.Handlers;
public class TransferFundsHandler : TransactionHandlerBase<TransferFundsTransaction>
{
private readonly IAccountService _accounts;
private decimal _debitedAmount;
public TransferFundsHandler(IAccountService accounts)
{
_accounts = accounts;
}
public override async Task HandleAsync(TransferFundsTransaction tx, CancellationToken ct)
{
// Debit source account
await _accounts.DebitAsync(tx.FromAccount, tx.Amount, ct);
_debitedAmount = tx.Amount;
// Credit destination account (might throw!)
await _accounts.CreditAsync(tx.ToAccount, tx.Amount, ct);
}
public override async Task RollbackAsync(TransferFundsTransaction tx, CancellationToken ct)
{
// Reverse the debit if we debited
if (_debitedAmount > 0)
{
await _accounts.CreditAsync(tx.FromAccount, _debitedAmount, ct);
}
}
}
Execute the Transaction
await mediator.TransactAsync(new TransferFundsTransaction
{
FromAccount = "ACC-001",
ToAccount = "ACC-002",
Amount = 100.00m
});
// If CreditAsync throws, RollbackAsync is called automatically
Notifications
Notifications implement the pub/sub pattern. Multiple handlers and subscribers can react to a single notification.
Define a Notification
using ApacheTech.Common.Mediator.Notifications;
public class UserCreatedNotification : NotificationBase
{
public required string UserId { get; set; }
public required string Email { get; set; }
}
Option 1: Standalone Handler (Auto-Discovered)
using ApacheTech.Common.Mediator.Notifications.Handlers;
public class SendWelcomeEmailHandler : NotificationHandlerBase<UserCreatedNotification>
{
private readonly IEmailService _email;
public SendWelcomeEmailHandler(IEmailService email)
{
_email = email;
}
public override async Task HandleAsync(UserCreatedNotification notification, CancellationToken ct)
{
await _email.SendWelcomeAsync(notification.Email, ct);
}
}
Option 2: Callback Subscription (For Existing Instances)
public class CacheService : IDisposable
{
private readonly IDisposable _subscription;
private readonly Dictionary<string, User> _cache = new();
public CacheService(INotificationSubscriptions subscriptions)
{
// Subscribe with a callback
_subscription = subscriptions.Subscribe<UserCreatedNotification>(OnUserCreatedAsync);
}
private Task OnUserCreatedAsync(UserCreatedNotification notification, CancellationToken ct)
{
_cache[notification.UserId] = new User { Id = notification.UserId };
return Task.CompletedTask;
}
public void Dispose() => _subscription.Dispose();
}
Publish a Notification
await mediator.PublishAsync(new UserCreatedNotification
{
UserId = "user-123",
Email = "user@example.com"
});
// Both the standalone handler AND any callback subscriptions are invoked
Sagas
Sagas are multi-step workflows where each step has a compensating action. If any step fails, all previously completed steps are compensated in reverse order.
Define a Saga
using ApacheTech.Common.Mediator.Sagas;
public class UserOnboardingSaga : SagaBase
{
public required string Email { get; set; }
public required string Username { get; set; }
// Set by steps during execution
public Guid? UserId { get; set; }
public Guid? ProfileId { get; set; }
}
Option 1: Standalone Step Classes (Auto-Discovered)
using ApacheTech.Common.Mediator.Sagas.Steps;
public class CreateUserAccountStep : SagaStepBase<UserOnboardingSaga>
{
private readonly IUserRepository _users;
public CreateUserAccountStep(IUserRepository users)
{
_users = users;
}
public override int Order => 1; // Executes first
public override async Task ExecuteAsync(
UserOnboardingSaga saga,
SagaContext context,
CancellationToken ct)
{
var user = await _users.CreateAsync(saga.Email, ct);
saga.UserId = user.Id;
context.Set("UserId", user.Id); // Share with other steps
}
public override async Task CompensateAsync(
UserOnboardingSaga saga,
SagaContext context,
CancellationToken ct)
{
if (saga.UserId.HasValue)
{
await _users.DeleteAsync(saga.UserId.Value, ct);
saga.UserId = null;
}
}
}
public class CreateUserProfileStep : SagaStepBase<UserOnboardingSaga>
{
public override int Order => 2; // Executes second
public override async Task ExecuteAsync(
UserOnboardingSaga saga,
SagaContext context,
CancellationToken ct)
{
var userId = context.GetRequired<Guid>("UserId");
// Create profile...
}
public override Task CompensateAsync(
UserOnboardingSaga saga,
SagaContext context,
CancellationToken ct)
{
// Delete profile...
return Task.CompletedTask;
}
}
Option 2: Fluent Builder (Inline Definition)
var sagaBuilder = serviceProvider.GetRequiredService<ISagaBuilderFactory>();
sagaBuilder.For<UserOnboardingSaga>()
.AddStep("CreateUser",
execute: async (saga, ctx, ct) =>
{
saga.UserId = Guid.NewGuid();
ctx.Set("UserId", saga.UserId.Value);
},
compensate: async (saga, ctx, ct) =>
{
saga.UserId = null;
})
.AddStep("CreateProfile",
execute: async (saga, ctx, ct) =>
{
var userId = ctx.GetRequired<Guid>("UserId");
saga.ProfileId = Guid.NewGuid();
},
compensate: async (saga, ctx, ct) =>
{
saga.ProfileId = null;
})
.AddStep("SendWelcomeEmail",
execute: async (saga, ctx, ct) =>
{
// Send email (no compensation needed)
})
.Build();
Execute the Saga
var result = await mediator.OrchestrateAsync(new UserOnboardingSaga
{
Email = "user@example.com",
Username = "newuser"
});
if (result.IsSuccess)
{
Console.WriteLine($"Saga completed! User ID: {result.Saga.UserId}");
}
else if (result.IsCompensated)
{
Console.WriteLine($"Saga failed at step '{result.FailedStep}': {result.Exception?.Message}");
Console.WriteLine($"Compensated steps: {string.Join(", ", result.CompletedSteps)}");
}
Saga Context
The SagaContext allows steps to share data:
// In step 1
context.Set("OrderId", orderId);
context.Set("Customer", customer);
// In step 2
var orderId = context.GetRequired<Guid>("OrderId");
var customer = context.Get<Customer>("Customer"); // Returns null if not found
Pipeline Options
Configure timeout and retry behavior globally or per-request.
Global Configuration
services.AddMediator(options =>
{
options.PipelineOptions = p =>
{
p.Timeout = TimeSpan.FromSeconds(30);
p.MaxRetryAttempts = 3;
p.RetryDelay = TimeSpan.FromMilliseconds(500);
p.UseExponentialBackoff = true;
p.IsTransientException = ex => ex is HttpRequestException or TimeoutException;
};
});
Per-Request Override
var customOptions = new PipelineOptions
{
Timeout = TimeSpan.FromMinutes(5),
MaxRetryAttempts = 5
};
await mediator.ExecuteAsync(command, customOptions);
await mediator.QueryAsync<TQuery, TResponse>(query, customOptions);
await mediator.TransactAsync(transaction, customOptions);
Pipeline Options Properties
| Property | Default | Description |
|---|---|---|
Timeout |
30 seconds | Maximum execution time before TimeoutException |
MaxRetryAttempts |
0 | Number of retry attempts for transient failures |
RetryDelay |
100ms | Delay between retry attempts |
UseExponentialBackoff |
false | Double delay on each retry |
IsTransientException |
null |
Predicate to identify retryable exceptions |
Filters
Filters provide cross-cutting concerns like logging, validation, and performance monitoring.
Filter Types
| Type | When It Runs |
|---|---|
| Prefix | Before the handler |
| Postfix | After the handler |
| Wrap | Surrounds the entire execution |
Define a Filter
using ApacheTech.Common.Mediator.Commands.Filters;
public class LoggingFilter : ICommandFilter<MyCommand>
{
public CommandFilterType Type => CommandFilterType.Wrap;
public int Order => 1;
public async Task HandleAsync(
MyCommand command,
CommandFilterDelegate next,
CancellationToken ct)
{
Console.WriteLine($"Starting {command.GetType().Name}");
var sw = Stopwatch.StartNew();
await next(); // Execute the handler (and inner filters)
Console.WriteLine($"Completed in {sw.ElapsedMilliseconds}ms");
}
}
Apply Filters via Attribute
[CommandFilter<LoggingFilter>]
[CommandFilter<ValidationFilter>]
public class CreateUserCommand : ICommand
{
public required string Name { get; set; }
}
Short-Circuit Execution
Filters can short-circuit the pipeline:
public class CachingFilter : IQueryFilter<GetUserQuery, User>
{
public QueryFilterType Type => QueryFilterType.Prefix;
public async Task<User> HandleAsync(
GetUserQuery query,
QueryFilterDelegate<User> next,
CancellationToken ct)
{
var cached = _cache.Get(query.Id);
if (cached is not null)
{
throw new ShortCircuitException<User>(cached); // Skip handler
}
return await next();
}
}
Unified Mediator Facade
The IMediator interface provides a single entry point for all operations:
public interface IMediator
{
// Commands
Task ExecuteAsync<TCommand>(TCommand command, CancellationToken ct = default);
Task ExecuteAsync<TCommand>(TCommand command, PipelineOptions options, CancellationToken ct = default);
// Queries
Task<TResponse> QueryAsync<TQuery, TResponse>(TQuery query, CancellationToken ct = default);
Task<TResponse> QueryAsync<TQuery, TResponse>(TQuery query, PipelineOptions options, CancellationToken ct = default);
// Transactions
Task TransactAsync<TTransaction>(TTransaction transaction, CancellationToken ct = default);
Task TransactAsync<TTransaction>(TTransaction transaction, PipelineOptions options, CancellationToken ct = default);
// Notifications
Task PublishAsync<TNotification>(TNotification notification, CancellationToken ct = default);
IDisposable Subscribe<TNotification>(Func<TNotification, CancellationToken, Task> callback);
// Sagas
Task<SagaResult<TSaga>> OrchestrateAsync<TSaga>(TSaga saga, CancellationToken ct = default);
}
Inject Either the Facade or Individual Processors
// Single injection (recommended)
public class MyService
{
private readonly IMediator _mediator;
public MyService(IMediator mediator) => _mediator = mediator;
}
// Or inject specific processors
public class MyService
{
private readonly ICommandProcessor _commands;
private readonly IQueryProcessor _queries;
public MyService(ICommandProcessor commands, IQueryProcessor queries)
{
_commands = commands;
_queries = queries;
}
}
Configuration
MediatorOptions
services.AddMediator(options =>
{
// Handler lifetime (Scoped, Singleton, Transient)
options.DefaultLifetime = ServiceLifetime.Scoped;
// Assemblies to scan for handlers
options.AssemblyMarkers = [typeof(Program), typeof(MyHandler)];
// Or specify assemblies directly
options.Assemblies = [typeof(MyHandler).Assembly];
// Custom logger
options.Logger = new MyCustomLogger();
// Pipeline defaults
options.PipelineOptions = p =>
{
p.Timeout = TimeSpan.FromSeconds(30);
p.MaxRetryAttempts = 3;
};
});
Custom Logger
public class MyLogger : IMediatorLogger
{
public void LogDebug(string message) => Debug.WriteLine(message);
public void LogInfo(string message) => Console.WriteLine($"[INFO] {message}");
public void LogError(Exception? ex, string message) => Console.Error.WriteLine($"[ERROR] {message}");
}
API Reference
Processors
| Interface | Description |
|---|---|
IMediator |
Unified facade for all operations |
ICommandProcessor |
Executes commands |
IQueryProcessor |
Sends queries and returns responses |
ITransactionProcessor |
Executes transactions with rollback |
INotificationPublisher |
Publishes notifications |
INotificationSubscriptions |
Manages callback subscriptions |
ISagaOrchestrator |
Executes sagas with compensation |
ISagaBuilderFactory |
Creates fluent saga builders |
Base Classes
| Class | Description |
|---|---|
CommandHandlerBase<T> |
Base class for command handlers |
QueryHandlerBase<T, R> |
Base class for query handlers |
TransactionHandlerBase<T> |
Base class for transaction handlers |
NotificationHandlerBase<T> |
Base class for notification handlers |
SagaStepBase<T> |
Base class for saga steps |
SagaBase |
Base class for saga definitions |
NotificationBase |
Base class for notifications with correlation ID |
TransactionBase |
Base class for transactions with state tracking |
Result Types
| Type | Description |
|---|---|
SagaResult<T> |
Result of saga execution with state, completed steps, and exception |
SagaState |
Enum: Pending, Running, Completed, Failed, Compensating, Compensated |
SagaContext |
Shared context for passing data between saga steps |
Distributed Tracing
The library includes built-in ActivitySource for OpenTelemetry compatibility:
// Activities are automatically created for:
// - Command.{CommandType}
// - Query.{QueryType}
// - Transaction.{TransactionType}
// - Notification.{NotificationType}
// - Saga.{SagaType}
// - Saga.Step.{StepName}
// - Saga.Compensate.{StepName}
Configure your tracing provider to listen to "ApacheTech.Common.Mediator":
services.AddOpenTelemetry()
.WithTracing(builder => builder
.AddSource("ApacheTech.Common.Mediator")
.AddConsoleExporter());
License
MIT License - see LICENSE for details.
Contributing
Contributions are welcome! Please read our contributing guidelines before submitting PRs.
Built with ❤️ by ApacheTech
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- No dependencies.
NuGet packages (2)
Showing the top 2 NuGet packages that depend on ApacheTech.Common.Mediator:
| Package | Downloads |
|---|---|
|
ApacheTech.Common.Mediator.Hosting
A lightweight pipeline framework for handling requests, commands, events, and notifications. |
|
|
ApacheTech.Common.Mediator.Hosting.MEDI
A lightweight pipeline framework for handling requests, commands, events, and notifications. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.0 | 139 | 4/11/2026 |
ApacheTech.Common.Mediator v1.0.0