Synadia.Orbit.PCGroups
1.0.0-preview.4
Prefix Reserved
dotnet add package Synadia.Orbit.PCGroups --version 1.0.0-preview.4
NuGet\Install-Package Synadia.Orbit.PCGroups -Version 1.0.0-preview.4
<PackageReference Include="Synadia.Orbit.PCGroups" Version="1.0.0-preview.4" />
<PackageVersion Include="Synadia.Orbit.PCGroups" Version="1.0.0-preview.4" />
<PackageReference Include="Synadia.Orbit.PCGroups" />
paket add Synadia.Orbit.PCGroups --version 1.0.0-preview.4
#r "nuget: Synadia.Orbit.PCGroups, 1.0.0-preview.4"
#:package Synadia.Orbit.PCGroups@1.0.0-preview.4
#addin nuget:?package=Synadia.Orbit.PCGroups&version=1.0.0-preview.4&prerelease
#tool nuget:?package=Synadia.Orbit.PCGroups&version=1.0.0-preview.4&prerelease
Synadia.Orbit.PCGroups
Partitioned Consumer Groups (PCGroups) for NATS JetStream. This library enables horizontal scaling of message processing by automatically distributing partitions across consumer group members.
Requirements: NATS Server 2.11+ (for Priority Groups/Pinning support)
Features
- Static Consumer Groups: Fixed membership defined at creation time
- Elastic Consumer Groups: Dynamic membership changes at runtime
- Automatic Partition Distribution: Partitions are automatically balanced across members
- Custom Partition Mappings: Explicit control over which partitions each member handles
- Self-Healing: Consumers automatically recover from failures
- KV-based Configuration: Group configurations stored in NATS KV for coordination
Installation
dotnet add package Synadia.Orbit.PCGroups
Quick Start
Static Consumer Groups
Static groups have a fixed membership that cannot change after creation. The stream must be configured with a subject transform to add partition prefixes.
using NATS.Client.JetStream.Models;
using NATS.Net;
using Synadia.Orbit.PCGroups.Static;
await using var nats = new NatsClient();
var js = nats.CreateJetStreamContext();
// Create the stream with subject transform for partitioning
// The transform adds a partition prefix based on the wildcard token
await js.CreateStreamAsync(new StreamConfig("orders", ["orders.*"])
{
SubjectTransform = new SubjectTransform
{
Src = "orders.*",
Dest = "{{partition(3,1)}}.orders.{{wildcard(1)}}",
},
});
// Create a static consumer group with 3 partitions
await js.CreatePcgStaticAsync(
streamName: "orders",
consumerGroupName: "order-processors",
maxNumMembers: 3,
filter: "orders.*");
// Publish messages - they get transformed to {partition}.orders.{id}
await js.PublishAsync("orders.123", new Order("ORD-123", "CUST-1", 99.99m));
// Start consuming using async enumerable
await foreach (var msg in js.ConsumePcgStaticAsync<Order>(
streamName: "orders",
consumerGroupName: "order-processors",
memberName: "worker-1"))
{
Console.WriteLine($"Processing order: {msg.Subject} - {msg.Data}");
await msg.AckAsync();
}
record Order(string OrderId, string CustomerId, decimal Amount);
Elastic Consumer Groups
Elastic groups allow dynamic membership changes at runtime. The library automatically creates a work-queue stream with the appropriate transforms.
using System.Threading.Channels;
using NATS.Client.JetStream.Models;
using NATS.Net;
using Synadia.Orbit.PCGroups;
using Synadia.Orbit.PCGroups.Elastic;
await using var nats = new NatsClient();
var js = nats.CreateJetStreamContext();
// Create the source stream (no transform needed - elastic creates work-queue stream)
await js.CreateStreamAsync(new StreamConfig("events", ["events.*"]));
// Create an elastic consumer group
// Partitioning is based on the first wildcard token in the subject
await js.CreatePcgElasticAsync(
streamName: "events",
consumerGroupName: "event-processors",
maxNumMembers: 10,
filter: "events.*", // e.g., events.user123, events.user456
partitioningWildcards: [1]); // Partition by the first wildcard (user ID)
// Add members dynamically - partitions will be distributed across them
string[] members = ["worker-1", "worker-2", "worker-3"];
await js.AddPcgElasticMembersAsync("events", "event-processors", members);
// Publish some messages
await js.PublishAsync("events.user123", new Event("EVT-1", "user123", "click"));
// Use a channel to aggregate messages from multiple workers
var channel = Channel.CreateUnbounded<(string Worker, NatsPcgMsg<Event> Msg)>();
using var cts = new CancellationTokenSource();
// Start a consumer task for each worker
var consumerTasks = members.Select(worker => Task.Run(async () =>
{
try
{
await foreach (var msg in js.ConsumePcgElasticAsync<Event>(
streamName: "events",
consumerGroupName: "event-processors",
memberName: worker,
cancellationToken: cts.Token))
{
await channel.Writer.WriteAsync((worker, msg), cts.Token);
}
}
catch (OperationCanceledException)
{
// Expected when cancelled
}
})).ToArray();
// Process messages from all workers
await foreach (var (worker, msg) in channel.Reader.ReadAllAsync(cts.Token))
{
Console.WriteLine($"[{worker}] Processing event: {msg.Subject} - {msg.Data}");
await msg.AckAsync();
}
record Event(string EventId, string UserId, string Type);
Static vs Elastic Comparison
| Feature | Static | Elastic |
|---|---|---|
| Membership | Fixed at creation | Dynamic at runtime |
| Use Case | Stable workloads | Scaling workloads |
| Stream Setup | Requires SubjectTransform | Auto-creates work-queue stream |
| Configuration | Simpler | More flexible |
Custom Partition Mappings
For fine-grained control over partition distribution:
using NATS.Client.JetStream.Models;
using NATS.Net;
using Synadia.Orbit.PCGroups;
using Synadia.Orbit.PCGroups.Static;
using Synadia.Orbit.PCGroups.Elastic;
await using var nats = new NatsClient();
var js = nats.CreateJetStreamContext();
// Define explicit member-to-partition mappings
var mappings = new[]
{
new NatsPcgMemberMapping("high-priority-worker", [0, 1, 2]),
new NatsPcgMemberMapping("low-priority-worker", [3, 4, 5]),
};
// For static groups - stream needs subject transform
await js.CreateStreamAsync(new StreamConfig("orders", ["orders.*"])
{
SubjectTransform = new SubjectTransform
{
Src = "orders.*",
Dest = "{{partition(6,1)}}.orders.{{wildcard(1)}}",
},
});
await js.CreatePcgStaticAsync("orders", "processors", maxNumMembers: 6,
filter: "orders.*", memberMappings: mappings);
// For elastic groups - no transform needed on source stream
await js.CreateStreamAsync(new StreamConfig("events", ["events.*"]));
await js.CreatePcgElasticAsync("events", "processors", maxNumMembers: 6,
filter: "events.*", partitioningWildcards: [1]);
await js.SetPcgElasticMemberMappingsAsync("events", "processors", mappings);
Subject Transform Syntax
For static consumer groups, the stream must use subject transforms to add partition prefixes:
{{partition(N,wildcards)}}- Computes partition (0 to N-1) based on specified wildcard positions{{wildcard(N)}}- References the Nth wildcard token from the source subject (1-indexed)
Example: For orders.* with transform {{partition(3,1)}}.orders.{{wildcard(1)}}:
orders.ABCbecomes0.orders.ABC,1.orders.ABC, or2.orders.ABC(based on hash of "ABC")
API Reference
Static Consumer Groups (Extension methods on INatsJSContext)
CreatePcgStaticAsync- Create a new static consumer groupGetPcgStaticConfigAsync- Get configuration for an existing groupConsumePcgStaticAsync- Start consuming messages (returnsIAsyncEnumerable<NatsPcgMsg<T>>)DeletePcgStaticAsync- Delete a consumer groupListPcgStaticAsync- List all consumer groups for a streamListPcgStaticActiveMembersAsync- List active membersPcgStaticMemberStepDownAsync- Force a member to step down
Elastic Consumer Groups (Extension methods on INatsJSContext)
CreatePcgElasticAsync- Create a new elastic consumer groupGetPcgElasticConfigAsync- Get configuration for an existing groupConsumePcgElasticAsync- Start consuming messages (returnsIAsyncEnumerable<NatsPcgMsg<T>>)DeletePcgElasticAsync- Delete a consumer group and its work-queue streamListPcgElasticAsync- List all consumer groups for a streamListPcgElasticActiveMembersAsync- List active membersAddPcgElasticMembersAsync- Add members to the groupDeletePcgElasticMembersAsync- Remove members from the groupSetPcgElasticMemberMappingsAsync- Set explicit partition mappingsDeletePcgElasticMemberMappingsAsync- Remove mappings (revert to auto-distribution)IsInPcgElasticMembershipAndActiveAsync- Check if a member is in the group and activeGetPcgElasticPartitionFilters- Get partition filters for a member (extension onNatsPcgElasticConfig)PcgElasticMemberStepDownAsync- Force a member to step down
Validation (Static class NatsPcgMemberMappingValidator)
Validate- Validate member mappings (checks for duplicates, overlaps, out-of-range partitions)ValidateFilterAndWildcards- Validate filter and partitioning wildcards for elastic groups
How It Works
- Partitioning: Messages are assigned to partitions (0 to maxMembers-1) based on subject content
- Distribution: Partitions are distributed across active members
- Pinning: Each member "pins" to its assigned partitions using priority groups
- Coordination: Configuration stored in NATS KV enables coordination
- Self-healing: Members watch for configuration changes and automatically adjust
License
Apache License 2.0
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
| .NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
| .NET Standard | netstandard2.0 is compatible. netstandard2.1 is compatible. |
| .NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
| MonoAndroid | monoandroid was computed. |
| MonoMac | monomac was computed. |
| MonoTouch | monotouch was computed. |
| Tizen | tizen40 was computed. tizen60 was computed. |
| Xamarin.iOS | xamarinios was computed. |
| Xamarin.Mac | xamarinmac was computed. |
| Xamarin.TVOS | xamarintvos was computed. |
| Xamarin.WatchOS | xamarinwatchos was computed. |
-
.NETStandard 2.0
- Microsoft.Bcl.AsyncInterfaces (>= 8.0.0)
- NATS.Client.JetStream (>= 2.7.2)
- NATS.Client.KeyValueStore (>= 2.7.2)
- System.Buffers (>= 4.5.1)
- System.Diagnostics.DiagnosticSource (>= 8.0.1)
- System.Memory (>= 4.5.5)
- System.Threading.Channels (>= 8.0.0)
-
.NETStandard 2.1
- NATS.Client.JetStream (>= 2.7.2)
- NATS.Client.KeyValueStore (>= 2.7.2)
- System.Diagnostics.DiagnosticSource (>= 8.0.1)
- System.Threading.Channels (>= 8.0.0)
-
net10.0
- NATS.Client.JetStream (>= 2.7.2)
- NATS.Client.KeyValueStore (>= 2.7.2)
-
net8.0
- NATS.Client.JetStream (>= 2.7.2)
- NATS.Client.KeyValueStore (>= 2.7.2)
-
net9.0
- NATS.Client.JetStream (>= 2.7.2)
- NATS.Client.KeyValueStore (>= 2.7.2)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.0-preview.4 | 42 | 2/27/2026 |
| 1.0.0-preview.3 | 42 | 2/23/2026 |
| 1.0.0-preview.2 | 48 | 1/27/2026 |
| 1.0.0-preview.1 | 47 | 1/27/2026 |