SCB.DataPipe 3.0.3

There is a newer version of this package available.
See the version list below for details.
.NET CLI:
dotnet add package SCB.DataPipe --version 3.0.3

Package Manager Console:
NuGet\Install-Package SCB.DataPipe -Version 3.0.3
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.

PackageReference:
<PackageReference Include="SCB.DataPipe" Version="3.0.3" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.

Central Package Management (CPM):
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
Directory.Packages.props:
<PackageVersion Include="SCB.DataPipe" Version="3.0.3" />
Project file:
<PackageReference Include="SCB.DataPipe" />

Paket CLI:
paket add SCB.DataPipe --version 3.0.3

Script & Interactive:
#r "nuget: SCB.DataPipe, 3.0.3"
The #r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.

File-based apps:
#:package SCB.DataPipe@3.0.3
The #:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.

Cake:
Install as a Cake Addin:
#addin nuget:?package=SCB.DataPipe&version=3.0.3
Install as a Cake Tool:
#tool nuget:?package=SCB.DataPipe&version=3.0.3

DataPipe

Lightweight, flexible, and composable message pipeline framework for .NET

DataPipe lets you orchestrate complex workflows in a simple, maintainable way. Build pipelines with reusable filters, apply cross-cutting concerns via aspects, and handle conditional logic, iterations, and retries, all in a consistent, readable style.

Whether you're integrating with client systems, processing batches, building readable controller endpoints, or creating background services, DataPipe keeps your codebase clean and predictable. Spend less time wrestling with architecture and more time delivering functionality.

  • Composable Filters: Chain together any number of filters for a single pipeline.
  • Scoped Resource Management: Open SQL connections, transactions, or other resources exactly where needed.
  • Cross-Cutting Concerns: Logging, exception handling, and auditing via Aspects.
  • Conditional Execution: Policy, IfTrue, RepeatUntil, and ForEach filters make complex workflows readable.
  • Async Retries: OnTimeoutRetry allows fine-grained retry strategies with async delays.
  • Minimal & Maintainable: Small footprint, easy to onboard new developers, scales from simple tasks to large integrations.

Features

  • Composable Filters: Chain together any number of filters to build workflows that are clear and modular.
  • Aspects for Cross-Cutting Concerns: Logging, exception handling, and more. Add your own too!
  • Scoped Resource Management: Open SQL connections, transactions, or other resources exactly where needed.
  • Conditional Execution: Policy, IfTrue, RepeatUntil, and ForEach filters make complex workflows readable.
  • Async-Ready Retries: OnTimeoutRetry supports fully asynchronous retry logic with custom delay strategies.
  • Flow Control: Stop pipeline execution safely with PipelineExecution.Stop().
  • Integration Friendly: Works well with APIs, console apps, background services, WinForms, and more.
  • Minimal Footprint: Small, maintainable, and easy for teams to learn.

DataPipe is fully open-source and ready to plug into your .NET projects today.


Installation

Install via NuGet:

dotnet add package SCB.DataPipe

Or via the Package Manager Console:

Install-Package SCB.DataPipe

Basic Usage

Define a message type:

public class TestMessage : BaseMessage
{
    public int Number { get; set; }
}

Create a pipeline:

var pipe = new DataPipe<TestMessage>();

// Add aspects
pipe.Use(new ExceptionAspect<TestMessage>());
pipe.Use(new BasicLoggingAspect<TestMessage>("TestPipeline"));

// Register filters
pipe.Run(new IncrementingNumberFilter());
pipe.Run(new Policy<TestMessage>(msg =>
{
    return msg.Number == 0
        ? new IncrementingNumberFilter()
        : new DecrementingNumberFilter();
}));

// Execute pipeline
var message = new TestMessage { Number = 0 };

await pipe.Invoke(message);

Conditional Execution Examples

IfTrue:

pipe.Run(
	new IfTrue<TestMessage>(msg => msg.Number > 0,
		new IncrementingNumberFilter()
	));

Policy:

pipe.Run(new Policy<TestMessage>(msg =>
{
    return msg.Number switch
    {
        0 => new IncrementingNumberFilter(),
        1 => new DecrementingNumberFilter(),
        _ => null
    };
}));

Retry Example with OnTimeoutRetry

pipe.Run(
	new OnTimeoutRetry<TestMessage>(maxRetries: 3,
		new MockHttpErroringFilter()
	));
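
To make the retry behaviour more concrete, here is a minimal, self-contained sketch of the general technique (retry on timeout with an async delay between attempts). It does not use DataPipe and is not how OnTimeoutRetry is implemented; RetrySketch, RunWithRetry, and the delay parameter are hypothetical names chosen for illustration, and the assumption that "maxRetries" means "retries after the first attempt" is mine, not the library's.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; not part of DataPipe.
public static class RetrySketch
{
    // Runs `action`, retrying up to `maxRetries` times when it throws TimeoutException.
    // `delay` maps the attempt number (1-based) to the wait before the next attempt.
    // Returns the number of attempts that were needed.
    public static async Task<int> RunWithRetry(
        Func<Task> action, int maxRetries, Func<int, TimeSpan> delay)
    {
        var attempt = 0;
        while (true)
        {
            attempt++;
            try
            {
                await action();
                return attempt;
            }
            // The filter stops catching once the retry budget is spent,
            // so the final TimeoutException propagates to the caller.
            catch (TimeoutException) when (attempt <= maxRetries)
            {
                await Task.Delay(delay(attempt)); // non-blocking wait
            }
        }
    }
}
```

A caller could pass something like attempt => TimeSpan.FromSeconds(attempt) for a linearly growing delay. The exception filter keeps the retry budget check next to the catch, which avoids a separate rethrow branch.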

Flow Control

Stop the pipeline gracefully:

public class CancelFilter : Filter<TestMessage>
{
    public Task Execute(TestMessage msg)
    {
        msg.Execution.Stop("User requested cancellation");
        return Task.CompletedTask;
    }
}
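
As a rough mental model of what stopping means for the steps that follow, the sketch below runs a list of steps in order and skips the rest once one of them requests a stop. It is a conceptual illustration only: SketchMessage and SketchRunner are hypothetical stand-ins, not DataPipe types, and DataPipe's internals may work differently.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a pipeline message; not a DataPipe type.
public class SketchMessage
{
    public bool IsStopped { get; private set; }
    public string StopReason { get; private set; } = "";
    public List<string> Trace { get; } = new List<string>();

    // Records the stop request; the runner checks the flag before each step.
    public void Stop(string reason)
    {
        IsStopped = true;
        StopReason = reason;
    }
}

// Hypothetical runner; not a DataPipe type.
public static class SketchRunner
{
    // Runs each step in order and skips the remainder once a step calls Stop().
    public static async Task Run(
        SketchMessage msg, params Func<SketchMessage, Task>[] steps)
    {
        foreach (var step in steps)
        {
            if (msg.IsStopped) break;
            await step(msg);
        }
    }
}
```

In this model a stop is cooperative: the step that calls Stop() still runs to completion, and only the steps after it are skipped.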

When to Use

DataPipe is ideal for:

  • Complex API integrations
  • Simplifying controller logic
  • Batch processing pipelines
  • Background services or scheduled tasks
  • Scenarios requiring retries, conditional logic, or transaction scoping
  • Teams looking to reduce boilerplate and improve maintainability

License

MIT License. See the LICENSE file for details.

Real-World Usage

DataPipe has been successfully used in production systems for:

  • Processing incoming API requests with complex validation and transformation logic.
  • Implementing background jobs that require retries and error handling.
  • Managing stateful workflows in long-running processes.
  • Simplifying data processing pipelines in ETL scenarios.

An at-a-glance example

Below is an example pipeline for processing incoming orders from an external system, handling retries, conditional logic, and logging, all in a readable, composable structure.


using DataPipe.Core;
using DataPipe.Core.Sql;
using System;
using System.Threading.Tasks;

// 1. Define a message type
public class OrderMessage : BaseMessage, ISqlCommand
{
    public string OrderId { get; set; }
    public bool IsValid { get; set; }
    public bool RequiresSpecialProcessing { get; set; }
    public SqlCommand SqlCommand { get; set; } // created by OpenSqlConnection filter
}

// 2. Define reusable filters
public class ValidateOrder : Filter<OrderMessage>
{
    public async Task Execute(OrderMessage msg)
    {
        msg.IsValid = !string.IsNullOrEmpty(msg.OrderId);
        msg.OnLog?.Invoke($"Order {msg.OrderId} validation: {msg.IsValid}");
        await Task.CompletedTask;
    }
}

public class ProcessStandardOrder : Filter<OrderMessage>
{
    public async Task Execute(OrderMessage msg)
    {
        msg.OnLog?.Invoke($"Processing standard order {msg.OrderId}");
        await Task.CompletedTask;
    }
}

public class ProcessSpecialOrder : Filter<OrderMessage>
{
    public async Task Execute(OrderMessage msg)
    {
        msg.OnLog?.Invoke($"Processing special order {msg.OrderId}");
        await Task.CompletedTask;
    }
}

// 3. Compose the pipeline
public async Task RunOrderPipeline()
{
    var msg = new OrderMessage { OrderId = "ORD123", RequiresSpecialProcessing = true };

    var pipeline = new DataPipe<OrderMessage>();

    // Cross-cutting concerns
    pipeline.Use(new BasicLoggingAspect<OrderMessage>());
    pipeline.Use(new ExceptionAspect<OrderMessage>());

    // Basic filter to validate the order
    pipeline.Run(new ValidateOrder());

    // Conditional processing using Policy
    pipeline.Run(new Policy<OrderMessage>(m =>
    {
        // MarkOrderAsFailed and RaiseSpecialOrderHandlingRequiredNotification
        // are filters assumed to be defined elsewhere.
        if (!m.IsValid) return new MarkOrderAsFailed();

        return m.RequiresSpecialProcessing
            ? new RaiseSpecialOrderHandlingRequiredNotification()
            : new OnTimeoutRetry<OrderMessage>(maxRetries: 3,
                new StartTransaction(
                    new OpenSqlConnection(
                        new ProcessStandardOrder() // Example retryable filter
                    )));
    }));

    // Execute the pipeline
    await pipeline.Invoke(msg);
}

Frameworks

Compatible target framework (included in package): net8.0

Computed target frameworks: net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows, net9.0, net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows

Dependencies

  • net8.0: No dependencies.

NuGet packages (1)

Showing the top NuGet package that depends on SCB.DataPipe:

Package Downloads
SCB.DataPipe.Sql

DataPipe.Sql: Ready-made transaction and SQL filters for DataPipe.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
4.5.7 111 1/30/2026
4.5.6 103 1/14/2026
4.5.5 100 1/13/2026
4.5.4 102 1/12/2026
4.5.3 101 1/12/2026
4.5.2 104 1/11/2026
4.5.1 104 1/10/2026
4.5.0 101 1/9/2026
4.4.9 105 1/8/2026
4.4.8 99 1/7/2026
4.4.7 100 1/7/2026
4.4.6 107 1/7/2026
4.4.5 108 1/7/2026
4.4.4 103 1/7/2026
4.4.3 99 1/6/2026
4.4.2 104 1/6/2026
4.4.1 101 1/6/2026
4.4.0 104 1/6/2026
4.3.9 102 1/6/2026
4.3.8 101 1/6/2026
4.3.7 107 1/5/2026
4.3.6 99 1/4/2026
4.3.5 112 1/4/2026
4.3.4 100 1/4/2026
4.3.3 97 1/4/2026
4.3.2 105 1/4/2026
4.3.1 105 1/4/2026
4.3.0 99 1/4/2026
4.2.9 109 1/4/2026
4.2.8 105 1/3/2026
4.2.7 103 1/3/2026
4.2.6 109 1/3/2026
4.2.5 110 1/2/2026
4.2.4 114 1/1/2026
4.2.3 111 12/30/2025
4.2.2 107 12/30/2025
4.2.1 116 12/27/2025
4.2.0 112 12/27/2025
4.1.0 114 12/27/2025
4.0.1 106 12/27/2025
4.0.0 200 12/24/2025
3.0.4 190 12/23/2025
3.0.3 186 12/23/2025
3.0.2 191 12/22/2025
3.0.1 274 12/22/2025 3.0.1 is deprecated because it is no longer maintained.
3.0.0 262 12/22/2025 3.0.0 is deprecated because it is no longer maintained.
2.1.1 255 1/18/2024
2.1.0 309 1/11/2024 2.1.0 is deprecated because it is no longer maintained.
2.0.0 547 2/14/2023 2.0.0 is deprecated because it is no longer maintained.
1.0.2 834 7/1/2021 1.0.2 is deprecated because it is no longer maintained.
1.0.1 686 6/26/2021 1.0.1 is deprecated because it is no longer maintained.
1.0.0 972 6/24/2021 1.0.0 is deprecated because it has critical bugs.