Indiko.Blocks.EventBus.RabbitMQ 2.7.6


Indiko.Blocks.EventBus.RabbitMQ

RabbitMQ-based distributed event bus implementation for microservices and scalable event-driven architectures.

Overview

This package provides a production-ready RabbitMQ implementation of the event bus abstractions, enabling reliable message delivery across distributed systems with features like persistence, acknowledgments, and automatic reconnection.

Features

  • RabbitMQ Integration: Full RabbitMQ messaging support via EasyNetQ
  • Distributed Events: Publish-subscribe across multiple services
  • Durable Messages: Persistent message storage
  • Automatic Reconnection: Handles connection failures gracefully
  • Auto-Registration Mode: Automatic queue creation and routing based on event type names
  • Pre-Configured Queue Mode: Connect to pre-provisioned named queues (quorum, DLQ, TTL, overflow)
  • Named Queue Support: Map handlers to specific queue names via attribute or appsettings
  • Message Acknowledgment: Reliable message delivery
  • Dead Letter Queues: Failed message handling with .dlq suffix convention
  • Multiple Consumers: Scale horizontally with competing consumers
  • Auto-Discovery: Automatic handler registration from DI container

Installation

dotnet add package Indiko.Blocks.EventBus.RabbitMQ

Prerequisites

  • RabbitMQ server (3.8+)
  • Management plugin enabled (optional, for monitoring)

Quick Start

Install RabbitMQ

Docker
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

Configure Services

using Indiko.Blocks.EventBus.RabbitMQ;

public class Startup : WebStartup
{
    public override void ConfigureServices(IServiceCollection services)
    {
        base.ConfigureServices(services);
        
        // Configure RabbitMQ event bus
        services.AddRabbitMQEventBus(options =>
        {
            options.Host = Configuration["RabbitMQ:Host"];
            options.Port = Configuration.GetValue<int>("RabbitMQ:Port");
            options.Username = Configuration["RabbitMQ:Username"];
            options.Password = Configuration["RabbitMQ:Password"];
            options.VirtualHost = Configuration["RabbitMQ:VirtualHost"];
            options.ReConnectOnConnectionLost = true;
        });
        
        // Register event handlers
        services.AddScoped<IEventHandler<UserCreatedEvent>, SendWelcomeEmailHandler>();
        services.AddScoped<IEventHandler<OrderPlacedEvent>, ProcessOrderHandler>();
    }
}

Configuration (appsettings.json)

{
  "RabbitMQ": {
    "Host": "localhost",
    "Port": 5672,
    "Username": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "PrefetchCount": 10,
    "Timeout": 30,
    "ReConnectOnConnectionLost": true,
    "Product": "MyApplication",
    "Platform": ".NET 10"
  }
}

Define Events

using Indiko.Blocks.EventBus.Abstractions.Interfaces;

public class UserCreatedEvent : IEvent
{
    public Guid UserId { get; set; }
    public string Email { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public DateTime CreatedAt { get; set; }
}

public class OrderPlacedEvent : IEvent
{
    public Guid OrderId { get; set; }
    public Guid UserId { get; set; }
    public List<OrderItem> Items { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime OrderDate { get; set; }
}

Implement Handlers

Service A - User Service

public class UserService
{
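    private readonly IEventBus _eventBus;
    private readonly IUserRepository _userRepository;

    // Dependencies are supplied by the DI container
    public UserService(IEventBus eventBus, IUserRepository userRepository)
    {
        _eventBus = eventBus;
        _userRepository = userRepository;
    }
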
    public async Task<User> CreateUserAsync(CreateUserDto dto)
    {
        var user = new User
        {
            Id = Guid.NewGuid(),
            Email = dto.Email,
            FirstName = dto.FirstName,
            LastName = dto.LastName,
            CreatedAt = DateTime.UtcNow
        };
        
        await _userRepository.AddAsync(user);
        
        // Publish event to RabbitMQ
        await _eventBus.PublishAsync(new UserCreatedEvent
        {
            UserId = user.Id,
            Email = user.Email,
            FirstName = user.FirstName,
            LastName = user.LastName,
            CreatedAt = user.CreatedAt
        });
        
        return user;
    }
}

Service B - Email Service

public class SendWelcomeEmailHandler : IEventHandler<UserCreatedEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<SendWelcomeEmailHandler> _logger;

    public SendWelcomeEmailHandler(IEmailService emailService, ILogger<SendWelcomeEmailHandler> logger)
    {
        _emailService = emailService;
        _logger = logger;
    }

    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation("Sending welcome email to {Email}", @event.Email);
        
        try
        {
            await _emailService.SendWelcomeEmailAsync(
                @event.Email,
                @event.FirstName,
                cancellationToken
            );
            
            _logger.LogInformation("Welcome email sent to {Email}", @event.Email);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send welcome email to {Email}", @event.Email);
            throw; // Rethrow so the bus can retry the message or move it to the dead-letter queue
        }
    }
}

Service C - Analytics Service

public class TrackUserRegistrationHandler : IEventHandler<UserCreatedEvent>
{
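    private readonly IAnalyticsService _analyticsService;

    public TrackUserRegistrationHandler(IAnalyticsService analyticsService)
    {
        _analyticsService = analyticsService;
    }
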
    public async Task HandleAsync(UserCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        await _analyticsService.TrackEventAsync(new AnalyticsEvent
        {
            EventType = "UserRegistration",
            UserId = @event.UserId,
            Timestamp = @event.CreatedAt,
            Properties = new Dictionary<string, object>
            {
                { "email_domain", @event.Email.Split('@')[1] }
            }
        });
    }
}

How It Works

Automatic Handler Registration

The RabbitMQ event bus automatically discovers and registers all handlers from the DI container:

public void AddAllHandlersFromServiceProvider(IServiceProvider serviceProvider)
{
    // Scans all assemblies for IEventHandler<TEvent> implementations
    var eventHandlerTypes = AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(a => a.GetTypes())
        .Where(t => t.GetInterfaces()
            .Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IEventHandler<>)));

    // Registers each handler with RabbitMQ
    foreach (var handlerType in eventHandlerTypes)
    {
        var handler = serviceProvider.GetService(handlerType);
        if (handler != null)
        {
            RegisterEventHandler(handler);
        }
    }
}

Queue Naming Convention

Each event type gets its own queue:

UserCreatedEvent -> UserCreatedEvent_Queue
OrderPlacedEvent -> OrderPlacedEvent_Queue
PaymentProcessedEvent -> PaymentProcessedEvent_Queue
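
The convention above can be sketched in a few lines. This is only an illustration of the {EventTypeName}_Queue pattern, not the package's code: QueueNaming and its methods are hypothetical names, and UserCreatedEvent is redeclared as an empty stand-in so the snippet compiles on its own.

```csharp
using System;

// Hypothetical helper mirroring the {EventTypeName}_Queue convention.
public static class QueueNaming
{
    public static string ForEvent(Type eventType)
    {
        ArgumentNullException.ThrowIfNull(eventType);
        // Type.Name is the short class name without the namespace.
        return $"{eventType.Name}_Queue";
    }

    public static string ForEvent<TEvent>() => ForEvent(typeof(TEvent));
}

// Empty stand-in for the event class defined earlier in this README.
public sealed class UserCreatedEvent { }
```

Calling QueueNaming.ForEvent<UserCreatedEvent>() yields "UserCreatedEvent_Queue". Because only the short class name is used, two event classes with the same name in different namespaces would map to the same queue under this sketch.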

Message Flow

Publisher (Service A)
    |
    v
[RabbitMQ Exchange]
    |
    v
[UserCreatedEvent_Queue]
    |
    v
Consumers (Services B, C, D...)

Pre-Configured Queue Mode

When you have pre-provisioned queues in RabbitMQ (e.g., quorum queues managed by infrastructure-as-code), use pre-configured queue mode. The event bus connects to your named queues instead of auto-creating queues from event type names.

Mode is determined automatically: if RabbitMQOptions.Queues contains at least one entry, the bus operates in pre-configured mode and auto-registration is disabled entirely.
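
As a rough sketch of that rule, assuming trimmed-down stand-ins for the option types (BusOptions, QueueOptions, BusMode and ModeSelector are hypothetical names, not the package's API):

```csharp
using System.Collections.Generic;

public enum BusMode { AutoRegistration, PreConfigured }

// Hypothetical, minimal stand-ins for the package's option classes.
public sealed class QueueOptions
{
    public string Name { get; set; } = "";
}

public sealed class BusOptions
{
    public List<QueueOptions> Queues { get; set; } = new();
}

public static class ModeSelector
{
    // At least one configured queue switches the bus to pre-configured mode;
    // a null or empty list keeps auto-registration.
    public static BusMode Resolve(BusOptions options) =>
        options.Queues is { Count: > 0 }
            ? BusMode.PreConfigured
            : BusMode.AutoRegistration;
}
```

The two modes are mutually exclusive in this sketch, matching the description above: there is no per-handler fallback to auto-registration once a queue is configured.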

Operation Modes at a Glance

Aspect | Auto-Registration Mode | Pre-Configured Queue Mode
Queue names | {EventTypeName}_Queue | Your named queues
Queue creation | On first connect | Declare-if-missing on connect
API used | EasyNetQ PubSub | EasyNetQ Advanced API
Exchange | EasyNetQ default | Shared direct exchange (EX_BuildingBlock)
Handler mapping | By event type | [RabbitMQQueue] attribute or appsettings
Enabled when | Queues list is empty | Queues list has entries

Step 1: Annotate Handlers with [RabbitMQQueue]

using Indiko.Blocks.EventBus.RabbitMQ.Attributes;

[RabbitMQQueue("asset-enhancement")]
public class AssetEnhancementHandler : IEventHandler<AssetEnhancementJobRequestedEvent>
{
    public async Task HandleAsync(AssetEnhancementJobRequestedEvent @event, CancellationToken cancellationToken = default)
    {
        // process the event
    }
}

[RabbitMQQueue("content-generation")]
public class ContentGenerationHandler : IEventHandler<ContentGenerationRequestedEvent>
{
    public async Task HandleAsync(ContentGenerationRequestedEvent @event, CancellationToken cancellationToken = default)
    {
        // process the event
    }
}

Step 2: Configure Queues in appsettings.json

{
  "RabbitMQOptions": {
    "Enabled": true,
    "Host": "localhost",
    "Port": 5672,
    "Username": "guest",
    "Password": "guest",
    "Exchange": "EX_BuildingBlock",
    "ReConnectOnConnectionLost": true,
    "Queues": [
      {
        "Name": "asset-enhancement",
        "Type": "quorum",
        "EnableDlq": true,
        "EventTypes": []
      },
      {
        "Name": "content-generation",
        "Type": "quorum",
        "EnableDlq": true,
        "MessageTtlMs": 300000,
        "MaxLength": 10000,
        "OverflowStrategy": "reject-publish",
        "EventTypes": []
      }
    ]
  }
}

Queue Configuration Properties

Property | Type | Default | Description
Name | string | required | RabbitMQ queue name
Type | string | "quorum" | Queue type: "quorum" or "classic"
EnableDlq | bool | false | Declare a dead-letter queue named {Name}.dlq
MessageTtlMs | int? | null | Per-message TTL in milliseconds (x-message-ttl)
MaxLength | int? | null | Max queue length in messages (x-max-length)
OverflowStrategy | string | "drop-head" | Overflow behavior: "drop-head" or "reject-publish"
EventTypes | string[] | [] | Event type short names to override attribute mapping

Overriding Handler Mapping via appsettings

The EventTypes array maps event types to a queue by short class name. Config overrides the [RabbitMQQueue] attribute:

{
  "Queues": [
    {
      "Name": "asset-enhancement",
      "EventTypes": ["AssetEnhancementJobRequestedEvent", "AssetRegenerationRequestedEvent"]
    }
  ]
}

Both AssetEnhancementJobRequestedEvent and AssetRegenerationRequestedEvent will be routed to the asset-enhancement queue regardless of any attribute on their handlers.
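
The precedence rule can be illustrated with a small resolver. This is a sketch under assumptions: QueueAttribute is a local stand-in for [RabbitMQQueue] (whose real shape is not shown here), and QueueMapping.Resolve and the "legacy-assets" queue name are hypothetical.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Reflection;

// Local stand-in for the package's [RabbitMQQueue] attribute.
[AttributeUsage(AttributeTargets.Class)]
public sealed class QueueAttribute : Attribute
{
    public QueueAttribute(string name) => Name = name;
    public string Name { get; }
}

public static class QueueMapping
{
    // configured: queue name -> event type short names (the "EventTypes" arrays).
    public static string? Resolve(
        Type eventType,
        Type handlerType,
        IReadOnlyDictionary<string, string[]> configured)
    {
        // 1. appsettings wins: search every queue's EventTypes for the short name.
        foreach (var (queueName, eventTypes) in configured)
        {
            if (Array.IndexOf(eventTypes, eventType.Name) >= 0)
                return queueName;
        }

        // 2. Otherwise fall back to the attribute on the handler class, if any.
        return handlerType.GetCustomAttribute<QueueAttribute>()?.Name;
    }
}

// Stand-in types used only to exercise the resolver.
public sealed class AssetEnhancementJobRequestedEvent { }

[Queue("legacy-assets")]
public sealed class AssetEnhancementHandler { }
```

With "asset-enhancement" configured for AssetEnhancementJobRequestedEvent, the resolver returns "asset-enhancement" even though the handler is annotated with "legacy-assets"; with an empty configuration it returns the attribute value.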

Topology Declared on Connect

When the application starts, RabbitMQEventBus.ConnectAsync declares the full topology before subscribing handlers:

  1. Declares a shared direct exchange (EX_BuildingBlock by default, configurable via Exchange)
  2. For each configured queue:
    • If EnableDlq is true, declares {Name}.dlq as a classic durable queue
    • Declares the main queue, with dead-letter routing to the DLQ via the default exchange when EnableDlq is true
  3. Binds each queue to the exchange with routing key = queue name
  4. Subscribes the matching handler via the Advanced API

If a queue already exists in RabbitMQ with different properties, the bus logs a warning and connects to the existing queue without modifying it.
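
To make the declaration step more concrete, here is a sketch of how one queue entry could translate into RabbitMQ queue arguments. The x-* keys are standard RabbitMQ arguments; QueueSettings and QueueArguments are hypothetical names, and the package may build its arguments differently.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical mirror of one entry in the "Queues" array.
public sealed record QueueSettings(
    string Name,
    string Type = "quorum",
    bool EnableDlq = false,
    int? MessageTtlMs = null,
    int? MaxLength = null,
    string OverflowStrategy = "drop-head");

public static class QueueArguments
{
    public static string DlqName(string queueName) => $"{queueName}.dlq";

    public static Dictionary<string, object> Build(QueueSettings q)
    {
        var args = new Dictionary<string, object> { ["x-queue-type"] = q.Type };

        if (q.EnableDlq)
        {
            // An empty exchange name means the default exchange,
            // which routes by queue name.
            args["x-dead-letter-exchange"] = "";
            args["x-dead-letter-routing-key"] = DlqName(q.Name);
        }

        if (q.MessageTtlMs is int ttl)
            args["x-message-ttl"] = ttl;

        if (q.MaxLength is int max)
        {
            args["x-max-length"] = max;
            // Overflow behavior only matters once a length limit exists.
            args["x-overflow"] = q.OverflowStrategy;
        }

        return args;
    }
}
```

For the content-generation entry shown earlier, this produces x-queue-type = quorum, dead-letter routing to content-generation.dlq, x-message-ttl = 300000, x-max-length = 10000 and x-overflow = reject-publish.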

Message Flow (Pre-Configured Mode)

Publisher
    |
    v
[EX_BuildingBlock] (direct exchange)
    |                           |
    v                           v
[asset-enhancement]    [content-generation]
    |                           |
    v                           v
AssetEnhancementHandler  ContentGenerationHandler
    |
    v (on failure)
[asset-enhancement.dlq]

Publishing in Pre-Configured Mode

Publishing works identically from the caller's perspective. The bus routes to the correct named queue automatically:

await _eventBus.PublishAsync(new AssetEnhancementJobRequestedEvent
{
    JobId = Guid.NewGuid(),
    AssetId = assetId
});

If no queue mapping exists for the event type, PublishAsync throws InvalidOperationException with a descriptive message.
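
The lookup behind that behavior might resemble the following sketch. QueueRouter is a hypothetical name and the exception text is illustrative; only the use of InvalidOperationException comes from the description above.

```csharp
using System;
using System.Collections.Generic;

public sealed class QueueRouter
{
    private readonly IReadOnlyDictionary<Type, string> _queueByEventType;

    public QueueRouter(IReadOnlyDictionary<Type, string> queueByEventType) =>
        _queueByEventType = queueByEventType;

    // Returns the routing key (equal to the queue name) for an event type,
    // or throws when the type has no mapping.
    public string RoutingKeyFor(Type eventType)
    {
        if (_queueByEventType.TryGetValue(eventType, out var queueName))
            return queueName;

        throw new InvalidOperationException(
            $"No queue is mapped for event type '{eventType.Name}'. " +
            "Add it to a queue's EventTypes or annotate its handler.");
    }
}
```

Failing fast at publish time surfaces a missing mapping in the publishing service, rather than letting the exchange silently discard an unroutable message.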

DLQ Naming Convention

Dead-letter queues are named {queue-name}.dlq:

Queue | DLQ
asset-enhancement | asset-enhancement.dlq
content-generation | content-generation.dlq

The DLQ is a classic durable queue. Dead-lettered messages are routed via the default (nameless) exchange using the DLQ name as routing key.

Quorum Queue Notes

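  • RabbitMQ quorum queues accept drop-head and reject-publish for x-overflow, but not reject-publish-dlx. Prefer "OverflowStrategy": "reject-publish" when publishers should be told that the queue is full instead of having the oldest messages dropped.
  • DLQs are declared as classic queues, because using a quorum queue as the dead-letter target of another quorum queue is not supported equally across RabbitMQ versions.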
  • TTL (MessageTtlMs) and MaxLength are supported on quorum queues.

Connection Management

Auto-Reconnection

private void Advanced_Disconnected(object sender, DisconnectedEventArgs e)
{
    _logger.LogWarning($"Disconnected from RabbitMQ: {e.Reason}");

    if (_options.ReConnectOnConnectionLost)
    {
        _logger.LogInformation("Reconnecting to RabbitMQ...");
        UnRegisterAllEventHandlers();
        AddAllHandlersFromServiceProvider(_serviceProvider);
        _logger.LogInformation("Reconnected to RabbitMQ");
    }
}

Connection Events

_bus.Advanced.Connected += Advanced_Connected;
_bus.Advanced.Disconnected += Advanced_Disconnected;
_bus.Advanced.MessageReturned += Advanced_MessageReturned;

Advanced Configurations

Multiple RabbitMQ Instances

{
  "RabbitMQ": {
    "Host": "rabbitmq-cluster.example.com",
    "Port": 5672,
    "Username": "app-user",
    "Password": "secure-password",
    "VirtualHost": "/production",
    "Timeout": 60,
    "PrefetchCount": 20
  }
}

Connection String Builder

var connectionString = RabbitConnectionStringBuilder
    .Init(options)
    .Build();

// Produces: host=localhost:5672;virtualHost=/;username=guest;password=guest
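
For illustration, a string in the format shown above could be assembled like this. ConnectionSettings and ConnectionStringSketch are hypothetical stand-ins, not the package's RabbitConnectionStringBuilder, which may emit additional keys.

```csharp
// Hypothetical settings record; defaults match the sample appsettings.json.
public sealed record ConnectionSettings(
    string Host = "localhost",
    int Port = 5672,
    string VirtualHost = "/",
    string Username = "guest",
    string Password = "guest");

public static class ConnectionStringSketch
{
    // Joins key=value pairs with ';' in the same order as the documented output.
    public static string Build(ConnectionSettings s) =>
        string.Join(';', new[]
        {
            $"host={s.Host}:{s.Port}",
            $"virtualHost={s.VirtualHost}",
            $"username={s.Username}",
            $"password={s.Password}",
        });
}
```

The result contains the password in clear text, so it should not be written to logs.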

Manual Handler Registration

public void Configure(IApplicationBuilder app, IEventBus eventBus, IServiceProvider services)
{
    // Manual registration if needed
    var handler = services.GetRequiredService<IEventHandler<UserCreatedEvent>>();
    eventBus.RegisterEventHandler(handler);
}

Error Handling and Retries

Automatic Retries

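A handler signals the outcome through exceptions: rethrowing leaves the message unacknowledged so it can be retried or dead-lettered according to the queue configuration, while returning normally acknowledges it:

public class ResilientEventHandler : IEventHandler<OrderPlacedEvent>
{
    private readonly ILogger<ResilientEventHandler> _logger;

    public ResilientEventHandler(ILogger<ResilientEventHandler> logger) => _logger = logger;

    // TransientException, PermanentException and ProcessOrderAsync are
    // application-defined placeholders.
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken cancellationToken = default)
    {
        try
        {
            await ProcessOrderAsync(@event);
        }
        catch (TransientException ex)
        {
            // Rethrow to trigger a retry
            _logger.LogWarning(ex, "Transient error, will retry: {Message}", ex.Message);
            throw;
        }
        catch (PermanentException ex)
        {
            // Log and return (don't throw) to acknowledge the message
            _logger.LogError(ex, "Permanent error, message will be dropped: {Message}", ex.Message);
            // Message is acknowledged and won't be retried
        }
    }
}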

Dead Letter Queue

Configure dead letter queues for failed messages:

services.AddRabbitMQEventBus(options =>
{
    options.Host = "localhost";
    options.DeadLetterExchange = "dlx";
    options.DeadLetterQueue = "failed_messages";
});

Scaling

Horizontal Scaling

Run multiple instances of your service - RabbitMQ distributes messages:

Service Instance 1 --+
Service Instance 2 --+-- [UserCreatedEvent_Queue] -- RabbitMQ
Service Instance 3 --+

Each message is delivered to only ONE instance (competing consumers).
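
The competing-consumers behavior can be observed without a broker. The sketch below is an in-process analogy built on System.Threading.Channels, not RabbitMQ or this package: one channel plays the role of the queue and several readers play the service instances. CompetingConsumersDemo is a hypothetical name.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompetingConsumersDemo
{
    // Returns, per consumer index, the message ids that consumer handled.
    public static async Task<IReadOnlyDictionary<int, List<int>>> RunAsync(
        int messageCount, int consumerCount)
    {
        var queue = Channel.CreateUnbounded<int>();
        var handled = new ConcurrentDictionary<int, List<int>>();

        // Start all consumers; each reads from the same channel.
        var consumers = Enumerable.Range(0, consumerCount).Select(async consumerId =>
        {
            var mine = handled.GetOrAdd(consumerId, _ => new List<int>());
            await foreach (var messageId in queue.Reader.ReadAllAsync())
            {
                mine.Add(messageId);  // only this consumer touches its own list
                await Task.Yield();   // give the other consumers a turn
            }
        }).ToArray();

        for (var id = 0; id < messageCount; id++)
            await queue.Writer.WriteAsync(id);
        queue.Writer.Complete();      // lets the readers finish

        await Task.WhenAll(consumers);
        return handled;
    }
}
```

Running RunAsync(100, 3) gives every message id to exactly one of the three consumers. How the ids are split between consumers varies from run to run, which is also true of real competing consumers.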

Message Prefetch

Control how many unacknowledged messages the broker delivers to each consumer at a time:

{
  "RabbitMQ": {
    "PrefetchCount": 10
  }
}

Monitoring

RabbitMQ Management UI

Access at: http://localhost:15672

  • Default credentials: guest/guest (by default RabbitMQ only accepts the guest user from localhost)
  • View queues, exchanges, message rates
  • Monitor consumer connections

Application Logging

_logger.LogInformation("Connected to RabbitMQ at {host}:{port}", options.Host, options.Port);
_logger.LogDebug("Registered event handler {handler} for {event}", handlerType, eventType);
_logger.LogWarning("Disconnected from RabbitMQ: {reason}", disconnectReason);

Best Practices

  1. Idempotent Handlers: Design handlers to be idempotent (handle duplicates)
  2. Event Versioning: Plan for event schema evolution
  3. Correlation IDs: Include correlation IDs for tracing
  4. Small Events: Keep event payloads small
  5. Error Handling: Distinguish transient vs permanent errors
  6. Connection Pooling: Reuse connections
  7. Monitoring: Set up alerts for queue depths
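
Idempotency (item 1) is the practice that most often needs code. A minimal in-memory sketch follows, assuming each message carries a unique Guid; IdempotencyGuard is a hypothetical helper, and a real deployment would need a durable store shared by all instances rather than a per-process dictionary.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class IdempotencyGuard
{
    private readonly ConcurrentDictionary<Guid, byte> _seen = new();

    // Runs the handler only the first time a given message id is seen.
    // Returns false when the delivery was a duplicate and was skipped.
    public async Task<bool> RunOnceAsync(Guid messageId, Func<Task> handler)
    {
        if (!_seen.TryAdd(messageId, 0))
            return false;

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Forget the id so a redelivery after a failure is processed.
            _seen.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

Inside a handler this could wrap the real work, for example guard.RunOnceAsync(@event.OrderId, () => ProcessOrderAsync(@event)), where using OrderId as the deduplication key is itself an assumption about the event design.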

Deployment

Docker Compose

version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq

  user-service:
    build: ./UserService
    environment:
      RabbitMQ__Host: rabbitmq
      RabbitMQ__Username: admin
      RabbitMQ__Password: password
    depends_on:
      - rabbitmq

  email-service:
    build: ./EmailService
    environment:
      RabbitMQ__Host: rabbitmq
      RabbitMQ__Username: admin
      RabbitMQ__Password: password
    depends_on:
      - rabbitmq

volumes:
  rabbitmq-data:

Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
      - name: user-service
        image: user-service:latest
        env:
        - name: RabbitMQ__Host
          value: "rabbitmq-service"
        - name: RabbitMQ__Username
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: username
        - name: RabbitMQ__Password
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: password

Migration from InMemory

Simply change the registration:

// From
services.AddInMemoryEventBus();

// To
services.AddRabbitMQEventBus(Configuration);

All events and handlers work without modification!

Target Framework

  • .NET 10

Dependencies

  • Indiko.Blocks.EventBus.Abstractions
  • EasyNetQ (8.0+)
  • RabbitMQ.Client (6.0+)

License

See LICENSE file in the repository root.

Related Packages

  • Indiko.Blocks.EventBus.Abstractions - Core event bus abstractions
  • Indiko.Blocks.EventBus.InMemory - In-memory event bus for development
  • Indiko.Blocks.Mediation.Abstractions - CQRS and mediator pattern


Version Downloads Last Updated
2.7.6 116 4/23/2026
2.7.5 93 4/23/2026
2.7.4 100 4/23/2026
2.7.3 95 4/23/2026
2.7.2 100 4/23/2026
2.7.1 83 4/23/2026
2.7.0 94 4/23/2026
2.6.4 133 4/21/2026
2.6.3 90 4/21/2026
2.6.2 113 4/21/2026
2.6.1 88 4/18/2026
2.6.0 89 4/17/2026
2.5.1 93 4/14/2026
2.5.0 111 3/30/2026
2.2.18 102 3/8/2026
2.2.17 82 3/8/2026
2.2.16 91 3/8/2026
2.2.15 88 3/7/2026
2.2.13 84 3/7/2026
2.2.12 93 3/7/2026