DotKafka 2.0.1
dotnet add package DotKafka --version 2.0.1
NuGet\Install-Package DotKafka -Version 2.0.1
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="DotKafka" Version="2.0.1" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="DotKafka" Version="2.0.1" />
<PackageReference Include="DotKafka" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add DotKafka --version 2.0.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: DotKafka, 2.0.1"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package DotKafka@2.0.1
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=DotKafka&version=2.0.1
#tool nuget:?package=DotKafka&version=2.0.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
DotKafka - A .NET Kafka SDK
A lightweight .NET SDK for Apache Kafka, inspired by Spring Kafka, providing annotation-based consumers and a simple producer interface.
Features
- Attribute-based Kafka listeners (
[KafkaListener]) for consumers. IKafkaProducerinterface for sending messages.KafkaTemplate<TKey, TValue>for flexible message production.- Automatic topic creation support.
- Integration with
Microsoft.Extensions.DependencyInjection. - Support for retry mechanisms and Dead Letter Queues (DLQ).
Installation
Install via NuGet:
dotnet add package dotkafka
Usage
Consumer Example
public class EventConsumer
{
[KafkaListener(topics: "sme.analytics", groupId: "default-group")]
public async Task HandleAsync(ElectoralData message)
{
Console.WriteLine($"Consumed message: {message}");
await Task.CompletedTask;
}
[KafkaListener(topics: "analytics", groupId: "default-group", isBatch: true)]
public async Task HandleAnalyticsAsync(List<ElectoralData> message)
{
Console.WriteLine($"Consumed message: {message}");
await Task.CompletedTask;
}
[KafkaListener(topics: new[] { "user.actions" }, groupId: "user-group", isBatch: true)]
[Retryable(maxRetries: 3, retryDelayMs: 2000, deadLetterTopic: "user.events.dlq")]
public async Task HandleBatchUserEventsAsync(List<ElectoralData> messages)
{
await Task.CompletedTask;
throw new Exception("Simulated processing failure");
}
[KafkaListener(pattern: "analytics.*", groupId: "analytics-group")]
public async Task HandleAnalyticsEventAsync(ElectoralData message)
{
await Task.CompletedTask;
}
[KafkaListener(pattern: "telco.*", groupId: "logs-group", isBatch: true)]
[Retryable(maxRetries: 2, retryDelayMs: 1000)]
public async Task HandleLogsBatchAsync(List<ElectoralData> messages)
{
await Task.CompletedTask;
}
}
Producer Example
public class EventProducer
{
private readonly IKafkaProducer _kafkaProducer;
public EventProducer(IKafkaProducer kafkaProducer)
{
_kafkaProducer = kafkaProducer;
}
public async Task SendMessageAsync(string topic, ElectoralData message)
{
await _kafkaProducer.ProduceAsync(topic, message);
}
}
Using KafkaTemplate
KafkaTemplate<TKey, TValue> provides a simple way to send messages with optional headers, partitions, and timestamps.
public class EventService
{
private readonly KafkaTemplate<string, ElectoralData> _kafkaTemplate;
public EventService(KafkaTemplate<string, ElectoralData> kafkaTemplate)
{
_kafkaTemplate = kafkaTemplate;
_kafkaTemplate.DefaultTopic = "default-topic";
}
public async Task SendEventAsync(ElectoralData data)
{
await _kafkaTemplate.SendDefaultAsync(data);
}
public async Task SendWithKeyAsync(string key, ElectoralData data)
{
await _kafkaTemplate.SendDefaultAsync(key, data);
}
public async Task SendToPartitionAsync(int partition, string key, ElectoralData data)
{
await _kafkaTemplate.SendDefaultAsync(partition, key, data);
}
public async Task SendWithTimestampAsync(int partition, DateTime timestamp, string key, ElectoralData data)
{
await _kafkaTemplate.SendDefaultAsync(partition, timestamp, key, data);
}
public async Task SendToTopicAsync(string topic, string key, ElectoralData data)
{
await _kafkaTemplate.SendAsync(topic, key, data);
}
public async Task SendSimpleAsync(string topic, ElectoralData data)
{
await _kafkaTemplate.SendAsync(topic, data);
}
public async Task SendToSpecificPartitionAsync(string topic, int partition, string key, ElectoralData data)
{
await _kafkaTemplate.SendAsync(topic, partition, key, data);
}
}
Web API Controller Example
[ApiController]
[Route("[controller]")]
public class AnalyticsController : ControllerBase
{
private readonly ILogger<AnalyticsController> _logger;
private readonly IKafkaTemplate<string, string> _template;
public AnalyticsController(ILogger<AnalyticsController> logger, IKafkaTemplate<string, string> template)
{
_logger = logger;
_template = template;
}
[HttpPost("send", Name = nameof(SendAnalyticsData))]
public async Task<ActionResult<object>> SendAnalyticsData([FromBody] ElectoralData request)
{
var data = JsonConvert.SerializeObject(request);
await _template.SendAsync("user.events", data);
return Ok();
}
}
Dependency Injection Setup
services.AddScoped<EventConsumer>();
services.AddScoped<KafkaTemplate<string, ElectoralData>>();
services.AddKafkaListener(config =>
{
config.BootstrapServers = "localhost:9092";
config.AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest;
config.EnableAutoCommit = true;
config.TimeoutInMilliseconds = 5000;
});
builder.Services.AddKafkaTemplate<string, string>(c =>
{
c.BootstrapServers = "localhost:9092";
});
Producer Registration
services.AddDotKafkaProducer(config =>
{
config.BootstrapServers = "localhost:9092";
});
License
This project is licensed under the MIT License.
Contributing
Contributions are welcome! Feel free to open an issue or submit a pull request.
Contact
For support or inquiries, please reach out to the project maintainers.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net8.0
- Confluent.Kafka (>= 2.8.0)
- Microsoft.Extensions.Hosting.Abstractions (>= 9.0.3)
- Newtonsoft.Json (>= 13.0.3)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.