Devplus.Messaging 3.0.1

dotnet add package Devplus.Messaging --version 3.0.1
                    
NuGet\Install-Package Devplus.Messaging -Version 3.0.1
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Devplus.Messaging" Version="3.0.1" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Devplus.Messaging" Version="3.0.1" />
                    
Directory.Packages.props
<PackageReference Include="Devplus.Messaging" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Devplus.Messaging --version 3.0.1
                    
#r "nuget: Devplus.Messaging, 3.0.1"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Devplus.Messaging@3.0.1
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Devplus.Messaging&version=3.0.1
                    
Install as a Cake Addin
#tool nuget:?package=Devplus.Messaging&version=3.0.1
                    
Install as a Cake Tool

Devplus.Messaging 📬

NuGet Downloads

Devplus.Messaging é uma biblioteca .NET para integração avançada com RabbitMQ, oferecendo funcionalidades completas de publicação e consumo de mensagens baseadas no padrão CloudEvents, com suporte a Dead Letter Queue (DLQ), retry automático, múltiplos tipos de fila e configurações flexíveis.

Características

  • 📬 Publicação e consumo de mensagens baseado no padrão CloudEvents
  • 🔄 Sistema de retry automático com configuração personalizável
  • 💀 Dead Letter Queue (DLQ) automática para mensagens falhas
  • 🔁 Sistema de Redrive completo para reprocessamento de mensagens DLQ
  • 🏗️ Suporte a filas Quorum e Classic do RabbitMQ
  • ⚙️ Configuração flexível via appsettings.json
  • 🚀 Hosted Service integrado para gerenciamento automático de consumidores
  • 📊 QoS (Quality of Service) configurável por consumidor
  • 🔗 Binding automático de exchanges e filas
  • 🛡️ Tratamento robusto de erros e reconexão automática
  • 📝 Logging detalhado para monitoramento e debug

🚀 Instalação

Para instalar a biblioteca via NuGet, utilize o seguinte comando:

dotnet add package Devplus.Messaging

Ou, no Visual Studio:

  1. Abra o Gerenciador de Pacotes NuGet.
  2. Busque por Devplus.Messaging.
  3. Clique em Instalar.

Como Usar

📦 Publicando Mensagens

O IMessagingPublisher permite publicar mensagens seguindo o padrão CloudEvents:

using Devplus.Messaging.Interfaces;

public class ProdutoService
{
    private readonly IMessagingPublisher _messagingPublisher;

    public ProdutoService(IMessagingPublisher messagingPublisher)
    {
        _messagingPublisher = messagingPublisher;
    }

    public async Task CriarProduto(Produto produto)
    {
        // Salvar produto no banco...

        // Publicar evento
        await _messagingPublisher.PublishAsync(
            exchangeName: "produtos-exchange",
            message: new {
                ProdutoId = produto.Id,
                Nome = produto.Nome,
                Preco = produto.Preco
            },
            source: "produtos.api",
            typeEvent: "produto.criado",
            messageId: Guid.NewGuid().ToString(), // Opcional
            routingKey: "produtos.criados" // Opcional
        );
    }
}
Parâmetros do PublishAsync:
Parâmetro Tipo Obrigatório Descrição
exchangeName string ✅ Sim Nome do exchange RabbitMQ
message T ✅ Sim Objeto da mensagem a ser publicada
source string ✅ Sim Origem da mensagem (ex: "api.produtos")
typeEvent string ✅ Sim Tipo do evento (ex: "produto.criado")
messageId string ❌ Não ID único da mensagem (gerado automaticamente se não informado)
routingKey string ❌ Não Chave de roteamento (padrão: vazio)

📩 Consumindo Mensagens

Para consumir mensagens, implemente a interface IMessagingConsumer:

using Devplus.Messaging.Interfaces;
using Devplus.Messaging.Models;
using Devplus.Messaging.Enum;

public class ProdutoCriadoConsumer : IMessagingConsumer
{
    // ✅ Obrigatório
    public string ExchangeName => "produtos-exchange";

    // ⚙️ Configurações opcionais (valores padrão mostrados)
    public string QueueName => "produtos-criados-queue";
    public string RoutingKey => "produtos.criados";
    public int MaxRetry => 5;
    public ushort PrefetchCount => 3;
    public QueueType QueueType => QueueType.Quorum;

    private readonly ILogger<ProdutoCriadoConsumer> _logger;
    private readonly IEmailService _emailService;

    public ProdutoCriadoConsumer(
        ILogger<ProdutoCriadoConsumer> logger,
        IEmailService emailService)
    {
        _logger = logger;
        _emailService = emailService;
    }

    public async Task ConsumeAsync(CloudEvent<object> cloudEvent, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Processando produto criado: {Data}", cloudEvent.Data);

        try
        {
            // Processar a mensagem
            var produto = JsonSerializer.Deserialize<ProdutoDto>(cloudEvent.Data.ToString());

            // Enviar email de notificação
            await _emailService.EnviarNotificacao($"Novo produto: {produto.Nome}");

            _logger.LogInformation("Produto {ProdutoId} processado com sucesso", produto.Id);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Erro ao processar produto criado");
            throw; // Será reenviado automaticamente até MaxRetry
        }
    }
}

⚙️ Propriedades do IMessagingConsumer

Propriedade Obrigatório Padrão Descrição
ExchangeName Sim N/A Nome do exchange RabbitMQ para binding
QueueName ❌ Não "{ExchangeName-sem-exchange}-queue" Nome da fila personalizada
RoutingKey ❌ Não "" Chave de roteamento para binding específico
MaxRetry ❌ Não 5 Número máximo de tentativas antes de enviar para DLQ
PrefetchCount ❌ Não 3 Máximo de mensagens simultâneas sem ACK (QoS)
QueueType ❌ Não QueueType.Quorum Tipo de fila: Quorum ou Classic
📋 Exemplos de Nomes Automáticos:
// ExchangeName: "produtos-exchange" → QueueName: "produtos-queue"
// ExchangeName: "pedidos.exchange" → QueueName: "pedidos-queue"
// ExchangeName: "eventos-sistema" → QueueName: "eventos-sistema-queue"

🔄 Sistema de Retry e Recuperação

public class ProcessadorPedidos : IMessagingConsumer
{
    public string ExchangeName => "pedidos-exchange";
    public int MaxRetry => 3; // Tentar até 3 vezes antes de enviar para DLQ

    public async Task ConsumeAsync(CloudEvent<object> cloudEvent, CancellationToken cancellationToken)
    {
        try
        {
            // Processar pedido...
            await ProcessarPedido(cloudEvent.Data);
        }
        catch (HttpRequestException ex)
        {
            // Erro temporário - será reprocessado automaticamente
            _logger.LogWarning("Falha temporária: {Error}", ex.Message);
            throw;
        }
        catch (ArgumentException ex)
        {
            // Erro permanente - registrar e não relançar (vai para ACK)
            _logger.LogError("Dados inválidos: {Error}", ex.Message);
            // Não faz throw - mensagem será confirmada (ACK)
        }
    }
}

DLQ (Dead Letter Queue)

A Dead Letter Queue (DLQ) é uma funcionalidade automática para lidar com mensagens que falharam após múltiplas tentativas.

🏗️ Configuração Automática

A biblioteca cria automaticamente:

// Exchange principal: "produtos-exchange"
// Exchange DLX:      "produtos-exchange-dlx"
// Fila principal:    "produtos-queue"
// Fila DLQ:          "produtos-queue-dlq"

📊 Headers Automáticos na DLQ

Quando uma mensagem é enviada para DLQ, os seguintes headers são adicionados:

{
    "x-retry-count": "5",                           // Total de tentativas
    "x-last-process": "2025-09-28T10:30:00.000Z",   // Última tentativa
    "x-send-dlq": "2025-09-28T10:30:05.000Z",       // Timestamp do envio para DLQ
    "x-failure-reason": "HttpRequestException",      // Motivo da falha
    "x-original-exchange": "produtos-exchange",      // Exchange original
    "x-original-routing-key": "produtos.criados"    // Routing key original
}

🔍 Monitoramento de DLQ

public class DlqMonitorService
{
    public async Task ProcessarDlq()
    {
        // Processar mensagens na DLQ manualmente
        // ou configurar alertas para DLQ com muitas mensagens
    }
}

⚠️ Cenários de Envio para DLQ

  • Exception não tratada após MaxRetry tentativas
  • Timeout de processamento recorrente
  • Falhas de conexão com APIs externas
  • Erro de dados inválidos (deve ser tratado sem throw)

Sistema de Redrive

O Sistema de Redrive permite reprocessar mensagens que estão na Dead Letter Queue (DLQ), oferecendo controle total sobre recuperação de mensagens falhas.

🎯 Funcionalidades do Redrive

  • 📋 Listar mensagens na DLQ com metadados completos
  • 🔄 Reenviar mensagem específica de volta para processamento
  • 🚀 Reenviar todas as mensagens da DLQ em lote
  • 🗑️ Remover mensagem específica permanentemente da DLQ
  • 🧹 Limpar toda a DLQ removendo todas as mensagens
  • Verificar contagem de mensagens na DLQ de forma ultra-performática

🔌 Interface IMessagingRedrive

public interface IMessagingRedrive
{
    /// <summary>
    /// Lista mensagens na Dead Letter Queue
    /// </summary>
    Task<IEnumerable<DeadLetterMessage>> GetDeadLetterMessagesAsync(string queueName, int maxMessages = 100);

    /// <summary>
    /// Reenvia uma mensagem específica da DLQ para a fila principal
    /// </summary>
    Task<bool> RedriveMessageAsync(string queueName, string messageId);

    /// <summary>
    /// Reenvia todas as mensagens da DLQ para a fila principal
    /// </summary>
    Task<int> RedriveAllMessagesAsync(string queueName, int maxMessages = 0);

    /// <summary>
    /// Remove uma mensagem específica da DLQ permanentemente
    /// </summary>
    Task<bool> PurgeDeadLetterMessageAsync(string queueName, string messageId);

    /// <summary>
    /// Limpa completamente uma DLQ (remove todas as mensagens)
    /// </summary>
    Task<int> PurgeDeadLetterQueueAsync(string queueName);
}

📊 Modelo DeadLetterMessage

public class DeadLetterMessage
{
    public string MessageId { get; set; }              // ID único da mensagem
    public string Body { get; set; }                   // Conteúdo da mensagem
    public Dictionary<string, object> Headers { get; set; } // Headers completos
    public int RetryCount { get; set; }                // Número de tentativas realizadas
    public DateTime LastProcessAttempt { get; set; }   // Última tentativa de processamento
    public DateTime SentToDlqAt { get; set; }          // Quando foi enviada para DLQ
    public string FailureReason { get; set; }          // Motivo da falha
    public string OriginalExchange { get; set; }       // Exchange original
    public string OriginalRoutingKey { get; set; }     // Routing key original
    public string DlqName { get; set; }                // Nome da DLQ
}

🔧 Configuração do Redrive

// Program.cs
var builder = WebApplication.CreateBuilder(args);

// ✅ Registrar a biblioteca Messaging (inclui IMessagingRedrive automaticamente)
builder.Services.AddMessaging(builder.Configuration);

var app = builder.Build();
app.Run();

📋 Exemplos Práticos de Uso

🔍 1. Listar Mensagens na DLQ
public class DlqManagementService
{
    private readonly IMessagingRedrive _redrive;
    private readonly ILogger<DlqManagementService> _logger;

    public DlqManagementService(IMessagingRedrive redrive, ILogger<DlqManagementService> logger)
    {
        _redrive = redrive;
        _logger = logger;
    }

    public async Task<List<DeadLetterMessage>> ListarMensagensDlq(string queueName)
    {
        try
        {
            // Listar até 50 mensagens na DLQ
            var messages = await _redrive.GetDeadLetterMessagesAsync(queueName, maxMessages: 50);

            _logger.LogInformation("Encontradas {Count} mensagens na DLQ da fila {QueueName}",
                messages.Count(), queueName);

            foreach (var message in messages)
            {
                _logger.LogInformation("Mensagem {MessageId}: {RetryCount} tentativas, falha: {FailureReason}",
                    message.MessageId, message.RetryCount, message.FailureReason);
            }

            return messages.ToList();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Erro ao listar mensagens da DLQ {QueueName}", queueName);
            throw;
        }
    }
}
🔄 2. Reenviar Mensagem Específica
public class RedriveService
{
    private readonly IMessagingRedrive _redrive;
    private readonly ILogger<RedriveService> _logger;

    public async Task<bool> ReenviarMensagem(string queueName, string messageId)
    {
        try
        {
            var success = await _redrive.RedriveMessageAsync(queueName, messageId);

            if (success)
            {
                _logger.LogInformation("Mensagem {MessageId} reenviada com sucesso para {QueueName}",
                    messageId, queueName);
                return true;
            }
            else
            {
                _logger.LogWarning("Mensagem {MessageId} não encontrada na DLQ da fila {QueueName}",
                    messageId, queueName);
                return false;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Erro ao reenviar mensagem {MessageId} da DLQ {QueueName}",
                messageId, queueName);
            return false;
        }
    }
}
🚀 3. Reenviar Todas as Mensagens
public async Task<int> ReenviarTodasMensagens(string queueName, int maxMessages = 100)
{
    try
    {
        // Reenviar até 100 mensagens da DLQ
        var count = await _redrive.RedriveAllMessagesAsync(queueName, maxMessages);

        _logger.LogInformation("{Count} mensagens reenviadas da DLQ para a fila {QueueName}",
            count, queueName);

        return count;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Erro ao reenviar todas as mensagens da DLQ {QueueName}", queueName);
        return 0;
    }
}
🗑️ 4. Remover Mensagens da DLQ
public async Task<bool> RemoverMensagemPermanentemente(string queueName, string messageId)
{
    try
    {
        var success = await _redrive.PurgeDeadLetterMessageAsync(queueName, messageId);

        if (success)
        {
            _logger.LogInformation("Mensagem {MessageId} removida permanentemente da DLQ {QueueName}",
                messageId, queueName);
        }

        return success;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Erro ao remover mensagem {MessageId} da DLQ {QueueName}",
            messageId, queueName);
        return false;
    }
}

public async Task<int> LimparTodaDlq(string queueName)
{
    try
    {
        var count = await _redrive.PurgeDeadLetterQueueAsync(queueName);

        _logger.LogWarning("DLQ da fila {QueueName} limpa completamente. {Count} mensagens removidas",
            queueName, count);

        return count;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Erro ao limpar DLQ da fila {QueueName}", queueName);
        return 0;
    }
}
5. Verificação Rápida de DLQ (Ultra-Performática)
public class DlqMonitorService
{
    private readonly IMessagingRedrive _redrive;
    private readonly ILogger<DlqMonitorService> _logger;

    /// <summary>
    /// Verifica rapidamente se há mensagens na DLQ sem impacto na performance
    /// </summary>
    public async Task<bool> HasPendingMessages(string queueName)
    {
        // ⚡ Operação ultra-rápida - apenas consulta metadados (1-2ms)
        var count = await _redrive.GetDeadLetterQueueMessageCountAsync(queueName);

        if (count > 0)
        {
            _logger.LogWarning("DLQ {QueueName} contém {Count} mensagens pendentes",
                queueName, count);
        }

        return count > 0;
    }

    /// <summary>
    /// Monitoramento em tempo real de múltiplas DLQs
    /// </summary>
    public async Task<Dictionary<string, uint>> GetAllDlqCounts()
    {
        var queues = new[] { "pedidos-queue", "pagamentos-queue", "estoque-queue" };
        var results = new Dictionary<string, uint>();

        // Verifica todas as filas em paralelo para máxima performance
        var tasks = queues.Select(async queue =>
        {
            var count = await _redrive.GetDeadLetterQueueMessageCountAsync(queue);
            return new KeyValuePair<string, uint>(queue, count);
        });

        var counts = await Task.WhenAll(tasks);

        foreach (var (queue, count) in counts)
        {
            results[queue] = count;

            if (count > 50) // Threshold de alerta
            {
                _logger.LogError("🚨 DLQ crítica: {Queue} com {Count} mensagens", queue, count);
            }
        }

        return results;
    }
}

🎛️ Estratégias de Redrive

📊 1. Redrive Baseado em Análise
public class SmartRedriveService
{
    public async Task ExecutarRedriveInteligente(string queueName)
    {
        // 1. Analisar mensagens na DLQ
        var messages = await _redrive.GetDeadLetterMessagesAsync(queueName, 1000);

        // 2. Categorizar por tipo de erro
        var retriableMessages = messages.Where(m => IsRetriableError(m.FailureReason));
        var permanentErrors = messages.Where(m => !IsRetriableError(m.FailureReason));

        // 3. Reenviar apenas erros temporários
        foreach (var message in retriableMessages)
        {
            await _redrive.RedriveMessageAsync(queueName, message.MessageId);
            await Task.Delay(100); // Throttling
        }

        // 4. Registrar erros permanentes para análise
        foreach (var message in permanentErrors)
        {
            _logger.LogWarning("Erro permanente na mensagem {MessageId}: {Reason}",
                message.MessageId, message.FailureReason);
        }
    }

    private bool IsRetriableError(string failureReason)
    {
        return failureReason.Contains("HttpRequestException") ||
               failureReason.Contains("TimeoutException") ||
               failureReason.Contains("SocketException");
    }
}
2. Redrive Agendado
public class ScheduledRedriveService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Verificar DLQs críticas a cada 5 minutos
                await ProcessarDlqsCriticas();

                // Aguardar próxima execução
                await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Erro no redrive agendado");
                await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
            }
        }
    }

    private async Task ProcessarDlqsCriticas()
    {
        var filasMonitoradas = new[] { "pedidos-queue", "pagamentos-queue", "estoque-queue" };

        foreach (var fila in filasMonitoradas)
        {
            var messages = await _redrive.GetDeadLetterMessagesAsync(fila, 10);

            if (messages.Any())
            {
                _logger.LogWarning("DLQ {QueueName} contém {Count} mensagens",
                    fila, messages.Count());

                // Reenviar mensagens antigas (mais de 1 hora na DLQ)
                var oldMessages = messages.Where(m =>
                    DateTime.UtcNow - m.SentToDlqAt > TimeSpan.FromHours(1));

                foreach (var message in oldMessages)
                {
                    await _redrive.RedriveMessageAsync(fila, message.MessageId);
                }
            }
        }
    }
}

🔍 Monitoramento de DLQ

public class DlqMonitoringService
{
    public async Task<DlqHealthReport> GenerateHealthReport()
    {
        var report = new DlqHealthReport();
        var queues = new[] { "pedidos-queue", "produtos-queue", "usuarios-queue" };

        foreach (var queue in queues)
        {
            try
            {
                var messages = await _redrive.GetDeadLetterMessagesAsync(queue, 1000);

                report.QueueReports.Add(new QueueHealthReport
                {
                    QueueName = queue,
                    MessageCount = messages.Count(),
                    OldestMessage = messages.OrderBy(m => m.SentToDlqAt).FirstOrDefault()?.SentToDlqAt,
                    MostCommonError = messages.GroupBy(m => m.FailureReason)
                                           .OrderByDescending(g => g.Count())
                                           .FirstOrDefault()?.Key,
                    Status = messages.Count() > 100 ? "Critical" :
                            messages.Count() > 10 ? "Warning" : "Healthy"
                });
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Erro ao gerar relatório da DLQ {QueueName}", queue);
            }
        }

        return report;
    }
}

�📋 Modelos e Interfaces

🌐 CloudEvent<T> (Padrão CloudEvents)

public class CloudEvent<T>
{
    [JsonPropertyName("specversion")]
    public string SpecVersion { get; set; } = "1.0";

    [JsonPropertyName("type")]
    public string Type { get; set; }              // Tipo do evento

    [JsonPropertyName("source")]
    public string Source { get; set; }            // Origem da mensagem

    [JsonPropertyName("id")]
    public string Id { get; set; }                // ID único da mensagem

    [JsonPropertyName("time")]
    public DateTimeOffset Time { get; set; }      // Timestamp do evento

    [JsonPropertyName("datacontenttype")]
    public string DataContentType { get; set; } = "application/json";

    [JsonPropertyName("data")]
    public T Data { get; set; }                   // Payload da mensagem
}

🔌 IMessagingPublisher

public interface IMessagingPublisher
{
    /// <summary>
    /// Publica uma mensagem no exchange especificado
    /// </summary>
    Task PublishAsync<T>(
        string exchangeName,    // Exchange de destino
        T message,             // Dados da mensagem
        string typeEvent,      // Tipo do evento (ex: "pedido.criado")
        string source,         // Origem (ex: "api.pedidos")
        string messageId = "", // ID único (auto-gerado se vazio)
        string routingKey = "" // Chave de roteamento (opcional)
    );
}

📥 IMessagingConsumer

public interface IMessagingConsumer
{
    // ✅ Obrigatórias
    string ExchangeName { get; }
    Task ConsumeAsync(CloudEvent<object> cloudEvent, CancellationToken cancellationToken);

    // ⚙️ Opcionais (com valores padrão)
    string QueueName => /* gerado automaticamente */;
    string RoutingKey => "";
    int MaxRetry => 5;
    ushort PrefetchCount => 3;
    QueueType QueueType => QueueType.Quorum;
}

🔁 IMessagingRedrive

public interface IMessagingRedrive
{
    /// <summary>
    /// Lista mensagens na Dead Letter Queue
    /// </summary>
    Task<IEnumerable<DeadLetterMessage>> GetDeadLetterMessagesAsync(string queueName, int maxMessages = 100);

    /// <summary>
    /// Reenvia uma mensagem específica da DLQ para a fila principal
    /// </summary>
    Task<bool> RedriveMessageAsync(string queueName, string messageId);

    /// <summary>
    /// Reenvia todas as mensagens da DLQ para a fila principal
    /// </summary>
    Task<int> RedriveAllMessagesAsync(string queueName, int maxMessages = 0);

    /// <summary>
    /// Remove uma mensagem específica da DLQ permanentemente
    /// </summary>
    Task<bool> PurgeDeadLetterMessageAsync(string queueName, string messageId);

    /// <summary>
    /// Limpa completamente uma DLQ (remove todas as mensagens)
    /// </summary>
    Task<int> PurgeDeadLetterQueueAsync(string queueName);
}

🏗️ Tipos de Fila (QueueType)

public enum QueueType
{
    Quorum,  // ✅ Recomendado - Alta disponibilidade e durabilidade
    Classic  // ⚠️  Tradicional - Para compatibilidade com versões antigas
}

🔧 Configuração

⚙️ appsettings.json

{
  "RabbitMq": {
    "Host": "localhost", // Endereço do servidor RabbitMQ
    "Port": 5672, // Porta de conexão (padrão: 5672)
    "Username": "admin", // Usuário de autenticação
    "Password": "senha123", // Senha de autenticação
    "VHost": "/", // Virtual Host (padrão: "/")
    "GlobalPrefetchCount": 10, // QoS global (padrão: 3)
    "UseGlobalPrefetch": true // Usar QoS global (padrão: true)
  }
}

🚀 Configuração no Program.cs

using Devplus.Messaging;

var builder = WebApplication.CreateBuilder(args);

// ✅ Registrar a biblioteca Messaging
builder.Services.AddMessaging(builder.Configuration);

// ✅ Registrar seus consumidores
builder.Services.AddScoped<IMessagingConsumer, ProdutoCriadoConsumer>();
builder.Services.AddScoped<IMessagingConsumer, PedidoCanceladoConsumer>();
builder.Services.AddScoped<IMessagingConsumer, EmailEnviadoConsumer>();

// ✅ Registrar outros serviços
builder.Services.AddScoped<IProdutoService, ProdutoService>();
builder.Services.AddScoped<IEmailService, EmailService>();

var app = builder.Build();

// O RabbitMqHostedService será iniciado automaticamente
app.Run();

📊 Configurações Avançadas de RabbitMQ

{
  "RabbitMq": {
    "Host": "rabbitmq-cluster.empresa.com",
    "Port": 5672,
    "Username": "app-user",
    "Password": "senha-super-segura",
    "VHost": "/producao",
    "GlobalPrefetchCount": 50, // Para alto throughput
    "UseGlobalPrefetch": false // QoS por consumidor individual
  }
}

🧪 Exemplo Completo de Implementação

📦 Serviço de Publicação

public class PedidoService
{
    private readonly IMessagingPublisher _publisher;
    private readonly IPedidoRepository _repository;

    public PedidoService(IMessagingPublisher publisher, IPedidoRepository repository)
    {
        _publisher = publisher;
        _repository = repository;
    }

    public async Task CriarPedido(CriarPedidoDto dto)
    {
        // 1. Salvar no banco
        var pedido = new Pedido(dto.ClienteId, dto.Produtos);
        await _repository.SalvarAsync(pedido);

        // 2. Publicar eventos
        await _publisher.PublishAsync(
            exchangeName: "pedidos-exchange",
            message: new PedidoCriadoEvent
            {
                PedidoId = pedido.Id,
                ClienteId = pedido.ClienteId,
                ValorTotal = pedido.ValorTotal,
                DataCriacao = pedido.DataCriacao
            },
            source: "api.pedidos",
            typeEvent: "pedido.criado",
            routingKey: "pedidos.criados"
        );

        // 3. Evento para estoque
        await _publisher.PublishAsync(
            exchangeName: "estoque-exchange",
            message: new { PedidoId = pedido.Id, Produtos = pedido.Produtos },
            source: "api.pedidos",
            typeEvent: "estoque.reserva-solicitada",
            routingKey: "estoque.reservas"
        );
    }
}

📥 Múltiplos Consumidores

// ✅ Consumidor para notificações por email
public class NotificacaoEmailConsumer : IMessagingConsumer
{
    public string ExchangeName => "pedidos-exchange";
    public string QueueName => "notificacoes-email-queue";
    public string RoutingKey => "pedidos.criados";

    private readonly IEmailService _emailService;

    public async Task ConsumeAsync(CloudEvent<object> cloudEvent, CancellationToken cancellationToken)
    {
        var pedido = JsonSerializer.Deserialize<PedidoCriadoEvent>(cloudEvent.Data.ToString());
        await _emailService.EnviarConfirmacao(pedido.ClienteId, pedido.PedidoId);
    }
}

// ✅ Consumidor para integração com ERP
public class IntegracaoErpConsumer : IMessagingConsumer
{
    public string ExchangeName => "pedidos-exchange";
    public string QueueName => "integracao-erp-queue";
    public string RoutingKey => "pedidos.criados";
    public int MaxRetry => 10; // ERP pode estar instável

    private readonly IErpService _erpService;

    public async Task ConsumeAsync(CloudEvent<object> cloudEvent, CancellationToken cancellationToken)
    {
        var pedido = JsonSerializer.Deserialize<PedidoCriadoEvent>(cloudEvent.Data.ToString());
        await _erpService.SincronizarPedido(pedido);
    }
}

// ✅ Consumidor para reserva de estoque
public class ReservaEstoqueConsumer : IMessagingConsumer
{
    public string ExchangeName => "estoque-exchange";
    public string QueueName => "reservas-estoque-queue";
    public string RoutingKey => "estoque.reservas";
    public QueueType QueueType => QueueType.Quorum; // Alta disponibilidade

    private readonly IEstoqueService _estoqueService;

    public async Task ConsumeAsync(CloudEvent<object> cloudEvent, CancellationToken cancellationToken)
    {
        var reserva = JsonSerializer.Deserialize<ReservaEstoqueEvent>(cloudEvent.Data.ToString());
        await _estoqueService.ReservarProdutos(reserva.PedidoId, reserva.Produtos);
    }
}

Funcionalidades Técnicas

🏗️ Arquitetura da Biblioteca

Devplus.Messaging/
├── src/
│   ├── Configuration/
│   │   └── RabbitMqConfig.cs           # Configurações RabbitMQ
│   ├── Enum/
│   │   └── QueueType.cs                # Tipos de fila (Quorum/Classic)
│   ├── Interfaces/
│   │   ├── IMessagingConsumer.cs       # Interface para consumidores
│   │   ├── IMessagingPublisher.cs      # Interface para publicação
│   │   └── IMessagingRedrive.cs        # Interface para operações de redrive
│   ├── Models/
│   │   ├── CloudEvent.cs               # Modelo CloudEvents
│   │   └── DeadLetterMessage.cs        # Modelo para mensagens DLQ
│   ├── Services/
│   │   ├── RabbitMqHostedService.cs    # Background service para consumidores
│   │   ├── RabbitMqPublisher.cs        # Implementação do publisher
│   │   └── RabbitMqRedriveService.cs   # Implementação do redrive
│   └── MessagingServiceCollectionExtensions.cs # DI configuration

🔗 Recursos Automáticos

Recurso Descrição
Exchange Declaration Criação automática de exchanges do tipo topic
Queue Declaration Criação automática de filas (principais e DLQ)
Binding Automation Vinculação automática entre exchanges e filas
DLX Setup Configuração automática de Dead Letter Exchange
Redrive System Operações completas de redrive para mensagens DLQ
Reconnection Reconexão automática em caso de falhas
QoS Management Gerenciamento de Quality of Service por consumidor

📊 Métricas e Monitoramento

// Logs automáticos gerados pela biblioteca
[INFO] Message publish - Exchange: produtos-exchange, RoutingKey: produtos.criados, MessageId: abc123
[INFO] Message received Queue: produtos-queue Consumer: ProdutoCriadoConsumer
[WARN] Erro ao processar mensagem da fila produtos-queue - Retry 2/5
[ERROR] Mensagem enviada para DLQ após 5 tentativas - Queue: produtos-queue-dlq
[WARN] Canal do consumidor ProdutoCriadoConsumer foi encerrado. Reason: Connection lost
[INFO] Consumidor ProdutoCriadoConsumer reconectado com sucesso

�️ Tratamento de Erros e Resiliência

public class ProcessadorPagamentos : IMessagingConsumer
{
    public string ExchangeName => "pagamentos-exchange";
    public int MaxRetry => 3;

    public async Task ConsumeAsync(CloudEvent<object> cloudEvent, CancellationToken cancellationToken)
    {
        try
        {
            await ProcessarPagamento(cloudEvent.Data);
        }
        catch (PaymentGatewayException ex) when (ex.IsRetryable)
        {
            // ✅ Erro temporário - será reprocessado
            _logger.LogWarning("Falha temporária no gateway: {Error}", ex.Message);
            throw; // Reprocessar
        }
        catch (InvalidCardException ex)
        {
            // ❌ Erro permanente - não reprocessar
            _logger.LogError("Cartão inválido: {Error}", ex.Message);
            await _notificationService.NotificarCartaoInvalido(ex.CardId);
            // Não faz throw - vai para ACK
        }
        catch (Exception ex)
        {
            // ⚠️ Erro desconhecido - reprocessar
            _logger.LogError(ex, "Erro inesperado no processamento");
            throw;
        }
    }
}

🔧 Configurações de Performance

{
  "RabbitMq": {
    "Host": "localhost",
    "Port": 5672,
    "Username": "user",
    "Password": "pass",
    "VHost": "/",
    "GlobalPrefetchCount": 100, // Alto throughput
    "UseGlobalPrefetch": false // QoS individual por consumidor
  }
}
// Consumidor otimizado para alta performance
public class HighVolumeConsumer : IMessagingConsumer
{
    public string ExchangeName => "eventos-volume-exchange";
    public ushort PrefetchCount => 50;           // Processar até 50 mensagens simultâneas
    public QueueType QueueType => QueueType.Quorum; // Alta disponibilidade
    public int MaxRetry => 2;                    // Retry rápido

    public async Task ConsumeAsync(CloudEvent<object> cloudEvent, CancellationToken cancellationToken)
    {
        // Processamento otimizado e rápido
        await ProcessarRapidamente(cloudEvent.Data);
    }
}

🛠️ Compatibilidade e Versioning

.NET Version Suportado RabbitMQ Client
.NET 8.0 Recomendado 6.8.x
.NET 7.0 ✅ Sim 6.8.x
.NET 6.0 ✅ Sim 6.8.x
.NET Core 3.1 ⚠️ Limitado 6.x
.NET Framework ❌ Não -

📈 Versionamento Semântico

  • Major (X.y.z): Mudanças incompatíveis na API
  • Minor (x.Y.z): Novas funcionalidades compatíveis
  • Patch (x.y.Z): Correções de bugs

🚀 Docker e Desenvolvimento

🐳 RabbitMQ com Docker Compose

A biblioteca inclui um docker-compose.yaml para desenvolvimento local:

# docker/docker-compose.yaml
version: "3.8"
services:
  rabbitmq:
    image: rabbitmq:3.13-management
    container_name: devplus-rabbitmq
    ports:
      - "5672:5672" # AMQP port
      - "15672:15672" # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
      RABBITMQ_DEFAULT_VHOST: /
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

volumes:
  rabbitmq_data:

Comandos úteis:

# Iniciar RabbitMQ
docker-compose -f docker/docker-compose.yaml up -d

# Acessar Management UI
open http://localhost:15672
# Usuário: admin, Senha: admin123

# Parar RabbitMQ
docker-compose -f docker/docker-compose.yaml down

🏗️ Melhores Práticas

Recomendações

  1. 📝 Nomeação Consistente: Use padrões como {dominio}-{acao}-exchange
  2. 🔄 Retry Inteligente: Configure MaxRetry baseado no tipo de operação
  3. 📊 QoS Adequado: Ajuste PrefetchCount conforme capacidade de processamento
  4. 🏗️ Filas Quorum: Use QueueType.Quorum para alta disponibilidade
  5. 🔍 Logs Detalhados: Implemente logging estruturado nos consumidores
  6. ⚠️ Tratamento de Erros: Diferencie erros temporários de permanentes
  7. 🎯 Routing Keys: Use routing keys para roteamento específico
  8. 🔁 Monitoramento DLQ: Configure alertas para DLQs com muitas mensagens
  9. 📊 Redrive Inteligente: Use análise de erros antes de fazer redrive em massa
  10. ⏰ Redrive Agendado: Implemente redrive automático para erros temporários

Evite

  • ❌ Processar mensagens grandes (>1MB) - use referências
  • ❌ Operações síncronas longas nos consumidores
  • ❌ Fazer throw para erros de dados inválidos
  • ❌ Usar PrefetchCount muito alto sem CPU/memória adequada
  • ❌ Ignorar mensagens na DLQ sem monitoramento
  • ❌ Fazer redrive em massa sem analisar os erros primeiro
  • ❌ Remover mensagens da DLQ sem investigação adequada
  • ❌ Redrive frequente de mensagens com erros permanentes

🔧 Exemplo de Configuração de Produção

{
  "RabbitMq": {
    "Host": "rabbitmq-cluster.prod.empresa.com",
    "Port": 5672,
    "Username": "${RABBITMQ_USER}",
    "Password": "${RABBITMQ_PASSWORD}",
    "VHost": "/producao",
    "GlobalPrefetchCount": 20,
    "UseGlobalPrefetch": false
  }
}

📄 Licença

Este projeto está licenciado sob a MIT License.


📞 Suporte

Para dúvidas, sugestões ou problemas:

📧 Email: clayton@devplus.com.br
🔗 LinkedIn: Clayton Oliveira
🚀 Website: www.devplus.com.br

🤝 Contribuições

Contribuições são bem-vindas! Para contribuir:

  1. 🍴 Fork o repositório
  2. 🌿 Crie uma branch: git checkout -b feature/nova-funcionalidade
  3. 💻 Implemente sua funcionalidade com testes
  4. 📝 Commit: git commit -m "feat: adicionar nova funcionalidade"
  5. 📤 Push: git push origin feature/nova-funcionalidade
  6. 🔄 Abra um Pull Request com descrição detalhada

Devplus.Messaging v3.0.1 - Mensageria robusta e escalável com sistema de redrive completo para aplicações .NET 🚀

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
3.0.1 411 10/14/2025
2.7.5 189 9/29/2025
2.7.4 2,094 5/20/2025
2.7.2 187 5/17/2025
2.7.0 262 4/30/2025
2.6.1 225 4/29/2025
2.5.1 273 4/23/2025
2.5.0 306 4/14/2025
2.4.1 232 4/8/2025
2.4.0 239 4/8/2025
2.2.0 241 4/7/2025
2.1.0 223 4/6/2025
2.0.0 518 3/25/2025
1.0.2 334 3/23/2025
1.0.1 223 3/20/2025
1.0.0 234 3/19/2025