EventTransit 2.0.2
EventTransit Integration Guide
Overview
EventTransit is an enterprise message bus library that implements the Transactional Outbox Pattern for reliable async message processing with RabbitMQ and Entity Framework Core.
Installation
dotnet add package EventTransit --version 2.0.2
Quick Start
1. Configure appsettings.json
Minimal Configuration:
{
"ConnectionStrings": {
"DefaultConnection": "Host=localhost;Database=mydb;Username=user;Password=pass"
},
"EventTransit": {
"Broker": {
"BrokerType": "RabbitMQ",
"ConnectionString": "amqp://guest:guest@localhost:5672"
},
"DefaultExchange": {
"Name": "my-exchange",
"Type": "direct"
}
}
}
Full Configuration (All Options):
{
"ConnectionStrings": {
"DefaultConnection": "Host=localhost;Database=mydb;Username=user;Password=pass"
},
"EventTransit": {
"Broker": {
"BrokerType": "RabbitMQ",
"ConnectionString": "amqp://guest:guest@localhost:5672",
"ClientProvidedName": "my-service-publisher",
"HeartbeatSeconds": 60,
"NetworkRecoveryIntervalSeconds": 10,
"AutomaticRecoveryEnabled": true,
"TopologyRecoveryEnabled": true,
"RequestedChannelMax": 2047,
"RequestedFrameMax": 0,
"UseBackgroundThreadsForIO": false,
"DispatchConsumersConcurrently": true,
"ConsumerDispatchConcurrency": 1
},
"DefaultExchange": {
"Name": "my-exchange",
"Type": "direct",
"Durable": true,
"AutoDelete": false
},
"DefaultQueue": {
"Durable": true,
"Exclusive": false,
"AutoDelete": false,
"Arguments": {}
},
"DefaultPublisher": {
"ConfirmSelect": true,
"Persistent": true,
"Mandatory": false,
"Immediate": false
},
"DefaultConsumer": {
"PrefetchCount": 20,
"PrefetchSize": 0,
"Global": false,
"AutoAck": false,
"ExclusiveConsumer": false,
"ConsumerTag": "",
"NoLocal": false,
"Arguments": {}
},
"Outbox": {
"Enabled": true,
"ProcessingIntervalSeconds": 1,
"BatchSize": 100,
"MaxRetryAttempts": 3,
"RetryDelaySeconds": 30,
"DeadLetterAfterMaxRetries": true,
"KeepProcessedMessagesDays": 7
},
"Inbox": {
"Enabled": true,
"ProcessingIntervalSeconds": 30,
"BatchSize": 100,
"MaxRetryAttempts": 3,
"RetryDelaySeconds": 30,
"DeadLetterAfterMaxRetries": true,
"KeepProcessedMessagesDays": 7
},
"DeadLetter": {
"ServiceName": "MyService",
"EnableDuplicateNotifications": true,
"MaxStackTraceLength": 2000
}
}
}
Configuration Options Explained:
| Section | Option | Description | Default |
|---|---|---|---|
| Broker | BrokerType | Message broker type | RabbitMQ |
| Broker | ConnectionString | RabbitMQ connection string | Required |
| Broker | ClientProvidedName | Client name for RabbitMQ connection | eventtransit-publisher |
| DefaultExchange | Name | Exchange name | Required |
| DefaultExchange | Type | Exchange type (direct, topic, fanout) | direct |
| DefaultExchange | Durable | Survive broker restart | true |
| Outbox | ProcessingIntervalSeconds | How often to process outbox (seconds) | 1 |
| Outbox | BatchSize | Messages per batch | 100 |
| Outbox | MaxRetryAttempts | Retry attempts before dead letter | 3 |
| Inbox | ProcessingIntervalSeconds | How often to clean inbox (seconds) | 30 |
| Inbox | KeepProcessedMessagesDays | Retention period (days) | 7 |
| DefaultConsumer | PrefetchCount | Messages to prefetch | 20 |
| DefaultConsumer | AutoAck | Auto-acknowledge messages | false |
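Two options in the table are marked Required. The following is a minimal sketch of a startup check for them, using only System.Text.Json. `EventTransitConfigCheck` and `FindMissingRequired` are hypothetical names for illustration and are not part of EventTransit; whether the library performs its own validation is not stated here.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper, not part of EventTransit.
public static class EventTransitConfigCheck
{
    // Returns the configuration paths of required settings that are missing or empty.
    public static List<string> FindMissingRequired(string appSettingsJson)
    {
        var missing = new List<string>();
        using var doc = JsonDocument.Parse(appSettingsJson);
        var root = doc.RootElement;

        if (root.ValueKind != JsonValueKind.Object
            || !root.TryGetProperty("EventTransit", out var section)
            || section.ValueKind != JsonValueKind.Object)
        {
            missing.Add("EventTransit");
            return missing;
        }

        // The two options marked "Required" in the table above.
        if (!HasNonEmptyString(section, "Broker", "ConnectionString"))
            missing.Add("EventTransit:Broker:ConnectionString");
        if (!HasNonEmptyString(section, "DefaultExchange", "Name"))
            missing.Add("EventTransit:DefaultExchange:Name");

        return missing;
    }

    private static bool HasNonEmptyString(JsonElement section, string group, string key)
    {
        return section.TryGetProperty(group, out var g)
            && g.ValueKind == JsonValueKind.Object
            && g.TryGetProperty(key, out var value)
            && value.ValueKind == JsonValueKind.String
            && !string.IsNullOrWhiteSpace(value.GetString());
    }
}
```

Calling `FindMissingRequired(File.ReadAllText("appsettings.json"))` before registering services would surface a missing connection string or exchange name as a clear list rather than a later connection failure.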
2. Update Your DbContext
using EventTransit.EntityFramework.Extensions;
using Microsoft.EntityFrameworkCore;
public class MyDbContext : DbContext
{
public MyDbContext(DbContextOptions<MyDbContext> options) : base(options) {}
// Your entities
public DbSet<Order> Orders => Set<Order>();
public DbSet<Customer> Customers => Set<Customer>();
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
// Your entity configurations
modelBuilder.ApplyConfigurationsFromAssembly(typeof(MyDbContext).Assembly);
// Add EventTransit tables (OutboxMessage, InboxMessage)
modelBuilder.AddEventTransitOutboxInbox();
base.OnModelCreating(modelBuilder);
}
}
3. Register Services
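using EventTransit.Extensions;
using EventTransit.Dashboard.Extensions;
using Microsoft.EntityFrameworkCore; // UseNpgsql requires the Npgsql.EntityFrameworkCore.PostgreSQL provider package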
var builder = WebApplication.CreateBuilder(args);
// Register your DbContext
builder.Services.AddDbContext<MyDbContext>(options =>
options.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection")));
// Register EventTransit
builder.Services.AddEventTransitWithEntityFramework<MyDbContext>(
configureOptions: cfg => builder.Configuration.GetSection("EventTransit").Bind(cfg)
);
// Optional: Add EventTransit Dashboard
builder.Services.AddEventTransitDashboardWithEntityFramework<MyDbContext>(builder.Configuration);
var app = builder.Build();
// Optional: Map dashboard UI
app.UseEventTransitDashboard("/eventtransit");
app.Run();
4. Create Database Migration
dotnet ef migrations add AddEventTransit
dotnet ef database update
Usage
Step 1: Define Your Event
using EventTransit.Core;
[MessageBinding(
exchangeName: "my-exchange",
queueName: "order-created-queue",
RoutingKey = "order.created",
ExchangeType = ExchangeType.Direct,
DurableQueue = true,
DurableExchange = true)]
public class OrderCreatedEvent : IEventWithId
{
// Must be string, not Guid
public string Id { get; set; } = Guid.NewGuid().ToString();
public string OrderId { get; set; } = string.Empty;
public string CustomerId { get; set; } = string.Empty;
public decimal TotalAmount { get; set; }
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
Exchange Type Examples:
// Direct Exchange - Point-to-point messaging
[MessageBinding("orders.direct", "order-queue", RoutingKey = "order.created")]
// Topic Exchange - Pattern-based routing
[MessageBinding("events.topic", "payment-queue", RoutingKey = "order.*.created")]
// Fanout Exchange - Broadcast to all queues
[MessageBinding("notifications.fanout", "email-queue")]
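To make the topic pattern above concrete, here is a small sketch of the matching rule RabbitMQ applies to topic exchanges: words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. `TopicPattern` is a hypothetical helper written for illustration; the broker does this matching itself, so application code does not need it.

```csharp
using System;

// Hypothetical helper that mirrors RabbitMQ topic-exchange matching.
public static class TopicPattern
{
    public static bool Matches(string pattern, string routingKey)
    {
        var p = pattern.Split('.');
        var k = routingKey.Length == 0 ? Array.Empty<string>() : routingKey.Split('.');
        return Match(p, 0, k, 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: it matches only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words: try every possible split point.
            for (var next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        if (ki == k.Length) return false;

        // '*' matches exactly one word; anything else must match literally.
        if (p[pi] == "*" || p[pi] == k[ki]) return Match(p, pi + 1, k, ki + 1);
        return false;
    }
}
```

Under this rule the binding `order.*.created` accepts `order.eu.created` but not `order.created`, because `*` needs one word between the dots. A publisher that sends `order.created` would need a binding such as `order.#` or the literal key.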
Step 2: Publish Events
Option A: Using Outbox Publisher (Transactional - Recommended)
using EventTransit.Core;
public class OrderService
{
private readonly IOutboxPublisher _publisher;
private readonly MyDbContext _context;
public OrderService(IOutboxPublisher publisher, MyDbContext context)
{
_publisher = publisher;
_context = context;
}
public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
{
// Start transaction
await using var transaction = await _context.Database.BeginTransactionAsync();
try
{
// Save order to database
var order = new Order
{
CustomerId = request.CustomerId,
TotalAmount = request.TotalAmount
};
_context.Orders.Add(order);
await _context.SaveChangesAsync();
// Publish event (saved to outbox table in same transaction)
await _publisher.PublishAsync(new OrderCreatedEvent
{
Id = Guid.NewGuid().ToString(),
OrderId = order.Id.ToString(),
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount,
CreatedAt = DateTime.UtcNow
});
// Commit transaction - both order and event are saved atomically
await transaction.CommitAsync();
return order;
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
Option B: Using Direct Publisher (Fire-and-Forget)
public class NotificationService
{
private readonly IDirectPublisher _publisher;
public NotificationService(IDirectPublisher publisher)
{
_publisher = publisher;
}
public async Task SendNotificationAsync(string userId, string message)
{
// Publishes immediately to RabbitMQ without database transaction
await _publisher.PublishAsync(new NotificationEvent
{
Id = Guid.NewGuid().ToString(),
UserId = userId,
Message = message,
SentAt = DateTime.UtcNow
});
}
}
Publishing Delayed Messages:
// Publish a message to be delivered after 5 minutes
await _publisher.PublishDelayedAsync(
new ReminderEvent
{
Id = Guid.NewGuid().ToString(),
UserId = "123",
Message = "Don't forget!"
},
delay: TimeSpan.FromMinutes(5)
);
Publishing with Custom Headers:
await _publisher.PublishAsync(
message: orderEvent,
headers: new Dictionary<string, object>
{
{ "correlation-id", correlationId },
{ "user-id", userId },
{ "priority", 5 }
}
);
Batch Publishing:
var events = new List<OrderCreatedEvent>
{
new() { Id = Guid.NewGuid().ToString(), OrderId = "1", TotalAmount = 100 },
new() { Id = Guid.NewGuid().ToString(), OrderId = "2", TotalAmount = 200 },
new() { Id = Guid.NewGuid().ToString(), OrderId = "3", TotalAmount = 300 }
};
await _publisher.PublishBatchAsync(events);
Step 3: Create Consumer
Basic Consumer:
using EventTransit;
using EventTransit.Consumers;
using EventTransit.Core;
using Microsoft.Extensions.Logging;
[ConsumerBinding("my-exchange", "order-created-queue",
RoutingKey = "order.created",
ExchangeType = ExchangeType.Direct)]
public class OrderCreatedConsumer : ConsumerBase<OrderCreatedEvent>
{
private readonly ILogger<OrderCreatedConsumer> _logger;
private readonly IOrderProcessor _orderProcessor;
public OrderCreatedConsumer(
ILogger<OrderCreatedConsumer> logger,
IOrderProcessor orderProcessor) : base(logger)
{
_logger = logger;
_orderProcessor = orderProcessor;
}
protected override async Task HandleMessageAsync(OrderCreatedEvent message)
{
_logger.LogInformation("Processing order {OrderId}", message.OrderId);
await _orderProcessor.ProcessOrderAsync(message.OrderId);
_logger.LogInformation("Order {OrderId} processed successfully", message.OrderId);
}
}
Consumer with Inbox (Automatic Duplicate Detection):
using EventTransit;
using EventTransit.Consumers;
using EventTransit.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
[ConsumerBinding("my-exchange", "payment-queue",
RoutingKey = "order.created",
ExchangeType = ExchangeType.Direct)]
public class PaymentConsumer : ConsumerBase<OrderCreatedEvent>, IInboxTrackingPreference
{
private readonly IServiceScopeFactory _serviceScopeFactory;
// Enable idempotency - prevents duplicate processing
public bool UseInboxTracking => true;
public PaymentConsumer(
IServiceScopeFactory serviceScopeFactory,
ILogger<PaymentConsumer> logger)
: base(logger)
{
_serviceScopeFactory = serviceScopeFactory;
}
protected override async Task HandleMessageAsync(OrderCreatedEvent message)
{
Logger.LogInformation("Processing payment for order {OrderId}", message.OrderId);
// Create scope for scoped services
using var scope = _serviceScopeFactory.CreateScope();
var paymentService = scope.ServiceProvider.GetRequiredService<IPaymentService>();
// This will only be called once per unique message ID
// Duplicates are automatically detected and rejected
await paymentService.ProcessPaymentAsync(message.OrderId, message.TotalAmount);
Logger.LogInformation("Payment processed for order {OrderId}", message.OrderId);
}
}
Querying Failed Messages (the dashboard already does this)
EventTransit tracks failed messages in the InboxMessage table. Query them using EF Core:
using EventTransit.EntityFramework.Entities;
using Microsoft.EntityFrameworkCore;
public class FailedMessageService
{
private readonly MyDbContext _context;
public FailedMessageService(MyDbContext context)
{
_context = context;
}
public async Task<List<InboxMessage>> GetFailedMessagesAsync(int limit = 100)
{
return await _context.Set<InboxMessage>()
.Where(m => m.FailedOn != null)
.OrderByDescending(m => m.FailedOn)
.Take(limit)
.ToListAsync();
}
public async Task<List<InboxMessage>> GetFailedMessagesByTypeAsync(string eventType, int limit = 100)
{
return await _context.Set<InboxMessage>()
.Where(m => m.FailedOn != null)
.Where(m => m.Type == eventType)
.OrderByDescending(m => m.FailedOn)
.Take(limit)
.ToListAsync();
}
}
When to Use Which Publisher?
| Feature | Outbox Publisher | Direct Publisher |
|---|---|---|
| Guaranteed Delivery | ✅ Yes | ❌ No |
| Transactional | ✅ Yes | ❌ No |
| Survives Broker Downtime | ✅ Yes | ❌ No |
| Performance | Good | Excellent |
| Use Case | Critical business events | High-throughput notifications |
| Examples | Orders, Payments, Inventory | Logs, Metrics, Analytics |
Recommendation: Use Outbox Publisher for critical business events where you cannot afford to lose messages. Use Direct Publisher for high-throughput scenarios where occasional message loss is acceptable.
Key Features
1. Transactional Outbox Pattern
Events are saved to the database in the same transaction as your business data, then published to RabbitMQ by a background processor. This guarantees at-least-once delivery.
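The two steps described above can be sketched with an in-memory stand-in for the outbox table. This is an illustration of the pattern only, not EventTransit's implementation: `InMemoryOutbox`, `OutboxRow`, and the `ProcessedOn` field are hypothetical names, and a real outbox relies on a database transaction rather than call ordering.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical row shape; the real OutboxMessage columns may differ.
public sealed record OutboxRow(string Id, string Payload)
{
    public DateTime? ProcessedOn { get; set; }
}

public sealed class InMemoryOutbox
{
    private readonly List<OutboxRow> _rows = new();

    public int PendingCount => _rows.Count(r => r.ProcessedOn is null);

    // Step 1: the business write and the outbox row succeed or fail together.
    public void CommitWithEvent(Action businessWrite, OutboxRow row)
    {
        businessWrite();   // if this throws, the row below is never stored
        _rows.Add(row);
    }

    // Step 2: a background pass publishes up to batchSize pending rows.
    public int ProcessBatch(Func<OutboxRow, bool> publish, int batchSize)
    {
        var published = 0;
        var pending = _rows.Where(r => r.ProcessedOn is null).Take(batchSize).ToList();
        foreach (var row in pending)
        {
            if (!publish(row)) continue;       // broker unavailable: row stays pending
            row.ProcessedOn = DateTime.UtcNow; // marked only after a successful publish
            published++;
        }
        return published;
    }
}
```

Because a row is marked only after the publish succeeds, a crash between those two steps would cause the same row to be published again on the next pass. That is why the guarantee is at-least-once rather than exactly-once, and why consumers benefit from duplicate detection.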
2. Inbox Pattern (Idempotency)
Implement IInboxTrackingPreference in your consumer and have UseInboxTracking return true to prevent duplicate processing. EventTransit tracks processed messages by MessageId.
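As a rough illustration of duplicate detection by message ID, the sketch below keeps seen IDs in a HashSet. It is an assumption-laden stand-in: `InMemoryInbox` is a hypothetical name, and EventTransit stores this state in the InboxMessage table rather than in memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the inbox table.
public sealed class InMemoryInbox
{
    private readonly HashSet<string> _processedIds = new();

    // Runs the handler only the first time a message ID is seen.
    // Returns false when the message is a duplicate and was skipped.
    public bool TryProcess(string messageId, Action handler)
    {
        if (_processedIds.Contains(messageId)) return false;

        handler();                    // if this throws, the ID is not recorded
        _processedIds.Add(messageId); // so a later redelivery can try again
        return true;
    }
}
```

This only works if every redelivery carries the same ID, which is why the event's Id is set once by the publisher and not regenerated by the consumer.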
3. Automatic Retry & Dead Letter Queue
- Failed messages are automatically retried (default: 3 attempts)
- After max retries, messages are marked with a FailedOn timestamp
- Query the InboxMessage table where FailedOn IS NOT NULL to get failed messages
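The retry rule in the list above could be sketched as follows. `TrackedMessage` and `RetryPolicy` are hypothetical names, and the sketch assumes that MaxRetryAttempts counts total attempts, which the text does not spell out. The delay between attempts (RetryDelaySeconds) is left out to keep the example short.

```csharp
using System;

// Hypothetical message state; only FailedOn is named in the guide.
public sealed class TrackedMessage
{
    public string Id { get; init; } = "";
    public int Attempts { get; set; }
    public DateTime? FailedOn { get; set; }
}

public static class RetryPolicy
{
    // Returns true if the handler succeeded within the attempt budget.
    public static bool Process(TrackedMessage message, Action handler, int maxRetryAttempts = 3)
    {
        while (message.Attempts < maxRetryAttempts)
        {
            message.Attempts++;
            try
            {
                handler();
                return true;
            }
            catch (Exception)
            {
                // A real processor would wait RetryDelaySeconds before the next attempt.
            }
        }

        // Budget exhausted: mark the message so it shows up in failed-message queries.
        message.FailedOn = DateTime.UtcNow;
        return false;
    }
}
```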
4. Delayed Messages
Schedule messages for future delivery using TTL + DLX pattern:
await _publisher.PublishDelayedAsync(message, TimeSpan.FromMinutes(5));
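For readers unfamiliar with the TTL + DLX pattern: a message is parked in a holding queue that has no consumers, expires after its time-to-live, and is then dead-lettered to the exchange where it should finally arrive. The sketch below builds the queue arguments RabbitMQ uses for this. The argument keys are standard RabbitMQ names; `DelayQueueArguments` and its parameters are hypothetical, and how EventTransit names or creates its holding queues is not covered here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper showing the queue arguments behind a TTL + DLX delay.
public static class DelayQueueArguments
{
    public static Dictionary<string, object> Build(
        TimeSpan delay, string targetExchange, string targetRoutingKey)
    {
        return new Dictionary<string, object>
        {
            // Messages expire after this many milliseconds in the holding queue.
            ["x-message-ttl"] = (long)delay.TotalMilliseconds,
            // Expired messages are re-published to this exchange...
            ["x-dead-letter-exchange"] = targetExchange,
            // ...with this routing key, so they reach the real consumer queue.
            ["x-dead-letter-routing-key"] = targetRoutingKey
        };
    }
}
```

A queue-level TTL applies to every message in that queue, so this approach typically needs one holding queue per distinct delay. Per-message expiration avoids that, but RabbitMQ only dead-letters an expired message once it reaches the head of the queue, so mixed delays in one queue can be delivered late.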
5. Built-in Dashboard
Access the dashboard at /eventtransit to:
- 📊 View overview with total messages, pending, completed, dead letters
- 📤 View outbox messages with filtering and search
- 📥 View inbox messages with duplicate tracking
- ☠️ View dead letters with retry/republish capabilities
- 🔍 Click any message to see full payload, headers, and history
Important Notes
| Topic | Requirement |
|---|---|
| Event ID Type | Must be string, not Guid |
| Consumer Dependency Injection | Use IServiceScopeFactory, not IServiceProvider |
| Idempotency | Set UseInboxTracking = true in consumer |
| Exchange Types | Use ExchangeType.Direct for point-to-point, ExchangeType.Topic for pub/sub |
| Failed Messages | Query InboxMessage table where FailedOn IS NOT NULL |
Database Tables
EventTransit creates two tables in your database:
- OutboxMessage - Stores events to be published to RabbitMQ
- InboxMessage - Tracks received messages and processing status
Troubleshooting
"HandleMessageAsync doesn't exist" Error
If you encounter this error after installing EventTransit:
# Clear NuGet cache
dotnet nuget locals all --clear
# Restore without cache
dotnet restore --no-cache
# Rebuild
dotnet build
This forces NuGet to download the package again, so the build uses the version you referenced rather than a stale cached copy.
Consumer throws ObjectDisposedException
Solution: Inject IServiceScopeFactory instead of IServiceProvider in your consumer.
// ❌ Wrong
public MyConsumer(IServiceProvider serviceProvider, ILogger<MyConsumer> logger) : base(logger)
{
_serviceProvider = serviceProvider;
}
// ✅ Correct
public MyConsumer(IServiceScopeFactory serviceScopeFactory, ILogger<MyConsumer> logger) : base(logger)
{
_serviceScopeFactory = serviceScopeFactory;
}
Messages not being consumed
Solution: Ensure RabbitMQ is running and the exchange/queue names match between event and consumer attributes.
Duplicate message processing
Solution: Implement IInboxTrackingPreference and set UseInboxTracking = true in your consumer.
Manual Retry from Dashboard
- Navigate to the /eventtransit dashboard
- Go to the Dead Letters page
- Select failed messages
- Click "Retry" or "Republish"
- Messages are requeued automatically
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
net9.0 dependencies:
- Microsoft.AspNetCore.Mvc.Core (>= 2.2.5)
- Microsoft.Data.SqlClient (>= 6.1.1)
- Microsoft.Data.Sqlite (>= 9.0.6)
- Microsoft.EntityFrameworkCore (>= 9.0.6)
- Microsoft.EntityFrameworkCore.Relational (>= 9.0.6)
- Microsoft.Extensions.Configuration.Abstractions (>= 9.0.6)
- Microsoft.Extensions.Configuration.Binder (>= 9.0.6)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 9.0.9)
- Microsoft.Extensions.Diagnostics.HealthChecks (>= 9.0.6)
- Microsoft.Extensions.Hosting.Abstractions (>= 9.0.6)
- Microsoft.Extensions.Logging.Abstractions (>= 9.0.9)
- Microsoft.Extensions.Options (>= 9.0.9)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 9.0.6)
- MySqlConnector (>= 2.4.0)
- Npgsql (>= 9.0.3)
- Polly (>= 8.5.2)
- Polly.Extensions (>= 8.5.2)
- RabbitMQ.Client (>= 7.1.2)
- System.ComponentModel.Annotations (>= 5.0.0)
- System.Text.Json (>= 9.0.6)
v2.0.2: Fixed dashboard UI issues (filter dialog positioning, table scrolling, column alignment), corrected Dead Letter status filter enum mapping, improved CSS specificity for page-specific styles.