Rabbit.Common.MQ.Cap 1.3.1


Rabbit.Common.MQ.Cap

Distributed messaging via DotNetCore.CAP with outbox pattern, idempotency support, and dashboard. Supports RabbitMQ/Kafka transport with MySQL/PostgreSQL/SQL Server storage.

A CAP-based message queue component with distributed transaction support.

Installation

dotnet add package Rabbit.Common.MQ.Cap

Dependencies

  • Rabbit.Common.Lite
  • DotNetCore.CAP 10.0.1
  • DotNetCore.CAP.RabbitMQ 10.0.1
  • DotNetCore.CAP.MySql/SqlServer/PostgreSql

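Features

  • Distributed transactions (outbox pattern)
  • Message persistence
  • Retry on failure
  • Delayed messages (native support)
  • Dashboard monitoring (authentication can be configured)
  • Idempotency support (message deduplication)
  • Multiple storage backends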
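
The outbox pattern behind the first feature can be illustrated with a small in-memory model. This is only a sketch of the general technique, not the package's or CAP's implementation: `InMemoryDatabase`, `OutboxMessage`, and `OutboxRelay` are hypothetical names, and the "transaction" is simulated by committing both rows together or not at all.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of the outbox pattern; not the package's API.
public sealed record OutboxMessage(Guid Id, string Topic, string Payload)
{
    public bool Sent { get; set; }
}

public sealed class InMemoryDatabase
{
    public List<string> Orders { get; } = new();
    public List<OutboxMessage> Outbox { get; } = new();

    // Stands in for a local database transaction: the business row and the
    // outbox row are both committed, or neither is.
    public void SaveOrderWithEvent(string order, string topic, bool failBeforeCommit = false)
    {
        var pendingMessage = new OutboxMessage(Guid.NewGuid(), topic, order);
        if (failBeforeCommit)
            throw new InvalidOperationException("Simulated failure before commit.");
        Orders.Add(order);
        Outbox.Add(pendingMessage);
    }
}

public static class OutboxRelay
{
    // Forwards every unsent outbox row to the broker and marks it as sent.
    // Returns the number of messages forwarded.
    public static int Flush(InMemoryDatabase db, Action<OutboxMessage> sendToBroker)
    {
        var pending = db.Outbox.Where(m => !m.Sent).ToList();
        foreach (var message in pending)
        {
            sendToBroker(message);
            message.Sent = true;
        }
        return pending.Count;
    }
}
```

Because the event is stored next to the business data, a failure before commit leaves neither behind, and a relay that runs later can forward whatever was committed. A real relay can still deliver a message more than once (for example, if it crashes between sending and marking), which is why consumers should be idempotent.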

Usage

Console project

using Microsoft.Extensions.DependencyInjection;
using Rabbit.Common.MQ.Cap;
using DotNetCore.CAP;

// 1. Create the DI container
var services = new ServiceCollection();

// 2. Register CAP (RabbitMQ + MySQL)
services.AddCapWithRabbitMQAndMySql(
    "amqp://guest:guest@localhost:5672/",
    "Server=localhost;Database=test;Uid=root;Pwd=123456;"
);

// In-memory storage (for testing)
// services.AddCapWithMemoryStorage();

// 3. Register the event publisher
services.AddCapEventPublisher();

// 4. Build the service provider
var provider = services.BuildServiceProvider();

// 5. Publish an event
var publisher = provider.GetRequiredService<ICapEventPublisher>();
var order = new Order { Id = 1 };
await publisher.PublishAsync("order.created", order);

// Delayed publish (delivered after 1 hour)
await publisher.PublishDelayAsync("order.remind", order, TimeSpan.FromHours(1));

// Example message type; replace it with your own
public class Order { public int Id { get; set; } }

Web API project

using Rabbit.Common.MQ.Cap;

// RabbitMQ + MySQL
services.AddCapWithRabbitMQAndMySql(
    "amqp://guest:guest@localhost:5672/",
    "Server=localhost;Database=test;Uid=root;Pwd=123456;",
    dashboardOptions: new CapDashboardOptions
    {
        PathMatch = "/capmanager",
        Enable = true
    }
);

// Or use a different storage backend instead (choose one registration only)
// services.AddCapWithRabbitMQAndSqlServer(rabbitConn, sqlServerConn);
// services.AddCapWithRabbitMQAndPostgreSql(rabbitConn, postgresConn);

// In-memory storage (for testing)
// services.AddCapWithMemoryStorage();

// Register the event publisher
services.AddCapEventPublisher();

Publishing events

using Rabbit.Common.MQ.Cap;

public class OrderService
{
    private readonly ICapEventPublisher _publisher;

    public OrderService(ICapEventPublisher publisher)
    {
        _publisher = publisher;
    }

    // Immediate publish
    public async Task CreateOrder(Order order)
    {
        await _publisher.PublishAsync("order.created", order);
    }

    // Delayed publish (delivered after 1 hour)
    public async Task RemindPayment(Order order)
    {
        await _publisher.PublishDelayAsync(
            "order.payment.remind",
            order,
            TimeSpan.FromHours(1)
        );
    }

    // Scheduled publish (delivered at a specific time)
    public async Task ScheduleReport(Report report, DateTime publishTime)
    {
        await _publisher.PublishAtAsync("report.generate", report, publishTime);
    }
}
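
Delayed and scheduled publishing describe the same thing from two angles: a scheduled time can be expressed as a delay from now. The helper below is a hypothetical sketch of that conversion, assuming both times are in UTC. `PublishTimeHelper` is not part of the package, and how `PublishAtAsync` actually treats time zones or past timestamps is not stated in this README.

```csharp
using System;

// Hypothetical helper; not part of Rabbit.Common.MQ.Cap.
public static class PublishTimeHelper
{
    // Converts an absolute publish time into a delay relative to "now".
    // A publish time in the past yields a zero delay (publish immediately).
    public static TimeSpan ToDelay(DateTime publishTimeUtc, DateTime nowUtc)
    {
        var delay = publishTimeUtc - nowUtc;
        return delay > TimeSpan.Zero ? delay : TimeSpan.Zero;
    }
}
```

Passing the current time in explicitly, instead of reading `DateTime.UtcNow` inside the helper, keeps the calculation deterministic and easy to test.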

Consuming events

using DotNetCore.CAP;
using Microsoft.Extensions.Logging;
using Rabbit.Common.MQ.Cap;

public class OrderConsumer : CapConsumerBase
{
    private readonly ILogger<OrderConsumer> _logger;

    public OrderConsumer(ILogger<OrderConsumer> logger)
    {
        _logger = logger;
    }

    [CapSubscribe("order.created")]
    public async Task HandleOrderCreated(Order order)
    {
        _logger.LogInformation("Order created: {OrderId}", order.Id);
        // Send an email notification...
    }
}

Consumer idempotency (recommended)

Ensure that a message is not processed more than once:

public class OrderConsumer : CapConsumerBase
{
    public OrderConsumer(IIdempotentMessageStore idempotentStore)
    {
        IdempotentStore = idempotentStore;
    }

    [CapSubscribe("order.created")]
    public async Task HandleOrderCreated(Order order)
    {
        // Option 1: use TryProcessMessageAsync (recommended)
        await TryProcessMessageAsync(
            messageId: order.Id.ToString(),
            handler: async () =>
            {
                // Your business logic; it runs only once per message ID.
                // SendEmailAsync and UpdateInventoryAsync are placeholders for your own methods.
                await SendEmailAsync(order);
                await UpdateInventoryAsync(order);
            },
            messageType: "order.created",
            expiry: TimeSpan.FromHours(24)
        );
    }
}
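
To make the "runs only once" guarantee concrete, here is a minimal in-memory sketch of a process-once guard with an expiry. It is an assumption about how such a helper could work, not the package's `TryProcessMessageAsync`. `InMemoryIdempotencyGuard` is a hypothetical name, and releasing the claim when the handler throws is one possible design choice that the README does not confirm.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory guard; the package's TryProcessMessageAsync may behave differently.
public sealed class InMemoryIdempotencyGuard
{
    private readonly ConcurrentDictionary<string, DateTime> _expiresAtUtc = new();

    // Runs the handler only if messageId has no unexpired record.
    // Returns true if the handler ran, false if the message was skipped as a duplicate.
    public async Task<bool> TryProcessAsync(string messageId, Func<Task> handler, TimeSpan expiry)
    {
        var now = DateTime.UtcNow;

        // Drop an expired record so the ID can be claimed again.
        if (_expiresAtUtc.TryGetValue(messageId, out var expiresAt) && expiresAt <= now)
            _expiresAtUtc.TryRemove(new KeyValuePair<string, DateTime>(messageId, expiresAt));

        // TryAdd is atomic: only one caller can claim a given ID.
        if (!_expiresAtUtc.TryAdd(messageId, now + expiry))
            return false;

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Release the claim so a retry can process the message again.
            _expiresAtUtc.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

Releasing the claim on failure matters when retries are enabled: without it, a message whose handler threw would be treated as a duplicate on redelivery and silently dropped.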

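Configuration options

CAP configuration

services.AddCap(options =>
{
    // Retry settings
    options.FailedRetryCount = 5;        // Number of retries after a failure
    options.FailedRetryInterval = 30;    // Retry interval (seconds)
    options.SucceedMessageExpiredAfter = 3600; // Expiry of successful messages (seconds)
});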

Dashboard configuration

var dashboardOptions = new CapDashboardOptions
{
    PathMatch = "/capmanager",    // Dashboard path
    Enable = true                 // Whether the dashboard is enabled
};

services.AddCapWithRabbitMQAndMySql(
    rabbitConnectionString,
    mysqlConnectionString,
    dashboardOptions: dashboardOptions
);

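API reference

ICapEventPublisher

Method | Description
PublishAsync | Publishes an event
PublishDelayAsync | Publishes an event after a delay
PublishAtAsync | Publishes an event at a scheduled time

IIdempotentMessageStore

Method | Description
IsProcessedAsync | Checks whether a message has already been processed
MarkAsProcessedAsync | Marks a message as processed
TryAcquireAsync | Atomic operation: tries to acquire the right to process a message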
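
The difference between the separate check/mark methods and the atomic `TryAcquireAsync` shows up under concurrency. The sketch below is a hypothetical in-memory store: only the three method names come from the README, while the signatures, the `InMemoryIdempotentStore` class, and the `StoreDemo` helper are assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical store with assumed signatures; only the method names come from the README.
public sealed class InMemoryIdempotentStore
{
    private readonly ConcurrentDictionary<string, byte> _processed = new();

    public Task<bool> IsProcessedAsync(string messageId) =>
        Task.FromResult(_processed.ContainsKey(messageId));

    public Task MarkAsProcessedAsync(string messageId)
    {
        _processed[messageId] = 0;
        return Task.CompletedTask;
    }

    // Check and mark in one atomic step: true only for the first caller.
    public Task<bool> TryAcquireAsync(string messageId) =>
        Task.FromResult(_processed.TryAdd(messageId, 0));
}

public static class StoreDemo
{
    // Races `workers` concurrent callers for the same ID and counts the winners.
    public static async Task<int> CountWinnersAsync(int workers)
    {
        var store = new InMemoryIdempotentStore();
        var results = await Task.WhenAll(
            Enumerable.Range(0, workers)
                .Select(_ => Task.Run(() => store.TryAcquireAsync("order-1"))));
        return results.Count(won => won);
    }
}
```

Calling `IsProcessedAsync` and then `MarkAsProcessedAsync` leaves a gap in which two consumers can both see "not processed". A single atomic acquire closes that gap, so exactly one caller wins no matter how many race.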

Dashboard

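After startup, open /capmanager to view:

  • Published messages
  • Received messages
  • Failed messages and their retries

Note: configure authentication in production environments.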

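Differences from native RabbitMQ

Feature | MQ.Cap | MQ.RabbitMQ
Distributed transactions | Supported | Not supported
Dashboard | Supported | Not supported
Message auditing | Supported | Not supported
Delayed messages | Native support | Requires a plugin
Idempotency | Built in | Must be implemented manually
Package size | ~5MB | ~500KB
Performance | Moderate | Higher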

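Best practices

  1. Message idempotency: always implement an idempotency check to prevent duplicate processing
  2. Dashboard security: disable anonymous access in production
  3. Message cleanup: regularly purge successfully consumed messages
  4. Monitoring and alerting: monitor the message backlog and set alert thresholds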

License

MIT

Product compatibility (compatible and additional computed target framework versions)
.NET: net10.0 is compatible. Computed: net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Rabbit.Common.MQ.Cap:

Package Downloads
Rabbit.Common.Full

Meta-package referencing all Rabbit.Common utility packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version | Downloads | Last Updated
1.3.1 | 102 | 5/28/2026
1.3.0 | 103 | 5/19/2026
1.2.1 | 110 | 4/7/2026
1.2.0 | 127 | 3/27/2026
1.1.0 | 127 | 3/12/2026
1.0.0 | 114 | 3/12/2026