MagicCSharp.Events.Kafka
0.0.13
dotnet add package MagicCSharp.Events.Kafka --version 0.0.13
NuGet\Install-Package MagicCSharp.Events.Kafka -Version 0.0.13
<PackageReference Include="MagicCSharp.Events.Kafka" Version="0.0.13" />
<PackageVersion Include="MagicCSharp.Events.Kafka" Version="0.0.13" />
<PackageReference Include="MagicCSharp.Events.Kafka" />
paket add MagicCSharp.Events.Kafka --version 0.0.13
#r "nuget: MagicCSharp.Events.Kafka, 0.0.13"
#:package MagicCSharp.Events.Kafka@0.0.13
#addin nuget:?package=MagicCSharp.Events.Kafka&version=0.0.13
#tool nuget:?package=MagicCSharp.Events.Kafka&version=0.0.13
MagicCSharp.Events.Kafka
Kafka event dispatching for distributed systems
Process events across multiple services with guaranteed delivery, fault tolerance, and automatic producer/consumer setup. MagicCSharp.Events.Kafka handles all the Kafka complexity so you can focus on your business logic.
Why MagicCSharp.Events.Kafka?
✅ Guaranteed Delivery - Events are persisted to Kafka before returning
✅ Fault Tolerant - Automatic retries and error handling
✅ Manual Commit - Only commit after successful processing
✅ Auto-Configuration - Producer and consumer setup handled for you
✅ Integrated Logging - Kafka logs flow through ILogger
✅ Scalable - Add more consumers to process events faster
Installation
dotnet add package MagicCSharp.Events.Kafka
dotnet add package MagicCSharp.Events
dotnet add package MagicCSharp
Quick Start
1. Configure Kafka
var kafkaConfig = new KafkaMagicEventConfiguration(
BootstrapServers: "localhost:9092",
GroupId: "my-service-group",
Topic: "domain-events"
);
2. Register Kafka Events
// In your Startup.cs or Program.cs
services.RegisterMagicKafkaEvents(kafkaConfig);
// Optional: Enable OpenTelemetry metrics
// services.RegisterMagicKafkaEvents(kafkaConfig, useOpenTelemetryMetrics: true);
3. Dispatch Events
public class OrderService(IEventDispatcher eventDispatcher)
{
public async Task CreateOrder(CreateOrderRequest request)
{
var order = await SaveOrder(request);
// Event is sent to Kafka and returns immediately
eventDispatcher.Dispatch(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount
});
return order;
}
}
4. Events Are Automatically Consumed
The background service automatically consumes events from Kafka and dispatches them to your handlers:
public class SendConfirmationHandler(IEmailService emailService)
: IEventHandler<OrderCreatedEvent>
{
public async Task Handle(OrderCreatedEvent @event)
{
// This executes when the event is consumed from Kafka
await emailService.SendOrderConfirmation(@event.OrderId);
}
}
How It Works
┌─────────────┐ ┌─────────┐ ┌─────────────┐
│ Service A │ Dispatch│ Kafka │ Consume │ Service B │
│ ├────────►│ Topic ├────────►│ │
│ (Producer) │ │ │ │ (Consumer) │
└─────────────┘ └─────────┘ └─────────────┘
│
▼
┌──────────────┐
│Event Handlers│
└──────────────┘
Producer (IEventDispatcher):
- Serializes the event with type information
- Sends to Kafka topic asynchronously
- Returns immediately (fire-and-forget)
Consumer (Background Service):
- Polls Kafka for new messages
- Deserializes events
- Dispatches to
IAsyncEventDispatcher(local async dispatcher) - Executes all registered handlers
- Commits offset only after successful processing
Features
🚀 Automatic Producer/Consumer Setup
No need to configure Kafka manually - everything is set up for you:
services.RegisterMagicKafkaEvents(new KafkaMagicEventConfiguration(
BootstrapServers: "kafka1:9092,kafka2:9092",
GroupId: "order-service",
Topic: "domain-events"
));
What gets configured:
- ✅ Kafka producer with connection pooling
- ✅ Kafka consumer with consumer group
- ✅ Background service for consuming
- ✅ Logging adapters for Kafka logs
- ✅ Event serializer with type information
🔄 Manual Commit for Reliability
Messages are only committed after successful processing:
// Kafka message received
var event = DeserializeEvent(message);
try
{
await eventDispatcher.Dispatch(event); // Process all handlers
consumer.Commit(message); // ✅ Only commit on success
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to process message");
// ❌ Message is NOT committed - will be retried
}
Benefits:
- No data loss if processing fails
- Failed messages are automatically retried
- At-least-once delivery guarantee
📊 Integrated Logging
Kafka logs automatically flow through ILogger:
// Kafka internal logs appear in your logging system
[2024-01-15 10:30:00] [Information] Kafka: Producer connected to localhost:9092
[2024-01-15 10:30:01] [Debug] Kafka: Message delivered to partition 0, offset 12345
[2024-01-15 10:30:02] [Warning] Kafka: Connection to broker temporarily lost
Log Levels:
Critical- Emergency/Alert level issuesError- Kafka errorsWarning- Connection issues, temporary failuresInformation- Connection status, messages deliveredDebug- Detailed Kafka operationsTrace- Very detailed debugging
⚡ High Performance
Producer:
- Async non-blocking sends
- Connection pooling
- Automatic batching
Consumer:
- Long polling for efficiency
- Parallel processing (scale with consumer group)
- Manual offset management
🔧 Custom Kafka Configuration
Need more control? You can customize the Kafka clients:
// After registration, you can access and customize
var producer = serviceProvider.GetRequiredService<IProducer<Null, string>>();
var consumer = serviceProvider.GetRequiredService<IConsumer<Null, string>>();
🎯 Multiple Topics
You can configure different services to use different topics:
// Order Service
services.RegisterMagicKafkaEvents(new KafkaMagicEventConfiguration(
BootstrapServers: "localhost:9092",
GroupId: "order-service",
Topic: "order-events" // Only order events
));
// User Service
services.RegisterMagicKafkaEvents(new KafkaMagicEventConfiguration(
BootstrapServers: "localhost:9092",
GroupId: "user-service",
Topic: "user-events" // Only user events
));
🛡️ Error Handling
The consumer handles errors gracefully:
Parsing Errors:
// Unknown event type - message is skipped and committed
var event = DeserializeMagicEvent(json);
if (event == null)
{
// Event type not registered, ignore it
consumer.Commit(message);
continue;
}
Processing Errors:
// Handler throws exception - message is NOT committed
try
{
await ProcessEvent(event);
consumer.Commit(message); // ✅ Success
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to process");
// ❌ Not committed - will retry on next poll
}
Connection Errors:
try
{
var message = consumer.Consume(cancellationToken);
// ... process
}
catch (Exception ex)
{
logger.LogError(ex, "Kafka error");
await Task.Delay(TimeSpan.FromSeconds(30)); // Wait before retry
}
Custom Kafka Listeners
You can create custom Kafka listeners for non-event use cases:
public class OrderNotificationListener(
IServiceScopeFactory serviceScopeFactory,
ILogger<OrderNotificationListener> logger)
: KafkaListenerBase<OrderNotification>(serviceScopeFactory, logger)
{
protected override string Topic => "order-notifications";
protected override OrderNotification? ParseCallback(string body, CancellationToken cancellationToken)
{
return JsonSerializer.Deserialize<OrderNotification>(body);
}
protected override async Task OnMessage(OrderNotification message, CancellationToken cancellationToken)
{
// Handle the notification
await ProcessNotification(message);
}
}
// Register it
services.AddHostedService<OrderNotificationListener>();
Configuration Examples
Development (Docker Compose)
var kafkaConfig = new KafkaMagicEventConfiguration(
BootstrapServers: "localhost:9092",
GroupId: "dev-service",
Topic: "dev-events"
);
Production (Multiple Brokers)
var kafkaConfig = new KafkaMagicEventConfiguration(
BootstrapServers: "kafka1.prod:9092,kafka2.prod:9092,kafka3.prod:9092",
GroupId: "order-service-prod",
Topic: "production-events"
);
With Configuration Binding
// appsettings.json
{
"Kafka": {
"BootstrapServers": "kafka:9092",
"GroupId": "my-service",
"Topic": "events"
}
}
// Startup.cs
var kafkaConfig = new KafkaMagicEventConfiguration(
BootstrapServers: configuration["Kafka:BootstrapServers"]!,
GroupId: configuration["Kafka:GroupId"]!,
Topic: configuration["Kafka:Topic"]!
);
services.RegisterMagicKafkaEvents(kafkaConfig);
Complete Example
// 1. Define Event
public record OrderCreatedEvent : MagicEvent
{
public long OrderId { get; init; }
public long CustomerId { get; init; }
public decimal TotalAmount { get; init; }
}
// 2. Create Handler (runs when event is consumed)
public class OrderNotificationHandler(
IEmailService emailService,
ILogger<OrderNotificationHandler> logger)
: IEventHandler<OrderCreatedEvent>
{
public async Task Handle(OrderCreatedEvent @event)
{
logger.LogInformation("Sending order confirmation for order {OrderId}", @event.OrderId);
await emailService.SendOrderConfirmation(@event.OrderId);
}
}
// 3. Configure (in Startup.cs)
var kafkaConfig = new KafkaMagicEventConfiguration(
BootstrapServers: "localhost:9092",
GroupId: "order-service",
Topic: "domain-events"
);
services.RegisterMagicKafkaEvents(kafkaConfig);
// 4. Dispatch Events (in your service)
public class OrderService(IEventDispatcher eventDispatcher, IOrderRepository orderRepository)
{
public async Task<Order> CreateOrder(CreateOrderRequest request)
{
var order = await orderRepository.Create(request);
// Send to Kafka
eventDispatcher.Dispatch(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount
});
return order;
}
}
Scaling
Scale Consumers Horizontally:
Run multiple instances with the same GroupId to process events in parallel:
Instance 1: GroupId="order-service" → Processes partition 0
Instance 2: GroupId="order-service" → Processes partition 1
Instance 3: GroupId="order-service" → Processes partition 2
Kafka automatically balances partitions across consumers in the same group.
Related Packages
MagicCSharp.Events - Core events library (required) MagicCSharp.Events.SQS - AWS SQS alternative MagicCSharp - Core infrastructure library (required)
License
MIT License - See LICENSE file for details.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net9.0
- Confluent.Kafka (>= 2.6.1)
- MagicCSharp.Events (>= 0.0.13)
- Microsoft.Extensions.Hosting.Abstractions (>= 9.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.