Cortex.Mediator 3.1.2

dotnet add package Cortex.Mediator --version 3.1.2
                    
NuGet\Install-Package Cortex.Mediator -Version 3.1.2
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Cortex.Mediator" Version="3.1.2" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Cortex.Mediator" Version="3.1.2" />
                    
Directory.Packages.props
<PackageReference Include="Cortex.Mediator" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Cortex.Mediator --version 3.1.2
                    
#r "nuget: Cortex.Mediator, 3.1.2"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Cortex.Mediator@3.1.2
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Cortex.Mediator&version=3.1.2
                    
Install as a Cake Addin
#tool nuget:?package=Cortex.Mediator&version=3.1.2
                    
Install as a Cake Tool

Cortex.Mediator 🧠

Cortex.Mediator is a lightweight and extensible implementation of the Mediator pattern for .NET applications, designed to power clean, modular architectures like Vertical Slice Architecture and CQRS.

Built as part of the Cortex Data Framework, this library simplifies command and query handling with built-in support for:

  • ✅ Commands & Queries
  • ✅ Notifications (Events)
  • ✅ Pipeline Behaviors
  • ✅ FluentValidation
  • ✅ Logging

GitHub License NuGet Version GitHub contributors Discord Shield

🚀 Getting Started

Install via NuGet

dotnet add package Cortex.Mediator

🛠️ Setup

In Program.cs or Startup.cs:

builder.Services.AddCortexMediator(
    new[] { typeof(Program) }, // Assemblies to scan for handlers
    options => options.AddDefaultBehaviors() // Logging
);

📦 Folder Structure Example (Vertical Slice)

Features/
  CreateUser/
    CreateUserCommand.cs
    CreateUserCommandHandler.cs
    CreateUserValidator.cs
    CreateUserEndpoint.cs

✏️ Defining a Command

public class CreateUserCommand : ICommand<Guid>
{
    public string UserName { get; set; }
    public string Email { get; set; }
}

Handler

public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand,Guid>
{
    public async Task<Guid> Handle(CreateUserCommand command, CancellationToken cancellationToken)
    {
        // Logic here
    }
}

Sending Commands

Simplified API (Recommended) - Type is automatically inferred:

// Using extension methods - no need to specify type parameters!
var userId = await mediator.SendAsync(command);

// For void commands (no return value)
await mediator.SendAsync(new DeleteUserCommand { UserId = userId });

Explicit Type Parameters (Legacy):

var userId = await mediator.SendCommandAsync<CreateUserCommand, Guid>(command);

Validator (Optional, via FluentValidation)

public class CreateUserValidator : AbstractValidator<CreateUserCommand>
{
    public CreateUserValidator()
    {
        RuleFor(x => x.UserName).NotEmpty();
        RuleFor(x => x.Email).NotEmpty().EmailAddress();
    }
}

🔍 Defining a Query

public class GetUserQuery : IQuery<GetUserResponse>
{
    public int UserId { get; set; }
}
public class GetUserQueryHandler : IQueryHandler<GetUserQuery, GetUserResponse>
{
    public async Task<GetUserResponse> Handle(GetUserQuery query, CancellationToken cancellationToken)
    {
        return new GetUserResponse { UserId = query.UserId, UserName = "Andy" };
    }
}

Sending Queries

Simplified API (Recommended) - Type is automatically inferred:

// Using extension methods - no need to specify type parameters!
var user = await mediator.QueryAsync(new GetUserQuery { UserId = 1 });

Explicit Type Parameters (Legacy):

var user = await mediator.SendQueryAsync<GetUserQuery, GetUserResponse>(query);

📢 Notifications (Events)

public class UserCreatedNotification : INotification
{
    public string UserName { get; set; }
}

public class SendWelcomeEmailHandler : INotificationHandler<UserCreatedNotification>
{
    public async Task Handle(UserCreatedNotification notification, CancellationToken cancellationToken)
    {
        // Send email...
    }
}
await mediator.PublishAsync(new UserCreatedNotification { UserName = "Andy" });

🔧 Pipeline Behaviors (Built-in)

Out of the box, Cortex.Mediator supports:

  • LoggingCommandBehavior - Logs command execution with timing
  • LoggingQueryBehavior - Logs query execution with timing
  • LoggingNotificationBehavior - Logs notification publishing with timing
  • ExceptionHandlingCommandBehavior - Centralized exception handling for commands
  • ExceptionHandlingQueryBehavior - Centralized exception handling for queries
  • ExceptionHandlingNotificationBehavior - Centralized exception handling for notifications
  • ValidationCommandBehavior - FluentValidation support (via Cortex.Mediator.Behaviors.FluentValidation)

Registering Behaviors

// Add default logging behaviors
options.AddDefaultBehaviors();

// Add exception handling behaviors
options.AddExceptionHandlingBehaviors();

// Add both logging and exception handling
options.AddDefaultBehaviorsWithExceptionHandling();

// Custom behaviors
options.AddOpenCommandPipelineBehavior(typeof(MyCustomBehavior<,>));
options.AddOpenQueryPipelineBehavior(typeof(MyCustomQueryBehavior<,>));
options.AddOpenNotificationPipelineBehavior(typeof(MyCustomNotificationBehavior<>));

⚠️ Exception Handling Behavior

The exception handling behaviors provide centralized exception handling with optional fallback results.

Basic Setup

builder.Services.AddCortexMediator(
    new[] { typeof(Program) },
    options => options.AddExceptionHandlingBehaviors()
);

Custom Exception Handler

Implement IExceptionHandler to customize exception handling:

public class MyExceptionHandler : IExceptionHandler
{
    private readonly ILogger<MyExceptionHandler> _logger;

    public MyExceptionHandler(ILogger<MyExceptionHandler> logger)
    {
        _logger = logger;
    }

    public Task<bool> HandleAsync(
        Exception exception,
        Type requestType,
        object request,
        CancellationToken cancellationToken)
    {
        _logger.LogError(exception, "Error processing {RequestType}", requestType.Name);
        
        // Return true to suppress the exception, false to rethrow
        return Task.FromResult(false);
    }
}

// Register in DI
services.AddSingleton<IExceptionHandler, MyExceptionHandler>();

Exception Handler with Fallback Result

For commands and queries that return a value, implement IExceptionHandler<TResult>:

public class FallbackExceptionHandler : IExceptionHandler<ApiResponse>
{
    public Task<(bool handled, ApiResponse? result)> HandleWithResultAsync(
        Exception exception,
        Type requestType,
        object request,
        CancellationToken cancellationToken)
    {
        var fallback = new ApiResponse 
        { 
            Success = false, 
            Error = exception.Message 
        };
        
        return Task.FromResult((true, fallback));
    }

    public Task<bool> HandleAsync(Exception exception, Type requestType, object request, CancellationToken cancellationToken)
        => Task.FromResult(false);
}

Notification Exception Suppression

For notifications, you can suppress exceptions to allow other handlers to continue:

// The ExceptionHandlingNotificationBehavior has a suppressExceptions parameter
// When true, exceptions are logged but not rethrown

💾 Caching Behavior for Queries

The caching behavior provides automatic caching of query results to improve performance.

Basic Setup

// Add caching services
builder.Services.AddMediatorCaching(options =>
{
    options.DefaultAbsoluteExpiration = TimeSpan.FromMinutes(5);
    options.DefaultSlidingExpiration = TimeSpan.FromMinutes(1);
    options.CacheKeyPrefix = "MyApp";
});

// Add mediator with caching behavior
builder.Services.AddCortexMediator(
    new[] { typeof(Program) },
    options => options.AddCachingBehavior()
);

Using the Cacheable Attribute

Mark your query classes with the [Cacheable] attribute:

[Cacheable(AbsoluteExpirationSeconds = 300, SlidingExpirationSeconds = 60)]
public class GetUserQuery : IQuery<UserDto>
{
    public int UserId { get; set; }
}

Using the ICacheableQuery Interface

For more control, implement ICacheableQuery:

public class GetProductQuery : IQuery<ProductDto>, ICacheableQuery
{
    public int ProductId { get; set; }
    
    // Custom cache key
    public string? CacheKey => $"product-{ProductId}";
    
    // Custom expiration times
    public TimeSpan? AbsoluteExpiration => TimeSpan.FromMinutes(10);
    public TimeSpan? SlidingExpiration => TimeSpan.FromMinutes(2);
}

Cache Invalidation

Use ICacheInvalidator to manually invalidate cached results:

public class UpdateUserCommandHandler : ICommandHandler<UpdateUserCommand>
{
    private readonly ICacheInvalidator _cacheInvalidator;

    public UpdateUserCommandHandler(ICacheInvalidator cacheInvalidator)
    {
        _cacheInvalidator = cacheInvalidator;
    }

    public async Task Handle(UpdateUserCommand command, CancellationToken cancellationToken)
    {
        // Update user in database...
        
        // Invalidate the cached query result
        _cacheInvalidator.Invalidate<GetUserQuery, UserDto>(
            new GetUserQuery { UserId = command.UserId });
    }
}

Custom Cache Key Generator

Implement ICacheKeyGenerator for custom key generation:

public class MyCacheKeyGenerator : ICacheKeyGenerator
{
    public string GenerateKey<TQuery, TResult>(TQuery query) 
        where TQuery : IQuery<TResult>
    {
        // Custom key generation logic
        return $"MyApp:{typeof(TQuery).Name}:{query.GetHashCode()}";
    }
}

// Register custom generator
services.AddMediatorCaching<MyCacheKeyGenerator>();

🌊 Streaming Requests (IAsyncEnumerable)

Cortex.Mediator supports streaming queries that return IAsyncEnumerable<T>, perfect for handling large datasets efficiently without loading everything into memory.

Defining a Streaming Query

// Define the streaming query
public class GetAllUsersQuery : IStreamQuery<UserDto>
{
    public int PageSize { get; set; } = 100;
}

// Implement the streaming handler
public class GetAllUsersQueryHandler : IStreamQueryHandler<GetAllUsersQuery, UserDto>
{
    private readonly IDbConnection _db;

    public GetAllUsersQueryHandler(IDbConnection db)
    {
        _db = db;
    }

    public async IAsyncEnumerable<UserDto> Handle(
        GetAllUsersQuery query,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // Stream results from database one at a time
        await foreach (var user in _db.StreamUsersAsync(query.PageSize, cancellationToken))
        {
            yield return new UserDto
            {
                Id = user.Id,
                Name = user.Name,
                Email = user.Email
            };
        }
    }
}

Consuming Streaming Queries

// Using the StreamAsync extension method (recommended)
await foreach (var user in mediator.StreamAsync(new GetAllUsersQuery()))
{
    Console.WriteLine($"Processing: {user.Name}");
    // Process each user as it arrives - no need to wait for all results
}

// Or with explicit type parameters
await foreach (var user in mediator.CreateStream<GetAllUsersQuery, UserDto>(query))
{
    Console.WriteLine(user.Name);
}

Streaming with Cancellation

var cts = new CancellationTokenSource();

await foreach (var item in mediator.StreamAsync(query, cts.Token))
{
    if (ShouldStop(item))
    {
        cts.Cancel(); // Gracefully stop streaming
        break;
    }
    Process(item);
}

Streaming Pipeline Behaviors

Register pipeline behaviors for streaming queries:

// Create a custom streaming behavior
public class MetricsStreamBehavior<TQuery, TResult> : IStreamQueryPipelineBehavior<TQuery, TResult>
    where TQuery : IStreamQuery<TResult>
{
    public async IAsyncEnumerable<TResult> Handle(
        TQuery query,
        StreamQueryHandlerDelegate<TResult> next,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var count = 0;
        await foreach (var item in next().WithCancellation(cancellationToken))
        {
            count++;
            yield return item;
        }
        Console.WriteLine($"Streamed {count} items");
    }
}

// Register the behavior
services.AddCortexMediator(
    new[] { typeof(Program) },
    options => options.AddOpenStreamQueryPipelineBehavior(typeof(MetricsStreamBehavior<,>))
);

Built-in Logging Behavior for Streams

// Use the built-in logging behavior
options.AddOpenStreamQueryPipelineBehavior(typeof(LoggingStreamQueryBehavior<,>));

🔄 Request Pre/Post Processors

Pre-processors run before the handler executes, and post-processors run after. They're simpler than pipeline behaviors and are ideal for cross-cutting concerns.

Basic Setup

// Register processor behaviors
services.AddCortexMediator(
    new[] { typeof(Program) },
    options => options.AddProcessorBehaviors()
);

Creating a Pre-Processor

Pre-processors run before the handler and can be used for validation, authorization, or data enrichment:

public class LoggingPreProcessor<TRequest> : IRequestPreProcessor<TRequest>
{
    private readonly ILogger<LoggingPreProcessor<TRequest>> _logger;

    public LoggingPreProcessor(ILogger<LoggingPreProcessor<TRequest>> logger)
    {
        _logger = logger;
    }

    public Task ProcessAsync(TRequest request, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Processing {RequestType}", typeof(TRequest).Name);
        return Task.CompletedTask;
    }
}

// Register for a specific request type
services.AddTransient<IRequestPreProcessor<CreateOrderCommand>, OrderValidationPreProcessor>();

// Or register for all requests (generic)
services.AddTransient(typeof(IRequestPreProcessor<>), typeof(LoggingPreProcessor<>));

Creating a Post-Processor

Post-processors run after successful handler execution. Use them for logging, auditing, or triggering side effects:

// Post-processor for commands/queries that return a result
public class AuditPostProcessor<TRequest, TResponse> : IRequestPostProcessor<TRequest, TResponse>
{
    private readonly IAuditService _auditService;

    public AuditPostProcessor(IAuditService auditService)
    {
        _auditService = auditService;
    }

    public async Task ProcessAsync(TRequest request, TResponse response, CancellationToken cancellationToken)
    {
        await _auditService.LogAsync(new AuditEntry
        {
            RequestType = typeof(TRequest).Name,
            ResponseType = typeof(TResponse).Name,
            Timestamp = DateTime.UtcNow
        });
    }
}

// Post-processor for void commands
public class NotificationPostProcessor : IRequestPostProcessor<CreateOrderCommand>
{
    private readonly IMediator _mediator;

    public NotificationPostProcessor(IMediator mediator)
    {
        _mediator = mediator;
    }

    public async Task ProcessAsync(CreateOrderCommand request, CancellationToken cancellationToken)
    {
        // Publish a notification after the command completes
        await _mediator.PublishAsync(new OrderCreatedNotification { /* ... */ }, cancellationToken);
    }
}

Use Cases for Pre-Processors

  • Validation: Validate input before processing
  • Authorization: Check user permissions
  • Data Enrichment: Add data to the request (e.g., current user ID)
  • Rate Limiting: Check and enforce rate limits
  • Logging: Log incoming requests

Use Cases for Post-Processors

  • Audit Logging: Record what happened
  • Notifications: Send notifications after successful operations
  • Cache Invalidation: Clear related cached data
  • Event Publishing: Publish domain events
  • Metrics: Record performance metrics

💬 Contributing

We welcome contributions from the community! Whether it's reporting bugs, suggesting features, or submitting pull requests, your involvement helps improve Cortex for everyone.

💬 How to Contribute

  1. Fork the Repository
  2. Create a Feature Branch
git checkout -b feature/YourFeature
  1. Commit Your Changes
git commit -m "Add your feature"
  1. Push to Your Fork
git push origin feature/YourFeature
  1. Open a Pull Request

Describe your changes and submit the pull request for review.

📄 License

This project is licensed under the MIT License.

📚 Sponsorship

Cortex is an open-source project maintained by BuilderSoft. Your support helps us continue developing and improving Cortex. Consider sponsoring us to contribute to the future of resilient streaming platforms.

How to Sponsor

  • Financial Contributions: Support us through GitHub Sponsors or other preferred platforms.
  • Corporate Sponsorship: If your organization is interested in sponsoring Cortex, please contact us directly.

Contact Us: cortex@buildersoft.io

Contact

We'd love to hear from you! Whether you have questions, feedback, or need support, feel free to reach out.

Thank you for using Cortex Data Framework! We hope it empowers you to build scalable and efficient data processing pipelines effortlessly.

Built with ❤️ by the Buildersoft team.

Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 is compatible.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
.NET Core netcoreapp2.0 was computed.  netcoreapp2.1 was computed.  netcoreapp2.2 was computed.  netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.0 is compatible.  netstandard2.1 is compatible. 
.NET Framework net461 was computed.  net462 was computed.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen40 was computed.  tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (4)

Showing the top 4 NuGet packages that depend on Cortex.Mediator:

Package Downloads
StellarTech.Common

A agrouped common entities for Stellar

Cortex.Mediator.Behaviors.FluentValidation

Buildersoft Cortex Mediator is a library for .NET applications that implements the mediator pattern. It helps to reduce dependencies between objects by allowing in-process messaging without direct communication. Instead, objects communicate through Cortex Mediator, making them less coupled and more maintainable..

Cortex.Mediator.Behaviors.Transactional

Cortex.Mediator.Behaviors.Transactional provides transactional pipeline behaviors for Cortex.Mediator. Wrap command execution in TransactionScope for automatic commit on success and rollback on failure. Supports custom transaction contexts for Entity Framework, Dapper, and other ORM integrations.

Cortex.Streams.Mediator

Integration library that bridges Cortex.Streams with Cortex.Mediator, enabling seamless CQRS pattern integration with stream processing pipelines.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
3.1.2 315 2/10/2026
3.1.1 1,075 1/30/2026
3.1.0 163 1/30/2026
2.2.0 1,278 1/24/2026
2.1.0 28,639 10/10/2025
1.7.0 24,957 7/24/2025
1.6.1 11,268 4/8/2025
1.6.0 235 4/5/2025
1.5.0 227 2/8/2025
1.4.0 252 1/18/2025
1.3.1 208 12/24/2024
1.3.0 203 12/24/2024
1.2.1 202 12/10/2024
1.2.0 219 12/9/2024
1.1.0 248 12/3/2024
1.0.0 474 6/27/2024

Just as the Cortex in our brains handles complex processing efficiently, Cortex Data Framework brings brainpower to your data management!