PollyKafka 1.0.0
PollyKafka
Polly v8 resilience for Confluent.Kafka — automatic retry, circuit breaker, and per-operation timeout for Kafka producers and consumers. Drop-in wrappers, no custom serialisers required.
Why PollyKafka?
Kafka brokers fail transiently: leader elections, broker restarts, network blips. Without resilience, a single BrokerNotAvailable or RequestTimedOut error can crash your producer or leave your consumer in a broken state. PollyKafka wraps every produce and consume operation in a Polly v8 pipeline so transient failures are retried automatically, persistent broker failures trip the circuit breaker, and hanging operations are cancelled by a configurable timeout.
| Feature | Raw Confluent.Kafka | PollyKafka |
|---|---|---|
| Automatic retry | ❌ | ✅ |
| Circuit breaker | ❌ | ✅ |
| Per-operation timeout | ❌ | ✅ |
| Configurable transient error codes | ❌ | ✅ |
| DI registration | ❌ | ✅ |
| Targets net8 + net9 | ✅ | ✅ |
Installation
dotnet add package PollyKafka
Quick Start
Producer
// Manual construction
// Requires: using Confluent.Kafka; plus the PollyKafka namespace.
var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };
var inner = new ProducerBuilder<string, string>(producerConfig).Build();
var producer = new ResilientProducer<string, string>(inner, new PollyKafkaOptions
{
    MaxRetries = 3,
    BaseDelay = TimeSpan.FromMilliseconds(200),
});

// `order` is your own domain object; it is serialised to JSON for the message value.
var result = await producer.ProduceAsync("my-topic", new Message<string, string>
{
    Key = "order-id",
    Value = System.Text.Json.JsonSerializer.Serialize(order),
});
Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
Consumer
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "my-consumer-group",
    AutoOffsetReset = AutoOffsetReset.Earliest,
};
var inner = new ConsumerBuilder<string, string>(consumerConfig).Build();
var consumer = new ResilientConsumer<string, string>(inner, new PollyKafkaOptions());
consumer.Subscribe("my-topic");

while (!stoppingToken.IsCancellationRequested)
{
    var result = await consumer.ConsumeAsync(stoppingToken);
    if (result is null) continue; // timeout: no message available

    // process result.Message.Value ...
    consumer.Commit(result);
}
With Dependency Injection
// Program.cs
builder.Services.AddResilientKafkaProducer<string, string>(
    new ProducerConfig { BootstrapServers = "localhost:9092" },
    o =>
    {
        o.MaxRetries = 3;
        o.OperationTimeout = TimeSpan.FromSeconds(15);
    });

builder.Services.AddResilientKafkaConsumer<string, string>(
    new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "orders-consumer",
    });

// Inject ResilientProducer<string, string> / ResilientConsumer<string, string>
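Once registered, the consumer can be injected into a hosted service. The following is a minimal sketch rather than code from the package: the `OrdersWorker` class, the `orders` topic name, and the `PollyKafka` namespace are assumptions made for illustration, and it presumes the registration above makes `ResilientConsumer<string, string>` resolvable from a singleton. The `Subscribe`, `ConsumeAsync`, and `Commit` calls mirror the Quick Start consumer loop.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using PollyKafka; // assumed namespace for ResilientConsumer<,>

// Hypothetical worker that consumes messages through the injected resilient consumer.
public sealed class OrdersWorker : BackgroundService
{
    private readonly ResilientConsumer<string, string> _consumer;

    public OrdersWorker(ResilientConsumer<string, string> consumer) => _consumer = consumer;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe("orders"); // hypothetical topic name

        while (!stoppingToken.IsCancellationRequested)
        {
            var result = await _consumer.ConsumeAsync(stoppingToken);
            if (result is null) continue; // no message arrived before the timeout

            // Handle result.Message.Value here, then commit the offset.
            _consumer.Commit(result);
        }
    }
}
```

The worker would be registered next to the Kafka services with `builder.Services.AddHostedService<OrdersWorker>();`.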
Configuration
var options = new PollyKafkaOptions
{
    // Retry
    MaxRetries = 3,                              // 0 = no retry
    BaseDelay = TimeSpan.FromMilliseconds(200),  // exponential base
    MaxDelay = TimeSpan.FromSeconds(30),

    // Circuit breaker
    CircuitBreakerFailureRatio = 0.5,
    CircuitBreakerMinimumThroughput = 10,
    CircuitBreakerSamplingDuration = TimeSpan.FromSeconds(30),
    CircuitBreakerBreakDuration = TimeSpan.FromSeconds(5),

    // Timeout
    OperationTimeout = TimeSpan.FromSeconds(10),

    // Which Kafka error codes trigger retry/CB
    TransientErrorCodes = new HashSet<ErrorCode>
    {
        ErrorCode.BrokerNotAvailable,
        ErrorCode.LeaderNotAvailable,
        ErrorCode.NotLeaderForPartition,
        ErrorCode.RequestTimedOut,
        ErrorCode.NetworkException,
        ErrorCode.KafkaStorageError,
        ErrorCode.Local_AllBrokersDown,
        ErrorCode.Local_TimedOut,
        ErrorCode.Local_Transport,
        ErrorCode.Local_MsgTimedOut,
    },
};
| Property | Default | Description |
|---|---|---|
| `MaxRetries` | 3 | Retry attempts (0 = disabled) |
| `BaseDelay` | 200 ms | Base delay for exponential back-off with jitter |
| `MaxDelay` | 30 s | Cap for exponential back-off delay |
| `CircuitBreakerFailureRatio` | 0.5 | Failure ratio to open circuit |
| `CircuitBreakerMinimumThroughput` | 10 | Minimum calls before CB can open |
| `CircuitBreakerSamplingDuration` | 30 s | Sliding window for failure ratio |
| `CircuitBreakerBreakDuration` | 5 s | How long the circuit stays open |
| `OperationTimeout` | 10 s | Max time per produce/consume before `TimeoutRejectedException` |
| `TransientErrorCodes` | see above | `ErrorCode` set that triggers retry/CB |
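To get a feel for how `BaseDelay` and `MaxDelay` interact, the sketch below computes a nominal back-off schedule. It assumes the common doubling scheme (base delay times 2 to the power of the zero-based attempt, capped at the maximum) and leaves jitter out so the numbers are deterministic; the delays the library actually uses will vary around these values. `BackoffSketch` and `Schedule` are illustrative names, not part of PollyKafka.

```csharp
using System;
using System.Collections.Generic;

public static class BackoffSketch
{
    // Nominal delay before each retry: baseDelay * 2^attempt, capped at maxDelay.
    // Jitter is intentionally omitted (assumption made for clarity).
    public static IReadOnlyList<TimeSpan> Schedule(int maxRetries, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        var delays = new List<TimeSpan>(maxRetries);
        for (var attempt = 0; attempt < maxRetries; attempt++)
        {
            var nominalMs = baseDelay.TotalMilliseconds * Math.Pow(2, attempt);
            var cappedMs = Math.Min(nominalMs, maxDelay.TotalMilliseconds);
            delays.Add(TimeSpan.FromMilliseconds(cappedMs));
        }
        return delays;
    }
}
```

With the defaults from the table, `BackoffSketch.Schedule(3, TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(30))` gives nominal delays of 200 ms, 400 ms, and 800 ms. Under this scheme the 30 s cap only takes effect from the ninth retry onward, so it matters mainly for larger `MaxRetries` values.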
Error Handling
Non-transient KafkaExceptions (e.g. TopicAuthorizationFailed) are rethrown as-is. After retries are exhausted the last exception is a TransientKafkaException wrapping the original:
// Requires: using Polly.CircuitBreaker; using Polly.Timeout;
try
{
    await producer.ProduceAsync("my-topic", message);
}
catch (TransientKafkaException ex)
{
    // All retries failed
    Console.WriteLine($"Kafka error: {ex.ErrorCode}: {ex.KafkaException.Message}");
}
catch (BrokenCircuitException)
{
    // Circuit is open, so the call fails fast
}
catch (TimeoutRejectedException)
{
    // Operation exceeded OperationTimeout
}
Resilience Pipeline Order
Operations flow through the pipeline in this order:
Retry → Circuit Breaker → Timeout → Kafka operation
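For readers who want to see what such an ordering looks like in plain Polly v8, here is a rough sketch built with `ResiliencePipelineBuilder` from Polly.Core. It is not PollyKafka's internal code: the values simply reuse the documented defaults, and the delegate is a placeholder for a Kafka call. In Polly v8 the strategy added first is the outermost one, so adding retry, then circuit breaker, then timeout reproduces the order above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Polly;
using Polly.CircuitBreaker;
using Polly.Retry;

// Strategies execute in the order they are added: retry (outermost),
// then circuit breaker, then timeout (innermost, closest to the operation).
ResiliencePipeline pipeline = new ResiliencePipelineBuilder()
    .AddRetry(new RetryStrategyOptions
    {
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromMilliseconds(200),
        MaxDelay = TimeSpan.FromSeconds(30),
        BackoffType = DelayBackoffType.Exponential,
        UseJitter = true,
    })
    .AddCircuitBreaker(new CircuitBreakerStrategyOptions
    {
        FailureRatio = 0.5,
        MinimumThroughput = 10,
        SamplingDuration = TimeSpan.FromSeconds(30),
        BreakDuration = TimeSpan.FromSeconds(5),
    })
    .AddTimeout(TimeSpan.FromSeconds(10))
    .Build();

// Placeholder operation standing in for a produce or consume call.
await pipeline.ExecuteAsync(
    async ct => { await Task.Delay(10, ct); },
    CancellationToken.None);
```

Because the timeout sits innermost, it bounds each individual attempt rather than the whole retried operation, and every failed or timed-out attempt is counted by the circuit breaker before the retry strategy decides whether to try again.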
Related Packages
| Package | Description |
|---|---|
| PollyBackoff | Advanced back-off strategies with jitter |
| PollyChaos | Chaos engineering — inject faults in tests |
| PollyMediatR | Polly pipeline behaviour for MediatR |
| PollyEFCore | Resilient EF Core execution strategies |
| PollyHealthChecks | Health check endpoints for Polly circuits |
| PollyOpenAI | Retry + rate-limit handling for OpenAI / Azure OpenAI |
| PollyRedis | Resilient StackExchange.Redis wrapper |
| PollySignalR | Reconnect policy for SignalR HubConnection |
| PollyGrpc | Polly v8 resilience for gRPC .NET clients via Interceptor |
| PollyCaching | Distributed cache with Polly resilience |
| PollyBulkhead | Bulkhead isolation for concurrent workloads |
License
MIT
| Product | Compatible and additional computed target framework versions |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0
  - Confluent.Kafka (>= 2.14.2)
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.2)
  - Polly.Core (>= 8.7.0)
- net9.0
  - Confluent.Kafka (>= 2.14.2)
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 8.0.2)
  - Polly.Core (>= 8.7.0)
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.0 | 44 | 6/23/2026 |