PollyAzureEventHub

Polly v8 resilience for Azure.Messaging.EventHubs — add retry, timeout, and circuit-breaker to any Event Hubs producer operation in two lines.
var producer = new EventHubProducerClient(connectionString, "my-hub");
var resilient = producer.WithPolly(pipeline => pipeline
    .AddRetry(new RetryStrategyOptions
    {
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromSeconds(2),
        BackoffType = DelayBackoffType.Exponential,
        UseJitter = true,
        ShouldHandle = EventHubsTransientErrors.IsTransient,
    })
    .AddTimeout(TimeSpan.FromSeconds(30)));

using var batch = await resilient.CreateBatchAsync();
batch.TryAdd(new EventData("hello"));
await resilient.SendAsync(batch);
Why PollyAzureEventHub?
Azure Event Hubs is a mission-critical ingestion pipeline — dropped events mean lost data. EventHubsException.IsTransient tells you exactly which errors are safe to retry; this library wires that directly into Polly v8:
| Problem | Solution |
| --- | --- |
| EventHubsException where IsTransient = true (throttling, service busy, connection dropped) | Caught by EventHubsTransientErrors.IsTransient |
| TimeoutException: the service took too long to respond | Caught by EventHubsTransientErrors.IsTransient |
| TaskCanceledException: network timeout during transit | Caught by EventHubsTransientErrors.IsTransient |
| Non-retriable errors (bad request, auth failure) | Not retried; IsTransient = false is respected |
| Cascading failures during an outage | Wrap with AddCircuitBreaker |
Installation
dotnet add package PollyAzureEventHub
dotnet add package Polly.Core
Quick-start
1. Manual wiring
using System.Text.Json;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Polly;
using Polly.Retry;

var producer = new EventHubProducerClient(connectionString, "telemetry");
var resilient = producer.WithPolly(p => p
    .AddRetry(new RetryStrategyOptions
    {
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromSeconds(2),
        BackoffType = DelayBackoffType.Exponential,
        UseJitter = true,
        ShouldHandle = EventHubsTransientErrors.IsTransient,
    }));

// Send a batch. TryAdd returns false when an event does not fit,
// so check the result instead of dropping the event silently.
using var batch = await resilient.CreateBatchAsync();
foreach (var reading in sensorReadings)
{
    var eventData = new EventData(JsonSerializer.SerializeToUtf8Bytes(reading));
    if (!batch.TryAdd(eventData))
        throw new InvalidOperationException("Batch is full; send it and start a new one.");
}
await resilient.SendAsync(batch);
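EventDataBatch.TryAdd returns false once an event no longer fits in the current batch. One way to handle more events than a single batch can hold is to send the full batch and start a new one. The sketch below is an illustration under stated assumptions, not part of this library: BatchSender and SendAllAsync are hypothetical names, and the create and send steps are passed in as delegates so the helper does not depend on the exact return types of the resilient client.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

// Hypothetical helper (not part of PollyAzureEventHub).
public static class BatchSender
{
    // Sends all events, rolling over to a new batch whenever the current one is full.
    // Returns the number of events sent.
    public static async Task<int> SendAllAsync(
        Func<CancellationToken, Task<EventDataBatch>> createBatch,
        Func<EventDataBatch, CancellationToken, Task> send,
        IEnumerable<EventData> events,
        CancellationToken ct = default)
    {
        var sent = 0;
        EventDataBatch batch = await createBatch(ct);
        try
        {
            foreach (var e in events)
            {
                if (batch.TryAdd(e))
                    continue;

                // An empty batch that rejects the event means the event can never fit.
                if (batch.Count == 0)
                    throw new InvalidOperationException("Event is too large for an empty batch.");

                // Flush the full batch, then retry the same event in a fresh one.
                await send(batch, ct);
                sent += batch.Count;
                batch.Dispose();
                batch = await createBatch(ct);

                if (!batch.TryAdd(e))
                    throw new InvalidOperationException("Event is too large for an empty batch.");
            }

            // Send whatever is left in the final, partially filled batch.
            if (batch.Count > 0)
            {
                await send(batch, ct);
                sent += batch.Count;
            }
        }
        finally
        {
            batch.Dispose();
        }
        return sent;
    }
}
```

With the resilient client from the example above, a call might look like `await BatchSender.SendAllAsync(async ct => await resilient.CreateBatchAsync(ct), async (b, ct) => await resilient.SendAsync(b, ct), events);`, so that each create and send call still runs through the Polly pipeline.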
2. Dependency injection
// Program.cs / Startup.cs
builder.Services.AddPollyAzureEventHub(
    connectionString,
    "telemetry",
    pipeline => pipeline
        .AddRetry(new RetryStrategyOptions
        {
            MaxRetryAttempts = 3,
            Delay = TimeSpan.FromSeconds(2),
            BackoffType = DelayBackoffType.Exponential,
            UseJitter = true,
            ShouldHandle = EventHubsTransientErrors.IsTransient,
        })
        .AddTimeout(TimeSpan.FromSeconds(30)));

// Inject ResilientEventHubProducerClient into your services
public class TelemetryIngester(ResilientEventHubProducerClient producer)
{
    public async Task SendAsync(IEnumerable<Reading> readings, CancellationToken ct)
    {
        using var batch = await producer.CreateBatchAsync(ct);
        foreach (var r in readings)
        {
            // TryAdd returns false when the event does not fit in the batch.
            if (!batch.TryAdd(new EventData(JsonSerializer.SerializeToUtf8Bytes(r))))
                throw new InvalidOperationException("Batch is full; send it and start a new one.");
        }
        await producer.SendAsync(batch, ct);
    }
}
3. Bring your own client (DI overload)
// DefaultAzureCredential comes from the Azure.Identity package (using Azure.Identity;).
builder.Services.AddSingleton(new EventHubProducerClient(
    fullyQualifiedNamespace,
    "telemetry",
    new DefaultAzureCredential()));

builder.Services.AddPollyAzureEventHub(pipeline => pipeline
    .AddRetry(new RetryStrategyOptions
    {
        MaxRetryAttempts = 5,
        ShouldHandle = EventHubsTransientErrors.IsTransient,
    }));
Transient error reference
// Use in any Polly strategy:
ShouldHandle = EventHubsTransientErrors.IsTransient
| Condition | Retried | Reason |
| --- | --- | --- |
| EventHubsException (IsTransient = true) | Yes | Service throttling, quota exceeded, brief outage; designated by the SDK as safe to retry |
| EventHubsException (IsTransient = false) | No | Auth failure, bad request |
| TimeoutException | Yes | Operation timed out waiting for a service response |
| TaskCanceledException | Yes | Network-level cancellation or timeout |
Key differentiator: the Azure SDK itself sets EventHubsException.IsTransient. This library exposes that flag directly as a Polly predicate, so your retry logic follows the SDK's own classification instead of a hand-maintained list of error codes.
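For readers who want to see what such a predicate amounts to, here is a minimal sketch of the classification listed above. It is an assumption about the general shape, not the library's actual source: TransientClassifier is a hypothetical name, and only EventHubsException.IsTransient and Polly's PredicateBuilder are taken from the real packages.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Polly;

// Hypothetical classifier mirroring the conditions in the table above.
public static class TransientClassifier
{
    // True for errors the table marks as retried, false for everything else.
    public static bool IsTransient(Exception ex) => ex switch
    {
        EventHubsException eh => eh.IsTransient, // defer to the SDK's own flag
        TimeoutException => true,
        TaskCanceledException => true,
        _ => false,
    };

    // PredicateBuilder converts implicitly to the ShouldHandle delegate
    // of Polly's retry and circuit-breaker options.
    public static readonly PredicateBuilder Predicate =
        new PredicateBuilder().Handle<Exception>(IsTransient);
}
```

Keeping the decision in a plain `bool` function makes it easy to unit test without building a pipeline; the `Predicate` field could then be assigned to `ShouldHandle` in the same place the examples use EventHubsTransientErrors.IsTransient.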
Advanced pipelines
Full production pipeline with observability
// Requires using Polly.CircuitBreaker; and an ILogger instance named logger.
var resilient = producer.WithPolly(p => p
    .AddTimeout(TimeSpan.FromSeconds(60))
    .AddRetry(new RetryStrategyOptions
    {
        MaxRetryAttempts = 5,
        Delay = TimeSpan.FromSeconds(1),
        BackoffType = DelayBackoffType.Exponential,
        UseJitter = true,
        ShouldHandle = EventHubsTransientErrors.IsTransient,
        OnRetry = args =>
        {
            logger.LogWarning("Event Hubs retry {Attempt} after {Delay}ms: {Exception}",
                args.AttemptNumber, args.RetryDelay.TotalMilliseconds, args.Outcome.Exception?.Message);
            return ValueTask.CompletedTask;
        },
    })
    .AddCircuitBreaker(new CircuitBreakerStrategyOptions
    {
        FailureRatio = 0.5,
        SamplingDuration = TimeSpan.FromSeconds(30),
        MinimumThroughput = 10,
        BreakDuration = TimeSpan.FromSeconds(30),
        ShouldHandle = EventHubsTransientErrors.IsTransient,
    }));
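A retry configuration can be exercised without a live Event Hub by running a simulated operation through a plain Polly pipeline. The sketch below uses only Polly.Core; RetryDemo and CountAttemptsAsync are hypothetical names, and the handled exception is narrowed to TimeoutException purely for illustration. It shows that MaxRetryAttempts counts retries, so the operation runs at most MaxRetryAttempts + 1 times.

```csharp
using System;
using System.Threading.Tasks;
using Polly;
using Polly.Retry;

// Hypothetical demo (not part of PollyAzureEventHub).
public static class RetryDemo
{
    // Runs an operation that fails a fixed number of times before succeeding
    // and returns how many times it was invoked in total.
    public static async Task<int> CountAttemptsAsync(int failuresBeforeSuccess)
    {
        var attempts = 0;
        var pipeline = new ResiliencePipelineBuilder()
            .AddRetry(new RetryStrategyOptions
            {
                MaxRetryAttempts = 3,
                Delay = TimeSpan.FromMilliseconds(10), // short delay keeps the demo fast
                BackoffType = DelayBackoffType.Exponential,
                ShouldHandle = new PredicateBuilder().Handle<TimeoutException>(),
            })
            .Build();

        await pipeline.ExecuteAsync(ct =>
        {
            attempts++;
            if (attempts <= failuresBeforeSuccess)
                throw new TimeoutException("simulated transient failure");
            return ValueTask.CompletedTask;
        });

        return attempts;
    }
}
```

If the operation fails more than three times in a row, the final TimeoutException propagates to the caller, which is the point at which an outer circuit breaker or caller-side handling takes over.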
Sending with partition key
var options = new SendEventOptions { PartitionKey = deviceId };
await resilient.SendAsync(events, options, cancellationToken);
Arbitrary operations via ExecuteAsync
var partitionProps = await resilient.ExecuteAsync(
    (c, ct) => c.GetPartitionPropertiesAsync("0", ct));
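A pass-through method like this can be pictured as a thin delegation to ResiliencePipeline.ExecuteAsync. The sketch below is a guess at the general pattern, not the library's implementation: ResilientWrapper is a hypothetical generic class, and the real ResilientEventHubProducerClient may differ in signatures and return types.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Polly;

// Hypothetical generic wrapper illustrating the delegation pattern.
public sealed class ResilientWrapper<TClient>
{
    private readonly ResiliencePipeline _pipeline;

    public ResilientWrapper(TClient inner, ResiliencePipeline pipeline)
    {
        Inner = inner;
        _pipeline = pipeline;
    }

    // The wrapped client, available for calls that should bypass the pipeline.
    public TClient Inner { get; }

    // Runs any client operation through the pipeline, so retries, timeouts,
    // and circuit breaking apply to it as well.
    public async Task<T> ExecuteAsync<T>(
        Func<TClient, CancellationToken, Task<T>> operation,
        CancellationToken ct = default)
    {
        return await _pipeline.ExecuteAsync<T>(
            async token => await operation(Inner, token),
            ct);
    }
}
```

Passing the pipeline's token into the operation matters: it lets a timeout strategy cancel the in-flight call instead of only abandoning it.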
API reference
ResilientEventHubProducerClient
| Member | Description |
| --- | --- |
| Inner | The underlying EventHubProducerClient |
| CreateBatchAsync(ct) | Creates a batch through the pipeline |
| CreateBatchAsync(options, ct) | Creates a batch with options through the pipeline |
| SendAsync(batch, ct) | Sends a batch through the pipeline |
| SendAsync(events, ct) | Sends a collection of events through the pipeline |
| SendAsync(events, options, ct) | Sends events with partition options through the pipeline |
| ExecuteAsync<T>(operation, ct) | Runs any EventHubProducerClient operation through the pipeline |
EventHubsTransientErrors
| Member | Description |
| --- | --- |
| IsTransient | PredicateBuilder for transient EventHubsException, TimeoutException, TaskCanceledException |
Extension methods
| Method | Description |
| --- | --- |
| client.WithPolly(pipeline) | Wraps an EventHubProducerClient with a pre-built ResiliencePipeline |
| client.WithPolly(configure) | Builds a pipeline inline and wraps the client |
DI extensions
| Method | Description |
| --- | --- |
| services.AddPollyAzureEventHub(configure) | Registers ResiliencePipeline + ResilientEventHubProducerClient (requires EventHubProducerClient already in DI) |
| services.AddPollyAzureEventHub(connectionString, hubName, configure) | Registers EventHubProducerClient, pipeline, and resilient client |
Target frameworks
| Framework | Supported |
| --- | --- |
| .NET 6 | ✅ |
| .NET 8 | ✅ |
| .NET 9 | ✅ |
License
MIT © Justin Bannister