SharpNest.Kafka 3.0.1

dotnet add package SharpNest.Kafka --version 3.0.1
                    
NuGet\Install-Package SharpNest.Kafka -Version 3.0.1
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="SharpNest.Kafka" Version="3.0.1" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="SharpNest.Kafka" Version="3.0.1" />
                    
Directory.Packages.props
<PackageReference Include="SharpNest.Kafka" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add SharpNest.Kafka --version 3.0.1
                    
#r "nuget: SharpNest.Kafka, 3.0.1"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package SharpNest.Kafka@3.0.1
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=SharpNest.Kafka&version=3.0.1
                    
Install as a Cake Addin
#tool nuget:?package=SharpNest.Kafka&version=3.0.1
                    
Install as a Cake Tool

SharpNest.Kafka

SharpNest.Kafka is a robust and flexible .NET library that simplifies Apache Kafka integration for your .NET applications. It provides a clean abstraction over the Confluent.Kafka client with an intuitive API for publishing messages and consuming from topics.

With SharpNest.Kafka, you can easily implement resilient message handling with built-in retry strategies, automatic topic creation, and proper resource management.

Key Features:    ✅ Fluent Configuration API – Configure Kafka settings in a readable, expressive manner.<br>    ✅ Resilient Messaging – Built-in retry strategies for handling transient failures.<br>    ✅ Auto Topic Creation – Topics are created automatically if they don't exist.<br>    ✅ Proper Resource Management – Ensures correct disposal of Kafka connections and resources.<br>    ✅ Lifetime Control – Supports Singleton, Scoped, and Transient service lifetimes.<br>    ✅ Thread-Safe – Ensures safe concurrent execution with proper synchronization.<br>

🔧 Installation

dotnet add package SharpNest.Kafka

🛠️ How to Register and Use SharpNest.Kafka

1️⃣ Add SharpNest.Kafka services

There are two ways to configure SharpNest.Kafka services:

📌 Using configuration

// Program.cs
using SharpNest.Kafka;

builder.Services
    .AddKafka(builder.Configuration)
    .AddPublisher()
    .AddSingletonSubscriber();

📌 Using programmatic configuration

services.AddKafka(options => 
{
    options.BootstrapServers = "localhost:9092";
    options.DefaultGroup = "my-consumer-group";
    options.Partitions = 3;
    options.ReplicationFactor = 2;
})
.AddPublisher()
.AddScopedSubscriber();

2️⃣ Configure Kafka Settings

Add the following to your appsettings.json:

{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "DefaultGroup": "default-group",
    "Partitions": 1,
    "ReplicationFactor": 1,
    "Subscriber": {
      "CommitEmptyMessages": true
    },
    "Publisher": {
      // Publisher-specific settings
    },
    "Security": {
      // Optional security settings
    }
  }
}

3️⃣ Publishing Messages

Inject the IPublisher interface to publish messages to Kafka:

public class MessageService(IPublisher publisher)
{
    private readonly IPublisher _publisher = publisher;

    ...
}

Simple message publishing

var result = await _publisher.PublishAsync(
    "my-topic",  // Topic name
    key,         // Message key
    content      // Message content
);

Publishing with custom headers

var headers = new KeyValuePair<string, byte[]>[] 
{ 
    new("content-type", Encoding.UTF8.GetBytes("application/json")),
    new("correlation-id", Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()))
};

await _publisher.PublishAsync(
    "my-topic", 
    key, 
    content, 
    CancellationToken.None, 
    headers
);

Using KafkaMessage object

var message = new KafkaMessage
{
    Topic = "my-topic",
    Key = key,
    Value = content,
    Headers = new Dictionary<string, byte[]>
    {
        ["timestamp"] = Encoding.UTF8.GetBytes(DateTime.UtcNow.ToString("O"))
    }
};

await _publisher.PublishAsync(message);

4️⃣ Subscribing to Topics

📌 Single Service Subscription
public class MessageConsumerService(ISubscriber subscriber, ILogger<MessageConsumerService> logger)
{
    private readonly ISubscriber _subscriber = subscriber;
    private readonly ILogger<MessageConsumerService> _logger = logger;
    
    public async Task StartConsumingAsync(CancellationToken cancellationToken)
    {
        await _subscriber.SubscribeAsync(
            "my-topic",
            async message => 
            {
                _logger.LogInformation(
                    "Received message: Key={Key}, Value={Value}", 
                    message.Key, 
                    message.Value
                );
                
                await ProcessMessageAsync(message);
            },
            "my-consumer-group",
            cancellationToken
        );
    }
    
    private Task ProcessMessageAsync(IMessage message)
    {
        // Process message

        return Task.CompletedTask;
    }
}
📌 Background Service for Continuous Consumption

Create a background service to continuously consume messages:

public class KafkaConsumerBackgroundService(ISubscriber subscriber, ILogger<KafkaConsumerBackgroundService> logger) : BackgroundService
{
    private readonly ISubscriber _subscriber = subscriber;
    private readonly ILogger<KafkaConsumerBackgroundService> _logger = logger;
    
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            await _subscriber.SubscribeManyAsync(
                new[] { "topic1", "topic2", "topic3" },
                async message =>
                {
                    _logger.LogInformation(
                        "Message received: Topic={Topic}, Key={Key}",
                        message.Topic,
                        message.Key
                    );
                    
                    // Process message
                    await ProcessMessageAsync(message);
                },
                "multi-topic-consumer-group",
                stoppingToken
            );
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in Kafka consumer service");
        }
    }
    
    private Task ProcessMessageAsync(IMessage message)
    {
        // Process message

        return Task.CompletedTask;
    }
}

// Register the background service
services.AddHostedService<KafkaConsumerBackgroundService>();

5️⃣ Advanced Configuration

📌 Configure Publisher Settings
services.AddKafka(Configuration.GetSection("Kafka"))
    .ConfigurePublisher(options => 
    {
        // Configure publisher-specific settings
    })
    .WithAdvancedProducerConfig(config => 
    {
        config.MessageTimeoutMs = 10000;
        config.RequestTimeoutMs = 5000;
        config.Acks = Acks.All;
    })
    .AddPublisher();
📌 Configure Subscriber Settings
services.AddKafka(Configuration.GetSection("Kafka"))
    .ConfigureSubscriber(options => 
    {
        options.CommitEmptyMessages = true;
        // Configure other subscriber-specific settings
    })
    .WithAdvancedConsumerConfig(config => 
    {
        config.AutoOffsetReset = AutoOffsetReset.Earliest;
        config.EnableAutoCommit = false;
        config.MaxPollIntervalMs = 300000;
    })
    .AddSingletonSubscriber();

🚀 Complete Example: ASP.NET Core Web API with Kafka Integration: here

SharpNest.Kafka Documentation

📋 Available Options

KafkaSettings

Property Description Default
BootstrapServers Comma-separated list of Kafka broker addresses localhost:9092
DefaultGroup Default consumer group ID default-group
Partitions Number of partitions for new topics 1
ReplicationFactor Replication factor for new topics 1
Security Security settings object null
Subscriber Subscriber-specific settings new KafkaSubscriberSettings()
Publisher Publisher-specific settings new KafkaPublisherSettings()

KafkaSubscriberSettings

Property Description Default
AutoOffsetReset Offset reset behavior (earliest, latest, error) earliest
EnableAutoCommit Whether to enable auto-commit of offsets true
CommitEmptyMessages Whether to commit empty messages false
AutoCommitIntervalMs Auto-commit interval in milliseconds 5000
SessionTimeoutMs Session timeout in milliseconds 30000
MaxPollIntervalMs Maximum poll interval in milliseconds 300000
MaxPartitionFetchBytes Number of messages to request in each fetch 1048576

KafkaPublisherSettings

Property Description Default
Acks Required acknowledgments (0, 1, all) all
MessageTimeoutMs Message timeout in milliseconds 30000
CompressionType Compression type (none, gzip, snappy, lz4, zstd) none
BatchSize Maximum size of a batch in bytes 16384
LingerMs Linger time in milliseconds 5
MaxInFlight Maximum number of in-flight requests 5

KafkaSecuritySettings

Property Description Default
Protocol Security protocol SecurityProtocol.Plaintext
SaslMechanism SASL mechanism SaslMechanism.Plain
Username SASL username null
Password SASL password null
SslCaLocation SSL CA certificate location Not set in class definition
SslCertificateLocation SSL certificate location Not set in class definition
SslKeyLocation SSL key location Not set in class definition

🔒 Secure Kafka Connections

SharpNest.Kafka supports secure connections to Kafka brokers:

services.AddKafka(options => 
{
    options.BootstrapServers = "kafka-broker:9093";
    options.Security = new KafkaSecuritySettings
    {
        Protocol = SecurityProtocol.Ssl,
        SslCaLocation = "/path/to/ca.pem",
        SslCertificateLocation = "/path/to/certificate.pem",
        SslKeyLocation = "/path/to/key.pem"
    };
})
.AddPublisher()
.AddSingletonSubscriber();

Or in appsettings.json:

{
  "Kafka": {
    "BootstrapServers": "kafka-broker:9093",
    "Security": {
      "Protocol": "Ssl",
      "Username": "usr",
      "Password": "pswrd",
      "SslCaLocation": "/path/to/ca.pem",
      "SslCertificateLocation": "/path/to/certificate.pem",
      "SslKeyLocation": "/path/to/key.pem"
    }
  }
}

Note: The SSL certificate properties (SslCaLocation, SslCertificateLocation, SslKeyLocation) are referenced in the examples but appear to be handled differently in your current KafkaSecuritySettings class definition. You may need to update your class to include these properties if they're needed for SSL connections.


## 📜 Additional Notes

- SharpNest.Kafka automatically creates topics if they don't exist when subscribing
- The library handles proper disposal of resources to prevent memory leaks
- Thread safety is ensured through semaphore locks for shared resources
- Serialization is handled by the library, with default SystemTextJsonSerializer support
- Custom serializers can be registered if needed
Product Compatible and additional computed target framework versions.
.NET net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
3.0.1 219 4/1/2025
3.0.0 199 4/1/2025
2.0.2 194 3/30/2025
2.0.1 208 3/22/2025
2.0.0 202 3/22/2025
1.0.0 206 3/22/2025