EventTransit 2.0.0
See the version list below for details.
dotnet add package EventTransit --version 2.0.0
NuGet\Install-Package EventTransit -Version 2.0.0
<PackageReference Include="EventTransit" Version="2.0.0" />
<PackageVersion Include="EventTransit" Version="2.0.0" />
<PackageReference Include="EventTransit" />
paket add EventTransit --version 2.0.0
#r "nuget: EventTransit, 2.0.0"
#:package EventTransit@2.0.0
#addin nuget:?package=EventTransit&version=2.0.0
#tool nuget:?package=EventTransit&version=2.0.0
EventTransit - Complete Documentation
📦 Package Information
Package Name: EventTransit
Version: 2.0.0
Package Location: ./nupkg/EventTransit.2.0.0.nupkg
Target Framework: .NET 9.0
License: MIT
🚀 Installation
Option 1: Install from Local Package
# Add local NuGet source
dotnet nuget add source /path/to/nupkg --name LocalEventTransit
# Install package
dotnet add package EventTransit --version 2.0.0 --source LocalEventTransit
Option 2: Direct Installation
dotnet add package EventTransit --source /path/to/nupkg
Option 3: Package Manager Console
Install-Package EventTransit -Source /path/to/nupkg
✨ Key Features
- ✅ Outbox Pattern - Guaranteed message delivery with transactional outbox
- ✅ Inbox Pattern - Automatic duplicate detection and idempotency
- ✅ Dead Letter Handling - Automatic DLQ management with cross-service notifications
- ✅ Exponential Retry - Configurable retry with exponential backoff (1s, 2s, 4s, 8s...)
- ✅ Multiple Exchange Types - Direct, Topic, Fanout, Headers
- ✅ Built-in Dashboard - Real-time monitoring UI with Razor Pages
- ✅ Entity Framework Integration - Seamless EF Core integration
- ✅ Repository Pattern Support - Works with custom repositories
- ✅ Multi-Database Support - PostgreSQL, SQL Server, MySQL, SQLite
- ✅ RabbitMQ Integration - Production-ready RabbitMQ support
- ✅ Worker Service Support - Works with ASP.NET Core and Worker Services
- ✅ Type-Safe - Strongly-typed message contracts
- ✅ Resilience - Circuit breaker, retry policies, error handling
📚 Table of Contents
- Quick Start
- Exchange Types
- Advanced Scenarios
- Configuration
- Worker Service Setup
- Dashboard
- Best Practices
🚀 Quick Start
1. Setup (ASP.NET Core Web Application)
using EventTransit;
using Microsoft.EntityFrameworkCore;
var builder = WebApplication.CreateBuilder(args);
// Add EventTransit with Entity Framework
builder.Services.AddEventTransitWithEntityFramework<MyDbContext>(
config =>
{
// RabbitMQ Connection
config.Broker.ConnectionString = "Host=localhost;Port=5672;Username=guest;Password=guest";
// Default Exchange
config.DefaultExchange.Name = "my-app-exchange";
config.DefaultExchange.Type = EventTransit.Brokers.Abstractions.ExchangeType.Direct;
config.DefaultExchange.Durable = true;
// Outbox Configuration
config.Outbox.Enabled = true;
config.Outbox.ProcessingIntervalSeconds = 10;
config.Outbox.BatchSize = 100;
config.Outbox.MaxRetryAttempts = 3;
// Inbox Configuration
config.Inbox.Enabled = true;
config.Inbox.ProcessingIntervalSeconds = 30;
// Dead Letter Configuration
config.DeadLetter.EnableCrossServiceNotifications = true;
config.DeadLetter.EnableDuplicateNotifications = true;
config.DeadLetter.ServiceName = "MyService";
config.DeadLetter.IncludeMessagePayload = true;
// Resilience Configuration (Exponential Retry)
config.Resilience.Enabled = true;
config.Resilience.MaxRetryAttempts = 3;
config.Resilience.BaseRetryDelayMs = 1000; // 1 second
config.Resilience.MaxRetryDelayMs = 30000; // 30 seconds
},
new EventTransit.Schema.SchemaConfiguration
{
AutoCreateTables = true,
CreateIndexes = true
});
// Add your DbContext
builder.Services.AddDbContext<MyDbContext>(options =>
options.UseNpgsql(builder.Configuration.GetConnectionString("Database")));
// Register consumers
builder.Services.AddEventTransitConsumer<OrderCreatedConsumer>();
builder.Services.AddEventTransitConsumer<PaymentProcessedConsumer>();
var app = builder.Build();
// Enable the dashboard at /eventtransit
app.UseEventTransitDashboard();
await app.RunAsync();
2. Define Your DbContext
using EventTransit.EntityFramework.Entities;
using Microsoft.EntityFrameworkCore;
public class MyDbContext : DbContext
{
public MyDbContext(DbContextOptions<MyDbContext> options) : base(options) { }
// EventTransit tables
public DbSet<OutboxMessage> OutboxMessages { get; set; }
public DbSet<InboxMessage> InboxMessages { get; set; }
// Your domain entities
public DbSet<Order> Orders { get; set; }
public DbSet<Payment> Payments { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
// Apply EventTransit configurations
modelBuilder.ApplyConfiguration(new EventTransit.EntityFramework.Configurations.OutboxMessageConfiguration());
modelBuilder.ApplyConfiguration(new EventTransit.EntityFramework.Configurations.InboxMessageConfiguration());
}
}
3. Define Message Contracts
using EventTransit.Core;
// Simple message
public class OrderCreatedEvent : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string OrderNumber { get; set; }
public decimal Amount { get; set; }
public string CustomerId { get; set; }
public DateTime CreatedAt { get; set; }
}
// Message with binding configuration
[MessageBinding(exchangeName: "orders", queueName: "order-processing", RoutingKey = "order.created")]
public class OrderProcessingEvent : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string OrderId { get; set; }
public string Status { get; set; }
}
4. Create Consumers
using EventTransit.Consumers;
using EventTransit.Core;
using Microsoft.Extensions.Logging;
// Basic Consumer
[ConsumerBinding("orders", "order-processing-queue", RoutingKey = "order.created")]
public class OrderCreatedConsumer : ConsumerBase<OrderCreatedEvent>
{
private readonly IOrderService _orderService;
public OrderCreatedConsumer(
IOrderService orderService,
ILogger<OrderCreatedConsumer> logger) : base(logger)
{
_orderService = orderService;
}
protected override async Task HandleMessageAsync(OrderCreatedEvent message)
{
Logger.LogInformation("Processing order: {OrderNumber}", message.OrderNumber);
// Your business logic
await _orderService.ProcessOrderAsync(message);
Logger.LogInformation("Order processed: {OrderNumber}", message.OrderNumber);
}
}
// Consumer with Inbox Pattern (Idempotency)
[ConsumerBinding("payments", "payment-queue", RoutingKey = "payment.processed")]
public class PaymentConsumer : ConsumerBase<PaymentEvent>, IInboxTrackingPreference
{
public bool UseInboxTracking => true; // Enable duplicate detection
private readonly IPaymentService _paymentService;
public PaymentConsumer(
IPaymentService paymentService,
ILogger<PaymentConsumer> logger) : base(logger)
{
_paymentService = paymentService;
}
protected override async Task HandleMessageAsync(PaymentEvent message)
{
// This will only be processed once, even if delivered multiple times
Logger.LogInformation("Processing payment: {PaymentId}", message.Id);
await _paymentService.ProcessPaymentAsync(message);
}
}
// Consumer with Dependency Injection
[ConsumerBinding("notifications", "email-queue", RoutingKey = "notification.email")]
public class EmailConsumer : ConsumerBase<EmailNotification>
{
private readonly IEmailService _emailService;
private readonly ITemplateService _templateService;
public EmailConsumer(
IEmailService emailService,
ITemplateService templateService,
ILogger<EmailConsumer> logger) : base(logger)
{
_emailService = emailService;
_templateService = templateService;
}
protected override async Task HandleMessageAsync(EmailNotification message)
{
var template = await _templateService.GetTemplateAsync(message.TemplateId);
var body = _templateService.RenderTemplate(template, message.Data);
await _emailService.SendAsync(message.To, message.Subject, body);
}
}
5. Publish Messages
using EventTransit.Core;
using Microsoft.AspNetCore.Mvc;
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly IMessagePublisher _publisher;
private readonly IOrderRepository _orderRepository;
public OrdersController(
IMessagePublisher publisher,
IOrderRepository orderRepository)
{
_publisher = publisher;
_orderRepository = orderRepository;
}
[HttpPost]
public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
{
// Create order in database
var order = await _orderRepository.CreateAsync(request);
// Publish event (stored in outbox, guaranteed delivery)
var orderEvent = new OrderCreatedEvent
{
Id = order.Id.ToString(),
OrderNumber = order.OrderNumber,
Amount = order.Amount,
CustomerId = order.CustomerId,
CreatedAt = DateTime.UtcNow
};
await _publisher.PublishAsync(orderEvent, routingKey: "order.created");
return Ok(new { orderId = order.Id });
}
[HttpPost("{id}/cancel")]
public async Task<IActionResult> CancelOrder(string id)
{
var order = await _orderRepository.GetByIdAsync(id);
if (order == null) return NotFound();
order.Status = "Cancelled";
await _orderRepository.UpdateAsync(order);
// Publish cancellation event
var cancelEvent = new OrderCancelledEvent
{
Id = Guid.NewGuid().ToString(),
OrderId = order.Id.ToString(),
Reason = "Customer requested cancellation"
};
await _publisher.PublishAsync(cancelEvent, routingKey: "order.cancelled");
return Ok();
}
}
📚 Exchange Types
1. Direct Exchange (Point-to-Point)
Use Case: One-to-one message delivery with specific routing
When to Use:
- Task queues (order processing, email sending)
- Command processing (create user, update inventory)
- Direct notifications (user-specific alerts)
- Background jobs (report generation, data export)
Example: Order Processing System
// ===== MESSAGE DEFINITION =====
[MessageBinding(
exchangeName: "orders",
queueName: "order-processing",
RoutingKey = "order.created",
ExchangeType = ExchangeType.Direct,
DurableQueue = true)]
public class OrderCreatedEvent : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string OrderNumber { get; set; }
public decimal Amount { get; set; }
public string CustomerId { get; set; }
public List<OrderItem> Items { get; set; }
}
// ===== CONSUMER =====
[ConsumerBinding("orders", "order-processing", RoutingKey = "order.created")]
public class OrderProcessor : ConsumerBase<OrderCreatedEvent>
{
private readonly IOrderService _orderService;
private readonly IInventoryService _inventoryService;
public OrderProcessor(
IOrderService orderService,
IInventoryService inventoryService,
ILogger<OrderProcessor> logger) : base(logger)
{
_orderService = orderService;
_inventoryService = inventoryService;
}
protected override async Task HandleMessageAsync(OrderCreatedEvent message)
{
Logger.LogInformation("Processing order: {OrderNumber}", message.OrderNumber);
// Reserve inventory
await _inventoryService.ReserveItemsAsync(message.Items);
// Process payment
await _orderService.ProcessPaymentAsync(message.OrderNumber);
// Update order status
await _orderService.UpdateStatusAsync(message.OrderNumber, "Processing");
Logger.LogInformation("Order processed: {OrderNumber}", message.OrderNumber);
}
}
// ===== PUBLISHER =====
public class OrderService
{
private readonly IMessagePublisher _publisher;
public async Task CreateOrderAsync(CreateOrderDto dto)
{
var orderEvent = new OrderCreatedEvent
{
OrderNumber = GenerateOrderNumber(),
Amount = dto.Amount,
CustomerId = dto.CustomerId,
Items = dto.Items
};
// Publish to direct exchange
await _publisher.PublishAsync(orderEvent, routingKey: "order.created");
}
}
Characteristics:
- ✅ Exact routing key match
- ✅ One producer → One consumer
- ✅ Guaranteed delivery with outbox
- ✅ Best for: Task queues, commands
2. Topic Exchange (Pattern Matching)
Use Case: Selective routing based on patterns, pub/sub with filtering
When to Use:
- Event notifications with categories (user.created, user.updated, user.deleted)
- Logging systems (log.error, log.warning, log.info)
- Monitoring and alerting (metric.cpu.high, metric.memory.low)
- Multi-tenant systems (tenant.123.order.created)
Routing Key Patterns:
*(star) - Matches exactly one word#(hash) - Matches zero or more words
Examples:
user.*.createdmatchesuser.admin.created,user.customer.createdorder.#matchesorder.created,order.payment.completed,order.shipped.tracking.updatedlog.*.errormatcheslog.api.error,log.database.error
Example: Multi-Channel Notification System
// ===== MESSAGE DEFINITION =====
[MessageBinding(
exchangeName: "notifications",
ExchangeType = ExchangeType.Topic,
DurableExchange = true)]
public class NotificationEvent : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string Channel { get; set; } // email, sms, push
public string Priority { get; set; } // urgent, normal, low
public string Recipient { get; set; }
public string Subject { get; set; }
public string Message { get; set; }
public Dictionary<string, string> Metadata { get; set; }
}
// ===== CONSUMER 1: All Email Notifications =====
[ConsumerBinding("notifications", "email-queue", RoutingKey = "notification.email.*")]
public class EmailConsumer : ConsumerBase<NotificationEvent>
{
private readonly IEmailService _emailService;
public EmailConsumer(IEmailService emailService, ILogger<EmailConsumer> logger)
: base(logger)
{
_emailService = emailService;
}
protected override async Task HandleMessageAsync(NotificationEvent message)
{
Logger.LogInformation("Sending email to: {Recipient} (Priority: {Priority})",
message.Recipient, message.Priority);
await _emailService.SendAsync(
to: message.Recipient,
subject: message.Subject,
body: message.Message,
priority: message.Priority);
}
}
// ===== CONSUMER 2: Only Urgent Notifications (Any Channel) =====
[ConsumerBinding("notifications", "urgent-queue", RoutingKey = "notification.*.urgent")]
public class UrgentNotificationConsumer : ConsumerBase<NotificationEvent>
{
private readonly IAlertService _alertService;
public UrgentNotificationConsumer(IAlertService alertService, ILogger<UrgentNotificationConsumer> logger)
: base(logger)
{
_alertService = alertService;
}
protected override async Task HandleMessageAsync(NotificationEvent message)
{
Logger.LogWarning("URGENT {Channel} notification to {Recipient}",
message.Channel, message.Recipient);
// Send to on-call team
await _alertService.NotifyOnCallTeamAsync(message);
// Escalate if not acknowledged in 5 minutes
await _alertService.ScheduleEscalationAsync(message.Id, TimeSpan.FromMinutes(5));
}
}
// ===== CONSUMER 3: All SMS Notifications =====
[ConsumerBinding("notifications", "sms-queue", RoutingKey = "notification.sms.#")]
public class SmsConsumer : ConsumerBase<NotificationEvent>
{
private readonly ISmsService _smsService;
public SmsConsumer(ISmsService smsService, ILogger<SmsConsumer> logger)
: base(logger)
{
_smsService = smsService;
}
protected override async Task HandleMessageAsync(NotificationEvent message)
{
Logger.LogInformation("Sending SMS to: {Recipient}", message.Recipient);
await _smsService.SendAsync(message.Recipient, message.Message);
}
}
// ===== CONSUMER 4: All Notifications (Audit Log) =====
[ConsumerBinding("notifications", "audit-queue", RoutingKey = "notification.#")]
public class NotificationAuditConsumer : ConsumerBase<NotificationEvent>, IInboxTrackingPreference
{
public bool UseInboxTracking => true; // Prevent duplicate audit entries
private readonly IAuditService _auditService;
public NotificationAuditConsumer(IAuditService auditService, ILogger<NotificationAuditConsumer> logger)
: base(logger)
{
_auditService = auditService;
}
protected override async Task HandleMessageAsync(NotificationEvent message)
{
await _auditService.LogNotificationAsync(new AuditEntry
{
NotificationId = message.Id,
Channel = message.Channel,
Priority = message.Priority,
Recipient = message.Recipient,
Timestamp = DateTime.UtcNow
});
}
}
// ===== PUBLISHER =====
public class NotificationService
{
private readonly IMessagePublisher _publisher;
public NotificationService(IMessagePublisher publisher)
{
_publisher = publisher;
}
public async Task SendEmailAsync(string recipient, string subject, string message, bool isUrgent = false)
{
var notification = new NotificationEvent
{
Channel = "email",
Priority = isUrgent ? "urgent" : "normal",
Recipient = recipient,
Subject = subject,
Message = message
};
// Routing key: notification.email.urgent or notification.email.normal
var routingKey = $"notification.{notification.Channel}.{notification.Priority}";
await _publisher.PublishAsync(notification, routingKey);
}
public async Task SendSmsAsync(string phoneNumber, string message, bool isUrgent = false)
{
var notification = new NotificationEvent
{
Channel = "sms",
Priority = isUrgent ? "urgent" : "normal",
Recipient = phoneNumber,
Message = message
};
var routingKey = $"notification.{notification.Channel}.{notification.Priority}";
await _publisher.PublishAsync(notification, routingKey);
}
}
Message Flow Example:
Publisher sends: "notification.email.urgent"
├─ EmailConsumer receives (matches "notification.email.*")
├─ UrgentNotificationConsumer receives (matches "notification.*.urgent")
└─ NotificationAuditConsumer receives (matches "notification.#")
Publisher sends: "notification.sms.normal"
├─ SmsConsumer receives (matches "notification.sms.#")
└─ NotificationAuditConsumer receives (matches "notification.#")
Characteristics:
- ✅ Pattern-based routing
- ✅ One producer → Multiple filtered consumers
- ✅ Flexible subscription patterns
- ✅ Best for: Event notifications, logging, monitoring
3. Fanout Exchange (Broadcast)
Use Case: Broadcast to all consumers, no routing
When to Use:
- System-wide notifications (maintenance mode, system shutdown)
- Cache invalidation across multiple services
- Real-time updates to multiple dashboards
- Event sourcing (all events to all projections)
- Audit logging (all events to audit system)
Example: System Event Broadcasting
// ===== MESSAGE DEFINITION =====
[MessageBinding(
exchangeName: "system-events",
ExchangeType = ExchangeType.Fanout,
DurableExchange = true)]
public class SystemEvent : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string EventType { get; set; }
public string Source { get; set; }
public string Description { get; set; }
public Dictionary<string, object> Data { get; set; }
public DateTime Timestamp { get; set; }
}
// ===== CONSUMER 1: Audit Logger =====
[ConsumerBinding("system-events", "audit-queue")]
public class AuditConsumer : ConsumerBase<SystemEvent>, IInboxTrackingPreference
{
public bool UseInboxTracking => true;
private readonly IAuditRepository _auditRepository;
public AuditConsumer(IAuditRepository auditRepository, ILogger<AuditConsumer> logger)
: base(logger)
{
_auditRepository = auditRepository;
}
protected override async Task HandleMessageAsync(SystemEvent message)
{
Logger.LogInformation("Auditing system event: {EventType} from {Source}",
message.EventType, message.Source);
await _auditRepository.LogEventAsync(new AuditLog
{
EventId = message.Id,
EventType = message.EventType,
Source = message.Source,
Description = message.Description,
Data = JsonSerializer.Serialize(message.Data),
Timestamp = message.Timestamp
});
}
}
// ===== CONSUMER 2: Metrics Collector =====
[ConsumerBinding("system-events", "metrics-queue")]
public class MetricsConsumer : ConsumerBase<SystemEvent>
{
private readonly IMetricsService _metricsService;
public MetricsConsumer(IMetricsService metricsService, ILogger<MetricsConsumer> logger)
: base(logger)
{
_metricsService = metricsService;
}
protected override async Task HandleMessageAsync(SystemEvent message)
{
Logger.LogInformation("Collecting metrics for: {EventType}", message.EventType);
await _metricsService.RecordEventAsync(
eventType: message.EventType,
source: message.Source,
timestamp: message.Timestamp,
metadata: message.Data);
}
}
// ===== CONSUMER 3: Real-time Dashboard =====
[ConsumerBinding("system-events", "dashboard-queue")]
public class DashboardConsumer : ConsumerBase<SystemEvent>
{
private readonly IHubContext<DashboardHub> _hubContext;
public DashboardConsumer(IHubContext<DashboardHub> hubContext, ILogger<DashboardConsumer> logger)
: base(logger)
{
_hubContext = hubContext;
}
protected override async Task HandleMessageAsync(SystemEvent message)
{
Logger.LogInformation("Broadcasting to dashboard: {EventType}", message.EventType);
// Send to all connected SignalR clients
await _hubContext.Clients.All.SendAsync("SystemEvent", new
{
message.EventType,
message.Source,
message.Description,
message.Timestamp
});
}
}
// ===== CONSUMER 4: Cache Invalidation =====
[ConsumerBinding("system-events", "cache-queue")]
public class CacheInvalidationConsumer : ConsumerBase<SystemEvent>
{
private readonly ICacheService _cacheService;
public CacheInvalidationConsumer(ICacheService cacheService, ILogger<CacheInvalidationConsumer> logger)
: base(logger)
{
_cacheService = cacheService;
}
protected override async Task HandleMessageAsync(SystemEvent message)
{
if (message.EventType == "CacheInvalidation")
{
Logger.LogInformation("Invalidating cache for: {Source}", message.Source);
var cacheKey = message.Data.GetValueOrDefault("CacheKey")?.ToString();
if (!string.IsNullOrEmpty(cacheKey))
{
await _cacheService.RemoveAsync(cacheKey);
}
}
}
}
// ===== PUBLISHER =====
public class SystemEventPublisher
{
private readonly IMessagePublisher _publisher;
public SystemEventPublisher(IMessagePublisher publisher)
{
_publisher = publisher;
}
public async Task PublishMaintenanceModeAsync(bool enabled)
{
var systemEvent = new SystemEvent
{
EventType = "MaintenanceMode",
Source = "SystemAdmin",
Description = enabled ? "System entering maintenance mode" : "System exiting maintenance mode",
Data = new Dictionary<string, object>
{
{ "Enabled", enabled },
{ "ScheduledDowntime", TimeSpan.FromHours(2) }
},
Timestamp = DateTime.UtcNow
};
// Broadcast to all consumers (routing key ignored in fanout)
await _publisher.PublishAsync(systemEvent);
}
public async Task InvalidateCacheAsync(string cacheKey)
{
var systemEvent = new SystemEvent
{
EventType = "CacheInvalidation",
Source = "CacheManager",
Description = $"Invalidating cache key: {cacheKey}",
Data = new Dictionary<string, object>
{
{ "CacheKey", cacheKey }
},
Timestamp = DateTime.UtcNow
};
await _publisher.PublishAsync(systemEvent);
}
}
Message Flow:
Publisher sends SystemEvent to fanout exchange
├─ AuditConsumer receives (logs to audit database)
├─ MetricsConsumer receives (records metrics)
├─ DashboardConsumer receives (updates real-time dashboard)
└─ CacheInvalidationConsumer receives (invalidates cache)
ALL consumers receive EVERY message
Characteristics:
- ✅ Broadcast to all queues
- ✅ Routing key is ignored
- ✅ One producer → All consumers
- ✅ Best for: System-wide notifications, cache invalidation, real-time updates
🎯 Advanced Scenarios
Scenario 1: Saga Pattern (Distributed Transactions)
// Order Saga Coordinator
public class OrderSagaCoordinator : ConsumerBase<OrderCreatedEvent>
{
private readonly IMessagePublisher _publisher;
private readonly ISagaRepository _sagaRepository;
protected override async Task HandleMessageAsync(OrderCreatedEvent message)
{
var saga = new OrderSaga { OrderId = message.Id };
await _sagaRepository.SaveAsync(saga);
// Step 1: Reserve inventory
await _publisher.PublishAsync(new ReserveInventoryCommand
{
OrderId = message.Id,
Items = message.Items
}, routingKey: "inventory.reserve");
}
}
// Inventory Service Consumer
[ConsumerBinding("inventory", "reserve-queue", RoutingKey = "inventory.reserve")]
public class InventoryReservationConsumer : ConsumerBase<ReserveInventoryCommand>
{
private readonly IMessagePublisher _publisher;
protected override async Task HandleMessageAsync(ReserveInventoryCommand message)
{
try
{
// Reserve inventory
await ReserveItemsAsync(message.Items);
// Publish success event
await _publisher.PublishAsync(new InventoryReservedEvent
{
OrderId = message.OrderId,
Success = true
}, routingKey: "inventory.reserved");
}
catch (Exception ex)
{
// Publish failure event (triggers compensation)
await _publisher.PublishAsync(new InventoryReservedEvent
{
OrderId = message.OrderId,
Success = false,
Error = ex.Message
}, routingKey: "inventory.reserved");
}
}
}
// Saga continues with payment, shipping, etc.
Scenario 2: Event Sourcing
// Event Store
[ConsumerBinding("events", "event-store-queue", RoutingKey = "event.#")]
public class EventStoreConsumer : ConsumerBase<DomainEvent>, IInboxTrackingPreference
{
public bool UseInboxTracking => true;
private readonly IEventStore _eventStore;
protected override async Task HandleMessageAsync(DomainEvent message)
{
await _eventStore.AppendAsync(new StoredEvent
{
EventId = message.Id,
EventType = message.GetType().Name,
AggregateId = message.AggregateId,
Data = JsonSerializer.Serialize(message),
Timestamp = message.Timestamp
});
}
}
// Projection Builder
[ConsumerBinding("events", "projection-queue", RoutingKey = "event.order.#")]
public class OrderProjectionConsumer : ConsumerBase<OrderEvent>
{
private readonly IProjectionRepository _projectionRepository;
protected override async Task HandleMessageAsync(OrderEvent message)
{
var projection = await _projectionRepository.GetOrCreateAsync(message.OrderId);
projection.Apply(message); // Update read model
await _projectionRepository.SaveAsync(projection);
}
}
Scenario 3: Priority Queues
// High priority queue
[ConsumerBinding("tasks", "high-priority-queue", RoutingKey = "task.high", Priority = 10)]
public class HighPriorityConsumer : ConsumerBase<TaskMessage>
{
protected override async Task HandleMessageAsync(TaskMessage message)
{
// Process high priority tasks first
await ProcessTaskAsync(message);
}
}
// Normal priority queue
[ConsumerBinding("tasks", "normal-priority-queue", RoutingKey = "task.normal", Priority = 5)]
public class NormalPriorityConsumer : ConsumerBase<TaskMessage>
{
protected override async Task HandleMessageAsync(TaskMessage message)
{
await ProcessTaskAsync(message);
}
}
Scenario 4: Delayed Messages (TTL)
// Publish with delay
public async Task ScheduleReminderAsync(string userId, TimeSpan delay)
{
var reminder = new ReminderEvent
{
UserId = userId,
Message = "Don't forget to complete your profile!"
};
// Message will be delivered after delay
await _publisher.PublishAsync(reminder,
routingKey: "reminder.user",
headers: new Dictionary<string, object>
{
{ "x-delay", (int)delay.TotalMilliseconds }
});
}
Scenario 5: Dead Letter Handling with Retry
// Consumer with automatic retry
[ConsumerBinding("orders", "order-queue", RoutingKey = "order.process")]
public class OrderConsumer : ConsumerBase<OrderEvent>
{
protected override async Task HandleMessageAsync(OrderEvent message)
{
// If this fails, EventTransit will:
// 1. Retry with exponential backoff (1s, 2s, 4s)
// 2. After 3 retries, send to dead letter exchange
// 3. Notify publisher service via et.dlx
await ProcessOrderAsync(message);
}
}
// Dead letter consumer (manual intervention)
[ConsumerBinding("et.dlx", "manual-review-queue", RoutingKey = "dead.letter.#")]
public class DeadLetterConsumer : ConsumerBase<DeadLetterNotification>
{
protected override async Task HandleMessageAsync(DeadLetterNotification message)
{
Logger.LogError("Message {MessageId} failed after {RetryCount} attempts: {Error}",
message.OriginalMessageId,
message.Error.RetryAttempt,
message.Error.Message);
// Send alert to operations team
await NotifyOpsTeamAsync(message);
}
}
⚙️ Configuration
Complete Configuration Example
builder.Services.AddEventTransitWithEntityFramework<MyDbContext>(
config =>
{
// ===== BROKER CONFIGURATION =====
config.Broker.ConnectionString = "Host=localhost;Port=5672;Username=guest;Password=guest;VirtualHost=/";
config.Broker.AutomaticRecoveryEnabled = true;
config.Broker.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
config.Broker.RequestedHeartbeat = TimeSpan.FromSeconds(60);
// ===== DEFAULT EXCHANGE =====
config.DefaultExchange.Name = "my-app-exchange";
config.DefaultExchange.Type = ExchangeType.Direct;
config.DefaultExchange.Durable = true;
config.DefaultExchange.AutoDelete = false;
// ===== OUTBOX CONFIGURATION =====
config.Outbox.Enabled = true;
config.Outbox.ProcessingIntervalSeconds = 10; // Process every 10 seconds
config.Outbox.BatchSize = 100; // Process 100 messages per batch
config.Outbox.MaxRetryAttempts = 3; // Retry failed messages 3 times
config.Outbox.RetryDelaySeconds = 60; // Wait 60 seconds between retries
config.Outbox.CleanupIntervalHours = 24; // Cleanup old messages every 24 hours
config.Outbox.RetentionDays = 7; // Keep messages for 7 days
// ===== INBOX CONFIGURATION =====
config.Inbox.Enabled = true;
config.Inbox.ProcessingIntervalSeconds = 30; // Check for retries every 30 seconds
config.Inbox.BatchSize = 100;
config.Inbox.MaxRetryAttempts = 3;
config.Inbox.RetryDelaySeconds = 30;
config.Inbox.CleanupIntervalHours = 24;
config.Inbox.RetentionDays = 7;
// ===== DEAD LETTER CONFIGURATION =====
config.DeadLetter.EnableCrossServiceNotifications = true; // Send DL notifications to publisher
config.DeadLetter.EnableDuplicateNotifications = true; // Treat duplicates as dead letters
config.DeadLetter.ServiceName = "MyService"; // Service identifier
config.DeadLetter.IncludeMessagePayload = true; // Include full message in DL notification
config.DeadLetter.MaxStackTraceLength = 2000; // Limit stack trace size
// ===== RESILIENCE CONFIGURATION (EXPONENTIAL RETRY) =====
config.Resilience.Enabled = true;
config.Resilience.MaxRetryAttempts = 3; // Retry 3 times before dead letter
config.Resilience.BaseRetryDelayMs = 1000; // Start with 1 second delay
config.Resilience.MaxRetryDelayMs = 30000; // Cap at 30 seconds
// Retry timeline: 1s, 2s, 4s (exponential backoff)
config.Resilience.CircuitBreakerFailureThreshold = 5; // Open circuit after 5 failures
config.Resilience.CircuitBreakerTimeoutSeconds = 60; // Keep circuit open for 60 seconds
},
new EventTransit.Schema.SchemaConfiguration
{
AutoCreateTables = true, // Automatically create OutboxMessage and InboxMessage tables
CreateIndexes = true // Create performance indexes
});
Complete appsettings.json Configuration
{
"ConnectionStrings": {
"Database": "Host=localhost;Database=myapp;Username=postgres;Password=password",
"RabbitMQ": "Host=localhost;Port=5672;Username=guest;Password=guest;VirtualHost=/"
},
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning",
"EventTransit": "Debug"
}
},
"EventTransit": {
"Broker": {
"ConnectionString": "Host=localhost;Port=5672;Username=guest;Password=guest;VirtualHost=/",
"AutomaticRecoveryEnabled": true,
"NetworkRecoveryInterval": "00:00:10",
"RequestedHeartbeat": "00:01:00",
"RequestedConnectionTimeout": "00:00:30",
"SocketReadTimeout": "00:00:30",
"SocketWriteTimeout": "00:00:30",
"ContinuationTimeout": "00:00:20",
"HandshakeContinuationTimeout": "00:00:10",
"MaxMessageSize": 134217728
},
"DefaultExchange": {
"Name": "my-app-exchange",
"Type": "Direct",
"Durable": true,
"AutoDelete": false,
"Internal": false,
"Arguments": {}
},
"DefaultQueue": {
"Durable": true,
"Exclusive": false,
"AutoDelete": false,
"Arguments": {}
},
"Outbox": {
"Enabled": true,
"ProcessingIntervalSeconds": 10,
"BatchSize": 100,
"MaxRetryAttempts": 3,
"RetryDelaySeconds": 60,
"CleanupIntervalHours": 24,
"RetentionDays": 7,
"EnableMetrics": true,
"ParallelProcessing": false,
"MaxDegreeOfParallelism": 4
},
"Inbox": {
"Enabled": true,
"ProcessingIntervalSeconds": 30,
"BatchSize": 100,
"MaxRetryAttempts": 3,
"RetryDelaySeconds": 30,
"CleanupIntervalHours": 24,
"RetentionDays": 7,
"EnableMetrics": true,
"TrackDuplicates": true
},
"DeadLetter": {
"EnableCrossServiceNotifications": true,
"EnableDuplicateNotifications": true,
"ServiceName": "MyService",
"IncludeMessagePayload": true,
"MaxStackTraceLength": 2000,
"MaxPayloadLength": 10000,
"RetryFromDashboard": true
},
"Resilience": {
"Enabled": true,
"MaxRetryAttempts": 3,
"BaseRetryDelayMs": 1000,
"MaxRetryDelayMs": 30000,
"CircuitBreakerFailureThreshold": 5,
"CircuitBreakerTimeoutSeconds": 60
},
"Consumer": {
"PrefetchCount": 10,
"AutoAck": false,
"RequeueOnError": false,
"ConsumerTag": "",
"Exclusive": false,
"Arguments": {}
},
"Publisher": {
"ConfirmPublish": true,
"Mandatory": false,
"Persistent": true,
"Timeout": "00:00:30"
},
"Dashboard": {
"Enabled": true,
"BasePath": "/eventtransit",
"PageSize": 50,
"RefreshIntervalSeconds": 30,
"EnableAuthentication": false
}
}
}
Then load in Program.cs:
builder.Services.AddEventTransitWithEntityFramework<MyDbContext>(
config =>
{
builder.Configuration.GetSection("EventTransit").Bind(config);
config.Broker.ConnectionString = builder.Configuration.GetConnectionString("RabbitMQ");
},
new EventTransit.Schema.SchemaConfiguration
{
AutoCreateTables = true,
CreateIndexes = true
});
🏷️ Message and Consumer Attributes
MessageBinding Attribute
The MessageBindingAttribute is used to configure message routing and exchange settings.
Complete Attribute Properties
[MessageBinding(
exchangeName: "my-exchange", // Required: Exchange name
queueName: "my-queue", // Optional: Queue name (auto-generated if not specified)
RoutingKey = "my.routing.key", // Optional: Routing key (default: "")
ExchangeType = ExchangeType.Direct, // Optional: Exchange type (default: Direct)
DurableExchange = true, // Optional: Exchange durability (default: true)
DurableQueue = true, // Optional: Queue durability (default: true)
AutoDeleteExchange = false, // Optional: Auto-delete exchange (default: false)
AutoDeleteQueue = false, // Optional: Auto-delete queue (default: false)
ExclusiveQueue = false, // Optional: Exclusive queue (default: false)
DeadLetterExchange = "my-dlx", // Optional: Dead letter exchange name
DeadLetterRoutingKey = "dlx.key", // Optional: Dead letter routing key
MessageTTL = 60000, // Optional: Message TTL in milliseconds
MaxLength = 10000, // Optional: Max queue length
MaxLengthBytes = 1048576, // Optional: Max queue size in bytes
Priority = 5 // Optional: Queue priority (0-10)
)]
public class MyMessage : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
// Message properties...
}
MessageBinding Examples
Example 1: Simple Direct Exchange
[MessageBinding(exchangeName: "orders", queueName: "order-processing", RoutingKey = "order.created")]
public class OrderCreatedEvent : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string OrderNumber { get; set; }
public decimal Amount { get; set; }
}
Example 2: Topic Exchange with Pattern
[MessageBinding(
exchangeName: "notifications",
queueName: "email-notifications",
RoutingKey = "notification.email.*",
ExchangeType = ExchangeType.Topic,
DurableExchange = true,
DurableQueue = true
)]
public class EmailNotification : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string To { get; set; }
public string Subject { get; set; }
public string Body { get; set; }
}
Example 3: Fanout Exchange
[MessageBinding(
exchangeName: "system-events",
queueName: "audit-queue",
ExchangeType = ExchangeType.Fanout,
DurableExchange = true,
DurableQueue = true
)]
public class SystemEvent : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string EventType { get; set; }
public string Description { get; set; }
}
Example 4: With Dead Letter Exchange
[MessageBinding(
exchangeName: "payments",
queueName: "payment-processing",
RoutingKey = "payment.process",
DeadLetterExchange = "payments-dlx",
DeadLetterRoutingKey = "payment.failed",
MessageTTL = 300000 // 5 minutes
)]
public class PaymentEvent : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string PaymentId { get; set; }
public decimal Amount { get; set; }
}
Example 5: Priority Queue
[MessageBinding(
exchangeName: "tasks",
queueName: "high-priority-tasks",
RoutingKey = "task.high",
Priority = 10,
MaxLength = 1000
)]
public class HighPriorityTask : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string TaskName { get; set; }
public string Description { get; set; }
}
ConsumerBinding Attribute
The ConsumerBindingAttribute is used to configure consumer behavior and queue bindings.
Complete Attribute Properties
[ConsumerBinding(
exchangeName: "my-exchange", // Required: Exchange name
queueName: "my-queue", // Required: Queue name
RoutingKey = "my.routing.key", // Optional: Routing key (default: "")
PrefetchCount = 10, // Optional: Prefetch count (default: 10)
AutoAck = false, // Optional: Auto-acknowledge (default: false)
Exclusive = false, // Optional: Exclusive consumer (default: false)
ConsumerTag = "my-consumer", // Optional: Consumer tag
Priority = 5, // Optional: Consumer priority (0-10)
Arguments = null // Optional: Additional arguments
)]
public class MyConsumer : ConsumerBase<MyMessage>
{
public MyConsumer(ILogger<MyConsumer> logger) : base(logger) { }
protected override async Task HandleMessageAsync(MyMessage message)
{
// Process message
}
}
ConsumerBinding Examples
Example 1: Basic Consumer
[ConsumerBinding("orders", "order-processing-queue", RoutingKey = "order.created")]
public class OrderConsumer : ConsumerBase<OrderCreatedEvent>
{
private readonly IOrderService _orderService;
public OrderConsumer(IOrderService orderService, ILogger<OrderConsumer> logger)
: base(logger)
{
_orderService = orderService;
}
protected override async Task HandleMessageAsync(OrderCreatedEvent message)
{
await _orderService.ProcessOrderAsync(message);
}
}
Example 2: Consumer with Prefetch
[ConsumerBinding(
exchangeName: "tasks",
queueName: "task-queue",
RoutingKey = "task.process",
PrefetchCount = 5 // Process 5 messages at a time
)]
public class TaskConsumer : ConsumerBase<TaskMessage>
{
public TaskConsumer(ILogger<TaskConsumer> logger) : base(logger) { }
protected override async Task HandleMessageAsync(TaskMessage message)
{
await ProcessTaskAsync(message);
}
}
Example 3: Topic Consumer with Wildcard
[ConsumerBinding(
exchangeName: "notifications",
queueName: "email-queue",
RoutingKey = "notification.email.*" // Matches notification.email.urgent, notification.email.normal, etc.
)]
public class EmailConsumer : ConsumerBase<NotificationEvent>
{
public EmailConsumer(ILogger<EmailConsumer> logger) : base(logger) { }
protected override async Task HandleMessageAsync(NotificationEvent message)
{
await SendEmailAsync(message);
}
}
Example 4: Multiple Routing Keys (Bind Multiple Times)
// Consumer 1: Handles urgent notifications
[ConsumerBinding("notifications", "urgent-queue", RoutingKey = "notification.*.urgent")]
public class UrgentConsumer : ConsumerBase<NotificationEvent>
{
public UrgentConsumer(ILogger<UrgentConsumer> logger) : base(logger) { }
protected override async Task HandleMessageAsync(NotificationEvent message)
{
await HandleUrgentAsync(message);
}
}
// Consumer 2: Handles all notifications
[ConsumerBinding("notifications", "all-queue", RoutingKey = "notification.#")]
public class AllNotificationsConsumer : ConsumerBase<NotificationEvent>
{
public AllNotificationsConsumer(ILogger<AllNotificationsConsumer> logger)
: base(logger) { }
protected override async Task HandleMessageAsync(NotificationEvent message)
{
await LogNotificationAsync(message);
}
}
Example 5: High Priority Consumer
[ConsumerBinding(
exchangeName: "tasks",
queueName: "high-priority-queue",
RoutingKey = "task.high",
Priority = 10,
PrefetchCount = 1 // Process one at a time for high priority
)]
public class HighPriorityConsumer : ConsumerBase<TaskMessage>
{
public HighPriorityConsumer(ILogger<HighPriorityConsumer> logger)
: base(logger) { }
protected override async Task HandleMessageAsync(TaskMessage message)
{
await ProcessHighPriorityTaskAsync(message);
}
}
Example 6: Exclusive Consumer
[ConsumerBinding(
exchangeName: "singleton-tasks",
queueName: "singleton-queue",
RoutingKey = "task.singleton",
Exclusive = true // Only one consumer can consume from this queue
)]
public class SingletonConsumer : ConsumerBase<SingletonTask>
{
public SingletonConsumer(ILogger<SingletonConsumer> logger)
: base(logger) { }
protected override async Task HandleMessageAsync(SingletonTask message)
{
// Only one instance of this consumer will process messages
await ProcessSingletonTaskAsync(message);
}
}
IInboxTrackingPreference Interface
Use this interface to enable inbox tracking (duplicate detection) for specific consumers.
[ConsumerBinding("payments", "payment-queue", RoutingKey = "payment.processed")]
public class PaymentConsumer : ConsumerBase<PaymentEvent>, IInboxTrackingPreference
{
public bool UseInboxTracking => true; // Enable duplicate detection
public PaymentConsumer(ILogger<PaymentConsumer> logger) : base(logger) { }
protected override async Task HandleMessageAsync(PaymentEvent message)
{
// This will only execute once, even if message is delivered multiple times
await ProcessPaymentAsync(message);
}
}
Complete Example: All Attributes Together
// ===== MESSAGE WITH FULL CONFIGURATION =====
[MessageBinding(
exchangeName: "orders",
queueName: "order-processing",
RoutingKey = "order.created",
ExchangeType = ExchangeType.Direct,
DurableExchange = true,
DurableQueue = true,
AutoDeleteExchange = false,
AutoDeleteQueue = false,
DeadLetterExchange = "orders-dlx",
DeadLetterRoutingKey = "order.failed",
MessageTTL = 300000, // 5 minutes
MaxLength = 10000,
Priority = 5
)]
public class OrderCreatedEvent : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string OrderNumber { get; set; }
public decimal Amount { get; set; }
public string CustomerId { get; set; }
public List<OrderItem> Items { get; set; }
public DateTime CreatedAt { get; set; }
}
// ===== CONSUMER WITH FULL CONFIGURATION =====
[ConsumerBinding(
exchangeName: "orders",
queueName: "order-processing",
RoutingKey = "order.created",
PrefetchCount = 10,
AutoAck = false,
Exclusive = false,
ConsumerTag = "order-processor-1",
Priority = 5
)]
public class OrderConsumer : ConsumerBase<OrderCreatedEvent>, IInboxTrackingPreference
{
public bool UseInboxTracking => true; // Enable duplicate detection
private readonly IOrderService _orderService;
private readonly IInventoryService _inventoryService;
private readonly IMessagePublisher _publisher;
public OrderConsumer(
IOrderService orderService,
IInventoryService inventoryService,
IMessagePublisher publisher,
ILogger<OrderConsumer> logger) : base(logger)
{
_orderService = orderService;
_inventoryService = inventoryService;
_publisher = publisher;
}
protected override async Task HandleMessageAsync(OrderCreatedEvent message)
{
Logger.LogInformation("Processing order: {OrderNumber}", message.OrderNumber);
try
{
// Reserve inventory
await _inventoryService.ReserveItemsAsync(message.Items);
// Process order
await _orderService.ProcessOrderAsync(message);
// Publish success event
await _publisher.PublishAsync(new OrderProcessedEvent
{
OrderId = message.Id,
OrderNumber = message.OrderNumber,
Status = "Processed"
}, routingKey: "order.processed");
Logger.LogInformation("Order processed successfully: {OrderNumber}", message.OrderNumber);
}
catch (Exception ex)
{
Logger.LogError(ex, "Failed to process order: {OrderNumber}", message.OrderNumber);
throw; // Let EventTransit handle retry
}
}
}
Attribute Reference Tables
MessageBinding Attribute Properties
| Property | Type | Default | Description |
|---|---|---|---|
exchangeName |
string | Required | Name of the exchange |
queueName |
string | Auto-generated | Name of the queue (optional) |
RoutingKey |
string | "" | Routing key for message routing |
ExchangeType |
ExchangeType | Direct | Type of exchange (Direct, Topic, Fanout, Headers) |
DurableExchange |
bool | true | Exchange survives broker restart |
DurableQueue |
bool | true | Queue survives broker restart |
AutoDeleteExchange |
bool | false | Exchange deleted when last queue unbinds |
AutoDeleteQueue |
bool | false | Queue deleted when last consumer disconnects |
ExclusiveQueue |
bool | false | Queue used by only one connection |
DeadLetterExchange |
string | null | Dead letter exchange name |
DeadLetterRoutingKey |
string | null | Routing key for dead letters |
MessageTTL |
int | null | Message time-to-live in milliseconds |
MaxLength |
int | null | Maximum number of messages in queue |
MaxLengthBytes |
int | null | Maximum queue size in bytes |
Priority |
int | 0 | Queue priority (0-10) |
ConsumerBinding Attribute Properties
| Property | Type | Default | Description |
|---|---|---|---|
exchangeName |
string | Required | Name of the exchange |
queueName |
string | Required | Name of the queue |
RoutingKey |
string | "" | Routing key for binding |
PrefetchCount |
int | 10 | Number of messages to prefetch |
AutoAck |
bool | false | Automatically acknowledge messages |
Exclusive |
bool | false | Exclusive consumer (only one allowed) |
ConsumerTag |
string | Auto-generated | Consumer identifier |
Priority |
int | 0 | Consumer priority (0-10) |
Arguments |
Dictionary | null | Additional RabbitMQ arguments |
ExchangeType Enum Values
| Value | Description | Use Case |
|---|---|---|
Direct |
Exact routing key match | Point-to-point messaging, task queues |
Topic |
Pattern-based routing | Event notifications, logging, monitoring |
Fanout |
Broadcast to all queues | System-wide notifications, cache invalidation |
Headers |
Header-based routing | Complex routing based on message headers |
🔧 Worker Service Setup
Option 1: Worker Service with Dashboard (Recommended)
using EventTransit;
using Microsoft.EntityFrameworkCore;
var builder = WebApplication.CreateBuilder(args);
// Configure Kestrel for background service
builder.WebHost.ConfigureKestrel(options =>
{
options.ListenLocalhost(5000); // Dashboard port
});
// Add EventTransit
builder.Services.AddEventTransitWithEntityFramework<WorkerDbContext>(
config =>
{
config.Broker.ConnectionString = builder.Configuration.GetConnectionString("RabbitMQ");
config.DefaultExchange.Name = "worker-exchange";
config.Outbox.Enabled = true;
config.Inbox.Enabled = true;
config.DeadLetter.EnableCrossServiceNotifications = true;
config.DeadLetter.ServiceName = "WorkerService";
config.Resilience.Enabled = true;
config.Resilience.MaxRetryAttempts = 3;
},
new EventTransit.Schema.SchemaConfiguration
{
AutoCreateTables = true,
CreateIndexes = true
});
// Add DbContext
builder.Services.AddDbContext<WorkerDbContext>(options =>
options.UseNpgsql(builder.Configuration.GetConnectionString("Database")));
// Add your background workers
builder.Services.AddHostedService<OrderProcessingWorker>();
builder.Services.AddHostedService<EmailNotificationWorker>();
// Register consumers
builder.Services.AddEventTransitConsumer<OrderCreatedConsumer>();
builder.Services.AddEventTransitConsumer<PaymentProcessedConsumer>();
var app = builder.Build();
// Enable the dashboard
app.UseEventTransitDashboard();
// Optional: Add health checks
app.MapGet("/health", () => Results.Ok(new { status = "healthy", timestamp = DateTime.UtcNow }));
await app.RunAsync();
Option 2: Worker Service WITHOUT Dashboard
using EventTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
var builder = Host.CreateDefaultBuilder(args);
builder.ConfigureServices((context, services) =>
{
// Add EventTransit WITHOUT dashboard
services.AddEventTransitWithEntityFramework<WorkerDbContext>(
config =>
{
config.Broker.ConnectionString = context.Configuration.GetConnectionString("RabbitMQ");
config.DefaultExchange.Name = "worker-exchange";
config.Outbox.Enabled = true;
config.Inbox.Enabled = true;
},
new EventTransit.Schema.SchemaConfiguration
{
AutoCreateTables = true
});
// Add DbContext
services.AddDbContext<WorkerDbContext>(options =>
options.UseNpgsql(context.Configuration.GetConnectionString("Database")));
// Add background workers
services.AddHostedService<OrderProcessingWorker>();
// Register consumers
services.AddEventTransitConsumer<OrderCreatedConsumer>();
});
var host = builder.Build();
await host.RunAsync();
Background Worker Example
public class OrderProcessingWorker : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<OrderProcessingWorker> _logger;
public OrderProcessingWorker(
IServiceProvider serviceProvider,
ILogger<OrderProcessingWorker> logger)
{
_serviceProvider = serviceProvider;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Order Processing Worker started");
while (!stoppingToken.IsCancellationRequested)
{
try
{
using var scope = _serviceProvider.CreateScope();
var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();
var publisher = scope.ServiceProvider.GetRequiredService<IMessagePublisher>();
// Check for pending orders
var pendingOrders = await orderService.GetPendingOrdersAsync();
foreach (var order in pendingOrders)
{
// Publish order event
await publisher.PublishAsync(new OrderCreatedEvent
{
OrderId = order.Id,
OrderNumber = order.OrderNumber,
Amount = order.Amount
}, routingKey: "order.created");
_logger.LogInformation("Published order event: {OrderNumber}", order.OrderNumber);
}
// Wait before next check
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in Order Processing Worker");
await Task.Delay(TimeSpan.FromSeconds(60), stoppingToken);
}
}
_logger.LogInformation("Order Processing Worker stopped");
}
}
📊 Dashboard
Accessing the Dashboard
Once configured, the dashboard is available at:
- URL:
http://localhost:5000/eventtransit(or your configured port) - Features:
- Real-time message monitoring
- Outbox/Inbox statistics
- Dead letter management
- Message retry/requeue
- Error analysis
- Performance metrics
Dashboard Features
1. Overview Page
- Total outbox messages
- Total inbox messages
- Dead letters count
- Pending outbox count
- Duplicate inbox count
- Completed outbox count
2. Outbox Page
- View all outbox messages
- Filter by status (Pending, Dispatched, Dead Lettered)
- Filter by date range
- Search by message ID
- View message payload
- Retry failed messages
- Bulk operations
3. Inbox Page
- View all inbox messages
- Filter by status (Processed, Failed)
- Filter by date range
- View message payload
- View processing attempts
- View error details
4. Dead Letters Page
- View all dead lettered messages
- View error details and stack traces
- View retry history
- Retry/republish messages
- Bulk retry operations
- Delete dead letters
Dashboard Screenshots
┌─────────────────────────────────────────────────────────┐
│ EventTransit Dashboard │
├─────────────────────────────────────────────────────────┤
│ │
│ 📊 Statistics │
│ ┌──────────────┬──────────────┬──────────────┐ │
│ │ Total Outbox │ Total Inbox │ Dead Letters │ │
│ │ 1,234 │ 987 │ 12 │ │
│ └──────────────┴──────────────┴──────────────┘ │
│ │
│ 📋 Recent Messages │
│ ┌────────────────────────────────────────────┐ │
│ │ MessageId │ Type │ Status │ │
│ ├────────────────────────────────────────────┤ │
│ │ abc-123 │ OrderCreated │ Dispatched │ │
│ │ def-456 │ PaymentProc │ Pending │ │
│ │ ghi-789 │ EmailSent │ Failed │ │
│ └────────────────────────────────────────────┘ │
│ │
│ [View Outbox] [View Inbox] [View Dead Letters] │
└─────────────────────────────────────────────────────────┘
🎯 Best Practices
1. Message Design
✅ DO:
// Use IEventWithId for automatic ID generation
public class OrderCreatedEvent : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string OrderNumber { get; set; }
public decimal Amount { get; set; }
public DateTime CreatedAt { get; set; }
}
// Include all necessary data in the message
public class PaymentProcessedEvent : IEventWithId
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string PaymentId { get; set; }
public string OrderId { get; set; }
public decimal Amount { get; set; }
public string Currency { get; set; }
public string PaymentMethod { get; set; }
public DateTime ProcessedAt { get; set; }
}
❌ DON'T:
// Don't include sensitive data
public class BadPaymentEvent : IEventWithId
{
public string CreditCardNumber { get; set; } // ❌ Never include PII
public string CVV { get; set; } // ❌ Never include sensitive data
}
// Don't make messages too large
public class BadOrderEvent : IEventWithId
{
public byte[] LargeFile { get; set; } // ❌ Don't include large binary data
public List<Product> AllProducts { get; set; } // ❌ Don't include unnecessary data
}
2. Consumer Design
✅ DO:
// Use inbox tracking for idempotency
[ConsumerBinding("payments", "payment-queue", RoutingKey = "payment.processed")]
public class PaymentConsumer : ConsumerBase<PaymentEvent>, IInboxTrackingPreference
{
public bool UseInboxTracking => true; // ✅ Enable for critical operations
protected override async Task HandleMessageAsync(PaymentEvent message)
{
// This will only execute once, even if message is delivered multiple times
await ProcessPaymentAsync(message);
}
}
// Keep consumers focused and single-purpose
[ConsumerBinding("orders", "order-email-queue", RoutingKey = "order.created")]
public class OrderEmailConsumer : ConsumerBase<OrderCreatedEvent>
{
protected override async Task HandleMessageAsync(OrderCreatedEvent message)
{
// Only send email, nothing else
await SendOrderConfirmationEmailAsync(message);
}
}
❌ DON'T:
// Don't do too much in one consumer
public class BadOrderConsumer : ConsumerBase<OrderCreatedEvent>
{
protected override async Task HandleMessageAsync(OrderCreatedEvent message)
{
await ProcessPayment(message); // ❌ Too many responsibilities
await UpdateInventory(message); // ❌ Should be separate consumers
await SendEmail(message); // ❌ Should be separate consumers
await UpdateAnalytics(message); // ❌ Should be separate consumers
}
}
3. Error Handling
✅ DO:
[ConsumerBinding("orders", "order-queue", RoutingKey = "order.process")]
public class OrderConsumer : ConsumerBase<OrderEvent>
{
protected override async Task HandleMessageAsync(OrderEvent message)
{
try
{
await ProcessOrderAsync(message);
}
catch (ValidationException ex)
{
// Log validation errors
Logger.LogWarning(ex, "Validation failed for order {OrderId}", message.OrderId);
// Don't retry validation errors
return;
}
catch (TransientException ex)
{
// Log transient errors
Logger.LogError(ex, "Transient error processing order {OrderId}", message.OrderId);
// Let EventTransit retry with exponential backoff
throw;
}
}
}
4. Configuration
✅ DO:
// Use environment-specific configuration
builder.Services.AddEventTransitWithEntityFramework<MyDbContext>(
config =>
{
var environment = builder.Environment.EnvironmentName;
if (environment == "Production")
{
config.Outbox.ProcessingIntervalSeconds = 5; // Faster in production
config.Resilience.MaxRetryAttempts = 5; // More retries in production
}
else
{
config.Outbox.ProcessingIntervalSeconds = 30; // Slower in development
config.Resilience.MaxRetryAttempts = 2; // Fewer retries in development
}
config.DeadLetter.ServiceName = $"MyService-{environment}";
},
new EventTransit.Schema.SchemaConfiguration
{
AutoCreateTables = builder.Environment.IsDevelopment() // Only auto-create in dev
});
5. Testing
✅ DO:
// Unit test consumers
public class OrderConsumerTests
{
[Fact]
public async Task HandleMessageAsync_ValidOrder_ProcessesSuccessfully()
{
// Arrange
var logger = new Mock<ILogger<OrderConsumer>>();
var orderService = new Mock<IOrderService>();
var consumer = new OrderConsumer(orderService.Object, logger.Object);
var message = new OrderCreatedEvent
{
OrderNumber = "ORD-123",
Amount = 100.00m
};
// Act
await consumer.HandleMessageAsync(message);
// Assert
orderService.Verify(x => x.ProcessOrderAsync(message), Times.Once);
}
}
// Integration test with TestContainers
public class EventTransitIntegrationTests : IAsyncLifetime
{
private readonly RabbitMqContainer _rabbitMqContainer;
private readonly PostgreSqlContainer _postgresContainer;
public async Task InitializeAsync()
{
await _rabbitMqContainer.StartAsync();
await _postgresContainer.StartAsync();
}
[Fact]
public async Task PublishAndConsume_MessageDelivered()
{
// Test end-to-end message flow
}
}
🐛 Troubleshooting
Issue 1: Messages Not Being Consumed
Symptoms: Messages published but not consumed
Solutions:
Check RabbitMQ connection:
docker logs rabbitmqVerify consumer is registered:
builder.Services.AddEventTransitConsumer<MyConsumer>();Check exchange and queue bindings in RabbitMQ Management UI
Verify routing keys match
Issue 2: Duplicate Messages
Symptoms: Same message processed multiple times
Solutions:
Enable inbox tracking:
public class MyConsumer : ConsumerBase<MyEvent>, IInboxTrackingPreference { public bool UseInboxTracking => true; }Ensure message has unique ID:
public class MyEvent : IEventWithId { public string Id { get; set; } = Guid.NewGuid().ToString(); }
Issue 3: Messages Going to Dead Letter
Symptoms: Messages immediately go to dead letter
Solutions:
Check consumer error handling
Review logs for exceptions
Verify retry configuration:
config.Resilience.MaxRetryAttempts = 3; config.Resilience.BaseRetryDelayMs = 1000;Check Dashboard for error details
Issue 4: Slow Message Processing
Symptoms: Messages accumulating in outbox
Solutions:
Increase batch size:
config.Outbox.BatchSize = 200;Decrease processing interval:
config.Outbox.ProcessingIntervalSeconds = 5;Scale consumers horizontally
Check database performance
📞 Support
For issues, questions, or contributions:
- GitHub: [Your Repository URL]
- Documentation: [Your Docs URL]
- Email: [Your Support Email]
📄 License
MIT License - See LICENSE file for details
🎉 Conclusion
EventTransit provides a complete, production-ready solution for reliable message delivery in .NET applications. With built-in support for outbox/inbox patterns, dead letter handling, exponential retry, and a monitoring dashboard, you can build robust distributed systems with confidence.
Key Takeaways:
- ✅ Use Direct Exchange for point-to-point messaging
- ✅ Use Topic Exchange for pattern-based routing
- ✅ Use Fanout Exchange for broadcasting
- ✅ Enable Inbox Tracking for idempotency
- ✅ Configure Exponential Retry for resilience
- ✅ Monitor with the Built-in Dashboard
- ✅ Follow Best Practices for production deployments
Happy messaging! 🚀
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net9.0
- Microsoft.AspNetCore.Mvc.Core (>= 2.2.5)
- Microsoft.Data.SqlClient (>= 6.1.1)
- Microsoft.Data.Sqlite (>= 9.0.6)
- Microsoft.EntityFrameworkCore (>= 9.0.6)
- Microsoft.EntityFrameworkCore.Relational (>= 9.0.6)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.6)
- Microsoft.Extensions.Configuration.Binder (>= 9.0.6)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 9.0.9)
- Microsoft.Extensions.Diagnostics.HealthChecks (>= 9.0.6)
- Microsoft.Extensions.Hosting.Abstractions (>= 9.0.6)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.9)
- Microsoft.Extensions.Options (>= 9.0.9)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 9.0.6)
- MySqlConnector (>= 2.4.0)
- Npgsql (>= 9.0.3)
- Polly (>= 8.5.2)
- Polly.Extensions (>= 8.5.2)
- RabbitMQ.Client (>= 7.1.2)
- System.ComponentModel.Annotations (>= 5.0.0)
- System.Text.Json (>= 9.0.6)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
v2.0.0: Complete rewrite with multi-broker support, clean architecture, and high performance.