Cortex.Mediator
3.1.2
dotnet add package Cortex.Mediator --version 3.1.2
NuGet\Install-Package Cortex.Mediator -Version 3.1.2
<PackageReference Include="Cortex.Mediator" Version="3.1.2" />
<PackageVersion Include="Cortex.Mediator" Version="3.1.2" />
<PackageReference Include="Cortex.Mediator" />
paket add Cortex.Mediator --version 3.1.2
#r "nuget: Cortex.Mediator, 3.1.2"
#:package Cortex.Mediator@3.1.2
#addin nuget:?package=Cortex.Mediator&version=3.1.2
#tool nuget:?package=Cortex.Mediator&version=3.1.2
Cortex.Mediator 🧠
Cortex.Mediator is a lightweight and extensible implementation of the Mediator pattern for .NET applications, designed to power clean, modular architectures like Vertical Slice Architecture and CQRS.
Built as part of the Cortex Data Framework, this library simplifies command and query handling with built-in support for:
- ✅ Commands & Queries
- ✅ Notifications (Events)
- ✅ Pipeline Behaviors
- ✅ FluentValidation
- ✅ Logging
🚀 Getting Started
Install via NuGet
dotnet add package Cortex.Mediator
🛠️ Setup
In Program.cs or Startup.cs:
builder.Services.AddCortexMediator(
new[] { typeof(Program) }, // Assemblies to scan for handlers
options => options.AddDefaultBehaviors() // Logging
);
📦 Folder Structure Example (Vertical Slice)
Features/
CreateUser/
CreateUserCommand.cs
CreateUserCommandHandler.cs
CreateUserValidator.cs
CreateUserEndpoint.cs
✏️ Defining a Command
public class CreateUserCommand : ICommand<Guid>
{
public string UserName { get; set; }
public string Email { get; set; }
}
Handler
public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand,Guid>
{
public async Task<Guid> Handle(CreateUserCommand command, CancellationToken cancellationToken)
{
// Logic here
}
}
Sending Commands
Simplified API (Recommended) - Type is automatically inferred:
// Using extension methods - no need to specify type parameters!
var userId = await mediator.SendAsync(command);
// For void commands (no return value)
await mediator.SendAsync(new DeleteUserCommand { UserId = userId });
Explicit Type Parameters (Legacy):
var userId = await mediator.SendCommandAsync<CreateUserCommand, Guid>(command);
Validator (Optional, via FluentValidation)
public class CreateUserValidator : AbstractValidator<CreateUserCommand>
{
public CreateUserValidator()
{
RuleFor(x => x.UserName).NotEmpty();
RuleFor(x => x.Email).NotEmpty().EmailAddress();
}
}
🔍 Defining a Query
public class GetUserQuery : IQuery<GetUserResponse>
{
public int UserId { get; set; }
}
public class GetUserQueryHandler : IQueryHandler<GetUserQuery, GetUserResponse>
{
public async Task<GetUserResponse> Handle(GetUserQuery query, CancellationToken cancellationToken)
{
return new GetUserResponse { UserId = query.UserId, UserName = "Andy" };
}
}
Sending Queries
Simplified API (Recommended) - Type is automatically inferred:
// Using extension methods - no need to specify type parameters!
var user = await mediator.QueryAsync(new GetUserQuery { UserId = 1 });
Explicit Type Parameters (Legacy):
var user = await mediator.SendQueryAsync<GetUserQuery, GetUserResponse>(query);
📢 Notifications (Events)
public class UserCreatedNotification : INotification
{
public string UserName { get; set; }
}
public class SendWelcomeEmailHandler : INotificationHandler<UserCreatedNotification>
{
public async Task Handle(UserCreatedNotification notification, CancellationToken cancellationToken)
{
// Send email...
}
}
await mediator.PublishAsync(new UserCreatedNotification { UserName = "Andy" });
🔧 Pipeline Behaviors (Built-in)
Out of the box, Cortex.Mediator supports:
LoggingCommandBehavior- Logs command execution with timingLoggingQueryBehavior- Logs query execution with timingLoggingNotificationBehavior- Logs notification publishing with timingExceptionHandlingCommandBehavior- Centralized exception handling for commandsExceptionHandlingQueryBehavior- Centralized exception handling for queriesExceptionHandlingNotificationBehavior- Centralized exception handling for notificationsValidationCommandBehavior- FluentValidation support (viaCortex.Mediator.Behaviors.FluentValidation)
Registering Behaviors
// Add default logging behaviors
options.AddDefaultBehaviors();
// Add exception handling behaviors
options.AddExceptionHandlingBehaviors();
// Add both logging and exception handling
options.AddDefaultBehaviorsWithExceptionHandling();
// Custom behaviors
options.AddOpenCommandPipelineBehavior(typeof(MyCustomBehavior<,>));
options.AddOpenQueryPipelineBehavior(typeof(MyCustomQueryBehavior<,>));
options.AddOpenNotificationPipelineBehavior(typeof(MyCustomNotificationBehavior<>));
⚠️ Exception Handling Behavior
The exception handling behaviors provide centralized exception handling with optional fallback results.
Basic Setup
builder.Services.AddCortexMediator(
new[] { typeof(Program) },
options => options.AddExceptionHandlingBehaviors()
);
Custom Exception Handler
Implement IExceptionHandler to customize exception handling:
public class MyExceptionHandler : IExceptionHandler
{
private readonly ILogger<MyExceptionHandler> _logger;
public MyExceptionHandler(ILogger<MyExceptionHandler> logger)
{
_logger = logger;
}
public Task<bool> HandleAsync(
Exception exception,
Type requestType,
object request,
CancellationToken cancellationToken)
{
_logger.LogError(exception, "Error processing {RequestType}", requestType.Name);
// Return true to suppress the exception, false to rethrow
return Task.FromResult(false);
}
}
// Register in DI
services.AddSingleton<IExceptionHandler, MyExceptionHandler>();
Exception Handler with Fallback Result
For commands and queries that return a value, implement IExceptionHandler<TResult>:
public class FallbackExceptionHandler : IExceptionHandler<ApiResponse>
{
public Task<(bool handled, ApiResponse? result)> HandleWithResultAsync(
Exception exception,
Type requestType,
object request,
CancellationToken cancellationToken)
{
var fallback = new ApiResponse
{
Success = false,
Error = exception.Message
};
return Task.FromResult((true, fallback));
}
public Task<bool> HandleAsync(Exception exception, Type requestType, object request, CancellationToken cancellationToken)
=> Task.FromResult(false);
}
Notification Exception Suppression
For notifications, you can suppress exceptions to allow other handlers to continue:
// The ExceptionHandlingNotificationBehavior has a suppressExceptions parameter
// When true, exceptions are logged but not rethrown
💾 Caching Behavior for Queries
The caching behavior provides automatic caching of query results to improve performance.
Basic Setup
// Add caching services
builder.Services.AddMediatorCaching(options =>
{
options.DefaultAbsoluteExpiration = TimeSpan.FromMinutes(5);
options.DefaultSlidingExpiration = TimeSpan.FromMinutes(1);
options.CacheKeyPrefix = "MyApp";
});
// Add mediator with caching behavior
builder.Services.AddCortexMediator(
new[] { typeof(Program) },
options => options.AddCachingBehavior()
);
Using the Cacheable Attribute
Mark your query classes with the [Cacheable] attribute:
[Cacheable(AbsoluteExpirationSeconds = 300, SlidingExpirationSeconds = 60)]
public class GetUserQuery : IQuery<UserDto>
{
public int UserId { get; set; }
}
Using the ICacheableQuery Interface
For more control, implement ICacheableQuery:
public class GetProductQuery : IQuery<ProductDto>, ICacheableQuery
{
public int ProductId { get; set; }
// Custom cache key
public string? CacheKey => $"product-{ProductId}";
// Custom expiration times
public TimeSpan? AbsoluteExpiration => TimeSpan.FromMinutes(10);
public TimeSpan? SlidingExpiration => TimeSpan.FromMinutes(2);
}
Cache Invalidation
Use ICacheInvalidator to manually invalidate cached results:
public class UpdateUserCommandHandler : ICommandHandler<UpdateUserCommand>
{
private readonly ICacheInvalidator _cacheInvalidator;
public UpdateUserCommandHandler(ICacheInvalidator cacheInvalidator)
{
_cacheInvalidator = cacheInvalidator;
}
public async Task Handle(UpdateUserCommand command, CancellationToken cancellationToken)
{
// Update user in database...
// Invalidate the cached query result
_cacheInvalidator.Invalidate<GetUserQuery, UserDto>(
new GetUserQuery { UserId = command.UserId });
}
}
Custom Cache Key Generator
Implement ICacheKeyGenerator for custom key generation:
public class MyCacheKeyGenerator : ICacheKeyGenerator
{
public string GenerateKey<TQuery, TResult>(TQuery query)
where TQuery : IQuery<TResult>
{
// Custom key generation logic
return $"MyApp:{typeof(TQuery).Name}:{query.GetHashCode()}";
}
}
// Register custom generator
services.AddMediatorCaching<MyCacheKeyGenerator>();
🌊 Streaming Requests (IAsyncEnumerable)
Cortex.Mediator supports streaming queries that return IAsyncEnumerable<T>, perfect for handling large datasets efficiently without loading everything into memory.
Defining a Streaming Query
// Define the streaming query
public class GetAllUsersQuery : IStreamQuery<UserDto>
{
public int PageSize { get; set; } = 100;
}
// Implement the streaming handler
public class GetAllUsersQueryHandler : IStreamQueryHandler<GetAllUsersQuery, UserDto>
{
private readonly IDbConnection _db;
public GetAllUsersQueryHandler(IDbConnection db)
{
_db = db;
}
public async IAsyncEnumerable<UserDto> Handle(
GetAllUsersQuery query,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
// Stream results from database one at a time
await foreach (var user in _db.StreamUsersAsync(query.PageSize, cancellationToken))
{
yield return new UserDto
{
Id = user.Id,
Name = user.Name,
Email = user.Email
};
}
}
}
Consuming Streaming Queries
// Using the StreamAsync extension method (recommended)
await foreach (var user in mediator.StreamAsync(new GetAllUsersQuery()))
{
Console.WriteLine($"Processing: {user.Name}");
// Process each user as it arrives - no need to wait for all results
}
// Or with explicit type parameters
await foreach (var user in mediator.CreateStream<GetAllUsersQuery, UserDto>(query))
{
Console.WriteLine(user.Name);
}
Streaming with Cancellation
var cts = new CancellationTokenSource();
await foreach (var item in mediator.StreamAsync(query, cts.Token))
{
if (ShouldStop(item))
{
cts.Cancel(); // Gracefully stop streaming
break;
}
Process(item);
}
Streaming Pipeline Behaviors
Register pipeline behaviors for streaming queries:
// Create a custom streaming behavior
public class MetricsStreamBehavior<TQuery, TResult> : IStreamQueryPipelineBehavior<TQuery, TResult>
where TQuery : IStreamQuery<TResult>
{
public async IAsyncEnumerable<TResult> Handle(
TQuery query,
StreamQueryHandlerDelegate<TResult> next,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var count = 0;
await foreach (var item in next().WithCancellation(cancellationToken))
{
count++;
yield return item;
}
Console.WriteLine($"Streamed {count} items");
}
}
// Register the behavior
services.AddCortexMediator(
new[] { typeof(Program) },
options => options.AddOpenStreamQueryPipelineBehavior(typeof(MetricsStreamBehavior<,>))
);
Built-in Logging Behavior for Streams
// Use the built-in logging behavior
options.AddOpenStreamQueryPipelineBehavior(typeof(LoggingStreamQueryBehavior<,>));
🔄 Request Pre/Post Processors
Pre-processors run before the handler executes, and post-processors run after. They're simpler than pipeline behaviors and are ideal for cross-cutting concerns.
Basic Setup
// Register processor behaviors
services.AddCortexMediator(
new[] { typeof(Program) },
options => options.AddProcessorBehaviors()
);
Creating a Pre-Processor
Pre-processors run before the handler and can be used for validation, authorization, or data enrichment:
public class LoggingPreProcessor<TRequest> : IRequestPreProcessor<TRequest>
{
private readonly ILogger<LoggingPreProcessor<TRequest>> _logger;
public LoggingPreProcessor(ILogger<LoggingPreProcessor<TRequest>> logger)
{
_logger = logger;
}
public Task ProcessAsync(TRequest request, CancellationToken cancellationToken)
{
_logger.LogInformation("Processing {RequestType}", typeof(TRequest).Name);
return Task.CompletedTask;
}
}
// Register for a specific request type
services.AddTransient<IRequestPreProcessor<CreateOrderCommand>, OrderValidationPreProcessor>();
// Or register for all requests (generic)
services.AddTransient(typeof(IRequestPreProcessor<>), typeof(LoggingPreProcessor<>));
Creating a Post-Processor
Post-processors run after successful handler execution. Use them for logging, auditing, or triggering side effects:
// Post-processor for commands/queries that return a result
public class AuditPostProcessor<TRequest, TResponse> : IRequestPostProcessor<TRequest, TResponse>
{
private readonly IAuditService _auditService;
public AuditPostProcessor(IAuditService auditService)
{
_auditService = auditService;
}
public async Task ProcessAsync(TRequest request, TResponse response, CancellationToken cancellationToken)
{
await _auditService.LogAsync(new AuditEntry
{
RequestType = typeof(TRequest).Name,
ResponseType = typeof(TResponse).Name,
Timestamp = DateTime.UtcNow
});
}
}
// Post-processor for void commands
public class NotificationPostProcessor : IRequestPostProcessor<CreateOrderCommand>
{
private readonly IMediator _mediator;
public NotificationPostProcessor(IMediator mediator)
{
_mediator = mediator;
}
public async Task ProcessAsync(CreateOrderCommand request, CancellationToken cancellationToken)
{
// Publish a notification after the command completes
await _mediator.PublishAsync(new OrderCreatedNotification { /* ... */ }, cancellationToken);
}
}
Use Cases for Pre-Processors
- Validation: Validate input before processing
- Authorization: Check user permissions
- Data Enrichment: Add data to the request (e.g., current user ID)
- Rate Limiting: Check and enforce rate limits
- Logging: Log incoming requests
Use Cases for Post-Processors
- Audit Logging: Record what happened
- Notifications: Send notifications after successful operations
- Cache Invalidation: Clear related cached data
- Event Publishing: Publish domain events
- Metrics: Record performance metrics
💬 Contributing
We welcome contributions from the community! Whether it's reporting bugs, suggesting features, or submitting pull requests, your involvement helps improve Cortex for everyone.
💬 How to Contribute
- Fork the Repository
- Create a Feature Branch
git checkout -b feature/YourFeature
- Commit Your Changes
git commit -m "Add your feature"
- Push to Your Fork
git push origin feature/YourFeature
- Open a Pull Request
Describe your changes and submit the pull request for review.
📄 License
This project is licensed under the MIT License.
📚 Sponsorship
Cortex is an open-source project maintained by BuilderSoft. Your support helps us continue developing and improving Cortex. Consider sponsoring us to contribute to the future of resilient streaming platforms.
How to Sponsor
- Financial Contributions: Support us through GitHub Sponsors or other preferred platforms.
- Corporate Sponsorship: If your organization is interested in sponsoring Cortex, please contact us directly.
Contact Us: cortex@buildersoft.io
Contact
We'd love to hear from you! Whether you have questions, feedback, or need support, feel free to reach out.
- Email: cortex@buildersoft.io
- Website: https://buildersoft.io
- GitHub Issues: Cortex Data Framework Issues
- Join our Discord Community:
Thank you for using Cortex Data Framework! We hope it empowers you to build scalable and efficient data processing pipelines effortlessly.
Built with ❤️ by the Buildersoft team.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 is compatible. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
| .NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
| .NET Standard | netstandard2.0 is compatible. netstandard2.1 is compatible. |
| .NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
| MonoAndroid | monoandroid was computed. |
| MonoMac | monomac was computed. |
| MonoTouch | monotouch was computed. |
| Tizen | tizen40 was computed. tizen60 was computed. |
| Xamarin.iOS | xamarinios was computed. |
| Xamarin.Mac | xamarinmac was computed. |
| Xamarin.TVOS | xamarintvos was computed. |
| Xamarin.WatchOS | xamarinwatchos was computed. |
-
.NETStandard 2.0
- Microsoft.Extensions.Caching.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Caching.Memory (>= 9.0.3)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.3)
- Microsoft.Extensions.DependencyInjection (>= 9.0.3)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Options (>= 9.0.3)
- Scrutor (>= 6.0.1)
-
.NETStandard 2.1
- Microsoft.Extensions.Caching.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Caching.Memory (>= 9.0.3)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.3)
- Microsoft.Extensions.DependencyInjection (>= 9.0.3)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Options (>= 9.0.3)
- Scrutor (>= 6.0.1)
-
net10.0
- Microsoft.Extensions.Caching.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Caching.Memory (>= 9.0.3)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.3)
- Microsoft.Extensions.DependencyInjection (>= 9.0.3)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Options (>= 9.0.3)
- Scrutor (>= 6.0.1)
-
net7.0
- Microsoft.Extensions.Caching.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Caching.Memory (>= 9.0.3)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.3)
- Microsoft.Extensions.DependencyInjection (>= 9.0.3)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Options (>= 9.0.3)
- Scrutor (>= 6.0.1)
-
net8.0
- Microsoft.Extensions.Caching.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Caching.Memory (>= 9.0.3)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.3)
- Microsoft.Extensions.DependencyInjection (>= 9.0.3)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Options (>= 9.0.3)
- Scrutor (>= 6.0.1)
-
net9.0
- Microsoft.Extensions.Caching.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Caching.Memory (>= 9.0.3)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.3)
- Microsoft.Extensions.DependencyInjection (>= 9.0.3)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.3)
- Microsoft.Extensions.Options (>= 9.0.3)
- Scrutor (>= 6.0.1)
NuGet packages (4)
Showing the top 4 NuGet packages that depend on Cortex.Mediator:
| Package | Downloads |
|---|---|
|
StellarTech.Common
A agrouped common entities for Stellar |
|
|
Cortex.Mediator.Behaviors.FluentValidation
Buildersoft Cortex Mediator is a library for .NET applications that implements the mediator pattern. It helps to reduce dependencies between objects by allowing in-process messaging without direct communication. Instead, objects communicate through Cortex Mediator, making them less coupled and more maintainable.. |
|
|
Cortex.Mediator.Behaviors.Transactional
Cortex.Mediator.Behaviors.Transactional provides transactional pipeline behaviors for Cortex.Mediator. Wrap command execution in TransactionScope for automatic commit on success and rollback on failure. Supports custom transaction contexts for Entity Framework, Dapper, and other ORM integrations. |
|
|
Cortex.Streams.Mediator
Integration library that bridges Cortex.Streams with Cortex.Mediator, enabling seamless CQRS pattern integration with stream processing pipelines. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 3.1.2 | 315 | 2/10/2026 |
| 3.1.1 | 1,075 | 1/30/2026 |
| 3.1.0 | 163 | 1/30/2026 |
| 2.2.0 | 1,278 | 1/24/2026 |
| 2.1.0 | 28,639 | 10/10/2025 |
| 1.7.0 | 24,957 | 7/24/2025 |
| 1.6.1 | 11,268 | 4/8/2025 |
| 1.6.0 | 235 | 4/5/2025 |
| 1.5.0 | 227 | 2/8/2025 |
| 1.4.0 | 252 | 1/18/2025 |
| 1.3.1 | 208 | 12/24/2024 |
| 1.3.0 | 203 | 12/24/2024 |
| 1.2.1 | 202 | 12/10/2024 |
| 1.2.0 | 219 | 12/9/2024 |
| 1.1.0 | 248 | 12/3/2024 |
| 1.0.0 | 474 | 6/27/2024 |
Just as the Cortex in our brains handles complex processing efficiently, Cortex Data Framework brings brainpower to your data management!