Eventso.KafkaProducer 1.0.0

dotnet add package Eventso.KafkaProducer --version 1.0.0
NuGet\Install-Package Eventso.KafkaProducer -Version 1.0.0
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Eventso.KafkaProducer" Version="1.0.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add Eventso.KafkaProducer --version 1.0.0
#r "nuget: Eventso.KafkaProducer, 1.0.0"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install Eventso.KafkaProducer as a Cake Addin
#addin nuget:?package=Eventso.KafkaProducer&version=1.0.0

// Install Eventso.KafkaProducer as a Cake Tool
#tool nuget:?package=Eventso.KafkaProducer&version=1.0.0

Binary low-allocation Kafka producer for .NET


Binary Kafka producer with a unified API that accepts ReadOnlySpan<byte> instead of Message<TKey, TValue> and avoids creating intermediate byte arrays during serialization. It uses the low-level API of the Confluent.Kafka library and is fully compatible with it. A single producer instance is enough for an entire application.

Features

  • Binary producer API that accepts ReadOnlySpan<byte> for key and value
  • A single producer instance is enough for the whole application
  • Efficient production of message batches
  • Ready-to-use overloads for common key types: short, int, long, string, Guid
  • Support for Protobuf, System.Text.Json, and SpanJson serialization (separate packages)
  • Compatibility with Confluent.Kafka

Registration

There are three ways to create a binary producer:

// 1. Create from Confluent.Kafka ProducerBuilder<TAnyKey, TAnyValue>

ProducerBuilder<byte[], byte[]> confluentBuilder = ...

IProducer binaryProducer = confluentBuilder.BuildBinary();

// 2. Using non-generic ProducerBuilder with same api as ProducerBuilder<TKey, TValue>

ProducerBuilder producerBuilder = new ProducerBuilder(producerConfig);

producerBuilder
    .SetDefaultPartitioner(...)
    .SetErrorHandler(...);

IProducer binaryProducer = producerBuilder.Build();

// 3. Create binary producer from generic Confluent.Kafka producer

IProducer<TAnyKey, TAnyValue> confluentProducer = ...

IProducer binaryProducer = confluentProducer.CreateBinary();

How to use the producer

IProducer has two basic methods:

Task<DeliveryResult> ProduceAsync(
    string topic,
    ReadOnlySpan<byte> key,
    ReadOnlySpan<byte> value,
    CancellationToken cancellationToken = default(CancellationToken),
    Headers? headers = null,
    Timestamp timestamp = default,
    Partition? partition = null);

void Produce(
    string topic,
    ReadOnlySpan<byte> key,
    ReadOnlySpan<byte> value,
    Headers? headers = null,
    Timestamp timestamp = default,
    Action<DeliveryReport>? deliveryHandler = null,
    Partition? partition = null);

The library also contains extension methods for frequently used key types: short, int, long, string, and Guid.
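As an illustration of what passing a key as ReadOnlySpan<byte> can look like without the extension methods, the sketch below encodes an int and a string key into a stack buffer using only the .NET base class library. The KeyBytes helper and its method names are hypothetical and not part of the package. The big-endian layout mirrors Confluent.Kafka's built-in Int32 serializer; whether the package's own overloads use the same layout is an assumption.

```csharp
using System;
using System.Buffers.Binary;
using System.Text;

// Hypothetical helper (not part of Eventso.KafkaProducer): encodes keys
// into a caller-provided buffer so no intermediate byte[] is allocated.
public static class KeyBytes
{
    // Writes the key as 4 big-endian bytes and returns the written slice.
    public static ReadOnlySpan<byte> FromInt32(int key, Span<byte> buffer)
    {
        BinaryPrimitives.WriteInt32BigEndian(buffer, key);
        return buffer[..sizeof(int)];
    }

    // Writes the key as UTF-8 and returns the written slice.
    // Throws ArgumentException if the buffer is too small.
    public static ReadOnlySpan<byte> FromString(string key, Span<byte> buffer)
    {
        int written = Encoding.UTF8.GetBytes(key, buffer);
        return buffer[..written];
    }
}

public static class Program
{
    public static void Main()
    {
        // Both results alias this buffer, so each key is used
        // before the buffer is overwritten by the next one.
        Span<byte> buffer = stackalloc byte[64];

        ReadOnlySpan<byte> intKey = KeyBytes.FromInt32(42, buffer);
        Console.WriteLine(Convert.ToHexString(intKey)); // prints "0000002A"

        ReadOnlySpan<byte> textKey = KeyBytes.FromString("order-1", buffer);
        Console.WriteLine(textKey.Length); // prints 7
    }
}
```

A span produced this way can be passed as the key argument of the Produce and ProduceAsync signatures shown above. Because spans cannot live across an await, the call is easiest to make from a synchronous method that returns the resulting Task.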

MessageBatch

MessageBatch creates only one TaskCompletionSource and one Task per batch, whereas the original producer API creates a TaskCompletionSource per message. Batch Produce supports all producer features.

var batch = producer.CreateBatch(topicName);

foreach (var record in records)
{
    batch.Produce(record.Key, record.Value);
}

// Wait for delivery of all messages
await batch.Complete(token);
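The idea of one TaskCompletionSource per batch can be sketched with a pending-message counter. This is only an illustration of the technique, not the library's actual implementation. The BatchCompletion class and its members are hypothetical, and the delivery reports are simulated with Task.Run rather than coming from a Kafka broker.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: tracks many in-flight messages with a single
// TaskCompletionSource instead of one per message.
public sealed class BatchCompletion
{
    private readonly TaskCompletionSource _tcs =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // Starts at 1: an extra token held until Complete() is called,
    // so the task cannot finish while messages are still being added.
    private int _pending = 1;

    // Call once per message, before it is handed to the producer.
    public void OnProduced() => Interlocked.Increment(ref _pending);

    // Call once per delivery report; the first error fails the whole batch.
    public void OnDelivered(Exception? error = null)
    {
        if (error is not null)
            _tcs.TrySetException(error);

        if (Interlocked.Decrement(ref _pending) == 0)
            _tcs.TrySetResult();
    }

    // Releases the initial token and returns the task for the whole batch.
    public Task Complete()
    {
        OnDelivered();
        return _tcs.Task;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var completion = new BatchCompletion();

        for (int i = 0; i < 3; i++)
        {
            completion.OnProduced();
            // Simulated delivery report arriving on another thread.
            _ = Task.Run(() => completion.OnDelivered());
        }

        await completion.Complete();
        Console.WriteLine("all delivered");
    }
}
```

With this shape the per-message cost is an interlocked increment and decrement, while awaiting N separate tasks with Task.WhenAll needs N completion sources and N tasks.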

Protobuf and Json values

Additional packages contain method overloads that accept a Google.Protobuf.IMessage or a typed object as the message value. They serialize into stack memory or ArrayPool buffers to avoid allocations.
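The stack-or-ArrayPool pattern mentioned above can be sketched as follows. This is a minimal illustration that uses UTF-8 text encoding as a stand-in for Protobuf or JSON serialization. The PooledUtf8 class, the SpanConsumer delegate, and the 256-byte stack limit are assumptions made for this example and are not taken from the packages.

```csharp
#nullable enable
using System;
using System.Buffers;
using System.Text;

// Custom delegate, because ReadOnlySpan<byte> cannot be used
// as a type argument of Action<T> in older C# versions.
public delegate void SpanConsumer(ReadOnlySpan<byte> bytes);

// Hypothetical helper: serializes into stack memory for small payloads
// and into a pooled array for larger ones.
public static class PooledUtf8
{
    private const int StackLimit = 256; // assumed threshold for this sketch

    public static void Encode(string text, SpanConsumer consume)
    {
        int maxBytes = Encoding.UTF8.GetMaxByteCount(text.Length);

        byte[]? rented = null;
        Span<byte> buffer = maxBytes <= StackLimit
            ? stackalloc byte[StackLimit]
            : (rented = ArrayPool<byte>.Shared.Rent(maxBytes));

        try
        {
            int written = Encoding.UTF8.GetBytes(text, buffer);
            // The consumer must finish with the bytes before returning,
            // because the buffer is released right afterwards.
            consume(buffer[..written]);
        }
        finally
        {
            if (rented is not null)
                ArrayPool<byte>.Shared.Return(rented);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        // Small payload: encoded on the stack.
        PooledUtf8.Encode("hello", bytes => Console.WriteLine(bytes.Length)); // prints 5

        // Large payload: encoded into a rented array.
        PooledUtf8.Encode(new string('x', 1000), bytes => Console.WriteLine(bytes.Length)); // prints 1000
    }
}
```

In a producer setting, the consumer callback is where the serialized span would be handed to Produce, which is why an API that accepts ReadOnlySpan<byte> fits this pattern.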

Performance and memory allocation benchmark

Benchmark source code

Method                            Messages  Mean        Gen0       Gen1       Gen2       Allocated
Binary_Proto_Buffer_MessageBatch  5000      29.038 ms   218.7500   -          -          1406.83 KB
Binary_Proto_WhenAll              5000      33.454 ms   400.0000   200.0000   -          2481.26 KB
Confluent_Proto_WhenAll           5000      37.963 ms   1000.0000  666.6667   -          6652.58 KB
Binary_Proto_Buffer_MessageBatch  10000     52.968 ms   400.0000   -          -          2813.2 KB
Binary_SpanJson_MessageBatch      5000      57.299 ms   -          -          -          1407.25 KB
Binary_Json_Buffer_MessageBatch   5000      59.708 ms   500.0000   -          -          3516.85 KB
Binary_Proto_WhenAll              10000     61.909 ms   666.6667   444.4444   222.2222   4870.61 KB
Confluent_Proto_WhenAll           10000     75.649 ms   2000.0000  1000.0000  666.6667   13304.45 KB
Confluent_Json_WhenAll            5000      82.425 ms   1000.0000  -          -          9583.16 KB
Binary_SpanJson_MessageBatch      10000     107.249 ms  -          -          -          2814.13 KB
Binary_Json_Buffer_MessageBatch   10000     113.456 ms  1000.0000  -          -          7033.18 KB
Confluent_Json_WhenAll            10000     145.569 ms  3000.0000  1000.0000  -          19164.77 KB
Binary_Proto_Buffer_MessageBatch  30000     147.694 ms  1000.0000  -          -          8438.85 KB
Binary_Proto_WhenAll              30000     173.801 ms  2000.0000  1000.0000  -          14752.95 KB
Confluent_Proto_WhenAll           30000     219.556 ms  6000.0000  2000.0000  1000.0000  39656.1 KB
Binary_Json_Buffer_MessageBatch   30000     335.746 ms  3000.0000  -          -          21096.23 KB
Binary_SpanJson_MessageBatch      30000     339.693 ms  1000.0000  -          -          8438.8 KB
Confluent_Json_WhenAll            30000     433.557 ms  9500.0000  3000.0000  1500.0000  57237.13 KB

Thanks to @xeromorph for the great ideas and help.

Target frameworks

Included in the package: net6.0, net8.0.
Computed as compatible: net7.0; the android, ios, maccatalyst, macos, tvos, and windows variants of net6.0, net7.0, and net8.0; and net8.0-browser.

NuGet packages (3)

Showing the top 3 NuGet packages that depend on Eventso.KafkaProducer:

  • Eventso.KafkaProducer.Protobuf: Kafka producer with Protobuf serialization. Low memory allocation and effective batching API.
  • Eventso.KafkaProducer.SpanJson: Kafka producer with SpanJson serialization. Low memory allocation and effective batching API.
  • Eventso.KafkaProducer.Json: Kafka producer with System.Text.Json serialization. Low memory allocation and effective batching API.


Version Downloads Last updated
1.0.0 90 5/13/2024
0.2.0 140 4/8/2024
0.1.0 150 3/20/2024