LT.EventBus.Channels
1.0.0
dotnet add package LT.EventBus.Channels --version 1.0.0
NuGet\Install-Package LT.EventBus.Channels -Version 1.0.0
<PackageReference Include="LT.EventBus.Channels" Version="1.0.0" />
<PackageVersion Include="LT.EventBus.Channels" Version="1.0.0" />
<PackageReference Include="LT.EventBus.Channels" />
paket add LT.EventBus.Channels --version 1.0.0
#r "nuget: LT.EventBus.Channels, 1.0.0"
#:package LT.EventBus.Channels@1.0.0
#addin nuget:?package=LT.EventBus.Channels&version=1.0.0
#tool nuget:?package=LT.EventBus.Channels&version=1.0.0
EventBus
轻量级 .NET 进程内事件总线类库 - 内存占用最小,功能强大
特点
- 零依赖 - 核心库不依赖任何第三方包
- 渐进增强 - 从 .NET Framework 4.6 到 .NET 9 全平台支持
- 内存极小 - 空实例 < 1KB,比同类库低 5-15 倍
- 防内存泄漏 - 默认弱引用订阅,订阅者释放后自动清理
- 零分配热路径 - 委托缓存 + Span,避免运行时分配
- 管道中间件 - 可扩展的拦截器链(日志、重试、验证)
快速开始
安装
# 核心库
dotnet add package EventBus.Core
# Channel 高性能总线 (.NET 6+)
dotnet add package EventBus.Channels
# DI 集成
dotnet add package EventBus.DependencyInjection
基础用法
using EventBus;
// 1. 创建总线
var bus = new EventBus();
// 2. 订阅事件
bus.Subscribe<OrderCreatedEvent>(e =>
{
Console.WriteLine($"Order created: {e.OrderId}");
});
// 3. 发布事件
bus.Publish(new OrderCreatedEvent("ORD001", "John Doe"));
异步用法
// 异步订阅
bus.Subscribe<PaymentEvent>(async e =>
{
await _repository.SaveAsync(e);
await _emailService.SendAsync(e.Email);
});
// 异步发布
await bus.PublishAsync(new PaymentEvent(...));
带选项订阅
// 带过滤器和优先级
bus.Subscribe<OrderEvent>(e => ProcessVIP(e),
new SubscriptionOptions(
priority: 10, // 高优先级先执行
filter: e => e.IsVIP, // 仅 VIP
maxRetries: 3 // 失败重试
));
管道中间件
// 添加中间件
bus.Pipeline.Use(new LoggingMiddleware());
bus.Pipeline.Use(new RetryMiddleware(maxRetries: 3));
bus.Pipeline.Use(new ValidationMiddleware(e => Validate(e)));
// 正常使用,自动经过中间件链
bus.Publish(new OrderEvent(...));
取消订阅
var subscription = bus.Subscribe<Event>(HandleEvent);
// 不再需要时取消
subscription.Dispose();
Channel 高性能模式 (.NET 6+)
using EventBus.Channels;
var channelBus = new ChannelEventBus(new ChannelOptions
{
Capacity = 1024,
FullMode = BoundedChannelFullMode.Wait
});
// 发布
await channelBus.PublishAsync(new LogEvent(...));
// 以 IAsyncEnumerable 方式消费
await foreach (var evt in channelBus.SubscribeAsync<LogEvent>())
{
await _logWriter.WriteAsync(evt);
}
模块详解
一、EventBus.Core(核心库)
核心库是整个项目的基础,提供发布/订阅、弱引用、管道中间件、优先级调度等功能。零依赖,可在任何 .NET 项目中使用。
二、EventBus.Channels(高性能总线)
基于 System.Threading.Channels 构建的高性能事件总线,适用于高吞吐和需要背压控制的场景。仅支持 .NET 6+。
三、EventBus.DependencyInjection(DI 集成)
作用:将 EventBus 集成到 Microsoft.Extensions.DependencyInjection 容器中,实现自动生命周期管理和依赖注入。
与基础用法的区别:
| 对比项 | 基础用法 | DI 集成 |
|---|---|---|
| 实例创建 | new EventBus() 手动创建 |
容器自动创建单例 |
| 实例共享 | 需要手动传递引用 | 构造函数注入,处处同一实例 |
| 生命周期 | 手动 Dispose() |
应用关闭时自动释放 |
| 订阅者依赖 | Handler 无法注入服务 | Handler 可从容器获取依赖 |
| 适用场景 | 控制台程序、简单场景 | ASP.NET Core、中大型项目 |
提供的扩展方法:
| 方法 | 作用 | 说明 |
|---|---|---|
AddEventBus() |
注册 EventBus 单例 | 注册为 IEventBus 接口,全局共享 |
AddEventAggregator() |
注册事件聚合器 | 用于多事件合并处理 |
AddSubscriber<T>() |
注册订阅者类 | 把你的 Handler 类注册到容器 |
AddChannelEventBus() |
注册 ChannelEventBus | .NET 6+,自动管理后台服务生命周期 |
使用示例:
// 1. 在 Program.cs 中注册
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEventBus(options =>
{
options.DefaultWeakReference = true;
options.ErrorHandler = (ex, evt, type) =>
Log.Error(ex, "Event {Type} failed", type.Name);
})
.AddEventAggregator()
.AddSubscriber<OrderCreatedEventHandler>();
var app = builder.Build();
app.Run();
// 2. 在任意服务中通过构造函数注入使用
public class OrderService
{
private readonly IEventBus _bus;
private readonly ILogger<OrderService> _logger;
// 容器自动注入同一个 EventBus 单例
public OrderService(IEventBus bus, ILogger<OrderService> logger)
{
_bus = bus;
_logger = logger;
}
public async Task CreateOrderAsync(OrderDto dto)
{
// 直接使用,无需手动创建总线
await _bus.PublishAsync(new OrderCreatedEvent(dto.Id, dto.CustomerName));
_logger.LogInformation("Order {Id} created", dto.Id);
}
}
// 3. 订阅者类也可以从容器获取依赖
public class OrderCreatedEventHandler
{
private readonly IEmailService _emailService;
private readonly IRepository _repository;
public OrderCreatedEventHandler(IEmailService emailService, IRepository repository)
{
_emailService = emailService;
_repository = repository;
}
public void Handle(OrderCreatedEvent evt)
{
_repository.Save(evt);
_emailService.SendConfirmation(evt.CustomerEmail);
}
}
ChannelEventBus 的 DI 注册(.NET 6+):
builder.Services.AddChannelEventBus(options =>
{
options.Capacity = 2048;
options.FullMode = BoundedChannelFullMode.Wait;
});
注册后会自动创建一个 IHostedService 后台托管服务,确保应用停止时 ChannelEventBus 被正确释放,无需手动管理。
WinForms UI 更新指南
在 WinForms 或 WPF 等 GUI 应用中使用 EventBus 时,能否直接更新 UI 控件取决于发布事件的方式。
同步发布 —— 可以直接更新 UI
同步发布(Publish / Emit)会在调用线程上执行所有同步订阅者。如果调用线程是 UI 线程,订阅回调中可以直接修改 UI 控件。
// 在 UI 线程中调用
bus.Publish(new OrderCreatedEvent("ORD001"));
// 订阅者可以直接更新 UI(同一线程)
bus.Subscribe<OrderCreatedEvent>(e =>
{
textBox1.Text = $"订单号: {e.Id}"; // ✅ 可以
dataGridView1.Rows.Add(e.Id, e.CustomerName); // ✅ 可以
});
异步发布 —— 不能直接更新 UI
异步发布(PublishAsync / EmitAsync)或异步订阅者(Func<TEvent, Task>)会在线程池线程上执行。直接修改 UI 控件会引发 InvalidOperationException(跨线程操作异常)。
// 异步发布 - 订阅者在线程池线程上执行
await bus.PublishAsync(new OrderCreatedEvent("ORD001"));
// ❌ 错误:会引发跨线程异常
bus.Subscribe<OrderCreatedEvent>(async e =>
{
textBox1.Text = $"订单号: {e.Id}"; // InvalidOperationException
});
正确方式:使用 Invoke 封送到 UI 线程
// ✅ 使用 Invoke 同步封送
bus.Subscribe<OrderCreatedEvent>(async e =>
{
// 1. 在后台处理业务逻辑
await ProcessOrderAsync(e);
// 2. 更新 UI 时封送到 UI 线程
this.Invoke(() =>
{
textBox1.Text = $"订单号: {e.Id}";
dataGridView1.Rows.Add(e.Id, e.CustomerName);
lblStatus.Text = "处理完成";
});
});
// ✅ 或使用 BeginInvoke 异步封送(不阻塞)
bus.Subscribe<ProgressEvent>(async e =>
{
this.BeginInvoke(() =>
{
progressBar1.Value = e.Percent;
lblProgress.Text = $"{e.Percent}%";
});
});
推荐:封装扩展方法简化使用
public static class ControlExtensions
{
/// <summary>
/// 如果调用线程不是控件的创建线程,则封送到 UI 线程执行
/// </summary>
public static void InvokeIfRequired(this Control control, Action action)
{
if (control.InvokeRequired)
control.Invoke(action);
else
action();
}
/// <summary>
/// 带返回值版本
/// </summary>
public static T InvokeIfRequired<T>(this Control control, Func<T> func)
{
if (control.InvokeRequired)
return (T)control.Invoke(func);
else
return func();
}
}
使用方式:
// 在订阅者中安全地更新 UI
bus.Subscribe<OrderCreatedEvent>(async e =>
{
await ProcessOrderAsync(e);
// 无论当前线程是否为 UI 线程,都能安全执行
textBox1.InvokeIfRequired(() =>
{
textBox1.Text = $"订单号: {e.Id}";
});
dataGridView1.InvokeIfRequired(() =>
{
dataGridView1.Rows.Add(e.Id, e.CustomerName, e.Amount);
});
// 带返回值的调用
var currentText = lblStatus.InvokeIfRequired(() => lblStatus.Text);
});
WPF 场景
WPF 使用 Dispatcher 而非 Invoke:
bus.Subscribe<OrderCreatedEvent>(async e =>
{
await ProcessOrderAsync(e);
Application.Current.Dispatcher.Invoke(() =>
{
OrderIdTextBlock.Text = e.Id;
StatusLabel.Content = "处理完成";
});
});
总结
| 场景 | 发布方式 | 订阅者类型 | 能否直接更新 UI |
|---|---|---|---|
| 同步 | Publish() |
Action<TEvent> |
✅ 可以(同线程) |
| 异步 | PublishAsync() |
Action<TEvent> |
❌ 需要 Invoke |
| 同步 | Publish() |
Func<TEvent, Task> |
❌ 需要 Invoke |
| 异步 | PublishAsync() |
Func<TEvent, Task> |
❌ 需要 Invoke |
最佳实践:无论同步还是异步,统一使用 InvokeIfRequired 可以确保代码在任何线程上下文中都能安全运行,是最推荐的做法。
API 参考
EventBus 主类
| 方法 | 说明 |
|---|---|
Publish<TEvent>(event) |
同步发布事件 |
PublishAsync<TEvent>(event, ct) |
异步发布事件 |
Subscribe<TEvent>(handler, options?) |
订阅同步处理器 |
Subscribe<TEvent>(asyncHandler, options?) |
订阅异步处理器 |
GetSubscriptionCount<TEvent>() |
获取订阅者数量 |
Purge() |
清理失效的弱引用订阅者 |
Dispose() |
释放总线 |
别名方法(JS/TS 风格)
为了方便来自 JavaScript/TypeScript 生态的开发者(熟悉 Node.js 的 EventEmitter 或浏览器的 addEventListener 模式),EventBus 提供了一套与 JS 生态命名一致的别名 API:
| 别名方法 | 对应原始方法 | JS 生态对应 |
|---|---|---|
Emit<TEvent>(event) |
Publish<TEvent>(event) |
Node.js emitter.emit('event', data) |
EmitAsync<TEvent>(event, ct) |
PublishAsync<TEvent>(event, ct) |
Node.js 异步 emit |
On<TEvent>(handler, options?) |
Subscribe<TEvent>(handler, options?) |
Node.js emitter.on('event', handler) / 浏览器 addEventListener |
Off(调用 IDisposable.Dispose()) |
subscription.Dispose() |
Node.js emitter.off('event', handler) |
使用示例:
var bus = new EventBus();
// JS 风格:on / emit
bus.On<OrderCreated>(e => Console.WriteLine($"Order {e.Id} created"));
bus.Emit(new OrderCreated("ORD001"));
// 异步版本
bus.On<PaymentProcessed>(async e => await ProcessPayment(e));
await bus.EmitAsync(new PaymentProcessed("PAY001"));
// 取消订阅(等价于 Node.js 的 emitter.off)
var subscription = bus.On<OrderCreated>(e => HandleOrder(e));
subscription.Dispose(); // 等价于 off
两种风格的对比:
// 风格一:.NET 传统命名(推荐 C# 项目使用)
bus.Publish(new OrderCreated("ORD001"));
bus.Subscribe<OrderCreated>(e => Console.WriteLine(e.Id));
// 风格二:JS/TS 风格命名(推荐熟悉 JS 的开发者使用)
bus.Emit(new OrderCreated("ORD001"));
bus.On<OrderCreated>(e => Console.WriteLine(e.Id));
// 两者完全等价,底层是同一个实现
为什么提供别名:
- 降低学习成本 - 从 Node.js 或前端迁移到 .NET 的开发者可以直接使用熟悉的
on/emit命名 - 代码迁移友好 - JS/TS 项目迁移到 .NET 时,只需将
on('event', fn)改为On<TEvent>(fn),emit('event', data)改为Emit(data) - 无性能损耗 - 别名方法是直接转发调用,零额外开销
- 团队协作灵活 - 团队可以根据成员背景选择统一的命名风格
SubscriptionOptions
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
priority |
int | 0 | 执行优先级,越大越先执行 |
isWeakReference |
bool | true | 是否使用弱引用 |
filter |
Func<object, bool>? | null | 条件过滤器 |
maxRetries |
int | 0 | 失败重试次数 |
内置中间件
| 中间件 | 说明 |
|---|---|
LoggingMiddleware |
记录事件处理日志(开始、成功、失败、耗时) |
RetryMiddleware |
自动重试失败处理器,指数退避策略 |
ValidationMiddleware |
事件验证拦截器,验证失败时抛出异常 |
管道中间件扩展指南
中间件的执行模型
EventBus 的管道中间件采用 洋葱模型(责任链模式)。中间件按照注册的顺序从外向内包裹,执行流程如下:
注册顺序:Middleware A → Middleware B → Middleware C → Handler
实际执行(洋葱模型):
Middleware A(before)
Middleware B(before)
Middleware C(before)
★ Handler(事件处理器)
Middleware C(after)
Middleware B(after)
Middleware A(after)
每个中间件都可以在 next() 调用前后执行自己的逻辑,也可以在不调用 next() 的情况下中断后续执行。
如何扩展自定义中间件
只需实现 IMiddleware 接口并注册到管道即可:
public interface IMiddleware
{
Task InvokeAsync<TEvent>(TEvent @event, Func<Task> next, CancellationToken cancellationToken)
where TEvent : class;
}
示例 1:性能监控中间件
public class PerformanceMiddleware : IMiddleware
{
private readonly ILogger<PerformanceMiddleware> _logger;
private readonly long _thresholdMs; // 性能告警阈值
public PerformanceMiddleware(ILogger<PerformanceMiddleware> logger, long thresholdMs = 1000)
{
_logger = logger;
_thresholdMs = thresholdMs;
}
public async Task InvokeAsync<TEvent>(TEvent @event, Func<Task> next, CancellationToken ct)
{
var sw = System.Diagnostics.Stopwatch.StartNew();
try
{
await next();
}
finally
{
sw.Stop();
if (sw.ElapsedMilliseconds > _thresholdMs)
{
_logger.LogWarning(
"[性能告警] 事件 {EventType} 处理耗时 {Elapsed}ms,超过阈值 {Threshold}ms",
typeof(TEvent).Name, sw.ElapsedMilliseconds, _thresholdMs);
}
}
}
}
// 注册到管道
bus.Pipeline.Use(new PerformanceMiddleware(logger, thresholdMs: 500));
示例 2:审计日志中间件
public class AuditMiddleware : IMiddleware
{
private readonly IEventAuditor _auditor;
public AuditMiddleware(IEventAuditor auditor) => _auditor = auditor;
public async Task InvokeAsync<TEvent>(TEvent @event, Func<Task> next, CancellationToken ct)
{
var auditRecord = new AuditRecord
{
EventType = typeof(TEvent).Name,
Timestamp = DateTime.UtcNow,
Payload = System.Text.Json.JsonSerializer.Serialize(@event)
};
try
{
await next();
auditRecord.Status = "Success";
}
catch (Exception ex)
{
auditRecord.Status = "Failed";
auditRecord.ErrorMessage = ex.Message;
throw;
}
finally
{
await _auditor.RecordAsync(auditRecord);
}
}
}
// 注册
bus.Pipeline.Use(new AuditMiddleware(auditor));
示例 3:事件去重中间件
public class DeduplicationMiddleware : IMiddleware
{
private readonly IMemoryCache _cache;
private readonly TimeSpan _window;
public DeduplicationMiddleware(IMemoryCache cache, TimeSpan? window = null)
{
_cache = cache;
_window = window ?? TimeSpan.FromSeconds(5);
}
public async Task InvokeAsync<TEvent>(TEvent @event, Func<Task> next, CancellationToken ct)
{
var key = $"{typeof(TEvent).Name}:{GenerateHash(@event)}";
if (_cache.TryGetValue(key, out _))
{
// 重复事件,跳过处理
return;
}
_cache.Set(key, true, _window);
await next();
}
private string GenerateHash(object obj)
{
// 根据事件内容生成唯一标识
return System.Text.Json.JsonSerializer.Serialize(obj).GetHashCode().ToString();
}
}
// 注册
bus.Pipeline.Use(new DeduplicationMiddleware(cache));
示例 4:权限校验中间件
public class AuthorizationMiddleware : IMiddleware
{
private readonly Func<object, Type, bool> _authorizer;
public AuthorizationMiddleware(Func<object, Type, bool> authorizer)
{
_authorizer = authorizer;
}
public async Task InvokeAsync<TEvent>(TEvent @event, Func<Task> next, CancellationToken ct)
{
if (!_authorizer(@event, typeof(TEvent)))
{
throw new UnauthorizedAccessException(
$"无权处理事件: {typeof(TEvent).Name}");
}
await next();
}
}
// 注册
bus.Pipeline.Use(new AuthorizationMiddleware((evt, type) =>
HasPermission(CurrentUser, type.Name)));
中间件注册的最佳实践
var bus = new EventBus();
// 推荐顺序:
// 1. 验证类中间件(最先执行,快速失败)
bus.Pipeline.Use(new ValidationMiddleware(e => Validate(e)));
// 2. 安全类中间件
bus.Pipeline.Use(new AuthorizationMiddleware(Authorize));
// 3. 去重/幂等中间件
bus.Pipeline.Use(new DeduplicationMiddleware(cache));
// 4. 日志/审计中间件
bus.Pipeline.Use(new LoggingMiddleware());
bus.Pipeline.Use(new AuditMiddleware(auditor));
// 5. 性能监控中间件
bus.Pipeline.Use(new PerformanceMiddleware(logger));
// 6. 重试中间件(通常最后,避免重复重试已记录的事件)
bus.Pipeline.Use(new RetryMiddleware(maxRetries: 3));
管道中间件 vs AOP(日志场景分析)
对于日志记录等横切关注点,管道中间件和 AOP 两种方案各有适用场景:
| 对比维度 | 管道中间件(推荐) | AOP(如 Castle DynamicProxy) |
|---|---|---|
| 适用范围 | 仅限 EventBus 事件处理流程 | 全局所有方法调用 |
| 事件上下文 | 可获取完整事件信息(类型、过滤器、优先级) | 只能获取方法参数,无事件语义 |
| 性能 | 无代理开销,直接调用 | 需要动态代理,每次调用有额外开销 |
| 事件类型感知 | typeof(TEvent) 泛型类型,编译期可知 |
只能运行时反射获取 |
| 过滤控制 | 可根据事件类型决定是否跳过日志 | 需额外配置哪些类/方法需要拦截 |
| 与 EventBus 集成度 | 原生支持,共享同一管道 | 独立于 EventBus,需要额外配置 |
| 异步友好 | 原生 async/await 支持 | 部分 AOP 框架对异步支持不完善 |
| 依赖 | 零额外依赖 | 需引入 AOP 框架(Castle、AspectInjector 等) |
结论:
- 日志、审计、监控 等针对 EventBus 事件流的场景,管道中间件 是最佳选择。它天然适配事件总线的工作流,可以获取事件类型、过滤器、优先级等上下文信息,且零额外开销。
- AOP 更适合全局性的横切关注点,如方法级日志、事务管理、缓存等,这些需要拦截任意方法调用而非仅事件流的场景。
实际建议:两者可以配合使用。管道中间件负责事件总线层面的横切逻辑,AOP 负责业务方法层面的横切逻辑,各司其职。
单元测试模块
概述
单元测试位于 tests/EventBus.Core.Tests/ 项目,基于 xUnit 框架编写,覆盖 EventBus 核心功能的全部场景。当前共包含 15 个测试用例,全部通过。
为什么用 xUnit?
xUnit 是 .NET 生态中最流行的单元测试框架之一,相比 MSTest 和 NUnit 有以下优势:
- 简洁的 API - 通过
[Fact]和[Theory]属性标记测试方法,语义清晰 - 灵活的断言 -
Assert.Equal、Assert.Throws、Assert.NotNull等丰富的断言方法 - 异常测试 -
Record.Exception()优雅地捕获异常进行验证 - 异步测试原生支持 - 直接
async Task返回,无需额外包装 - 广泛的 IDE 支持 - Visual Studio、Rider、VS Code 均可直接运行和调试
测试结构
测试项目分为三个测试类,每个类关注不同的功能维度:
1. EventBusBasicTests(基础功能测试)- 8 个用例
覆盖发布/订阅的核心场景:
| 测试方法 | 测试内容 |
|---|---|
Publish_NoSubscribers_ShouldNotThrow |
没有订阅者时发布不抛出异常 |
Subscribe_SyncHandler_ShouldReceiveEvent |
同步订阅能正确接收事件 |
Subscribe_AsyncHandler_ShouldReceiveEvent |
异步订阅能正确接收事件 |
MultipleSubscribers_ShouldAllReceive |
多个订阅者都能收到事件 |
Unsubscribe_ShouldStopReceivingEvents |
取消订阅后不再接收事件 |
GetSubscriptionCount_ShouldReturnCorrectCount |
订阅数量统计正确 |
Purge_ShouldRemoveDeadWeakReferences |
弱引用订阅者 GC 后能被清理 |
Dispose_ShouldClearAllSubscriptions |
Dispose 后再次发布抛出 ObjectDisposedException |
2. EventBusOptionsTests(选项配置测试)- 3 个用例
覆盖 SubscriptionOptions 的各项配置:
| 测试方法 | 测试内容 |
|---|---|
Subscribe_WithFilter_ShouldOnlyReceiveMatchingEvents |
过滤器只接收匹配的事件 |
Subscribe_WithPriority_ShouldExecuteInOrder |
高优先级订阅者先执行 |
Subscribe_StrongReference_ShouldNotBeCollected |
强引用订阅者不会被 GC 回收 |
3. EventBusPipelineTests(管道中间件测试)- 4 个用例
覆盖中间件管道的行为:
| 测试方法 | 测试内容 |
|---|---|
Pipeline_WithLoggingMiddleware_ShouldLogExecution |
日志中间件记录事件处理过程 |
Pipeline_WithRetryMiddleware_ShouldRetryOnFailure |
重试中间件在失败时自动重试 |
Pipeline_WithValidationMiddleware_ShouldRejectInvalidEvents |
验证中间件拦截不合法的事件 |
Pipeline_MultipleMiddleware_ShouldExecuteInOrder |
多个中间件按洋葱模型顺序执行 |
xUnit 核心用法解析
基本测试结构(AAA 模式):
[Fact]
public void Subscribe_SyncHandler_ShouldReceiveEvent()
{
// Arrange(准备)- 创建测试对象和初始状态
var bus = new EventBus();
TestEvent? receivedEvent = null;
// Act(执行)- 调用被测试的方法
bus.Subscribe<TestEvent>(e => receivedEvent = e);
bus.Publish(new TestEvent { Message = "Test Message" });
// Assert(验证)- 检查结果是否符合预期
Assert.NotNull(receivedEvent);
Assert.Equal("Test Message", receivedEvent.Message);
}
异常测试:
// 方式一:Record.Exception 捕获异常后验证
var exception = Record.Exception(() => bus.Publish(evt));
Assert.Null(exception); // 期望不抛异常
// 方式二:Assert.Throws 期望抛出特定异常
Assert.Throws<ObjectDisposedException>(() => bus.Publish(new TestEvent()));
异步测试:
[Fact]
public async Task Subscribe_AsyncHandler_ShouldReceiveEvent()
{
var bus = new EventBus();
TestEvent? receivedEvent = null;
bus.Subscribe<TestEvent>(async e =>
{
await Task.Delay(10);
receivedEvent = e;
});
await bus.PublishAsync(new TestEvent { Message = "Async Test" });
Assert.NotNull(receivedEvent);
}
如何运行测试
方式一:命令行运行(控制台测试)
# 运行所有测试
dotnet test
# 指定项目运行
dotnet test tests/EventBus.Core.Tests/EventBus.Core.Tests.csproj
# 带详细输出
dotnet test --logger:"console;verbosity=detailed"
# 只运行包含 "Retry" 的测试
dotnet test --filter "Retry"
方式二:WinForms 图形界面运行器(推荐)
项目提供了一个 WinForms 图形化测试运行器,可以方便地选择、运行和查看测试结果:
# 启动 WinForms 测试运行器
dotnet run --project tests/EventBus.Core.Tests.WinRunner/
WinForms 运行器功能:
- 测试列表:以表格形式展示全部 15 个测试用例,带有勾选框
- 全选/取消全选:一键选择或取消所有测试
- 运行全部:按顺序执行所有测试,实时显示进度
- 运行选中:只运行勾选的测试用例
- 状态显示:通过显示绿色,失败显示红色,一目了然
- 详细输出:底部日志区显示每个测试的执行时间和结果信息
- 统计汇总:顶部状态栏显示总计/通过/失败数量
测试覆盖的功能矩阵
| 功能 | 是否有测试 | 测试文件 |
|---|---|---|
| 同步发布/订阅 | 是 | EventBusBasicTests |
| 异步发布/订阅 | 是 | EventBusBasicTests |
| 多订阅者 | 是 | EventBusBasicTests |
| 取消订阅 | 是 | EventBusBasicTests |
| 弱引用自动清理 | 是 | EventBusBasicTests |
| 生命周期管理 (Dispose) | 是 | EventBusBasicTests |
| 过滤器 (Filter) | 是 | EventBusOptionsTests |
| 优先级 (Priority) | 是 | EventBusOptionsTests |
| 强引用/弱引用 | 是 | EventBusOptionsTests |
| 日志中间件 | 是 | EventBusPipelineTests |
| 重试中间件 | 是 | EventBusPipelineTests |
| 验证中间件 | 是 | EventBusPipelineTests |
| 洋葱模型中间件链 | 是 | EventBusPipelineTests |
框架支持
| 框架 | 同步 | 异步 | 弱引用 | 管道 | Channel |
|---|---|---|---|---|---|
| .NET Framework 4.6.2 | 支持 | 支持 | 支持 | 支持 | 不支持 |
| .NET Standard 2.0 | 支持 | 支持 | 支持 | 支持 | 不支持 |
| .NET 6.0+ | 支持 | 支持 | 支持 | 支持 | 支持 |
| .NET 8.0+ | 支持 | 支持 | 支持 | 支持 | 支持 |
| .NET 9.0+ | 支持 | 支持 | 支持 | 支持 | 支持 |
项目结构
EventBus/
├── src/
│ ├── EventBus.Core/ # 核心库(零依赖)
│ │ ├── EventBus.cs # 总线主类
│ │ ├── IEventBus.cs # 总线接口
│ │ ├── SubscriptionOptions.cs # 订阅选项配置
│ │ ├── Internal/ # 内部实现
│ │ │ ├── EventDispatcher.cs # 事件分发器
│ │ │ ├── SubscriptionManager.cs # 订阅管理器
│ │ │ └── SubscriptionEntry.cs # 订阅条目
│ │ ├── Extensions/ # 扩展功能
│ │ │ └── EventAggregator.cs # 事件聚合器
│ │ └── Pipeline/ # 管道中间件
│ │ ├── EventPipeline.cs # 管道实现
│ │ ├── IMiddleware.cs # 中间件接口
│ │ └── BuiltIn/ # 内置中间件
│ │ ├── LoggingMiddleware.cs
│ │ ├── RetryMiddleware.cs
│ │ └── ValidationMiddleware.cs
│ ├── EventBus.Channels/ # Channel 高性能总线 (.NET 6+)
│ │ ├── ChannelEventBus.cs
│ │ ├── ChannelOptions.cs
│ │ └── EventEnvelope.cs
│ └── EventBus.DependencyInjection/ # DI 集成
│ ├── ServiceCollectionExtensions.cs
│ └── EventBusOptions.cs
├── tests/
│ ├── EventBus.Core.Tests/ # 单元测试(xUnit,15 个用例)
│ │ ├── EventBusBasicTests.cs
│ │ ├── EventBusOptionsTests.cs
│ │ ├── EventBusPipelineTests.cs
│ │ └── TestEvents.cs
│ └── EventBus.Core.Tests.WinRunner/ # WinForms 图形化测试运行器
│ ├── Program.cs
│ ├── TestRunnerForm.cs
│ └── Tests/
│ └── TestRunner.cs
├── samples/
│ └── Sample.Basic/ # 基础使用示例
│ └── Program.cs
├── EventBus.sln
└── README.md
性能
| 场景 | 延迟 | 分配 |
|---|---|---|
| 同步发布(无订阅) | < 10ns | 0B |
| 同步发布(1个订阅) | < 100ns | 0B |
| 同步发布(10个订阅) | < 500ns | ~100B |
| 异步发布(10个订阅) | < 5μs | ~500B |
设计原则
- 零反射热路径 - 使用委托缓存,避免运行时类型发现
- 默认弱引用 - 订阅者释放后自动清理,彻底消除内存泄漏
- 零依赖核心 - 不强制依赖 DI 或其他库,按需引用
- 渐进增强 - 旧框架获得基础功能,新框架获得高性能特性
打包与发布
打包命令
项目采用多目标框架打包(每个项目包含多个运行时),生成的 .nupkg 文件包含所有目标框架的 DLL。
# 进入项目根目录
cd e:\工具\EventBus
# 创建输出目录
mkdir nupkgs
# 分别打包三个项目
dotnet pack src/EventBus.Core/EventBus.Core.csproj -c Release -o ./nupkgs
dotnet pack src/EventBus.Channels/EventBus.Channels.csproj -c Release -o ./nupkgs
dotnet pack src/EventBus.DependencyInjection/EventBus.DependencyInjection.csproj -c Release -o ./nupkgs
打包完成后,nupkgs/ 目录下会生成以下文件:
| 文件名 | 说明 |
|---|---|
EventBus.Core.1.0.0.nupkg |
核心库主包(包含 net462、netstandard2.0、net6.0、net8.0、net9.0) |
EventBus.Core.1.0.0.snupkg |
核心库符号包(用于调试) |
EventBus.Channels.1.0.0.nupkg |
Channel 高性能总线包(包含 net6.0、net8.0、net9.0) |
EventBus.Channels.1.0.0.snupkg |
Channel 符号包 |
EventBus.DependencyInjection.1.0.0.nupkg |
DI 集成包(包含 netstandard2.0、net6.0、net8.0、net9.0) |
EventBus.DependencyInjection.1.0.0.snupkg |
DI 符号包 |
上传到 NuGet.org
方式一:dotnet nuget 命令(推荐)
# 1. 登录 NuGet.org 获取 API Key
# https://www.nuget.org/account/apikeys
# 2. 上传所有包(一次性上传)
dotnet nuget push "./nupkgs/*.nupkg" --api-key YOUR_API_KEY --source https://api.nuget.org/v3/index.json
# 3. 或单独上传某个包
dotnet nuget push "./nupkgs/EventBus.Core.1.0.0.nupkg" --api-key YOUR_API_KEY --source https://api.nuget.org/v3/index.json
方式二:通过网页上传
- 登录 https://www.nuget.org
- 点击 "Upload" 按钮
- 拖拽或选择
.nupkg文件上传 - 填写验证信息后提交
发布前注意事项
1. 修改元数据信息
在 src/Directory.Build.props 中有几处需要替换为你的真实信息:
| 配置项 | 当前占位符 | 修改为 |
|---|---|---|
Authors |
YourName |
你的真实姓名或组织名 |
Company |
YourCompany |
公司名(个人项目可留空或删除此行) |
PackageProjectUrl |
github.com/yourusername/EventBus |
你的 GitHub 仓库地址 |
RepositoryUrl |
同上 | 同上 |
Copyright |
Copyright © 2026 YourName |
你的版权声明 |
2. 版本号管理
发布新版本时,需要修改以下三个项目文件中的 <Version> 标签:
<Version>1.0.1</Version>
<Version>1.0.1</Version>
<Version>1.0.1</Version>
版本命名遵循语义化版本规范(SemVer 2.0):
主版本号.次版本号.修订号- 主版本号:不兼容的 API 变更
- 次版本号:向后兼容的功能添加
- 修订号:向后兼容的 bug 修复
3. 依赖关系
三个包之间存在依赖关系:
EventBus.Core (零依赖,可独立使用)
↑
├── EventBus.Channels (依赖 EventBus.Core)
└── EventBus.DependencyInjection (依赖 EventBus.Core 和 EventBus.Channels)
上传顺序建议:先上传 EventBus.Core,再上传 EventBus.Channels,最后上传 EventBus.DependencyInjection。如果顺序颠倒,NuGet.org 会提示依赖包不存在。
4. 上传顺序注意事项
# 推荐顺序上传
dotnet nuget push "./nupkgs/EventBus.Core.1.0.0.nupkg" --api-key YOUR_KEY --source https://api.nuget.org/v3/index.json
dotnet nuget push "./nupkgs/EventBus.Channels.1.0.0.nupkg" --api-key YOUR_KEY --source https://api.nuget.org/v3/index.json
dotnet nuget push "./nupkgs/EventBus.DependencyInjection.1.0.0.nupkg" --api-key YOUR_KEY --source https://api.nuget.org/v3/index.json
5. 验证包内容
上传前可以验证 .nupkg 的内容是否正确:
# 查看包包含的文件
dotnet nuget verify ./nupkgs/EventBus.Core.1.0.0.nupkg
# 或使用 NuGet Package Explorer 工具(图形界面)
# https://github.com/NuGetPackageExplorer/NuGetPackageExplorer
6. 符号包上传
符号包(.snupkg)需要上传到 NuGet.org 的符号服务器:
# 同时上传主包和符号包
dotnet nuget push "./nupkgs/EventBus.Core.1.0.0.nupkg" --api-key YOUR_KEY --source https://api.nuget.org/v3/index.json
dotnet nuget push "./nupkgs/EventBus.Core.1.0.0.snupkg" --api-key YOUR_KEY --source https://api.nuget.org/v3/index.json
用户在使用你的库时,可以在调试时看到源码和符号,方便问题排查。
7. 预览版发布
如果是发布测试版或预览版,版本号后加预览标识符:
<Version>1.0.1-preview.1</Version>
<Version>1.0.1-beta.2</Version>
<Version>1.0.1-rc.1</Version>
NuGet 会自动识别这些版本为预览版,用户需要加上 --prerelease 参数才能安装:
dotnet add package EventBus.Core --prerelease
8. CI/CD 自动化
建议在 GitHub Actions 中配置自动打包和发布:
# .github/workflows/publish.yml
name: Publish to NuGet
on:
push:
tags:
- 'v*'
jobs:
publish:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-dotnet@v4
with:
dotnet-version: '8.0.x'
- run: dotnet build -c Release
- run: dotnet test
- run: dotnet pack src/EventBus.Core/EventBus.Core.csproj -c Release -o ./nupkgs
- run: dotnet pack src/EventBus.Channels/EventBus.Channels.csproj -c Release -o ./nupkgs
- run: dotnet pack src/EventBus.DependencyInjection/EventBus.DependencyInjection.csproj -c Release -o ./nupkgs
- run: dotnet nuget push "./nupkgs/*.nupkg" --api-key ${{ secrets.NUGET_API_KEY }} --source https://api.nuget.org/v3/index.json
License
MIT
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net6.0
- LT.EventBus.Core (>= 1.0.0)
-
net8.0
- LT.EventBus.Core (>= 1.0.0)
-
net9.0
- LT.EventBus.Core (>= 1.0.0)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on LT.EventBus.Channels:
| Package | Downloads |
|---|---|
|
LT.EventBus.DependencyInjection
Dependency injection integration for EventBus. Provides IServiceCollection extensions for registering EventBus, EventAggregator, and ChannelEventBus. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.0 | 115 | 5/18/2026 |
Initial release with ChannelEventBus supporting IAsyncEnumerable consumption and backpressure control.