Tpl.Dataflow.Builder 0.2.3


DataflowPipelineBuilder


A fluent builder pattern library for creating System.Threading.Tasks.Dataflow pipelines with type-safe chaining, automatic block linking, and built-in completion propagation.

Features

  • Fluent API - Chain dataflow blocks naturally with IntelliSense-friendly syntax
  • Type Safety - Full compile-time type checking between pipeline stages
  • Auto-linking - Blocks are automatically linked with completion propagation
  • Custom Block Base Classes - PropagatorBlock<TIn, TOut> and AsyncPropagatorBlock<TIn, TOut> for easy custom blocks
  • Dependency Injection - Optional IServiceProvider integration for DI-based block resolution
  • Keyed Services - Support for .NET 8+ keyed service resolution
  • Channel Integration - Use System.Threading.Channels as input source or output sink
  • IAsyncEnumerable Support - Consume pipeline output as async streams
  • IObservable Support - Integrate with Reactive Extensions
  • Named Blocks - Optional naming for debugging and diagnostics
  • Cancellation Support - Built-in cancellation token propagation
  • AOT Compatible - No reflection, so fully compatible with Native AOT compilation
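
For context on the auto-linking feature, the sketch below shows the wiring one would otherwise write by hand with plain System.Threading.Tasks.Dataflow: create the blocks, link them with PropagateCompletion = true, and drain the last block. It uses only the standard TPL Dataflow API. The names ManualLinkingSketch and RunAsync are illustrative and not part of this package, and the builder's internals may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative only: hand-written equivalent of a two-stage pipeline.
public static class ManualLinkingSketch
{
    public static async Task<List<string>> RunAsync(IEnumerable<int> inputs)
    {
        var buffer = new BufferBlock<int>();
        var format = new TransformBlock<int, string>(x => $"Number: {x}");

        // Without PropagateCompletion, completing 'buffer' would never complete 'format'.
        buffer.LinkTo(format, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var i in inputs)
            buffer.Post(i);
        buffer.Complete();

        var results = new List<string>();
        // OutputAvailableAsync returns false once 'format' is completed and drained.
        while (await format.OutputAvailableAsync())
        {
            while (format.TryReceive(out var item))
                results.Add(item);
        }
        await format.Completion;
        return results;
    }

    public static async Task Main()
    {
        // Prints "Number: 1", "Number: 2", "Number: 3" on separate lines.
        foreach (var line in await RunAsync(new[] { 1, 2, 3 }))
            Console.WriteLine(line);
    }
}
```

Every additional stage adds another LinkTo call with the same options, which is the repetition a fluent builder is meant to remove.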

Installation

# Main library
dotnet add package Tpl.Dataflow.Builder

# Abstractions only (for consumers/interfaces)
dotnet add package Tpl.Dataflow.Builder.Abstractions

Quick Start

using Tpl.Dataflow.Builder;

// Create a simple transform pipeline
await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>()
    .AddTransformBlock<string>(x => $"Number: {x}")
    .Build();

// Post data
for (int i = 1; i <= 5; i++)
    pipeline.Post(i);

// Signal completion
pipeline.Complete();

// Consume results as IAsyncEnumerable
await foreach (var result in pipeline.ToAsyncEnumerable())
{
    Console.WriteLine(result);
}

Pipeline Examples

Async Transform with Parallelism

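using System.Threading.Tasks.Dataflow; // ExecutionDataflowBlockOptions

await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<string>()
    .AddTransformBlock<string>(
        async url =>
        {
            await Task.Delay(100); // stand-in for real async work such as an HTTP call
            return $"Processed: {url}";
        },
        // Process up to four items concurrently
        options: new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 })
    .Build();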

Batch Processing

await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>()
    .AddBatchBlock(batchSize: 3)
    .AddTransformBlock<string>(batch => 
        $"Batch: [{string.Join(", ", batch)}]")
    .Build();
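
To make the batching behavior concrete, here is a minimal sketch using the standard BatchBlock<T> directly, which the Supported Blocks table names as the block type behind AddBatchBlock. It shows that a partial batch is emitted once the block is completed. BatchSketch and BatchAsync are illustrative names, not part of this package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative only: plain TPL Dataflow batching.
public static class BatchSketch
{
    public static async Task<List<int[]>> BatchAsync(IEnumerable<int> inputs, int batchSize)
    {
        var batch = new BatchBlock<int>(batchSize);

        foreach (var i in inputs)
            batch.Post(i);

        // Completing the block flushes any remaining items as a final, smaller batch.
        batch.Complete();

        var batches = new List<int[]>();
        while (await batch.OutputAvailableAsync())
        {
            while (batch.TryReceive(out var group))
                batches.Add(group);
        }
        return batches;
    }

    public static async Task Main()
    {
        // Seven inputs with a batch size of 3 print: [1, 2, 3], [4, 5, 6], [7]
        foreach (var group in await BatchAsync(new[] { 1, 2, 3, 4, 5, 6, 7 }, 3))
            Console.WriteLine($"[{string.Join(", ", group)}]");
    }
}
```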

Terminal Pipeline (ActionBlock)

// AddActionBlock returns a DataflowPipelineBuilder<TInput>, which exposes only Build().
// Build() then returns IDataflowPipeline<TInput>, the base interface with no output side.
await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>()
    .AddTransformBlock<string>(x => $"Item #{x}")
    .AddActionBlock(item => Console.WriteLine($"Processing: {item}"))
    .Build();

await pipeline.Completion;

TransformMany (1:N Expansion)

await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<string>()
    .AddTransformManyBlock<char>(s => s.ToCharArray())
    .Build();

Named Blocks for Debugging

await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>("InputBuffer")
    .AddTransformBlock<int>(x => x * 2, "Doubler")
    .AddTransformBlock<string>(x => x.ToString(), "Stringifier")
    .Build();

Channel Integration

Channel as Input Source

using System.Threading.Channels;

var inputChannel = Channel.CreateUnbounded<string>();

await using var pipeline = new DataflowPipelineBuilder()
    .FromChannelSource(inputChannel)
    .AddTransformBlock(int.Parse)
    .AddActionBlock(Console.WriteLine)
    .Build();

// Write to channel from producer
await inputChannel.Writer.WriteAsync("42");
inputChannel.Writer.Complete();

await pipeline.Completion;

Channel as Output Sink

var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>()
    .AddTransformBlock(x => x * 2)
    .BuildAsChannel();  // Returns IDataflowChannelPipeline

pipeline.Post(21);
pipeline.Complete();

// Read from output channel
await foreach (var item in pipeline.Output.ReadAllAsync())
{
    Console.WriteLine(item); // 42
}

Full Channel-to-Channel Pipeline

var inputChannel = Channel.CreateUnbounded<int>();

var pipeline = new DataflowPipelineBuilder()
    .FromChannelSource(inputChannel)
    .AddTransformBlock(x => x * 2)
    .AddTransformBlock(x => $"Result: {x}")
    .BuildAsChannel(new BoundedChannelOptions(100)); // Bounded output

// Producer
await inputChannel.Writer.WriteAsync(21);
inputChannel.Writer.Complete();

// Consumer
await foreach (var result in pipeline.Output.ReadAllAsync())
{
    Console.WriteLine(result); // "Result: 42"
}
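
As a rough picture of what a channel source involves, the sketch below pumps a ChannelReader<T> into a dataflow block using only standard APIs, then completes the block when the channel closes. This is one plausible way to write such a bridge and is not taken from the package's source; ChannelBridgeSketch, PumpAsync, and RunAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative only: a hand-written channel-to-dataflow bridge.
public static class ChannelBridgeSketch
{
    // Forwards every channel item to the target block, then completes or faults it.
    public static async Task PumpAsync<T>(
        ChannelReader<T> reader, ITargetBlock<T> target, CancellationToken ct = default)
    {
        try
        {
            await foreach (var item in reader.ReadAllAsync(ct))
                await target.SendAsync(item, ct); // waits if the target is bounded and full
            target.Complete();
        }
        catch (Exception ex)
        {
            target.Fault(ex);
        }
    }

    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<string>();
        var parse = new TransformBlock<string, int>(s => int.Parse(s));
        var results = new List<int>();
        var sink = new ActionBlock<int>(x => results.Add(x)); // one item at a time by default
        parse.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        var pump = PumpAsync(channel.Reader, parse);

        await channel.Writer.WriteAsync("21");
        await channel.Writer.WriteAsync("42");
        channel.Writer.Complete();

        await pump;
        await sink.Completion;
        return results;
    }

    public static async Task Main()
    {
        // Prints 21 then 42.
        foreach (var value in await RunAsync())
            Console.WriteLine(value);
    }
}
```

SendAsync is used instead of Post so that a bounded target applies backpressure to the pump rather than rejecting items.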

Custom Propagator Blocks

Create reusable custom blocks by inheriting from PropagatorBlock<TIn, TOut> (sync) or AsyncPropagatorBlock<TIn, TOut> (async):

Synchronous Custom Block

using Tpl.Dataflow.Builder.Abstractions;

public class MultiplierBlock : PropagatorBlock<int, int>
{
    private readonly int _factor;
    
    public MultiplierBlock(int factor) => _factor = factor;
    
    protected override int Transform(int input) => input * _factor;
}

// Usage
var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>()
    .AddCustomBlock(new MultiplierBlock(10))
    .AddActionBlock(Console.WriteLine)
    .Build();

Asynchronous Custom Block

using Tpl.Dataflow.Builder.Abstractions;

public class HttpFetchBlock : AsyncPropagatorBlock<string, string>
{
    private readonly HttpClient _httpClient;
    
    public HttpFetchBlock(HttpClient httpClient) => _httpClient = httpClient;
    
    protected override async Task<string> TransformAsync(string url)
    {
        return await _httpClient.GetStringAsync(url);
    }
}

// Block options can be passed to the base constructor
public class ThrottledProcessor : AsyncPropagatorBlock<int, int>
{
    public ThrottledProcessor() 
        : base(new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 })
    { }
    
    protected override async Task<int> TransformAsync(int input)
    {
        await Task.Delay(100);
        return input * 2;
    }
}

Dependency Injection Integration

Basic DI Resolution

using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection()
    .AddSingleton<MyCustomBlock>()
    .AddTransient<AnotherBlock>()
    .BuildServiceProvider();

var pipeline = new DataflowPipelineBuilder(serviceProvider: services)
    .AddBufferBlock<int>()
    .AddCustomBlock<MyCustomBlock, int>()        // Resolved from DI
    .AddCustomBlock<AnotherBlock, string>()      // Resolved from DI
    .AddActionBlock(Console.WriteLine)
    .Build();

Keyed Services (.NET 8+)

Use keyed services when you have multiple implementations of the same interface:

// Register keyed services
var services = new ServiceCollection()
    .AddKeyedSingleton<IMultiplierBlock, DoublerBlock>("double")
    .AddKeyedSingleton<IMultiplierBlock, TriplerBlock>("triple")
    .BuildServiceProvider();

// Use specific implementation by key
var doublePipeline = new DataflowPipelineBuilder(serviceProvider: services)
    .AddBufferBlock<int>()
    .AddKeyedCustomBlock<IMultiplierBlock, int>("double")
    .AddActionBlock(x => Console.WriteLine($"Doubled: {x}"))
    .Build();

var triplePipeline = new DataflowPipelineBuilder(serviceProvider: services)
    .AddBufferBlock<int>()
    .AddKeyedCustomBlock<IMultiplierBlock, int>("triple")
    .AddActionBlock(x => Console.WriteLine($"Tripled: {x}"))
    .Build();

doublePipeline.Post(5);  // Output: "Doubled: 10"
triplePipeline.Post(5);  // Output: "Tripled: 15"
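
For readers new to keyed services, the sketch below shows the underlying Microsoft.Extensions.DependencyInjection mechanism on its own: two implementations of one interface registered under different keys and resolved with GetRequiredKeyedService. How AddKeyedCustomBlock uses this internally is an assumption. IScaler, Doubler, Tripler, and KeyedResolutionSketch are hypothetical types introduced only for illustration.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical interface and implementations, for illustration only.
public interface IScaler { int Scale(int x); }
public sealed class Doubler : IScaler { public int Scale(int x) => x * 2; }
public sealed class Tripler : IScaler { public int Scale(int x) => x * 3; }

public static class KeyedResolutionSketch
{
    public static int ScaleWith(string key, int value)
    {
        // Both registrations share the service type; the key tells them apart.
        using var provider = new ServiceCollection()
            .AddKeyedSingleton<IScaler, Doubler>("double")
            .AddKeyedSingleton<IScaler, Tripler>("triple")
            .BuildServiceProvider();

        // Throws InvalidOperationException if no service is registered for the key.
        return provider.GetRequiredKeyedService<IScaler>(key).Scale(value);
    }

    public static void Main()
    {
        Console.WriteLine(ScaleWith("double", 5)); // 10
        Console.WriteLine(ScaleWith("triple", 5)); // 15
    }
}
```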

Supported Blocks

Block Type          Method                                    Description
BufferBlock         AddBufferBlock<T>()                       Stores messages for later consumption
TransformBlock      AddTransformBlock<TOut>(Func)             Transforms each input to one output
TransformManyBlock  AddTransformManyBlock<TOut>(Func)         Transforms each input to multiple outputs
BatchBlock          AddBatchBlock(batchSize)                  Groups inputs into arrays
ActionBlock         AddActionBlock(Action)                    Terminal block that consumes inputs
Custom (instance)   AddCustomBlock(IPropagatorBlock)          Add a custom propagator block instance
Custom (factory)    AddCustomBlock(Func<IPropagatorBlock>)    Add a custom block via factory
Custom (DI)         AddCustomBlock<TBlock, TOut>()            Resolve block from IServiceProvider
Custom (Keyed DI)   AddKeyedCustomBlock<TBlock, TOut>(key)    Resolve keyed service from IServiceProvider
Channel Source      FromChannelSource(Channel/ChannelReader)  Start pipeline from a channel

Build Methods

Method                       Returns                              Description
Build()                      IDataflowPipeline<TIn, TOut>         Standard pipeline with output
Build() (after ActionBlock)  IDataflowPipeline<TIn>               Terminal pipeline (no output)
BuildAsChannel()             IDataflowChannelPipeline<TIn, TOut>  Pipeline with Channel output

Output Consumption Methods

// IAsyncEnumerable (preferred)
await foreach (var item in pipeline.ToAsyncEnumerable())
    Console.WriteLine(item);

// IObservable (Rx integration)
var subscription = pipeline.AsObservable()
    .Subscribe(item => Console.WriteLine(item));

// Single receive (awaits until an item is available)
var first = await pipeline.ReceiveAsync();

// Try receive (non-blocking)
if (pipeline.TryReceive(out var next))
    Console.WriteLine(next);

Default Options

  • Link Options: PropagateCompletion = true (automatic completion propagation)
  • Cancellation: Default token can be set in builder constructor
  • Block Names: Auto-generated as {BlockType}_{Index} if not specified
  • EnsureOrdered: false by default on execution blocks for better parallel performance, so outputs may arrive out of input order when MaxDegreeOfParallelism is greater than 1
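
The effect of the EnsureOrdered default can be seen with a plain TransformBlock. The sketch below runs the same work with the option on and off, using only the standard TPL Dataflow API. With EnsureOrdered = true the outputs come back in input order; with false the order depends on which item finishes first, so no particular order should be relied on. OrderingSketch and RunAsync are illustrative names, not part of this package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative only: plain TPL Dataflow ordering behavior.
public static class OrderingSketch
{
    public static async Task<List<int>> RunAsync(bool ensureOrdered)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = ensureOrdered
        };

        var block = new TransformBlock<int, int>(async x =>
        {
            await Task.Delay((5 - x) * 20); // earlier inputs take longer to finish
            return x;
        }, options);

        for (int i = 1; i <= 4; i++)
            block.Post(i);
        block.Complete();

        var results = new List<int>();
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out var item))
                results.Add(item);
        }
        return results;
    }

    public static async Task Main()
    {
        // Ordered run always yields 1, 2, 3, 4; the unordered run may yield any permutation.
        Console.WriteLine(string.Join(", ", await RunAsync(ensureOrdered: true)));
        Console.WriteLine(string.Join(", ", await RunAsync(ensureOrdered: false)));
    }
}
```

If downstream logic depends on input order, pass options with EnsureOrdered = true to the relevant block.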

Abstract Base Classes

The Tpl.Dataflow.Builder.Abstractions package provides base classes for creating custom blocks:

Class                            Description
PropagatorBlock<TIn, TOut>       Base for synchronous transforms - override Transform(TIn)
AsyncPropagatorBlock<TIn, TOut>  Base for async transforms - override TransformAsync(TIn)

Both classes:

  • Handle all IPropagatorBlock<TIn, TOut> interface plumbing
  • Accept optional ExecutionDataflowBlockOptions in constructor
  • Expose Completion, InputCount, OutputCount properties
  • Support Complete() and Fault() methods
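
To show what "interface plumbing" means in practice, here is a minimal sketch of a base class that implements IPropagatorBlock<TIn, TOut> by delegating to an inner TransformBlock. It is a hypothetical stand-in written against the standard TPL Dataflow interfaces, not the package's actual implementation; SketchPropagatorBlock, TimesTen, and PlumbingDemo are made-up names.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for a synchronous propagator base class.
public abstract class SketchPropagatorBlock<TIn, TOut> : IPropagatorBlock<TIn, TOut>
{
    private readonly TransformBlock<TIn, TOut> _inner;

    protected SketchPropagatorBlock(ExecutionDataflowBlockOptions? options = null)
    {
        _inner = new TransformBlock<TIn, TOut>(
            new Func<TIn, TOut>(Transform),
            options ?? new ExecutionDataflowBlockOptions());
    }

    // The only member a derived block has to supply.
    protected abstract TOut Transform(TIn input);

    public Task Completion => _inner.Completion;
    public int InputCount => _inner.InputCount;
    public int OutputCount => _inner.OutputCount;
    public void Complete() => _inner.Complete();
    public void Fault(Exception exception) => ((IDataflowBlock)_inner).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<TOut> target, DataflowLinkOptions linkOptions)
        => _inner.LinkTo(target, linkOptions);

    // The remaining members forward the message-passing protocol to the inner block.
    DataflowMessageStatus ITargetBlock<TIn>.OfferMessage(
        DataflowMessageHeader messageHeader, TIn messageValue,
        ISourceBlock<TIn>? source, bool consumeToAccept)
        => ((ITargetBlock<TIn>)_inner).OfferMessage(messageHeader, messageValue, source, consumeToAccept);

    TOut? ISourceBlock<TOut>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<TOut> target, out bool messageConsumed)
        => ((ISourceBlock<TOut>)_inner).ConsumeMessage(messageHeader, target, out messageConsumed);

    bool ISourceBlock<TOut>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<TOut> target)
        => ((ISourceBlock<TOut>)_inner).ReserveMessage(messageHeader, target);

    void ISourceBlock<TOut>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<TOut> target)
        => ((ISourceBlock<TOut>)_inner).ReleaseReservation(messageHeader, target);
}

public sealed class TimesTen : SketchPropagatorBlock<int, int>
{
    protected override int Transform(int input) => input * 10;
}

public static class PlumbingDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var block = new TimesTen();
        var results = new List<int>();
        var sink = new ActionBlock<int>(x => results.Add(x));
        block.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        block.Post(4);
        block.Post(5);
        block.Complete();
        await sink.Completion;
        return results;
    }

    public static async Task Main()
    {
        // Prints 40 then 50.
        foreach (var value in await RunAsync())
            Console.WriteLine(value);
    }
}
```

An async variant would follow the same shape, with the inner TransformBlock constructed from a Func<TIn, Task<TOut>>.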

Target Frameworks

  • .NET 8.0
  • .NET 10.0

License

MIT License - see LICENSE for details.


Version         Downloads  Last Updated
0.2.3           4,753      12/5/2025
0.2.3-preview1  198        12/5/2025
0.2.2           793        12/3/2025
0.2.1           683        12/3/2025
0.2.0           145        11/28/2025
0.2.0-preview2  136        11/28/2025
0.2.0-preview1  137        11/28/2025
0.1.0           193        11/27/2025
0.1.0-preview2  191        11/26/2025
0.1.0-preview1  191        11/26/2025