Kacho.RabbitMQ 10.1.1

dotnet add package Kacho.RabbitMQ --version 10.1.1
                    
NuGet\Install-Package Kacho.RabbitMQ -Version 10.1.1
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Kacho.RabbitMQ" Version="10.1.1" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Kacho.RabbitMQ" Version="10.1.1" />
                    
Directory.Packages.props
<PackageReference Include="Kacho.RabbitMQ" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Kacho.RabbitMQ --version 10.1.1
                    
#r "nuget: Kacho.RabbitMQ, 10.1.1"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Kacho.RabbitMQ@10.1.1
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Kacho.RabbitMQ&version=10.1.1
                    
Install as a Cake Addin
#tool nuget:?package=Kacho.RabbitMQ&version=10.1.1
                    
Install as a Cake Tool

Kacho.RabbitMQ

A lightweight, async-friendly .NET wrapper for the RabbitMQ.Client library, designed to simplify message publishing and consumption with robust connection management, dynamic entity declaration, and flexible configuration.

Overview

Kacho.RabbitMQ provides a high-level abstraction over the RabbitMQ.Client library, streamlining the process of interacting with RabbitMQ for .NET applications. It supports asynchronous operations, connection pooling, dynamic scaling, and configuration-driven setup for exchanges, queues, bindings, publishers, and consumers. The library integrates seamlessly with dependency injection and configuration systems, making it ideal for modern .NET applications requiring reliable message queue interactions.

Features

  • Asynchronous Operations: Fully async/await compatible for publishing, consuming, and connection management.
  • Connection Management: Thread-safe connection handling with automatic retries and shared connection support.
  • Dynamic Entity Declaration: Configurable declaration of exchanges, queues, and bindings via code or configuration files.
  • Channel Pooling: Utilizes Kacho.ASyncObjectPool for efficient channel management with dynamic scaling.
  • Flexible Message Processing: Supports custom message processors with success, failure, retry, and cancellation outcomes, including advanced retry mechanisms via dead-letter exchanges.
  • Configuration-Driven: Integrates with Microsoft.Extensions.Configuration for easy setup via JSON or other configuration sources.
  • Logging and Monitoring: Built-in logging for connection, publishing, and consumption activities.
  • Dependency Injection: Seamless integration with Microsoft.Extensions.DependencyInjection.

Installation

Install the package via NuGet:

dotnet add package Kacho.RabbitMQ

Dependencies

  • .NET 10.0
  • Kacho.ASyncObjectPool (10.1.0)
  • RabbitMQ.Client (7.2.0)
  • Microsoft.Extensions.Configuration.Binder (10.0.2)
  • Microsoft.Extensions.Logging.Abstractions (10.0.2)

Usage

Configuration Setup

Configure RabbitMQ settings using a JSON configuration file (e.g., appsettings.json). The example below includes a retry queue and exchange for handling message retries with TTL and dead-lettering:

{
  "RabbitMQ": {
    "Connections": {
      "default": {
        "Hosts": ["localhost"],
        "Factory": {
          "UserName": "guest",
          "Password": "guest"
        }
      }
    },
    "Entities": {
      "mainExchange": {
        "Type": "exchange",
        "ExchangeName": "main.exchange",
        "ExchangeType": "topic",
        "Durable": true
      },
      "mainQueue": {
        "Type": "queue",
        "QueueName": "main.queue",
        "Durable": true
      },
      "mainBinding": {
        "Type": "binding",
        "ExchangeName": "main.exchange",
        "QueueName": "main.queue",
        "RoutingKey": "main.#"
      },
      "retryExchange": {
        "Type": "exchange",
        "ExchangeName": "retry.exchange",
        "ExchangeType": "direct",
        "Durable": true
      },
      "retryQueue": {
        "Type": "queue",
        "QueueName": "retry.queue",
        "Durable": true,
        "Arguments": {
          "x-message-ttl": 5000,
          "x-dead-letter-exchange": "main.exchange",
          "x-dead-letter-routing-key": "main.event"
        }
      },
      "retryBinding": {
        "Type": "binding",
        "ExchangeName": "retry.exchange",
        "QueueName": "retry.queue",
        "RoutingKey": "retry"
      }
    },
    "Consumers": {
      "mainConsumer": {
        "ConnectionName": "default",
        "QueueName": "main.queue",
        "WorkerCount": 2,
        "ConsumerCount": 2,
        "PrefetchCount": 50,
        "BufferCapacity": 100
      }
    },
    "Publishers": {
      "mainPublisher": {
        "ConnectionName": "default",
        "PoolOptions": {
          "InitialPoolSize": 5,
          "MinPoolSize": 2,
          "MaxPoolSize": 10
        }
      }
    },
    "AutoDeclare": {
      "default": ["mainExchange", "mainQueue", "mainBinding", "retryExchange", "retryQueue", "retryBinding"]
    }
  }
}

Setting Up Dependency Injection

Register the RabbitMQ services in your Program.cs:

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Kacho.RabbitMQ.Extensions;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddRabbitMQ(builder.Configuration.GetSection("RabbitMQ"));

var app = builder.Build();
await app.Services.GetRequiredService<IRabbitMQFactory>().InitializeAsync();
app.Run();

Implementing a Message Processor with Retry Logic

Create a custom message processor that handles retries by publishing to a retry queue with a TTL, tracking retry counts in message headers, and stopping retries when the counter reaches zero:

using Kacho.RabbitMQ.Consumer;
using Kacho.RabbitMQ.Publisher;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

public class MyMessageProcessor : IMessageProcessor
{
    private readonly ILogger<MyMessageProcessor> _logger;
    private readonly IRabbitMQPublisher _publisher;
    private const int MaxRetries = 3;
    private const string RetryExchange = "retry.exchange";
    private const string RetryRoutingKey = "retry";

    public MyMessageProcessor(ILogger<MyMessageProcessor> logger, IRabbitMQFactory factory)
    {
        _logger = logger;
        _publisher = factory.RabbitMQPublisher("mainPublisher");
    }

    public async Task<IMessageProcessingResult> ProcessMessageAsync(BasicDeliverEventArgs message, Activity activity, CancellationToken cancellationToken)
    {
        try
        {
            var body = Encoding.UTF8.GetString(message.Body.ToArray());
            _logger.LogInformation("Processing message: {Body}", body);

            // Simulate processing logic that may fail
            if (body.Contains("fail"))
                throw new Exception("Simulated processing failure");

            await Task.Delay(100, cancellationToken); // Simulate work
            return MessageProcessingResult.Success();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process message");
            return MessageProcessingResult.Retry();
        }
    }

    public async Task AcknowledgeMessageAsync(IChannel channel, BasicDeliverEventArgs message, IMessageProcessingResult result)
    {
        switch (result.Outcome)
        {
            case ProcessingOutcome.Success:
                await channel.BasicAckAsync(message.DeliveryTag, false);
                _logger.LogInformation("Message acknowledged successfully");
                break;
            case ProcessingOutcome.Retry:
                var headers = message.BasicProperties.Headers ?? new Dictionary<string, object>();
                int retryCount = headers.TryGetValue("x-retry-count", out var count) ? Convert.ToInt32(count) : MaxRetries;

                if (retryCount <= 0)
                {
                    _logger.LogWarning("Max retries reached for message, rejecting");
                    await channel.BasicNackAsync(message.DeliveryTag, false, false);
                    return;
                }

                // Decrement retry count and publish to retry queue
                retryCount--;
                headers["x-retry-count"] = retryCount;
                var properties = channel.CreateBasicProperties();
                properties.Headers = headers;
                properties.Persistent = true;

                await _publisher.PublishAsync(Encoding.UTF8.GetString(message.Body.ToArray()), RetryExchange, RetryRoutingKey, properties);
                _logger.LogInformation("Message sent to retry queue with {RetryCount} retries remaining", retryCount);

                // Acknowledge the original message
                await channel.BasicAckAsync(message.DeliveryTag, false);
                break;
            case ProcessingOutcome.Failure:
            case ProcessingOutcome.Cancelled:
                _logger.LogWarning("Message failed or cancelled, rejecting");
                await channel.BasicNackAsync(message.DeliveryTag, false, false);
                break;
        }
    }

    public async Task AcknowledgeMessageAsync(IChannel channel, BasicDeliverEventArgs message, Exception exception)
    {
        _logger.LogError(exception, "Acknowledging message due to error");
        await AcknowledgeMessageAsync(channel, message, MessageProcessingResult.Retry());
    }
}

Publishing Messages

Use IRabbitMQPublisher to publish messages:

using Kacho.RabbitMQ.Factory;

public async Task PublishMessage(IRabbitMQFactory factory)
{
    var publisher = factory.RabbitMQPublisher("mainPublisher");
    var message = new { Id = 1, Content = "Hello, RabbitMQ!" };
    var properties = new BasicProperties { Headers = new Dictionary<string, object> { { "x-retry-count", 3 } } };
    await publisher.PublishAsync(message, "main.exchange", "main.event", properties);
}

Consuming Messages

Start a consumer with the custom message processor:

using Kacho.RabbitMQ.Factory;

public async Task ConsumeMessages(IRabbitMQFactory factory, IMessageProcessor processor)
{
    var consumer = factory.RabbitMQConsumer("mainConsumer", processor);
    await consumer.StartConsuming("main.queue");
    // Stop when needed
    // await consumer.StopAsync();
}

Key Components

Connection Management

  • ConnectionManager: Ensures thread-safe RabbitMQ connections with automatic retries (up to 3 attempts with 1-second delays).
  • RabbitConnection: Configures connection details (hosts, credentials, etc.) via a ConnectionFactory.

Entities

  • ExchangeEntity: Configures and declares RabbitMQ exchanges (e.g., topic, direct) with options like durability and custom arguments.
  • QueueEntity: Declares queues with properties like durability, exclusivity, and dead-lettering options (e.g., TTL and dead-letter exchange).
  • BindingEntity: Binds queues to exchanges with routing keys and optional arguments.

Publishing

  • RabbitMQPublisher: Publishes messages using a pooled IChannel via Kacho.ASyncObjectPool. Supports JSON serialization by default.
  • IMessageSerializer: Customizable message serialization (default: JsonMessageSerializer).

Consuming

  • RabbitMQConsumer: Manages multiple consumers and workers for processing messages from a queue. Uses a bounded channel for buffering and supports configurable prefetch counts.
  • IMessageProcessor: Defines the logic for processing and acknowledging messages with outcomes (Success, Failure, Retry, Cancelled). Supports advanced retry mechanisms via a retry queue with TTL and dead-lettering.

Factory

  • RabbitMQFactory: Central factory for creating publishers and consumers, initializing connections, and declaring entities based on configuration.
  • RabbitMQRootConfig: Holds configuration for connections, entities, consumers, publishers, and auto-declaration settings.

Configuration Options

ConsumerConfig

  • ConnectionName: Name of the RabbitMQ connection to use.
  • SharedConnectionKey: Optional key for sharing connections across consumers/publishers.
  • QueueName: The queue to consume from.
  • BufferCapacity: Capacity of the internal message channel (default: 100).
  • WorkerCount: Number of worker tasks for processing messages (default: 2).
  • ConsumerCount: Number of RabbitMQ consumers (default: 2).
  • PrefetchCount: Number of messages to prefetch per consumer (default: 50).

PublisherConfig

  • ConnectionName: Name of the RabbitMQ connection to use.
  • SharedConnectionKey: Optional key for sharing connections.
  • PoolOptions: Configures the channel pool using AsyncObjectPoolOptions.

Entity Configuration

Entities (ExchangeEntity, QueueEntity, BindingEntity) support properties like durability, auto-delete, and custom arguments (e.g., TTL, dead-lettering).

Notes

  • Ensure RabbitMQ server is running and accessible at the configured hosts.
  • The library uses Kacho.ASyncObjectPool for efficient channel management, which dynamically scales based on demand.
  • Implement IMessageProcessor carefully to handle message processing and acknowledgment logic, especially for retry scenarios.
  • The retry queue example uses a TTL of 5000ms and dead-letters back to the original queue via main.exchange. Adjust TTL and retry counts as needed.
  • Use configuration files to manage complex setups with multiple exchanges, queues, and bindings.
  • Logging is integrated via Microsoft.Extensions.Logging for debugging and monitoring.

License

MIT License

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
10.1.1 115 1/19/2026
10.1.0 115 1/19/2026
10.0.0 120 1/3/2026
1.0.0 239 6/2/2025