EventTransit 2.0.3

dotnet add package EventTransit --version 2.0.3
                    
NuGet\Install-Package EventTransit -Version 2.0.3
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="EventTransit" Version="2.0.3" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="EventTransit" Version="2.0.3" />
                    
Directory.Packages.props
<PackageReference Include="EventTransit" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add EventTransit --version 2.0.3
                    
#r "nuget: EventTransit, 2.0.3"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package EventTransit@2.0.3
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=EventTransit&version=2.0.3
                    
Install as a Cake Addin
#tool nuget:?package=EventTransit&version=2.0.3
                    
Install as a Cake Tool

EventTransit

NuGet License: MIT .NET

EventTransit Integration Guide

Overview

EventTransit is an enterprise message bus library that implements the Transactional Outbox Pattern for reliable async message processing with RabbitMQ and Entity Framework Core.

Installation

dotnet add package EventTransit --version 2.0.3

Quick Start

1. Configure appsettings.json

Minimal Configuration:

{
  "ConnectionStrings": {
    "DefaultConnection": "Host=localhost;Database=mydb;Username=user;Password=pass"
  },
  "EventTransit": {
    "Broker": {
      "BrokerType": "RabbitMQ",
      "ConnectionString": "amqp://guest:guest@localhost:5672"
    },
    "DefaultExchange": {
      "Name": "my-exchange",
      "Type": "direct"
    }
  }
}

Full Configuration (All Options):

{
  "ConnectionStrings": {
    "DefaultConnection": "Host=localhost;Database=mydb;Username=user;Password=pass"
  },
  "EventTransit": {
    "Broker": {
      "BrokerType": "RabbitMQ",
      "ConnectionString": "amqp://guest:guest@localhost:5672",
      "ClientProvidedName": "my-service-publisher",
      "HeartbeatSeconds": 60,
      "NetworkRecoveryIntervalSeconds": 10,
      "AutomaticRecoveryEnabled": true,
      "TopologyRecoveryEnabled": true,
      "RequestedChannelMax": 2047,
      "RequestedFrameMax": 0,
      "UseBackgroundThreadsForIO": false,
      "DispatchConsumersConcurrently": true,
      "ConsumerDispatchConcurrency": 1
    },
    "DefaultExchange": {
      "Name": "my-exchange",
      "Type": "direct",
      "Durable": true,
      "AutoDelete": false
    },
    "DefaultQueue": {
      "Durable": true,
      "Exclusive": false,
      "AutoDelete": false,
      "Arguments": {}
    },
    "DefaultPublisher": {
      "ConfirmSelect": true,
      "Persistent": true,
      "Mandatory": false,
      "Immediate": false
    },
    "DefaultConsumer": {
      "PrefetchCount": 20,
      "PrefetchSize": 0,
      "Global": false,
      "AutoAck": false,
      "ExclusiveConsumer": false,
      "ConsumerTag": "",
      "NoLocal": false,
      "Arguments": {}
    },
    "Outbox": {
      "Enabled": true,
      "ProcessingIntervalSeconds": 1,
      "BatchSize": 100,
      "MaxRetryAttempts": 3,
      "RetryDelaySeconds": 30,
      "DeadLetterAfterMaxRetries": true,
      "KeepProcessedMessagesDays": 7
    },
    "Inbox": {
      "Enabled": true,
      "ProcessingIntervalSeconds": 30,
      "BatchSize": 100,
      "MaxRetryAttempts": 3,
      "RetryDelaySeconds": 30,
      "DeadLetterAfterMaxRetries": true,
      "KeepProcessedMessagesDays": 7
    },
    "DeadLetter": {
      "ServiceName": "MyService",
      "EnableDuplicateNotifications": true,
      "MaxStackTraceLength": 2000
    }
  }
}

Configuration Options Explained:

Section Option Description Default
Broker BrokerType Message broker type RabbitMQ
ConnectionString RabbitMQ connection string Required
ClientProvidedName Client name for RabbitMQ connection eventtransit-publisher
DefaultExchange Name Exchange name Required
Type Exchange type (direct, topic, fanout) direct
Durable Survive broker restart true
Outbox ProcessingIntervalSeconds How often to process outbox 1
BatchSize Messages per batch 100
MaxRetryAttempts Retry attempts before dead letter 3
Inbox ProcessingIntervalSeconds How often to clean inbox 30
KeepProcessedMessagesDays Retention period 7
DefaultConsumer PrefetchCount Messages to prefetch 20
AutoAck Auto-acknowledge messages false

2. Update Your DbContext

using EventTransit.EntityFramework.Extensions;
using Microsoft.EntityFrameworkCore;

public class MyDbContext : DbContext
{
    public MyDbContext(DbContextOptions<MyDbContext> options) : base(options) {}

    // Your entities
    public DbSet<Order> Orders => Set<Order>();
    public DbSet<Customer> Customers => Set<Customer>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // Your entity configurations
        modelBuilder.ApplyConfigurationsFromAssembly(typeof(MyDbContext).Assembly);

        // Add EventTransit tables (OutboxMessage, InboxMessage)
        modelBuilder.AddEventTransitOutboxInbox();

        base.OnModelCreating(modelBuilder);
    }
}

3. Register Services

using EventTransit.Extensions;
using EventTransit.Dashboard.Extensions;

var builder = WebApplication.CreateBuilder(args);

// Register your DbContext
builder.Services.AddDbContext<MyDbContext>(options =>
    options.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection")));

// Register EventTransit
builder.Services.AddEventTransitWithEntityFramework<MyDbContext>(
    configureOptions: cfg => builder.Configuration.GetSection("EventTransit").Bind(cfg)
);

// Optional: Add EventTransit Dashboard
builder.Services.AddEventTransitDashboardWithEntityFramework<MyDbContext>(builder.Configuration);

var app = builder.Build();

// Optional: Map dashboard UI
app.UseEventTransitDashboard("/eventtransit");

app.Run();

4. Create Database Migration

dotnet ef migrations add AddEventTransit
dotnet ef database update

Usage

Step 1: Define Your Event

using EventTransit.Core;

[MessageBinding(
    exchangeName: "my-exchange",
    queueName: "order-created-queue",
    RoutingKey = "order.created",
    ExchangeType = ExchangeType.Direct,
    DurableQueue = true,
    DurableExchange = true)]
public class OrderCreatedEvent : IEventWithId
{
    // Must be string, not Guid
    public string Id { get; set; } = Guid.NewGuid().ToString();

    public string OrderId { get; set; } = string.Empty;
    public string CustomerId { get; set; } = string.Empty;
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}

Exchange Type Examples:

// Direct Exchange - Point-to-point messaging
[MessageBinding("orders.direct", "order-queue", RoutingKey = "order.created")]

// Topic Exchange - Pattern-based routing
[MessageBinding("events.topic", "payment-queue", RoutingKey = "order.*.created")]

// Fanout Exchange - Broadcast to all queues
[MessageBinding("notifications.fanout", "email-queue")]

Step 2: Publish Events

Option A: Using Outbox Publisher (Transactional - Recommended)

using EventTransit.Core;

public class OrderService
{
    private readonly IOutboxPublisher _publisher;
    private readonly MyDbContext _context;

    public OrderService(IOutboxPublisher publisher, MyDbContext context)
    {
        _publisher = publisher;
        _context = context;
    }

    public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
    {
        // Start transaction
        using var transaction = await _context.Database.BeginTransactionAsync();

        try
        {
            // Save order to database
            var order = new Order
            {
                CustomerId = request.CustomerId,
                TotalAmount = request.TotalAmount
            };

            _context.Orders.Add(order);
            await _context.SaveChangesAsync();

            // Publish event (saved to outbox table in same transaction)
            await _publisher.PublishAsync(new OrderCreatedEvent
            {
                Id = Guid.NewGuid().ToString(),
                OrderId = order.Id.ToString(),
                CustomerId = order.CustomerId,
                TotalAmount = order.TotalAmount,
                CreatedAt = DateTime.UtcNow
            });

            // Commit transaction - both order and event are saved atomically
            await transaction.CommitAsync();

            return order;
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

Option B: Using Direct Publisher (Fire-and-Forget)

public class NotificationService
{
    private readonly IDirectPublisher _publisher;

    public NotificationService(IDirectPublisher publisher)
    {
        _publisher = publisher;
    }

    public async Task SendNotificationAsync(string userId, string message)
    {
        // Publishes immediately to RabbitMQ without database transaction
        await _publisher.PublishAsync(new NotificationEvent
        {
            Id = Guid.NewGuid().ToString(),
            UserId = userId,
            Message = message,
            SentAt = DateTime.UtcNow
        });
    }
}

Publishing Delayed Messages:

// Publish a message to be delivered after 5 minutes
await _publisher.PublishDelayedAsync(
    new ReminderEvent
    {
        Id = Guid.NewGuid().ToString(),
        UserId = "123",
        Message = "Don't forget!"
    },
    delay: TimeSpan.FromMinutes(5)
);

Publishing with Custom Headers:

await _publisher.PublishAsync(
    message: orderEvent,
    headers: new Dictionary<string, object>
    {
        { "correlation-id", correlationId },
        { "user-id", userId },
        { "priority", 5 }
    }
);

Batch Publishing:

var events = new List<OrderCreatedEvent>
{
    new() { Id = Guid.NewGuid().ToString(), OrderId = "1", TotalAmount = 100 },
    new() { Id = Guid.NewGuid().ToString(), OrderId = "2", TotalAmount = 200 },
    new() { Id = Guid.NewGuid().ToString(), OrderId = "3", TotalAmount = 300 }
};

await _publisher.PublishBatchAsync(events);

Step 3: Create Consumer

Basic Consumer:

using EventTransit;
using EventTransit.Consumers;
using EventTransit.Core;
using Microsoft.Extensions.Logging;

[ConsumerBinding("my-exchange", "order-created-queue",
    RoutingKey = "order.created",
    ExchangeType = ExchangeType.Direct)]
public class OrderCreatedConsumer : ConsumerBase<OrderCreatedEvent>
{
    private readonly ILogger<OrderCreatedConsumer> _logger;
    private readonly IOrderProcessor _orderProcessor;

    public OrderCreatedConsumer(
        ILogger<OrderCreatedConsumer> logger,
        IOrderProcessor orderProcessor) : base(logger)
    {
        _logger = logger;
        _orderProcessor = orderProcessor;
    }

    protected override async Task HandleMessageAsync(OrderCreatedEvent message)
    {
        _logger.LogInformation("Processing order {OrderId}", message.OrderId);

        await _orderProcessor.ProcessOrderAsync(message.OrderId);

        _logger.LogInformation("Order {OrderId} processed successfully", message.OrderId);
    }
}

Consumer with Inbox (Automatic Duplicate Detection):

using EventTransit;
using EventTransit.Consumers;
using EventTransit.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

[ConsumerBinding("my-exchange", "payment-queue",
    RoutingKey = "order.created",
    ExchangeType = ExchangeType.Direct)]
public class PaymentConsumer : ConsumerBase<OrderCreatedEvent>, IInboxTrackingPreference
{
    private readonly IServiceScopeFactory _serviceScopeFactory;

    // Enable idempotency - prevents duplicate processing
    public bool UseInboxTracking => true;

    public PaymentConsumer(
        IServiceScopeFactory serviceScopeFactory,
        ILogger<PaymentConsumer> logger)
        : base(logger)
    {
        _serviceScopeFactory = serviceScopeFactory;
    }

    protected override async Task HandleMessageAsync(OrderCreatedEvent message)
    {
        Logger.LogInformation("Processing payment for order {OrderId}", message.OrderId);

        // Create scope for scoped services
        using var scope = _serviceScopeFactory.CreateScope();
        var paymentService = scope.ServiceProvider.GetRequiredService<IPaymentService>();

        // This will only be called once per unique message ID
        // Duplicates are automatically detected and rejected
        await paymentService.ProcessPaymentAsync(message.OrderId, message.TotalAmount);

        Logger.LogInformation("Payment processed for order {OrderId}", message.OrderId);
    }
}

Querying Failed Messages - The dashboarad already does this

EventTransit tracks failed messages in the InboxMessage table. Query them using EF Core:

using EventTransit.EntityFramework.Entities;
using Microsoft.EntityFrameworkCore;

public class FailedMessageService
{
    private readonly MyDbContext _context;

    public FailedMessageService(MyDbContext context)
    {
        _context = context;
    }

    public async Task<List<InboxMessage>> GetFailedMessagesAsync(int limit = 100)
    {
        return await _context.Set<InboxMessage>()
            .Where(m => m.FailedOn != null)
            .OrderByDescending(m => m.FailedOn)
            .Take(limit)
            .ToListAsync();
    }

    public async Task<List<InboxMessage>> GetFailedMessagesByTypeAsync(string eventType, int limit = 100)
    {
        return await _context.Set<InboxMessage>()
            .Where(m => m.FailedOn != null)
            .Where(m => m.Type == eventType)
            .OrderByDescending(m => m.FailedOn)
            .Take(limit)
            .ToListAsync();
    }
}

When to Use Which Publisher?

Feature Outbox Publisher Direct Publisher
Guaranteed Delivery ✅ Yes ❌ No
Transactional ✅ Yes ❌ No
Survives Broker Downtime ✅ Yes ❌ No
Performance Good Excellent
Use Case Critical business events High-throughput notifications
Examples Orders, Payments, Inventory Logs, Metrics, Analytics

Recommendation: Use Outbox Publisher for critical business events where you cannot afford to lose messages. Use Direct Publisher for high-throughput scenarios where occasional message loss is acceptable.

Key Features

1. Transactional Outbox Pattern

Events are saved to the database in the same transaction as your business data, then published to RabbitMQ by a background processor. This guarantees at-least-once delivery.

2. Inbox Pattern (Idempotency)

Enable UseInboxTracking = true in your consumer to prevent duplicate processing. EventTransit tracks processed messages by MessageId.

3. Automatic Retry & Dead Letter Queue

  • Failed messages are automatically retried (default: 3 attempts)
  • After max retries, messages are marked with FailedOn timestamp
  • Query InboxMessage table where FailedOn IS NOT NULL to get failed messages

4. Delayed Messages

Schedule messages for future delivery using TTL + DLX pattern:

await _publisher.PublishDelayedAsync(message, TimeSpan.FromMinutes(5));

5. Built-in Dashboard

Access the dashboard at /eventtransit to:

  • 📊 View overview with total messages, pending, completed, dead letters
  • 📤 View outbox messages with filtering and search
  • 📥 View inbox messages with duplicate tracking
  • ☠️ View dead letters with retry/republish capabilities
  • 🔍 Click any message to see full payload, headers, and history

Important Notes

Topic Requirement
Event ID Type Must be string, not Guid
Consumer Dependency Injection Use IServiceScopeFactory, not IServiceProvider
Idempotency Set UseInboxTracking = true in consumer
Exchange Types Use ExchangeType.Direct for point-to-point, ExchangeType.Topic for pub/sub
Failed Messages Query InboxMessage table where FailedOn IS NOT NULL

Database Tables

EventTransit creates two tables in your database:

  • OutboxMessage - Stores events to be published to RabbitMQ
  • InboxMessage - Tracks received messages and processing status

Troubleshooting

"HandleMessageAsync doesn't exist" Error

If you encounter this error after installing EventTransit:

# Clear NuGet cache
dotnet nuget locals all --clear

# Restore without cache
dotnet restore --no-cache

# Rebuild
dotnet build

This ensures you're using the latest version of the library.

Consumer throws ObjectDisposedException

Solution: Inject IServiceScopeFactory instead of IServiceProvider in your consumer.

// ❌ Wrong
public MyConsumer(IServiceProvider serviceProvider, ILogger<MyConsumer> logger) : base(logger)
{
    _serviceProvider = serviceProvider;
}

// ✅ Correct
public MyConsumer(IServiceScopeFactory serviceScopeFactory, ILogger<MyConsumer> logger) : base(logger)
{
    _serviceScopeFactory = serviceScopeFactory;
}

Messages not being consumed

Solution: Ensure RabbitMQ is running and the exchange/queue names match between event and consumer attributes.

Duplicate message processing

Solution: Implement IInboxTrackingPreference and set UseInboxTracking = true in your consumer.

Manual Retry from Dashboard

  1. Navigate to /eventtransit dashboard
  2. Go to Dead Letters page
  3. Select failed messages
  4. Click "Retry" or "Republish"
  5. Messages are requeued automatically
Product Compatible and additional computed target framework versions.
.NET net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.0.3 194 10/1/2025
2.0.2 169 10/1/2025
2.0.1 173 9/30/2025
2.0.0 181 9/30/2025

v2.0.3: Updated package metadata with new author and repository information.