Tpl.Dataflow.Builder.Abstractions 0.2.3

DataflowPipelineBuilder

A fluent builder pattern library for creating System.Threading.Tasks.Dataflow pipelines with type-safe chaining, automatic block linking, and built-in completion propagation.

Features

  • Fluent API - Chain dataflow blocks naturally with IntelliSense-friendly syntax
  • Type Safety - Full compile-time type checking between pipeline stages
  • Auto-linking - Blocks are automatically linked with completion propagation
  • Custom Block Base Classes - PropagatorBlock<TIn, TOut> and AsyncPropagatorBlock<TIn, TOut> for easy custom blocks
  • Dependency Injection - Optional IServiceProvider integration for DI-based block resolution
  • Keyed Services - Support for .NET 8+ keyed service resolution
  • Channel Integration - Use System.Threading.Channels as input source or output sink
  • IAsyncEnumerable Support - Consume pipeline output as async streams
  • IObservable Support - Integrate with Reactive Extensions
  • Named Blocks - Optional naming for debugging and diagnostics
  • Cancellation Support - Built-in cancellation token propagation
  • AOT Compatible - No reflection, so fully compatible with Native AOT compilation

Installation

# Main library
dotnet add package Tpl.Dataflow.Builder

# Abstractions only (for consumers/interfaces)
dotnet add package Tpl.Dataflow.Builder.Abstractions

Quick Start

using Tpl.Dataflow.Builder;

// Create a simple transform pipeline
await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>()
    .AddTransformBlock<string>(x => $"Number: {x}")
    .Build();

// Post data
for (int i = 1; i <= 5; i++)
    pipeline.Post(i);

// Signal completion
pipeline.Complete();

// Consume results as IAsyncEnumerable
await foreach (var result in pipeline.ToAsyncEnumerable())
{
    Console.WriteLine(result);
}
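
For comparison, here is a minimal sketch of the same pipeline wired by hand with plain System.Threading.Tasks.Dataflow. It is not the builder's implementation, only an illustration of the linking and completion propagation that the builder is described as automating. The helper name RunManualPipelineAsync is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hand-wired counterpart of the Quick Start, using only TPL Dataflow types.
var lines = await RunManualPipelineAsync(5);
foreach (var line in lines)
    Console.WriteLine(line);

// Hypothetical helper for illustration; not part of Tpl.Dataflow.Builder.
static async Task<List<string>> RunManualPipelineAsync(int count)
{
    var buffer = new BufferBlock<int>();
    var transform = new TransformBlock<int, string>(x => $"Number: {x}");

    // Without PropagateCompletion, Complete() on the buffer would never
    // reach the transform block and the read loop below would not end.
    buffer.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true });

    for (int i = 1; i <= count; i++)
        buffer.Post(i);
    buffer.Complete();

    var output = new List<string>();
    await foreach (var item in transform.ReceiveAllAsync())
        output.Add(item);
    return output;
}
```

Every additional stage in a hand-wired pipeline needs its own LinkTo call with the same link options, which is the repetition a fluent builder removes.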

Pipeline Examples

Async Transform with Parallelism

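using System.Threading.Tasks.Dataflow; // for ExecutionDataflowBlockOptions

await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<string>()
    .AddTransformBlock<string>(
        async url =>
        {
            // Simulated asynchronous work
            await Task.Delay(100);
            return $"Processed: {url}";
        },
        // Process up to four items concurrently
        options: new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 })
    .Build();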

Batch Processing

await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>()
    .AddBatchBlock(batchSize: 3)
    .AddTransformBlock<string>(batch => 
        $"Batch: [{string.Join(", ", batch)}]")
    .Build();
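
The batching behaviour can be sketched with the underlying BatchBlock<T> directly. This assumes AddBatchBlock wraps a standard BatchBlock<T>, as the Supported Blocks table suggests. The point worth seeing is that completing the block flushes a final, smaller batch. RunBatchesAsync is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

foreach (var line in await RunBatchesAsync(count: 7, batchSize: 3))
    Console.WriteLine(line);
// Batch: [1, 2, 3]
// Batch: [4, 5, 6]
// Batch: [7]

// Hypothetical helper for illustration; uses plain TPL Dataflow only.
static async Task<List<string>> RunBatchesAsync(int count, int batchSize)
{
    var batcher = new BatchBlock<int>(batchSize);
    var format = new TransformBlock<int[], string>(
        batch => $"Batch: [{string.Join(", ", batch)}]");
    batcher.LinkTo(format, new DataflowLinkOptions { PropagateCompletion = true });

    for (int i = 1; i <= count; i++)
        batcher.Post(i);

    // Complete() emits any leftover items as a final partial batch.
    batcher.Complete();

    var output = new List<string>();
    await foreach (var item in format.ReceiveAllAsync())
        output.Add(item);
    return output;
}
```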

Terminal Pipeline (ActionBlock)

// AddActionBlock returns DataflowPipelineBuilder<TInput>, which only exposes Build().
// Build() then returns the base interface IDataflowPipeline<TInput> (no output side).
await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>()
    .AddTransformBlock<string>(x => $"Item #{x}")
    .AddActionBlock(item => Console.WriteLine($"Processing: {item}"))
    .Build();

pipeline.Post(1);

// Signal completion first; otherwise awaiting Completion never returns
pipeline.Complete();
await pipeline.Completion;

TransformMany (1:N Expansion)

await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<string>()
    .AddTransformManyBlock<char>(s => s.ToCharArray())
    .Build();

Named Blocks for Debugging

await using var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>("InputBuffer")
    .AddTransformBlock<int>(x => x * 2, "Doubler")
    .AddTransformBlock<string>(x => x.ToString(), "Stringifier")
    .Build();

Channel Integration

Channel as Input Source

var inputChannel = Channel.CreateUnbounded<string>();

await using var pipeline = new DataflowPipelineBuilder()
    .FromChannelSource(inputChannel)
    .AddTransformBlock(int.Parse)
    .AddActionBlock(Console.WriteLine)
    .Build();

// Write to channel from producer
await inputChannel.Writer.WriteAsync("42");
inputChannel.Writer.Complete();

await pipeline.Completion;

Channel as Output Sink

var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>()
    .AddTransformBlock(x => x * 2)
    .BuildAsChannel();  // Returns IDataflowChannelPipeline

pipeline.Post(21);
pipeline.Complete();

// Read from output channel
await foreach (var item in pipeline.Output.ReadAllAsync())
{
    Console.WriteLine(item); // 42
}

Full Channel-to-Channel Pipeline

var inputChannel = Channel.CreateUnbounded<int>();

var pipeline = new DataflowPipelineBuilder()
    .FromChannelSource(inputChannel)
    .AddTransformBlock(x => x * 2)
    .AddTransformBlock(x => $"Result: {x}")
    .BuildAsChannel(new BoundedChannelOptions(100)); // Bounded output

// Producer
await inputChannel.Writer.WriteAsync(21);
inputChannel.Writer.Complete();

// Consumer
await foreach (var result in pipeline.Output.ReadAllAsync())
{
    Console.WriteLine(result); // "Result: 42"
}
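
One way to picture a channel source is as a small pump task that copies items from a ChannelReader<T> into the first block and then completes it. The sketch below shows that idea with plain System.Threading.Channels and TPL Dataflow. It is an assumption about the general technique, not the code behind FromChannelSource, and RunChannelBridgeAsync is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

foreach (var n in await RunChannelBridgeAsync("21", "42"))
    Console.WriteLine(n);

// Hypothetical helper for illustration; not part of Tpl.Dataflow.Builder.
static async Task<List<int>> RunChannelBridgeAsync(params string[] inputs)
{
    var channel = Channel.CreateUnbounded<string>();
    var parse = new TransformBlock<string, int>(s => int.Parse(s));

    // Pump: forward every channel item to the block, then propagate
    // the channel's completion to the block.
    var pump = Task.Run(async () =>
    {
        await foreach (var item in channel.Reader.ReadAllAsync())
            await parse.SendAsync(item);
        parse.Complete();
    });

    foreach (var input in inputs)
        await channel.Writer.WriteAsync(input);
    channel.Writer.Complete();

    var output = new List<int>();
    await foreach (var value in parse.ReceiveAllAsync())
        output.Add(value);

    await pump; // surface any exception thrown inside the pump
    return output;
}
```

SendAsync is used rather than Post so that a bounded target block applies backpressure to the pump instead of dropping items.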

Custom Propagator Blocks

Create reusable custom blocks by inheriting from PropagatorBlock<TIn, TOut> (sync) or AsyncPropagatorBlock<TIn, TOut> (async):

Synchronous Custom Block

using Tpl.Dataflow.Builder.Abstractions;

public class MultiplierBlock : PropagatorBlock<int, int>
{
    private readonly int _factor;
    
    public MultiplierBlock(int factor) => _factor = factor;
    
    protected override int Transform(int input) => input * _factor;
}

// Usage
var pipeline = new DataflowPipelineBuilder()
    .AddBufferBlock<int>()
    .AddCustomBlock(new MultiplierBlock(10))
    .AddActionBlock(Console.WriteLine)
    .Build();

Asynchronous Custom Block

using Tpl.Dataflow.Builder.Abstractions;

public class HttpFetchBlock : AsyncPropagatorBlock<string, string>
{
    private readonly HttpClient _httpClient;
    
    public HttpFetchBlock(HttpClient httpClient) => _httpClient = httpClient;
    
    protected override async Task<string> TransformAsync(string url)
    {
        return await _httpClient.GetStringAsync(url);
    }
}

// Passing execution options to the base constructor
public class ThrottledProcessor : AsyncPropagatorBlock<int, int>
{
    public ThrottledProcessor() 
        : base(new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 })
    { }
    
    protected override async Task<int> TransformAsync(int input)
    {
        await Task.Delay(100);
        return input * 2;
    }
}

Dependency Injection Integration

Basic DI Resolution

var services = new ServiceCollection()
    .AddSingleton<MyCustomBlock>()
    .AddTransient<AnotherBlock>()
    .BuildServiceProvider();

var pipeline = new DataflowPipelineBuilder(serviceProvider: services)
    .AddBufferBlock<int>()
    .AddCustomBlock<MyCustomBlock, int>()        // Resolved from DI
    .AddCustomBlock<AnotherBlock, string>()      // Resolved from DI
    .AddActionBlock(Console.WriteLine)
    .Build();

Keyed Services (.NET 8+)

Use keyed services when you have multiple implementations of the same interface:

// Register keyed services
var services = new ServiceCollection()
    .AddKeyedSingleton<IMultiplierBlock, DoublerBlock>("double")
    .AddKeyedSingleton<IMultiplierBlock, TriplerBlock>("triple")
    .BuildServiceProvider();

// Use specific implementation by key
var doublePipeline = new DataflowPipelineBuilder(serviceProvider: services)
    .AddBufferBlock<int>()
    .AddKeyedCustomBlock<IMultiplierBlock, int>("double")
    .AddActionBlock(x => Console.WriteLine($"Doubled: {x}"))
    .Build();

var triplePipeline = new DataflowPipelineBuilder(serviceProvider: services)
    .AddBufferBlock<int>()
    .AddKeyedCustomBlock<IMultiplierBlock, int>("triple")
    .AddActionBlock(x => Console.WriteLine($"Tripled: {x}"))
    .Build();

doublePipeline.Post(5);  // Output: "Doubled: 10"
triplePipeline.Post(5);  // Output: "Tripled: 15"

Supported Blocks

Block Type | Method | Description
BufferBlock | AddBufferBlock<T>() | Stores messages for later consumption
TransformBlock | AddTransformBlock<TOut>(Func) | Transforms each input to one output
TransformManyBlock | AddTransformManyBlock<TOut>(Func) | Transforms each input to multiple outputs
BatchBlock | AddBatchBlock(batchSize) | Groups inputs into arrays
ActionBlock | AddActionBlock(Action) | Terminal block that consumes inputs
Custom (instance) | AddCustomBlock(IPropagatorBlock) | Add a custom propagator block instance
Custom (factory) | AddCustomBlock(Func<IPropagatorBlock>) | Add a custom block via factory
Custom (DI) | AddCustomBlock<TBlock, TOut>() | Resolve block from IServiceProvider
Custom (Keyed DI) | AddKeyedCustomBlock<TBlock, TOut>(key) | Resolve keyed service from IServiceProvider
Channel Source | FromChannelSource(Channel/ChannelReader) | Start pipeline from a channel

Build Methods

Method | Returns | Description
Build() | IDataflowPipeline<TIn, TOut> | Standard pipeline with output
Build() (after ActionBlock) | IDataflowPipeline<TIn> | Terminal pipeline (no output)
BuildAsChannel() | IDataflowChannelPipeline<TIn, TOut> | Pipeline with Channel output

Output Consumption Methods

// IAsyncEnumerable (preferred)
await foreach (var item in pipeline.ToAsyncEnumerable())
    Console.WriteLine(item);

// IObservable (Rx integration)
var subscription = pipeline.AsObservable()
    .Subscribe(item => Console.WriteLine(item));

// Single receive
var received = await pipeline.ReceiveAsync();

// Try receive (non-blocking)
if (pipeline.TryReceive(out var available))
    Console.WriteLine(available);

Default Options

  • Link Options: PropagateCompletion = true (automatic completion propagation)
  • Cancellation: Default token can be set in builder constructor
  • Block Names: Auto-generated as {BlockType}_{Index} if not specified
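  • EnsureOrdered: false by default on execution blocks for better parallel performance (TPL Dataflow itself defaults to true), so with MaxDegreeOfParallelism > 1 outputs may arrive in a different order than their inputs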
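
The effect of EnsureOrdered can be seen with a plain TransformBlock. The sketch below uses only System.Threading.Tasks.Dataflow and makes earlier items take longer than later ones. With EnsureOrdered = true the block still emits results in input order; with false the order depends on timing and is not guaranteed. RunWithOrderingAsync is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var ordered = await RunWithOrderingAsync(ensureOrdered: true);
Console.WriteLine(string.Join(", ", ordered)); // 1, 2, 3, 4

var unordered = await RunWithOrderingAsync(ensureOrdered: false);
Console.WriteLine(string.Join(", ", unordered)); // order depends on timing

// Hypothetical helper for illustration; uses plain TPL Dataflow only.
static async Task<List<int>> RunWithOrderingAsync(bool ensureOrdered)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        EnsureOrdered = ensureOrdered
    };

    // Earlier inputs get longer delays, so they tend to finish last.
    var block = new TransformBlock<int, int>(async x =>
    {
        await Task.Delay((5 - x) * 40);
        return x;
    }, options);

    for (int i = 1; i <= 4; i++)
        block.Post(i);
    block.Complete();

    var output = new List<int>();
    await foreach (var item in block.ReceiveAllAsync())
        output.Add(item);
    return output;
}
```

If a stage needs ordered output, passing an ExecutionDataflowBlockOptions with EnsureOrdered = true through the options parameter shown in the parallelism example is presumably the way to opt back in.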

Abstract Base Classes

The Tpl.Dataflow.Builder.Abstractions package provides base classes for creating custom blocks:

Class | Description
PropagatorBlock<TIn, TOut> | Base for synchronous transforms - override Transform(TIn)
AsyncPropagatorBlock<TIn, TOut> | Base for async transforms - override TransformAsync(TIn)

Both classes:

  • Handle all IPropagatorBlock<TIn, TOut> interface plumbing
  • Accept optional ExecutionDataflowBlockOptions in constructor
  • Expose Completion, InputCount, OutputCount properties
  • Support Complete() and Fault() methods
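
To make the "interface plumbing" concrete, here is a minimal sketch of how a base class of this kind could be built by wrapping a TransformBlock and forwarding every IPropagatorBlock<TIn, TOut> member to it. SketchPropagatorBlock and TimesTenBlock are hypothetical names, and the package's actual implementation may differ.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var block = new TimesTenBlock();
var printer = new ActionBlock<int>(x => Console.WriteLine(x));
block.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

block.Post(4);
block.Complete();
await printer.Completion; // prints 40

// Hypothetical stand-in for a synchronous propagator base class.
public abstract class SketchPropagatorBlock<TIn, TOut> : IPropagatorBlock<TIn, TOut>
{
    private readonly TransformBlock<TIn, TOut> _inner;

    protected SketchPropagatorBlock(ExecutionDataflowBlockOptions? options = null)
        => _inner = new TransformBlock<TIn, TOut>(
            input => Transform(input),
            options ?? new ExecutionDataflowBlockOptions());

    // The only member a derived block has to supply.
    protected abstract TOut Transform(TIn input);

    public Task Completion => _inner.Completion;
    public int InputCount => _inner.InputCount;
    public int OutputCount => _inner.OutputCount;

    public void Complete() => _inner.Complete();
    public void Fault(Exception exception) => ((IDataflowBlock)_inner).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<TOut> target, DataflowLinkOptions linkOptions)
        => _inner.LinkTo(target, linkOptions);

    // The remaining members are protocol calls made by linked blocks;
    // they are forwarded unchanged to the wrapped TransformBlock.
    DataflowMessageStatus ITargetBlock<TIn>.OfferMessage(
        DataflowMessageHeader messageHeader, TIn messageValue,
        ISourceBlock<TIn>? source, bool consumeToAccept)
        => ((ITargetBlock<TIn>)_inner).OfferMessage(
            messageHeader, messageValue, source, consumeToAccept);

    TOut? ISourceBlock<TOut>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<TOut> target,
        out bool messageConsumed)
        => ((ISourceBlock<TOut>)_inner).ConsumeMessage(
            messageHeader, target, out messageConsumed);

    bool ISourceBlock<TOut>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<TOut> target)
        => ((ISourceBlock<TOut>)_inner).ReserveMessage(messageHeader, target);

    void ISourceBlock<TOut>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<TOut> target)
        => ((ISourceBlock<TOut>)_inner).ReleaseReservation(messageHeader, target);
}

// Hypothetical derived block, mirroring the MultiplierBlock example.
public sealed class TimesTenBlock : SketchPropagatorBlock<int, int>
{
    protected override int Transform(int input) => input * 10;
}
```

An async variant would follow the same shape, with the wrapped TransformBlock constructed from a Func<TIn, Task<TOut>> instead.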

Target Frameworks

  • .NET 8.0
  • .NET 10.0

License

MIT License - see LICENSE for details.

Product: compatible and additional computed target framework versions.
.NET: net8.0 and net10.0 are compatible. Computed: net9.0, plus the android, browser, ios, maccatalyst, macos, tvos, and windows variants of net8.0, net9.0, and net10.0.

NuGet packages (1)

One NuGet package depends on Tpl.Dataflow.Builder.Abstractions:

Package Downloads
Tpl.Dataflow.Builder

A fluent builder for TPL Dataflow pipelines. Simplifies creation and management of dataflow pipelines with type-safe chaining and automatic block linking.

Version | Downloads | Last Updated
0.2.3 | 4,959 | 12/5/2025
0.2.3-preview1 | 209 | 12/5/2025
0.2.2 | 808 | 12/3/2025
0.2.1 | 691 | 12/3/2025
0.2.0 | 155 | 11/28/2025
0.2.0-preview2 | 151 | 11/28/2025
0.2.0-preview1 | 153 | 11/28/2025
0.1.0 | 212 | 11/27/2025
0.1.0-preview2 | 209 | 11/26/2025
0.1.0-preview1 | 208 | 11/26/2025