PasteCluster 25.9.2

dotnet add package PasteCluster --version 25.9.2
                    
NuGet\Install-Package PasteCluster -Version 25.9.2
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="PasteCluster" Version="25.9.2" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="PasteCluster" Version="25.9.2" />
                    
Directory.Packages.props
<PackageReference Include="PasteCluster" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add PasteCluster --version 25.9.2
                    
#r "nuget: PasteCluster, 25.9.2"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package PasteCluster@25.9.2
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=PasteCluster&version=25.9.2
                    
Install as a Cake Addin
#tool nuget:?package=PasteCluster&version=25.9.2
                    
Install as a Cake Tool

源码查阅: https://gitee.com/pastecode/paste-cluster

组件适用

1.需要集群部署的服务,需要有主从的集群服务
2.如果是定时的任务,建议使用PasteTask任务调度器实现
3.组件适用于.net8.0以上的框架
4.集群对外的消息使用Channel队列发出

引入组件

1.先引入IHttpClientFactory,示例:context.Services.AddHttpClient();
2.添加对组件的应用 示例:<PackageReference Include="PasteCluster" Version="1.0.0" />
3.使用单例注入,示例:context.Services.AddSingleton<PasteClusterHandler>();
4.配置配置文件,示例:context.Services.Configure<PasteSloveConfig>(Configuration.GetSection("ClusterConfig"));
  "ClusterConfig": {
    "SloveToken": "zxcvfr43dr56hgt5",//集群密钥,可自定义,防止其他集群乱入!
    "ClusterHost": "",//已有的节点地址链路,示例http://192.110.0.3:80;http://192.110.0.7:80
    "CurrentHost": ""//当前节点的访问地址是多少 示例http://192.110.0.6:80
  }
以上是基础配置,需要自定义更多的请查阅PasteSloveConfig的属性

5.如果使用了Route路由,注意本组件需要/api/cluster/的路由转发配置

开始使用

1.初始化集群组件,写入必要的信息,比如集群中还有谁,自己的HOST是多少
2.定义一个HostedService用于接收集群产生的事件
示例代码:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using PasteCluster;
using TestCluster.redismodel;

namespace TestCluster
{
    /// <summary>
    /// 系统初始化
    /// </summary>
    public class StartHostedService : IHostedService
    {
        private readonly IServiceProvider _serviceProvider;
        private PasteClusterHandler _cluster_handler;
        private IAppCache _appCache;
        private ILogger<StartHostedService> _logger;
        /// <summary>
        /// 
        /// </summary>
        /// <param name="serviceProvider"></param>
        /// <param name="pasteClusterHandler"></param>
        /// <param name="appCache"></param>
        /// <param name="logger"></param>
        public StartHostedService(IServiceProvider serviceProvider, PasteClusterHandler pasteClusterHandler, IAppCache appCache, ILogger<StartHostedService> logger)
        {
            _serviceProvider = serviceProvider;
            _cluster_handler = pasteClusterHandler;
            _appCache = appCache;
            _logger = logger;
        }

        private const string node_list_cache_key = "cache:last:nodes";
        //private System.Timers.Timer _timer;

        /// <summary>
        /// 
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        /// <exception cref="System.NotImplementedException"></exception>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            //可以从他处读取集群节点列表,然后把列表写入到系统中
            var reads = _appCache.Get<List<PasteNodeModel>>(node_list_cache_key);
            if (reads != null && reads.Count > 0)
            {
                foreach (var _node in reads)
                {
                    _cluster_handler.AddNodeToList(_node).Wait();
                    //await _cluster_handler.AddNodeToList(_node);
                }
            }
            //如果已知当前节点的Host(这里示例默认为80端口)信息,可以用下方的函数写入,也可以在启动的时候写入配置中
            // -e ClusterConfig:CurrentHost="http://192.168.1.100"
            //如果使用PasteSpider部署,则是-e ClusterConfig:CurrentHost="http://{{App.ProAddress}}"
            //_cluster_handler.Register("http://192.168.1.100",0,0);
            //启动集群中的当前节点(在这之前必须要确认当前节点的host是多少)
            _cluster_handler.StartCluster();
            //读取集群产生的数据,比如其他节点发送的数据等,这里一般是业务相关的消息
            ReadClusterChannel();

            //_timer = new System.Timers.Timer();
            //hit_index = new Random().Next(1,100);
            //_timer.Interval = 1000;
            //_timer.AutoReset = true;
            //_timer.Elapsed += _timer_Elapsed;
            //_timer.Start();
            return Task.CompletedTask;
        }

        //private int hit_index = 5;
        //测试产生数据进行交互
        //private void _timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        //{
        //    try
        //    {
        //        hit_index--;
        //        if (hit_index == 0)
        //        {
        //            _cluster_handler.PushMsgToNode($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
        //            hit_index = new Random().Next(1, 20);
        //        }
        //    }
        //    catch (Exception exl)
        //    {
        //        _logger.LogException(exl);
        //    }
        //}

        /// <summary>
        /// 处理集群产生的数据
        /// </summary>
        private async Task ReadClusterChannel()
        {
                    try
            {
                var _info = await _cluster_handler.ChannelCluster.Reader.ReadAsync();
                if (_info != null && _info != default)
                {
                    switch (_info.msg_type)
                    {
                        case 1:
                            {
                                //当前节点成为master _info.body ==PasteMasterResponse.JsonString
                            }
                            break;
                        case 2:
                            {
                                //当前节点不再是master _info.body ==PasteMasterResponse.JsonString
                            }
                            break;
                        case 3:
                            {
                                //有节点加入 _info.body == PasteNodeModel.JsonString
                            }
                            break;
                        case 4:
                            {
                                if (_cluster_handler.CurrentIsMaster)
                                {
                                    var _node = System.Text.Json.JsonSerializer.Deserialize<PasteNodeModel>(_info.body);
                                    if (_node != null && _node != default)
                                    {
                                        if (!String.IsNullOrEmpty(_node.host))
                                        {
                                            await _appCache.HashDeleteAsync(node_list_cache_key, _node.host);
                                        }
                                    }
                                }
                            }
                            break;
                        default:
                            {
                                //业务代码 // _info.body是什么消息格式要基于业务而定
                            }
                            break;
                    }
                }
            }
            catch (Exception exl)
            {
                _logger.LogException(exl);
                await Task.Delay(1000);
            }
            finally
            {
                ReadClusterChannel();
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        /// <exception cref="System.NotImplementedException"></exception>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _cluster_handler.Dispose();
            return Task.CompletedTask;
        }
    }
}
3.在入口函数中启动这个HostedService
                context.Services.AddHostedService<StartHostedService>();

相关问答

1.如何知道当前运行情况
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/status
2.获取所有节点信息
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/nodes
3.手动注入当前节点IP信息
curl http://xxxx.xxxx.xxxx.xxxx/api/cluster/joinin?host=http://192.168.1.5
4.如何查找当前节点?
有一种模式,就是知道所有集群的IP,需要找到A是A,那么有前提
a.先要把节点的信息写入到组件,所有的节点
b.给当前节点设定一个code,可以使用SetNodeCode
c.后者使用环境信息 -e ClusterConfig:CurrentCode="xxxxxuuuu323423"
d.StartCluster()后,组件会遍历节点,查找哪个节点是自己!主要他们的链接要能通!
5.如何防止多Master的情况出现
a.在每次交互的时候,会附带当前节点的信息,比如master是谁,有多少各节点等,通过这个信息进行判断各自节点的信息是否一致,不一致的时候触发信息同步!
b.master冲突的时候,会检查哪一个节点的选举时间更早,更早的为主,时间采用时间戳ms模式,如果时间戳一致,则再对比对应的Id信息
c.节点数据交互的时候,会进行master和count的消息确定
6.消息完整性?
a.消息分成2部分,一部分是组件内部消息,另一部分是涉及业务的部分
b.内部消息,其实在配置的时间周期内会重新发送重新检查,以便维护集群节点的正确性
c.业务消息,一般是节点发送消息给master,在发送失败的时候,会压入队列,等待选举完成后再解压队列发送,这里有疑问的是,业务消息是否也有可以丢弃的消息,比如如下场景:
   x.已知有N个node节点,作为在线客服的websocket的服务端,内部链接又使用site分隔用户
  xx.如果要获得某一个站点site有多少人在线,要么遍历所有的node节点,然后统计数据,要么所有节点有信息的时候告知某一个点,很显然在这个需求中,各node节点有在线数变更的时候主动通知是最合适的
 xxx.比如node1告知node1:site1:30,node2告知node2:site1:25 信息归总到master中,master中本地缓存这个数据,当有变动的时候,推送给所有客服节点即可!
xxxx.这里要处理的就是,某一个节点宕机的情况,比如node5不可用了,如何告知master他掉了!

逻辑介绍

1.Master的选举逻辑
    a.当前组件集群支持从1到N,这个数量理论上不是无上限的,要基于配置文件中的最长交互时间决定
    b.当当前没有Master的时候,会从节点队列中问询谁是master,被问询的如果也没有master信息,则被问询方直接成为master,并进入master的选举环节
    c.节点交互会重置最后交互时间,主要在于master和cluster
    d.组件会定期检查这个交互时间是否过期,如果过期了,会尝试健康检查,如果不通,则进入选举模式
2.选举原则
    a.谁先启动谁为主
    b.谁被告知消息不符,谁放弃,进行master查找

写在最后

1.当前组件的代码还没有经过严格测试,我估计还有问题的地方在于
a.时效性的问题,虽然可以修改检查的间隔时间,但是这个时间也是有间隙的
b.选举间隔,产生太多消息的话?消息重复的话!压缩的消息是否对业务有影响
c.如果同时多个节点掉线,那么这个选举的时间会更长
d.当前集群模式将在后续更新到PasteSpider的集群策略中,替换到目前的版本!

更新摘要

v24.07.28.03

1.如果没有注入节点,则默认为master

v24.07.28.02

1.PushMsgToMaster的msg_type没有跟随赋值的问题!

v24.07.28.01

1.添加对xml的支持,也就是文档
2.集群的消息,修正2和修正5 5表示自己不再是master了
3.准备启用新的版本号yy.MM.dd.version

v24.6.10.1

1.消息添加from_api,表示这个消息是从其他节点过来的,因为节点之间的通讯是api形式,这样可以避免消息回环!(如果自己是master,消息都是来自channel,这时候就要区分来源了)
2.mag_type划分0|>10表示业务的类型,1为当前节点选举为master,可以在这个之后处理一些master的事宜,其他的类型留空
3.添加发送消息的模式,可以直接对所有节点发送消息,因为选举完成后,每一个节点上存储的节点消息都是同样的
4.节点消息有host name id group 这几个除了host是严格排重的,其他的没有排重的说法,需要基于业务自己处理,比如我有一个业务,某些linux使用某个name的节点进行处理任务
5.节点列表居然没有自己!!! master的时候 判断是否有自己,如果没有,则执行scan

==自实现模式==
1.可以加入一种自己查找模式,就是在各自节点启动的时候,向某一个节点加入自己的信息,然后读取信息,然后有人先写入自己的Host信息,则组建这个集群了!
2.Hashset模式,由master进行管理(这里主要是指remove的时候,需要业务上进行处理) field使用host value使用PasteNodeModel的信息
3.当前这种模式在于需要有公共的缓存对象,比如redis,还有需要处理msg_type==4的时候的数据,用于移除节点信息

v24.6.09.1

1.消息交互之间,添加来源信息,添加来源的当前配置信息,这样用于同步集群消息,比如node_count master_host等,通过这个可以判断是否出现了多个集群的情况
2.消息交互采用最后消息时间戳的形式,所有有附带from_node这样可以更加灵活的知道某一个节点是否离线太久了
3.离线判断采用相互模式,就是master监听对所有节点的最小交互时间,每个节点检查自己对master的最后交互时间,从而处理node离线或者master离线的事宜
4.内部采用2个Channel,一个是集群内部使用的,其实是为了提高响应速度,毕竟修改成队列模式了,还有一个就是和业务有关的队列了!无论是Master或者Node都需要消费这个队列来处理任务!有点类似EventBus
5.选举采用先进原则,不采用他们说的单偶模式,采用vote_time和id双重排序,谁先谁就是master,确定后快速确认一遍,然后群发,这样可以极大的提高选举的效率和防止出现多个小集群的情况
6.发送消息模式,可以基于节点的id/name/group等模式发送

v24.6.19.1

1.修复master的节点总数小于分支的节点总数,分支没有减少的Bug!
2.修复由于定时器重叠引起的互锁的问题!
4.PasteTask项目引入PasteCluster插件,实现集群部署!
5.修复集群分层现象,就是master是一个,但是不同节点有不一样的集群信息,master都存在

6.节点上报是有先后的,那就有了
    a.1
    b.2 1
    c.3 2 1
    d.4 3 2 1
    e.5.4.3.2.1
7.scan的时候需要查询非自己的节点 如果全部不能访问,则自己是master
8.添加状态互斥代码,选举中状态等!减少不必要的问询!
9.查找master的时候,全部全部遍历查找,如果有多少,个选定最小那个!提升选举速度!可以极大的减少由于多次触发选举造成的流量混乱
10.master的信息,在特定的情况下,按需发送,不一定群发!
11.获取到master后,需要主动通知下,被通知的,如果不是master,则需要把master信息返回
12.为了防止游离的发生,被ask后,通知master群发master信息,用于进一步广播master信息

v24.6.21.1

1.新的发现,master居然不在nodes里面!!这是如何发生的?
2.当自己成为master后,检查自己是否在列表,如果不在,则添加进去
3.添加log的日志记录

25.08.001

1.修改之前的async Task代码 改成async Task 更加安全!
Product Compatible and additional computed target framework versions.
.NET net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
25.9.2 160 9/7/2025
25.8.2 243 8/28/2025
25.8.1 221 8/27/2025
25.4.1 202 4/24/2025
24.7.28.3 112 7/31/2024
24.7.28.2 107 7/31/2024
24.7.28.1 101 7/31/2024
1.0.0.9 140 7/28/2024
1.0.0.8 151 7/28/2024
1.0.0.5 162 6/19/2024
1.0.0.1 150 6/10/2024
1.0.0 163 6/10/2024