SCB.DataPipe
3.0.4
DataPipe
Lightweight, flexible, and composable message pipeline framework for .NET
DataPipe lets you orchestrate complex workflows in a simple, maintainable way. Build pipelines with reusable filters, apply cross-cutting concerns via aspects, and handle conditional logic, iterations, and retries in a consistent, readable style. v3.0 adds scoped filters for policy decisions, async-ready retry logic, SQL connection management (DataPipe.Sql), and more, while remaining lightweight and easy to learn.
Whether you're integrating with client systems, processing batches, building readable controller endpoints, or creating background services, DataPipe keeps your codebase clean and predictable. Spend less time wrestling with architecture and more time delivering functionality.
Features
- Composable Filters: chain together any number of filters to build workflows that are clear and modular.
- Aspects for Cross-Cutting Concerns: logging, exception handling, and more. Add your own too!
- Scoped Resource Management: open SQL connections, transactions, or other resources exactly where needed.
- Conditional Execution: Policy, IfTrue, RepeatUntil, and ForEach filters make complex workflows readable.
- Async-Ready Retries: OnTimeoutRetry supports fully asynchronous retry logic with custom delay strategies.
- Inherently Concurrent: process multiple messages in parallel safely.
- Flow Control: stop pipeline execution safely with PipelineExecution.Stop().
- Integration Friendly: works well with APIs, console apps, background services, WinForms, and more.
- Minimal Footprint: small, maintainable, and easy for teams to learn.
DataPipe is fully open-source and ready to plug into your .NET projects today.
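To make the filter and aspect ideas concrete, here is a minimal standalone sketch that does not use DataPipe at all. It assumes a filter is simply an async function over a shared message and an aspect is a wrapper around the whole pipeline; the names core, logging, and trace are illustrative only and are not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Standalone sketch: none of these names come from DataPipe.
// A "filter" here is an async function over a shared message (a list of trace entries).
var filters = new List<Func<List<string>, Task>>
{
    msg => { msg.Add("validate"); return Task.CompletedTask; },
    msg => { msg.Add("process"); return Task.CompletedTask; },
};

// The core pipeline runs every registered filter in registration order.
Func<List<string>, Task> core = async msg =>
{
    foreach (var filter in filters)
        await filter(msg);
};

// An "aspect" wraps the pipeline and adds behavior before and after it.
Func<Func<List<string>, Task>, Func<List<string>, Task>> logging = next => async msg =>
{
    msg.Add("start");
    await next(msg);
    msg.Add("end");
};

var pipeline = logging(core);
var trace = new List<string>();
await pipeline(trace);

Console.WriteLine(string.Join(" -> ", trace)); // prints "start -> validate -> process -> end"
```

Because the aspect only sees the next delegate, the filters stay unaware of logging or error handling, which is the separation the feature list describes.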
Installation
Install via NuGet:
dotnet add package SCB.DataPipe
Or via the Package Manager Console:
Install-Package SCB.DataPipe
Basic Usage
Start simple, grow as needed.
- simple lambda filter:
pipe.Run(async m => { await Task.Delay(0); });
- custom filter class to house more complex logic:
pipe.Run(new AddOrderHeader());
- execute conditionally based on message state:
pipe.Run(
    new IfTrue<TestMessage>(msg => msg.Number > 0, new IncrementingNumberFilter())
);
- compose filters to group together or manage resources:
pipe.Run(new StartTransaction(
    new OpenSqlConnection(...)));
Define a message type:
public class TestMessage : BaseMessage
{
    public int Number { get; set; }
}
Create a pipeline:
var pipe = new DataPipe<TestMessage>();
// Add aspects
pipe.Use(new ExceptionAspect<TestMessage>());
pipe.Use(new BasicLoggingAspect<TestMessage>("TestPipeline"));
// Register filters
pipe.Run(new IncrementingNumberFilter());
pipe.Run(new Policy<TestMessage>(msg =>
{
    return msg.Number == 0
        ? new IncrementingNumberFilter()
        : new DecrementingNumberFilter();
}));
// Execute pipeline
var message = new TestMessage { Number = 0 };
await pipe.Invoke(message);
Examples
See the DataPipe.Tests project for more examples and test cases.
IfTrue:
pipe.Run(
    new IfTrue<TestMessage>(msg => msg.Number > 0,
        new IncrementingNumberFilter()
    ));
Policy:
pipe.Run(new Policy<TestMessage>(msg =>
{
    return msg.Number switch
    {
        0 => new IncrementingNumberFilter(),
        1 => new DecrementingNumberFilter(),
        _ => null
    };
}));
Retry Example with OnTimeoutRetry
pipe.Run(
    new OnTimeoutRetry<TestMessage>(maxRetries: 3,
        new MockHttpErroringFilter()
    ));
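For readers unfamiliar with asynchronous retries, the following is a rough standalone sketch of the general technique, not OnTimeoutRetry's actual implementation. The helper RetryOnTimeoutAsync and its delayFor parameter are hypothetical names introduced here; how DataPipe exposes custom delay strategies is not shown in this README.

```csharp
using System;
using System.Threading.Tasks;

// Standalone sketch: RetryOnTimeoutAsync and delayFor are hypothetical, not DataPipe's API.
// Runs the action once, then retries up to maxRetries times when it times out.
async Task RetryOnTimeoutAsync(Func<Task> action, int maxRetries, Func<int, TimeSpan> delayFor)
{
    for (var attempt = 1; ; attempt++)
    {
        try
        {
            await action();
            return;
        }
        catch (TimeoutException) when (attempt <= maxRetries)
        {
            // Wait asynchronously (no blocked thread), then loop to try again.
            await Task.Delay(delayFor(attempt));
        }
    }
}

var calls = 0;
await RetryOnTimeoutAsync(
    action: () =>
    {
        calls++;
        // Fail the first two calls to simulate a flaky remote endpoint.
        if (calls < 3) throw new TimeoutException("simulated timeout");
        return Task.CompletedTask;
    },
    maxRetries: 3,
    delayFor: attempt => TimeSpan.FromMilliseconds(10 * attempt)); // linear back-off

Console.WriteLine($"Succeeded after {calls} calls"); // prints "Succeeded after 3 calls"
```

Once the retries are exhausted, the exception filter no longer matches and the last TimeoutException propagates to the caller.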
Flow Control
Stop the pipeline gracefully:
public class CancelFilter : Filter<TestMessage>
{
    public Task Execute(TestMessage msg)
    {
        msg.Execution.Stop("User requested cancellation");
        return Task.CompletedTask;
    }
}
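Conceptually, a graceful stop means the remaining filters are skipped rather than interrupted. The standalone sketch below illustrates that idea with a plain boolean flag; the flag and the filter shapes are assumptions made for illustration and do not reflect how DataPipe tracks execution state internally.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Standalone sketch: the stop flag and delegate-based filters are hypothetical, not DataPipe's types.
var stopped = false;
var executed = new List<string>();

var filters = new List<Func<Task>>
{
    () => { executed.Add("first"); return Task.CompletedTask; },
    () => { executed.Add("cancel"); stopped = true; return Task.CompletedTask; },
    () => { executed.Add("never runs"); return Task.CompletedTask; },
};

foreach (var filter in filters)
{
    // Check the flag before each filter so a stop request skips everything after it.
    if (stopped) break;
    await filter();
}

Console.WriteLine(string.Join(", ", executed)); // prints "first, cancel"
```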
When to Use
DataPipe is ideal for:
- Complex API integrations
- Simplifying controller logic
- Batch processing pipelines
- Background services or scheduled tasks
- Scenarios requiring retries, conditional logic, or transaction scoping
- Teams looking to reduce boilerplate and improve maintainability
Real-World Usage
DataPipe has been used in production systems since its inception more than a decade ago for:
- Processing incoming API requests with complex validation and transformation logic.
- Implementing background jobs that require retries and error handling.
- Managing stateful workflows in long-running processes.
- Simplifying data processing pipelines in ETL scenarios.
An at-a-glance example
Below is a more realistic example pipeline for processing incoming orders from an external system, handling retries, conditional logic, and logging, all in a readable, composable structure.
using DataPipe.Core;
using DataPipe.Core.Sql;
using System;
using System.Threading.Tasks;

// 1. Base infrastructure message - inherits the built-in BaseMessage
public class ClientContext : BaseMessage, ISqlCommand, IAmCommittable, IAmRetryable
{
    public SqlCommand SqlCommand { get; set; }
    public bool Commit { get; set; }
    public int Attempt { get; set; }
    public int MaxRetries { get; set; }
    public Action<int> OnRetrying { get; set; } = attempt => Console.WriteLine($"Retrying attempt {attempt}...");
}

// 2. Domain message inherits that infrastructure
public class OrderMessage : ClientContext
{
    public string OrderId { get; set; }
    public bool IsValid { get; set; }
    public bool RequiresSpecialProcessing { get; set; }
}

// 3. Filters (DownloadOrderFromClient and RaiseSpecialOrderHandlingRequiredNotification,
//    used in the pipeline below, are application-specific filters not shown here)
public class ValidateOrder : Filter<OrderMessage>
{
    public async Task Execute(OrderMessage msg)
    {
        msg.IsValid = !string.IsNullOrEmpty(msg.OrderId);
        msg.OnLog?.Invoke($"Order {msg.OrderId} validation: {msg.IsValid}");
        await Task.CompletedTask;
    }
}

public class ProcessStandardOrder : Filter<OrderMessage>
{
    public async Task Execute(OrderMessage msg)
    {
        msg.OnLog?.Invoke($"Processing standard order {msg.OrderId}");
        await Task.CompletedTask;
    }
}

public class MarkOrderAsFailed : Filter<OrderMessage>
{
    public async Task Execute(OrderMessage msg)
    {
        msg.OnLog?.Invoke($"Order {msg.OrderId} marked as failed.");
        await Task.CompletedTask;
    }
}

// 4. Pipeline
public async Task RunOrderPipeline()
{
    var msg = new OrderMessage
    {
        OrderId = "ORD123",
        RequiresSpecialProcessing = true,
        // Optional: override OnRetrying for custom behavior
        OnRetrying = attempt =>
        {
            Console.WriteLine($"Custom retry hook: attempt {attempt} for order ORD123");
        }
    };

    var pipeline = new DataPipe<OrderMessage>();

    // Cross-cutting concerns
    pipeline.Use(new BasicLoggingAspect<OrderMessage>());
    pipeline.Use(new ExceptionAspect<OrderMessage>());

    // Retryable filter: OnTimeoutRetry will automatically call msg.OnRetrying on each retry
    pipeline.Run(new OnTimeoutRetry<OrderMessage>(
        maxRetries: 3,
        new DownloadOrderFromClient<OrderMessage>(
            new ValidateOrder()
        )
    ));

    // Conditional processing
    pipeline.Run(new Policy<OrderMessage>(m =>
    {
        if (!m.IsValid) return new MarkOrderAsFailed();
        return m.RequiresSpecialProcessing
            ? new RaiseSpecialOrderHandlingRequiredNotification()
            : new OnTimeoutRetry<OrderMessage>(
                maxRetries: 3,
                new StartTransaction<OrderMessage>(
                    new OpenSqlConnection<OrderMessage>(
                        new ProcessStandardOrder()
                    )
                )
            );
    }));

    // Execute pipeline
    await pipeline.Invoke(msg);
}
License
| Product | Versions (compatible and additional computed target framework versions) |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0: No dependencies.
NuGet packages (1)
Showing the top 1 NuGet packages that depend on SCB.DataPipe:
| Package | Downloads |
|---|---|
| SCB.DataPipe.Sql (DataPipe.Sql: Ready-made transaction and SQL filters for DataPipe.) | |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated | |
|---|---|---|---|
| 4.5.7 | 113 | 1/30/2026 | |
| 4.5.6 | 106 | 1/14/2026 | |
| 4.5.5 | 101 | 1/13/2026 | |
| 4.5.4 | 103 | 1/12/2026 | |
| 4.5.3 | 102 | 1/12/2026 | |
| 4.5.2 | 105 | 1/11/2026 | |
| 4.5.1 | 105 | 1/10/2026 | |
| 4.5.0 | 102 | 1/9/2026 | |
| 4.4.9 | 106 | 1/8/2026 | |
| 4.4.8 | 100 | 1/7/2026 | |
| 4.4.7 | 101 | 1/7/2026 | |
| 4.4.6 | 108 | 1/7/2026 | |
| 4.4.5 | 109 | 1/7/2026 | |
| 4.4.4 | 104 | 1/7/2026 | |
| 4.4.3 | 100 | 1/6/2026 | |
| 4.4.2 | 105 | 1/6/2026 | |
| 4.4.1 | 102 | 1/6/2026 | |
| 4.4.0 | 105 | 1/6/2026 | |
| 4.3.9 | 103 | 1/6/2026 | |
| 4.3.8 | 102 | 1/6/2026 | |
| 4.3.7 | 108 | 1/5/2026 | |
| 4.3.6 | 100 | 1/4/2026 | |
| 4.3.5 | 113 | 1/4/2026 | |
| 4.3.4 | 101 | 1/4/2026 | |
| 4.3.3 | 98 | 1/4/2026 | |
| 4.3.2 | 106 | 1/4/2026 | |
| 4.3.1 | 106 | 1/4/2026 | |
| 4.3.0 | 100 | 1/4/2026 | |
| 4.2.9 | 110 | 1/4/2026 | |
| 4.2.8 | 106 | 1/3/2026 | |
| 4.2.7 | 104 | 1/3/2026 | |
| 4.2.6 | 110 | 1/3/2026 | |
| 4.2.5 | 111 | 1/2/2026 | |
| 4.2.4 | 115 | 1/1/2026 | |
| 4.2.3 | 111 | 12/30/2025 | |
| 4.2.2 | 108 | 12/30/2025 | |
| 4.2.1 | 116 | 12/27/2025 | |
| 4.2.0 | 112 | 12/27/2025 | |
| 4.1.0 | 114 | 12/27/2025 | |
| 4.0.1 | 106 | 12/27/2025 | |
| 4.0.0 | 200 | 12/24/2025 | |
| 3.0.4 | 190 | 12/23/2025 | |
| 3.0.3 | 186 | 12/23/2025 | |
| 3.0.2 | 191 | 12/22/2025 | |
| 3.0.1 | 274 | 12/22/2025 | |
| 3.0.0 | 262 | 12/22/2025 | |
| 2.1.1 | 255 | 1/18/2024 | |
| 2.1.0 | 309 | 1/11/2024 | |
| 2.0.0 | 547 | 2/14/2023 | |
| 1.0.2 | 835 | 7/1/2021 | |
| 1.0.1 | 686 | 6/26/2021 | |
| 1.0.0 | 972 | 6/24/2021 |