Messaggero.Kafka 0.0.1

dotnet add package Messaggero.Kafka --version 0.0.1
                    
NuGet\Install-Package Messaggero.Kafka -Version 0.0.1
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Messaggero.Kafka" Version="0.0.1" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Messaggero.Kafka" Version="0.0.1" />
                    
Directory.Packages.props
<PackageReference Include="Messaggero.Kafka" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Messaggero.Kafka --version 0.0.1
                    
#r "nuget: Messaggero.Kafka, 0.0.1"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Messaggero.Kafka@0.0.1
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Messaggero.Kafka&version=0.0.1
                    
Install as a Cake Addin
#tool nuget:?package=Messaggero.Kafka&version=0.0.1
                    
Install as a Cake Tool

Messaggero.Kafka

Apache Kafka transport adapter for Messaggero. Provides at-least-once delivery with manual offset commit via Confluent.Kafka.

Quick Start

// Program.cs
builder.Services.AddMessaggero(messaging =>
{
    messaging
        .AddKafka("kafka", opts =>
        {
            opts.BootstrapServers = "localhost:9092";
            opts.GroupId = "my-service";
        })
        .Route<OrderPlaced>(rule => rule
            .To("kafka")
            .OnTopic("orders"))
        .RegisterHandler<OrderPlacedHandler, OrderPlaced>();
});

Configuration

Configure via KafkaOptions inside AddKafka:

Property Default Environment Variable Description
BootstrapServers localhost:9092 MESSAGGERO_KAFKA_BOOTSTRAP_SERVERS Comma-separated list of broker addresses
GroupId messaggero-default MESSAGGERO_KAFKA_GROUP_ID Consumer group ID
ProducerConfig {} Extra Confluent producer key/value pairs
ConsumerConfig {} Extra Confluent consumer key/value pairs
PrefetchCount 100 Max unprocessed messages held in memory (maps to MaxPartitionFetchBytes)
RetryPolicy see below Per-transport retry and dead-letter settings

Environment Variables

You can omit BootstrapServers and GroupId from code and set them through environment variables instead:

MESSAGGERO_KAFKA_BOOTSTRAP_SERVERS=broker1:9092,broker2:9092
MESSAGGERO_KAFKA_GROUP_ID=my-service

Retry Policy

messaging.AddKafka("kafka", opts =>
{
    opts.BootstrapServers = "broker:9092";
    opts.RetryPolicy = new RetryPolicyOptions
    {
        MaxAttempts = 5,
        BackoffStrategy = BackoffStrategy.Exponential,
        InitialDelay = TimeSpan.FromSeconds(1),
        MaxDelay = TimeSpan.FromSeconds(30),
        DeadLetterDestination = new Destination("orders.dlq")
    };
});
Property Default Description
MaxAttempts 3 Total attempts including the first
BackoffStrategy Exponential Exponential or Fixed
InitialDelay 1s Delay before the first retry
MaxDelay 30s Upper cap on exponential backoff
DeadLetterDestination null Topic to route messages to after retries exhausted

Extra Confluent Configuration

Pass any raw Confluent property string via ProducerConfig or ConsumerConfig:

messaging.AddKafka("kafka", opts =>
{
    opts.BootstrapServers = "broker:9092";
    opts.ProducerConfig = new Dictionary<string, string>
    {
        ["message.timeout.ms"] = "5000",
        ["compression.type"] = "lz4"
    };
    opts.ConsumerConfig = new Dictionary<string, string>
    {
        ["session.timeout.ms"] = "30000",
        ["fetch.min.bytes"] = "1"
    };
});

Defining Messages and Handlers

// Message contract
public record OrderPlaced(string OrderId, decimal Total);

// Handler
public class OrderPlacedHandler : IMessageHandler<OrderPlaced>
{
    public async Task HandleAsync(OrderPlaced message, MessageContext context, CancellationToken ct)
    {
        // process message
        // returning without throwing = ACK (offset committed)
    }
}

Throwing an exception from HandleAsync triggers the retry policy. Once all attempts are exhausted, the message is routed to DeadLetterDestination if configured, otherwise the offset is not committed and the message will be replayed on consumer restart.

Publishing

Inject IMessageBus anywhere in your application:

public class OrderService(IMessageBus bus)
{
    public async Task PlaceOrderAsync(string orderId, decimal total, CancellationToken ct)
    {
        var result = await bus.PublishAsync(new OrderPlaced(orderId, total), ct);

        if (result.IsSuccess)
        {
            // result.Outcomes["kafka"]["partition"] — partition written to
            // result.Outcomes["kafka"]["offset"]    — offset assigned
            // result.Outcomes["kafka"]["topic"]     — topic written to
        }
    }
}

Publishing with Custom Headers

var headers = new MessageHeaders();
headers.Set("correlation-id", correlationId);
headers.Set("tenant-id", tenantId);

await bus.PublishAsync(new OrderPlaced(orderId, total), headers, ct);

Multiple Kafka Transports

Register as many named adapters as needed — for example, separate clusters for different domains:

messaging
    .AddKafka("orders-kafka", opts =>
    {
        opts.BootstrapServers = "orders-broker:9092";
        opts.GroupId = "order-service";
    })
    .AddKafka("payments-kafka", opts =>
    {
        opts.BootstrapServers = "payments-broker:9092";
        opts.GroupId = "payment-service";
    })
    .Route<OrderPlaced>(rule => rule.To("orders-kafka").OnTopic("orders"))
    .Route<PaymentProcessed>(rule => rule.To("payments-kafka").OnTopic("payments"))
    .RegisterHandler<OrderPlacedHandler, OrderPlaced>(opts =>
        opts.TransportScope = "orders-kafka")
    .RegisterHandler<PaymentProcessedHandler, PaymentProcessed>(opts =>
        opts.TransportScope = "payments-kafka");

Delivery Semantics

  • Producer: idempotent, acks=all — messages are not lost on broker failover.
  • Consumer: enable.auto.commit=false, auto.offset.reset=earliest — offsets are committed only after HandleAsync returns successfully.
  • At-least-once: a message may be redelivered if the process crashes after processing but before the offset commit.

Requirements

  • .NET 10.0+
  • Confluent.Kafka 2.8.0+
  • A running Kafka broker (3.x+ recommended)
Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.0.1 99 4/19/2026