Messaggero 0.0.1
dotnet add package Messaggero --version 0.0.1
NuGet\Install-Package Messaggero -Version 0.0.1
<PackageReference Include="Messaggero" Version="0.0.1" />
<PackageVersion Include="Messaggero" Version="0.0.1" />
<PackageReference Include="Messaggero" />
paket add Messaggero --version 0.0.1
#r "nuget: Messaggero, 0.0.1"
#:package Messaggero@0.0.1
#addin nuget:?package=Messaggero&version=0.0.1
#tool nuget:?package=Messaggero&version=0.0.1
Messaggero
A broker-agnostic messaging library for .NET with multi-transport routing, fan-out/fan-in delivery, retry policies, and OpenTelemetry observability.

Overview
Messaggero decouples application code from broker-specific APIs by exposing a single stable interface for publishing and consuming messages. Multiple transport adapters (Kafka, RabbitMQ) can be active simultaneously within the same process. A message-type-based routing layer decides which transport handles each message — no broker logic leaks into application code.
- Single
IMessageBus.PublishAsynccall regardless of broker - Swappable, simultaneously active transports
- Configuration-driven routing (including fan-out to multiple transports)
- Class-based handlers with optional lifecycle hooks
- Built-in retry policies with fixed or exponential backoff
- OpenTelemetry integration via
EnableObservability() - In-memory test double (
Messaggero.Testing) for unit and integration tests
Packages
| Package | Description |
|---|---|
Messaggero |
Core library — abstractions, routing, hosting, DI registration |
Messaggero.Kafka |
Apache Kafka transport adapter (at-least-once, acks=all) |
Messaggero.RabbitMQ |
RabbitMQ transport adapter (at-least-once, publisher confirms) |
Messaggero.Testing |
In-memory adapter and TestMessageBus assertion helper |
Quick Start
1. Define a message
public sealed class OrderPlaced
{
public required string OrderId { get; init; }
public required decimal Total { get; init; }
}
2. Implement a handler
public sealed class OrderPlacedHandler : IMessageHandler<OrderPlaced>
{
public Task HandleAsync(
OrderPlaced message,
MessageContext context,
CancellationToken cancellationToken)
{
Console.WriteLine($"Order {message.OrderId} via {context.SourceTransport} " +
$"(attempt {context.DeliveryAttempt})");
return Task.CompletedTask;
}
}
3. Register with the DI container
builder.Services.AddMessaggero(messaging =>
{
messaging
.AddKafka("kafka", kafka =>
{
kafka.BootstrapServers = "localhost:9092";
kafka.GroupId = "my-service";
})
.Route<OrderPlaced>(r => r.ToTransport("kafka"))
.RegisterHandler<OrderPlacedHandler, OrderPlaced>()
.EnableObservability();
});
4. Publish a message
public class OrderService(IMessageBus bus)
{
public async Task PlaceOrderAsync(string orderId, decimal total)
{
var result = await bus.PublishAsync(new OrderPlaced
{
OrderId = orderId,
Total = total,
});
if (!result.IsSuccess)
throw new InvalidOperationException($"Publish failed: {result}");
}
}
Configuration Reference
AddMessaggero
services.AddMessaggero(messaging =>
{
messaging
// ── transports ────────────────────────────────────────────
.AddKafka("kafka", kafka => { ... })
.AddRabbitMQ("rmq", rabbit => { ... })
// ── routing ───────────────────────────────────────────────
.Route<OrderPlaced> (r => r.ToTransport("kafka"))
.Route<EmailRequested>(r => r.ToTransport("rmq"))
.Route<AuditEvent> (r => r.ToTransport("kafka").ToTransport("rmq")) // fan-out
// ── handlers ─────────────────────────────────────────────
.RegisterHandler<OrderPlacedHandler, OrderPlaced>()
.RegisterHandler<EmailHandler, EmailRequested>(opts =>
{
opts.MaxConcurrency = 4;
})
// ── serialization / observability ─────────────────────────
.UseDefaultSerializer(new ProtobufMessageSerializer())
.EnableObservability();
});
Transport adapters
Kafka (Messaggero.Kafka)
.AddKafka("kafka", kafka =>
{
kafka.BootstrapServers = "localhost:9092"; // default: $MESSAGGERO_KAFKA_BOOTSTRAP_SERVERS
kafka.GroupId = "my-service"; // default: $MESSAGGERO_KAFKA_GROUP_ID
kafka.PrefetchCount = 100; // messages buffered per consumer
// Optional per-adapter retry policy
kafka.RetryPolicy = new RetryPolicyOptions
{
MaxAttempts = 5,
BackoffStrategy = BackoffStrategy.Exponential,
InitialDelay = TimeSpan.FromSeconds(1),
MaxDelay = TimeSpan.FromSeconds(30),
DeadLetterDestination = new Destination { Name = "orders-dlq" },
};
// Pass any Confluent.Kafka producer/consumer config directly
kafka.ProducerConfig["socket.keepalive.enable"] = "true";
kafka.ConsumerConfig["fetch.min.bytes"] = "1024";
})
| Property | Default | Description |
|---|---|---|
BootstrapServers |
localhost:9092 |
Comma-separated host:port list |
GroupId |
messaggero-default |
Consumer group identifier |
PrefetchCount |
adapter default | Max in-flight messages per consumer |
ProducerConfig |
{} |
Raw Confluent.Kafka producer properties |
ConsumerConfig |
{} |
Raw Confluent.Kafka consumer properties |
Environment variables: MESSAGGERO_KAFKA_BOOTSTRAP_SERVERS, MESSAGGERO_KAFKA_GROUP_ID
RabbitMQ (Messaggero.RabbitMQ)
.AddRabbitMQ("rmq", rabbit =>
{
rabbit.HostName = "localhost"; // default: $MESSAGGERO_RABBITMQ_HOST
rabbit.Port = 5672; // default: $MESSAGGERO_RABBITMQ_PORT
rabbit.UserName = "guest";
rabbit.Password = "guest";
rabbit.VirtualHost = "/";
rabbit.AutomaticRecoveryEnabled = true;
rabbit.PrefetchCount = 50;
})
| Property | Default | Description |
|---|---|---|
HostName |
localhost |
Broker hostname |
Port |
5672 |
AMQP port |
UserName |
guest |
Credentials |
Password |
guest |
Credentials |
VirtualHost |
/ |
Virtual host |
AutomaticRecoveryEnabled |
true |
Reconnect on connection drop |
PrefetchCount |
adapter default | BasicQos prefetch count |
Environment variables: MESSAGGERO_RABBITMQ_HOST, MESSAGGERO_RABBITMQ_PORT
Routing
By default the destination name is the message type name lowercased (e.g. OrderPlaced → orderplaced).
// Custom topic / queue name
.Route<OrderPlaced>(r => r.ToTransport("kafka").ToDestination("orders-v2"))
// RabbitMQ exchange with routing key override
.Route<OrderPlaced>(r => r
.ToTransport("rmq")
.ToDestination("events", new Dictionary<string, string>
{
["routingKey"] = "orders.placed",
}))
// Fan-out — delivered to both transports
.Route<AuditEvent>(r => r.ToTransport("kafka").ToTransport("rmq"))
Handlers
// Sequential delivery (default)
.RegisterHandler<OrderPlacedHandler, OrderPlaced>()
// Concurrent delivery — up to 5 messages processed in parallel
.RegisterHandler<OrderPlacedHandler, OrderPlaced>(opts =>
{
opts.MaxConcurrency = 5;
})
// Transport-scoped — only receives messages from a specific adapter
.RegisterHandler<KafkaAuditHandler, AuditEvent>(opts =>
{
opts.TransportScope = "kafka";
})
HandlerOptions
| Property | Default | Description |
|---|---|---|
MaxConcurrency |
1 |
Max concurrent handler invocations |
TransportScope |
null (all) |
Restrict handler to one named transport |
Optional lifecycle hooks
Handlers can implement IHandlerLifecycle to run setup and teardown logic:
public sealed class OrderPlacedHandler : IMessageHandler<OrderPlaced>, IHandlerLifecycle
{
public Task InitializeAsync(CancellationToken cancellationToken) { /* warm-up */ return Task.CompletedTask; }
public Task DisposeAsync() { /* clean-up */ return Task.CompletedTask; }
public Task HandleAsync(OrderPlaced message, MessageContext context, CancellationToken ct)
=> Task.CompletedTask;
}
MessageContext
Every handler receives a MessageContext alongside the typed message:
| Property | Type | Description |
|---|---|---|
MessageId |
string |
Unique message identifier |
MessageType |
string |
CLR type name of the message |
SourceTransport |
string |
Name of the adapter that delivered the message |
Headers |
MessageHeaders |
Arbitrary key-value metadata |
Timestamp |
DateTimeOffset |
Time the message was produced |
DeliveryAttempt |
int |
1 on first delivery; incremented on retries |
Retry policy
kafka.RetryPolicy = new RetryPolicyOptions
{
MaxAttempts = 5,
BackoffStrategy = BackoffStrategy.Exponential, // Fixed | Exponential
InitialDelay = TimeSpan.FromSeconds(1),
MaxDelay = TimeSpan.FromSeconds(30),
DeadLetterDestination = new Destination { Name = "my-dlq" },
};
| Strategy | Behaviour |
|---|---|
Fixed |
Constant delay between every retry |
Exponential |
Delay doubles each attempt, capped at MaxDelay |
Custom serializer
The default serializer uses System.Text.Json. Swap it globally:
.UseDefaultSerializer(new ProtobufMessageSerializer())
Implement IMessageSerializer to integrate any serialization library:
public interface IMessageSerializer
{
string ContentType { get; }
byte[] Serialize<TMessage>(TMessage message) where TMessage : class;
TMessage Deserialize<TMessage>(ReadOnlySpan<byte> data) where TMessage : class;
}
Observability
.EnableObservability()
Enables OpenTelemetry tracing and metrics. Instruments publish and consume operations with activity spans and counters compatible with any OpenTelemetry-compatible backend (Jaeger, Prometheus, Azure Monitor, etc.).
Custom headers
var headers = new MessageHeaders();
headers["correlation-id"] = correlationId;
headers["tenant-id"] = tenantId;
await bus.PublishAsync(message, headers);
Headers are forwarded to the broker and available in MessageContext.Headers on the consumer side.
Multiple Transports Example
builder.Services.AddMessaggero(messaging =>
{
messaging
.AddKafka("kafka", kafka =>
{
kafka.BootstrapServers = "kafka:9092";
kafka.GroupId = "order-service";
})
.AddRabbitMQ("rmq", rabbit =>
{
rabbit.HostName = "rabbitmq";
rabbit.UserName = "app";
rabbit.Password = "secret";
})
// Each message type routed to the appropriate broker
.Route<OrderPlaced> (r => r.ToTransport("kafka"))
.Route<EmailRequested>(r => r.ToTransport("rmq"))
// Fan-out: audit events sent to both brokers
.Route<AuditEvent> (r => r.ToTransport("kafka").ToTransport("rmq"))
// Handlers
.RegisterHandler<OrderPlacedHandler, OrderPlaced> (opts => opts.MaxConcurrency = 4)
.RegisterHandler<EmailHandler, EmailRequested>()
.RegisterHandler<KafkaAuditHandler, AuditEvent> (opts => opts.TransportScope = "kafka")
.RegisterHandler<RabbitMqAuditHandler, AuditEvent> (opts => opts.TransportScope = "rmq")
.EnableObservability();
});
Testing
Add Messaggero.Testing to your test project.
TestMessageBus — lightweight publish assertions
Use TestMessageBus when you only need to verify that the correct messages are published from a unit:
[Fact]
public async Task PlaceOrder_PublishesOrderPlacedEvent()
{
var bus = new TestMessageBus();
var svc = new OrderService(bus);
await svc.PlaceOrderAsync("ORD-1", 99.99m);
bus.AssertPublished<OrderPlaced>();
var events = bus.GetPublishedMessages<OrderPlaced>();
events.Should().ContainSingle(e => e.OrderId == "ORD-1");
}
TestMessageBus API:
| Method | Description |
|---|---|
AssertPublished<T>() |
Throws if no message of type T was published |
GetPublishedMessages<T>() |
Returns all published messages of type T |
GetAllPublished() |
Returns all published messages with their type and headers |
Reset() |
Clears the captured publish list |
In-memory transport — full pipeline tests
Use AddInMemory to run the full Messaggero pipeline in-process without a real broker:
[Fact]
public async Task Handler_IsInvokedWhenMessagePublished()
{
var services = new ServiceCollection();
services.AddLogging();
services.AddMessaggero(messaging =>
{
messaging
.AddInMemory("mem")
.Route<OrderPlaced>(r => r.ToTransport("mem"))
.RegisterHandler<OrderPlacedHandler, OrderPlaced>();
});
await using var provider = services.BuildServiceProvider();
var host = provider.GetRequiredService<MessagingHost>();
await host.StartAsync(CancellationToken.None);
var bus = provider.GetRequiredService<IMessageBus>();
await bus.PublishAsync(new OrderPlaced { OrderId = "ORD-42", Total = 50m });
// assert via your handler's captured state, or inspect the adapter directly
}
Handler isolation — no library host needed
[Fact]
public async Task Handler_CanBeTestedInIsolation()
{
var handler = new OrderPlacedHandler();
var context = new MessageContext
{
MessageId = "test-1",
MessageType = nameof(OrderPlaced),
SourceTransport = "test",
Headers = new MessageHeaders(),
Timestamp = DateTimeOffset.UtcNow,
DeliveryAttempt = 1,
};
await handler.HandleAsync(
new OrderPlaced { OrderId = "ORD-42", Total = 99.99m },
context,
CancellationToken.None);
// assert handler side-effects directly
}
Delivery Semantics
| Adapter | Guarantee | Ordering | Ack model | Nack behaviour |
|---|---|---|---|---|
| Kafka | At-least-once | Per-partition (key = message type) | Manual offset commit | No-op — replayed on consumer restart |
| RabbitMQ | At-least-once | Per-queue FIFO | BasicAck per delivery tag |
BasicNack(requeue: false) → DLX |
| InMemory | At-most-once | FIFO per destination | Remove from pending set | Move to DeadLetterMessages list |
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0-preview.3.25171.5)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.0-preview.3.25171.5)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.0-preview.3.25171.5)
- Microsoft.Extensions.Options (>= 10.0.0-preview.3.25171.5)
NuGet packages (3)
Showing the top 3 NuGet packages that depend on Messaggero:
| Package | Downloads |
|---|---|
|
Messaggero.Testing
In-memory transport adapter and test utilities (TestMessageBus, assertion helpers) for the Messaggero messaging library. |
|
|
Messaggero.Kafka
Apache Kafka transport adapter for the Messaggero messaging library. Provides at-least-once delivery with manual offset commit. |
|
|
Messaggero.RabbitMQ
RabbitMQ transport adapter for the Messaggero messaging library. Uses publisher confirms and manual ack for reliable delivery. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 0.0.1 | 81 | 4/19/2026 |