IronAlpine.EventBus.Kafka
2.3.0
dotnet add package IronAlpine.EventBus.Kafka --version 2.3.0
NuGet\Install-Package IronAlpine.EventBus.Kafka -Version 2.3.0
<PackageReference Include="IronAlpine.EventBus.Kafka" Version="2.3.0" />
<PackageVersion Include="IronAlpine.EventBus.Kafka" Version="2.3.0" />
<PackageReference Include="IronAlpine.EventBus.Kafka" />
paket add IronAlpine.EventBus.Kafka --version 2.3.0
#r "nuget: IronAlpine.EventBus.Kafka, 2.3.0"
#:package IronAlpine.EventBus.Kafka@2.3.0
#addin nuget:?package=IronAlpine.EventBus.Kafka&version=2.3.0
#tool nuget:?package=IronAlpine.EventBus.Kafka&version=2.3.0
IronAlpine.EventBus.Kafka
IronAlpine.EventBus.Kafka is a deterministic, inbox-first/outbox-first Kafka transport package built for the IronAlpine ecosystem.
The package is implemented under the IronAlpine.Eventbus namespace and is designed to work as a transport/runtime layer, while application behavior stays in your mediator notification handlers.
Package Role in IronAlpine
This package is intentionally focused and works in a symmetric relationship with:
IronAlpine.Mediator.Abstractions- Incoming events are dispatched through
IMediator.Publish(object). - No custom event-handler abstraction is required in EventBus.
- Incoming events are dispatched through
IronAlpine.Data.EFCore.Modeling- EventBus schema is integrated by contributor pattern (
IModelContributor). - EventBus tables are attached to service DbContexts without direct hard coupling.
- EventBus schema is integrated by contributor pattern (
Deterministic Startup Model
Topology registration is explicit and sealed:
- Register publish/consume routes via
IEventTopologyBuilder. - Call
Seal()internally during startup to freeze registry shape. - Workers run against the sealed registry snapshot.
This removes runtime drift caused by late/dynamic discovery and enforces fail-fast startup behavior.
Explicit topology registration (no reflection helpers)
Service code should declare routes explicitly with generic methods:
topology.Publish<UserCreatedIntegrationEvent>(IdentityTopics.UserLifecycle);
topology.Publish<UserUpdatedIntegrationEvent>(IdentityTopics.UserLifecycle);
topology.Consume<CityCreatedIntegrationEvent>(CommonDataTopics.CityLifecycle);
topology.Consume<CityUpdatedIntegrationEvent>(CommonDataTopics.CityLifecycle);
Do not implement helpers like PublishByType, ConsumeByType, ResolveTopic, ResolveGenericMethod in service projects.
For global libraries, IEventTopologyTypeBuilder is available for Type-based declarations, but service code should prefer explicit generic registration.
Processing Architecture
Inbox-First (Consumer Side)
KafkaInboxWriterHostedService:
- Consumes message from Kafka.
- Normalizes headers + event type.
- Writes to
InboxMessages. - Commits Kafka offset only after successful inbox insert.
InboxProcessorHostedService:
- Claims pending inbox rows with lock window.
- Validates event type consistency (
header event-typevspayload eventType). - Deserializes to the topology-registered CLR type.
- Dispatches through
IMediator.Publish(object). - Marks processed or schedules retry / deadletter.
Outbox Pattern (Producer Side)
OutboxEventBus:
IEventBus.PublishAsync(...)writes event toOutboxMessages.
KafkaOutboxPublisherHostedService:
- Claims pending outbox rows.
- Builds Kafka message + headers.
- Publishes to Kafka.
- Marks row as processed when publish succeeds.
- Schedules retry or terminal deadletter on failures.
Ready-Aware Unknown and Mismatch-Proof Rules
TYPE_MISMATCHis terminal:- If header type and payload type differ, message is deadlettered immediately.
LastErrorDetailstores explicit proof (Header: [A], Payload: [B]).
- Unknown type is ready-aware:
RegistryReady=false: retry/backoff (startup tolerance).RegistryReady=true: terminalTYPE_NOT_REGISTERED.
Reason Codes
| ReasonCode | Meaning | Typical Action |
|---|---|---|
TYPE_NOT_REGISTERED |
Event type is not in explicit consume topology | Fix topology registration or event contract |
TYPE_MISMATCH |
Header type and payload type do not match | Fix producer envelope contract |
DESERIALIZATION_ERROR |
Payload cannot deserialize to registered CLR type | Fix payload schema/version compatibility |
TRANSPORT_NON_RETRIABLE |
Broker rejected message permanently (size/auth/invalid request) | Deadletter and inspect transport constraints |
RETRY_EXHAUSTED |
Retry policy reached terminal limit | Investigate recurring transient failures |
LastErrorDetail is designed as forensic evidence for root-cause analysis.
Configuration
Example (appsettings.json) with flat keys only:
{
"IronAlpine": {
"EventBus": {
"Kafka": {
"Enabled": true,
"BootstrapServers": "kafka:9092",
"ConsumerGroupId": "ia-eventbus-inbox",
"ClientId": "ia-eventbus-kafka",
"SecurityProtocol": "SaslSsl",
"SaslMechanism": "Plain",
"SaslUsername": "app-user",
"SaslPassword": "secret",
"SslCaLocation": "/etc/ssl/certs/kafka/ca.crt",
"SslEndpointIdentificationAlgorithm": "None",
"EnableAutoCommit": false,
"EnableAutoOffsetStore": false,
"AutoOffsetReset": "Earliest",
"Provisioning": {
"Enabled": true,
"DefaultPartitions": 3,
"DefaultReplicationFactor": 1,
"MetadataWaitTimeoutSeconds": 20,
"MetadataPollIntervalMilliseconds": 500
},
"Writer": {
"Enabled": true,
"StartupDelaySeconds": 5,
"PollTimeoutMilliseconds": 1000,
"IdleDelayMilliseconds": 100,
"WorkerErrorDelaySeconds": 2
},
"Processor": {
"Enabled": true,
"BatchSize": 64,
"LockSeconds": 30,
"MaxRetryCount": 100,
"FastRetryCount": 3,
"FastRetryDelaySeconds": 5,
"BackoffBaseDelaySeconds": 15,
"MaxRetryDelaySeconds": 500
},
"Outbox": {
"Enabled": true,
"BatchSize": 64,
"LockSeconds": 30,
"MaxRetryCount": 50,
"FastRetryCount": 3,
"FastRetryDelaySeconds": 5,
"BackoffBaseDelaySeconds": 15,
"MaxRetryDelaySeconds": 500,
"MaxMessageBytes": 950000
},
"Cleanup": {
"Enabled": true,
"ProcessedRetentionDays": 14,
"DeadLetterRetentionDays": 60,
"ReplayAuditRetentionDays": 180,
"IntervalHours": 24,
"BatchSize": 1000
}
}
}
}
}
Legacy nested keys under IronAlpine:EventBus:Kafka:Kafka:* are rejected at startup (fail-fast).
If you connect to Strimzi Kafka from OpenShift or Kubernetes through an external Route, set
SslEndpointIdentificationAlgorithm to None. This avoids hostname mismatch failures caused by
route hostnames not matching the broker certificate SAN/CN. In environments where broker hostnames
and certificates match, prefer Https.
Fail-Fast Rules
Startup stops immediately in these cases:
- Legacy nested config is used:
IronAlpine:EventBus:Kafka:Kafka:*. SslEndpointIdentificationAlgorithmis set to an unsupported value.SecurityProtocol=SSLorSecurityProtocol=SaslSslandSslCaLocationis missing.- Same event type is mapped to different topics.
- Any route is registered with empty/invalid topic.
- Consume route exists while
Writer.Enabled=falseorProcessor.Enabled=false. - Publish route exists while
Outbox.Enabled=false. - Topic metadata drift detected during provisioning (partition/replication mismatch).
Health and Drift Diagnostics
Startup emits a package stamp:
[IA EventBus] Initialized | Version: <version> | AssemblyHash: <hash>
IEventBusHealthState also exposes:
versionassemblyHash- topology/transport readiness
- worker heartbeat timestamps
- latest error
Health check (ia-eventbus-ready) uses this state to identify runtime drift and transport outages.
Logging Strategy
Information
- Package initialization (
Version,AssemblyHash). - Kafka subscription established.
- Worker startup (
InboxWriter,InboxProcessor,OutboxPublisher).
Warning / Error
- Deadletter decisions (with
ReasonCode). - Unknown type decisions and retry schedule details.
- Type mismatch and deserialization failures.
- Kafka transport/produce/consume critical failures.
Debug
- Successful inbox write commit.
- Successful inbox processing dispatch.
- Successful outbox publish.
This level split keeps production logs actionable while preserving deep traceability in debug sessions.
Data Model Summary
The package manages:
InboxMessagesOutboxMessagesInboxDeadLettersOutboxDeadLettersEventReplayOperations
All entities include structured retry/error fields and are mapped through the contributor model.
Replay and Retention
- Replay supports dry-run and audited requeue operations.
- Cleanup enforces retention windows with a built-in minimum retention safety rule.
OpenTelemetry Hooks
Worker paths emit internal activities via ActivitySource("IronAlpine.Eventbus"):
eventbus.inbox.writeeventbus.inbox.processeventbus.outbox.publish
These activities can be collected by your existing OpenTelemetry pipeline.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Confluent.Kafka (>= 2.12.0)
- IronAlpine.Data.EFCore.Modeling (>= 2.1.2)
- IronAlpine.Eventbus.Contracts (>= 2.2.0)
- IronAlpine.Mediator.Abstractions (>= 2.0.0)
- Microsoft.EntityFrameworkCore (>= 9.0.7)
- Microsoft.EntityFrameworkCore.Relational (>= 9.0.7)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 9.0.7)
- Microsoft.Extensions.Diagnostics.HealthChecks (>= 9.0.7)
- Microsoft.Extensions.Hosting.Abstractions (>= 9.0.7)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.7)
- Microsoft.Extensions.Options (>= 9.0.7)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 9.0.7)
-
net9.0
- Confluent.Kafka (>= 2.12.0)
- IronAlpine.Data.EFCore.Modeling (>= 2.1.2)
- IronAlpine.Eventbus.Contracts (>= 2.2.0)
- IronAlpine.Mediator.Abstractions (>= 2.0.0)
- Microsoft.EntityFrameworkCore (>= 9.0.7)
- Microsoft.EntityFrameworkCore.Relational (>= 9.0.7)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 9.0.7)
- Microsoft.Extensions.Diagnostics.HealthChecks (>= 9.0.7)
- Microsoft.Extensions.Hosting.Abstractions (>= 9.0.7)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.7)
- Microsoft.Extensions.Options (>= 9.0.7)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 9.0.7)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Stable mediator release with request/response, notification publish strategies, streaming, and dependency injection integration.