Messaggero.Kafka
0.0.1
dotnet add package Messaggero.Kafka --version 0.0.1
Apache Kafka transport adapter for Messaggero. Provides at-least-once delivery with manual offset commit via Confluent.Kafka.
Quick Start
// Program.cs
builder.Services.AddMessaggero(messaging =>
{
    messaging
        .AddKafka("kafka", opts =>
        {
            opts.BootstrapServers = "localhost:9092";
            opts.GroupId = "my-service";
        })
        .Route<OrderPlaced>(rule => rule
            .To("kafka")
            .OnTopic("orders"))
        .RegisterHandler<OrderPlacedHandler, OrderPlaced>();
});
Configuration
Configure via KafkaOptions inside AddKafka:
| Property | Default | Environment Variable | Description |
|---|---|---|---|
| BootstrapServers | localhost:9092 | MESSAGGERO_KAFKA_BOOTSTRAP_SERVERS | Comma-separated list of broker addresses |
| GroupId | messaggero-default | MESSAGGERO_KAFKA_GROUP_ID | Consumer group ID |
| ProducerConfig | {} | - | Extra Confluent producer key/value pairs |
| ConsumerConfig | {} | - | Extra Confluent consumer key/value pairs |
| PrefetchCount | 100 | - | Max unprocessed messages held in memory (maps to MaxPartitionFetchBytes) |
| RetryPolicy | see below | - | Per-transport retry and dead-letter settings |
Environment Variables
You can omit BootstrapServers and GroupId from code and set them through environment variables instead:
MESSAGGERO_KAFKA_BOOTSTRAP_SERVERS=broker1:9092,broker2:9092
MESSAGGERO_KAFKA_GROUP_ID=my-service
Retry Policy
messaging.AddKafka("kafka", opts =>
{
    opts.BootstrapServers = "broker:9092";
    opts.RetryPolicy = new RetryPolicyOptions
    {
        MaxAttempts = 5,
        BackoffStrategy = BackoffStrategy.Exponential,
        InitialDelay = TimeSpan.FromSeconds(1),
        MaxDelay = TimeSpan.FromSeconds(30),
        DeadLetterDestination = new Destination("orders.dlq")
    };
});
| Property | Default | Description |
|---|---|---|
| MaxAttempts | 3 | Total attempts including the first |
| BackoffStrategy | Exponential | Exponential or Fixed |
| InitialDelay | 1s | Delay before the first retry |
| MaxDelay | 30s | Upper cap on exponential backoff |
| DeadLetterDestination | null | Topic to route messages to after retries exhausted |
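To get a feel for how these settings interact, the sketch below computes the delay before each retry. It is not Messaggero's implementation: the helper name RetryDelays is hypothetical, and the doubling factor is an assumption, since the table above does not state how fast the exponential backoff grows.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of Messaggero.
// ASSUMPTION: the delay doubles on every retry and is capped at maxDelay.
static IReadOnlyList<TimeSpan> RetryDelays(int maxAttempts, TimeSpan initialDelay, TimeSpan maxDelay)
{
    var delays = new List<TimeSpan>();

    // MaxAttempts counts the first attempt, so there are maxAttempts - 1 retries.
    for (var retry = 0; retry < maxAttempts - 1; retry++)
    {
        var uncapped = initialDelay.Ticks * Math.Pow(2, retry);
        var capped = Math.Min(uncapped, maxDelay.Ticks);
        delays.Add(TimeSpan.FromTicks((long)capped));
    }

    return delays;
}

// Same values as the Retry Policy example: 5 attempts, 1s initial delay, 30s cap.
foreach (var delay in RetryDelays(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30)))
{
    Console.WriteLine($"{delay.TotalSeconds}s");
}
```

Under the doubling assumption, five attempts wait 1s, 2s, 4s and 8s between tries, so MaxDelay only takes effect from the sixth retry onward (32s is capped to 30s).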
Extra Confluent Configuration
Pass any raw Confluent configuration property as a string key/value pair via ProducerConfig or ConsumerConfig:
messaging.AddKafka("kafka", opts =>
{
    opts.BootstrapServers = "broker:9092";
    opts.ProducerConfig = new Dictionary<string, string>
    {
        ["message.timeout.ms"] = "5000",
        ["compression.type"] = "lz4"
    };
    opts.ConsumerConfig = new Dictionary<string, string>
    {
        ["session.timeout.ms"] = "30000",
        ["fetch.min.bytes"] = "1"
    };
});
Defining Messages and Handlers
// Message contract
public record OrderPlaced(string OrderId, decimal Total);

// Handler
public class OrderPlacedHandler : IMessageHandler<OrderPlaced>
{
    public async Task HandleAsync(OrderPlaced message, MessageContext context, CancellationToken ct)
    {
        // process message
        // returning without throwing = ACK (offset committed)
    }
}
Throwing an exception from HandleAsync triggers the retry policy. Once all attempts are exhausted, the message is routed to DeadLetterDestination if one is configured. Otherwise the offset is not committed, and the message is replayed when the consumer restarts.
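The flow described above can be simulated without a broker. The sketch below is an illustration only: Process is a hypothetical function, not a Messaggero API, and it skips the backoff wait between attempts. It reports which of the three outcomes a message would reach.

```csharp
using System;

// Hypothetical simulation of the retry / dead-letter flow, not Messaggero code.
static string Process(Action handler, int maxAttempts, string? deadLetterTopic)
{
    for (var attempt = 1; attempt <= maxAttempts; attempt++)
    {
        try
        {
            handler();
            return "committed"; // handler returned without throwing
        }
        catch (Exception)
        {
            // A real consumer would wait for the configured backoff delay here.
        }
    }

    // All attempts failed: dead-letter if a destination exists, else leave the offset uncommitted.
    return deadLetterTopic is not null
        ? $"dead-lettered to {deadLetterTopic}"
        : "not committed";
}

var calls = 0;

// Fails twice, succeeds on the third attempt.
Console.WriteLine(Process(() => { if (++calls < 3) throw new InvalidOperationException(); }, 3, null));

// Always fails, dead-letter topic configured.
Console.WriteLine(Process(() => throw new InvalidOperationException(), 3, "orders.dlq"));

// Always fails, no dead-letter topic: the message would be replayed on restart.
Console.WriteLine(Process(() => throw new InvalidOperationException(), 3, null));
```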
Publishing
Inject IMessageBus anywhere in your application:
public class OrderService(IMessageBus bus)
{
    public async Task PlaceOrderAsync(string orderId, decimal total, CancellationToken ct)
    {
        var result = await bus.PublishAsync(new OrderPlaced(orderId, total), ct);
        if (result.IsSuccess)
        {
            // result.Outcomes["kafka"]["partition"]: partition written to
            // result.Outcomes["kafka"]["offset"]: offset assigned
            // result.Outcomes["kafka"]["topic"]: topic written to
        }
    }
}
Publishing with Custom Headers
var headers = new MessageHeaders();
headers.Set("correlation-id", correlationId);
headers.Set("tenant-id", tenantId);
await bus.PublishAsync(new OrderPlaced(orderId, total), headers, ct);
Multiple Kafka Transports
Register as many named adapters as needed — for example, separate clusters for different domains:
messaging
    .AddKafka("orders-kafka", opts =>
    {
        opts.BootstrapServers = "orders-broker:9092";
        opts.GroupId = "order-service";
    })
    .AddKafka("payments-kafka", opts =>
    {
        opts.BootstrapServers = "payments-broker:9092";
        opts.GroupId = "payment-service";
    })
    .Route<OrderPlaced>(rule => rule.To("orders-kafka").OnTopic("orders"))
    .Route<PaymentProcessed>(rule => rule.To("payments-kafka").OnTopic("payments"))
    .RegisterHandler<OrderPlacedHandler, OrderPlaced>(opts =>
        opts.TransportScope = "orders-kafka")
    .RegisterHandler<PaymentProcessedHandler, PaymentProcessed>(opts =>
        opts.TransportScope = "payments-kafka");
Delivery Semantics
- Producer: idempotent, acks=all. Messages are not lost on broker failover.
- Consumer: enable.auto.commit=false, auto.offset.reset=earliest. Offsets are committed only after HandleAsync returns successfully.
- At-least-once: a message may be redelivered if the process crashes after processing but before the offset commit.
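Because a message can arrive more than once, handlers are safest when processing the same message twice has no extra effect. The sketch below shows one common approach, deduplicating on a business key. It is a stand-alone illustration: Handle and handledOrders are hypothetical names, and the in-memory dictionary stands in for a durable store that a real service would need so that the record survives a restart.

```csharp
using System;
using System.Collections.Concurrent;

// ASSUMPTION: the order ID uniquely identifies a message.
// In-memory only; a real service would persist this alongside the side effect.
var handledOrders = new ConcurrentDictionary<string, bool>();
var sideEffects = 0;

// Stand-in for the body of a message handler.
void Handle(string orderId)
{
    // TryAdd returns false when the key is already present, i.e. on a redelivery.
    if (!handledOrders.TryAdd(orderId, true))
    {
        return;
    }

    sideEffects++; // the actual work, performed once per order ID
}

Handle("order-1");
Handle("order-1"); // simulated redelivery after a crash before the offset commit
Handle("order-2");

Console.WriteLine(sideEffects); // prints 2
```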
Requirements
- .NET 10.0+
- Confluent.Kafka 2.8.0+
- A running Kafka broker (3.x+ recommended)
- net10.0
  - Confluent.Kafka (>= 2.8.0)
  - Messaggero (>= 0.0.1)
| Version | Downloads | Last Updated |
|---|---|---|
| 0.0.1 | 99 | 4/19/2026 |