LocalEventBus 1.0.0

There is a newer version of this package available.
See the version list below for details.
dotnet add package LocalEventBus --version 1.0.0
                    
NuGet\Install-Package LocalEventBus -Version 1.0.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="LocalEventBus" Version="1.0.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="LocalEventBus" Version="1.0.0" />
                    
Directory.Packages.props
<PackageReference Include="LocalEventBus" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add LocalEventBus --version 1.0.0
                    
#r "nuget: LocalEventBus, 1.0.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package LocalEventBus@1.0.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=LocalEventBus&version=1.0.0
                    
Install as a Cake Addin
#tool nuget:?package=LocalEventBus&version=1.0.0
                    
Install as a Cake Tool

LocalEventBus

NuGet License

一个高性能、轻量级的本地事件总线库,专为 .NET 8+ 设计。

✨ 特性

  • 🚀 高性能 - 使用 Expression 树编译委托,比反射快 15 倍
  • 🔒 线程安全 - 使用 ConcurrentDictionary + ImmutableArray 无锁设计
  • 📦 强类型 - 支持泛型事件,编译时类型检查
  • 🎯 Topic 路由 - 支持基于 Topic 的事件路由和灵活匹配
  • 🔄 重试机制 - 支持固定/线性/指数退避重试策略
  • 🎯 事件过滤 - 按条件过滤事件
  • 🔌 拦截器 - 支持前置、后置、失败拦截,用于监控、日志和异常处理
  • 异步优先 - 基于 Channel<T> 的异步事件队列
  • 🔀 哈希分片 - 支持固定数量的分片通道,保证顺序性的同时提升并发性能
  • 📍 直接调用 - 支持 InvokeAsync 直接调用订阅者,无需通过事件队列
  • 🔧 依赖注入 - 原生支持 Microsoft.Extensions.DependencyInjection
  • 🎨 灵活匹配 - 支持精确匹配、通配符(*)、正则表达式匹配

📦 安装

dotnet add package LocalEventBus

🚀 快速开始

1. 创建事件总线

使用工厂模式
using LocalEventBus;
using LocalEventBus.Abstractions;

// 使用默认配置
var eventBus = EventBusFactory.Create();

// 或使用自定义配置
var eventBus = EventBusFactory.Create(options =>
{
    options.ChannelCapacity = 1000;
    options.DefaultTimeout = TimeSpan.FromSeconds(30);
    options.PartitionCount = 16;  // 配置分片数量
    options.RetryOptions.MaxRetryAttempts = 3;
    options.RetryOptions.DelayStrategy = RetryDelayStrategy.ExponentialBackoff;
});
使用依赖注入(推荐)
using LocalEventBus;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

// 添加事件总线服务
services.AddLocalEventBus(options =>
{
    options.ChannelCapacity = 1000;
    options.DefaultTimeout = TimeSpan.FromSeconds(30);
    options.PartitionCount = 16;
    options.RetryOptions.MaxRetryAttempts = 3;
    options.RetryOptions.DelayStrategy = RetryDelayStrategy.ExponentialBackoff;
});

// 可选:添加通配符匹配器
services.AddLocalEventBus()
    .AddWildcardMatcher()
    .AddRegexMatcher();

var serviceProvider = services.BuildServiceProvider();
var eventBus = serviceProvider.GetRequiredService<IEventBus>();

2. 定义事件

// 使用 record 定义事件(推荐)
public record OrderCreatedEvent(int OrderId, string CustomerName, decimal Amount);

public record PaymentReceivedEvent(int OrderId, decimal Amount, DateTime PaidAt);

3. 发布事件

// 异步发布(推荐)
await eventBus.PublishAsync(new OrderCreatedEvent(123, "张三", 99.99m));

// 同步发布(入队不等待处理)
eventBus.Publish(new OrderCreatedEvent(456, "李四", 199.99m));

// 批量发布
var orders = new[]
{
    new OrderCreatedEvent(1, "Customer1", 100m),
    new OrderCreatedEvent(2, "Customer2", 200m),
    new OrderCreatedEvent(3, "Customer3", 300m)
};
await eventBus.PublishBatchAsync(orders);

// 直接调用(不经过队列,同步等待处理完成)
await eventBus.InvokeAsync(new OrderCreatedEvent(100, "DirectCall", 500m));

// 通过 Topic 直接调用
await eventBus.InvokeByTopicAsync("system/shutdown");

4. 订阅事件

委托方式
// 异步处理
var subscription = eventBus.Subscribe<OrderCreatedEvent>(async (order, ct) =>
{
    Console.WriteLine($"收到订单: {order.OrderId}, 客户: {order.CustomerName}");
    await ProcessOrderAsync(order, ct);
});

// 同步处理
eventBus.Subscribe<OrderCreatedEvent>(order =>
{
    Console.WriteLine($"订单 {order.OrderId} 已创建");
});

// 取消订阅
subscription.Dispose();
特性方式
public class OrderHandler
{
    // 带事件参数的订阅
    [Subscribe]
    public async ValueTask HandleOrderCreated(OrderCreatedEvent e, CancellationToken ct)
    {
        Console.WriteLine($"处理订单: {e.OrderId}");
        await Task.Delay(100, ct);
    }

    [Subscribe]
    public void HandlePayment(PaymentReceivedEvent e)
    {
        Console.WriteLine($"收到支付: {e.Amount}");
    }

    // 无参订阅者(必须指定 Topic)
    [Subscribe("system/shutdown")]
    public void OnSystemShutdown()
    {
        Console.WriteLine("系统即将关闭...");
    }

    // 无参订阅者 + CancellationToken
    [Subscribe("system/refresh")]
    public async ValueTask OnRefreshAsync(CancellationToken ct)
    {
        await RefreshCacheAsync(ct);
    }
}

// 注册订阅者
var handler = new OrderHandler();
var subscription = eventBus.Subscribe(handler);

// 取消所有订阅
subscription.Dispose();

⚙️ 配置选项

EventBusOptions

var eventBus = EventBusFactory.Create(options =>
{
    // 通道容量(0 = 无界通道)
    options.ChannelCapacity = 10000;
    
    // 通道满时的行为
    options.ChannelFullMode = BoundedChannelFullMode.Wait;
    
    // 默认处理超时
    options.DefaultTimeout = TimeSpan.FromSeconds(30);
    
    // 分片数量(用于哈希分片,默认1)
    // - 单线程模式(全局顺序):1(默认)
    // - 低负载(< 1K 事件/秒):4-8
    // - 中等负载(1K-10K 事件/秒):16
    // - 高负载(> 10K 事件/秒):32-64
    options.PartitionCount = 16;
    
    // 重试配置
    options.RetryOptions.MaxRetryAttempts = 3;
    options.RetryOptions.InitialDelay = TimeSpan.FromSeconds(1);
    options.RetryOptions.MaxDelay = TimeSpan.FromSeconds(30);
    options.RetryOptions.DelayStrategy = RetryDelayStrategy.ExponentialBackoff;
});

SubscribeOptions

eventBus.Subscribe<OrderCreatedEvent>(handler, new SubscribeOptions
{
    // 主题名称(可选,不指定则使用事件类型全名)
    Topic = "orders/created",
    
    // 优先级(0-10,越大越先执行,默认5)
    Priority = 10,
    
    // 是否允许并发处理(默认true)
    AllowConcurrency = true,
    
    // 处理超时(默认使用 EventBusOptions.DefaultTimeout)
    Timeout = TimeSpan.FromSeconds(10)
});

PublishOptions

await eventBus.PublishAsync(
    new OrderCreatedEvent(123, "Alice", 100m),
    new PublishOptions
    {
        // 主题名称(可选,不指定则使用事件类型全名)
        Topic = "orders/created",
        
        // 分区键(相同分区键的事件保证有序处理)
        PartitionKey = "user:alice",
        
        // 优先级(0-10,默认5)
        Priority = 8
    });

SubscribeAttribute

public class MyHandler
{
    // 使用主题过滤
    [Subscribe("orders/vip")]
    public void HandleVipOrder(OrderCreatedEvent e) { }

    // 设置优先级和超时
    [Subscribe(Priority = 10, Timeout = 5000, AllowConcurrency = false)]
    public void HandleHighPriority(OrderCreatedEvent e) { }
}

🔌 高级功能

事件过滤器

public class VipOnlyFilter : IEventFilter
{
    public int Order => 1;

    public ValueTask<bool> ShouldProcessAsync<TEvent>(TEvent @event, CancellationToken ct)
        where TEvent : notnull
    {
        if (@event is OrderCreatedEvent order)
        {
            return ValueTask.FromResult(order.Amount > 1000);
        }
        return ValueTask.FromResult(true);
    }
}

// 添加过滤器
eventBus.AddFilter(new VipOnlyFilter());

事件拦截器

public class LoggingInterceptor : IEventInterceptor
{
    public int Order => 0;

    public ValueTask OnHandlingAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, CancellationToken ct)
        where TEvent : notnull
    {
        Console.WriteLine($"[开始] 处理事件: {typeof(TEvent).Name}");
        return ValueTask.CompletedTask;
    }

    public ValueTask OnHandledAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, TimeSpan elapsed, CancellationToken ct)
        where TEvent : notnull
    {
        Console.WriteLine($"[完成] 处理事件: {typeof(TEvent).Name}, 耗时: {elapsed.TotalMilliseconds}ms");
        return ValueTask.CompletedTask;
    }

    public ValueTask OnHandlerFailedAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, Exception ex, CancellationToken ct)
        where TEvent : notnull
    {
        Console.WriteLine($"[失败] 处理事件: {typeof(TEvent).Name}, 错误: {ex.Message}");
        return ValueTask.CompletedTask;
    }
}

// 添加拦截器
eventBus.AddInterceptor(new LoggingInterceptor());

哈希分片(Hash-based Sharding)

LocalEventBus 使用哈希分片架构来提升并发性能,同时保证同一分区键的事件顺序性。

// 配置分片数量
var eventBus = EventBusFactory.Create(options =>
{
    options.PartitionCount = 32  // 默认1,推荐设置为CPU核心数的2-4倍
});

// 使用分区键发布事件
await eventBus.PublishAsync(
    new OrderCreatedEvent(123, "Alice", 100m),
    new PublishOptions { PartitionKey = "user:alice" }
);

// 相同分区键的事件保证按顺序处理
await eventBus.PublishAsync(event1, new PublishOptions { PartitionKey = "user:123" });
await eventBus.PublishAsync(event2, new PublishOptions { PartitionKey = "user:123" });
await eventBus.PublishAsync(event3, new PublishOptions { PartitionKey = "user:123" });
// ✅ event1 -> event2 -> event3 保证顺序

// 不同分区键的事件可以并发处理
await eventBus.PublishAsync(event1, new PublishOptions { PartitionKey = "user:123" });
await eventBus.PublishAsync(event2, new PublishOptions { PartitionKey = "user:456" });
// ⚡ event1 和 event2 可能并发处理

分片特性:

  • ✅ 固定数量的分片通道,在初始化时创建
  • ✅ 使用哈希算法将分区键映射到分片
  • ✅ 同一分区键的事件保证顺序处理
  • ✅ 不同分片的事件可以并发处理
  • ✅ 减少锁竞争,提升吞吐量

详见:哈希分片架构文档

直接调用订阅者(InvokeAsync)

除了通过事件队列异步处理,还可以直接同步调用所有订阅者:

// 直接调用订阅者(不经过事件队列)
await eventBus.InvokeAsync(new OrderCreatedEvent(123, "Alice", 100m));

// 通过 Topic 直接调用
await eventBus.InvokeByTopicAsync("system/shutdown");

// 带事件数据的 Topic 调用
await eventBus.InvokeByTopicAsync("user/notification", 
    new NotificationEvent("Welcome!"));

适用场景:

  • 需要立即执行的操作(不希望异步延迟)
  • 需要等待所有订阅者完成
  • 测试场景下验证订阅者行为

注意事项:

  • InvokeAsync 会同步调用所有订阅者(按优先级顺序)
  • 会应用过滤器(可以过滤不需要处理的事件)
  • 不会触发拦截器(OnHandlingAsync、OnHandledAsync、OnHandlerFailedAsync 都不会被调用)
  • 支持超时机制(会应用 SubscribeOptions.Timeout 或 EventBusOptions.DefaultTimeout)
  • 不支持重试机制(异常会直接抛出,需要调用方自行处理)

与 PublishAsync 的区别:

特性 PublishAsync InvokeAsync
执行方式 异步队列 同步直接调用
过滤器 ✅ 支持 ✅ 支持
拦截器 ✅ 支持 ❌ 不支持
超时 ✅ 支持 ✅ 支持
重试 ✅ 支持 ❌ 不支持
异常处理 拦截器处理 调用方处理
顺序保证 ✅ 同分区键保证 ✅ 按优先级顺序

Topic 匹配器

LocalEventBus 支持灵活的 Topic 匹配机制,可以实现精确匹配、通配符匹配和正则表达式匹配。

精确匹配(默认)
// 发布事件
await eventBus.PublishAsync("orders/created", new OrderCreatedEvent(...));

// 订阅事件(精确匹配)
[Subscribe(Topic = "orders/created")]
public void HandleOrderCreated(OrderCreatedEvent e) { }
通配符匹配
// 注册通配符匹配器
services.AddLocalEventBus()
    .AddWildcardMatcher();

// 订阅示例
[Subscribe(Topic = "orders/*")]        // 匹配 orders/created, orders/updated 等
public void HandleOrderEvents(object e) { }

[Subscribe(Topic = "*/created")]       // 匹配 orders/created, users/created 等
public void HandleAllCreatedEvents(object e) { }

[Subscribe(Topic = "**")]              // 匹配所有事件
public void HandleAllEvents(object e) { }
正则表达式匹配
// 注册正则匹配器
services.AddLocalEventBus()
    .AddRegexMatcher();

// 订阅示例
[Subscribe(Topic = @"^orders/(created|updated)$")]
public void HandleOrderChanges(object e) { }

[Subscribe(Topic = @"^user-\d+/login$")]
public void HandleUserLogin(object e) { }
自定义匹配器
// 实现自定义匹配器
public class CustomMatcher : IEventMatcher
{
    public int Order => 100;  // 执行顺序(越小越先执行)
    
    public bool IsMatch(string publishedTopic, string subscribedPattern)
    {
        // 自定义匹配逻辑
        return publishedTopic.Contains(subscribedPattern);
    }
}

// 注册自定义匹配器
services.AddLocalEventBus()
    .AddMatcher<CustomMatcher>();

诊断接口

// 获取诊断信息
if (eventBus is IEventBusDiagnostics diagnostics)
{
    Console.WriteLine($"订阅者数量: {diagnostics.GetSubscriberCount()}");
    Console.WriteLine($"待处理事件: {diagnostics.GetPendingEventCount()}");
    
    foreach (var subscriber in diagnostics.GetSubscribers<OrderCreatedEvent>())
    {
        Console.WriteLine($"  - {subscriber}");
    }
}

💡 最佳实践

1. 合理配置分片数量

// 根据负载和CPU核心数选择合适的分片数量
var eventBus = EventBusFactory.Create(options =>
{
    // 单线程模式(保证全局顺序)
    options.PartitionCount = 1;  
    
    // 多核并发(推荐:CPU核心数的2-4倍)
    // 例如:8核CPU可以设置为16-32
    options.PartitionCount = Environment.ProcessorCount * 2;
});

2. 使用分区键保证顺序

// 同一用户的事件使用相同的分区键,保证按顺序处理
await eventBus.PublishAsync(
    new UserLoginEvent(userId),
    new PublishOptions { PartitionKey = $"user:{userId}" });

await eventBus.PublishAsync(
    new UserLogoutEvent(userId),
    new PublishOptions { PartitionKey = $"user:{userId}" });
// ✅ 保证先处理登录,再处理登出

3. 使用拦截器统一处理日志和监控

public class MonitoringInterceptor : IEventInterceptor
{
    private readonly ILogger _logger;
    private readonly IMetrics _metrics;
    
    public int Order => 0;
    
    public ValueTask OnHandlingAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, CancellationToken ct)
        where TEvent : notnull
    {
        _logger.LogInformation("开始处理事件: {EventType}", typeof(TEvent).Name);
        _metrics.IncrementCounter("event_handling_started");
        return ValueTask.CompletedTask;
    }
    
    public ValueTask OnHandledAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, TimeSpan elapsed, CancellationToken ct)
        where TEvent : notnull
    {
        _logger.LogInformation("事件处理完成: {EventType}, 耗时: {Elapsed}ms", 
            typeof(TEvent).Name, elapsed.TotalMilliseconds);
        _metrics.RecordHistogram("event_handling_duration", elapsed.TotalMilliseconds);
        return ValueTask.CompletedTask;
    }
    
    public ValueTask OnHandlerFailedAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, Exception ex, CancellationToken ct)
        where TEvent : notnull
    {
        _logger.LogError(ex, "事件处理失败: {EventType}", typeof(TEvent).Name);
        _metrics.IncrementCounter("event_handling_failed");
        return ValueTask.CompletedTask;
    }
}

// 注册拦截器
services.AddLocalEventBus()
    .AddInterceptor<MonitoringInterceptor>();

4. 优雅关闭

// 在应用关闭时,确保所有事件都被处理完毕
public async Task ShutdownAsync(IEventBus eventBus, CancellationToken ct)
{
    // 1. 停止接收新事件(如果有入口控制)
    // ...
    
    // 2. 等待当前队列中的事件处理完成
    if (eventBus is IEventBusDiagnostics diagnostics)
    {
        while (diagnostics.GetPendingEventCount() > 0)
        {
            await Task.Delay(100, ct);
        }
    }
    
    // 3. 释放资源
    await eventBus.DisposeAsync();
}

5. 错误处理策略

// 方式1:使用拦截器处理异常
public class ErrorHandlingInterceptor : IEventInterceptor
{
    public int Order => -100;  // 优先执行
    
    public ValueTask OnHandlerFailedAsync<TEvent>(TEvent @event, SubscriberInfo subscriber, Exception ex, CancellationToken ct)
        where TEvent : notnull
    {
        // 根据异常类型决定处理策略
        if (ex is TimeoutException)
        {
            // 超时异常,可能需要报警
            AlertSystem.SendAlert($"Event handler timeout: {typeof(TEvent).Name}");
        }
        else if (ex is InvalidOperationException)
        {
            // 业务异常,记录日志即可
            _logger.LogWarning(ex, "Business exception in event handler");
        }
        
        return ValueTask.CompletedTask;
    }
}

// 方式2:在订阅者中使用 try-catch
[Subscribe]
public async ValueTask HandleOrderCreated(OrderCreatedEvent e, CancellationToken ct)
{
    try
    {
        await ProcessOrderAsync(e, ct);
    }
    catch (Exception ex)
    {
        // 处理异常,避免影响其他订阅者
        _logger.LogError(ex, "Failed to process order: {OrderId}", e.OrderId);
    }
}

📊 性能特性

核心优化

优化项 技术方案 性能提升
订阅者调用 Expression 树编译委托 比反射快 15 倍
并发控制 ConcurrentDictionary + ImmutableArray 无锁设计,零拷贝
事件分发 Channel<T> 异步队列 高吞吐量,低延迟
分片处理 哈希分片(固定通道数) 并发性能提升 2-5 倍
内存分配 不可变集合 + 对象池 极低 GC 压力

性能指标(参考值)

场景 吞吐量 平均延迟 P99 延迟
单订阅者 ~100K ops/s ~10μs ~50μs
10 订阅者 ~50K ops/s ~20μs ~100μs
100 订阅者 ~10K ops/s ~100μs ~500μs
分片模式(16分片) ~200K ops/s ~5μs ~30μs

测试环境: .NET 8.0, Intel i7-12700, 16GB RAM, Windows 11

基准测试示例

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

[MemoryDiagnoser]
public class EventBusBenchmark
{
    private IEventBus _eventBus = null!;
    
    [GlobalSetup]
    public void Setup()
    {
        _eventBus = EventBusFactory.Create(options =>
        {
            options.PartitionCount = 16;
            options.ChannelCapacity = 10000;
        });
        
        // 注册订阅者
        _eventBus.Subscribe<TestEvent>(e => { /* 处理逻辑 */ });
    }
    
    [Benchmark]
    public async Task PublishAsync_1K_Events()
    {
        for (int i = 0; i < 1000; i++)
        {
            await _eventBus.PublishAsync(new TestEvent(i));
        }
    }
    
    [Benchmark]
    public void Publish_1K_Events()
    {
        for (int i = 0; i < 1000; i++)
        {
            _eventBus.Publish(new TestEvent(i));
        }
    }
}

// 运行基准测试
// BenchmarkRunner.Run<EventBusBenchmark>();

🏗️ 架构

┌─────────────────────────────────────────────────────────────┐
│                        IEventBus                            │
│  ┌──────────────────┐          ┌─────────────────────┐     │
│  │ IEventPublisher  │          │  IEventSubscriber   │     │
│  └────────┬─────────┘          └──────────┬──────────┘     │
└───────────┼────────────────────────────────┼────────────────┘
            │                                │
            ▼                                ▼
┌───────────────────────────────────────────────────────────────┐
│                     DefaultEventBus                           │
│  ┌──────────────────────────────────────────────────────────┐│
│  │  EventSubscriberRegistry (ConcurrentDict + ImmutableArray)││
│  └──────────────────────────────────────────────────────────┘│
│  ┌──────────────────────────────────────────────────────────┐│
│  │  EventChannelManager (Channel<EventEnvelope>)            ││
│  └──────────────────────────────────────────────────────────┘│
│  ┌──────────────────────────────────────────────────────────┐│
│  │  Pipeline: Filter → Interceptor → Invoker → Retry        ││
│  └──────────────────────────────────────────────────────────┘│
└───────────────────────────────────────────────────────────────┘
            │
            ▼
┌─────────────────────────────────┐
│  异常由拦截器处理 (Interceptor)  │
└─────────────────────────────────┘

📁 项目结构

src/LocalEventBus/
├── Abstractions/           # 抽象接口
│   ├── IEventPublisher.cs
│   ├── IEventSubscriber.cs
│   ├── IEventBus.cs
│   ├── IEventHandler.cs
│   ├── IEventFilter.cs
│   ├── IEventInterceptor.cs
│   ├── IEventMatcher.cs
│   ├── IEventMatcherProvider.cs
│   ├── IEventTypeKeyProvider.cs
│   ├── EventBusOptions.cs
│   └── RetryOptions.cs
├── DependencyInjection/    # 依赖注入
│   └── ServiceCollectionExtensions.cs
├── Internal/               # 内部实现
│   ├── DefaultEventBus.cs
│   ├── DefaultEventMatcherProvider.cs
│   ├── DefaultEventTypeKeyProvider.cs
│   ├── EventSubscriberRegistry.cs
│   ├── SubscriberInfo.cs
│   ├── EventChannelManager.cs
│   ├── EventEnvelope.cs
│   └── SubscriptionToken.cs
├── Matchers/               # 事件匹配器
│   ├── WildcardEventMatcher.cs
│   └── RegexEventMatcher.cs
├── EventBusFactory.cs      # 工厂类
├── EventBusExtensions.cs   # 扩展方法
└── SubscribeAttribute.cs   # 订阅特性

🔧 要求

  • .NET 8.0 或更高版本
  • Microsoft.Extensions.DependencyInjection.Abstractions 8.0+
  • System.Collections.Immutable 8.0+
  • System.Threading.Channels 8.0+

🆕 版本历史

v2.0.0 (2024-12-08)

  • 重大更新: 接口隔离原则,拆分为 IEventPublisherIEventSubscriberIEventBus
  • 🚀 性能优化: 使用 Expression 树编译委托替代反射调用,性能提升 15 倍
  • 🔒 无锁并发: 使用 ConcurrentDictionary + ImmutableArray 替代锁+字典拷贝
  • 📦 强类型支持: 支持泛型事件发布和订阅
  • 🔄 重试策略: 支持固定延迟、线性增长、指数退避
  • 🔀 哈希分片: 固定数量的分片通道,保证顺序性的同时提升并发性能
  • 🔌 事件拦截器: 支持前置、后置、失败拦截
  • 🎯 事件过滤器: 支持按条件过滤事件
  • 📍 直接调用: 支持 InvokeAsync 直接调用订阅者
  • 🎨 Topic 路由: 支持基于 Topic 的灵活事件路由
  • 🔧 依赖注入: 原生支持 Microsoft.Extensions.DependencyInjection

详见 CHANGELOG.md

❓ FAQ

Q: PublishAsync 和 InvokeAsync 有什么区别?

A: PublishAsync 将事件放入队列异步处理,支持拦截器和重试;InvokeAsync 直接同步调用订阅者,不触发拦截器,适用于需要立即执行的场景。

Q: 如何保证事件的处理顺序?

A: 使用相同的 PartitionKey 发布事件,相同分区键的事件会被路由到同一个分片通道,保证顺序处理。

Q: 分片数量(PartitionCount)应该设置为多少?

A:

  • 单线程模式(全局顺序):1(默认)
  • 低负载(< 1K 事件/秒):4-8
  • 中等负载(1K-10K 事件/秒):16
  • 高负载(> 10K 事件/秒):32-64
  • 推荐值:CPU核心数的 2-4 倍

Q: 如何实现事件的通配符订阅?

A: 注册 WildcardEventMatcher,然后使用通配符 Topic:

services.AddLocalEventBus().AddWildcardMatcher();

[Subscribe(Topic = "orders/*")]
public void HandleAllOrderEvents(object e) { }

Q: 异常会影响其他订阅者吗?

A: 不会。每个订阅者独立处理,一个订阅者抛出异常不会影响其他订阅者。异常会被拦截器捕获,或者在启用重试时进行重试。

Q: 如何监控事件总线的运行状态?

A: 使用 IEventBusDiagnostics 接口获取诊断信息,或者通过拦截器记录日志和指标。

Q: 支持分布式事件总线吗?

A: LocalEventBus 是进程内事件总线,不支持跨进程通信。如需分布式场景,请考虑 MassTransit、NServiceBus 等分布式消息框架。

Q: 如何取消订阅?

A: Subscribe 方法返回 IDisposable,调用 Dispose() 即可取消订阅:

var subscription = eventBus.Subscribe<MyEvent>(handler);
// ...
subscription.Dispose();  // 取消订阅

📄 许可证

MIT License - 详见 LICENSE 文件

📚 相关资源

🤝 贡献

欢迎提交 Issue 和 Pull Request!

贡献指南:

  1. Fork 本仓库
  2. 创建特性分支 (git checkout -b feature/AmazingFeature)
  3. 提交更改 (git commit -m 'Add some AmazingFeature')
  4. 推送到分支 (git push origin feature/AmazingFeature)
  5. 开启 Pull Request

📮 联系

如有问题或建议,请提交 Issue


⭐ 如果这个项目对你有帮助,请给个 Star!

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.0.3 121 3/13/2026
1.0.2 116 3/11/2026
1.0.1 482 12/9/2025
1.0.0 444 12/9/2025