Messaggero.RabbitMQ
0.0.1
dotnet add package Messaggero.RabbitMQ --version 0.0.1
NuGet\Install-Package Messaggero.RabbitMQ -Version 0.0.1
<PackageReference Include="Messaggero.RabbitMQ" Version="0.0.1" />
<PackageVersion Include="Messaggero.RabbitMQ" Version="0.0.1" />
<PackageReference Include="Messaggero.RabbitMQ" />
paket add Messaggero.RabbitMQ --version 0.0.1
#r "nuget: Messaggero.RabbitMQ, 0.0.1"
#:package Messaggero.RabbitMQ@0.0.1
#addin nuget:?package=Messaggero.RabbitMQ&version=0.0.1
#tool nuget:?package=Messaggero.RabbitMQ&version=0.0.1
Messaggero.RabbitMQ
RabbitMQ transport adapter for Messaggero. Provides at-least-once delivery with publisher confirms and manual BasicAck/BasicNack via RabbitMQ.Client v7.
Quick Start
// Program.cs
builder.Services.AddMessaggero(messaging =>
{
messaging
.AddRabbitMQ("rmq", opts =>
{
opts.HostName = "localhost";
opts.UserName = "guest";
opts.Password = "guest";
})
.Route<OrderPlaced>(rule => rule
.ToTransport("rmq")
.ToDestination("orders"))
.RegisterHandler<OrderPlacedHandler, OrderPlaced>();
});
Configuration
Configure via RabbitMqOptions inside AddRabbitMQ:
| Property | Default | Environment Variable | Description |
|---|---|---|---|
HostName |
localhost |
MESSAGGERO_RABBITMQ_HOST |
Broker hostname |
Port |
5672 |
MESSAGGERO_RABBITMQ_PORT |
AMQP port |
UserName |
guest |
— | Credentials |
Password |
guest |
— | Credentials |
VirtualHost |
/ |
— | RabbitMQ virtual host |
AutomaticRecoveryEnabled |
true |
— | Reconnect automatically on connection drop |
PrefetchCount |
100 |
— | BasicQos prefetch count per consumer channel |
RetryPolicy |
see below | — | Per-transport retry and dead-letter settings |
Environment Variables
You can omit HostName and Port from code and supply them via environment variables instead:
MESSAGGERO_RABBITMQ_HOST=rabbitmq
MESSAGGERO_RABBITMQ_PORT=5672
Retry Policy
messaging.AddRabbitMQ("rmq", opts =>
{
opts.HostName = "rabbitmq";
opts.RetryPolicy = new RetryPolicyOptions
{
MaxAttempts = 5,
BackoffStrategy = BackoffStrategy.Exponential,
InitialDelay = TimeSpan.FromSeconds(1),
MaxDelay = TimeSpan.FromSeconds(30),
DeadLetterDestination = new Destination { Name = "orders.dlq" }
};
});
| Property | Default | Description |
|---|---|---|
MaxAttempts |
3 |
Total attempts including the first |
BackoffStrategy |
Exponential |
Exponential or Fixed |
InitialDelay |
1s |
Delay before the first retry |
MaxDelay |
30s |
Upper cap on exponential backoff |
DeadLetterDestination |
null |
Queue to route messages to after retries exhausted |
Defining Messages and Handlers
// Message contract
public record OrderPlaced(string OrderId, decimal Total);
// Handler
public class OrderPlacedHandler : IMessageHandler<OrderPlaced>
{
public async Task HandleAsync(OrderPlaced message, MessageContext context, CancellationToken ct)
{
// process message
// returning without throwing = BasicAck
}
}
Throwing from HandleAsync triggers the retry policy. After all attempts are exhausted the message is routed to DeadLetterDestination if configured, otherwise a BasicNack(requeue: false) is sent — RabbitMQ will forward it to a configured Dead Letter Exchange (DLX).
Publishing
Inject IMessageBus anywhere in your application:
public class OrderService(IMessageBus bus)
{
public async Task PlaceOrderAsync(string orderId, decimal total, CancellationToken ct)
{
var result = await bus.PublishAsync(new OrderPlaced(orderId, total), ct);
if (result.IsSuccess)
{
// result.Outcomes["rmq"]["exchange"] — exchange published to
// result.Outcomes["rmq"]["routingKey"] — routing key used
}
}
}
The adapter publishes with publisher confirms enabled (publisherConfirmationsEnabled: true). PublishAsync awaits the broker confirm before returning, ensuring the message is persisted by the broker.
Custom Routing Key
By default the routing key equals the destination name. Override it per route:
.Route<OrderPlaced>(r => r
.ToTransport("rmq")
.ToDestination("events", new Dictionary<string, string>
{
["routingKey"] = "orders.placed"
}))
Publishing with Custom Headers
var headers = new MessageHeaders();
headers.Set("correlation-id", correlationId);
headers.Set("tenant-id", tenantId);
await bus.PublishAsync(new OrderPlaced(orderId, total), headers, ct);
Headers are forwarded as AMQP message headers and available in MessageContext.Headers on the consumer side.
Multiple RabbitMQ Transports
Register multiple named adapters for separate vhosts or clusters:
messaging
.AddRabbitMQ("orders-rmq", opts =>
{
opts.HostName = "rabbitmq";
opts.VirtualHost = "/orders";
opts.UserName = "orders-app";
opts.Password = "secret";
})
.AddRabbitMQ("notifications-rmq", opts =>
{
opts.HostName = "rabbitmq";
opts.VirtualHost = "/notifications";
opts.UserName = "notify-app";
opts.Password = "secret";
})
.Route<OrderPlaced> (r => r.ToTransport("orders-rmq"))
.Route<EmailRequested> (r => r.ToTransport("notifications-rmq"))
.RegisterHandler<OrderPlacedHandler, OrderPlaced> (opts =>
opts.TransportScope = "orders-rmq")
.RegisterHandler<EmailHandler, EmailRequested> (opts =>
opts.TransportScope = "notifications-rmq");
Delivery Semantics
- Producer: messages are published as
Persistent = truewith publisher confirms.PublishAsynconly returns success after the broker has persisted the message. - Consumer:
autoAck: false, prefetch controlled byPrefetchCount.BasicAckis sent only afterHandleAsyncreturns without throwing. - Rejection: a failed message receives
BasicNack(requeue: false). Configure a Dead Letter Exchange on the queue in RabbitMQ to capture rejected messages. - At-least-once: a message may be redelivered if the process crashes after
HandleAsynccompletes but beforeBasicAckis sent.
Requirements
- .NET 10.0+
- RabbitMQ.Client 7.1.2+
- A running RabbitMQ broker (3.13+ recommended)
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Messaggero (>= 0.0.1)
- RabbitMQ.Client (>= 7.1.2)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 0.0.1 | 99 | 4/19/2026 |