MessageValidation.Kafka 2.0.0


MessageValidation.Kafka

Confluent Kafka transport adapter for the MessageValidation pipeline — automatically feed incoming Kafka messages into the validation pipeline with a single line of code.

Installation

dotnet add package MessageValidation.Kafka

Quick Start

Option A — Extension method on IConsumer<string, byte[]>

Wire the pipeline directly onto an existing Confluent consumer:

using Confluent.Kafka;
using MessageValidation.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "broker.example.com:9092",
    GroupId = "my-service"
};

using var consumer = new ConsumerBuilder<string, byte[]>(config).Build();
// serviceProvider is the application's IServiceProvider; ct (used below) is its CancellationToken
var pipeline = serviceProvider.GetRequiredService<IMessageValidationPipeline>();

// Subscribe and start the consume loop — all messages go through the validation pipeline
await consumer.StartConsuming(pipeline, topics: ["sensors.temperature"], ct);

Option B — DI registration

Let the DI container create and configure the consumer automatically:

using MessageValidation;
using MessageValidation.Kafka;

builder.Services.AddMessageValidation(options =>
{
    options.MapSource<TemperatureReading>("sensors.temperature");
    options.DefaultFailureBehavior = FailureBehavior.Log;
});

builder.Services.AddMessageDeserializer<JsonMessageDeserializer>();
builder.Services.AddMessageHandler<TemperatureReading, TemperatureHandler>();

// Registers ConsumerConfig + IConsumer<string, byte[]>
builder.Services.AddKafkaMessageValidation(config =>
{
    config.BootstrapServers = "broker.example.com:9092";
    config.GroupId = "my-service";
});

Then inject IConsumer<string, byte[]> and start consuming in a BackgroundService:

public class KafkaWorker(
    IConsumer<string, byte[]> consumer,
    IMessageValidationPipeline pipeline) : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken ct) =>
        consumer.StartConsuming(pipeline, topics: ["sensors.temperature"], ct);
}

Kafka Metadata

When the adapter creates a MessageContext, it populates the Metadata dictionary with Kafka-specific properties:

Key              Type      Description
kafka.topic      string    Topic the message was consumed from
kafka.partition  int       Partition number
kafka.offset     long      Offset within the partition
kafka.key        string    Message key (empty string if null)
kafka.timestamp  DateTime  Message timestamp (UTC)

Access them in your handler:

public class TemperatureHandler : IMessageHandler<TemperatureReading>
{
    public Task HandleAsync(
        TemperatureReading message, MessageContext context, CancellationToken ct = default)
    {
        var partition = context.Metadata["kafka.partition"];
        var offset = context.Metadata["kafka.offset"];
        Console.WriteLine($"[partition={partition} offset={offset}] Sensor {message.SensorId}: {message.Value}°C");
        return Task.CompletedTask;
    }
}
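The metadata values come back untyped in the handler above. A minimal sketch of a typed accessor follows, assuming the Metadata dictionary can be used as an IReadOnlyDictionary<string, object> (the actual property type is not stated here). The class and method names KafkaMetadataExtensions and GetRequired are hypothetical and not part of the package.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of MessageValidation.Kafka.
public static class KafkaMetadataExtensions
{
    // Returns the metadata value for `key` as T, or throws with a descriptive
    // message when the key is missing or holds a value of a different type.
    public static T GetRequired<T>(this IReadOnlyDictionary<string, object> metadata, string key)
    {
        if (!metadata.TryGetValue(key, out var value))
            throw new KeyNotFoundException($"Metadata key '{key}' is missing.");

        if (value is not T typed)
            throw new InvalidCastException(
                $"Metadata key '{key}' holds {value?.GetType().Name ?? "null"}, not {typeof(T).Name}.");

        return typed;
    }
}
```

Inside a handler this could be called as context.Metadata.GetRequired<int>("kafka.partition") or context.Metadata.GetRequired<long>("kafka.offset"), using the types listed in the table above.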

API Reference

IConsumer<string, byte[]>.StartConsuming(pipeline, ct)

Starts a background consume loop (via Task.Run) that polls Kafka and passes every message through the pipeline. Exits gracefully when the cancellation token is cancelled. Assumes the consumer is already subscribed.

IConsumer<string, byte[]>.StartConsuming(pipeline, topics, ct)

Calls consumer.Subscribe(topics) then starts the consume loop above.

AddKafkaMessageValidation(Action<ConsumerConfig>)

Registers a singleton ConsumerConfig and a singleton IConsumer<string, byte[]> built from it. The consumer connects lazily on first use.


Avro & Protobuf

Handling the Schema Registry wire format is the main integration challenge when moving beyond JSON.

The adapter feeds the raw byte[] from Kafka directly into IMessageDeserializer. This works seamlessly for plain JSON produced without Schema Registry. However, Confluent producers that use Schema Registry serializers embed a wire-format prefix before the actual encoded payload:

[0x00] [schema ID — 4 bytes, big-endian] [encoded payload]

For Protobuf, a list of message indexes follows the schema ID. The list is variable-length in general; for the first (or only) message type in the schema it is encoded as a single 0x00 byte:

[0x00] [schema ID — 4 bytes] [0x00 message index] [protobuf bytes]

Your IMessageDeserializer receives these prefixed bytes as-is. The solution is a custom deserializer per format — no change to the adapter is required.
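The common five-byte header described above (magic byte plus big-endian schema ID) can be inspected with a few lines of standard-library code. This is a minimal sketch only; ConfluentWireFormat and TryReadSchemaId are hypothetical names and not part of the package.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper, not part of MessageValidation.Kafka.
public static class ConfluentWireFormat
{
    public const byte MagicByte = 0x00;
    public const int HeaderLength = 5; // magic(1) + schemaId(4)

    // Returns true and the schema ID when the payload starts with the
    // Confluent header; returns false for payloads without it.
    public static bool TryReadSchemaId(ReadOnlySpan<byte> payload, out int schemaId)
    {
        schemaId = 0;
        if (payload.Length < HeaderLength || payload[0] != MagicByte)
            return false;

        // The schema ID is a 4-byte big-endian integer right after the magic byte.
        schemaId = BinaryPrimitives.ReadInt32BigEndian(payload.Slice(1, 4));
        return true;
    }
}
```

A custom deserializer could use a check like this to decide whether to strip the prefix before decoding, or to log the schema ID when decoding fails.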


JSON (no challenge)

Raw bytes are UTF-8 JSON — the standard JsonMessageDeserializer works without any changes.

public class JsonMessageDeserializer : IMessageDeserializer
{
    public object Deserialize(byte[] payload, Type targetType) =>
        System.Text.Json.JsonSerializer.Deserialize(payload, targetType)
        ?? throw new InvalidOperationException($"Cannot deserialize to {targetType.Name}");
}

Protobuf — strip the Confluent prefix

The Protobuf schema is baked into the generated class, so no Schema Registry call is needed at runtime. Strip the wire-format prefix and call Google.Protobuf directly:

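using Google.Protobuf;

public class ProtobufMessageDeserializer : IMessageDeserializer
{
    // magic(1) + schemaId(4) + messageIndex(1) = 6
    // Assumes the single-byte message index written for the first message type in the schema
    private const int PrefixLength = 6;

    public object Deserialize(byte[] payload, Type targetType)
    {
        var bytes = HasConfluentPrefix(payload) ? payload[PrefixLength..] : payload;

        // Generated Protobuf classes have a public parameterless constructor;
        // the instance is only used to reach the parser for its message type.
        var prototype = (IMessage)Activator.CreateInstance(targetType)!;
        return prototype.Descriptor.Parser.ParseFrom(bytes);
    }

    // An unframed Protobuf message never starts with 0x00, so a leading zero byte signals the prefix.
    // >= (not >) so that a framed empty message, which is exactly PrefixLength bytes, is also stripped.
    private static bool HasConfluentPrefix(byte[] payload) =>
        payload.Length >= PrefixLength && payload[0] == 0x00;
}

Register the deserializer:

builder.Services.AddMessageDeserializer<ProtobufMessageDeserializer>();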
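
The deserializer above strips a fixed six-byte prefix, which matches only the case where the message index is the single 0x00 byte. For schemas that define several message types, Confluent's wire format encodes the index list as zigzag varints: first the number of indexes, then each index. A minimal sketch for locating the payload start in that general case follows; ConfluentProtobufPrefix and PayloadOffset are hypothetical names, and the sketch assumes the current (zigzag) encoding rather than any older client-specific variant.

```csharp
using System;

// Hypothetical helper, not part of MessageValidation.Kafka.
public static class ConfluentProtobufPrefix
{
    // Returns the offset of the first Protobuf payload byte in a framed message.
    public static int PayloadOffset(ReadOnlySpan<byte> payload)
    {
        if (payload.Length < 6 || payload[0] != 0x00)
            throw new FormatException("Not a Confluent-framed Protobuf payload.");

        int pos = 5; // skip magic(1) + schemaId(4)

        // The index list starts with its length; a length of 0 is the
        // single-byte shorthand for the index list [0].
        long count = ReadZigZagVarint(payload, ref pos);
        for (long i = 0; i < count; i++)
            ReadZigZagVarint(payload, ref pos); // index values are skipped, not used

        return pos;
    }

    // Reads one base-128 varint and undoes the zigzag mapping.
    private static long ReadZigZagVarint(ReadOnlySpan<byte> data, ref int pos)
    {
        ulong raw = 0;
        int shift = 0;
        while (true)
        {
            if (pos >= data.Length) throw new FormatException("Truncated varint.");
            byte b = data[pos++];
            raw |= (ulong)(b & 0x7F) << shift;
            if ((b & 0x80) == 0) break;
            shift += 7;
            if (shift > 63) throw new FormatException("Varint too long.");
        }
        return (long)(raw >> 1) ^ -(long)(raw & 1);
    }
}
```

With a helper like this, the slice in Deserialize would become payload[ConfluentProtobufPrefix.PayloadOffset(payload)..] instead of a fixed offset.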

Avro — Schema Registry required

The Avro binary format is schema-dependent: the schema is NOT included in the payload. The schema must be fetched by ID from the Schema Registry to decode it. This is the key challenge.

Add the Schema Registry packages:

dotnet add package Confluent.SchemaRegistry
dotnet add package Confluent.SchemaRegistry.Serdes.Avro

Implement a deserializer that wraps Confluent's AvroDeserializer<T>:

using System.Collections.Concurrent;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

public class AvroMessageDeserializer(ISchemaRegistryClient schemaRegistry) : IMessageDeserializer
{
    // Cache per type — avoids creating a new deserializer (and Schema Registry round-trip) per message
    private readonly ConcurrentDictionary<Type, object> _deserializers = new();

    public object Deserialize(byte[] payload, Type targetType)
    {
        var deserializer = _deserializers.GetOrAdd(targetType, t =>
        {
            var dt = typeof(AvroDeserializer<>).MakeGenericType(t);
            return Activator.CreateInstance(dt, schemaRegistry, (IEnumerable<KeyValuePair<string, string>>?)null)!;
        });

        // AvroDeserializer<T> implements IAsyncDeserializer<T>, so the method to call is DeserializeAsync.
        // IMessageDeserializer is synchronous, so block on the returned Task<T> and read its Result.
        var method = deserializer.GetType().GetMethod("DeserializeAsync")!;
        var task = (Task)method.Invoke(deserializer, [new ReadOnlyMemory<byte>(payload), false, SerializationContext.Empty])!;
        task.GetAwaiter().GetResult();
        return task.GetType().GetProperty("Result")!.GetValue(task)!;
    }
}

Register the Schema Registry client and the deserializer:

builder.Services.AddSingleton<ISchemaRegistryClient>(_ =>
    new CachedSchemaRegistryClient(new SchemaRegistryConfig
    {
        Url = "http://localhost:8081"
    }));

builder.Services.AddMessageDeserializer<AvroMessageDeserializer>();

Why cache deserializer instances? AvroDeserializer<T> makes an HTTP call to Schema Registry to resolve schema IDs on first use. Caching it per type ensures each schema is fetched only once for the lifetime of the application.


Requirements

License

MIT

Compatible target framework: net10.0. Additional computed target frameworks: net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows.


Version  Downloads  Last Updated
2.0.0    107        4/28/2026
1.1.0    118        4/21/2026
1.0.0    128        4/1/2026