Rabbit.Common.MQ.Cap
1.3.1
dotnet add package Rabbit.Common.MQ.Cap --version 1.3.1
NuGet\Install-Package Rabbit.Common.MQ.Cap -Version 1.3.1
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Rabbit.Common.MQ.Cap" Version="1.3.1" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Rabbit.Common.MQ.Cap" Version="1.3.1" />
<PackageReference Include="Rabbit.Common.MQ.Cap" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Rabbit.Common.MQ.Cap --version 1.3.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Rabbit.Common.MQ.Cap, 1.3.1"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Rabbit.Common.MQ.Cap@1.3.1
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Rabbit.Common.MQ.Cap&version=1.3.1
#tool nuget:?package=Rabbit.Common.MQ.Cap&version=1.3.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
Rabbit.Common.MQ.Cap
Distributed messaging via DotNetCore.CAP with outbox pattern, idempotency support, and dashboard. Supports RabbitMQ/Kafka transport with MySQL/PostgreSQL/SQL Server storage.
基于 CAP 的消息队列组件,支持分布式事务。
安装
dotnet add package Rabbit.Common.MQ.Cap
依赖
- Rabbit.Common.Lite
- DotNetCore.CAP 10.0.1
- DotNetCore.CAP.RabbitMQ 10.0.1
- DotNetCore.CAP.MySql/SqlServer/PostgreSql
特性
- 分布式事务(Outbox 模式)
- 消息持久化
- 失败重试
- 延迟消息(原生支持)
- Dashboard 监控(支持认证配置)
- 幂等性支持(消息去重)
- 多种存储支持
使用方式
Console 项目
using Microsoft.Extensions.DependencyInjection;
using Rabbit.Common.MQ.Cap;
using DotNetCore.CAP;
// 1. 创建 DI 容器
var services = new ServiceCollection();
// 2. 注册 CAP(RabbitMQ + MySQL)
services.AddCapWithRabbitMQAndMySql(
"amqp://guest:guest@localhost:5672/",
"Server=localhost;Database=test;Uid=root;Pwd=123456;"
);
// 内存存储(测试用)
// services.AddCapWithMemoryStorage();
// 3. 注册事件发布器
services.AddCapEventPublisher();
// 4. 构建服务提供者
var provider = services.BuildServiceProvider();
// 5. 发布事件
var publisher = provider.GetRequiredService<ICapEventPublisher>();
await publisher.PublishAsync("order.created", new Order { Id = 1 });
// 延迟发布
await publisher.PublishDelayAsync("order.remind", order, TimeSpan.FromHours(1));
Web API 项目
using Rabbit.Common.MQ.Cap;
// RabbitMQ + MySQL
services.AddCapWithRabbitMQAndMySql(
"amqp://guest:guest@localhost:5672/",
"Server=localhost;Database=test;Uid=root;Pwd=123456;",
dashboardOptions: new CapDashboardOptions
{
PathMatch = "/capmanager",
Enable = true
}
);
// 或使用其他存储
services.AddCapWithRabbitMQAndSqlServer(rabbitConn, sqlServerConn);
services.AddCapWithRabbitMQAndPostgreSql(rabbitConn, postgresConn);
// 内存存储(测试用)
services.AddCapWithMemoryStorage();
// 注册事件发布器
services.AddCapEventPublisher();
发布事件
using Rabbit.Common.MQ.Cap;
public class OrderService
{
private readonly ICapEventPublisher _publisher;
public OrderService(ICapEventPublisher publisher)
{
_publisher = publisher;
}
// 普通发布
public async Task CreateOrder(Order order)
{
await _publisher.PublishAsync("order.created", order);
}
// 延迟发布(1小时后执行)
public async Task RemindPayment(Order order)
{
await _publisher.PublishDelayAsync(
"order.payment.remind",
order,
TimeSpan.FromHours(1)
);
}
// 定时发布
public async Task ScheduleReport(Report report, DateTime publishTime)
{
await _publisher.PublishAtAsync("report.generate", report, publishTime);
}
}
消费事件
using Rabbit.Common.MQ.Cap;
public class OrderConsumer : CapConsumerBase
{
private readonly ILogger<OrderConsumer> _logger;
public OrderConsumer(ILogger<OrderConsumer> logger)
{
_logger = logger;
}
[CapSubscribe("order.created")]
public async Task HandleOrderCreated(Order order)
{
_logger.LogInformation("订单创建: {OrderId}", order.Id);
// 发送邮件通知...
}
}
消费者幂等性(推荐)
确保消息不会被重复处理:
public class OrderConsumer : CapConsumerBase
{
public OrderConsumer(IIdempotentMessageStore idempotentStore)
{
IdempotentStore = idempotentStore;
}
[CapSubscribe("order.created")]
public async Task HandleOrderCreated(Order order)
{
// 方式1:使用 TryProcessMessageAsync(推荐)
await TryProcessMessageAsync(
messageId: order.Id.ToString(),
handler: async () =>
{
// 你的业务逻辑,只会执行一次
await SendEmailAsync(order);
await UpdateInventoryAsync(order);
},
messageType: "order.created",
expiry: TimeSpan.FromHours(24)
);
}
}
配置选项
CAP 配置
services.AddCap(options =>
{
// 重试配置
options.FailedRetryCount = 5; // 失败重试次数
options.FailedRetryInterval = 30; // 重试间隔(秒)
options.SucceedMessageExpiredAfter = 3600; // 成功消息过期时间
});
Dashboard 配置
var dashboardOptions = new CapDashboardOptions
{
PathMatch = "/capmanager", // 访问路径
Enable = true // 是否启用
};
services.AddCapWithRabbitMQAndMySql(
rabbitConnectionString,
mysqlConnectionString,
dashboardOptions: dashboardOptions
);
API 参考
ICapEventPublisher
| 方法 | 说明 |
|---|---|
PublishAsync |
发布事件 |
PublishDelayAsync |
延迟发布 |
PublishAtAsync |
定时发布 |
IIdempotentMessageStore
| 方法 | 说明 |
|---|---|
IsProcessedAsync |
检查消息是否已处理 |
MarkAsProcessedAsync |
标记消息为已处理 |
TryAcquireAsync |
原子操作:尝试获取处理权 |
Dashboard
启动后访问 /capmanager 查看:
- 已发布消息
- 已接收消息
- 失败消息重试
注意:生产环境请配置认证。
与原生 RabbitMQ 的区别
| 特性 | MQ.Cap | MQ.RabbitMQ |
|---|---|---|
| 分布式事务 | 支持 | 不支持 |
| Dashboard | 支持 | 不支持 |
| 消息审计 | 支持 | 不支持 |
| 延迟消息 | 原生支持 | 需要插件 |
| 幂等性 | 内置支持 | 需手动实现 |
| 包体积 | ~5MB | ~500KB |
| 性能 | 中等 | 更高 |
最佳实践
- 消息幂等性:始终实现幂等性检查,防止重复处理
- Dashboard 安全:生产环境关闭匿名访问
- 消息清理:定期清理已消费的成功消息
- 监控告警:监控消息积压情况,设置告警阈值
许可证
MIT
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net10.0
- DotNetCore.CAP (>= 10.0.1)
- DotNetCore.CAP.Dashboard (>= 10.0.1)
- DotNetCore.CAP.InMemoryStorage (>= 10.0.1)
- DotNetCore.CAP.Kafka (>= 10.0.1)
- DotNetCore.CAP.MySql (>= 10.0.1)
- DotNetCore.CAP.PostgreSql (>= 10.0.1)
- DotNetCore.CAP.RabbitMQ (>= 10.0.1)
- DotNetCore.CAP.SqlServer (>= 10.0.1)
- Rabbit.Common.Lite (>= 1.3.1)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on Rabbit.Common.MQ.Cap:
| Package | Downloads |
|---|---|
|
Rabbit.Common.Full
Meta-package referencing all Rabbit.Common utility packages. / Rabbit.Common 全家桶,引用所有子包 |
GitHub repositories
This package is not used by any popular GitHub repositories.