IronAlpine.EventBus.Kafka 2.3.0

dotnet add package IronAlpine.EventBus.Kafka --version 2.3.0
                    
NuGet\Install-Package IronAlpine.EventBus.Kafka -Version 2.3.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="IronAlpine.EventBus.Kafka" Version="2.3.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="IronAlpine.EventBus.Kafka" Version="2.3.0" />
                    
Directory.Packages.props
<PackageReference Include="IronAlpine.EventBus.Kafka" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add IronAlpine.EventBus.Kafka --version 2.3.0
                    
#r "nuget: IronAlpine.EventBus.Kafka, 2.3.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package IronAlpine.EventBus.Kafka@2.3.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=IronAlpine.EventBus.Kafka&version=2.3.0
                    
Install as a Cake Addin
#tool nuget:?package=IronAlpine.EventBus.Kafka&version=2.3.0
                    
Install as a Cake Tool

IronAlpine.EventBus.Kafka

IronAlpine.EventBus.Kafka is a deterministic, inbox-first/outbox-first Kafka transport package built for the IronAlpine ecosystem.

The package is implemented under the IronAlpine.Eventbus namespace and is designed to work as a transport/runtime layer, while application behavior stays in your mediator notification handlers.

Package Role in IronAlpine

This package is intentionally focused and works in a symmetric relationship with:

  • IronAlpine.Mediator.Abstractions
    • Incoming events are dispatched through IMediator.Publish(object).
    • No custom event-handler abstraction is required in EventBus.
  • IronAlpine.Data.EFCore.Modeling
    • EventBus schema is integrated by contributor pattern (IModelContributor).
    • EventBus tables are attached to service DbContexts without direct hard coupling.

Deterministic Startup Model

Topology registration is explicit and sealed:

  1. Register publish/consume routes via IEventTopologyBuilder.
  2. Call Seal() internally during startup to freeze registry shape.
  3. Workers run against the sealed registry snapshot.

This removes runtime drift caused by late/dynamic discovery and enforces fail-fast startup behavior.

Explicit topology registration (no reflection helpers)

Service code should declare routes explicitly with generic methods:

topology.Publish<UserCreatedIntegrationEvent>(IdentityTopics.UserLifecycle);
topology.Publish<UserUpdatedIntegrationEvent>(IdentityTopics.UserLifecycle);
topology.Consume<CityCreatedIntegrationEvent>(CommonDataTopics.CityLifecycle);
topology.Consume<CityUpdatedIntegrationEvent>(CommonDataTopics.CityLifecycle);

Do not implement helpers like PublishByType, ConsumeByType, ResolveTopic, ResolveGenericMethod in service projects.

For global libraries, IEventTopologyTypeBuilder is available for Type-based declarations, but service code should prefer explicit generic registration.

Processing Architecture

Inbox-First (Consumer Side)

KafkaInboxWriterHostedService:

  1. Consumes message from Kafka.
  2. Normalizes headers + event type.
  3. Writes to InboxMessages.
  4. Commits Kafka offset only after successful inbox insert.

InboxProcessorHostedService:

  1. Claims pending inbox rows with lock window.
  2. Validates event type consistency (header event-type vs payload eventType).
  3. Deserializes to the topology-registered CLR type.
  4. Dispatches through IMediator.Publish(object).
  5. Marks processed or schedules retry / deadletter.

Outbox Pattern (Producer Side)

OutboxEventBus:

  • IEventBus.PublishAsync(...) writes event to OutboxMessages.

KafkaOutboxPublisherHostedService:

  1. Claims pending outbox rows.
  2. Builds Kafka message + headers.
  3. Publishes to Kafka.
  4. Marks row as processed when publish succeeds.
  5. Schedules retry or terminal deadletter on failures.

Ready-Aware Unknown and Mismatch-Proof Rules

  • TYPE_MISMATCH is terminal:
    • If header type and payload type differ, message is deadlettered immediately.
    • LastErrorDetail stores explicit proof (Header: [A], Payload: [B]).
  • Unknown type is ready-aware:
    • RegistryReady=false: retry/backoff (startup tolerance).
    • RegistryReady=true: terminal TYPE_NOT_REGISTERED.

Reason Codes

ReasonCode Meaning Typical Action
TYPE_NOT_REGISTERED Event type is not in explicit consume topology Fix topology registration or event contract
TYPE_MISMATCH Header type and payload type do not match Fix producer envelope contract
DESERIALIZATION_ERROR Payload cannot deserialize to registered CLR type Fix payload schema/version compatibility
TRANSPORT_NON_RETRIABLE Broker rejected message permanently (size/auth/invalid request) Deadletter and inspect transport constraints
RETRY_EXHAUSTED Retry policy reached terminal limit Investigate recurring transient failures

LastErrorDetail is designed as forensic evidence for root-cause analysis.

Configuration

Example (appsettings.json) with flat keys only:

{
  "IronAlpine": {
    "EventBus": {
      "Kafka": {
        "Enabled": true,
        "BootstrapServers": "kafka:9092",
        "ConsumerGroupId": "ia-eventbus-inbox",
        "ClientId": "ia-eventbus-kafka",
        "SecurityProtocol": "SaslSsl",
        "SaslMechanism": "Plain",
        "SaslUsername": "app-user",
        "SaslPassword": "secret",
        "SslCaLocation": "/etc/ssl/certs/kafka/ca.crt",
        "SslEndpointIdentificationAlgorithm": "None",
        "EnableAutoCommit": false,
        "EnableAutoOffsetStore": false,
        "AutoOffsetReset": "Earliest",
        "Provisioning": {
          "Enabled": true,
          "DefaultPartitions": 3,
          "DefaultReplicationFactor": 1,
          "MetadataWaitTimeoutSeconds": 20,
          "MetadataPollIntervalMilliseconds": 500
        },
        "Writer": {
          "Enabled": true,
          "StartupDelaySeconds": 5,
          "PollTimeoutMilliseconds": 1000,
          "IdleDelayMilliseconds": 100,
          "WorkerErrorDelaySeconds": 2
        },
        "Processor": {
          "Enabled": true,
          "BatchSize": 64,
          "LockSeconds": 30,
          "MaxRetryCount": 100,
          "FastRetryCount": 3,
          "FastRetryDelaySeconds": 5,
          "BackoffBaseDelaySeconds": 15,
          "MaxRetryDelaySeconds": 500
        },
        "Outbox": {
          "Enabled": true,
          "BatchSize": 64,
          "LockSeconds": 30,
          "MaxRetryCount": 50,
          "FastRetryCount": 3,
          "FastRetryDelaySeconds": 5,
          "BackoffBaseDelaySeconds": 15,
          "MaxRetryDelaySeconds": 500,
          "MaxMessageBytes": 950000
        },
        "Cleanup": {
          "Enabled": true,
          "ProcessedRetentionDays": 14,
          "DeadLetterRetentionDays": 60,
          "ReplayAuditRetentionDays": 180,
          "IntervalHours": 24,
          "BatchSize": 1000
        }
      }
    }
  }
}

Legacy nested keys under IronAlpine:EventBus:Kafka:Kafka:* are rejected at startup (fail-fast).

If you connect to Strimzi Kafka from OpenShift or Kubernetes through an external Route, set SslEndpointIdentificationAlgorithm to None. This avoids hostname mismatch failures caused by route hostnames not matching the broker certificate SAN/CN. In environments where broker hostnames and certificates match, prefer Https.

Fail-Fast Rules

Startup stops immediately in these cases:

  • Legacy nested config is used: IronAlpine:EventBus:Kafka:Kafka:*.
  • SslEndpointIdentificationAlgorithm is set to an unsupported value.
  • SecurityProtocol=SSL or SecurityProtocol=SaslSsl and SslCaLocation is missing.
  • Same event type is mapped to different topics.
  • Any route is registered with empty/invalid topic.
  • Consume route exists while Writer.Enabled=false or Processor.Enabled=false.
  • Publish route exists while Outbox.Enabled=false.
  • Topic metadata drift detected during provisioning (partition/replication mismatch).

Health and Drift Diagnostics

Startup emits a package stamp:

  • [IA EventBus] Initialized | Version: <version> | AssemblyHash: <hash>

IEventBusHealthState also exposes:

  • version
  • assemblyHash
  • topology/transport readiness
  • worker heartbeat timestamps
  • latest error

Health check (ia-eventbus-ready) uses this state to identify runtime drift and transport outages.

Logging Strategy

Information

  • Package initialization (Version, AssemblyHash).
  • Kafka subscription established.
  • Worker startup (InboxWriter, InboxProcessor, OutboxPublisher).

Warning / Error

  • Deadletter decisions (with ReasonCode).
  • Unknown type decisions and retry schedule details.
  • Type mismatch and deserialization failures.
  • Kafka transport/produce/consume critical failures.

Debug

  • Successful inbox write commit.
  • Successful inbox processing dispatch.
  • Successful outbox publish.

This level split keeps production logs actionable while preserving deep traceability in debug sessions.

Data Model Summary

The package manages:

  • InboxMessages
  • OutboxMessages
  • InboxDeadLetters
  • OutboxDeadLetters
  • EventReplayOperations

All entities include structured retry/error fields and are mapped through the contributor model.

Replay and Retention

  • Replay supports dry-run and audited requeue operations.
  • Cleanup enforces retention windows with a built-in minimum retention safety rule.

OpenTelemetry Hooks

Worker paths emit internal activities via ActivitySource("IronAlpine.Eventbus"):

  • eventbus.inbox.write
  • eventbus.inbox.process
  • eventbus.outbox.publish

These activities can be collected by your existing OpenTelemetry pipeline.

Product Compatible and additional computed target framework versions.
.NET net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.3.0 480 5/5/2026
2.2.0 314 4/20/2026
2.1.0 357 4/9/2026
2.0.9 382 4/6/2026
2.0.8 107 4/6/2026
2.0.7 318 4/4/2026

Stable mediator release with request/response, notification publish strategies, streaming, and dependency injection integration.