mgzhenhong.ASW2.Net.EventChannel
2.1.3
dotnet add package mgzhenhong.ASW2.Net.EventChannel --version 2.1.3
NuGet\Install-Package mgzhenhong.ASW2.Net.EventChannel -Version 2.1.3
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="mgzhenhong.ASW2.Net.EventChannel" Version="2.1.3" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="mgzhenhong.ASW2.Net.EventChannel" Version="2.1.3" />
<PackageReference Include="mgzhenhong.ASW2.Net.EventChannel" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add mgzhenhong.ASW2.Net.EventChannel --version 2.1.3
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: mgzhenhong.ASW2.Net.EventChannel, 2.1.3"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package mgzhenhong.ASW2.Net.EventChannel@2.1.3
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=mgzhenhong.ASW2.Net.EventChannel&version=2.1.3
#tool nuget:?package=mgzhenhong.ASW2.Net.EventChannel&version=2.1.3
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
ASW.Net.EventChannel
📖 项目简介
ASW.Net.EventChannel 是基于 ASW.Net 构建的高性能事件通道框架,提供了发布-订阅模式的实时事件通信解决方案。该框架基于RDC协议,支持事件的可靠传输、多客户端订阅、事件路由和分发等功能,适用于实时通信、系统解耦和事件驱动架构场景。
✨ 核心特性
🚀 高性能事件通信
- 基于RDC协议: 使用可靠数据通信协议确保事件传输的可靠性
- 异步事件处理: 全异步的事件发布和订阅机制
- 事件队列: 内置事件队列防止事件丢失和堆积
- 流量控制: 支持事件队列满载保护和背压处理
📡 发布-订阅模式
- 多主题订阅: 客户端可以订阅多个事件主题
- 事件路由: 服务端智能路由事件到对应的订阅者
- 广播支持: 支持一对多的事件广播
- 订阅管理: 动态的订阅和取消订阅
🔧 灵活易用
- 事件参数: 支持强类型的
NamedArgs参数传递 - 回调机制: 丰富的事件回调和状态通知
- 自动重连: 客户端断线自动重连和订阅恢复
- 状态监控: 完整的连接状态和队列状态监控
⚡ 企业级特性
- SSL支持: 支持安全的事件传输
- 连接池: 高效的连接管理和复用
- 统计监控: 详细的传输统计和性能监控
- 错误处理: 完善的异常处理和重试机制
🏗️ 架构设计
组件架构
┌─────────────────────────────────────────────────────────────┐
│ ASW.Net.EventChannel │
├─────────────────────────────────────────────────────────────┤
│ EventChannelServer │ EventChannelClient │
│ ├─ 事件路由管理 │ ├─ 事件发布 │
│ ├─ 订阅者管理 │ ├─ 事件订阅 │
│ ├─ 事件分发 │ ├─ 事件队列 │
│ └─ 连接状态监控 │ └─ 自动重连 │
├─────────────────────────────────────────────────────────────┤
│ RDC协议层 │
│ RdcTcpServer | RdcTcpClient | RdcRequest | RdcResponse │
├─────────────────────────────────────────────────────────────┤
│ ASW.Net 网络层 │
│ TcpServer | TcpClient | DotNetty │
└─────────────────────────────────────────────────────────────┘
事件流转过程
graph TB
A[客户端发布事件] --> B[EventChannelServer]
B --> C{查找订阅者}
C -->|找到订阅者| D[分发事件]
C -->|无订阅者| E[丢弃事件]
D --> F[客户端A事件队列]
D --> G[客户端B事件队列]
D --> H[客户端C事件队列]
F --> I[客户端A回调]
G --> J[客户端B回调]
H --> K[客户端C回调]
L[客户端订阅事件] --> M[服务端订阅管理]
M --> N[更新路由表]
订阅管理
服务端订阅字典:
┌─────────────────────────────────┐
│ "user.login" → [Client1, Client3] │
│ "order.create" → [Client2] │
│ "system.alert" → [Client1, Client2] │
└─────────────────────────────────┘
客户端订阅字典:
┌─────────────────────────────────┐
│ Client1 → ["user.login", "system.alert"] │
│ Client2 → ["order.create", "system.alert"] │
│ Client3 → ["user.login"] │
└─────────────────────────────────┘
🚀 快速开始
安装
# Package Manager
Install-Package mgzhenhong.ASW.Net.EventChannel
# .NET CLI
dotnet add package mgzhenhong.ASW.Net.EventChannel
# PackageReference
<PackageReference Include="mgzhenhong.ASW.Net.EventChannel" Version="x.x.x" />
事件通道服务端
using ASW.Net.EventChannel;
// 创建事件通道服务器
var server = new EventChannelServer(2208);
server.Name = "EventHub";
// 事件监听
server.Connected += endpoint => {
Console.WriteLine($"客户端连接: {endpoint}");
};
server.PublishReceived += (endpoint, eventName, args) => {
Console.WriteLine($"收到事件发布: {eventName} from {endpoint}");
Console.WriteLine($"事件参数: {args}");
};
server.SubscribeReceived += (endpoint, eventName) => {
Console.WriteLine($"客户端订阅事件: {eventName} from {endpoint}");
};
server.Published += (endpoint, eventName, args) => {
Console.WriteLine($"事件已分发: {eventName} to {endpoint}");
};
// 启动服务器
server.Start();
Console.WriteLine($"事件通道服务器已启动: {server.BoundEndPoint}");
事件通道客户端
using ASW.Net.EventChannel;
using ASW.Utility;
// 创建事件通道客户端
var client = new EventChannelClient("127.0.0.1", 2208);
client.Name = "OrderService";
// 自动重连配置
client.AutoReconnectIsEnabled = true;
client.AutoReconnectDelayMs = 3000;
// 连接事件
client.Connected += () => {
Console.WriteLine("连接事件通道成功");
};
// 连接服务器
client.Connect();
// 订阅事件
client.On("user.login", (eventName, args) => {
var userId = args.GetValue<string>("userId");
var loginTime = args.GetValue<DateTime>("loginTime");
Console.WriteLine($"用户登录: {userId} at {loginTime}");
// 处理用户登录逻辑
HandleUserLogin(userId, loginTime);
});
// 发布事件
var loginArgs = new NamedArgs();
loginArgs.Add("userId", "user123");
loginArgs.Add("loginTime", DateTime.Now);
loginArgs.Add("ip", "192.168.1.100");
await client.Publish("user.login", loginArgs);
📚 详细使用指南
事件发布
// 简单事件发布
await client.Publish("notification.send", new NamedArgs {
["type"] = "email",
["recipient"] = "user@example.com",
["subject"] = "Welcome"
});
// 复杂对象事件
var orderData = new {
OrderId = "ORD-001",
CustomerId = 123,
Amount = 99.99m,
Items = new[] { "Product A", "Product B" }
};
var args = new NamedArgs();
args.Add("order", orderData);
args.Add("timestamp", DateTime.Now);
args.Add("source", "WebAPI");
client.Publish("order.created", args);
// 带回调的事件发布
client.PublishResponse += (eventName, result) => {
if (result.State) {
Console.WriteLine($"事件 {eventName} 发布成功");
} else {
Console.WriteLine($"事件 {eventName} 发布失败: {result.Message}");
}
};
事件订阅
// 订阅单个事件
client.On("user.logout", (eventName, args) => {
var userId = args.GetValue<string>("userId");
var reason = args.GetValue<string>("reason") ?? "normal";
Console.WriteLine($"用户退出: {userId}, 原因: {reason}");
UpdateUserStatus(userId, "offline");
});
// 订阅多个事件
var events = new[] { "order.created", "order.updated", "order.cancelled" };
foreach (var eventName in events) {
client.On(eventName, HandleOrderEvent);
}
// 订阅时带错误处理
client.SubscribeResponse += (eventName, result) => {
if (result.State) {
Console.WriteLine($"订阅事件 {eventName} 成功");
} else {
Console.WriteLine($"订阅事件 {eventName} 失败: {result.Message}");
}
};
// 事件处理方法
private static void HandleOrderEvent(string eventName, NamedArgs args) {
var orderId = args.GetValue<string>("orderId");
switch (eventName) {
case "order.created":
ProcessNewOrder(orderId, args);
break;
case "order.updated":
UpdateOrderStatus(orderId, args);
break;
case "order.cancelled":
CancelOrder(orderId, args);
break;
}
}
取消订阅
// 取消单个事件订阅
client.Off("user.login");
// 取消多个事件订阅
var eventsToUnsubscribe = new[] { "order.created", "order.updated" };
foreach (var eventName in eventsToUnsubscribe) {
client.Off(eventName);
}
// 取消订阅回调
client.UnSubscribeResponse += (eventName, result) => {
if (result.State) {
Console.WriteLine($"取消订阅 {eventName} 成功");
} else {
Console.WriteLine($"取消订阅 {eventName} 失败: {result.Message}");
}
};
🔧 高级功能
事件队列管理
// 配置事件队列大小
client.EventReceiveQueueMaxSize = 5000; // 最大队列长度
// 监控队列状态
client.EventReceiveQueueFull += () => {
Console.WriteLine("事件接收队列已满,可能存在处理积压");
// 可以考虑暂停发布或增加处理能力
};
client.EventReceiveQueueDrain += () => {
Console.WriteLine("事件接收队列恢复正常");
};
client.EventDiscarded += eventName => {
Console.WriteLine($"事件 {eventName} 因队列满而被丢弃");
// 记录丢失的事件,考虑重要事件的持久化
};
// 获取队列统计
Console.WriteLine($"当前队列长度: {client.EventQueueSize}");
Console.WriteLine($"队列最大长度: {client.EventReceiveQueueMaxSize}");
服务端统计监控
// 获取连接统计
Console.WriteLine($"当前连接数: {server.ConnectionCount}");
Console.WriteLine($"累计发送字节: {server.TotalSendBytes}");
Console.WriteLine($"累计接收字节: {server.TotalReceiveBytes}");
// 获取处理统计
Console.WriteLine($"待处理请求数: {server.UnHandledRequestCount}");
Console.WriteLine($"待处理发布请求数: {server.UnHandledPublishRequestCount}");
// 按事件类型统计
foreach (var kvp in server.UnHandledPublishRequestCountDic) {
Console.WriteLine($"事件 {kvp.Key}: {kvp.Value} 个待处理");
}
// 监控服务器状态
server.BeforePublish += (eventName, subscriberCount) => {
Console.WriteLine($"即将分发事件 {eventName} 给 {subscriberCount} 个订阅者");
};
server.PublishFail += (endpoint, eventName, error) => {
Console.WriteLine($"向 {endpoint} 分发事件 {eventName} 失败: {error}");
};
SSL安全通信
// 服务端SSL配置
var certificate = new X509Certificate2("server.pfx", "password");
var server = new EventChannelServer(2208);
server.UseSsl(certificate);
// 客户端SSL配置
var client = new EventChannelClient("secure.example.com", 2208);
client.UseSsl("secure.example.com"); // 服务器域名用于证书验证
自动重连和恢复
// 客户端自动重连配置
client.AutoReconnectIsEnabled = true;
client.AutoReconnectDelayMs = 5000; // 5秒重连延迟
// 重连时自动恢复订阅
var subscribedEvents = new HashSet<string>();
client.Connected += () => {
Console.WriteLine("连接已恢复");
// 重新订阅之前的事件
foreach (var eventName in subscribedEvents) {
client.On(eventName, GetEventHandler(eventName));
Console.WriteLine($"恢复订阅: {eventName}");
}
};
// 记录订阅的事件
client.SubscribeResponse += (eventName, result) => {
if (result.State) {
subscribedEvents.Add(eventName);
}
};
client.UnSubscribeResponse += (eventName, result) => {
if (result.State) {
subscribedEvents.Remove(eventName);
}
};
📊 性能基准
吞吐量测试
测试环境: Intel i7-8700K, 16GB RAM, Windows 10
网络环境: 本地回环 (127.0.0.1)
事件发布:
- 单客户端: 50,000 events/s
- 100并发客户端: 200,000 events/s
- 事件延迟: < 2ms
事件分发:
- 单订阅者: 45,000 events/s
- 100订阅者: 180,000 events/s
- 分发延迟: < 5ms
连接管理:
- 最大并发连接: 5,000+
- 连接建立速度: 500 conn/s
- 内存占用: ~40KB/连接
队列性能
事件队列处理:
- 队列入队: 100,000 ops/s
- 队列出队: 95,000 ops/s
- 队列内存开销: ~100 bytes/event
订阅管理:
- 订阅查找: O(1) 时间复杂度
- 路由表更新: O(log n)
- 内存占用: ~50 bytes/subscription
🐛 故障排除
常见问题
Q: 事件丢失怎么办?
// 1. 检查队列是否满载
if (client.EventQueueSize >= client.EventReceiveQueueMaxSize * 0.9) {
Console.WriteLine("队列接近满载,考虑增加处理能力");
}
// 2. 启用事件丢弃监控
client.EventDiscarded += eventName => {
// 记录丢失的事件,重要事件可考虑持久化
logger.LogWarning($"事件丢失: {eventName}");
};
// 3. 优化事件处理速度
await client.On("high.volume.event", async (eventName, args) => {
// 使用异步处理避免阻塞队列
_ = Task.Run(() => ProcessEventAsync(args));
});
Q: 连接频繁断开?
// 1. 检查网络状态
client.StateChanged += (oldState, newState) => {
Console.WriteLine($"连接状态变化: {oldState} -> {newState}");
if (newState == TcpClientState.CLOSED) {
// 分析断开原因
}
};
// 2. 调整重连参数
client.AutoReconnectDelayMs = 10000; // 增加重连间隔
// 3. 启用心跳检测(如果支持)
// 可以发送周期性的ping事件保持连接活跃
Q: 事件处理延迟高?
// 1. 监控队列积压
if (client.EventQueueSize > 100) {
Console.WriteLine($"事件队列积压: {client.EventQueueSize}");
}
// 2. 优化事件处理器
client.On("bulk.event", (eventName, args) => {
// 使用批量处理
var events = args.Get<List<object>>("events");
ProcessEventsBatch(events); // 批量处理而非逐个处理
});
// 3. 分离重要和非重要事件
client.On("critical.event", HandleCriticalEventSync);
client.On("normal.event", HandleNormalEventAsync);
调试技巧
// 1. 启用详细日志
server.Error += (endpoint, ex) => {
logger.LogError(ex, $"服务器错误 from {endpoint}");
};
client.ConnectFailed += ex => {
logger.LogError(ex, "客户端连接失败");
};
// 2. 监控事件流
server.PublishReceived += (endpoint, eventName, args) => {
logger.LogDebug($"收到事件: {eventName} from {endpoint}");
};
server.Published += (endpoint, eventName, args) => {
logger.LogDebug($"分发事件: {eventName} to {endpoint}");
};
// 3. 性能分析
var stopwatch = Stopwatch.StartNew();
client.Publish("test.event", args);
stopwatch.Stop();
Console.WriteLine($"事件发布耗时: {stopwatch.ElapsedMilliseconds}ms");
💡 最佳实践
事件设计
// 1. 使用有意义的事件名称
client.Publish("user.profile.updated", args); // ✅ 好
client.Publish("event1", args); // ❌ 差
// 2. 事件参数使用NamedArgs
var args = new NamedArgs();
args.Set("userId", userId);
args.Set("timestamp", DateTime.UtcNow); // 使用UTC时间
args.Set("source", "UserService");
args.Set("version", "1.0"); // 事件版本控制
// 3. 避免发布过大的事件
// 如果数据量大,考虑只发布ID,让订阅者自行查询详细数据
args.Set("orderId", orderId); // ✅ 只发布ID
// args.Set("orderDetail", largeOrderObject); // ❌ 避免大对象
错误处理
// 1. 实现事件处理的幂等性
client.On("payment.processed", (eventName, args) => {
var paymentId = args.Get<string>("paymentId");
// 检查是否已处理
if (IsPaymentAlreadyProcessed(paymentId)) {
return; // 幂等处理
}
ProcessPayment(paymentId);
});
// 2. 处理事件处理异常
client.On("order.created", (eventName, args) => {
try {
ProcessOrder(args);
} catch (Exception ex) {
logger.LogError(ex, "处理订单事件失败");
// 可以发布错误事件或写入死信队列
}
});
// 3. 设置超时机制
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try {
client.Publish("long.running.event", args, cts.Token);
} catch (OperationCanceledException) {
Console.WriteLine("事件发布超时");
}
🤝 贡献
欢迎提交Issue和Pull Request来改进这个项目。
📄 许可证
本项目基于 MIT 许可证开源。详见 LICENSE 文件。
🔗 相关项目
- ASW.Net - 网络通信基础库
- ASW.MessageQueue - 消息队列框架
- ASW.Utility - 通用工具库(包含NamedArgs)
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net10.0
- mgzhenhong.ASW2.Net (>= 2.1.3)
- mgzhenhong.ASW2.Utility (>= 2.1.3)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 2.1.3 | 104 | 5/11/2026 |
| 2.1.2 | 100 | 5/9/2026 |
| 2.1.1 | 109 | 4/15/2026 |
| 2.0.43 | 112 | 3/7/2026 |
| 2.0.41 | 121 | 2/6/2026 |
| 2.0.40 | 120 | 2/4/2026 |
| 2.0.39 | 115 | 2/4/2026 |
| 2.0.38 | 111 | 2/3/2026 |
| 2.0.37 | 108 | 2/1/2026 |
| 2.0.36 | 116 | 2/1/2026 |
| 2.0.35 | 115 | 2/1/2026 |
| 2.0.34 | 116 | 2/1/2026 |
| 2.0.33 | 121 | 1/31/2026 |
| 2.0.32 | 115 | 1/30/2026 |
| 2.0.31 | 115 | 1/22/2026 |
| 2.0.30 | 119 | 1/22/2026 |
| 2.0.29 | 114 | 1/21/2026 |
| 2.0.28 | 112 | 1/20/2026 |
| 2.0.27 | 121 | 1/12/2026 |
| 2.0.26 | 115 | 1/9/2026 |
Loading failed