DotNetKafkaAdapter 0.1.1
dotnet add package DotNetKafkaAdapter --version 0.1.1
NuGet\Install-Package DotNetKafkaAdapter -Version 0.1.1
<PackageReference Include="DotNetKafkaAdapter" Version="0.1.1" />
<PackageVersion Include="DotNetKafkaAdapter" Version="0.1.1" />
<PackageReference Include="DotNetKafkaAdapter" />
paket add DotNetKafkaAdapter --version 0.1.1
#r "nuget: DotNetKafkaAdapter, 0.1.1"
#:package DotNetKafkaAdapter@0.1.1
#addin nuget:?package=DotNetKafkaAdapter&version=0.1.1
#tool nuget:?package=DotNetKafkaAdapter&version=0.1.1
dotnet-kafka-adapter
Lightweight .NET Kafka adapter intended to let an existing codebase publish and consume Kafka messages without taking a direct dependency on Kafka-specific APIs throughout the application.
Goal
Provide a small, maintainable adapter library that:
- Exposes application-facing abstractions for publishing and consuming messages
- Wraps Kafka client setup, configuration, serialization, and delivery concerns
- Integrates cleanly with standard .NET dependency injection and hosting
- Keeps application code insulated from Kafka implementation details
Initial Scope
The first version should focus on the smallest useful feature set:
- Typed message publishing
- Typed message consumption
- JSON serialization and deserialization
- DI registration for producer and consumer services
- Background consumer hosting for worker services or ASP.NET Core apps
- Basic logging, error handling, and retry behavior
Non-Goals
The first version does not need to solve everything:
- Full schema registry support
- Complex stream processing
- Exactly-once delivery guarantees
- Custom admin tooling
- Broad broker management features
Status
This repository now contains the initial solution scaffold, public contracts, producer and consumer implementations, DI registration, typed handler registration helpers, retry/dead-letter behavior, a runnable sample app, and live integration tests against a local Kafka broker.
Checklist
Done
- Confirmed the library is technically feasible as a lightweight .NET adapter over Kafka
- Defined the high-level goal and initial project scope
- Created the initial project README
- Created the .NET solution and class library structure
- Chosen the initial target framework (
net8.0) - Defined the first-pass public abstractions for publishing and consuming
- Defined the first-pass configuration model for brokers, topics, auth, and consumer groups
- Added the Kafka client dependency (
Confluent.Kafka) - Implemented the first producer wrapper for typed messages
- Added the first JSON serialization path for produced messages
- Added DI registration extensions for the producer and consumer paths
- Implemented the consumer worker pattern for typed handlers
- Added JSON deserialization support for consumers
- Added typed handler registration helpers for consuming apps
- Added structured logging around produce/consume failures
- Decided the initial offset commit strategy and failure semantics
- Added retry behavior and dead-letter strategy
- Added local development setup for Kafka
- Added integration tests against a local Kafka instance
- Added sample application showing publish/consume usage
- Added usage documentation and configuration examples
- Narrowed the handler registration API to consumer-facing options
- Added production-oriented failure and authentication guidance
- Added optional certificate-based TLS broker security settings
- Added a TLS-enabled local Kafka integration test path
- Added initial NuGet package metadata and pack/push instructions
- Refined public-release package metadata
- Added operational guidance for monitoring, replay, and dead-letter reprocessing
- Added MIT license metadata for NuGet publication
- Added GitHub Actions release automation for GitHub Releases and NuGet publishing
- Split build/test CI from release automation into separate workflows
- Added fail-fast options validation and built-in metrics for observability
To Do
- Decide whether to refactor shared GitHub Actions steps into a reusable workflow or composite action
Proposed Deliverables
The likely shape of the library:
- A reusable adapter package
- A small sample app
- Integration tests
- Documentation for setup and usage
Local Kafka
The repository includes a single-node Kafka stack for local development in docker-compose.kafka.yml.
Start Kafka
docker compose -f docker-compose.kafka.yml up -d
Kafka will be reachable from the host on localhost:9092.
Stop Kafka
docker compose -f docker-compose.kafka.yml down
To remove the Kafka data volume as well:
docker compose -f docker-compose.kafka.yml down -v
Notes
- This uses a single broker in KRaft combined mode for local development only.
- The adapter should use
localhost:9092asBootstrapServerswhen running on the host machine.
Integration Tests
Run the Kafka integration tests against the local broker with:
dotnet test tests/DotNetKafkaAdapter.IntegrationTests/DotNetKafkaAdapter.IntegrationTests.csproj
To skip the TLS-specific integration test explicitly:
DOTNET_KAFKA_ADAPTER_SKIP_TLS_TESTS=true dotnet test tests/DotNetKafkaAdapter.IntegrationTests/DotNetKafkaAdapter.IntegrationTests.csproj
To run the TLS integration test as well:
powershell -ExecutionPolicy Bypass -File scripts/generate-kafka-tls-certs.ps1
docker compose -f docker-compose.kafka.tls.yml up -d
dotnet test tests/DotNetKafkaAdapter.IntegrationTests/DotNetKafkaAdapter.IntegrationTests.csproj
The TLS broker listens on localhost:9093 and requires:
- server certificate validation against the generated local CA
- client certificate authentication with the generated client certificate and private key
The TLS-specific test is skipped automatically if the generated certificate assets are not present.
It is also skipped when DOTNET_KAFKA_ADAPTER_SKIP_TLS_TESTS=true.
Sample App
A runnable sample lives in samples/DotNetKafkaAdapter.SampleApp.
Run it against the local broker with:
dotnet run --project samples/DotNetKafkaAdapter.SampleApp/DotNetKafkaAdapter.SampleApp.csproj
Optional environment variables:
KAFKA_BOOTSTRAP_SERVERSdefaults tolocalhost:9092KAFKA_TOPICdefaults tosample.ordersKAFKA_CONSUMER_GROUPdefaults tosample.orders.consumerKAFKA_DEAD_LETTER_TOPICdefaults to<topic>.dlq
The sample app:
- creates the sample topic and dead-letter topic if they do not exist
- starts the adapter consumer hosted service
- publishes one
OrderSubmittedmessage - logs the consumed message and stops the host
NuGet Packaging
The adapter library can be packed from src/DotNetKafkaAdapter.
The project currently packs with:
- package ID
DotNetKafkaAdapter - default version
0.1.0 - the repository
README.mdincluded as the NuGet package readme
Create a package locally with:
dotnet pack src/DotNetKafkaAdapter/DotNetKafkaAdapter.csproj -c Release -o artifacts/packages
To set an explicit package version at pack time:
dotnet pack src/DotNetKafkaAdapter/DotNetKafkaAdapter.csproj -c Release -o artifacts/packages /p:Version=0.1.0
The generated .nupkg file will be written to artifacts/packages.
The package metadata is defined in DotNetKafkaAdapter.csproj. If you are preparing a public release, you will likely still want to set or refine:
VersionAuthors- release automation and publishing workflow
Push a built package with:
dotnet nuget push artifacts/packages/DotNetKafkaAdapter.<version>.nupkg --source https://api.nuget.org/v3/index.json --api-key <your-api-key>
If you want symbol packages as well, include them when packing:
dotnet pack src/DotNetKafkaAdapter/DotNetKafkaAdapter.csproj -c Release -o artifacts/packages /p:IncludeSymbols=true /p:SymbolPackageFormat=snupkg
GitHub CI
The repository includes a CI workflow at .github/workflows/ci.yml.
It runs on pushes to main or master and on pull requests. It will:
- restore and build the solution
- start the plaintext local Kafka Docker stack
- run the integration test suite
This keeps routine validation separate from the release pipeline.
The GitHub CI workflow sets DOTNET_KAFKA_ADAPTER_SKIP_TLS_TESTS=true, so the TLS-specific integration test is excluded there.
GitHub Release Automation
The repository includes a release workflow at .github/workflows/release.yml.
It will:
- restore, build, and run the integration test suite
- start the plaintext local Kafka stack in Docker
- pack
.nupkgand.snupkgartifacts - create a GitHub Release for the version tag
- push the package to NuGet when the
NUGET_API_KEYsecret is configured in thereleaseGitHub Environment
The release workflow is intentionally separate from CI so package publication only happens from explicit release events.
The GitHub release workflow also sets DOTNET_KAFKA_ADAPTER_SKIP_TLS_TESTS=true, so TLS integration remains available locally but is excluded from GitHub-hosted runs.
Triggering A Release
The release workflow runs automatically when you push a tag that starts with v, for example:
git tag v0.1.0
git push origin v0.1.0
You can also run it manually from GitHub Actions with:
versionpublish_to_nuget
Required GitHub Environment Secret
The release workflow targets the release GitHub Environment. To publish to NuGet from GitHub Actions:
- Create a GitHub Environment named
release - Add an environment secret named
NUGET_API_KEY
If that secret is not present, the workflow will still build, test, pack, and create the GitHub Release, but it will skip the NuGet push step.
Usage
Typical registration in an application:
using DotNetKafkaAdapter.Abstractions;
using Microsoft.Extensions.DependencyInjection;
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "localhost:9092";
options.ClientId = "my-app";
options.Producer.DefaultTopic = "orders";
});
services.AddKafkaHandler<OrderSubmitted, OrderSubmittedHandler>(
topic: "orders",
consumerGroup: "orders-consumer",
options =>
{
options.MaxRetryAttempts = 3;
options.RetryDelay = TimeSpan.FromSeconds(1);
options.DeadLetterTopic = "orders.dlq";
});
If you prefer to build options up front instead of registering handlers through DI extensions:
var kafkaOptions = new KafkaAdapterOptions
{
BootstrapServers = "localhost:9092",
ClientId = "my-app"
};
kafkaOptions.Producer.DefaultTopic = "orders";
kafkaOptions.AddConsumer<OrderSubmitted, OrderSubmittedHandler>(
topic: "orders",
consumerGroup: "orders-consumer",
configure: options =>
{
options.MaxRetryAttempts = 3;
options.RetryDelay = TimeSpan.FromSeconds(1);
options.DeadLetterTopic = "orders.dlq";
});
services.AddKafkaAdapter(kafkaOptions);
services.AddScoped<OrderSubmittedHandler>();
Publish a message:
await publisher.PublishAsync(
"orders",
new OrderSubmitted("order-123", "customer-42", 42.50m),
new PublishOptions
{
Key = "order-123",
MessageId = "order-123"
},
cancellationToken);
Handle a consumed message:
public sealed class OrderSubmittedHandler : IMessageHandler<OrderSubmitted>
{
public Task HandleAsync(
MessageContext context,
OrderSubmitted message,
CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
}
Failure Semantics
Current consumer behavior:
- If a handler succeeds and
AutoCommitisfalse, the adapter commits the Kafka offset after handling. - If deserialization fails, the adapter does not retry in-process. It sends the message to the dead-letter topic if one is configured; otherwise that consumer loop stops.
- If the handler throws, the adapter retries in-process up to
MaxRetryAttemptswith exponential backoff based onRetryDelay. - If all retries fail and a dead-letter topic is configured, the adapter publishes a
KafkaDeadLetterMessageand then commits the original offset. - If all retries fail and no dead-letter topic is configured, that consumer loop stops without committing the failed message.
What this does not guarantee:
- Exactly-once delivery
- Global ordering beyond Kafka partition semantics
- Distributed retries across processes
- Automatic replay or re-drive of dead-lettered messages
Authentication
Local development usually uses plaintext:
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "localhost:9092";
options.ClientId = "my-app";
});
For SASL/PLAIN over TLS:
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "your-broker:9093";
options.ClientId = "my-app";
options.Security.Protocol = KafkaSecurityProtocol.SaslSsl;
options.Security.SaslMechanism = KafkaSaslMechanism.Plain;
options.Security.Username = "my-username";
options.Security.Password = "my-password";
});
For SASL/SCRAM over TLS:
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "your-broker:9093";
options.ClientId = "my-app";
options.Security.Protocol = KafkaSecurityProtocol.SaslSsl;
options.Security.SaslMechanism = KafkaSaslMechanism.ScramSha512;
options.Security.Username = "my-username";
options.Security.Password = "my-password";
});
For TLS server verification with a custom CA bundle:
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "your-broker:9093";
options.ClientId = "my-app";
options.Security.Protocol = KafkaSecurityProtocol.Ssl;
options.Security.SslCaLocation = "/etc/certs/ca.pem";
options.Security.EnableSslCertificateVerification = true;
options.Security.SslEndpointIdentificationAlgorithm = KafkaSslEndpointIdentificationAlgorithm.Https;
});
For mutual TLS with a client certificate and private key:
services.AddKafkaAdapter(options =>
{
options.BootstrapServers = "your-broker:9093";
options.ClientId = "my-app";
options.Security.Protocol = KafkaSecurityProtocol.Ssl;
options.Security.SslCaLocation = "/etc/certs/ca.pem";
options.Security.SslCertificateLocation = "/etc/certs/client.pem";
options.Security.SslKeyLocation = "/etc/certs/client.key";
options.Security.SslKeyPassword = "optional-key-password";
});
Optional TLS settings currently supported:
SslCaLocationSslCaPemSslCaCertificateStoresSslCertificateLocationSslCertificatePemSslKeyLocationSslKeyPemSslKeyPasswordSslKeystoreLocationSslKeystorePasswordEnableSslCertificateVerificationSslEndpointIdentificationAlgorithm
Production Notes
- Keep
BootstrapServers, usernames, and passwords in configuration or secret storage rather than source code. - Keep certificate file paths, PEM content, and key passwords in configuration or secret storage rather than source code.
- Use
KafkaSecurityProtocol.SaslSslorKafkaSecurityProtocol.Sslfor remote brokers unless you explicitly control a trusted plaintext network. - Use the TLS settings only when your broker protocol is
SslorSaslSsl; they remain optional and are ignored for plaintext configurations. - Decide whether stopping a consumer loop on an unrecoverable failure is acceptable for your service model before using the current defaults in production.
- Provision dead-letter topics explicitly and monitor them; the adapter publishes DLQ messages but does not re-drive them.
- Integration tests in this repo validate the local broker path, not a production cluster topology.
Monitoring And Operations
For production operation, monitor the adapter at both the service and Kafka level.
Recommended service-level signals:
- publish failures from
KafkaMessagePublisher - consumer loop failures from
KafkaConsumerHostedService - retry attempt counts
- dead-letter publish counts
- dead-letter topic growth over time
- handler processing latency
- consumer lag for each topic and consumer group
Recommended operational alerts:
- any consumer loop stops unexpectedly
- dead-letter traffic exceeds a normal baseline
- consumer lag continues to grow for a sustained window
- repeated deserialization failures for the same topic
- repeated handler failures for the same message type or handler
Operationally, the most important current distinction is:
- deserialization failures are not retried in-process
- handler failures are retried in-process and may be dead-lettered after retries are exhausted
That means deserialization failures usually point to a producer-contract mismatch, while handler failures usually point to business logic or downstream dependency issues.
Dead-Letter Reprocessing
The adapter publishes a KafkaDeadLetterMessage payload to the configured dead-letter topic. The message includes:
- original topic
- consumer group
- original key and message ID
- serialized payload
- headers
- partition, offset, and timestamp
- message type and handler type
- failure stage, error message, exception type, and attempt count
Use that information to separate recovery paths:
- if
FailureStageisdeserialization, fix the producer or schema/contract mismatch before replaying - if
FailureStageishandler, fix the handler or downstream dependency before replaying - if the payload is irrecoverable, archive it and do not replay it blindly
For replay, a safe default process is:
- identify the root cause and deploy the fix first
- inspect the DLQ payloads and filter to only the recoverable subset
- republish those messages to the original topic or a dedicated retry topic
- monitor consumer lag, retry counts, and DLQ volume during replay
- keep replay idempotent at the handler level, because the adapter does not guarantee exactly-once handling
Avoid wiring automatic DLQ re-drive back into the same consumer path until you have explicit idempotency and replay controls in place. Otherwise a poison message can loop between the primary topic and DLQ.
Replay Strategy
For low-volume recovery, manual replay with an operator-reviewed tool or script is the safest option.
For higher-volume recovery, prefer a separate replay worker that:
- reads from the DLQ topic
- validates message age, failure stage, and replay eligibility
- republishes at a controlled rate
- tags replayed messages with a header or message attribute if you add that behavior later
- writes an audit log of what was replayed and why
The current adapter intentionally does not include built-in replay automation. That keeps the core library small and avoids baking operational policy into the transport layer.
API Stability
The adapter now validates configuration at startup through the .NET options pipeline, so invalid broker, security, producer, or consumer settings fail fast during host startup instead of surfacing later inside background tasks.
Current stability guardrails:
BootstrapServersmust be configured- SASL username and password are required for SASL protocols
- consumer topic, consumer group, message type, and handler type must be configured
- retry counts must be zero or greater
- configured topic values cannot be whitespace
The intent is to keep the public surface small and predictable while catching invalid registrations before runtime traffic begins.
Metrics
The adapter emits built-in metrics through the .NET Meter named DotNetKafkaAdapter. The stable names live in KafkaAdapterInstrumentation.cs.
Current instruments include:
dotnet_kafka_adapter.messages.publisheddotnet_kafka_adapter.publish.failuresdotnet_kafka_adapter.publish.durationdotnet_kafka_adapter.messages.handleddotnet_kafka_adapter.consume.failuresdotnet_kafka_adapter.deserialization.failuresdotnet_kafka_adapter.handler.failuresdotnet_kafka_adapter.retry.attemptsdotnet_kafka_adapter.dead_letter.publisheddotnet_kafka_adapter.offset_commit.failuresdotnet_kafka_adapter.consumers.active
The metrics use lightweight tags such as:
topicconsumer_grouphandlerdead_letter_topicstage
If you are using OpenTelemetry in an application, add the meter with:
builder.Services.AddOpenTelemetry()
.WithMetrics(metrics => metrics.AddMeter("DotNetKafkaAdapter"));
Next Step
Decide how far to take production hardening, especially packaging and advanced broker security support.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- Confluent.Kafka (>= 2.13.2)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.3)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.3)
- Microsoft.Extensions.Options (>= 10.0.3)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Initial release with typed producer and consumer adapters, DI registration, retry and DLQ support, local Kafka development setup, and plaintext plus TLS integration tests.