Kacho.RabbitMQ
10.1.1
Kacho.RabbitMQ
A lightweight, async-friendly .NET wrapper for the RabbitMQ.Client library, designed to simplify message publishing and consumption with robust connection management, dynamic entity declaration, and flexible configuration.
Overview
Kacho.RabbitMQ is a high-level abstraction over the RabbitMQ.Client library for .NET applications. It supports asynchronous operations, connection pooling, dynamic scaling of pooled channels, and configuration-driven setup of exchanges, queues, bindings, publishers, and consumers. The library integrates with the standard .NET dependency injection and configuration systems, so messaging can be wired up the same way as the rest of an application's services.
Features
- Asynchronous Operations: Fully async/await compatible for publishing, consuming, and connection management.
- Connection Management: Thread-safe connection handling with automatic retries and shared connection support.
- Dynamic Entity Declaration: Configurable declaration of exchanges, queues, and bindings via code or configuration files.
- Channel Pooling: Utilizes Kacho.ASyncObjectPool for efficient channel management with dynamic scaling.
- Flexible Message Processing: Supports custom message processors with success, failure, retry, and cancellation outcomes, including advanced retry mechanisms via dead-letter exchanges.
- Configuration-Driven: Integrates with Microsoft.Extensions.Configuration for easy setup via JSON or other configuration sources.
- Logging and Monitoring: Built-in logging for connection, publishing, and consumption activities.
- Dependency Injection: Seamless integration with Microsoft.Extensions.DependencyInjection.
Installation
Install the package via NuGet:
dotnet add package Kacho.RabbitMQ
Dependencies
- .NET 10.0
- Kacho.ASyncObjectPool (10.1.0)
- RabbitMQ.Client (7.2.0)
- Microsoft.Extensions.Configuration.Binder (10.0.2)
- Microsoft.Extensions.Logging.Abstractions (10.0.2)
Usage
Configuration Setup
Configure RabbitMQ settings using a JSON configuration file (e.g., appsettings.json). The example below includes a retry queue and exchange for handling message retries with TTL and dead-lettering:
{
  "RabbitMQ": {
    "Connections": {
      "default": {
        "Hosts": ["localhost"],
        "Factory": {
          "UserName": "guest",
          "Password": "guest"
        }
      }
    },
    "Entities": {
      "mainExchange": {
        "Type": "exchange",
        "ExchangeName": "main.exchange",
        "ExchangeType": "topic",
        "Durable": true
      },
      "mainQueue": {
        "Type": "queue",
        "QueueName": "main.queue",
        "Durable": true
      },
      "mainBinding": {
        "Type": "binding",
        "ExchangeName": "main.exchange",
        "QueueName": "main.queue",
        "RoutingKey": "main.#"
      },
      "retryExchange": {
        "Type": "exchange",
        "ExchangeName": "retry.exchange",
        "ExchangeType": "direct",
        "Durable": true
      },
      "retryQueue": {
        "Type": "queue",
        "QueueName": "retry.queue",
        "Durable": true,
        "Arguments": {
          "x-message-ttl": 5000,
          "x-dead-letter-exchange": "main.exchange",
          "x-dead-letter-routing-key": "main.event"
        }
      },
      "retryBinding": {
        "Type": "binding",
        "ExchangeName": "retry.exchange",
        "QueueName": "retry.queue",
        "RoutingKey": "retry"
      }
    },
    "Consumers": {
      "mainConsumer": {
        "ConnectionName": "default",
        "QueueName": "main.queue",
        "WorkerCount": 2,
        "ConsumerCount": 2,
        "PrefetchCount": 50,
        "BufferCapacity": 100
      }
    },
    "Publishers": {
      "mainPublisher": {
        "ConnectionName": "default",
        "PoolOptions": {
          "InitialPoolSize": 5,
          "MinPoolSize": 2,
          "MaxPoolSize": 10
        }
      }
    },
    "AutoDeclare": {
      "default": ["mainExchange", "mainQueue", "mainBinding", "retryExchange", "retryQueue", "retryBinding"]
    }
  }
}
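To make the retry entities above more concrete, here is a minimal sketch of the same topology declared directly through RabbitMQ.Client 7.x. It is not how Kacho.RabbitMQ declares entities internally, which this README does not describe. The names RetryTopology, RetryQueueArguments, and DeclareAsync are hypothetical; only the exchange names, queue names, routing keys, and argument values are taken from the JSON.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client;

// Hypothetical helper for illustration; not part of Kacho.RabbitMQ.
public static class RetryTopology
{
    // Arguments that make a queue hold each message for ttlMs and then
    // dead-letter it to the given exchange with the given routing key.
    public static Dictionary<string, object?> RetryQueueArguments(
        int ttlMs, string deadLetterExchange, string deadLetterRoutingKey) =>
        new()
        {
            ["x-message-ttl"] = ttlMs,
            ["x-dead-letter-exchange"] = deadLetterExchange,
            ["x-dead-letter-routing-key"] = deadLetterRoutingKey,
        };

    // Declares the six entities listed under "AutoDeclare" on an open channel.
    public static async Task DeclareAsync(IChannel channel)
    {
        // Main path: topic exchange -> main.queue for any key starting with "main.".
        await channel.ExchangeDeclareAsync(exchange: "main.exchange", type: ExchangeType.Topic,
            durable: true, autoDelete: false);
        await channel.QueueDeclareAsync(queue: "main.queue", durable: true,
            exclusive: false, autoDelete: false);
        await channel.QueueBindAsync(queue: "main.queue", exchange: "main.exchange",
            routingKey: "main.#");

        // Retry path: direct exchange -> retry.queue, which has no consumer.
        await channel.ExchangeDeclareAsync(exchange: "retry.exchange", type: ExchangeType.Direct,
            durable: true, autoDelete: false);
        await channel.QueueDeclareAsync(queue: "retry.queue", durable: true,
            exclusive: false, autoDelete: false,
            arguments: RetryQueueArguments(5000, "main.exchange", "main.event"));
        await channel.QueueBindAsync(queue: "retry.queue", exchange: "retry.exchange",
            routingKey: "retry");
    }
}
```

Because nothing consumes from retry.queue, every message published there expires after 5000 ms and is dead-lettered to main.exchange with the routing key main.event. That key matches the main.# binding, so the message arrives in main.queue again.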
Setting Up Dependency Injection
Register the RabbitMQ services in your Program.cs:
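using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Kacho.RabbitMQ.Extensions;
using Kacho.RabbitMQ.Factory; // IRabbitMQFactory

var builder = WebApplication.CreateBuilder(args);

// Bind the "RabbitMQ" configuration section and register the library's services.
builder.Services.AddRabbitMQ(builder.Configuration.GetSection("RabbitMQ"));

var app = builder.Build();

// Open connections and declare the configured entities before serving traffic.
await app.Services.GetRequiredService<IRabbitMQFactory>().InitializeAsync();

app.Run();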
Implementing a Message Processor with Retry Logic
Create a custom message processor that handles retries by publishing to a retry queue with a TTL, tracking retry counts in message headers, and stopping retries when the counter reaches zero:
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using Kacho.RabbitMQ.Consumer;
using Kacho.RabbitMQ.Factory;
using Kacho.RabbitMQ.Publisher;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class MyMessageProcessor : IMessageProcessor
{
    private readonly ILogger<MyMessageProcessor> _logger;
    private readonly IRabbitMQPublisher _publisher;

    private const int MaxRetries = 3;
    private const string RetryExchange = "retry.exchange";
    private const string RetryRoutingKey = "retry";

    public MyMessageProcessor(ILogger<MyMessageProcessor> logger, IRabbitMQFactory factory)
    {
        _logger = logger;
        _publisher = factory.RabbitMQPublisher("mainPublisher");
    }

    public async Task<IMessageProcessingResult> ProcessMessageAsync(BasicDeliverEventArgs message, Activity activity, CancellationToken cancellationToken)
    {
        try
        {
            var body = Encoding.UTF8.GetString(message.Body.ToArray());
            _logger.LogInformation("Processing message: {Body}", body);

            // Simulate processing logic that may fail
            if (body.Contains("fail"))
                throw new Exception("Simulated processing failure");

            await Task.Delay(100, cancellationToken); // Simulate work
            return MessageProcessingResult.Success();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process message");
            return MessageProcessingResult.Retry();
        }
    }

    public async Task AcknowledgeMessageAsync(IChannel channel, BasicDeliverEventArgs message, IMessageProcessingResult result)
    {
        switch (result.Outcome)
        {
            case ProcessingOutcome.Success:
                await channel.BasicAckAsync(message.DeliveryTag, false);
                _logger.LogInformation("Message acknowledged successfully");
                break;

            case ProcessingOutcome.Retry:
                // Copy the headers so the delivered message's own dictionary is not mutated.
                var headers = message.BasicProperties.Headers is { } incoming
                    ? new Dictionary<string, object?>(incoming)
                    : new Dictionary<string, object?>();

                // A missing header means this is the first failure: start from MaxRetries.
                int retryCount = headers.TryGetValue("x-retry-count", out var count) ? Convert.ToInt32(count) : MaxRetries;
                if (retryCount <= 0)
                {
                    _logger.LogWarning("Max retries reached for message, rejecting");
                    await channel.BasicNackAsync(message.DeliveryTag, false, false);
                    return;
                }

                // Decrement retry count and publish to retry queue
                retryCount--;
                headers["x-retry-count"] = retryCount;

                // RabbitMQ.Client 7.x has no channel.CreateBasicProperties(); construct BasicProperties directly.
                var properties = new BasicProperties
                {
                    Headers = headers,
                    Persistent = true
                };

                await _publisher.PublishAsync(Encoding.UTF8.GetString(message.Body.ToArray()), RetryExchange, RetryRoutingKey, properties);
                _logger.LogInformation("Message sent to retry queue with {RetryCount} retries remaining", retryCount);

                // Acknowledge the original message
                await channel.BasicAckAsync(message.DeliveryTag, false);
                break;

            case ProcessingOutcome.Failure:
            case ProcessingOutcome.Cancelled:
                _logger.LogWarning("Message failed or cancelled, rejecting");
                await channel.BasicNackAsync(message.DeliveryTag, false, false);
                break;
        }
    }

    public async Task AcknowledgeMessageAsync(IChannel channel, BasicDeliverEventArgs message, Exception exception)
    {
        _logger.LogError(exception, "Acknowledging message due to error");
        await AcknowledgeMessageAsync(channel, message, MessageProcessingResult.Retry());
    }
}
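The processor above reads x-retry-count with Convert.ToInt32, which assumes the header arrives as a numeric type. As a hedged alternative, the sketch below reads the header defensively and falls back to a default when it is missing or unparsable. RetryHeaders and ReadRetryCount are hypothetical names, not part of Kacho.RabbitMQ, and the byte[] branch rests on the assumption that a publisher which wrote the counter as a string will have it delivered as UTF-8 bytes by the .NET client.

```csharp
using System.Collections.Generic;
using System.Globalization;
using System.Text;

// Hypothetical helper for illustration; not part of Kacho.RabbitMQ.
public static class RetryHeaders
{
    public const string RetryCountHeader = "x-retry-count";

    // Returns the remaining retries stored in the header, or defaultValue when
    // the header is missing or cannot be interpreted as an integer.
    public static int ReadRetryCount(IDictionary<string, object?>? headers, int defaultValue)
    {
        if (headers is null || !headers.TryGetValue(RetryCountHeader, out var raw) || raw is null)
            return defaultValue;

        return raw switch
        {
            int i => i,
            long l when l >= int.MinValue && l <= int.MaxValue => (int)l,
            short s => s,
            byte b => b,
            // Assumption: string-valued headers may be delivered as UTF-8 bytes.
            byte[] bytes when int.TryParse(Encoding.UTF8.GetString(bytes),
                NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsed) => parsed,
            string text when int.TryParse(text,
                NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsed) => parsed,
            _ => defaultValue,
        };
    }
}
```

With MaxRetries set to 3 and the 5000 ms TTL from the configuration, a message that always fails is attempted four times in total: once on first delivery and once after each of three passes through the retry queue. It is therefore rejected no sooner than 15 seconds after its first failure.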
Publishing Messages
Use IRabbitMQPublisher to publish messages:
using System.Collections.Generic;
using Kacho.RabbitMQ.Factory;
using RabbitMQ.Client; // BasicProperties

public async Task PublishMessage(IRabbitMQFactory factory)
{
    var publisher = factory.RabbitMQPublisher("mainPublisher");
    var message = new { Id = 1, Content = "Hello, RabbitMQ!" };

    // Seed the retry counter that the message processor decrements on each failure.
    var properties = new BasicProperties { Headers = new Dictionary<string, object?> { { "x-retry-count", 3 } } };

    await publisher.PublishAsync(message, "main.exchange", "main.event", properties);
}
Consuming Messages
Start a consumer with the custom message processor:
using Kacho.RabbitMQ.Consumer; // IMessageProcessor
using Kacho.RabbitMQ.Factory;

public async Task ConsumeMessages(IRabbitMQFactory factory, IMessageProcessor processor)
{
    var consumer = factory.RabbitMQConsumer("mainConsumer", processor);
    await consumer.StartConsuming("main.queue");

    // Stop when needed
    // await consumer.StopAsync();
}
Key Components
Connection Management
- ConnectionManager: Ensures thread-safe RabbitMQ connections with automatic retries (up to 3 attempts with 1-second delays).
- RabbitConnection: Configures connection details (hosts, credentials, etc.) via a ConnectionFactory.
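The retry policy described for ConnectionManager (up to 3 attempts with 1-second delays) can be pictured with the generic sketch below. It is an illustration of that policy only, not the library's implementation; ConnectRetry and ExecuteAsync are hypothetical names, and the delegate stands in for whatever opens the connection.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not the library's ConnectionManager.
public static class ConnectRetry
{
    // Runs connect up to maxAttempts times, waiting delay (1 second by default)
    // between attempts. The exception from the final attempt propagates to the caller.
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> connect,
        int maxAttempts = 3,
        TimeSpan? delay = null,
        CancellationToken cancellationToken = default)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        var wait = delay ?? TimeSpan.FromSeconds(1);

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await connect(cancellationToken).ConfigureAwait(false);
            }
            catch (Exception) when (attempt < maxAttempts && !cancellationToken.IsCancellationRequested)
            {
                // Not the last attempt: wait, then loop around and try again.
                await Task.Delay(wait, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}
```

With RabbitMQ.Client 7.x, the delegate could be ct => connectionFactory.CreateConnectionAsync(ct), where connectionFactory is a ConnectionFactory configured with the hosts and credentials from the Connections section.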
Entities
- ExchangeEntity: Configures and declares RabbitMQ exchanges (e.g., topic, direct) with options like durability and custom arguments.
- QueueEntity: Declares queues with properties like durability, exclusivity, and dead-lettering options (e.g., TTL and dead-letter exchange).
- BindingEntity: Binds queues to exchanges with routing keys and optional arguments.
Publishing
- RabbitMQPublisher: Publishes messages using a pooled IChannel via Kacho.ASyncObjectPool. Supports JSON serialization by default.
- IMessageSerializer: Customizable message serialization (default: JsonMessageSerializer).
Consuming
- RabbitMQConsumer: Manages multiple consumers and workers for processing messages from a queue. Uses a bounded channel for buffering and supports configurable prefetch counts.
- IMessageProcessor: Defines the logic for processing and acknowledging messages with outcomes (Success, Failure, Retry, Cancelled). Supports advanced retry mechanisms via a retry queue with TTL and dead-lettering.
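The combination of a bounded buffer and a fixed number of workers can be sketched with System.Threading.Channels as below. This is a simplified illustration of the pattern, not the RabbitMQConsumer source: BufferedWorkers and RunAsync are hypothetical names, the deliveries sequence stands in for messages arriving from the broker, and the defaults mirror BufferCapacity (100) and WorkerCount (2).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch for illustration; not the library's RabbitMQConsumer.
public static class BufferedWorkers
{
    public static async Task RunAsync<T>(
        IEnumerable<T> deliveries,
        Func<T, CancellationToken, ValueTask> handle,
        int bufferCapacity = 100,
        int workerCount = 2,
        CancellationToken cancellationToken = default)
    {
        var buffer = Channel.CreateBounded<T>(new BoundedChannelOptions(bufferCapacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // the producer waits while the buffer is full
            SingleWriter = true,
        });

        // Each worker drains the shared buffer until the writer is completed.
        // This sketch does not recover from a handler that throws.
        var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(async () =>
        {
            await foreach (var item in buffer.Reader.ReadAllAsync(cancellationToken))
                await handle(item, cancellationToken);
        }, cancellationToken)).ToArray();

        try
        {
            foreach (var delivery in deliveries)
                await buffer.Writer.WriteAsync(delivery, cancellationToken);
        }
        finally
        {
            buffer.Writer.Complete(); // lets ReadAllAsync finish once the buffer is empty
        }

        await Task.WhenAll(workers);
    }
}
```

The bounded capacity provides backpressure: when workers fall behind, writes to the buffer wait instead of accumulating messages in memory without limit.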
Factory
- RabbitMQFactory: Central factory for creating publishers and consumers, initializing connections, and declaring entities based on configuration.
- RabbitMQRootConfig: Holds configuration for connections, entities, consumers, publishers, and auto-declaration settings.
Configuration Options
ConsumerConfig
- ConnectionName: Name of the RabbitMQ connection to use.
- SharedConnectionKey: Optional key for sharing connections across consumers/publishers.
- QueueName: The queue to consume from.
- BufferCapacity: Capacity of the internal message channel (default: 100).
- WorkerCount: Number of worker tasks for processing messages (default: 2).
- ConsumerCount: Number of RabbitMQ consumers (default: 2).
- PrefetchCount: Number of messages to prefetch per consumer (default: 50).
PublisherConfig
- ConnectionName: Name of the RabbitMQ connection to use.
- SharedConnectionKey: Optional key for sharing connections.
- PoolOptions: Configures the channel pool using AsyncObjectPoolOptions.
Entity Configuration
Entities (ExchangeEntity, QueueEntity, BindingEntity) support properties like durability, auto-delete, and custom arguments (e.g., TTL, dead-lettering).
Notes
- Ensure RabbitMQ server is running and accessible at the configured hosts.
- The library uses Kacho.ASyncObjectPool for efficient channel management, which dynamically scales based on demand.
- Implement IMessageProcessor carefully to handle message processing and acknowledgment logic, especially for retry scenarios.
- The retry queue example uses a TTL of 5000 ms and dead-letters back to the original queue via main.exchange. Adjust TTL and retry counts as needed.
- Use configuration files to manage complex setups with multiple exchanges, queues, and bindings.
- Logging is integrated via Microsoft.Extensions.Logging for debugging and monitoring.
License
MIT License
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |