SCB.DataPipe
3.0.3
A newer version of this package is available. See the version list below for details.
.NET CLI: dotnet add package SCB.DataPipe --version 3.0.3
Package Manager: NuGet\Install-Package SCB.DataPipe -Version 3.0.3
PackageReference: <PackageReference Include="SCB.DataPipe" Version="3.0.3" />
Central Package Management (Directory.Packages.props): <PackageVersion Include="SCB.DataPipe" Version="3.0.3" />
Central Package Management (project file): <PackageReference Include="SCB.DataPipe" />
Paket CLI: paket add SCB.DataPipe --version 3.0.3
Script & Interactive: #r "nuget: SCB.DataPipe, 3.0.3"
File-based apps: #:package SCB.DataPipe@3.0.3
Cake Addin: #addin nuget:?package=SCB.DataPipe&version=3.0.3
Cake Tool: #tool nuget:?package=SCB.DataPipe&version=3.0.3
DataPipe
Lightweight, flexible, and composable message pipeline framework for .NET
DataPipe lets you orchestrate complex workflows in a simple, maintainable way. Build pipelines with reusable filters, apply cross-cutting concerns via aspects, and handle conditional logic, iterations, and retries, all in a consistent, readable style.
Whether you're integrating with client systems, processing batches, building readable controller endpoints, or creating background services, DataPipe keeps your codebase clean and predictable. Spend less time wrestling with architecture and more time delivering functionality.
- Composable Filters: Chain together any number of filters for a single pipeline.
- Scoped Resource Management: Open SQL connections, transactions, or other resources exactly where needed.
- Cross-Cutting Concerns: Logging, exception handling, and auditing via Aspects.
- Conditional Execution: Policy, IfTrue, RepeatUntil, and ForEach filters make complex workflows readable.
- Async Retries: OnTimeoutRetry allows fine-grained retry strategies with async delays.
- Minimal & Maintainable: Small footprint, easy to onboard new developers, scales from simple tasks to large integrations.
Features
- Composable Filters: chain together any number of filters to build workflows that are clear and modular.
- Aspects for Cross-Cutting Concerns: logging, exception handling, and more. Add your own too!
- Scoped Resource Management: Open SQL connections, transactions, or other resources exactly where needed.
- Conditional Execution: Policy, IfTrue, RepeatUntil, and ForEach filters make complex workflows readable.
- Async-Ready Retries: OnTimeoutRetry supports fully asynchronous retry logic with custom delay strategies.
- Flow Control: stop pipeline execution safely with PipelineExecution.Stop().
- Integration Friendly: works well with APIs, console apps, background services, WinForms, and more.
- Minimal Footprint: small, maintainable, and easy for teams to learn.
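To make the idea of an aspect concrete, here is a minimal, self-contained sketch of how cross-cutting behaviour can be wrapped around a pipeline body. It does not use DataPipe's own types: `ISketchAspect<T>`, `AspectMessage`, `LoggingAspectSketch`, `ExceptionAspectSketch`, and `AspectSketch.Wrap` are hypothetical stand-ins, and the library's real aspect interface may look different.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical message type used only in this sketch.
public class AspectMessage
{
    public List<string> Log { get; } = new List<string>();
    public string Error { get; set; }
}

// Hypothetical aspect contract: do something around the rest of the pipeline.
public interface ISketchAspect<T>
{
    Task Execute(T msg, Func<T, Task> next);
}

// Records when the wrapped work starts and ends, even if it throws.
public class LoggingAspectSketch : ISketchAspect<AspectMessage>
{
    public async Task Execute(AspectMessage msg, Func<AspectMessage, Task> next)
    {
        msg.Log.Add("start");
        try { await next(msg); }
        finally { msg.Log.Add("end"); }
    }
}

// Catches any exception from the wrapped work and stores its message.
public class ExceptionAspectSketch : ISketchAspect<AspectMessage>
{
    public async Task Execute(AspectMessage msg, Func<AspectMessage, Task> next)
    {
        try { await next(msg); }
        catch (Exception ex) { msg.Error = ex.Message; }
    }
}

public static class AspectSketch
{
    // Wraps the core delegate so that the first aspect listed is the outermost one.
    public static Func<T, Task> Wrap<T>(Func<T, Task> core, params ISketchAspect<T>[] aspects)
    {
        var pipeline = core;
        for (var i = aspects.Length - 1; i >= 0; i--)
        {
            var aspect = aspects[i];
            var next = pipeline;
            pipeline = msg => aspect.Execute(msg, next);
        }
        return pipeline;
    }
}
```

In this sketch the exception aspect is listed first, so it sits outermost and still sees failures after the logging aspect has recorded its "end" entry.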
DataPipe is fully open-source and ready to plug into your .NET projects today.
Installation
Install via NuGet:
dotnet add package SCB.DataPipe
Or via the Package Manager Console:
Install-Package SCB.DataPipe
Basic Usage
Define a message type:
public class TestMessage : BaseMessage
{
    public int Number { get; set; }
}
Create a pipeline:
var pipe = new DataPipe<TestMessage>();

// Add aspects
pipe.Use(new ExceptionAspect<TestMessage>());
pipe.Use(new BasicLoggingAspect<TestMessage>("TestPipeline"));

// Register filters
pipe.Run(new IncrementingNumberFilter());
pipe.Run(new Policy<TestMessage>(msg =>
{
    return msg.Number == 0
        ? new IncrementingNumberFilter()
        : new DecrementingNumberFilter();
}));

// Execute pipeline
var message = new TestMessage { Number = 0 };
await pipe.Invoke(message);
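The snippet above refers to IncrementingNumberFilter and DecrementingNumberFilter without showing them. The sketch below is one plausible shape for them, modelled on the Execute(msg) signature that CancelFilter uses later in this README. To stay self-contained it declares its own stand-ins (`SketchMessage` for BaseMessage, `ISketchFilter<T>` for Filter<T>) and a small `BasicUsageSketch` runner; those names are hypothetical and are not part of the package.

```csharp
using System.Threading.Tasks;

// Stand-in for DataPipe's BaseMessage (assumption: the real class carries more state).
public class SketchMessage { }

// Stand-in for DataPipe's Filter<T>, mirroring the Execute(msg) shape used in this README.
public interface ISketchFilter<T>
{
    Task Execute(T msg);
}

public class TestMessage : SketchMessage
{
    public int Number { get; set; }
}

// Adds one to the message's Number.
public class IncrementingNumberFilter : ISketchFilter<TestMessage>
{
    public Task Execute(TestMessage msg)
    {
        msg.Number++;
        return Task.CompletedTask;
    }
}

// Subtracts one from the message's Number.
public class DecrementingNumberFilter : ISketchFilter<TestMessage>
{
    public Task Execute(TestMessage msg)
    {
        msg.Number--;
        return Task.CompletedTask;
    }
}

public static class BasicUsageSketch
{
    // Runs increment, increment, decrement in order and returns the final Number.
    public static async Task<int> RunAsync(int start)
    {
        var msg = new TestMessage { Number = start };
        var filters = new ISketchFilter<TestMessage>[]
        {
            new IncrementingNumberFilter(),
            new IncrementingNumberFilter(),
            new DecrementingNumberFilter()
        };
        foreach (var filter in filters)
        {
            await filter.Execute(msg);
        }
        return msg.Number;
    }
}
```

When using the real package, the two filters would derive from the library's Filter<TestMessage> instead of the stand-in interface.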
Conditional Execution Examples
IfTrue:
pipe.Run(
    new IfTrue<TestMessage>(msg => msg.Number > 0,
        new IncrementingNumberFilter()
    ));
Policy:
pipe.Run(new Policy<TestMessage>(msg =>
{
    return msg.Number switch
    {
        0 => new IncrementingNumberFilter(),
        1 => new DecrementingNumberFilter(),
        _ => null
    };
}));
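Conditional filters such as these can be understood as wrappers that decide at execution time whether, or which, inner filter to run. The following is a rough sketch of that idea using hypothetical stand-in types (`ISketchFilter<T>`, `IfTrueSketch<T>`, `PolicySketch<T>`, `NumberMessage`, `AddFilter`), not the library's implementation. It assumes that a null result from the selector means "run nothing", which is suggested by the `_ => null` arm above but not confirmed here.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in for DataPipe's Filter<T>.
public interface ISketchFilter<T>
{
    Task Execute(T msg);
}

// Runs the inner filter only when the predicate holds for the message.
public class IfTrueSketch<T> : ISketchFilter<T>
{
    private readonly Func<T, bool> _condition;
    private readonly ISketchFilter<T> _inner;

    public IfTrueSketch(Func<T, bool> condition, ISketchFilter<T> inner)
    {
        _condition = condition;
        _inner = inner;
    }

    public Task Execute(T msg) =>
        _condition(msg) ? _inner.Execute(msg) : Task.CompletedTask;
}

// Asks the selector which filter to run; a null result is treated as "skip" (assumption).
public class PolicySketch<T> : ISketchFilter<T>
{
    private readonly Func<T, ISketchFilter<T>> _selector;

    public PolicySketch(Func<T, ISketchFilter<T>> selector)
    {
        _selector = selector;
    }

    public Task Execute(T msg)
    {
        var chosen = _selector(msg);
        return chosen is null ? Task.CompletedTask : chosen.Execute(msg);
    }
}

public class NumberMessage
{
    public int Number { get; set; }
}

// Adds a fixed delta to the message's Number.
public class AddFilter : ISketchFilter<NumberMessage>
{
    private readonly int _delta;
    public AddFilter(int delta) { _delta = delta; }

    public Task Execute(NumberMessage msg)
    {
        msg.Number += _delta;
        return Task.CompletedTask;
    }
}

public static class ConditionalSketch
{
    // Applies a policy step, then an IfTrue step, and returns the final Number.
    public static async Task<int> RunAsync(int start)
    {
        var msg = new NumberMessage { Number = start };
        var policy = new PolicySketch<NumberMessage>(m => m.Number switch
        {
            0 => new AddFilter(1),
            1 => new AddFilter(-1),
            _ => null
        });
        var ifTrue = new IfTrueSketch<NumberMessage>(m => m.Number > 0, new AddFilter(10));

        await policy.Execute(msg);
        await ifTrue.Execute(msg);
        return msg.Number;
    }
}
```

Because both wrappers implement the same filter contract as the filters they wrap, they can be nested or chained like any other filter.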
Retry Example with OnTimeoutRetry
pipe.Run(
    new OnTimeoutRetry<TestMessage>(maxRetries: 3,
        new MockHttpErroringFilter()
    ));
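As an illustration of what a retry wrapper with asynchronous delays might do internally, here is a small sketch built on hypothetical stand-ins (`ISketchFilter<T>`, `RetryOnTimeoutSketch<T>`, `FlakyFilter`). It assumes that "timeout" means a TimeoutException and that maxRetries counts retries after the first attempt; which exceptions the real OnTimeoutRetry treats as timeouts, and its default delay strategy, are not stated in this README.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in for DataPipe's Filter<T>.
public interface ISketchFilter<T>
{
    Task Execute(T msg);
}

// Retries the inner filter when it throws TimeoutException, waiting asynchronously in between.
public class RetryOnTimeoutSketch<T> : ISketchFilter<T>
{
    private readonly int _maxRetries;
    private readonly ISketchFilter<T> _inner;
    private readonly Func<int, TimeSpan> _delay;

    public RetryOnTimeoutSketch(int maxRetries, ISketchFilter<T> inner, Func<int, TimeSpan> delay = null)
    {
        _maxRetries = maxRetries;
        _inner = inner;
        // Default delay grows linearly with the retry number (an arbitrary choice for this sketch).
        _delay = delay ?? (retry => TimeSpan.FromMilliseconds(10 * retry));
    }

    public async Task Execute(T msg)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await _inner.Execute(msg);
                return;
            }
            catch (TimeoutException) when (attempt < _maxRetries)
            {
                // Retries remain: wait without blocking a thread, then loop again.
                await Task.Delay(_delay(attempt + 1));
            }
            // Once attempt == _maxRetries the filter above is false and the exception propagates.
        }
    }
}

// Test double that times out a fixed number of times before succeeding.
public class FlakyFilter : ISketchFilter<object>
{
    private int _failuresLeft;
    public int Calls { get; private set; }

    public FlakyFilter(int failures) { _failuresLeft = failures; }

    public Task Execute(object msg)
    {
        Calls++;
        if (_failuresLeft-- > 0) throw new TimeoutException("simulated timeout");
        return Task.CompletedTask;
    }
}
```

With maxRetries set to 3, the inner filter runs at most four times in this sketch: the initial attempt plus three retries.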
Flow Control
Stop the pipeline gracefully:
public class CancelFilter : Filter<TestMessage>
{
    public Task Execute(TestMessage msg)
    {
        msg.Execution.Stop("User requested cancellation");
        return Task.CompletedTask;
    }
}
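One way to picture graceful stopping is a runner that checks a flag on the message before each filter. The sketch below shows that pattern with hypothetical stand-ins (`StoppableMessage`, `ISketchFilter<T>`, `TraceFilter`, `StopFilter`, `FlowControlSketch`); how DataPipe actually tracks and honours the stop request is an assumption here.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical message that carries a stop flag and a trace of executed filters.
public class StoppableMessage
{
    public bool IsStopped { get; private set; }
    public string StopReason { get; private set; }
    public List<string> Trace { get; } = new List<string>();

    public void Stop(string reason)
    {
        IsStopped = true;
        StopReason = reason;
    }
}

// Stand-in for DataPipe's Filter<T>.
public interface ISketchFilter<T>
{
    Task Execute(T msg);
}

// Records its name so the order of execution is visible.
public class TraceFilter : ISketchFilter<StoppableMessage>
{
    private readonly string _name;
    public TraceFilter(string name) { _name = name; }

    public Task Execute(StoppableMessage msg)
    {
        msg.Trace.Add(_name);
        return Task.CompletedTask;
    }
}

// Requests that the pipeline stop after this filter.
public class StopFilter : ISketchFilter<StoppableMessage>
{
    public Task Execute(StoppableMessage msg)
    {
        msg.Trace.Add("stop");
        msg.Stop("User requested cancellation");
        return Task.CompletedTask;
    }
}

public static class FlowControlSketch
{
    // Runs filters in order and skips the remainder once a stop has been requested.
    public static async Task<StoppableMessage> RunAsync(params ISketchFilter<StoppableMessage>[] filters)
    {
        var msg = new StoppableMessage();
        foreach (var filter in filters)
        {
            if (msg.IsStopped) break;
            await filter.Execute(msg);
        }
        return msg;
    }
}
```

No exception is thrown in this pattern: the remaining filters are simply not executed, and the reason stays on the message for later inspection.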
When to Use
DataPipe is ideal for:
- Complex API integrations
- Simplifying controller logic
- Batch processing pipelines
- Background services or scheduled tasks
- Scenarios requiring retries, conditional logic, or transaction scoping
- Teams looking to reduce boilerplate and improve maintainability
License
MIT License: see the LICENSE file for details.
Real-World Usage
DataPipe has been successfully used in production systems for:
- Processing incoming API requests with complex validation and transformation logic.
- Implementing background jobs that require retries and error handling.
- Managing stateful workflows in long-running processes.
- Simplifying data processing pipelines in ETL scenarios.
An at-a-glance example
Below is an example pipeline for processing incoming orders from an external system, handling retries, conditional logic, and logging, all in a readable, composable structure.
using DataPipe.Core;
using DataPipe.Core.Sql;
using System;
using System.Threading.Tasks;
// 1. Define a message type
public class OrderMessage : BaseMessage, ISqlCommand
{
    public string OrderId { get; set; }
    public bool IsValid { get; set; }
    public bool RequiresSpecialProcessing { get; set; }
    public SqlCommand SqlCommand { get; set; } // created by OpenSqlConnection filter
}
// 2. Define reusable filters
public class ValidateOrder : Filter<OrderMessage>
{
    public async Task Execute(OrderMessage msg)
    {
        msg.IsValid = !string.IsNullOrEmpty(msg.OrderId);
        msg.OnLog?.Invoke($"Order {msg.OrderId} validation: {msg.IsValid}");
        await Task.CompletedTask;
    }
}

public class ProcessStandardOrder : Filter<OrderMessage>
{
    public async Task Execute(OrderMessage msg)
    {
        msg.OnLog?.Invoke($"Processing standard order {msg.OrderId}");
        await Task.CompletedTask;
    }
}

public class ProcessSpecialOrder : Filter<OrderMessage>
{
    public async Task Execute(OrderMessage msg)
    {
        msg.OnLog?.Invoke($"Processing special order {msg.OrderId}");
        await Task.CompletedTask;
    }
}
// 3. Compose the pipeline
public async Task RunOrderPipeline()
{
    var msg = new OrderMessage { OrderId = "ORD123", RequiresSpecialProcessing = true };
    var pipeline = new DataPipe<OrderMessage>();

    // Cross-cutting concerns
    pipeline.Use(new BasicLoggingAspect<OrderMessage>());
    pipeline.Use(new ExceptionAspect<OrderMessage>());

    // Basic filter to validate the order
    pipeline.Run(new ValidateOrder());

    // Conditional processing using Policy.
    // MarkOrderAsFailed and RaiseSpecialOrderHandlingRequiredNotification are
    // filters assumed to be defined elsewhere.
    pipeline.Run(new Policy<OrderMessage>(m =>
    {
        // Invalid orders are routed to the failure filter and skip normal processing
        if (!m.IsValid) return new MarkOrderAsFailed();

        return m.RequiresSpecialProcessing
            ? new RaiseSpecialOrderHandlingRequiredNotification()
            : new OnTimeoutRetry<OrderMessage>(maxRetries: 3,
                new StartTransaction(
                    new OpenSqlConnection(
                        new ProcessStandardOrder() // Example retryable filter
                    )));
    }));

    // Execute the pipeline
    await pipeline.Invoke(msg);
}
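The nested StartTransaction and OpenSqlConnection filters in the example above scope a resource to the filters they wrap. A minimal sketch of that scoping pattern, using a fake connection rather than a real SQL client, could look like the following. All names here (`ISketchFilter<T>`, `ResourceMessage`, `FakeConnection`, `OpenResourceSketch`, `UseResourceFilter`) are hypothetical and only illustrate the idea; the filters shipped in SCB.DataPipe.Sql may work differently.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for DataPipe's Filter<T>.
public interface ISketchFilter<T>
{
    Task Execute(T msg);
}

// Hypothetical message that exposes the scoped resource to inner filters.
public class ResourceMessage
{
    public IDisposable Resource { get; set; }
    public List<string> Trace { get; } = new List<string>();
}

// Fake resource that records when it is opened and closed.
public class FakeConnection : IDisposable
{
    private readonly List<string> _trace;

    public FakeConnection(List<string> trace)
    {
        _trace = trace;
        _trace.Add("open");
    }

    public void Dispose() => _trace.Add("close");
}

// Opens the resource, runs the inner filters, then always releases the resource.
public class OpenResourceSketch : ISketchFilter<ResourceMessage>
{
    private readonly ISketchFilter<ResourceMessage>[] _inner;

    public OpenResourceSketch(params ISketchFilter<ResourceMessage>[] inner)
    {
        _inner = inner;
    }

    public async Task Execute(ResourceMessage msg)
    {
        using (var connection = new FakeConnection(msg.Trace))
        {
            msg.Resource = connection;
            try
            {
                foreach (var filter in _inner)
                {
                    await filter.Execute(msg);
                }
            }
            finally
            {
                // Filters outside this scope should not see a disposed resource.
                msg.Resource = null;
            }
        }
    }
}

// Inner filter that records whether the resource was available when it ran.
public class UseResourceFilter : ISketchFilter<ResourceMessage>
{
    public Task Execute(ResourceMessage msg)
    {
        msg.Trace.Add(msg.Resource != null ? "work" : "no resource");
        return Task.CompletedTask;
    }
}
```

The resource lives exactly as long as the wrapped filters run, which is why wrapping such a scope in a retry filter gives each attempt a fresh connection in this sketch.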
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0: No dependencies.
NuGet packages (1)
Showing the top 1 NuGet packages that depend on SCB.DataPipe:
| Package | Downloads |
|---|---|
| SCB.DataPipe.Sql (DataPipe.Sql: Ready-made transaction and SQL filters for DataPipe.) | |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated | |
|---|---|---|---|
| 4.5.7 | 111 | 1/30/2026 | |
| 4.5.6 | 103 | 1/14/2026 | |
| 4.5.5 | 100 | 1/13/2026 | |
| 4.5.4 | 102 | 1/12/2026 | |
| 4.5.3 | 101 | 1/12/2026 | |
| 4.5.2 | 104 | 1/11/2026 | |
| 4.5.1 | 104 | 1/10/2026 | |
| 4.5.0 | 101 | 1/9/2026 | |
| 4.4.9 | 105 | 1/8/2026 | |
| 4.4.8 | 99 | 1/7/2026 | |
| 4.4.7 | 100 | 1/7/2026 | |
| 4.4.6 | 107 | 1/7/2026 | |
| 4.4.5 | 108 | 1/7/2026 | |
| 4.4.4 | 103 | 1/7/2026 | |
| 4.4.3 | 99 | 1/6/2026 | |
| 4.4.2 | 104 | 1/6/2026 | |
| 4.4.1 | 101 | 1/6/2026 | |
| 4.4.0 | 104 | 1/6/2026 | |
| 4.3.9 | 102 | 1/6/2026 | |
| 4.3.8 | 101 | 1/6/2026 | |
| 4.3.7 | 107 | 1/5/2026 | |
| 4.3.6 | 99 | 1/4/2026 | |
| 4.3.5 | 112 | 1/4/2026 | |
| 4.3.4 | 100 | 1/4/2026 | |
| 4.3.3 | 97 | 1/4/2026 | |
| 4.3.2 | 105 | 1/4/2026 | |
| 4.3.1 | 105 | 1/4/2026 | |
| 4.3.0 | 99 | 1/4/2026 | |
| 4.2.9 | 109 | 1/4/2026 | |
| 4.2.8 | 105 | 1/3/2026 | |
| 4.2.7 | 103 | 1/3/2026 | |
| 4.2.6 | 109 | 1/3/2026 | |
| 4.2.5 | 110 | 1/2/2026 | |
| 4.2.4 | 114 | 1/1/2026 | |
| 4.2.3 | 111 | 12/30/2025 | |
| 4.2.2 | 107 | 12/30/2025 | |
| 4.2.1 | 116 | 12/27/2025 | |
| 4.2.0 | 112 | 12/27/2025 | |
| 4.1.0 | 114 | 12/27/2025 | |
| 4.0.1 | 106 | 12/27/2025 | |
| 4.0.0 | 200 | 12/24/2025 | |
| 3.0.4 | 190 | 12/23/2025 | |
| 3.0.3 | 186 | 12/23/2025 | |
| 3.0.2 | 191 | 12/22/2025 | |
| 3.0.1 | 274 | 12/22/2025 | |
| 3.0.0 | 262 | 12/22/2025 | |
| 2.1.1 | 255 | 1/18/2024 | |
| 2.1.0 | 309 | 1/11/2024 | |
| 2.0.0 | 547 | 2/14/2023 | |
| 1.0.2 | 834 | 7/1/2021 | |
| 1.0.1 | 686 | 6/26/2021 | |
| 1.0.0 | 972 | 6/24/2021 |