MindLab.Messaging 1.0.2

dotnet add package MindLab.Messaging --version 1.0.2
NuGet\Install-Package MindLab.Messaging -Version 1.0.2
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="MindLab.Messaging" Version="1.0.2" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add MindLab.Messaging --version 1.0.2
#r "nuget: MindLab.Messaging, 1.0.2"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install MindLab.Messaging as a Cake Addin
#addin nuget:?package=MindLab.Messaging&version=1.0.2

// Install MindLab.Messaging as a Cake Tool
#tool nuget:?package=MindLab.Messaging&version=1.0.2

MindLab.Messaging

MindLab.Messaging 提供了一个轻量级的消息订阅/发布模式(进程内),支持消息单播、广播,所有接口均为使用async/await形式的Task异步接口

使用

用户可从nuget下载 MindLab.Messaging

组件设计

MindLab.Messaging 参考了RdbbitMQ的组件设计,并根据C#自身的使用习惯做了调整,主要包含两个组件:路由器(MessageRouter)和队列(MessageQueue)。

队列可以绑定到指定的路由器上,它们之间是多对多关系,即一个队列可同时绑定多个路由器,一个路由器也可同时被多个队列绑定。

队列绑定时需提供一个key作为消息路由的过滤依据,该key由路由器内部识别。

发布者通过路由器发布消息,发布时需提供一个路由键routingKey, 路由器内部根据自身的策略将该消息转发到符合条件的队列中。

消费者通过队列接收并消费消息。

入门

示例1:消息广播

从一个简单的例子开始,我们将创建一个用于广播消息的路由器(BroadcastMessageRouter),并模拟两个消费者分别从两条队列中消费消息, 消费者A将消息打印到屏幕上, 消费者B将消息输出到文件中

private readonly var m_router = new BroadcastMessageRouter<string>(); // 广播式路由器

public async Task RunAsync()
{
    using var tokenSrc = new CancellationTokenSource();
    var t1 = Task.Run(async () => await ConsumerA(tokenSrc.Token));
    var t2 = Task.Run(async () => await ConsumerB(tokenSrc.Token));
    
    while(true)
    {
        var txt = Console.ReadLine();
        if(string.Equals("exit", txt))
        {
            break;
        }
        
        // 发布消息txt
        await m_router.PublishMessageAsync(string.Empty, txt);
    }
    tokenSrc.Cancel();
    await Task.WhenAll(t1, t2);
}

private async Task ConsumerA(CancellationToken token)
{
    var queueA = new AsyncMessageQueue<string>(); // 队列A, 供消费者A使用
    await using(await queueA.BindAsync(string.Empty, m_router, token)) // 绑定队列A到路由器
    {
        while(true)
        {
            var msg = await queueA.TakeMessageAsync(token); // 等待消息
            Console.WriteLine(msg.Payload);
        }
    }
}

private async Task ConsumerB(CancellationToken token)
{
    var queueB = new AsyncMessageQueue<string>(); // 队列B, 供消费者B使用
    await using(await queueA.BindAsync(string.Empty, m_router, token)) // 绑定队列B到路由器
    {
        while(true)
        {
            var msg = await queueA.TakeMessageAsync(token); // 等待消息
            File.AppendText("1.txt", msg.Payload, token); // 输出到文件
        }
    }
}

上述代码中,队列绑定时使用的key和消息发布时的key都使用了string.Empty,这是因为BroadcastMessageRouter在内部策略上无视了key的区别,仅是单纯地将消息广播至其绑定的所有队列中。

示例2:消息单播

在这个例子中,我们将创建一个用于单播消息的路由器(UnicastMessageRouter)并演示如何利用路由Key把消息发布给指定的一个或一组队列。

private readonly var m_router = new UnicastMessageRouter<string>(); // 单播式路由器
private readonly string[] m_keys = new []{"ConsumerA", "ConsumerB"};

public async Task RunAsync()
{
    using var tokenSrc = new CancellationTokenSource();
    var t1 = Task.Run(async () => await ConsumerA(tokenSrc.Token));
    var t2 = Task.Run(async () => await ConsumerB(tokenSrc.Token));
    
    int i = 0;
    while(true)
    {
        var txt = Console.ReadLine();
        if(string.Equals("exit", txt))
        {
            break;
        }
        
        // 依次轮流使用两个不同的key
        var key = m_keys[i = 1-i];
        
        // 发布消息txt
        await m_router.PublishMessageAsync(key, txt);
    }
    tokenSrc.Cancel();
    await Task.WhenAll(t1, t2);
}

private async Task ConsumerA(CancellationToken token)
{
    var queueA = new AsyncMessageQueue<string>(); // 队列A, 供消费者A使用
    await using(await queueA.BindAsync(m_keys[0], m_router, token)) // 使用m_keys[0]绑定队列A到路由器
    {
        while(true)
        {
            var msg = await queueA.TakeMessageAsync(token); // 等待消息
            Console.WriteLine($"ConsumerA: {msg.Payload}");
        }
    }
}

private async Task ConsumerB(CancellationToken token)
{
    var queueB = new AsyncMessageQueue<string>(); // 队列B, 供消费者B使用
    await using(await queueA.BindAsync(m_keys[1], m_router, token)) // 使用m_keys[1]绑定队列B到路由器
    {
        while(true)
        {
            var msg = await queueA.TakeMessageAsync(token); // 等待消息
            Console.WriteLine($"ConsumerB: {msg.Payload}");
        }
    }
}

高级

消息分派

当使用Router发布消息时,消息被并行地插入到关联的队列内部,并立刻从异步方法中返回。此时,消息只是被缓存到队列中,并不意味着已经被消费者处理。

从发布者的立场而言,它并不知道该消息是否已被处理。与传统的基于委托的event不同,event总是同步且串行地触发回调方法,这会导致发布者被阻塞。同时,一旦某一个回调中出现异常,更会阻断整个业务流程并影响同一event的后继订阅者。

Router内部使用并行触发的方式公平地对待每个关联的消息队列,各个队列间是互不干扰的,如果发布者确实需要知道消息的转发情况,可以从PublishMessageAsync() 方法的返回值中获取相关信息。

var result = await m_router.PublishMessageAsync(key, txt);
Console.WriteLine(result.ReceiverCount); // 接收到此消息的订阅者数量

if (result.Exception != null) // 转发消息时产生的异常
{
    Console.WriteLine(result.Exception);
}

队列容量与满载处理

如果消息发布者无节制地发布消息,远远超过了消费者处理消息的速度,那么,这些消息都会被缓存到队列中,这会导致内存占用过高,甚至造成奔溃。为了避免这种情况,我们可以在构造队列时显式指定队列容量与满载处理的方式。

var mq = new AsyncMessageQueue<string>(8192, QueueFullBehaviour.BlockPublisher);

上面的代码指定队列最大容量为8192,当容量满载时,新插入消息会导致 PublishMessageAsync() 方法发生阻塞,直至消息从队列中被取走,这能反向抑制发布者的发布速度。在默认的无参数构造队列时,内部使用上述的两个值作为默认参数。

仅关注最新消息

设想一个常见的业务场景:位于后台的线程在下载一个大文件,并把下载进度以消息的形式发布到路由器;UI线程从队列中消费此进度消息,并更新到界面上的进度条控件。

在这样的业务场景,消费方(UI线程)其实并不需要处理队列中的每一条进度消息,而只需要关注最新一条的进度消息,为了达到这样的目的,我们可以使用如下方式构造队列

var mq = new AsyncMessageQueue<string>(1, QueueFullBehaviour.RemoveOldest);

通过把满载处理方式设置为QueueFullBehaviour.RemoveOldest, 使得队列在满载时自动移除队列中最老的一条消息,然后再把新消息插入队列。同时该队列的容量又设置为1,这样一来,队列中永远都只有最新一条的消息。

Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
.NET Core netcoreapp2.0 was computed.  netcoreapp2.1 was computed.  netcoreapp2.2 was computed.  netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.0 is compatible.  netstandard2.1 is compatible. 
.NET Framework net461 was computed.  net462 was computed.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen40 was computed.  tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
1.0.2 645 3/22/2020
1.0.0 410 3/19/2020