Kafka.Context 1.2.1
Kafka.Context
A lightweight, opinionated Kafka + Schema Registry runtime with a context-centric, contract-first workflow (Avro).
Source & docs: https://github.com/synthaicode/Kafka.Context

appsettings.json guide (EN): https://github.com/synthaicode/Kafka.Context/blob/main/docs/contracts/appsettings.en.md
This API is intentionally small:
- Produce / Consume only
- Explicit failure handling via DLQ
- Avro + Schema Registry are used to keep schemas reusable as contracts for downstream systems (e.g., Flink)
A single-column key is treated as a Kafka primitive and is not registered with Schema Registry; only composite keys are treated as schema contracts.
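As a sketch of what a composite-key contract can look like, assuming the default TopicNameStrategy subject naming (`<topic>-key`) and hypothetical record/field names (not taken from this package's output), the registered key subject would carry an Avro record along these lines:

```json
{
  "type": "record",
  "name": "OrdersKey",
  "namespace": "example",
  "fields": [
    { "name": "CustomerId", "type": "int" },
    { "name": "OrderId", "type": "int" }
  ]
}
```

A single-column key (e.g. just `OrderId`) would instead be serialized as the corresponding Kafka primitive, with no key subject registered.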
Non-Goals
- No stream processing engine (Kafka Streams/Flink-style processing is out of scope)
- No ksqlDB query generation / DSL
- No general-purpose Kafka client wrapper (this package stays opinionated and small)
- No automatic schema evolution/migration tooling beyond register/verify during provisioning
- No “external Schema Registry schema → POCO” mapping layer (consume assumes the Avro contract matches your POCO)
Schema Scaffold / Verify (CLI, optional)
Schema Registry fingerprint verification is intended to run in CI/dev via a separate dotnet tool:
```shell
dotnet tool install -g Kafka.Context.Cli
```
Example (fail-fast before deploy):
```shell
kafka-context schema verify --sr-url http://127.0.0.1:18081 --subject <topic>-value --type "<Namespace>.<TypeName>, <AssemblyName>"
```
See https://www.nuget.org/packages/Kafka.Context.Cli and docs/schema-scaffold-requirements.md.
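As a concrete fail-fast setup, a CI step could run the verify command before deploy and let its non-zero exit code fail the build. This is a hypothetical GitHub Actions fragment; the subject and type values are illustrative, only the tool name and flags come from the command documented above:

```yaml
- name: Verify Avro contracts against Schema Registry
  run: |
    dotnet tool install -g Kafka.Context.Cli
    kafka-context schema verify \
      --sr-url http://127.0.0.1:18081 \
      --subject orders-value \
      --type "MyApp.Contracts.Order, MyApp.Contracts"
```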
Install
```shell
dotnet add package Kafka.Context
```
Minimal configuration
appsettings.json:
```json
{
  "KsqlDsl": {
    "Common": { "BootstrapServers": "127.0.0.1:39092", "ClientId": "my-app" },
    "SchemaRegistry": { "Url": "http://127.0.0.1:18081" },
    "DlqTopicName": "dead_letter_queue"
  }
}
```
EF-style quick start (KafkaContext as DbContext)
```csharp
using Kafka.Context;
using Kafka.Context.Abstractions;
using Kafka.Context.Attributes;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

[KafkaTopic("orders")]
public sealed class Order
{
    public int Id { get; set; }
}

// EF-like: KafkaContext ~= DbContext, EventSet<T> ~= DbSet<T>
public sealed class AppKafkaContext : KafkaContext
{
    public AppKafkaContext(IConfiguration configuration, ILoggerFactory loggerFactory)
        : base(configuration, loggerFactory) { }

    public EventSet<Order> Orders { get; set; } = null!;

    protected override void OnModelCreating(IModelBuilder modelBuilder)
        => modelBuilder.Entity<Order>();
}
```

Usage (top-level statements must precede type declarations in C#, so keep this part in its own file, e.g. Program.cs):

```csharp
var config = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build();
using var loggerFactory = LoggerFactory.Create(b => b.AddConsole());
await using var ctx = new AppKafkaContext(config, loggerFactory);

// Create/verify topics + register/verify Schema Registry subjects (fail-fast).
await ctx.ProvisionAsync();

// Produce (EF-ish: AddAsync).
await ctx.Orders.AddAsync(new Order { Id = 1 });

// Optional: produce with Kafka headers.
await ctx.Orders.AddAsync(new Order { Id = 2 }, new Dictionary<string, string> { ["traceId"] = "abc" });

// Consume (EF-ish: ForEachAsync).
await ctx.Orders.ForEachAsync(o =>
{
    Console.WriteLine($"Order: {o.Id}");
    return Task.CompletedTask;
});
```
EF-style mapping tip (KafkaTopic on entity)
KafkaTopicAttribute is the equivalent of mapping an entity to a table name:
```csharp
[KafkaTopic("orders")]
public sealed class Order { /* ... */ }
```
Retry / DLQ
```csharp
await ctx.Orders
    .WithRetry(maxRetries: 2, retryInterval: TimeSpan.FromMilliseconds(200))
    .OnError(ErrorAction.DLQ)
    .ForEachAsync(_ => throw new InvalidOperationException("boom"));

await ctx.Dlq.ForEachAsync((env, headers, meta) =>
{
    Console.WriteLine($"DLQ: {env.Topic} {env.ErrorType} {env.ErrorFingerprint}");
    return Task.CompletedTask;
});
```
Per-topic consumer/producer config
You can override Confluent.Kafka settings per topic via:
- KsqlDsl.Topics.<topicName>.Consumer.*
- KsqlDsl.Topics.<topicName>.Producer.*
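For instance, an appsettings.json fragment along these lines would override settings for the `orders` topic only. The nesting follows the key paths above; the specific Consumer/Producer key names (`AutoOffsetReset`, `Acks`, etc.) are Confluent.Kafka setting names used here as an assumption, not verified against this package:

```json
{
  "KsqlDsl": {
    "Topics": {
      "orders": {
        "Consumer": { "AutoOffsetReset": "Earliest", "EnableAutoCommit": false },
        "Producer": { "Acks": "All", "LingerMs": 5 }
      }
    }
  }
}
```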
Preview Avro before Schema Registry registration
```csharp
var plans = ctx.PreviewSchemaRegistryAvro();
foreach (var p in plans)
{
    Console.WriteLine($"{p.KeySubject} / {p.ValueSubject}");
    Console.WriteLine($"{p.ValueSubject} => {p.ExpectedValueRecordFullName}");
}
```
AI Assist
If you're unsure how to use this package, run `kafka-context ai guide --copy` and paste the output into your AI assistant.
| Product | Compatible and computed target frameworks |
|---|---|
| .NET | net8.0 and net10.0 are compatible. net9.0, and the android/browser/ios/maccatalyst/macos/tvos/windows variants of net8.0, net9.0, and net10.0, were computed. |
Dependencies

net10.0
- Kafka.Context.Abstractions (>= 1.2.1)
- Kafka.Context.Application (>= 1.2.1)
- Kafka.Context.Infrastructure (>= 1.2.1)
- Kafka.Context.Messaging (>= 1.2.1)
- Kafka.Context.Streaming (>= 1.2.1)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.0)

net8.0
- Kafka.Context.Abstractions (>= 1.2.1)
- Kafka.Context.Application (>= 1.2.1)
- Kafka.Context.Infrastructure (>= 1.2.1)
- Kafka.Context.Messaging (>= 1.2.1)
- Kafka.Context.Streaming (>= 1.2.1)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.0)