SmartPipe.Core 1.0.6

dotnet add package SmartPipe.Core --version 1.0.6

NuGet\Install-Package SmartPipe.Core -Version 1.0.6
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.

<PackageReference Include="SmartPipe.Core" Version="1.0.6" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.

Directory.Packages.props:
<PackageVersion Include="SmartPipe.Core" Version="1.0.6" />
Project file:
<PackageReference Include="SmartPipe.Core" />
For projects that support Central Package Management (CPM), copy the PackageVersion node into the solution's Directory.Packages.props file to version the package, and reference it from the project file without a version.

paket add SmartPipe.Core --version 1.0.6

#r "nuget: SmartPipe.Core, 1.0.6"
The #r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.

#:package SmartPipe.Core@1.0.6
The #:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.

#addin nuget:?package=SmartPipe.Core&version=1.0.6
Install as a Cake Addin.

#tool nuget:?package=SmartPipe.Core&version=1.0.6
Install as a Cake Tool.

SmartPipe.Core

Universal streaming pipeline engine for .NET 10

Built on System.Threading.Channels, SmartPipe.Core provides a production-ready pipeline engine for ETL, real-time stream processing, API aggregation, and AI agent integration, all with zero allocations in the hot path.


📖 Complete Feature Reference →

What is SmartPipe?

SmartPipe is not just another ETL library. It's a universal streaming pipeline engine that handles:

  • ETL/ELT: extract from a DB or API, transform, load anywhere
  • Real-time stream processing: process events as they arrive
  • API aggregation: fan-out requests, aggregate responses
  • Data validation pipelines: validate, enrich, route
  • AI agent tools: integrate with Semantic Kernel, AutoGen
  • Log/sensor processing: process IoT telemetry, application logs
  • Error recovery & dead letter: capture failures for later replay
  • Stream merging: combine multiple data sources into one pipeline

All in 5 lines of code:

using SmartPipe.Core;
using SmartPipe.Extensions;

var pipeline = PipelineBuilder
    .From(new HttpSelector<MyDto>("https://api.example.com/data"))
    .Transform(x => x.Enrich())       // Middleware for simple ops
    .Transform(new JsonTransform<MyDto, MyEntity>())  // ITransformer for complex
    .WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<MyEntity>(logger));

Getting Started | Installation

# Core engine 
dotnet add package SmartPipe.Core

# Extensions (Http, EF Core, Dapper, JSON, CSV, Mapster, Polly)
dotnet add package SmartPipe.Extensions

Examples by Scenario

Middleware Pattern (5 lines)

var pipeline = PipelineBuilder
    .From(new HttpSelector<int>("https://api.example.com/numbers"))
    .Transform(x => x * 2)          // Middleware!
    .Transform(x => x + 1)          // Middleware!
    .WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<int>(logger));

ETL Pipeline (Database → Transform → API)

var pipeline = PipelineBuilder
    .From(new EfCoreSelector<Order>(dbContext).WithQuery(q => q.Where(o => o.Status == "Pending")))
    .Transform(new MapsterTransform<Order, OrderDto>())
    .Transform(new PollyResilienceTransform<OrderDto>(resiliencePipeline))
    .WithOptions(o => o.MaxDegreeOfParallelism = 8);
await pipeline.To(new HttpSink<OrderDto>(httpClient, "https://api.destination.com/orders"));

Real-time Stream Processing (API → Filter → Log)

var pipeline = PipelineBuilder
    .From(new HttpSelector<SensorData>("https://iot.example.com/telemetry"))
    .Transform(new JsonTransform<SensorData, SensorData>())
    .Transform(new MapsterTransform<SensorData, Alert>())
    .WithOptions(o => { o.MaxDegreeOfParallelism = 2; o.ContinueOnError = true; });
await pipeline.To(new LoggerSink<Alert>(logger));

Single Item Processing

var pipeline = new SmartPipeChannel<string, string>();
pipeline.AddTransformer(new JsonTransform<string, PromptDto>());
var result = await pipeline.ProcessSingleAsync(new ProcessingContext<string>("Long text to summarize..."));

API Aggregation (Fan-out → Aggregate)

var pipeline = PipelineBuilder
    .From(new HttpSelector<User>("https://users.api.com"))
    .Transform(new MapsterTransform<User, EnrichedUser>())
    .Transform(new PollyResilienceTransform<EnrichedUser>(resiliencePipeline));
await pipeline.To(new Sink<EnrichedUser>(user => enrichedUsers.Add(user)));

Error Persistence with DeadLetterSink

var pipeline = PipelineBuilder
    .From(new HttpSelector<Order>("https://api.example.com/orders"))
    .Transform(new OrderValidator())
    .WithOptions(o => o.ContinueOnError = true);
await pipeline.To(new DeadLetterSink<Order>("failed_orders.json"));

First Pipeline (5 lines)

using SmartPipe.Core;
using SmartPipe.Extensions.Selectors;
using SmartPipe.Extensions.Transforms;
using SmartPipe.Extensions.Sinks;

var pipeline = PipelineBuilder
    .From(new HttpSelector<MyDto>("https://api.example.com/data"))
    .Transform(new JsonTransform<MyDto, MyEntity>())
    .WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<MyEntity>(logger));

ASP.NET Core BackgroundService

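using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;

public class PipelineWorker : BackgroundService
{
    // Injected dependencies; the concrete types are assumptions.
    private readonly DbContext _dbContext;
    private readonly HttpClient _httpClient;

    public PipelineWorker(DbContext dbContext, HttpClient httpClient)
        => (_dbContext, _httpClient) = (dbContext, httpClient);

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var pipeline = PipelineBuilder
            .From(new EfCoreSelector<Order>(_dbContext))
            .Transform(new MapsterTransform<Order, OrderDto>())
            .WithOptions(o => o.MaxDegreeOfParallelism = 8);
        await pipeline.To(new HttpSink<OrderDto>(_httpClient, "https://api.dest.com"));
    }
}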

SmartPipe Architecture

Overview

SmartPipe is a streaming pipeline engine built on System.Threading.Channels. It consists of 34 integrated components organized in a resilience pipeline order.

Pipeline Flow


ISource<T> (or RunInBackground)
    ▼
Bounded Channel (or Rendezvous Channel)
    ▼
BackpressureStrategy (P-controller: continuous throttling)
    ▼
DeduplicationFilter (Bloom, O(1)) + HyperLogLogEstimator
    ▼
AdaptiveParallelism (P-controller with dead zone + anti-windup)
    ▼
CircuitBreaker (Lock-free, Closed→Open→HalfOpen + Isolated)
    │
    ▼
MiddlewareTransformer (Func<T,T>) + ITransformer (ValueTask)
    ▼
RetryQueue (Jitter + Exponential Backoff)
    ▼
Bounded Channel
    ▼
ISink<T> (Logger, DeadLetter, HealthChecks)
    ▼
AsChannelReader() → SignalR/gRPC
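
The first and last hops of the flow above are bounded channels. The sketch below is not SmartPipe code; it uses only System.Threading.Channels to show how a bounded channel applies backpressure between a producer and a consumer. The capacity of 4, the item count, and the doubling "transform" are arbitrary choices for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded channel: once 4 items are buffered, WriteAsync waits until the
// consumer catches up. That wait is the backpressure.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(4)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleWriter = true,
    SingleReader = true
});

// Producer: stands in for a source stage.
var producer = Task.Run(async () =>
{
    for (var i = 1; i <= 10; i++)
        await channel.Writer.WriteAsync(i);
    channel.Writer.Complete(); // tells the reader that no more items will arrive
});

// Consumer: applies a transform and collects the results, standing in for a sink.
var results = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
    results.Add(item * 2);

await producer;
Console.WriteLine(string.Join(", ", results)); // the doubled values 2 through 20, in order
```

With a single writer and a single reader, items leave the channel in the order they were written, so the consumer never needs its own ordering logic.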

Resilience Pipeline Order

  1. TotalRequestTimeout: maximum time for the entire pipeline
  2. CircuitBreaker: stops processing on high failure rate
  3. RetryQueue: delays and retries transient errors
  4. AttemptTimeout: per-transformer timeout
  5. DeadLetterSink: captures exhausted retries for later replay
  6. LivenessCheck: detects a stalled pipeline
  7. ReadinessCheck: detects an overloaded pipeline
  8. DefaultRetryPolicy: per-pipeline default retry configuration
  9. RetryBudget: per-item retry budget control
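
The RetryQueue step is described as jitter plus exponential backoff. A minimal sketch of one common variant ("full jitter") follows; it is an illustration, not SmartPipe's implementation. The BackoffDelay name, the 100 ms base, and the 2 s cap are hypothetical.

```csharp
using System;

// Exponential backoff with full jitter: the delay for a given attempt is drawn
// uniformly from [0, min(maxDelay, baseDelay * 2^attempt)).
static TimeSpan BackoffDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay, Random random)
{
    // Clamp the exponent so the shift below cannot overflow.
    var exponent = Math.Min(attempt, 30);
    var ceilingMs = Math.Min(maxDelay.TotalMilliseconds,
                             baseDelay.TotalMilliseconds * (1L << exponent));
    return TimeSpan.FromMilliseconds(random.NextDouble() * ceilingMs);
}

var rng = new Random();
for (var attempt = 0; attempt < 5; attempt++)
{
    var delay = BackoffDelay(attempt, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(2), rng);
    Console.WriteLine($"attempt {attempt}: wait {delay.TotalMilliseconds:F0} ms");
}
```

Randomizing the whole interval spreads retries from many failed items over time, so they do not hit a recovering dependency in synchronized bursts.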

Component Overview

| Component | Type | Memory | Performance |
|---|---|---|---|
| DeduplicationFilter | Bloom filter | O(1) | 20.65 ns |
| ObjectPool | Lock-free | O(n) | 15.55 ns |
| CircuitBreaker | Lock-free (Interlocked) | O(n) | 29.30 ns |
| RetryQueue | Lock-free (Channel) | O(n) | 69.16 ns |
| ExponentialHistogram | Percentiles | O(log² n) | < 100 ns |
| JumpHash | Sharding | O(1) | < 10 ns |
| CuckooFilter | Dedup + delete | O(1) | < 50 ns |
| ReservoirSampler | Sampling | O(k) | < 10 ns |
| HyperLogLogEstimator | Count-Distinct | O(1) | < 50 ns |
| DeadLetterSink | Error persistence | O(n) | not measured |
| ChannelMerge | Stream merging | O(n) | not measured |
| AdaptiveMetrics (Update) | Double EMA | O(1) | 20.25 ns |
| AdaptiveMetrics (Predict) | Double EMA | O(1) | 0.16 ns |
| IClock | Time abstraction | O(1) | < 5 ns |
| AtomicHelper | Lock-free double ops | O(1) | < 10 ns |
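
The JumpHash entry refers to sharding with O(1) memory. As an illustration, here is a sketch of the published jump consistent hash algorithm (Lamping and Veach); whether SmartPipe's JumpHash has this exact signature is an assumption.

```csharp
using System;

// Jump consistent hash: maps a 64-bit key to a bucket in [0, bucketCount)
// without storing any per-bucket state.
static int JumpHash(ulong key, int bucketCount)
{
    if (bucketCount <= 0) throw new ArgumentOutOfRangeException(nameof(bucketCount));
    long bucket = -1, next = 0;
    while (next < bucketCount)
    {
        bucket = next;
        // 64-bit linear congruential step; overflow wraps by design.
        key = unchecked(key * 2862933555777941757UL + 1);
        next = (long)((bucket + 1) * ((double)(1L << 31) / ((key >> 33) + 1)));
    }
    return (int)bucket;
}

// Growing from 10 to 11 buckets: a key either stays put or moves to bucket 10.
var moved = 0;
for (ulong k = 0; k < 10_000; k++)
{
    if (JumpHash(k, 10) != JumpHash(k, 11)) moved++;
}
Console.WriteLine($"{moved} of 10000 keys moved when growing from 10 to 11 buckets");
```

On average about one key in eleven should move in this demo, which is the minimum needed to keep the shards balanced.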

Extension Architecture

Extensions follow the Selection Pattern: a single package with categorized components.

  • Selectors: data sources (Http, EF Core, Dapper, CSV, JSON, DeadLetter)
  • Transforms: data transformers (JSON, CSV, Mapster, Compression, Polly, Filter, Validation, Conditional, Composite)
  • Sinks: data destinations (Logger, DeadLetter, Http, Db, CSV, JSON)
  • Health: Kubernetes probes (Liveness, Readiness)
  • Streaming: ChannelMerge, RunInBackground, AsChannelReader

Instead of 12 separate NuGet packages, SmartPipe uses a single SmartPipe.Extensions package with the Selection Pattern:

SmartPipe.Extensions/
├── Selectors/          ← Data sources
│   ├── HttpSelector      ← REST API client
│   ├── EfCoreSelector    ← Entity Framework streaming
│   ├── DapperSelector    ← High-performance SQL
│   ├── CsvFileSource     ← CSV file reader
│   ├── JsonFileSource    ← JSON array & NDJSON reader
│   └── DeadLetterSource  ← Replay failed items
├── Transforms/         ← Data transformers
│   ├── JsonTransform          ← JSON serialization
│   ├── CsvTransform           ← CSV parsing
│   ├── MapsterTransform       ← Object mapping
│   ├── CompressionTransform   ← Brotli/GZip
│   ├── PollyResilienceTransform ← Retry/CB/Hedging
│   ├── FilterTransform        ← Predicate filtering
│   ├── ValidationTransform    ← DataAnnotations validation
│   ├── ConditionalTransform   ← Conditional execution
│   ├── CompositeTransform     ← Chain transforms
│   └── FilterValidationExtensions ← ToFilter() conversion
├── Sinks/              ← Data destinations
│   ├── LoggerSink       ← Structured logging
│   ├── DeadLetterSink   ← Failed items persistence
│   ├── HttpSink         ← REST API client
│   ├── DbSink           ← Database insert
│   ├── CsvFileSink      ← CSV file writer
│   └── JsonFileSink     ← JSON file writer
├── Hosting/            ← ASP.NET Core integration
│   ├── SmartPipeHostedService       ← BackgroundService
│   ├── SmartPipeServiceCollectionExtensions ← AddSmartPipe DI
│   └── SmartPipeResilienceExtensions ← Polly registration
├── Health/             ← Kubernetes probes
│   ├── SmartPipeLivenessCheck
│   └── SmartPipeReadinessCheck
└── Streaming/          ← Stream utilities
    └── ChannelMerge    ← Merge two channels
One package. All integrations. 

Requirements

  • .NET 10.0+
  • SmartPipe.Core: 0 dependencies
  • SmartPipe.Extensions: Polly, EF Core, Dapper, Mapster, CsvHelper
  • 598 tests, 89.2% code coverage

What's New in v1.0.6

  • Thread safety: CuckooFilter, DeduplicationFilter, and ReservoirSampler are now fully thread-safe
  • ObjectPool max capacity: prevents unbounded pool growth under sustained load
  • DeduplicationFilter TTL: automatic entry expiration for long-running pipelines
  • JsonFileSink periodic flushing: NDJSON batch writes, prevents OOM on large datasets
  • RetryQueue polling optimization: single CancellationTokenSource per call, reduced allocations
  • DrainAsync + WithTimeoutAsync: CancellationToken support
  • PipelineDashboard: readonly record struct, PipelineDashboard.Empty
  • TransformWithTimeoutAsync: catch-all exception handling prevents consumer crashes
  • SecretScanner: disabled by default, explicit opt-in via EnableFeature("SecretScanner")
  • ExponentialHistogram: Volatile.Read for percentile reads, P50/P95/P99 caching
  • AdaptiveMetrics: Stopwatch.GetTimestamp() instead of TickCount64
  • DbSink: async ExecuteAsync, no thread pool blocking
  • DapperSelector: try/finally reader disposal
  • ChannelMerge: optional BoundedChannelOptions
What's New in v1.0.5

  • 598 tests, 95.8% code coverage
  • DefaultRetryPolicy: per-pipeline retry configuration in SmartPipeChannelOptions
  • RetryBudget: per-item retry budget in RetryQueue, auto-routes exhausted items to DeadLetterSink
  • DisposeAsync(CancellationToken): graceful cancellation during pipeline disposal
  • AddSmartPipe DI: service collection extensions for ASP.NET Core integration
  • IClock integration: time abstraction for testability, replaces DateTime.UtcNow
  • AtomicHelper: lock-free CompareExchange loop utility
  • SecretScanner evasion detection: Base64/URL decoding, MaxRecursionDepth=3, 164 tests
  • DeadLetterSink retry: IOException recovery with exponential backoff
  • AdaptiveParallelism adaptive alpha: faster response to latency changes
  • CircuitBreaker CleanupWindow: thread-safe via TryDequeue+check
  • ObjectPool ABA protection: version stamps prevent race conditions
  • CuckooFilter Merge: combine multiple filters
What's New in v1.0.4

  • 22 new features (243 tests, 96.4% coverage)
  • P-Controller Parallelism: smooth thread scaling, no binary jumps
  • Double EMA + Prediction: velocity tracking + one-step latency forecast
  • Hybrid CircuitBreaker: EWMA early warning + sliding window decisions
  • P-Controller Backpressure: continuous throttling, no oscillation
  • PipelineState + Cancel(): lifecycle management with events
  • Progress reporting: OnProgress with ETA calculation
  • Auto DeadLetter routing: exhausted retries → DeadLetterSink
  • 12 new Extensions: CsvFileSource/Sink, JsonFileSource/Sink, FilterTransform, ValidationTransform, DbSink, HttpSink, ConditionalTransform, DeadLetterSource, CompositeTransform
  • Metrics.Export(): JSON + Prometheus format
  • 4 new OWASP patterns in SecretScanner
  • 12% faster ValueTask_Transform (69.12 ns)
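
To make the P-controller items above concrete, here is a minimal sketch of a proportional controller with a dead zone that adjusts a degree of parallelism from measured latency. It is an illustration under stated assumptions, not SmartPipe's controller: the NextParallelism name, the latency-based error signal, and the gain and dead-zone values are all hypothetical.

```csharp
using System;

// Proportional step: the change in parallelism is proportional to the relative
// latency error. Errors inside the dead zone are ignored to avoid jitter, and
// the result is clamped to [min, max].
static int NextParallelism(int current, double targetLatencyMs, double measuredLatencyMs,
                           double gain, double deadZone, int min, int max)
{
    // Positive when the pipeline is faster than the target, negative when slower.
    var error = (targetLatencyMs - measuredLatencyMs) / targetLatencyMs;
    if (Math.Abs(error) < deadZone) return current;
    var adjusted = current + gain * error * current;
    return Math.Clamp((int)Math.Round(adjusted), min, max);
}

Console.WriteLine(NextParallelism(8, 100, 104, 0.5, 0.1, 1, 16)); // 8: inside the dead zone
Console.WriteLine(NextParallelism(8, 100, 150, 0.5, 0.1, 1, 16)); // 6: latency too high, back off
Console.WriteLine(NextParallelism(8, 100, 50, 0.5, 0.1, 1, 16));  // 10: headroom, scale up
```

Because the step size scales with the error, small deviations produce small corrections instead of doubling or halving the worker count.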

What's New in v1.0.3

  • 13 new features (215 tests, 96.3% coverage)
  • Middleware Transformer: Func<T,T> as lightweight ITransformer
  • Rendezvous Channel (BoundedCapacity=0)
  • HyperLogLogEstimator: Count-Distinct with O(1) memory
  • Dual-threshold Watermark: Pause/Resume prevents oscillation
  • Liveness/Readiness Health Checks: Kubernetes-native
  • DeadLetterSink: failed items persistence
  • Data Lineage: provenance tracking in Metadata
  • ChannelMerge: merge two streams
  • RunInBackground(): streaming pipeline consumption
  • Hybrid Queue: FullMode option (Wait/DropOldest)
  • AsChannelReader(): SignalR/gRPC integration
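
The dual-threshold watermark in the list above is a hysteresis rule: pause at a high mark, resume only at a lower one. A minimal sketch, with a hypothetical UpdatePaused helper and arbitrary thresholds of 25 and 80, might look like this:

```csharp
using System;

// Hysteresis: pause the producer once the queue reaches the high watermark and
// resume only after it drains to the low watermark. Between the two marks the
// previous state is kept, which prevents rapid pause/resume flapping.
static bool UpdatePaused(bool paused, int queued, int lowWatermark, int highWatermark)
{
    if (!paused && queued >= highWatermark) return true;  // pause
    if (paused && queued <= lowWatermark) return false;   // resume
    return paused;                                        // no change
}

var isPaused = false;
foreach (var depth in new[] { 10, 80, 60, 85, 50, 30, 20, 40 })
{
    isPaused = UpdatePaused(isPaused, depth, lowWatermark: 25, highWatermark: 80);
    Console.WriteLine($"queued={depth} paused={isPaused}");
}
// paused becomes True at 80 and returns to False only at 20
```

With a single threshold, a queue hovering around that value would toggle the producer on every item; the gap between the two marks absorbs that noise.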

What's New in v1.0.2

  • Lock-free RetryQueue
  • Lock-free CircuitBreaker
  • SmartPipeEventSource: monitor via dotnet-counters
  • SmartPipeHostedService: native ASP.NET Core integration
  • SmartPipeHealthCheck: pipeline health for YARP/Kubernetes
  • Adaptive EMA: dynamic α for spike detection
  • Dynamic Watermark: throughput-based backpressure
  • 96.3% code coverage (up from 86.5%)
  • 47 new tests, 0 regressions in benchmarks

Documentation

Acknowledgements

SmartPipe is built on ideas and research from:

  • Polly: resilience patterns for .NET (github.com/App-vNext/Polly)
  • System.Threading.Channels: lock-free producer/consumer infrastructure by Microsoft
  • OpenTelemetry: observability framework for cloud-native software
  • Little's Law: queueing theory applied to adaptive parallelism (ACM Queue, 2025)
  • Bloom & Cuckoo Filters: probabilistic data structures for deduplication
  • ReTraced: three-level retry model inspiration
  • TheCodeMan: production Channel pipeline patterns
  • Microsoft.Extensions.Resilience: resilience pipeline integration
  • OWASP: security patterns for secret detection
  • BenchmarkDotNet: performance measurement framework
  • Control Theory (P-controllers): applied to AdaptiveParallelism and BackpressureStrategy
  • HyperLogLog (Flajolet et al.): cardinality estimation algorithm

License

MIT License. See LICENSE for details.

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 

NuGet packages (4)

Showing the top 4 NuGet packages that depend on SmartPipe.Core:

Package Downloads
SmartPipe.Extensions

Extensions for SmartPipe.Core: 28 integrations (Http, EF Core, Dapper, JSON, CSV, Mapster, Compression, Polly, Filter, Validation, Conditional, Composite, DeadLetter). One package instead of 12.

SmartPipe.Memory

Embedded graph memory layer for the SmartPipe ecosystem. Type-safe Fluent API, in-memory traversals, SQLite WAL persistence, time-travel queries, predictive analytics, and OpenTelemetry support.

SmartPipe.Memory.Health

Predictive analytics and health monitoring for SmartPipe.Memory graph stores. HealthVector, BottleneckPredictor, InsightGenerator, CognitiveConsolidation, MemoryDecayPolicy, and background monitoring agents.

SmartPipe.Memory.Extensions

Integration library connecting SmartPipe.Memory with SmartPipe.Core ETL pipelines. Provides UseMemory(), AsGraphSource(), ToGraphSink(), TransformToEdges() and automatic pipeline topology registration.

GitHub repositories

This package is not used by any popular GitHub repositories.