SmartPipe.Core 1.0.2

Suggested Alternatives

SmartPipe.Core 1.0.5

Additional Details

Critical bugs fixed in v1.0.5. Thread-safety, evasion detection, P-controller fixes.

There is a newer version of this package available.
See the version list below for details.
dotnet add package SmartPipe.Core --version 1.0.2
                    
NuGet\Install-Package SmartPipe.Core -Version 1.0.2
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="SmartPipe.Core" Version="1.0.2" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="SmartPipe.Core" Version="1.0.2" />
                    
Directory.Packages.props
<PackageReference Include="SmartPipe.Core" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add SmartPipe.Core --version 1.0.2
                    
#r "nuget: SmartPipe.Core, 1.0.2"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package SmartPipe.Core@1.0.2
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=SmartPipe.Core&version=1.0.2
                    
Install as a Cake Addin
#tool nuget:?package=SmartPipe.Core&version=1.0.2
                    
Install as a Cake Tool

SmartPipe.Core

Universal streaming pipeline engine for .NET 10 with zero dependencies.

Built on System.Threading.Channels, SmartPipe.Core provides a production-ready pipeline engine for ETL, real-time stream processing, API aggregation, and AI agent integration — all with 0 allocations in hot path.

CI NuGet Core NuGet Extensions License: MIT Coverage

What is SmartPipe?

SmartPipe is not just another ETL library. It's a universal streaming pipeline engine that handles:

  • ETL/ELT — extract from DB/API, transform, load to anywhere
  • Real-time stream processing — process events as they arrive
  • API aggregation — fan-out requests, aggregate responses
  • Data validation pipelines — validate, enrich, route
  • AI agent tools — integrate with Semantic Kernel, AutoGen
  • Log/sensor processing — process IoT telemetry, application logs

All in 5 lines of code:

using SmartPipe.Core;
using SmartPipe.Extensions;

var pipeline = PipelineBuilder
    .From(new HttpSelector<MyDto>("https://api.example.com/data"))
    .Transform(new JsonTransform<MyDto, MyEntity>())
    .Transform(new PollyResilienceTransform<MyEntity>(resiliencePipeline))
    .WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<MyEntity>(logger));

Examples by Scenario

ETL Pipeline (Database → Transform → API)

var pipeline = PipelineBuilder
    .From(new EfCoreSelector<Order>(dbContext).WithQuery(q => q.Where(o => o.Status == "Pending")))
    .Transform(new MapsterTransform<Order, OrderDto>())
    .Transform(new PollyResilienceTransform<OrderDto>(resiliencePipeline))
    .WithOptions(o => o.MaxDegreeOfParallelism = 8);
await pipeline.To(new HttpSink<OrderDto>(httpClient, "https://api.destination.com/orders"));

Real-time Stream Processing (API → Filter → Log)

var pipeline = PipelineBuilder
    .From(new HttpSelector<SensorData>("https://iot.example.com/telemetry"))
    .Transform(new JsonTransform<SensorData, SensorData>())
    .Transform(new MapsterTransform<SensorData, Alert>())
    .WithOptions(o => { o.MaxDegreeOfParallelism = 2; o.ContinueOnError = true; });
await pipeline.To(new LoggerSink<Alert>(logger));

AI Agent Tool (Semantic Kernel Integration)

var tool = new PipelineTool<string, string>("summarize", "Summarize text using AI");
tool.AddTransformer(new JsonTransform<string, PromptDto>());
tool.AddTransformer(new HttpTransform<PromptDto, string>(openAiClient));

var result = await tool.ExecuteAsync("Long text to summarize...");

API Aggregation (Fan-out → Aggregate)

var pipeline = PipelineBuilder
    .From(new HttpSelector<User>("https://users.api.com"))
    .Transform(new MapsterTransform<User, EnrichedUser>())
    .Transform(new PollyResilienceTransform<EnrichedUser>(resiliencePipeline));
await pipeline.To(new Sink<EnrichedUser>(user => enrichedUsers.Add(user)));

Getting Started | Installation

# Core engine (zero dependencies)
dotnet add package SmartPipe.Core

# Extensions (Http, EF Core, Dapper, JSON, CSV, Mapster, Polly)
dotnet add package SmartPipe.Extensions

First Pipeline (5 lines)

using SmartPipe.Core;
using SmartPipe.Extensions.Selectors;
using SmartPipe.Extensions.Transforms;
using SmartPipe.Extensions.Sinks;

var pipeline = PipelineBuilder
    .From(new HttpSelector<MyDto>("https://api.example.com/data"))
    .Transform(new JsonTransform<MyDto, MyEntity>())
    .WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<MyEntity>(logger));

ASP.NET Core BackgroundService

public class PipelineWorker : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var pipeline = PipelineBuilder
            .From(new EfCoreSelector<Order>(_dbContext))
            .Transform(new MapsterTransform<Order, OrderDto>())
            .WithOptions(o => o.MaxDegreeOfParallelism = 8);
        await pipeline.To(new HttpSink<OrderDto>(_httpClient, "https://api.dest.com"));
    }
}

SmartPipe Architecture

Overview

SmartPipe is a streaming pipeline engine built on System.Threading.Channels. It consists of 21 integrated components organized in a resilience pipeline order.

## Pipeline Flow
ISource<T>
    │
    ▼
Bounded Channel
    │
    ▼
BackpressureStrategy (Watermark 80%/95%)
    │
    ▼
DeduplicationFilter (Bloom, O(1))
    │
    ▼
AdaptiveParallelism (Little's Law)
    │
    ▼
CircuitBreaker (Closed→Open→HalfOpen)
    │
    ▼
ITransformer (ValueTask, parallel) + AttemptTimeout
    │
    ▼
RetryQueue (Jitter + Exponential Backoff)
    │
    ▼
Bounded Channel
    │
    ▼
ISink<T>

Resilience Pipeline Order

  1. TotalRequestTimeout — maximum time for entire pipeline
  2. CircuitBreaker — stops processing on high failure rate
  3. RetryQueue — delays and retries transient errors
  4. AttemptTimeout — per-transformer timeout

Component Overview

Component Type Memory Performance
DeduplicationFilter Bloom filter O(1) 20.04 ns
ObjectPool Lock-free O(n) 15.63 ns
CircuitBreaker Lock-free (Interlocked) O(n) 27.76 ns
RetryQueue Lock-free (Channel) O(n) 69.16 ns
ExponentialHistogram Percentiles O(log² n) < 100 ns
JumpHash Sharding O(1) < 10 ns
CuckooFilter Dedup + delete O(1) < 50 ns
ReservoirSampler Sampling O(k) < 10 ns

Extension Architecture

Extensions follow the Selection Pattern — a single package with categorized components:

  • Selectors — data sources (Http, EF Core, Dapper)
  • Transforms — data transformers (JSON, CSV, Mapster, Compression, Polly)
  • Sinks — data destinations (Logger)

Instead of 12 separate NuGet packages, SmartPipe uses a single SmartPipe.Extensions package with the Selection Pattern:

SmartPipe.Extensions/
├── Selectors/          ← Data sources
│   ├── HttpSelector
│   ├── EfCoreSelector
│   └── DapperSelector
├── Transforms/         ← Data transformers
│   ├── JsonTransform
│   ├── CsvTransform
│   ├── MapsterTransform
│   ├── CompressionTransform
│   └── PollyResilienceTransform
└── Sinks/              ← Data destinations
    └── LoggerSink
One package. All integrations. Zero boilerplate.

Requirements

  • .NET 10.0+
  • SmartPipe.Core: 0 dependencies
  • SmartPipe.Extensions: Polly, EF Core, Dapper, Mapster, CsvHelper
  • 186 tests, 96.3% code coverage

What's New in v1.0.2

  • Lock-free RetryQueue
  • Lock-free CircuitBreaker
  • SmartPipeEventSource — monitor via dotnet-counters
  • SmartPipeHostedService — native ASP.NET Core integration
  • SmartPipeHealthCheck — pipeline health for YARP/Kubernetes
  • Adaptive EMA — dynamic α for spike detection
  • Dynamic Watermark — throughput-based backpressure
  • 96.3% code coverage (up from 86.5%)
  • 47 new tests, 0 regressions in benchmarks

Acknowledgements

SmartPipe is built on ideas and research from:

  • Polly — resilience patterns for .NET (github.com/App-vNext/Polly)
  • System.Threading.Channels — lock-free producer/consumer infrastructure by Microsoft
  • OpenTelemetry — observability framework for cloud-native software
  • Little's Law — queue theory applied to adaptive parallelism (ACM Queue, 2025)
  • Bloom & Cuckoo Filters — probabilistic data structures for deduplication
  • ReTraced — three-level retry model inspiration
  • TheCodeMan — production Channel pipeline patterns
  • Microsoft.Extensions.Resilience — resilience pipeline integration
  • OWASP — security patterns for secret detection
  • BenchmarkDotNet — performance measurement framework

License MIT License — see LICENSE for details.

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
  • net10.0

    • No dependencies.

NuGet packages (4)

Showing the top 4 NuGet packages that depend on SmartPipe.Core:

Package Downloads
SmartPipe.Extensions

Extensions for SmartPipe.Core: 28 integrations (Http, EF Core, Dapper, JSON, CSV, Mapster, Compression, Polly, Filter, Validation, Conditional, Composite, DeadLetter). One package instead of 12.

SmartPipe.Memory.Health

Predictive analytics and health monitoring for SmartPipe.Memory graph stores. HealthVector, BottleneckPredictor, InsightGenerator, CognitiveConsolidation, MemoryDecayPolicy, and background monitoring agents.

SmartPipe.Memory

Embedded graph memory layer for the SmartPipe ecosystem. Type-safe Fluent API, in-memory traversals, SQLite WAL persistence, time-travel queries, predictive analytics, and OpenTelemetry support.

SmartPipe.Memory.Extensions

Integration library connecting SmartPipe.Memory with SmartPipe.Core ETL pipelines. Provides UseMemory(), AsGraphSource(), ToGraphSink(), TransformToEdges() and automatic pipeline topology registration.

GitHub repositories

This package is not used by any popular GitHub repositories.