DotKafka 2.0.1

DotKafka - A .NET Kafka SDK

A lightweight .NET SDK for Apache Kafka, inspired by Spring Kafka, providing annotation-based consumers and a simple producer interface.

Features

  • Attribute-based Kafka listeners ([KafkaListener]) for consumers.
  • IKafkaProducer interface for sending messages.
  • KafkaTemplate<TKey, TValue> for flexible message production.
  • Automatic topic creation support.
  • Integration with Microsoft.Extensions.DependencyInjection.
  • Support for retry mechanisms and Dead Letter Queues (DLQ).

Installation

Install via NuGet:

dotnet add package DotKafka

Usage

Consumer Example

public class EventConsumer
{
    [KafkaListener(topics: "sme.analytics", groupId: "default-group")]
    public async Task HandleAsync(ElectoralData message)
    {
        Console.WriteLine($"Consumed message: {message}");
        await Task.CompletedTask;
    }

    [KafkaListener(topics: "analytics", groupId: "default-group", isBatch: true)]
    public async Task HandleAnalyticsAsync(List<ElectoralData> messages)
    {
        // Batch listeners receive a list, so log the count rather than the list itself.
        Console.WriteLine($"Consumed {messages.Count} messages");
        await Task.CompletedTask;
    }

    [KafkaListener(topics: new[] { "user.actions" }, groupId: "user-group", isBatch: true)]
    [Retryable(maxRetries: 3, retryDelayMs: 2000, deadLetterTopic: "user.events.dlq")]
    public async Task HandleBatchUserEventsAsync(List<ElectoralData> messages)
    {
        await Task.CompletedTask;
        // Always fails, to demonstrate the retry and dead-letter path.
        throw new Exception("Simulated processing failure");
    }

    [KafkaListener(pattern: "analytics.*", groupId: "analytics-group")]
    public async Task HandleAnalyticsEventAsync(ElectoralData message)
    {
        await Task.CompletedTask;
    }

    [KafkaListener(pattern: "telco.*", groupId: "logs-group", isBatch: true)]
    [Retryable(maxRetries: 2, retryDelayMs: 1000)]
    public async Task HandleLogsBatchAsync(List<ElectoralData> messages)
    {
        await Task.CompletedTask;
    }
}
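
The [Retryable] attributes above take a retry count, a delay, and an optional dead-letter topic. How DotKafka applies them internally is not documented here, so the following is only a conceptual sketch of retry-then-dead-letter handling in plain C#. RetrySketch, ProcessAsync, and the deadLetter callback are hypothetical names, and the sketch assumes that maxRetries counts attempts after the first one.

```csharp
using System;
using System.Threading.Tasks;

// Conceptual sketch only: this is NOT DotKafka's implementation.
// RetrySketch and its parameter names are hypothetical.
public static class RetrySketch
{
    // Runs the handler once, then retries up to maxRetries more times.
    // If every attempt fails, the message is handed to deadLetter (if any).
    // Returns true when the handler eventually succeeded.
    public static async Task<bool> ProcessAsync<T>(
        T message,
        Func<T, Task> handler,
        int maxRetries,
        int retryDelayMs,
        Func<T, Exception, Task> deadLetter = null)
    {
        Exception lastError = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++)
        {
            try
            {
                await handler(message);
                return true;
            }
            catch (Exception ex)
            {
                lastError = ex;
                // Wait before the next attempt, but not after the final one.
                if (attempt < maxRetries)
                    await Task.Delay(retryDelayMs);
            }
        }

        // All attempts failed: route the message to the dead-letter handler.
        if (deadLetter != null)
            await deadLetter(message, lastError);
        return false;
    }
}
```

Under this reading, maxRetries: 3 means the handler runs at most four times before the message is dead-lettered. The library may count attempts differently.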

Producer Example

public class EventProducer
{
    private readonly IKafkaProducer _kafkaProducer;

    public EventProducer(IKafkaProducer kafkaProducer)
    {
        _kafkaProducer = kafkaProducer;
    }

    public async Task SendMessageAsync(string topic, ElectoralData message)
    {
        await _kafkaProducer.ProduceAsync(topic, message);
    }
}
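
Because EventProducer depends on the IKafkaProducer interface rather than a concrete client, it can be exercised without a broker. The sketch below illustrates that idea with an in-memory test double. The IKafkaProducer declared here is a local stand-in whose signature is inferred from the ProduceAsync(topic, message) call above, not the library's actual definition, and the ElectoralData shape and InMemoryKafkaProducer are hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Local stand-in, NOT the DotKafka interface: the signature is an assumption
// inferred from the ProduceAsync(topic, message) call in the example above.
public interface IKafkaProducer
{
    Task ProduceAsync<T>(string topic, T message);
}

// Hypothetical payload shape; the real ElectoralData is not shown in this README.
public record ElectoralData(string Region, int Votes);

// Same class as in the producer example above.
public class EventProducer
{
    private readonly IKafkaProducer _kafkaProducer;

    public EventProducer(IKafkaProducer kafkaProducer)
    {
        _kafkaProducer = kafkaProducer;
    }

    public async Task SendMessageAsync(string topic, ElectoralData message)
    {
        await _kafkaProducer.ProduceAsync(topic, message);
    }
}

// Test double that records messages instead of contacting a broker.
public class InMemoryKafkaProducer : IKafkaProducer
{
    public List<(string Topic, object Message)> Sent { get; } = new();

    public Task ProduceAsync<T>(string topic, T message)
    {
        Sent.Add((topic, (object)message));
        return Task.CompletedTask;
    }
}
```

A test can then construct new EventProducer(new InMemoryKafkaProducer()), call SendMessageAsync, and inspect Sent to check the topic and payload.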

Using KafkaTemplate

KafkaTemplate<TKey, TValue> sends messages to a default topic or an explicit one, optionally with a key, partition, timestamp, or headers. The examples below cover each option except headers.

public class EventService
{
    private readonly KafkaTemplate<string, ElectoralData> _kafkaTemplate;

    public EventService(KafkaTemplate<string, ElectoralData> kafkaTemplate)
    {
        _kafkaTemplate = kafkaTemplate;
        _kafkaTemplate.DefaultTopic = "default-topic";
    }

    public async Task SendEventAsync(ElectoralData data)
    {
        await _kafkaTemplate.SendDefaultAsync(data);
    }

    public async Task SendWithKeyAsync(string key, ElectoralData data)
    {
        await _kafkaTemplate.SendDefaultAsync(key, data);
    }

    public async Task SendToPartitionAsync(int partition, string key, ElectoralData data)
    {
        await _kafkaTemplate.SendDefaultAsync(partition, key, data);
    }

    public async Task SendWithTimestampAsync(int partition, DateTime timestamp, string key, ElectoralData data)
    {
        await _kafkaTemplate.SendDefaultAsync(partition, timestamp, key, data);
    }

    public async Task SendToTopicAsync(string topic, string key, ElectoralData data)
    {
        await _kafkaTemplate.SendAsync(topic, key, data);
    }

    public async Task SendSimpleAsync(string topic, ElectoralData data)
    {
        await _kafkaTemplate.SendAsync(topic, data);
    }

    public async Task SendToSpecificPartitionAsync(string topic, int partition, string key, ElectoralData data)
    {
        await _kafkaTemplate.SendAsync(topic, partition, key, data);
    }
}

Web API Controller Example

using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

[ApiController]
[Route("[controller]")]
public class AnalyticsController : ControllerBase
{
    private readonly ILogger<AnalyticsController> _logger;
    private readonly IKafkaTemplate<string, string> _template;

    public AnalyticsController(ILogger<AnalyticsController> logger, IKafkaTemplate<string, string> template)
    {
        _logger = logger;
        _template = template;
    }

    [HttpPost("send", Name = nameof(SendAnalyticsData))]
    public async Task<IActionResult> SendAnalyticsData([FromBody] ElectoralData request)
    {
        // Serialize the payload to JSON because this template's value type is string.
        var data = JsonConvert.SerializeObject(request);
        await _template.SendAsync("user.events", data);
        _logger.LogInformation("Sent analytics data to user.events");
        return Ok();
    }
}
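
The controller above sends the payload as a JSON string, so a consumer of that topic has to deserialize it again. The README does not show the ElectoralData type, so the sketch below assumes a hypothetical two-property record and uses System.Text.Json (swapped in for the Newtonsoft.Json call above) to show the round trip. PayloadJson is an illustrative helper, not part of DotKafka.

```csharp
using System.Text.Json;

// Hypothetical payload shape; the real ElectoralData is not shown in this README.
public record ElectoralData(string Region, int Votes);

// Illustrative helper, not part of DotKafka.
public static class PayloadJson
{
    // Producer side: turn the payload into the string value sent to Kafka.
    public static string Serialize(ElectoralData data) =>
        JsonSerializer.Serialize(data);

    // Consumer side: rebuild the payload from the received string value.
    public static ElectoralData Deserialize(string json) =>
        JsonSerializer.Deserialize<ElectoralData>(json);
}
```

Records compare by value, so a payload that survives Serialize followed by Deserialize is equal to the original instance.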

Dependency Injection Setup

services.AddScoped<EventConsumer>();
services.AddScoped<KafkaTemplate<string, ElectoralData>>();
services.AddKafkaListener(config =>
{
    config.BootstrapServers = "localhost:9092";
    config.AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest;
    config.EnableAutoCommit = true;
    config.TimeoutInMilliseconds = 5000;
});

services.AddKafkaTemplate<string, string>(c =>
{
    c.BootstrapServers = "localhost:9092";
});

Producer Registration

services.AddDotKafkaProducer(config =>
{
    config.BootstrapServers = "localhost:9092";
});

License

This project is licensed under the MIT License.

Contributing

Contributions are welcome! Feel free to open an issue or submit a pull request.

Contact

For support or inquiries, please reach out to the project maintainers.

Target Frameworks

.NET: net8.0 is compatible. The following were computed as compatible: the platform-specific net8.0 variants (android, browser, ios, maccatalyst, macos, tvos, windows), net9.0, net10.0, and the same platform-specific variants of net9.0 and net10.0.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version  Downloads  Last Updated
2.0.1    185        4/13/2025
1.2.0    165        3/30/2025
1.1.1    163        3/30/2025
1.0.2    150        3/27/2025
1.0.1    485        3/26/2025
1.0.0    495        3/25/2025