CounterpointCollective.Dataflow.Composable 10.1.153

dotnet add package CounterpointCollective.Dataflow.Composable --version 10.1.153
                    
NuGet\Install-Package CounterpointCollective.Dataflow.Composable -Version 10.1.153
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="CounterpointCollective.Dataflow.Composable" Version="10.1.153" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="CounterpointCollective.Dataflow.Composable" Version="10.1.153" />
                    
Directory.Packages.props
<PackageReference Include="CounterpointCollective.Dataflow.Composable" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add CounterpointCollective.Dataflow.Composable --version 10.1.153
                    
#r "nuget: CounterpointCollective.Dataflow.Composable, 10.1.153"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package CounterpointCollective.Dataflow.Composable@10.1.153
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=CounterpointCollective.Dataflow.Composable&version=10.1.153
                    
Install as a Cake Addin
#tool nuget:?package=CounterpointCollective.Dataflow.Composable&version=10.1.153
                    
Install as a Cake Tool

CounterpointCollective.Dataflow.Composable

Build Status NuGet Docs

CounterpointCollective.Dataflow.Composable is a library designed to simplify the construction DataflowBlock pipelines. It provides a collection of modular, reusable blocks for processing and transforming data, enabling developers to build complex workflows with composability, scalability, and clarity.

Documentation: Browse the docs

Source & contributions: GitLab repository

Purpose

The CounterpointCollective.Dataflow.Composable library makes building complex, high-performance data processing pipelines in .NET easier, safer, and more flexible. It extends TPL Dataflow with a set of modular building blocks that solve real-world problems developers frequently face but that the base library does not address.

In short, the library helps you construct powerful, adaptable, and production-grade Dataflow solutions with minimal friction, while staying fully compatible with the familiar TPL Dataflow model.

Features

  • BoundedBlocks: Allow adding bounded capacity and real-time item counting to any Dataflow propagator block, including user-composed or encapsulated blocks, enabling true compositionality.
  • ResizableBlocks: Dynamically adjusts the bounded capacity of any block at runtime.
  • AutoScalingBlock: Automatically determines and optimizes batch sizes at runtime by measuring throughput, ensuring efficient processing without manual tuning.
  • OrderPreservingChoiceBlock: Routes items to different targets based on a predicate while preserving the original order of the items.
  • ParallelBlock: Sends messages to multiple blocks in parallel and recombines the results, facilitating concurrent processing.
  • GroupAdjacentBlock: Groups consecutive items that share a key or predicate into batches, simplifying aggregation and batch processing.
  • PriorityBufferBlock: Delivers highest-priority messages first, dynamically reordering when new higher-priority items arrive.

Installation

Clone the repository:

git clone https://gitlab.com/counterpointcollective/composabledataflowblocks.git

Install dependencies (if any):

# Example for .NET/C# projects
dotnet restore

Usage Example

Example 1: Bounding Your Own Composed Block

//Example of using a BoundedPropagatorBlock to add bounded capacity and item counting

using CounterpointCollective.Dataflow;

//First we create a DataflowBlock ourselves, composed of multiple subblocks.
var b = new BufferBlock<int>(new() { BoundedCapacity = DataflowBlockOptions.Unbounded });
var t = new TransformBlock<int, int>(async i =>
{
    await Task.Yield(); //simulate some work.
    return i + 1;
}, new() { BoundedCapacity = DataflowBlockOptions.Unbounded });
b.LinkTo(t, new DataflowLinkOptions() { PropagateCompletion = true });
var ourOwnDataflowBlock = DataflowBlock.Encapsulate(b, t);


//Now we want to
// - put a boundedCapacity on our new block and
// - be able to count how many items are contained with it, at any given time
//This is what we will use a BoundedPropagatorBlock for.

var testSubject = new BoundedPropagatorBlock<int,int>(ourOwnDataflowBlock, boundedCapacity: 2000);

//Thus we enabled a bounded capacity of 2000 messages, and real-time counting on our own custom DataflowBlock!

Assert.Equal(0, testSubject.Count);

//we should be able to push synchronously up to the bounded capacity.
for (var i = 0; i < 2000; i++)
{
    Assert.True(testSubject.Post(i));
    Assert.Equal(i + 1, testSubject.Count); //count is administered properly
}

Example 2: Making any DataflowBlock dynamically resizable

//Example showing that you can dynamically resize the bounded capacity of any block by wrapping it into a BoundedPropagatorBlock
var bufferBlock = new BufferBlock<int>(new() { BoundedCapacity = DataflowBlockOptions.Unbounded });
var resizableBufferBlock = new BoundedPropagatorBlock<int,int>(bufferBlock);

//We did not specify a bounded capacity, so it defaults to DataflowBlockOptions.Unbounded

Assert.True(resizableBufferBlock.Post(1));

//But we can dynamically set the bounded capacity at any point.
resizableBufferBlock.BoundedCapacity = 2;
Assert.True(resizableBufferBlock.Post(2));
Assert.False(resizableBufferBlock.Post(3));

resizableBufferBlock.BoundedCapacity = 3;
Assert.True(resizableBufferBlock.Post(3));

Example 3: Automatically optimizing batch sizes in real-time

// Example: Using AutoScaling on a ResizableBatchTransformBlock
// For demonstration: assume our workload performs best when batch size is ~100 items.
async Task<IEnumerable<int>> ProcessBatch(int[] batch)
{
    var distanceFromOptimal = Math.Abs(batch.Length - 100);
    await Task.Delay(distanceFromOptimal * batch.Length); // Simulate slower processing when batch isn't optimal
    return batch;
}

// Create a ResizableBatchTransformBlock using our ProcessBatch function.
var testSubject = new ResizableBatchTransformBlock<int, int>(
    ProcessBatch,
    initialBatchSize: 1,
    new ExecutionDataflowBlockOptions { BoundedCapacity = 10000 }
);

// Batch size can be manually adjusted:
testSubject.BatchSize = 5;

// Or automatically optimized using AutoScaling:
testSubject.EnableAutoScaling(
    new DefaultBatchSizeStrategy(minBatchSize: 1, maxBatchSize: 200)
);

// Send some work:
for (var i = 0; i < 10000; i++)
{
    await testSubject.SendAsync(i);
}

// Process outputs while AutoScaling gradually converges toward the optimal batch size (~100).
for (var i = 0; i < 10000; i++)
{
    var result = await testSubject.ReceiveAsync();
}

Language Composition

ComposableDataflowBlocks is written in C#.

Contributing

Contributions are welcome! If you'd like to suggest improvements, report bugs, or contribute new dataflow blocks, please open an issue or submit a pull request.

  1. Fork the repo.
  2. Create your feature branch (git checkout -b feature/MyFeature).
  3. Commit your changes (git commit -am 'Add new feature').
  4. Push to the branch (git push origin feature/MyFeature).
  5. Open a Merge Request.

License

Distributed under the MIT License. See LICENSE for more information.

Resources

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
10.1.153 94 12/29/2025
10.1.146 181 12/22/2025
10.1.143 255 12/15/2025