CounterpointCollective.Dataflow.Composable
10.1.153
dotnet add package CounterpointCollective.Dataflow.Composable --version 10.1.153
CounterpointCollective.Dataflow.Composable is a library designed to simplify the construction of DataflowBlock pipelines. It provides a collection of modular, reusable blocks for processing and transforming data, enabling developers to build complex workflows with composability, scalability, and clarity.
Documentation: Browse the docs
Source & contributions: GitLab repository
Purpose
The CounterpointCollective.Dataflow.Composable library makes building complex, high-performance data processing pipelines in .NET easier, safer, and more flexible. It extends TPL Dataflow with a set of modular building blocks that solve real-world problems developers frequently face but that the base library does not address.
In short, the library helps you construct powerful, adaptable, and production-grade Dataflow solutions with minimal friction, while staying fully compatible with the familiar TPL Dataflow model.
Features
- BoundedBlocks: Add bounded capacity and real-time item counting to any Dataflow propagator block, including user-composed or encapsulated blocks, so that composed blocks can be bounded as a single unit.
- ResizableBlocks: Dynamically adjust the bounded capacity of any block at runtime.
- AutoScalingBlock: Automatically determines and optimizes batch sizes at runtime by measuring throughput, ensuring efficient processing without manual tuning.
- OrderPreservingChoiceBlock: Routes items to different targets based on a predicate while preserving the original order of the items.
- ParallelBlock: Sends messages to multiple blocks in parallel and recombines the results, facilitating concurrent processing.
- GroupAdjacentBlock: Groups consecutive items that share a key or predicate into batches, simplifying aggregation and batch processing.
- PriorityBufferBlock: Delivers highest-priority messages first, dynamically reordering when new higher-priority items arrive.
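To make the semantics of one of these features concrete, here is a minimal sketch of adjacent grouping written in plain C# over an in-memory sequence. It does not use this library: `GroupAdjacentBy` is a hypothetical helper introduced only for illustration, and the actual GroupAdjacentBlock API may look quite different.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of this library): yields runs of consecutive
// items that share the same key, closing a run as soon as the key changes.
static IEnumerable<List<T>> GroupAdjacentBy<T, TKey>(
    IEnumerable<T> source, Func<T, TKey> keySelector)
{
    var comparer = EqualityComparer<TKey>.Default;
    List<T>? current = null;
    TKey? currentKey = default;

    foreach (var item in source)
    {
        var key = keySelector(item);
        if (current is not null && comparer.Equals(key, currentKey))
        {
            current.Add(item); // same key as the previous item: extend the run
        }
        else
        {
            if (current is not null) yield return current; // key changed: emit the finished run
            current = new List<T> { item };
            currentKey = key;
        }
    }

    if (current is not null) yield return current; // emit the last run
}

var groups = GroupAdjacentBy(new[] { 1, 1, 2, 2, 2, 1 }, x => x).ToList();
var rendered = string.Join(" | ", groups.Select(g => string.Join(",", g)));
Console.WriteLine(rendered);
// prints "1,1 | 2,2,2 | 1"
```

Note that the trailing `1` forms its own group: only neighbouring items are merged, unlike a regular group-by.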
Installation
Clone the repository:
git clone https://gitlab.com/counterpointcollective/composabledataflowblocks.git
Install dependencies (if any):
# Example for .NET/C# projects
dotnet restore
Usage Example
Example 1: Bounding Your Own Composed Block
// Example: using a BoundedPropagatorBlock to add bounded capacity and item counting.
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using CounterpointCollective.Dataflow;
using Xunit; // assumption: the Assert calls below are xUnit's

// First we create a DataflowBlock ourselves, composed of multiple sub-blocks.
var b = new BufferBlock<int>(new() { BoundedCapacity = DataflowBlockOptions.Unbounded });
var t = new TransformBlock<int, int>(async i =>
{
    await Task.Yield(); // Simulate some work.
    return i + 1;
}, new() { BoundedCapacity = DataflowBlockOptions.Unbounded });
b.LinkTo(t, new DataflowLinkOptions() { PropagateCompletion = true });
var ourOwnDataflowBlock = DataflowBlock.Encapsulate(b, t);

// Now we want to
//  - put a bounded capacity on our new block, and
//  - be able to count how many items are contained within it at any given time.
// This is what we use a BoundedPropagatorBlock for.
var testSubject = new BoundedPropagatorBlock<int,int>(ourOwnDataflowBlock, boundedCapacity: 2000);

// We have now enabled a bounded capacity of 2000 messages, and real-time counting, on our own custom DataflowBlock.
Assert.Equal(0, testSubject.Count);

// We should be able to push synchronously up to the bounded capacity.
for (var i = 0; i < 2000; i++)
{
    Assert.True(testSubject.Post(i));
    Assert.Equal(i + 1, testSubject.Count); // The count is maintained correctly.
}
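For comparison, the sketch below builds a similar composed block using only the standard System.Threading.Tasks.Dataflow API and drains it. The `IPropagatorBlock<int, int>` returned by `DataflowBlock.Encapsulate` exposes no `Count` or `BoundedCapacity` member, which is the gap the wrapper in Example 1 is meant to fill. The variable names here are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Standard TPL Dataflow only; no types from this library are used here.
var head = new BufferBlock<int>();
var tail = new TransformBlock<int, int>(i => i + 1);
head.LinkTo(tail, new DataflowLinkOptions { PropagateCompletion = true });

// Encapsulate exposes head as the target side and tail as the source side.
IPropagatorBlock<int, int> composed = DataflowBlock.Encapsulate(head, tail);

for (var i = 0; i < 3; i++)
{
    composed.Post(i);
}
composed.Complete(); // completion flows from head to tail through the link

// Drain the outputs until the block signals that no more will arrive.
var results = new List<int>();
while (await composed.OutputAvailableAsync())
{
    results.Add(await composed.ReceiveAsync());
}
await composed.Completion;

Console.WriteLine(string.Join(",", results));
// prints "1,2,3"
```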
Example 2: Making any DataflowBlock dynamically resizable
//Example showing that you can dynamically resize the bounded capacity of any block by wrapping it into a BoundedPropagatorBlock
var bufferBlock = new BufferBlock<int>(new() { BoundedCapacity = DataflowBlockOptions.Unbounded });
var resizableBufferBlock = new BoundedPropagatorBlock<int,int>(bufferBlock);
//We did not specify a bounded capacity, so it defaults to DataflowBlockOptions.Unbounded
Assert.True(resizableBufferBlock.Post(1));
//But we can dynamically set the bounded capacity at any point.
resizableBufferBlock.BoundedCapacity = 2;
Assert.True(resizableBufferBlock.Post(2));
Assert.False(resizableBufferBlock.Post(3));
resizableBufferBlock.BoundedCapacity = 3;
Assert.True(resizableBufferBlock.Post(3));
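As background for Example 2, this sketch shows the behaviour of an unwrapped, standard `BufferBlock<int>`: its bounded capacity is fixed when the block is constructed, `Post` declines immediately when the block is full, and `SendAsync` waits for space instead. It uses only System.Threading.Tasks.Dataflow and nothing from this library.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var options = new DataflowBlockOptions { BoundedCapacity = 1 };
var buffer = new BufferBlock<int>(options);

// The block copied its options at construction, so this change does not reach it.
options.BoundedCapacity = 10;

var first = buffer.Post(1);   // accepted: the buffer has room for one item
var second = buffer.Post(2);  // declined immediately: the buffer is full

// SendAsync does not decline; its task completes once the block accepts the item.
var pending = buffer.SendAsync(2);
var wasWaiting = !pending.IsCompleted;

var received = buffer.Receive(); // frees a slot, so the pending send can be accepted
var accepted = await pending;

Console.WriteLine($"{first} {second} {wasWaiting} {received} {accepted}");
// prints "True False True 1 True"
```

When a producer should slow down rather than drop items as capacity is reached, `SendAsync` is usually the better fit than `Post`.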
Example 3: Automatically optimizing batch sizes in real-time
// Example: Using AutoScaling on a ResizableBatchTransformBlock

// For demonstration: assume our workload performs best when batch size is ~100 items.
async Task<IEnumerable<int>> ProcessBatch(int[] batch)
{
    var distanceFromOptimal = Math.Abs(batch.Length - 100);
    await Task.Delay(distanceFromOptimal * batch.Length); // Simulate slower processing when batch isn't optimal
    return batch;
}

// Create a ResizableBatchTransformBlock using our ProcessBatch function.
var testSubject = new ResizableBatchTransformBlock<int, int>(
    ProcessBatch,
    initialBatchSize: 1,
    new ExecutionDataflowBlockOptions { BoundedCapacity = 10000 }
);

// Batch size can be manually adjusted:
testSubject.BatchSize = 5;

// Or automatically optimized using AutoScaling:
testSubject.EnableAutoScaling(
    new DefaultBatchSizeStrategy(minBatchSize: 1, maxBatchSize: 200)
);

// Send some work:
for (var i = 0; i < 10000; i++)
{
    await testSubject.SendAsync(i);
}

// Process outputs while AutoScaling gradually converges toward the optimal batch size (~100).
for (var i = 0; i < 10000; i++)
{
    var result = await testSubject.ReceiveAsync();
}
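To give an intuition for how a batch size can be tuned from throughput measurements, here is a small hill-climbing sketch in plain C#. It is not the library's `DefaultBatchSizeStrategy`, whose internals are not described here; the `Throughput` function is a made-up, noise-free stand-in for real measurements, modelled loosely on the cost used in `ProcessBatch` above.

```csharp
using System;

// Hypothetical throughput model (items per unit of time), peaking at batch size 100.
// A real tuner would measure this from actual processing times instead.
static double Throughput(int batchSize) =>
    batchSize / (1.0 + Math.Abs(batchSize - 100) * batchSize);

const int MinBatchSize = 1;
const int MaxBatchSize = 200;

var size = MinBatchSize;
var step = 8;
var direction = +1;
var best = Throughput(size);

for (var round = 0; round < 200; round++)
{
    // Try a neighbouring batch size in the current direction.
    var candidate = Math.Clamp(size + direction * step, MinBatchSize, MaxBatchSize);
    var measured = Throughput(candidate);

    if (measured > best)
    {
        // Throughput improved: adopt the candidate and keep moving the same way.
        size = candidate;
        best = measured;
    }
    else
    {
        // No improvement: turn around and take smaller steps.
        direction = -direction;
        step = Math.Max(1, step / 2);
    }
}

Console.WriteLine(size);
// prints "100"
```

With real, noisy measurements a tuner would typically average over several batches before comparing, and would keep probing occasionally in case the optimum shifts.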
Language Composition
ComposableDataflowBlocks is written in C#.
Contributing
Contributions are welcome! If you'd like to suggest improvements, report bugs, or contribute new dataflow blocks, please open an issue or submit a pull request.
- Fork the repo.
- Create your feature branch (git checkout -b feature/MyFeature).
- Commit your changes (git commit -am 'Add new feature').
- Push to the branch (git push origin feature/MyFeature).
- Open a Merge Request.
License
Distributed under the MIT License. See LICENSE for more information.
Resources
| Product | Versions (compatible and additional computed target framework versions) |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - CounterpointCollective.CoalescingQueue (>= 10.1.124)
  - CounterpointCollective.Dataflow.Fluent (>= 10.1.153)
  - CounterpointCollective.UniquePriorityQueue (>= 10.1.124)
  - Microsoft.Extensions.ObjectPool (>= 10.0.1)
  - morelinq (>= 4.4.0)