EventTransit 2.0.0

EventTransit - Complete Documentation

📦 Package Information

Package Name: EventTransit
Version: 2.0.0
Package Location: ./nupkg/EventTransit.2.0.0.nupkg
Target Framework: .NET 9.0
License: MIT

🚀 Installation

Option 1: Install from Local Package

# Add local NuGet source
dotnet nuget add source /path/to/nupkg --name LocalEventTransit

# Install package
dotnet add package EventTransit --version 2.0.0 --source LocalEventTransit

Option 2: Direct Installation

dotnet add package EventTransit --source /path/to/nupkg

Option 3: Package Manager Console

Install-Package EventTransit -Source /path/to/nupkg

✨ Key Features

  • Outbox Pattern - Guaranteed message delivery with transactional outbox
  • Inbox Pattern - Automatic duplicate detection and idempotency
  • Dead Letter Handling - Automatic DLQ management with cross-service notifications
  • Exponential Retry - Configurable retry with exponential backoff (1s, 2s, 4s, 8s...)
  • Multiple Exchange Types - Direct, Topic, Fanout, Headers
  • Built-in Dashboard - Real-time monitoring UI with Razor Pages
  • Entity Framework Integration - Seamless EF Core integration
  • Repository Pattern Support - Works with custom repositories
  • Multi-Database Support - PostgreSQL, SQL Server, MySQL, SQLite
  • RabbitMQ Integration - Production-ready RabbitMQ support
  • Worker Service Support - Works with ASP.NET Core and Worker Services
  • Type-Safe - Strongly-typed message contracts
  • Resilience - Circuit breaker, retry policies, error handling
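
The backoff sequence in the feature list (1s, 2s, 4s, 8s...) can be reproduced with a short calculation. The sketch below is illustrative only: `BackoffSketch` and `DelayMs` are hypothetical names, not EventTransit APIs, and the doubling-with-cap formula is an assumption based on the `BaseRetryDelayMs` and `MaxRetryDelayMs` settings shown in the Quick Start.

```csharp
using System;

// Hypothetical helper, not part of EventTransit.
public static class BackoffSketch
{
    // attempt is zero-based: 0 is the first retry.
    public static int DelayMs(int attempt, int baseDelayMs = 1000, int maxDelayMs = 30000)
    {
        if (attempt < 0) throw new ArgumentOutOfRangeException(nameof(attempt));

        // Double the base delay on each attempt. The math is done in double
        // so that large attempt numbers cannot overflow an int.
        double delay = baseDelayMs * Math.Pow(2, attempt);

        // Never wait longer than the configured maximum.
        return (int)Math.Min(delay, maxDelayMs);
    }
}
```

With the default arguments, attempts 0 through 5 give 1000, 2000, 4000, 8000, 16000 and 30000 ms; the last value is capped by `maxDelayMs`.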

📚 Table of Contents

  1. Quick Start
  2. Exchange Types
  3. Advanced Scenarios
  4. Configuration
  5. Worker Service Setup
  6. Dashboard
  7. Best Practices

🚀 Quick Start

1. Setup (ASP.NET Core Web Application)

using EventTransit;
using Microsoft.EntityFrameworkCore;

var builder = WebApplication.CreateBuilder(args);

// Add EventTransit with Entity Framework
builder.Services.AddEventTransitWithEntityFramework<MyDbContext>(
    config =>
    {
        // RabbitMQ Connection
        config.Broker.ConnectionString = "Host=localhost;Port=5672;Username=guest;Password=guest";
        
        // Default Exchange
        config.DefaultExchange.Name = "my-app-exchange";
        config.DefaultExchange.Type = EventTransit.Brokers.Abstractions.ExchangeType.Direct;
        config.DefaultExchange.Durable = true;
        
        // Outbox Configuration
        config.Outbox.Enabled = true;
        config.Outbox.ProcessingIntervalSeconds = 10;
        config.Outbox.BatchSize = 100;
        config.Outbox.MaxRetryAttempts = 3;
        
        // Inbox Configuration
        config.Inbox.Enabled = true;
        config.Inbox.ProcessingIntervalSeconds = 30;
        
        // Dead Letter Configuration
        config.DeadLetter.EnableCrossServiceNotifications = true;
        config.DeadLetter.EnableDuplicateNotifications = true;
        config.DeadLetter.ServiceName = "MyService";
        config.DeadLetter.IncludeMessagePayload = true;
        
        // Resilience Configuration (Exponential Retry)
        config.Resilience.Enabled = true;
        config.Resilience.MaxRetryAttempts = 3;
        config.Resilience.BaseRetryDelayMs = 1000;  // 1 second
        config.Resilience.MaxRetryDelayMs = 30000;  // 30 seconds
    },
    new EventTransit.Schema.SchemaConfiguration
    {
        AutoCreateTables = true,
        CreateIndexes = true
    });

// Add your DbContext
builder.Services.AddDbContext<MyDbContext>(options =>
    options.UseNpgsql(builder.Configuration.GetConnectionString("Database")));

// Register consumers
builder.Services.AddEventTransitConsumer<OrderCreatedConsumer>();
builder.Services.AddEventTransitConsumer<PaymentConsumer>();

var app = builder.Build();

// Enable the dashboard at /eventtransit
app.UseEventTransitDashboard();

await app.RunAsync();

2. Define Your DbContext

using EventTransit.EntityFramework.Entities;
using Microsoft.EntityFrameworkCore;

public class MyDbContext : DbContext
{
    public MyDbContext(DbContextOptions<MyDbContext> options) : base(options) { }
    
    // EventTransit tables
    public DbSet<OutboxMessage> OutboxMessages { get; set; }
    public DbSet<InboxMessage> InboxMessages { get; set; }
    
    // Your domain entities
    public DbSet<Order> Orders { get; set; }
    public DbSet<Payment> Payments { get; set; }
    
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
        
        // Apply EventTransit configurations
        modelBuilder.ApplyConfiguration(new EventTransit.EntityFramework.Configurations.OutboxMessageConfiguration());
        modelBuilder.ApplyConfiguration(new EventTransit.EntityFramework.Configurations.InboxMessageConfiguration());
    }
}

3. Define Message Contracts

using EventTransit.Core;

// Simple message
public class OrderCreatedEvent : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string OrderNumber { get; set; }
    public decimal Amount { get; set; }
    public string CustomerId { get; set; }
    public DateTime CreatedAt { get; set; }
}

// Message with binding configuration
[MessageBinding(exchangeName: "orders", queueName: "order-processing", RoutingKey = "order.created")]
public class OrderProcessingEvent : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string OrderId { get; set; }
    public string Status { get; set; }
}

4. Create Consumers

using EventTransit.Consumers;
using EventTransit.Core;
using Microsoft.Extensions.Logging;

// Basic Consumer
[ConsumerBinding("orders", "order-processing-queue", RoutingKey = "order.created")]
public class OrderCreatedConsumer : ConsumerBase<OrderCreatedEvent>
{
    private readonly IOrderService _orderService;
    
    public OrderCreatedConsumer(
        IOrderService orderService,
        ILogger<OrderCreatedConsumer> logger) : base(logger)
    {
        _orderService = orderService;
    }

    protected override async Task HandleMessageAsync(OrderCreatedEvent message)
    {
        Logger.LogInformation("Processing order: {OrderNumber}", message.OrderNumber);
        
        // Your business logic
        await _orderService.ProcessOrderAsync(message);
        
        Logger.LogInformation("Order processed: {OrderNumber}", message.OrderNumber);
    }
}

// Consumer with Inbox Pattern (Idempotency)
[ConsumerBinding("payments", "payment-queue", RoutingKey = "payment.processed")]
public class PaymentConsumer : ConsumerBase<PaymentEvent>, IInboxTrackingPreference
{
    public bool UseInboxTracking => true; // Enable duplicate detection
    
    private readonly IPaymentService _paymentService;
    
    public PaymentConsumer(
        IPaymentService paymentService,
        ILogger<PaymentConsumer> logger) : base(logger)
    {
        _paymentService = paymentService;
    }

    protected override async Task HandleMessageAsync(PaymentEvent message)
    {
        // This will only be processed once, even if delivered multiple times
        Logger.LogInformation("Processing payment: {PaymentId}", message.Id);
        await _paymentService.ProcessPaymentAsync(message);
    }
}

// Consumer with Dependency Injection
[ConsumerBinding("notifications", "email-queue", RoutingKey = "notification.email")]
public class EmailConsumer : ConsumerBase<EmailNotification>
{
    private readonly IEmailService _emailService;
    private readonly ITemplateService _templateService;
    
    public EmailConsumer(
        IEmailService emailService,
        ITemplateService templateService,
        ILogger<EmailConsumer> logger) : base(logger)
    {
        _emailService = emailService;
        _templateService = templateService;
    }

    protected override async Task HandleMessageAsync(EmailNotification message)
    {
        var template = await _templateService.GetTemplateAsync(message.TemplateId);
        var body = _templateService.RenderTemplate(template, message.Data);
        
        await _emailService.SendAsync(message.To, message.Subject, body);
    }
}
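
The inbox-tracked consumer above relies on duplicate detection by message ID. As a rough illustration of that idea, here is a minimal in-memory sketch. `InboxSketch` and `HandleOnceAsync` are hypothetical names, not EventTransit types; a real inbox would persist the IDs in a database table and handle concurrent deliveries, which this sketch does not.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory inbox, for illustration only (not thread-safe).
public sealed class InboxSketch
{
    private readonly HashSet<string> _processedIds = new HashSet<string>();

    // Runs the handler only the first time a message ID is seen.
    // Returns true if the handler ran, false if the message was a duplicate.
    public async Task<bool> HandleOnceAsync(string messageId, Func<Task> handler)
    {
        if (_processedIds.Contains(messageId)) return false;

        await handler();

        // Record the ID only after the handler succeeds,
        // so a failed attempt can still be retried later.
        _processedIds.Add(messageId);
        return true;
    }
}
```

Delivering the same ID twice runs the handler once; the second call returns `false` without invoking it.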

5. Publish Messages

using EventTransit.Core;
using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IMessagePublisher _publisher;
    private readonly IOrderRepository _orderRepository;
    
    public OrdersController(
        IMessagePublisher publisher,
        IOrderRepository orderRepository)
    {
        _publisher = publisher;
        _orderRepository = orderRepository;
    }
    
    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
    {
        // Create order in database
        var order = await _orderRepository.CreateAsync(request);
        
        // Publish event (stored in outbox, guaranteed delivery)
        var orderEvent = new OrderCreatedEvent
        {
            Id = order.Id.ToString(),
            OrderNumber = order.OrderNumber,
            Amount = order.Amount,
            CustomerId = order.CustomerId,
            CreatedAt = DateTime.UtcNow
        };
        
        await _publisher.PublishAsync(orderEvent, routingKey: "order.created");
        
        return Ok(new { orderId = order.Id });
    }
    
    [HttpPost("{id}/cancel")]
    public async Task<IActionResult> CancelOrder(string id)
    {
        var order = await _orderRepository.GetByIdAsync(id);
        if (order == null) return NotFound();
        
        order.Status = "Cancelled";
        await _orderRepository.UpdateAsync(order);
        
        // Publish cancellation event
        var cancelEvent = new OrderCancelledEvent
        {
            Id = Guid.NewGuid().ToString(),
            OrderId = order.Id.ToString(),
            Reason = "Customer requested cancellation"
        };
        
        await _publisher.PublishAsync(cancelEvent, routingKey: "order.cancelled");
        
        return Ok();
    }
}
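
The comment "stored in outbox, guaranteed delivery" refers to the outbox pattern: the event is saved alongside the business data and published later by a background processor. The following is a simplified in-memory sketch of that flow, not EventTransit's implementation. `OutboxRow`, `OutboxSketch` and their members are hypothetical names, and the two lists stand in for database tables written in one transaction.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical outbox row, for illustration only.
public sealed class OutboxRow
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string RoutingKey { get; set; } = "";
    public string Payload { get; set; } = "";
    public bool Sent { get; set; }
}

public sealed class OutboxSketch
{
    public List<string> Orders { get; } = new List<string>();
    public List<OutboxRow> Outbox { get; } = new List<OutboxRow>();

    // Stands in for one database transaction:
    // the order and its event row are stored together.
    public void SaveOrderWithEvent(string orderNumber)
    {
        Orders.Add(orderNumber);
        Outbox.Add(new OutboxRow { RoutingKey = "order.created", Payload = orderNumber });
    }

    // Stands in for the background processor: try to publish up to batchSize
    // unsent rows. Rows stay unsent when publish returns false, so they are
    // picked up again on the next run.
    public int DispatchPending(Func<OutboxRow, bool> publish, int batchSize = 100)
    {
        int sent = 0;
        foreach (var row in Outbox.Where(r => !r.Sent).Take(batchSize).ToList())
        {
            if (publish(row))
            {
                row.Sent = true;
                sent++;
            }
        }
        return sent;
    }
}
```

If the broker is unavailable during one run, the row simply remains in the outbox and is published on a later run, which is why delivery survives a broker outage.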

📚 Exchange Types

1. Direct Exchange (Point-to-Point)

Use Case: One-to-one message delivery with specific routing

When to Use:

  • Task queues (order processing, email sending)
  • Command processing (create user, update inventory)
  • Direct notifications (user-specific alerts)
  • Background jobs (report generation, data export)

Example: Order Processing System

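using EventTransit.Brokers.Abstractions; // ExchangeType
using EventTransit.Consumers;
using EventTransit.Core;
using Microsoft.Extensions.Logging;

// ===== MESSAGE DEFINITION =====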
[MessageBinding(
    exchangeName: "orders",
    queueName: "order-processing",
    RoutingKey = "order.created",
    ExchangeType = ExchangeType.Direct,
    DurableQueue = true)]
public class OrderCreatedEvent : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string OrderNumber { get; set; }
    public decimal Amount { get; set; }
    public string CustomerId { get; set; }
    public List<OrderItem> Items { get; set; }
}

// ===== CONSUMER =====
[ConsumerBinding("orders", "order-processing", RoutingKey = "order.created")]
public class OrderProcessor : ConsumerBase<OrderCreatedEvent>
{
    private readonly IOrderService _orderService;
    private readonly IInventoryService _inventoryService;
    
    public OrderProcessor(
        IOrderService orderService,
        IInventoryService inventoryService,
        ILogger<OrderProcessor> logger) : base(logger)
    {
        _orderService = orderService;
        _inventoryService = inventoryService;
    }
    
    protected override async Task HandleMessageAsync(OrderCreatedEvent message)
    {
        Logger.LogInformation("Processing order: {OrderNumber}", message.OrderNumber);
        
        // Reserve inventory
        await _inventoryService.ReserveItemsAsync(message.Items);
        
        // Process payment
        await _orderService.ProcessPaymentAsync(message.OrderNumber);
        
        // Update order status
        await _orderService.UpdateStatusAsync(message.OrderNumber, "Processing");
        
        Logger.LogInformation("Order processed: {OrderNumber}", message.OrderNumber);
    }
}

// ===== PUBLISHER =====
public class OrderService
{
    private readonly IMessagePublisher _publisher;

    public OrderService(IMessagePublisher publisher)
    {
        _publisher = publisher;
    }

    public async Task CreateOrderAsync(CreateOrderDto dto)
    {
        var orderEvent = new OrderCreatedEvent
        {
            // GenerateOrderNumber() is assumed to be a helper defined elsewhere in your service
            OrderNumber = GenerateOrderNumber(),
            Amount = dto.Amount,
            CustomerId = dto.CustomerId,
            Items = dto.Items
        };

        // Publish to direct exchange
        await _publisher.PublishAsync(orderEvent, routingKey: "order.created");
    }
}

Characteristics:

  • ✅ Exact routing key match
  • ✅ One producer → every queue bound with the matching key (consumers on the same queue compete for messages)
  • ✅ Guaranteed delivery with outbox
  • ✅ Best for: Task queues, commands

2. Topic Exchange (Pattern Matching)

Use Case: Selective routing based on patterns, pub/sub with filtering

When to Use:

  • Event notifications with categories (user.created, user.updated, user.deleted)
  • Logging systems (log.error, log.warning, log.info)
  • Monitoring and alerting (metric.cpu.high, metric.memory.low)
  • Multi-tenant systems (tenant.123.order.created)

Routing Key Patterns:

  • * (star) - Matches exactly one word
  • # (hash) - Matches zero or more words

Examples:

  • user.*.created matches user.admin.created, user.customer.created
  • order.# matches order.created, order.payment.completed, order.shipped.tracking.updated
  • log.*.error matches log.api.error, log.database.error
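
The wildcard rules above can be checked with a small matcher. This is a minimal sketch of the matching logic only; in practice the broker performs it, and `TopicMatchSketch` is a hypothetical name rather than an EventTransit or RabbitMQ API.

```csharp
using System;

// Hypothetical helper that mimics topic-exchange pattern matching.
public static class TopicMatchSketch
{
    public static bool Matches(string pattern, string routingKey)
    {
        // Both the pattern and the routing key are dot-separated words.
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a key word to compare against.
        if (ki == k.Length) return false;

        // '*' matches exactly one word; anything else must match literally.
        return (p[pi] == "*" || p[pi] == k[ki]) && Match(p, pi + 1, k, ki + 1);
    }
}
```

For example, `Matches("order.#", "order.shipped.tracking.updated")` is true, while `Matches("log.*.error", "log.error")` is false because `*` must consume exactly one word.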

Example: Multi-Channel Notification System

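using EventTransit.Brokers.Abstractions; // ExchangeType
using EventTransit.Consumers;
using EventTransit.Core;
using Microsoft.Extensions.Logging;

// ===== MESSAGE DEFINITION =====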
[MessageBinding(
    exchangeName: "notifications",
    ExchangeType = ExchangeType.Topic,
    DurableExchange = true)]
public class NotificationEvent : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string Channel { get; set; } // email, sms, push
    public string Priority { get; set; } // urgent, normal, low
    public string Recipient { get; set; }
    public string Subject { get; set; }
    public string Message { get; set; }
    public Dictionary<string, string> Metadata { get; set; }
}

// ===== CONSUMER 1: All Email Notifications =====
[ConsumerBinding("notifications", "email-queue", RoutingKey = "notification.email.*")]
public class EmailConsumer : ConsumerBase<NotificationEvent>
{
    private readonly IEmailService _emailService;

    public EmailConsumer(IEmailService emailService, ILogger<EmailConsumer> logger)
        : base(logger)
    {
        _emailService = emailService;
    }

    protected override async Task HandleMessageAsync(NotificationEvent message)
    {
        Logger.LogInformation("Sending email to: {Recipient} (Priority: {Priority})",
            message.Recipient, message.Priority);

        await _emailService.SendAsync(
            to: message.Recipient,
            subject: message.Subject,
            body: message.Message,
            priority: message.Priority);
    }
}

// ===== CONSUMER 2: Only Urgent Notifications (Any Channel) =====
[ConsumerBinding("notifications", "urgent-queue", RoutingKey = "notification.*.urgent")]
public class UrgentNotificationConsumer : ConsumerBase<NotificationEvent>
{
    private readonly IAlertService _alertService;

    public UrgentNotificationConsumer(IAlertService alertService, ILogger<UrgentNotificationConsumer> logger)
        : base(logger)
    {
        _alertService = alertService;
    }

    protected override async Task HandleMessageAsync(NotificationEvent message)
    {
        Logger.LogWarning("URGENT {Channel} notification to {Recipient}",
            message.Channel, message.Recipient);

        // Send to on-call team
        await _alertService.NotifyOnCallTeamAsync(message);

        // Escalate if not acknowledged in 5 minutes
        await _alertService.ScheduleEscalationAsync(message.Id, TimeSpan.FromMinutes(5));
    }
}

// ===== CONSUMER 3: All SMS Notifications =====
[ConsumerBinding("notifications", "sms-queue", RoutingKey = "notification.sms.#")]
public class SmsConsumer : ConsumerBase<NotificationEvent>
{
    private readonly ISmsService _smsService;

    public SmsConsumer(ISmsService smsService, ILogger<SmsConsumer> logger)
        : base(logger)
    {
        _smsService = smsService;
    }

    protected override async Task HandleMessageAsync(NotificationEvent message)
    {
        Logger.LogInformation("Sending SMS to: {Recipient}", message.Recipient);
        await _smsService.SendAsync(message.Recipient, message.Message);
    }
}

// ===== CONSUMER 4: All Notifications (Audit Log) =====
[ConsumerBinding("notifications", "audit-queue", RoutingKey = "notification.#")]
public class NotificationAuditConsumer : ConsumerBase<NotificationEvent>, IInboxTrackingPreference
{
    public bool UseInboxTracking => true; // Prevent duplicate audit entries

    private readonly IAuditService _auditService;

    public NotificationAuditConsumer(IAuditService auditService, ILogger<NotificationAuditConsumer> logger)
        : base(logger)
    {
        _auditService = auditService;
    }

    protected override async Task HandleMessageAsync(NotificationEvent message)
    {
        await _auditService.LogNotificationAsync(new AuditEntry
        {
            NotificationId = message.Id,
            Channel = message.Channel,
            Priority = message.Priority,
            Recipient = message.Recipient,
            Timestamp = DateTime.UtcNow
        });
    }
}

// ===== PUBLISHER =====
public class NotificationService
{
    private readonly IMessagePublisher _publisher;

    public NotificationService(IMessagePublisher publisher)
    {
        _publisher = publisher;
    }

    public async Task SendEmailAsync(string recipient, string subject, string message, bool isUrgent = false)
    {
        var notification = new NotificationEvent
        {
            Channel = "email",
            Priority = isUrgent ? "urgent" : "normal",
            Recipient = recipient,
            Subject = subject,
            Message = message
        };

        // Routing key: notification.email.urgent or notification.email.normal
        var routingKey = $"notification.{notification.Channel}.{notification.Priority}";
        await _publisher.PublishAsync(notification, routingKey);
    }

    public async Task SendSmsAsync(string phoneNumber, string message, bool isUrgent = false)
    {
        var notification = new NotificationEvent
        {
            Channel = "sms",
            Priority = isUrgent ? "urgent" : "normal",
            Recipient = phoneNumber,
            Message = message
        };

        var routingKey = $"notification.{notification.Channel}.{notification.Priority}";
        await _publisher.PublishAsync(notification, routingKey);
    }
}

Message Flow Example:

Publisher sends: "notification.email.urgent"
├─ EmailConsumer receives (matches "notification.email.*")
├─ UrgentNotificationConsumer receives (matches "notification.*.urgent")
└─ NotificationAuditConsumer receives (matches "notification.#")

Publisher sends: "notification.sms.normal"
├─ SmsConsumer receives (matches "notification.sms.#")
└─ NotificationAuditConsumer receives (matches "notification.#")

Characteristics:

  • ✅ Pattern-based routing
  • ✅ One producer → Multiple filtered consumers
  • ✅ Flexible subscription patterns
  • ✅ Best for: Event notifications, logging, monitoring

3. Fanout Exchange (Broadcast)

Use Case: Broadcast to all consumers, no routing

When to Use:

  • System-wide notifications (maintenance mode, system shutdown)
  • Cache invalidation across multiple services
  • Real-time updates to multiple dashboards
  • Event sourcing (all events to all projections)
  • Audit logging (all events to audit system)

Example: System Event Broadcasting

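using System.Text.Json;                  // JsonSerializer
using EventTransit.Brokers.Abstractions; // ExchangeType
using EventTransit.Consumers;
using EventTransit.Core;
using Microsoft.AspNetCore.SignalR;      // IHubContext
using Microsoft.Extensions.Logging;

// ===== MESSAGE DEFINITION =====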
[MessageBinding(
    exchangeName: "system-events",
    ExchangeType = ExchangeType.Fanout,
    DurableExchange = true)]
public class SystemEvent : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string EventType { get; set; }
    public string Source { get; set; }
    public string Description { get; set; }
    public Dictionary<string, object> Data { get; set; }
    public DateTime Timestamp { get; set; }
}

// ===== CONSUMER 1: Audit Logger =====
[ConsumerBinding("system-events", "audit-queue")]
public class AuditConsumer : ConsumerBase<SystemEvent>, IInboxTrackingPreference
{
    public bool UseInboxTracking => true;

    private readonly IAuditRepository _auditRepository;

    public AuditConsumer(IAuditRepository auditRepository, ILogger<AuditConsumer> logger)
        : base(logger)
    {
        _auditRepository = auditRepository;
    }

    protected override async Task HandleMessageAsync(SystemEvent message)
    {
        Logger.LogInformation("Auditing system event: {EventType} from {Source}",
            message.EventType, message.Source);

        await _auditRepository.LogEventAsync(new AuditLog
        {
            EventId = message.Id,
            EventType = message.EventType,
            Source = message.Source,
            Description = message.Description,
            Data = JsonSerializer.Serialize(message.Data),
            Timestamp = message.Timestamp
        });
    }
}

// ===== CONSUMER 2: Metrics Collector =====
[ConsumerBinding("system-events", "metrics-queue")]
public class MetricsConsumer : ConsumerBase<SystemEvent>
{
    private readonly IMetricsService _metricsService;

    public MetricsConsumer(IMetricsService metricsService, ILogger<MetricsConsumer> logger)
        : base(logger)
    {
        _metricsService = metricsService;
    }

    protected override async Task HandleMessageAsync(SystemEvent message)
    {
        Logger.LogInformation("Collecting metrics for: {EventType}", message.EventType);

        await _metricsService.RecordEventAsync(
            eventType: message.EventType,
            source: message.Source,
            timestamp: message.Timestamp,
            metadata: message.Data);
    }
}

// ===== CONSUMER 3: Real-time Dashboard =====
[ConsumerBinding("system-events", "dashboard-queue")]
public class DashboardConsumer : ConsumerBase<SystemEvent>
{
    private readonly IHubContext<DashboardHub> _hubContext;

    public DashboardConsumer(IHubContext<DashboardHub> hubContext, ILogger<DashboardConsumer> logger)
        : base(logger)
    {
        _hubContext = hubContext;
    }

    protected override async Task HandleMessageAsync(SystemEvent message)
    {
        Logger.LogInformation("Broadcasting to dashboard: {EventType}", message.EventType);

        // Send to all connected SignalR clients
        await _hubContext.Clients.All.SendAsync("SystemEvent", new
        {
            message.EventType,
            message.Source,
            message.Description,
            message.Timestamp
        });
    }
}

// ===== CONSUMER 4: Cache Invalidation =====
[ConsumerBinding("system-events", "cache-queue")]
public class CacheInvalidationConsumer : ConsumerBase<SystemEvent>
{
    private readonly ICacheService _cacheService;

    public CacheInvalidationConsumer(ICacheService cacheService, ILogger<CacheInvalidationConsumer> logger)
        : base(logger)
    {
        _cacheService = cacheService;
    }

    protected override async Task HandleMessageAsync(SystemEvent message)
    {
        if (message.EventType == "CacheInvalidation")
        {
            Logger.LogInformation("Invalidating cache for: {Source}", message.Source);

            var cacheKey = message.Data.GetValueOrDefault("CacheKey")?.ToString();
            if (!string.IsNullOrEmpty(cacheKey))
            {
                await _cacheService.RemoveAsync(cacheKey);
            }
        }
    }
}

// ===== PUBLISHER =====
public class SystemEventPublisher
{
    private readonly IMessagePublisher _publisher;

    public SystemEventPublisher(IMessagePublisher publisher)
    {
        _publisher = publisher;
    }

    public async Task PublishMaintenanceModeAsync(bool enabled)
    {
        var systemEvent = new SystemEvent
        {
            EventType = "MaintenanceMode",
            Source = "SystemAdmin",
            Description = enabled ? "System entering maintenance mode" : "System exiting maintenance mode",
            Data = new Dictionary<string, object>
            {
                { "Enabled", enabled },
                { "ScheduledDowntime", TimeSpan.FromHours(2) }
            },
            Timestamp = DateTime.UtcNow
        };

        // Broadcast to all consumers (routing key ignored in fanout)
        await _publisher.PublishAsync(systemEvent);
    }

    public async Task InvalidateCacheAsync(string cacheKey)
    {
        var systemEvent = new SystemEvent
        {
            EventType = "CacheInvalidation",
            Source = "CacheManager",
            Description = $"Invalidating cache key: {cacheKey}",
            Data = new Dictionary<string, object>
            {
                { "CacheKey", cacheKey }
            },
            Timestamp = DateTime.UtcNow
        };

        await _publisher.PublishAsync(systemEvent);
    }
}

Message Flow:

Publisher sends SystemEvent to fanout exchange
├─ AuditConsumer receives (logs to audit database)
├─ MetricsConsumer receives (records metrics)
├─ DashboardConsumer receives (updates real-time dashboard)
└─ CacheInvalidationConsumer receives (invalidates cache)

ALL consumers receive EVERY message

Characteristics:

  • ✅ Broadcast to all queues
  • ✅ Routing key is ignored
  • ✅ One producer → All consumers
  • ✅ Best for: System-wide notifications, cache invalidation, real-time updates
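
To make the contrast between fanout and direct routing concrete, here is a small in-memory sketch of how each exchange type selects queues. It models the routing decision only; `ExchangeSketch` and `SketchExchangeType` are hypothetical names, not EventTransit or RabbitMQ types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum SketchExchangeType { Direct, Fanout }

// Hypothetical model of an exchange and its queue bindings.
public sealed class ExchangeSketch
{
    private readonly SketchExchangeType _type;
    private readonly List<(string Queue, string BindingKey)> _bindings =
        new List<(string Queue, string BindingKey)>();

    public ExchangeSketch(SketchExchangeType type) => _type = type;

    public void Bind(string queue, string bindingKey = "") => _bindings.Add((queue, bindingKey));

    // Returns the queues that would receive a message published with this routing key.
    public IReadOnlyList<string> Route(string routingKey)
    {
        return _bindings
            // Fanout ignores the routing key; direct requires an exact match.
            .Where(b => _type == SketchExchangeType.Fanout || b.BindingKey == routingKey)
            .Select(b => b.Queue)
            .Distinct()
            .ToList();
    }
}
```

Binding the four queues from the example above to a fanout `ExchangeSketch` and calling `Route` with any key returns all four, whereas a direct exchange returns only the queues whose binding key equals the routing key.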

🎯 Advanced Scenarios

Scenario 1: Saga Pattern (Distributed Transactions)

// Order Saga Coordinator
public class OrderSagaCoordinator : ConsumerBase<OrderCreatedEvent>
{
    private readonly IMessagePublisher _publisher;
    private readonly ISagaRepository _sagaRepository;

    public OrderSagaCoordinator(
        IMessagePublisher publisher,
        ISagaRepository sagaRepository,
        ILogger<OrderSagaCoordinator> logger) : base(logger)
    {
        _publisher = publisher;
        _sagaRepository = sagaRepository;
    }

    protected override async Task HandleMessageAsync(OrderCreatedEvent message)
    {
        // Persist saga state before issuing the first command
        var saga = new OrderSaga { OrderId = message.Id };
        await _sagaRepository.SaveAsync(saga);

        // Step 1: Reserve inventory
        await _publisher.PublishAsync(new ReserveInventoryCommand
        {
            OrderId = message.Id,
            Items = message.Items
        }, routingKey: "inventory.reserve");
    }
}

// Inventory Service Consumer
[ConsumerBinding("inventory", "reserve-queue", RoutingKey = "inventory.reserve")]
public class InventoryReservationConsumer : ConsumerBase<ReserveInventoryCommand>
{
    private readonly IMessagePublisher _publisher;

    public InventoryReservationConsumer(
        IMessagePublisher publisher,
        ILogger<InventoryReservationConsumer> logger) : base(logger)
    {
        _publisher = publisher;
    }

    protected override async Task HandleMessageAsync(ReserveInventoryCommand message)
    {
        try
        {
            // Reserve inventory (ReserveItemsAsync is assumed to be defined in this class)
            await ReserveItemsAsync(message.Items);

            // Publish success event
            await _publisher.PublishAsync(new InventoryReservedEvent
            {
                OrderId = message.OrderId,
                Success = true
            }, routingKey: "inventory.reserved");
        }
        catch (Exception ex)
        {
            // Publish failure event (triggers compensation).
            // The exception is handled here, so this message is not retried.
            await _publisher.PublishAsync(new InventoryReservedEvent
            {
                OrderId = message.OrderId,
                Success = false,
                Error = ex.Message
            }, routingKey: "inventory.reserved");
        }
    }
}

// Saga continues with payment, shipping, etc.

Scenario 2: Event Sourcing

using System.Text.Json; // JsonSerializer

// Event Store
[ConsumerBinding("events", "event-store-queue", RoutingKey = "event.#")]
public class EventStoreConsumer : ConsumerBase<DomainEvent>, IInboxTrackingPreference
{
    public bool UseInboxTracking => true;

    private readonly IEventStore _eventStore;

    public EventStoreConsumer(IEventStore eventStore, ILogger<EventStoreConsumer> logger)
        : base(logger)
    {
        _eventStore = eventStore;
    }

    protected override async Task HandleMessageAsync(DomainEvent message)
    {
        await _eventStore.AppendAsync(new StoredEvent
        {
            EventId = message.Id,
            EventType = message.GetType().Name,
            AggregateId = message.AggregateId,
            Data = JsonSerializer.Serialize(message),
            Timestamp = message.Timestamp
        });
    }
}

// Projection Builder
[ConsumerBinding("events", "projection-queue", RoutingKey = "event.order.#")]
public class OrderProjectionConsumer : ConsumerBase<OrderEvent>
{
    private readonly IProjectionRepository _projectionRepository;

    public OrderProjectionConsumer(
        IProjectionRepository projectionRepository,
        ILogger<OrderProjectionConsumer> logger) : base(logger)
    {
        _projectionRepository = projectionRepository;
    }

    protected override async Task HandleMessageAsync(OrderEvent message)
    {
        var projection = await _projectionRepository.GetOrCreateAsync(message.OrderId);

        projection.Apply(message); // Update read model

        await _projectionRepository.SaveAsync(projection);
    }
}

Scenario 3: Priority Queues

// High priority queue
[ConsumerBinding("tasks", "high-priority-queue", RoutingKey = "task.high", Priority = 10)]
public class HighPriorityConsumer : ConsumerBase<TaskMessage>
{
    protected override async Task HandleMessageAsync(TaskMessage message)
    {
        // Process high priority tasks first
        await ProcessTaskAsync(message);
    }
}

// Normal priority queue
[ConsumerBinding("tasks", "normal-priority-queue", RoutingKey = "task.normal", Priority = 5)]
public class NormalPriorityConsumer : ConsumerBase<TaskMessage>
{
    protected override async Task HandleMessageAsync(TaskMessage message)
    {
        await ProcessTaskAsync(message);
    }
}

Scenario 4: Delayed Messages (TTL)

// Publish with delay
public async Task ScheduleReminderAsync(string userId, TimeSpan delay)
{
    var reminder = new ReminderEvent
    {
        UserId = userId,
        Message = "Don't forget to complete your profile!"
    };

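    // Message will be delivered after delay.
    // Note: "x-delay" is the header read by RabbitMQ's delayed message exchange plugin;
    // confirm that the plugin is enabled on your broker before relying on this.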
    await _publisher.PublishAsync(reminder,
        routingKey: "reminder.user",
        headers: new Dictionary<string, object>
        {
            { "x-delay", (int)delay.TotalMilliseconds }
        });
}

Scenario 5: Dead Letter Handling with Retry

// Consumer with automatic retry
[ConsumerBinding("orders", "order-queue", RoutingKey = "order.process")]
public class OrderConsumer : ConsumerBase<OrderEvent>
{
    protected override async Task HandleMessageAsync(OrderEvent message)
    {
        // If this fails, EventTransit will:
        // 1. Retry with exponential backoff (1s, 2s, 4s)
        // 2. After 3 retries, send to dead letter exchange
        // 3. Notify publisher service via et.dlx
        await ProcessOrderAsync(message);
    }
}

// Dead letter consumer (manual intervention)
[ConsumerBinding("et.dlx", "manual-review-queue", RoutingKey = "dead.letter.#")]
public class DeadLetterConsumer : ConsumerBase<DeadLetterNotification>
{
    protected override async Task HandleMessageAsync(DeadLetterNotification message)
    {
        Logger.LogError("Message {MessageId} failed after {RetryCount} attempts: {Error}",
            message.OriginalMessageId,
            message.Error.RetryAttempt,
            message.Error.Message);

        // Send alert to operations team
        await NotifyOpsTeamAsync(message);
    }
}
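
The retry flow described in the consumer comments above (one initial attempt, then retries at 1s, 2s, and 4s, then dead-lettering) can be sketched in isolation. This is an illustration of the pattern, not EventTransit's internal code: the doubling formula is inferred from the 1s, 2s, 4s timeline, and `RetrySketch`, `BackoffDelay`, and `RunAsync` are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

// Illustrative only: EventTransit performs retries internally.
public static class RetrySketch
{
    // Delay before retry number `retry` (1-based): baseDelay * 2^(retry-1), capped at maxDelay.
    public static TimeSpan BackoffDelay(int retry, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        double ms = baseDelay.TotalMilliseconds * Math.Pow(2, retry - 1);
        return TimeSpan.FromMilliseconds(Math.Min(ms, maxDelay.TotalMilliseconds));
    }

    // Runs the handler once, then up to maxRetries more times. Returns false
    // after handing the last exception to the dead-letter callback.
    public static async Task<bool> RunAsync(
        Func<Task> handler,
        int maxRetries,
        TimeSpan baseDelay,
        TimeSpan maxDelay,
        Func<Exception, int, Task> deadLetter)
    {
        for (int retry = 0; ; retry++)
        {
            try
            {
                await handler();
                return true;
            }
            catch (Exception ex)
            {
                if (retry == maxRetries)
                {
                    // Retries exhausted: report the failure and stop.
                    await deadLetter(ex, retry);
                    return false;
                }
            }

            // Wait 1x, 2x, 4x... the base delay before the next attempt.
            await Task.Delay(BackoffDelay(retry + 1, baseDelay, maxDelay));
        }
    }
}
```

With `maxRetries = 3`, a base delay of 1 second, and a cap of 30 seconds, a handler that always fails is invoked four times before the dead-letter callback runs.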

⚙️ Configuration

Complete Configuration Example

builder.Services.AddEventTransitWithEntityFramework<MyDbContext>(
    config =>
    {
        // ===== BROKER CONFIGURATION =====
        config.Broker.ConnectionString = "Host=localhost;Port=5672;Username=guest;Password=guest;VirtualHost=/";
        config.Broker.AutomaticRecoveryEnabled = true;
        config.Broker.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
        config.Broker.RequestedHeartbeat = TimeSpan.FromSeconds(60);

        // ===== DEFAULT EXCHANGE =====
        config.DefaultExchange.Name = "my-app-exchange";
        config.DefaultExchange.Type = ExchangeType.Direct;
        config.DefaultExchange.Durable = true;
        config.DefaultExchange.AutoDelete = false;

        // ===== OUTBOX CONFIGURATION =====
        config.Outbox.Enabled = true;
        config.Outbox.ProcessingIntervalSeconds = 10;      // Process every 10 seconds
        config.Outbox.BatchSize = 100;                     // Process 100 messages per batch
        config.Outbox.MaxRetryAttempts = 3;                // Retry failed messages 3 times
        config.Outbox.RetryDelaySeconds = 60;              // Wait 60 seconds between retries
        config.Outbox.CleanupIntervalHours = 24;           // Cleanup old messages every 24 hours
        config.Outbox.RetentionDays = 7;                   // Keep messages for 7 days

        // ===== INBOX CONFIGURATION =====
        config.Inbox.Enabled = true;
        config.Inbox.ProcessingIntervalSeconds = 30;       // Check for retries every 30 seconds
        config.Inbox.BatchSize = 100;
        config.Inbox.MaxRetryAttempts = 3;
        config.Inbox.RetryDelaySeconds = 30;
        config.Inbox.CleanupIntervalHours = 24;
        config.Inbox.RetentionDays = 7;

        // ===== DEAD LETTER CONFIGURATION =====
        config.DeadLetter.EnableCrossServiceNotifications = true;  // Send DL notifications to publisher
        config.DeadLetter.EnableDuplicateNotifications = true;     // Treat duplicates as dead letters
        config.DeadLetter.ServiceName = "MyService";               // Service identifier
        config.DeadLetter.IncludeMessagePayload = true;            // Include full message in DL notification
        config.DeadLetter.MaxStackTraceLength = 2000;              // Limit stack trace size

        // ===== RESILIENCE CONFIGURATION (EXPONENTIAL RETRY) =====
        config.Resilience.Enabled = true;
        config.Resilience.MaxRetryAttempts = 3;            // Retry 3 times before dead letter
        config.Resilience.BaseRetryDelayMs = 1000;         // Start with 1 second delay
        config.Resilience.MaxRetryDelayMs = 30000;         // Cap at 30 seconds
        // Retry timeline: 1s, 2s, 4s (exponential backoff)

        config.Resilience.CircuitBreakerFailureThreshold = 5;      // Open circuit after 5 failures
        config.Resilience.CircuitBreakerTimeoutSeconds = 60;       // Keep circuit open for 60 seconds
    },
    new EventTransit.Schema.SchemaConfiguration
    {
        AutoCreateTables = true,        // Automatically create OutboxMessage and InboxMessage tables
        CreateIndexes = true            // Create performance indexes
    });
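
To make the two circuit breaker settings concrete, here is a minimal sketch of how a failure threshold and an open timeout usually interact. It assumes that failures are counted consecutively and that the circuit closes again once the timeout elapses; EventTransit's internal breaker may behave differently, and `CircuitBreakerSketch` is a hypothetical class, not a library type.

```csharp
using System;

// Illustrative sketch, not thread-safe and not an EventTransit API.
public sealed class CircuitBreakerSketch
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _openDuration;
    private int _consecutiveFailures;
    private DateTime? _openedAtUtc;

    public CircuitBreakerSketch(int failureThreshold, TimeSpan openDuration)
    {
        _failureThreshold = failureThreshold;
        _openDuration = openDuration;
    }

    // True while calls should be rejected at the given instant.
    public bool IsOpen(DateTime nowUtc) =>
        _openedAtUtc is DateTime opened && nowUtc - opened < _openDuration;

    // A success resets the failure count and closes the circuit.
    public void RecordSuccess()
    {
        _consecutiveFailures = 0;
        _openedAtUtc = null;
    }

    // Reaching the threshold opens (or re-opens) the circuit from `nowUtc`.
    public void RecordFailure(DateTime nowUtc)
    {
        _consecutiveFailures++;
        if (_consecutiveFailures >= _failureThreshold)
            _openedAtUtc = nowUtc;
    }
}
```

With a threshold of 5 and a duration of 60 seconds, the fifth consecutive failure opens the circuit, and `IsOpen` returns false again 60 seconds later.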

Complete appsettings.json Configuration

{
  "ConnectionStrings": {
    "Database": "Host=localhost;Database=myapp;Username=postgres;Password=password",
    "RabbitMQ": "Host=localhost;Port=5672;Username=guest;Password=guest;VirtualHost=/"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning",
      "EventTransit": "Debug"
    }
  },
  "EventTransit": {
    "Broker": {
      "ConnectionString": "Host=localhost;Port=5672;Username=guest;Password=guest;VirtualHost=/",
      "AutomaticRecoveryEnabled": true,
      "NetworkRecoveryInterval": "00:00:10",
      "RequestedHeartbeat": "00:01:00",
      "RequestedConnectionTimeout": "00:00:30",
      "SocketReadTimeout": "00:00:30",
      "SocketWriteTimeout": "00:00:30",
      "ContinuationTimeout": "00:00:20",
      "HandshakeContinuationTimeout": "00:00:10",
      "MaxMessageSize": 134217728
    },
    "DefaultExchange": {
      "Name": "my-app-exchange",
      "Type": "Direct",
      "Durable": true,
      "AutoDelete": false,
      "Internal": false,
      "Arguments": {}
    },
    "DefaultQueue": {
      "Durable": true,
      "Exclusive": false,
      "AutoDelete": false,
      "Arguments": {}
    },
    "Outbox": {
      "Enabled": true,
      "ProcessingIntervalSeconds": 10,
      "BatchSize": 100,
      "MaxRetryAttempts": 3,
      "RetryDelaySeconds": 60,
      "CleanupIntervalHours": 24,
      "RetentionDays": 7,
      "EnableMetrics": true,
      "ParallelProcessing": false,
      "MaxDegreeOfParallelism": 4
    },
    "Inbox": {
      "Enabled": true,
      "ProcessingIntervalSeconds": 30,
      "BatchSize": 100,
      "MaxRetryAttempts": 3,
      "RetryDelaySeconds": 30,
      "CleanupIntervalHours": 24,
      "RetentionDays": 7,
      "EnableMetrics": true,
      "TrackDuplicates": true
    },
    "DeadLetter": {
      "EnableCrossServiceNotifications": true,
      "EnableDuplicateNotifications": true,
      "ServiceName": "MyService",
      "IncludeMessagePayload": true,
      "MaxStackTraceLength": 2000,
      "MaxPayloadLength": 10000,
      "RetryFromDashboard": true
    },
    "Resilience": {
      "Enabled": true,
      "MaxRetryAttempts": 3,
      "BaseRetryDelayMs": 1000,
      "MaxRetryDelayMs": 30000,
      "CircuitBreakerFailureThreshold": 5,
      "CircuitBreakerTimeoutSeconds": 60
    },
    "Consumer": {
      "PrefetchCount": 10,
      "AutoAck": false,
      "RequeueOnError": false,
      "ConsumerTag": "",
      "Exclusive": false,
      "Arguments": {}
    },
    "Publisher": {
      "ConfirmPublish": true,
      "Mandatory": false,
      "Persistent": true,
      "Timeout": "00:00:30"
    },
    "Dashboard": {
      "Enabled": true,
      "BasePath": "/eventtransit",
      "PageSize": 50,
      "RefreshIntervalSeconds": 30,
      "EnableAuthentication": false
    }
  }
}

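Then bind the EventTransit section in Program.cs. Because the connection string is assigned after Bind, the ConnectionStrings:RabbitMQ value overrides EventTransit:Broker:ConnectionString: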

builder.Services.AddEventTransitWithEntityFramework<MyDbContext>(
    config =>
    {
        builder.Configuration.GetSection("EventTransit").Bind(config);
        config.Broker.ConnectionString = builder.Configuration.GetConnectionString("RabbitMQ");
    },
    new EventTransit.Schema.SchemaConfiguration
    {
        AutoCreateTables = true,
        CreateIndexes = true
    });

🏷️ Message and Consumer Attributes

MessageBinding Attribute

MessageBindingAttribute configures the exchange, queue, and routing settings for a message type.

Complete Attribute Properties

[MessageBinding(
    exchangeName: "my-exchange",           // Required: Exchange name
    queueName: "my-queue",                 // Optional: Queue name (auto-generated if not specified)
    RoutingKey = "my.routing.key",         // Optional: Routing key (default: "")
    ExchangeType = ExchangeType.Direct,    // Optional: Exchange type (default: Direct)
    DurableExchange = true,                // Optional: Exchange durability (default: true)
    DurableQueue = true,                   // Optional: Queue durability (default: true)
    AutoDeleteExchange = false,            // Optional: Auto-delete exchange (default: false)
    AutoDeleteQueue = false,               // Optional: Auto-delete queue (default: false)
    ExclusiveQueue = false,                // Optional: Exclusive queue (default: false)
    DeadLetterExchange = "my-dlx",         // Optional: Dead letter exchange name
    DeadLetterRoutingKey = "dlx.key",      // Optional: Dead letter routing key
    MessageTTL = 60000,                    // Optional: Message TTL in milliseconds
    MaxLength = 10000,                     // Optional: Max queue length
    MaxLengthBytes = 1048576,              // Optional: Max queue size in bytes
    Priority = 5                           // Optional: Queue priority (0-10)
)]
public class MyMessage : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    // Message properties...
}

MessageBinding Examples

Example 1: Simple Direct Exchange

[MessageBinding(exchangeName: "orders", queueName: "order-processing", RoutingKey = "order.created")]
public class OrderCreatedEvent : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string OrderNumber { get; set; }
    public decimal Amount { get; set; }
}

Example 2: Topic Exchange with Pattern

[MessageBinding(
    exchangeName: "notifications",
    queueName: "email-notifications",
    RoutingKey = "notification.email.*",
    ExchangeType = ExchangeType.Topic,
    DurableExchange = true,
    DurableQueue = true
)]
public class EmailNotification : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string To { get; set; }
    public string Subject { get; set; }
    public string Body { get; set; }
}

Example 3: Fanout Exchange

[MessageBinding(
    exchangeName: "system-events",
    queueName: "audit-queue",
    ExchangeType = ExchangeType.Fanout,
    DurableExchange = true,
    DurableQueue = true
)]
public class SystemEvent : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string EventType { get; set; }
    public string Description { get; set; }
}

Example 4: With Dead Letter Exchange

[MessageBinding(
    exchangeName: "payments",
    queueName: "payment-processing",
    RoutingKey = "payment.process",
    DeadLetterExchange = "payments-dlx",
    DeadLetterRoutingKey = "payment.failed",
    MessageTTL = 300000  // 5 minutes
)]
public class PaymentEvent : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string PaymentId { get; set; }
    public decimal Amount { get; set; }
}

Example 5: Priority Queue

[MessageBinding(
    exchangeName: "tasks",
    queueName: "high-priority-tasks",
    RoutingKey = "task.high",
    Priority = 10,
    MaxLength = 1000
)]
public class HighPriorityTask : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string TaskName { get; set; }
    public string Description { get; set; }
}

ConsumerBinding Attribute

ConsumerBindingAttribute configures a consumer's queue binding and consumption behavior.

Complete Attribute Properties

[ConsumerBinding(
    exchangeName: "my-exchange",           // Required: Exchange name
    queueName: "my-queue",                 // Required: Queue name
    RoutingKey = "my.routing.key",         // Optional: Routing key (default: "")
    PrefetchCount = 10,                    // Optional: Prefetch count (default: 10)
    AutoAck = false,                       // Optional: Auto-acknowledge (default: false)
    Exclusive = false,                     // Optional: Exclusive consumer (default: false)
    ConsumerTag = "my-consumer",           // Optional: Consumer tag
    Priority = 5,                          // Optional: Consumer priority (0-10)
    Arguments = null                       // Optional: Additional arguments
)]
public class MyConsumer : ConsumerBase<MyMessage>
{
    public MyConsumer(ILogger<MyConsumer> logger) : base(logger) { }

    protected override async Task HandleMessageAsync(MyMessage message)
    {
        // Process message
    }
}

ConsumerBinding Examples

Example 1: Basic Consumer

[ConsumerBinding("orders", "order-processing-queue", RoutingKey = "order.created")]
public class OrderConsumer : ConsumerBase<OrderCreatedEvent>
{
    private readonly IOrderService _orderService;

    public OrderConsumer(IOrderService orderService, ILogger<OrderConsumer> logger)
        : base(logger)
    {
        _orderService = orderService;
    }

    protected override async Task HandleMessageAsync(OrderCreatedEvent message)
    {
        await _orderService.ProcessOrderAsync(message);
    }
}

Example 2: Consumer with Prefetch

[ConsumerBinding(
    exchangeName: "tasks",
    queueName: "task-queue",
    RoutingKey = "task.process",
    PrefetchCount = 5  // Allow up to 5 unacknowledged messages in flight
)]
public class TaskConsumer : ConsumerBase<TaskMessage>
{
    public TaskConsumer(ILogger<TaskConsumer> logger) : base(logger) { }

    protected override async Task HandleMessageAsync(TaskMessage message)
    {
        await ProcessTaskAsync(message);
    }
}

Example 3: Topic Consumer with Wildcard

[ConsumerBinding(
    exchangeName: "notifications",
    queueName: "email-queue",
    RoutingKey = "notification.email.*"  // Matches notification.email.urgent, notification.email.normal, etc.
)]
public class EmailConsumer : ConsumerBase<NotificationEvent>
{
    public EmailConsumer(ILogger<EmailConsumer> logger) : base(logger) { }

    protected override async Task HandleMessageAsync(NotificationEvent message)
    {
        await SendEmailAsync(message);
    }
}

Example 4: Multiple Routing Keys (Bind Multiple Times)

// Consumer 1: Handles urgent notifications
[ConsumerBinding("notifications", "urgent-queue", RoutingKey = "notification.*.urgent")]
public class UrgentConsumer : ConsumerBase<NotificationEvent>
{
    public UrgentConsumer(ILogger<UrgentConsumer> logger) : base(logger) { }

    protected override async Task HandleMessageAsync(NotificationEvent message)
    {
        await HandleUrgentAsync(message);
    }
}

// Consumer 2: Handles all notifications
[ConsumerBinding("notifications", "all-queue", RoutingKey = "notification.#")]
public class AllNotificationsConsumer : ConsumerBase<NotificationEvent>
{
    public AllNotificationsConsumer(ILogger<AllNotificationsConsumer> logger)
        : base(logger) { }

    protected override async Task HandleMessageAsync(NotificationEvent message)
    {
        await LogNotificationAsync(message);
    }
}
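
The wildcard patterns used in Examples 3 and 4 follow the AMQP topic rules: `*` stands for exactly one dot-separated word and `#` for zero or more words. The broker performs this matching itself; the sketch below re-implements it only as a way to check which routing keys a binding would receive. `TopicPattern` is a hypothetical helper, not an EventTransit or RabbitMQ client type.

```csharp
using System;

// Illustrative re-implementation of AMQP topic matching.
public static class TopicPattern
{
    public static bool Matches(string pattern, string routingKey) =>
        Match(Words(pattern), 0, Words(routingKey), 0);

    // An empty string has no words; otherwise split on '.'.
    private static string[] Words(string s) =>
        s.Length == 0 ? Array.Empty<string>() : s.Split('.');

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: match only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words: try every split point.
            for (int next = ki; next <= k.Length; next++)
                if (Match(p, pi + 1, k, next)) return true;
            return false;
        }

        // '*' or a literal word needs exactly one key word to consume.
        if (ki == k.Length) return false;
        return (p[pi] == "*" || p[pi] == k[ki]) && Match(p, pi + 1, k, ki + 1);
    }
}
```

For instance, `notification.email.urgent` matches all three patterns `notification.email.*`, `notification.*.urgent`, and `notification.#`, so a message with that key would reach the email, urgent, and all-notifications queues.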

Example 5: High Priority Consumer

[ConsumerBinding(
    exchangeName: "tasks",
    queueName: "high-priority-queue",
    RoutingKey = "task.high",
    Priority = 10,
    PrefetchCount = 1  // Process one at a time for high priority
)]
public class HighPriorityConsumer : ConsumerBase<TaskMessage>
{
    public HighPriorityConsumer(ILogger<HighPriorityConsumer> logger)
        : base(logger) { }

    protected override async Task HandleMessageAsync(TaskMessage message)
    {
        await ProcessHighPriorityTaskAsync(message);
    }
}

Example 6: Exclusive Consumer

[ConsumerBinding(
    exchangeName: "singleton-tasks",
    queueName: "singleton-queue",
    RoutingKey = "task.singleton",
    Exclusive = true  // Only one consumer can consume from this queue
)]
public class SingletonConsumer : ConsumerBase<SingletonTask>
{
    public SingletonConsumer(ILogger<SingletonConsumer> logger)
        : base(logger) { }

    protected override async Task HandleMessageAsync(SingletonTask message)
    {
        // Only one instance of this consumer will process messages
        await ProcessSingletonTaskAsync(message);
    }
}

IInboxTrackingPreference Interface

Use this interface to enable inbox tracking (duplicate detection) for specific consumers.

[ConsumerBinding("payments", "payment-queue", RoutingKey = "payment.processed")]
public class PaymentConsumer : ConsumerBase<PaymentEvent>, IInboxTrackingPreference
{
    public bool UseInboxTracking => true;  // Enable duplicate detection

    public PaymentConsumer(ILogger<PaymentConsumer> logger) : base(logger) { }

    protected override async Task HandleMessageAsync(PaymentEvent message)
    {
        // This will only execute once, even if message is delivered multiple times
        await ProcessPaymentAsync(message);
    }
}
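
Conceptually, inbox tracking records each message ID and skips any message whose ID was already recorded. The in-memory sketch below shows only that rule. EventTransit keeps its inbox records in the database (the InboxMessage table), so `InMemoryInbox` is a hypothetical stand-in for illustration, and its handling of failed messages is an assumption rather than the library's documented behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Conceptual sketch only, not an EventTransit type.
public sealed class InMemoryInbox
{
    private readonly HashSet<string> _seen = new HashSet<string>();
    private readonly object _gate = new object();

    // Returns true if the handler ran, false if the ID was already recorded.
    public async Task<bool> ProcessOnceAsync(string messageId, Func<Task> handler)
    {
        lock (_gate)
        {
            // Add returns false when the ID is already present: a duplicate.
            if (!_seen.Add(messageId)) return false;
        }

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Forget the ID so a retry of the failed message is not treated as a duplicate.
            lock (_gate) { _seen.Remove(messageId); }
            throw;
        }
    }
}
```

Calling `ProcessOnceAsync` twice with the same ID runs the handler once; the second call returns false without invoking it.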

Complete Example: All Attributes Together

// ===== MESSAGE WITH FULL CONFIGURATION =====
[MessageBinding(
    exchangeName: "orders",
    queueName: "order-processing",
    RoutingKey = "order.created",
    ExchangeType = ExchangeType.Direct,
    DurableExchange = true,
    DurableQueue = true,
    AutoDeleteExchange = false,
    AutoDeleteQueue = false,
    DeadLetterExchange = "orders-dlx",
    DeadLetterRoutingKey = "order.failed",
    MessageTTL = 300000,  // 5 minutes
    MaxLength = 10000,
    Priority = 5
)]
public class OrderCreatedEvent : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string OrderNumber { get; set; }
    public decimal Amount { get; set; }
    public string CustomerId { get; set; }
    public List<OrderItem> Items { get; set; }
    public DateTime CreatedAt { get; set; }
}

// ===== CONSUMER WITH FULL CONFIGURATION =====
[ConsumerBinding(
    exchangeName: "orders",
    queueName: "order-processing",
    RoutingKey = "order.created",
    PrefetchCount = 10,
    AutoAck = false,
    Exclusive = false,
    ConsumerTag = "order-processor-1",
    Priority = 5
)]
public class OrderConsumer : ConsumerBase<OrderCreatedEvent>, IInboxTrackingPreference
{
    public bool UseInboxTracking => true;  // Enable duplicate detection

    private readonly IOrderService _orderService;
    private readonly IInventoryService _inventoryService;
    private readonly IMessagePublisher _publisher;

    public OrderConsumer(
        IOrderService orderService,
        IInventoryService inventoryService,
        IMessagePublisher publisher,
        ILogger<OrderConsumer> logger) : base(logger)
    {
        _orderService = orderService;
        _inventoryService = inventoryService;
        _publisher = publisher;
    }

    protected override async Task HandleMessageAsync(OrderCreatedEvent message)
    {
        Logger.LogInformation("Processing order: {OrderNumber}", message.OrderNumber);

        try
        {
            // Reserve inventory
            await _inventoryService.ReserveItemsAsync(message.Items);

            // Process order
            await _orderService.ProcessOrderAsync(message);

            // Publish success event
            await _publisher.PublishAsync(new OrderProcessedEvent
            {
                OrderId = message.Id,
                OrderNumber = message.OrderNumber,
                Status = "Processed"
            }, routingKey: "order.processed");

            Logger.LogInformation("Order processed successfully: {OrderNumber}", message.OrderNumber);
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "Failed to process order: {OrderNumber}", message.OrderNumber);
            throw; // Let EventTransit handle retry
        }
    }
}

Attribute Reference Tables

MessageBinding Attribute Properties

| Property | Type | Default | Description |
|---|---|---|---|
| exchangeName | string | Required | Name of the exchange |
| queueName | string | Auto-generated | Name of the queue (optional) |
| RoutingKey | string | "" | Routing key for message routing |
| ExchangeType | ExchangeType | Direct | Type of exchange (Direct, Topic, Fanout, Headers) |
| DurableExchange | bool | true | Exchange survives broker restart |
| DurableQueue | bool | true | Queue survives broker restart |
| AutoDeleteExchange | bool | false | Exchange deleted when last queue unbinds |
| AutoDeleteQueue | bool | false | Queue deleted when last consumer disconnects |
| ExclusiveQueue | bool | false | Queue used by only one connection |
| DeadLetterExchange | string | null | Dead letter exchange name |
| DeadLetterRoutingKey | string | null | Routing key for dead letters |
| MessageTTL | int | null | Message time-to-live in milliseconds |
| MaxLength | int | null | Maximum number of messages in queue |
| MaxLengthBytes | int | null | Maximum queue size in bytes |
| Priority | int | 0 | Queue priority (0-10) |

ConsumerBinding Attribute Properties

| Property | Type | Default | Description |
|---|---|---|---|
| exchangeName | string | Required | Name of the exchange |
| queueName | string | Required | Name of the queue |
| RoutingKey | string | "" | Routing key for binding |
| PrefetchCount | int | 10 | Number of messages to prefetch |
| AutoAck | bool | false | Automatically acknowledge messages |
| Exclusive | bool | false | Exclusive consumer (only one allowed) |
| ConsumerTag | string | Auto-generated | Consumer identifier |
| Priority | int | 0 | Consumer priority (0-10) |
| Arguments | Dictionary | null | Additional RabbitMQ arguments |

ExchangeType Enum Values

| Value | Description | Use Case |
|---|---|---|
| Direct | Exact routing key match | Point-to-point messaging, task queues |
| Topic | Pattern-based routing | Event notifications, logging, monitoring |
| Fanout | Broadcast to all queues | System-wide notifications, cache invalidation |
| Headers | Header-based routing | Complex routing based on message headers |

🔧 Worker Service Setup

Option 1: Worker Service WITH Dashboard

using EventTransit;
using Microsoft.EntityFrameworkCore;

var builder = WebApplication.CreateBuilder(args);

// Configure Kestrel for background service
builder.WebHost.ConfigureKestrel(options =>
{
    options.ListenLocalhost(5000); // Dashboard port
});

// Add EventTransit
builder.Services.AddEventTransitWithEntityFramework<WorkerDbContext>(
    config =>
    {
        config.Broker.ConnectionString = builder.Configuration.GetConnectionString("RabbitMQ");
        config.DefaultExchange.Name = "worker-exchange";
        config.Outbox.Enabled = true;
        config.Inbox.Enabled = true;
        config.DeadLetter.EnableCrossServiceNotifications = true;
        config.DeadLetter.ServiceName = "WorkerService";
        config.Resilience.Enabled = true;
        config.Resilience.MaxRetryAttempts = 3;
    },
    new EventTransit.Schema.SchemaConfiguration
    {
        AutoCreateTables = true,
        CreateIndexes = true
    });

// Add DbContext
builder.Services.AddDbContext<WorkerDbContext>(options =>
    options.UseNpgsql(builder.Configuration.GetConnectionString("Database")));

// Add your background workers
builder.Services.AddHostedService<OrderProcessingWorker>();
builder.Services.AddHostedService<EmailNotificationWorker>();

// Register consumers
builder.Services.AddEventTransitConsumer<OrderCreatedConsumer>();
builder.Services.AddEventTransitConsumer<PaymentProcessedConsumer>();

var app = builder.Build();

// Enable the dashboard
app.UseEventTransitDashboard();

// Optional: Add health checks
app.MapGet("/health", () => Results.Ok(new { status = "healthy", timestamp = DateTime.UtcNow }));

await app.RunAsync();

Option 2: Worker Service WITHOUT Dashboard

using EventTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateDefaultBuilder(args);

builder.ConfigureServices((context, services) =>
{
    // Add EventTransit WITHOUT dashboard
    services.AddEventTransitWithEntityFramework<WorkerDbContext>(
        config =>
        {
            config.Broker.ConnectionString = context.Configuration.GetConnectionString("RabbitMQ");
            config.DefaultExchange.Name = "worker-exchange";
            config.Outbox.Enabled = true;
            config.Inbox.Enabled = true;
        },
        new EventTransit.Schema.SchemaConfiguration
        {
            AutoCreateTables = true
        });

    // Add DbContext
    services.AddDbContext<WorkerDbContext>(options =>
        options.UseNpgsql(context.Configuration.GetConnectionString("Database")));

    // Add background workers
    services.AddHostedService<OrderProcessingWorker>();

    // Register consumers
    services.AddEventTransitConsumer<OrderCreatedConsumer>();
});

var host = builder.Build();
await host.RunAsync();

Background Worker Example

public class OrderProcessingWorker : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<OrderProcessingWorker> _logger;

    public OrderProcessingWorker(
        IServiceProvider serviceProvider,
        ILogger<OrderProcessingWorker> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Order Processing Worker started");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                using var scope = _serviceProvider.CreateScope();
                var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();
                var publisher = scope.ServiceProvider.GetRequiredService<IMessagePublisher>();

                // Check for pending orders
                var pendingOrders = await orderService.GetPendingOrdersAsync();

                foreach (var order in pendingOrders)
                {
                    // Publish order event
                    await publisher.PublishAsync(new OrderCreatedEvent
                    {
                        OrderId = order.Id,
                        OrderNumber = order.OrderNumber,
                        Amount = order.Amount
                    }, routingKey: "order.created");

                    _logger.LogInformation("Published order event: {OrderNumber}", order.OrderNumber);
                }

                // Wait before next check
                await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
            }
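            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown requested: leave the loop so the "stopped" log below runs
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in Order Processing Worker");

                // Back off before retrying; exit quietly if shutdown interrupts the wait
                try { await Task.Delay(TimeSpan.FromSeconds(60), stoppingToken); }
                catch (OperationCanceledException) { break; }
            }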
        }

        _logger.LogInformation("Order Processing Worker stopped");
    }
}

📊 Dashboard

Accessing the Dashboard

Once configured, the dashboard is available at:

  • URL: http://localhost:5000/eventtransit (or your configured port)
  • Features:
    • Real-time message monitoring
    • Outbox/Inbox statistics
    • Dead letter management
    • Message retry/requeue
    • Error analysis
    • Performance metrics

Dashboard Features

1. Overview Page
  • Total outbox messages
  • Total inbox messages
  • Dead letters count
  • Pending outbox count
  • Duplicate inbox count
  • Completed outbox count
2. Outbox Page
  • View all outbox messages
  • Filter by status (Pending, Dispatched, Dead Lettered)
  • Filter by date range
  • Search by message ID
  • View message payload
  • Retry failed messages
  • Bulk operations
3. Inbox Page
  • View all inbox messages
  • Filter by status (Processed, Failed)
  • Filter by date range
  • View message payload
  • View processing attempts
  • View error details
4. Dead Letters Page
  • View all dead lettered messages
  • View error details and stack traces
  • View retry history
  • Retry/republish messages
  • Bulk retry operations
  • Delete dead letters

Dashboard Screenshots

┌─────────────────────────────────────────────────────────┐
│  EventTransit Dashboard                                 │
├─────────────────────────────────────────────────────────┤
│                                                          │
│  📊 Statistics                                          │
│  ┌──────────────┬──────────────┬──────────────┐        │
│  │ Total Outbox │ Total Inbox  │ Dead Letters │        │
│  │    1,234     │     987      │      12      │        │
│  └──────────────┴──────────────┴──────────────┘        │
│                                                          │
│  📋 Recent Messages                                     │
│  ┌────────────────────────────────────────────┐        │
│  │ MessageId    │ Type         │ Status       │        │
│  ├────────────────────────────────────────────┤        │
│  │ abc-123      │ OrderCreated │ Dispatched   │        │
│  │ def-456      │ PaymentProc  │ Pending      │        │
│  │ ghi-789      │ EmailSent    │ Failed       │        │
│  └────────────────────────────────────────────┘        │
│                                                          │
│  [View Outbox] [View Inbox] [View Dead Letters]        │
└─────────────────────────────────────────────────────────┘

🎯 Best Practices

1. Message Design

✅ DO:

// Use IEventWithId for automatic ID generation
public class OrderCreatedEvent : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string OrderNumber { get; set; }
    public decimal Amount { get; set; }
    public DateTime CreatedAt { get; set; }
}

// Include all necessary data in the message
public class PaymentProcessedEvent : IEventWithId
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public string PaymentId { get; set; }
    public string OrderId { get; set; }
    public decimal Amount { get; set; }
    public string Currency { get; set; }
    public string PaymentMethod { get; set; }
    public DateTime ProcessedAt { get; set; }
}

❌ DON'T:

// Don't include sensitive data
public class BadPaymentEvent : IEventWithId
{
    public string CreditCardNumber { get; set; } // ❌ Never include PII
    public string CVV { get; set; }               // ❌ Never include sensitive data
}

// Don't make messages too large
public class BadOrderEvent : IEventWithId
{
    public byte[] LargeFile { get; set; }  // ❌ Don't include large binary data
    public List<Product> AllProducts { get; set; }  // ❌ Don't include unnecessary data
}

2. Consumer Design

✅ DO:

// Use inbox tracking for idempotency
[ConsumerBinding("payments", "payment-queue", RoutingKey = "payment.processed")]
public class PaymentConsumer : ConsumerBase<PaymentEvent>, IInboxTrackingPreference
{
    public bool UseInboxTracking => true; // ✅ Enable for critical operations

    protected override async Task HandleMessageAsync(PaymentEvent message)
    {
        // This will only execute once, even if message is delivered multiple times
        await ProcessPaymentAsync(message);
    }
}

// Keep consumers focused and single-purpose
[ConsumerBinding("orders", "order-email-queue", RoutingKey = "order.created")]
public class OrderEmailConsumer : ConsumerBase<OrderCreatedEvent>
{
    protected override async Task HandleMessageAsync(OrderCreatedEvent message)
    {
        // Only send email, nothing else
        await SendOrderConfirmationEmailAsync(message);
    }
}

❌ DON'T:

// Don't do too much in one consumer
public class BadOrderConsumer : ConsumerBase<OrderCreatedEvent>
{
    protected override async Task HandleMessageAsync(OrderCreatedEvent message)
    {
        await ProcessPayment(message);      // ❌ Too many responsibilities
        await UpdateInventory(message);     // ❌ Should be separate consumers
        await SendEmail(message);           // ❌ Should be separate consumers
        await UpdateAnalytics(message);     // ❌ Should be separate consumers
    }
}

3. Error Handling

✅ DO:

[ConsumerBinding("orders", "order-queue", RoutingKey = "order.process")]
public class OrderConsumer : ConsumerBase<OrderEvent>
{
    protected override async Task HandleMessageAsync(OrderEvent message)
    {
        try
        {
            await ProcessOrderAsync(message);
        }
        catch (ValidationException ex)
        {
            // Log validation errors
            Logger.LogWarning(ex, "Validation failed for order {OrderId}", message.OrderId);
            // Don't retry validation errors
            return;
        }
        catch (TransientException ex)
        {
            // Log transient errors
            Logger.LogError(ex, "Transient error processing order {OrderId}", message.OrderId);
            // Let EventTransit retry with exponential backoff
            throw;
        }
    }
}

4. Configuration

✅ DO:

// Use environment-specific configuration
builder.Services.AddEventTransitWithEntityFramework<MyDbContext>(
    config =>
    {
        var environment = builder.Environment.EnvironmentName;

        if (builder.Environment.IsProduction())
        {
            config.Outbox.ProcessingIntervalSeconds = 5;  // Faster in production
            config.Resilience.MaxRetryAttempts = 5;       // More retries in production
        }
        else
        {
            config.Outbox.ProcessingIntervalSeconds = 30; // Slower in development
            config.Resilience.MaxRetryAttempts = 2;       // Fewer retries in development
        }

        config.DeadLetter.ServiceName = $"MyService-{environment}";
    },
    new EventTransit.Schema.SchemaConfiguration
    {
        AutoCreateTables = builder.Environment.IsDevelopment() // Only auto-create in dev
    });

5. Testing

✅ DO:

// Unit test consumers
public class OrderConsumerTests
{
    [Fact]
    public async Task HandleMessageAsync_ValidOrder_ProcessesSuccessfully()
    {
        // Arrange
        var logger = new Mock<ILogger<OrderConsumer>>();
        var orderService = new Mock<IOrderService>();
        var consumer = new OrderConsumer(orderService.Object, logger.Object);

        var message = new OrderCreatedEvent
        {
            OrderNumber = "ORD-123",
            Amount = 100.00m
        };

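        // Act
        // HandleMessageAsync is declared protected in the consumer examples above,
        // so the test needs access to it, e.g. through a test-only subclass that exposes it.
        await consumer.HandleMessageAsync(message);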

        // Assert
        orderService.Verify(x => x.ProcessOrderAsync(message), Times.Once);
    }
}

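// Integration test with Testcontainers
// (assumes the Testcontainers.RabbitMq and Testcontainers.PostgreSql packages)
public class EventTransitIntegrationTests : IAsyncLifetime
{
    // Build the containers up front; they are started in InitializeAsync
    private readonly RabbitMqContainer _rabbitMqContainer = new RabbitMqBuilder().Build();
    private readonly PostgreSqlContainer _postgresContainer = new PostgreSqlBuilder().Build();

    public async Task InitializeAsync()
    {
        await _rabbitMqContainer.StartAsync();
        await _postgresContainer.StartAsync();
    }

    // IAsyncLifetime also requires DisposeAsync; stop and remove the containers here
    public async Task DisposeAsync()
    {
        await _rabbitMqContainer.DisposeAsync();
        await _postgresContainer.DisposeAsync();
    }

    [Fact]
    public async Task PublishAndConsume_MessageDelivered()
    {
        // Test end-to-end message flow
    }
}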

🐛 Troubleshooting

Issue 1: Messages Not Being Consumed

Symptoms: Messages published but not consumed

Solutions:

  1. Check RabbitMQ connection:

    docker logs rabbitmq
    
  2. Verify consumer is registered:

    builder.Services.AddEventTransitConsumer<MyConsumer>();
    
  3. Check exchange and queue bindings in RabbitMQ Management UI

  4. Verify routing keys match
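
When checking step 4, it can help to test a routing key against a binding pattern by hand. The sketch below follows the topic-exchange rules documented for RabbitMQ (`*` matches exactly one dot-separated word, `#` matches zero or more words). It is a standalone illustration, not part of EventTransit, and `TopicMatcher` is a hypothetical name. For a direct exchange the routing key must equal the binding key exactly.

```csharp
using System;

public static class TopicMatcher
{
    // Returns true when routingKey satisfies the topic binding pattern.
    // Matching is case-sensitive, as it is in RabbitMQ.
    public static bool Matches(string pattern, string routingKey)
    {
        var patternWords = pattern.Split('.');
        var keyWords = routingKey.Length == 0 ? Array.Empty<string>() : routingKey.Split('.');
        return Match(patternWords, 0, keyWords, 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words: try every possible split point.
            for (var next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        if (ki == k.Length) return false;

        // '*' matches any single word; otherwise the words must be identical.
        return (p[pi] == "*" || p[pi] == k[ki]) && Match(p, pi + 1, k, ki + 1);
    }
}
```

For example, `TopicMatcher.Matches("order.*", "order.process")` is true, while `TopicMatcher.Matches("order.*", "order.process.eu")` and `TopicMatcher.Matches("order.process", "order.Process")` are both false.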

Issue 2: Duplicate Messages

Symptoms: Same message processed multiple times

Solutions:

  1. Enable inbox tracking:

    public class MyConsumer : ConsumerBase<MyEvent>, IInboxTrackingPreference
    {
        public bool UseInboxTracking => true;
    }
    
  2. Ensure message has unique ID:

    public class MyEvent : IEventWithId
    {
        public string Id { get; set; } = Guid.NewGuid().ToString();
    }
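
Conceptually, inbox tracking relies on remembering which message IDs have already been handled and skipping repeats. The sketch below shows that idea with an in-memory set. It is only an illustration of the pattern: EventTransit's own inbox implementation is not shown in this document, and `InMemoryInbox` and `IdempotentHandler` are hypothetical names. A real inbox would persist IDs so that they survive restarts.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for a persistent inbox table.
public sealed class InMemoryInbox
{
    private readonly ConcurrentDictionary<string, byte> _seen = new();

    // True only for the first caller that presents a given message ID.
    public bool TryMarkProcessed(string messageId) => _seen.TryAdd(messageId, 0);

    // Forgets an ID so a failed message can be retried later.
    public void Forget(string messageId) => _seen.TryRemove(messageId, out _);
}

public static class IdempotentHandler
{
    // Runs handler at most once per message ID; returns false for duplicates.
    public static async Task<bool> HandleOnceAsync(
        InMemoryInbox inbox, string messageId, Func<Task> handler)
    {
        if (!inbox.TryMarkProcessed(messageId)) return false; // duplicate delivery

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Release the ID so that a retry is not mistaken for a duplicate.
            inbox.Forget(messageId);
            throw;
        }
    }
}
```

This only works if the ID is assigned once when the event is created and travels with the message. An ID regenerated on every delivery would defeat deduplication.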
    

Issue 3: Messages Going to Dead Letter

Symptoms: Messages immediately go to dead letter

Solutions:

  1. Check consumer error handling

  2. Review logs for exceptions

  3. Verify retry configuration:

    config.Resilience.MaxRetryAttempts = 3;
    config.Resilience.BaseRetryDelayMs = 1000;
    
  4. Check Dashboard for error details
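
To sanity-check retry settings, it helps to see the wait times they imply. The sketch below assumes a plain doubling schedule, where the delay before retry n is BaseRetryDelayMs × 2^(n − 1). This document does not state EventTransit's exact formula or whether jitter is applied, so treat the numbers as illustrative. `BackoffSchedule` is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;

public static class BackoffSchedule
{
    // Delay before each retry, assuming the delay doubles on every attempt.
    public static IReadOnlyList<TimeSpan> Delays(int maxRetryAttempts, int baseRetryDelayMs)
    {
        var delays = new List<TimeSpan>(Math.Max(0, maxRetryAttempts));
        for (var attempt = 1; attempt <= maxRetryAttempts; attempt++)
        {
            var delayMs = baseRetryDelayMs * Math.Pow(2, attempt - 1);
            delays.Add(TimeSpan.FromMilliseconds(delayMs));
        }
        return delays;
    }
}
```

Under that assumption, `BackoffSchedule.Delays(3, 1000)` yields waits of 1 s, 2 s, and 4 s, so a message that fails every attempt would spend about 7 seconds waiting before it is dead-lettered. Messages that arrive in dead letter much sooner than the configured schedule would suggest that retries are not being applied, for example because the exception is treated as non-retryable.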

Issue 4: Slow Message Processing

Symptoms: Messages accumulating in outbox

Solutions:

  1. Increase batch size:

    config.Outbox.BatchSize = 200;
    
  2. Decrease processing interval:

    config.Outbox.ProcessingIntervalSeconds = 5;
    
  3. Scale consumers horizontally

  4. Check database performance
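
A rough way to choose values for steps 1 and 2 is to estimate the outbox drain rate. The sketch below assumes the processor publishes at most one batch per interval, so the ceiling is BatchSize / ProcessingIntervalSeconds. That scheduling model is an assumption, not something this document confirms, and `OutboxThroughput` is a hypothetical helper.

```csharp
using System;

public static class OutboxThroughput
{
    // Upper bound on messages published per second, assuming one batch per interval.
    public static double MaxMessagesPerSecond(int batchSize, int intervalSeconds)
    {
        if (intervalSeconds <= 0) throw new ArgumentOutOfRangeException(nameof(intervalSeconds));
        return (double)batchSize / intervalSeconds;
    }

    // Time to clear a backlog while new messages keep arriving.
    // Returns null when inflow meets or exceeds the drain rate (the backlog never clears).
    public static TimeSpan? TimeToDrain(
        long backlog, double inflowPerSecond, int batchSize, int intervalSeconds)
    {
        var netRate = MaxMessagesPerSecond(batchSize, intervalSeconds) - inflowPerSecond;
        return netRate <= 0 ? (TimeSpan?)null : TimeSpan.FromSeconds(backlog / netRate);
    }
}
```

With BatchSize = 200 and ProcessingIntervalSeconds = 5, the ceiling under this model is 40 messages per second. A backlog of 12,000 messages with 10 new messages arriving per second would then take about 400 seconds to clear. If the arrival rate is at or above the ceiling, the outbox keeps growing regardless of how long you wait.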


📞 Support

For issues, questions, or contributions:

  • GitHub: [Your Repository URL]
  • Documentation: [Your Docs URL]
  • Email: [Your Support Email]

📄 License

MIT License - See LICENSE file for details


🎉 Conclusion

EventTransit provides a complete, production-ready solution for reliable message delivery in .NET applications. With built-in support for outbox/inbox patterns, dead letter handling, exponential retry, and a monitoring dashboard, you can build robust distributed systems with confidence.

Key Takeaways:

  • ✅ Use Direct Exchange for point-to-point messaging
  • ✅ Use Topic Exchange for pattern-based routing
  • ✅ Use Fanout Exchange for broadcasting
  • ✅ Enable Inbox Tracking for idempotency
  • ✅ Configure Exponential Retry for resilience
  • ✅ Monitor with the Built-in Dashboard
  • ✅ Follow Best Practices for production deployments

Happy messaging! 🚀

Compatible and additional computed target framework versions (.NET):

  • Compatible: net9.0
  • Computed: net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows

Version  Downloads  Last Updated
2.0.3    196        10/1/2025
2.0.2    170        10/1/2025
2.0.1    174        9/30/2025
2.0.0    182        9/30/2025

v2.0.0: Complete rewrite with multi-broker support, clean architecture, and high performance.