mgzhenhong.ASW2.Net.EventChannel 2.1.3

dotnet add package mgzhenhong.ASW2.Net.EventChannel --version 2.1.3
                    
NuGet\Install-Package mgzhenhong.ASW2.Net.EventChannel -Version 2.1.3
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="mgzhenhong.ASW2.Net.EventChannel" Version="2.1.3" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="mgzhenhong.ASW2.Net.EventChannel" Version="2.1.3" />
                    
Directory.Packages.props
<PackageReference Include="mgzhenhong.ASW2.Net.EventChannel" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add mgzhenhong.ASW2.Net.EventChannel --version 2.1.3
                    
#r "nuget: mgzhenhong.ASW2.Net.EventChannel, 2.1.3"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package mgzhenhong.ASW2.Net.EventChannel@2.1.3
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=mgzhenhong.ASW2.Net.EventChannel&version=2.1.3
                    
Install as a Cake Addin
#tool nuget:?package=mgzhenhong.ASW2.Net.EventChannel&version=2.1.3
                    
Install as a Cake Tool

ASW.Net.EventChannel

License .NET NuGet

📖 项目简介

ASW.Net.EventChannel 是基于 ASW.Net 构建的高性能事件通道框架,提供了发布-订阅模式的实时事件通信解决方案。该框架基于RDC协议,支持事件的可靠传输、多客户端订阅、事件路由和分发等功能,适用于实时通信、系统解耦和事件驱动架构场景。

✨ 核心特性

🚀 高性能事件通信

  • 基于RDC协议: 使用可靠数据通信协议确保事件传输的可靠性
  • 异步事件处理: 全异步的事件发布和订阅机制
  • 事件队列: 内置事件队列防止事件丢失和堆积
  • 流量控制: 支持事件队列满载保护和背压处理

📡 发布-订阅模式

  • 多主题订阅: 客户端可以订阅多个事件主题
  • 事件路由: 服务端智能路由事件到对应的订阅者
  • 广播支持: 支持一对多的事件广播
  • 订阅管理: 动态的订阅和取消订阅

🔧 灵活易用

  • 事件参数: 支持强类型的 NamedArgs 参数传递
  • 回调机制: 丰富的事件回调和状态通知
  • 自动重连: 客户端断线自动重连和订阅恢复
  • 状态监控: 完整的连接状态和队列状态监控

⚡ 企业级特性

  • SSL支持: 支持安全的事件传输
  • 连接池: 高效的连接管理和复用
  • 统计监控: 详细的传输统计和性能监控
  • 错误处理: 完善的异常处理和重试机制

🏗️ 架构设计

组件架构

┌─────────────────────────────────────────────────────────────┐
│                  ASW.Net.EventChannel                      │
├─────────────────────────────────────────────────────────────┤
│  EventChannelServer      │  EventChannelClient             │
│  ├─ 事件路由管理          │  ├─ 事件发布                     │
│  ├─ 订阅者管理           │  ├─ 事件订阅                     │
│  ├─ 事件分发             │  ├─ 事件队列                     │
│  └─ 连接状态监控         │  └─ 自动重连                     │
├─────────────────────────────────────────────────────────────┤
│                    RDC协议层                                │
│  RdcTcpServer | RdcTcpClient | RdcRequest | RdcResponse   │
├─────────────────────────────────────────────────────────────┤
│                    ASW.Net 网络层                           │
│           TcpServer | TcpClient | DotNetty               │
└─────────────────────────────────────────────────────────────┘

事件流转过程

graph TB
    A[客户端发布事件] --> B[EventChannelServer]
    B --> C{查找订阅者}
    C -->|找到订阅者| D[分发事件]
    C -->|无订阅者| E[丢弃事件]
    D --> F[客户端A事件队列]
    D --> G[客户端B事件队列]
    D --> H[客户端C事件队列]
    F --> I[客户端A回调]
    G --> J[客户端B回调]
    H --> K[客户端C回调]
    
    L[客户端订阅事件] --> M[服务端订阅管理]
    M --> N[更新路由表]

订阅管理

服务端订阅字典:
┌─────────────────────────────────┐
│ "user.login" → [Client1, Client3] │
│ "order.create" → [Client2]        │
│ "system.alert" → [Client1, Client2] │
└─────────────────────────────────┘

客户端订阅字典:
┌─────────────────────────────────┐
│ Client1 → ["user.login", "system.alert"] │
│ Client2 → ["order.create", "system.alert"] │
│ Client3 → ["user.login"]         │
└─────────────────────────────────┘

🚀 快速开始

安装

# Package Manager
Install-Package mgzhenhong.ASW.Net.EventChannel

# .NET CLI
dotnet add package mgzhenhong.ASW.Net.EventChannel

# PackageReference
<PackageReference Include="mgzhenhong.ASW.Net.EventChannel" Version="x.x.x" />

事件通道服务端

using ASW.Net.EventChannel;

// 创建事件通道服务器
var server = new EventChannelServer(2208);
server.Name = "EventHub";

// 事件监听
server.Connected += endpoint => {
    Console.WriteLine($"客户端连接: {endpoint}");
};

server.PublishReceived += (endpoint, eventName, args) => {
    Console.WriteLine($"收到事件发布: {eventName} from {endpoint}");
    Console.WriteLine($"事件参数: {args}");
};

server.SubscribeReceived += (endpoint, eventName) => {
    Console.WriteLine($"客户端订阅事件: {eventName} from {endpoint}");
};

server.Published += (endpoint, eventName, args) => {
    Console.WriteLine($"事件已分发: {eventName} to {endpoint}");
};

// 启动服务器
server.Start();
Console.WriteLine($"事件通道服务器已启动: {server.BoundEndPoint}");

事件通道客户端

using ASW.Net.EventChannel;
using ASW.Utility;

// 创建事件通道客户端
var client = new EventChannelClient("127.0.0.1", 2208);
client.Name = "OrderService";

// 自动重连配置
client.AutoReconnectIsEnabled = true;
client.AutoReconnectDelayMs = 3000;

// 连接事件
client.Connected += () => {
    Console.WriteLine("连接事件通道成功");
};

// 连接服务器
client.Connect();

// 订阅事件
client.On("user.login", (eventName, args) => {
    var userId = args.GetValue<string>("userId");
    var loginTime = args.GetValue<DateTime>("loginTime");
    Console.WriteLine($"用户登录: {userId} at {loginTime}");
    
    // 处理用户登录逻辑
    HandleUserLogin(userId, loginTime);
});

// 发布事件
var loginArgs = new NamedArgs();
loginArgs.Add("userId", "user123");
loginArgs.Add("loginTime", DateTime.Now);
loginArgs.Add("ip", "192.168.1.100");

await client.Publish("user.login", loginArgs);

📚 详细使用指南

事件发布

// 简单事件发布
await client.Publish("notification.send", new NamedArgs {
    ["type"] = "email",
    ["recipient"] = "user@example.com",
    ["subject"] = "Welcome"
});

// 复杂对象事件
var orderData = new {
    OrderId = "ORD-001",
    CustomerId = 123,
    Amount = 99.99m,
    Items = new[] { "Product A", "Product B" }
};

var args = new NamedArgs();
args.Add("order", orderData);
args.Add("timestamp", DateTime.Now);
args.Add("source", "WebAPI");

client.Publish("order.created", args);

// 带回调的事件发布
client.PublishResponse += (eventName, result) => {
    if (result.State) {
        Console.WriteLine($"事件 {eventName} 发布成功");
    } else {
        Console.WriteLine($"事件 {eventName} 发布失败: {result.Message}");
    }
};

事件订阅

// 订阅单个事件
client.On("user.logout", (eventName, args) => {
    var userId = args.GetValue<string>("userId");
    var reason = args.GetValue<string>("reason") ?? "normal";
    
    Console.WriteLine($"用户退出: {userId}, 原因: {reason}");
    UpdateUserStatus(userId, "offline");
});

// 订阅多个事件
var events = new[] { "order.created", "order.updated", "order.cancelled" };
foreach (var eventName in events) {
    client.On(eventName, HandleOrderEvent);
}

// 订阅时带错误处理
client.SubscribeResponse += (eventName, result) => {
    if (result.State) {
        Console.WriteLine($"订阅事件 {eventName} 成功");
    } else {
        Console.WriteLine($"订阅事件 {eventName} 失败: {result.Message}");
    }
};

// 事件处理方法
private static void HandleOrderEvent(string eventName, NamedArgs args) {
    var orderId = args.GetValue<string>("orderId");
    
    switch (eventName) {
        case "order.created":
            ProcessNewOrder(orderId, args);
            break;
        case "order.updated":
            UpdateOrderStatus(orderId, args);
            break;
        case "order.cancelled":
            CancelOrder(orderId, args);
            break;
    }
}

取消订阅

// 取消单个事件订阅
client.Off("user.login");

// 取消多个事件订阅
var eventsToUnsubscribe = new[] { "order.created", "order.updated" };
foreach (var eventName in eventsToUnsubscribe) {
    client.Off(eventName);
}

// 取消订阅回调
client.UnSubscribeResponse += (eventName, result) => {
    if (result.State) {
        Console.WriteLine($"取消订阅 {eventName} 成功");
    } else {
        Console.WriteLine($"取消订阅 {eventName} 失败: {result.Message}");
    }
};

🔧 高级功能

事件队列管理

// 配置事件队列大小
client.EventReceiveQueueMaxSize = 5000;  // 最大队列长度

// 监控队列状态
client.EventReceiveQueueFull += () => {
    Console.WriteLine("事件接收队列已满,可能存在处理积压");
    // 可以考虑暂停发布或增加处理能力
};

client.EventReceiveQueueDrain += () => {
    Console.WriteLine("事件接收队列恢复正常");
};

client.EventDiscarded += eventName => {
    Console.WriteLine($"事件 {eventName} 因队列满而被丢弃");
    // 记录丢失的事件,考虑重要事件的持久化
};

// 获取队列统计
Console.WriteLine($"当前队列长度: {client.EventQueueSize}");
Console.WriteLine($"队列最大长度: {client.EventReceiveQueueMaxSize}");

服务端统计监控

// 获取连接统计
Console.WriteLine($"当前连接数: {server.ConnectionCount}");
Console.WriteLine($"累计发送字节: {server.TotalSendBytes}");
Console.WriteLine($"累计接收字节: {server.TotalReceiveBytes}");

// 获取处理统计
Console.WriteLine($"待处理请求数: {server.UnHandledRequestCount}");
Console.WriteLine($"待处理发布请求数: {server.UnHandledPublishRequestCount}");

// 按事件类型统计
foreach (var kvp in server.UnHandledPublishRequestCountDic) {
    Console.WriteLine($"事件 {kvp.Key}: {kvp.Value} 个待处理");
}

// 监控服务器状态
server.BeforePublish += (eventName, subscriberCount) => {
    Console.WriteLine($"即将分发事件 {eventName} 给 {subscriberCount} 个订阅者");
};

server.PublishFail += (endpoint, eventName, error) => {
    Console.WriteLine($"向 {endpoint} 分发事件 {eventName} 失败: {error}");
};

SSL安全通信

// 服务端SSL配置
var certificate = new X509Certificate2("server.pfx", "password");
var server = new EventChannelServer(2208);
server.UseSsl(certificate);

// 客户端SSL配置
var client = new EventChannelClient("secure.example.com", 2208);
client.UseSsl("secure.example.com");  // 服务器域名用于证书验证

自动重连和恢复

// 客户端自动重连配置
client.AutoReconnectIsEnabled = true;
client.AutoReconnectDelayMs = 5000;  // 5秒重连延迟

// 重连时自动恢复订阅
var subscribedEvents = new HashSet<string>();

client.Connected += () => {
    Console.WriteLine("连接已恢复");
    
    // 重新订阅之前的事件
    foreach (var eventName in subscribedEvents) {
        client.On(eventName, GetEventHandler(eventName));
        Console.WriteLine($"恢复订阅: {eventName}");
    }
};

// 记录订阅的事件
client.SubscribeResponse += (eventName, result) => {
    if (result.State) {
        subscribedEvents.Add(eventName);
    }
};

client.UnSubscribeResponse += (eventName, result) => {
    if (result.State) {
        subscribedEvents.Remove(eventName);
    }
};

📊 性能基准

吞吐量测试

测试环境: Intel i7-8700K, 16GB RAM, Windows 10
网络环境: 本地回环 (127.0.0.1)

事件发布:
- 单客户端: 50,000 events/s
- 100并发客户端: 200,000 events/s
- 事件延迟: < 2ms

事件分发:
- 单订阅者: 45,000 events/s  
- 100订阅者: 180,000 events/s
- 分发延迟: < 5ms

连接管理:
- 最大并发连接: 5,000+
- 连接建立速度: 500 conn/s
- 内存占用: ~40KB/连接

队列性能

事件队列处理:
- 队列入队: 100,000 ops/s
- 队列出队: 95,000 ops/s
- 队列内存开销: ~100 bytes/event

订阅管理:
- 订阅查找: O(1) 时间复杂度
- 路由表更新: O(log n)
- 内存占用: ~50 bytes/subscription

🐛 故障排除

常见问题

Q: 事件丢失怎么办?

// 1. 检查队列是否满载
if (client.EventQueueSize >= client.EventReceiveQueueMaxSize * 0.9) {
    Console.WriteLine("队列接近满载,考虑增加处理能力");
}

// 2. 启用事件丢弃监控
client.EventDiscarded += eventName => {
    // 记录丢失的事件,重要事件可考虑持久化
    logger.LogWarning($"事件丢失: {eventName}");
};

// 3. 优化事件处理速度
await client.On("high.volume.event", async (eventName, args) => {
    // 使用异步处理避免阻塞队列
    _ = Task.Run(() => ProcessEventAsync(args));
});

Q: 连接频繁断开?

// 1. 检查网络状态
client.StateChanged += (oldState, newState) => {
    Console.WriteLine($"连接状态变化: {oldState} -> {newState}");
    if (newState == TcpClientState.CLOSED) {
        // 分析断开原因
    }
};

// 2. 调整重连参数
client.AutoReconnectDelayMs = 10000;  // 增加重连间隔

// 3. 启用心跳检测(如果支持)
// 可以发送周期性的ping事件保持连接活跃

Q: 事件处理延迟高?

// 1. 监控队列积压
if (client.EventQueueSize > 100) {
    Console.WriteLine($"事件队列积压: {client.EventQueueSize}");
}

// 2. 优化事件处理器
client.On("bulk.event", (eventName, args) => {
    // 使用批量处理
    var events = args.Get<List<object>>("events");
    ProcessEventsBatch(events);  // 批量处理而非逐个处理
});

// 3. 分离重要和非重要事件
client.On("critical.event", HandleCriticalEventSync);
client.On("normal.event", HandleNormalEventAsync);

调试技巧

// 1. 启用详细日志
server.Error += (endpoint, ex) => {
    logger.LogError(ex, $"服务器错误 from {endpoint}");
};

client.ConnectFailed += ex => {
    logger.LogError(ex, "客户端连接失败");
};

// 2. 监控事件流
server.PublishReceived += (endpoint, eventName, args) => {
    logger.LogDebug($"收到事件: {eventName} from {endpoint}");
};

server.Published += (endpoint, eventName, args) => {
    logger.LogDebug($"分发事件: {eventName} to {endpoint}");
};

// 3. 性能分析
var stopwatch = Stopwatch.StartNew();
client.Publish("test.event", args);
stopwatch.Stop();
Console.WriteLine($"事件发布耗时: {stopwatch.ElapsedMilliseconds}ms");

💡 最佳实践

事件设计

// 1. 使用有意义的事件名称
client.Publish("user.profile.updated", args);     // ✅ 好
client.Publish("event1", args);                   // ❌ 差

// 2. 事件参数使用NamedArgs
var args = new NamedArgs();
args.Set("userId", userId);
args.Set("timestamp", DateTime.UtcNow);  // 使用UTC时间
args.Set("source", "UserService");
args.Set("version", "1.0");              // 事件版本控制

// 3. 避免发布过大的事件
// 如果数据量大,考虑只发布ID,让订阅者自行查询详细数据
args.Set("orderId", orderId);  // ✅ 只发布ID
// args.Set("orderDetail", largeOrderObject);  // ❌ 避免大对象

错误处理

// 1. 实现事件处理的幂等性
client.On("payment.processed", (eventName, args) => {
    var paymentId = args.Get<string>("paymentId");
    
    // 检查是否已处理
    if (IsPaymentAlreadyProcessed(paymentId)) {
        return;  // 幂等处理
    }
    
    ProcessPayment(paymentId);
});

// 2. 处理事件处理异常
client.On("order.created", (eventName, args) => {
    try {
        ProcessOrder(args);
    } catch (Exception ex) {
        logger.LogError(ex, "处理订单事件失败");
        // 可以发布错误事件或写入死信队列
    }
});

// 3. 设置超时机制
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try {
    client.Publish("long.running.event", args, cts.Token);
} catch (OperationCanceledException) {
    Console.WriteLine("事件发布超时");
}

🤝 贡献

欢迎提交Issue和Pull Request来改进这个项目。

📄 许可证

本项目基于 MIT 许可证开源。详见 LICENSE 文件。

🔗 相关项目

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.1.3 104 5/11/2026
2.1.2 100 5/9/2026
2.1.1 109 4/15/2026
2.0.43 112 3/7/2026
2.0.41 121 2/6/2026
2.0.40 120 2/4/2026
2.0.39 115 2/4/2026
2.0.38 111 2/3/2026
2.0.37 108 2/1/2026
2.0.36 116 2/1/2026
2.0.35 115 2/1/2026
2.0.34 116 2/1/2026
2.0.33 121 1/31/2026
2.0.32 115 1/30/2026
2.0.31 115 1/22/2026
2.0.30 119 1/22/2026
2.0.29 114 1/21/2026
2.0.28 112 1/20/2026
2.0.27 121 1/12/2026
2.0.26 115 1/9/2026
Loading failed