SmartPipe.Core
1.0.6
Universal streaming pipeline engine for .NET 10
Built on System.Threading.Channels, SmartPipe.Core provides a production-ready pipeline engine for ETL, real-time stream processing, API aggregation, and AI agent integration, all with zero allocations in the hot path.
See the Complete Feature Reference.
What is SmartPipe?
SmartPipe is not just another ETL library. It's a universal streaming pipeline engine that handles:
- ETL/ELT: extract from DB/API, transform, load to anywhere
- Real-time stream processing: process events as they arrive
- API aggregation: fan-out requests, aggregate responses
- Data validation pipelines: validate, enrich, route
- AI agent tools: integrate with Semantic Kernel, AutoGen
- Log/sensor processing: process IoT telemetry, application logs
- Error recovery & dead letter: capture failures for later replay
- Stream merging: combine multiple data sources into one pipeline
All in 5 lines of code:
using SmartPipe.Core;
using SmartPipe.Extensions;
var pipeline = PipelineBuilder
.From(new HttpSelector<MyDto>("https://api.example.com/data"))
.Transform(x => x.Enrich()) // Middleware for simple ops
.Transform(new JsonTransform<MyDto, MyEntity>()) // ITransformer for complex
.WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<MyEntity>(logger));
Getting Started | Installation
# Core engine
dotnet add package SmartPipe.Core
# Extensions (Http, EF Core, Dapper, JSON, CSV, Mapster, Polly)
dotnet add package SmartPipe.Extensions
Examples by Scenario
Middleware Pattern (5 lines)
var pipeline = PipelineBuilder
.From(new HttpSelector<int>("https://api.example.com/numbers"))
.Transform(x => x * 2) // Middleware!
.Transform(x => x + 1) // Middleware!
.WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<int>(logger));
ETL Pipeline (Database → Transform → API)
var pipeline = PipelineBuilder
.From(new EfCoreSelector<Order>(dbContext).WithQuery(q => q.Where(o => o.Status == "Pending")))
.Transform(new MapsterTransform<Order, OrderDto>())
.Transform(new PollyResilienceTransform<OrderDto>(resiliencePipeline))
.WithOptions(o => o.MaxDegreeOfParallelism = 8);
await pipeline.To(new HttpSink<OrderDto>(httpClient, "https://api.destination.com/orders"));
Real-time Stream Processing (API → Filter → Log)
var pipeline = PipelineBuilder
.From(new HttpSelector<SensorData>("https://iot.example.com/telemetry"))
.Transform(new JsonTransform<SensorData, SensorData>())
.Transform(new MapsterTransform<SensorData, Alert>())
.WithOptions(o => { o.MaxDegreeOfParallelism = 2; o.ContinueOnError = true; });
await pipeline.To(new LoggerSink<Alert>(logger));
Single Item Processing
var pipeline = new SmartPipeChannel<string, string>();
pipeline.AddTransformer(new JsonTransform<string, PromptDto>());
var result = await pipeline.ProcessSingleAsync(new ProcessingContext<string>("Long text to summarize..."));
API Aggregation (Fan-out → Aggregate)
var pipeline = PipelineBuilder
.From(new HttpSelector<User>("https://users.api.com"))
.Transform(new MapsterTransform<User, EnrichedUser>())
.Transform(new PollyResilienceTransform<EnrichedUser>(resiliencePipeline));
await pipeline.To(new Sink<EnrichedUser>(user => enrichedUsers.Add(user)));
Error Persistence with DeadLetterSink
var pipeline = PipelineBuilder
.From(new HttpSelector<Order>("https://api.example.com/orders"))
.Transform(new OrderValidator())
.WithOptions(o => o.ContinueOnError = true);
await pipeline.To(new DeadLetterSink<Order>("failed_orders.json"));
First Pipeline (5 lines)
using SmartPipe.Core;
using SmartPipe.Extensions.Selectors;
using SmartPipe.Extensions.Transforms;
using SmartPipe.Extensions.Sinks;
var pipeline = PipelineBuilder
.From(new HttpSelector<MyDto>("https://api.example.com/data"))
.Transform(new JsonTransform<MyDto, MyEntity>())
.WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<MyEntity>(logger));
ASP.NET Core BackgroundService
public class PipelineWorker : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken ct)
{
var pipeline = PipelineBuilder
.From(new EfCoreSelector<Order>(_dbContext))
.Transform(new MapsterTransform<Order, OrderDto>())
.WithOptions(o => o.MaxDegreeOfParallelism = 8);
await pipeline.To(new HttpSink<OrderDto>(_httpClient, "https://api.dest.com"));
}
}
SmartPipe Architecture
Overview
SmartPipe is a streaming pipeline engine built on System.Threading.Channels.
It consists of 34 integrated components organized in a resilience pipeline order.
Pipeline Flow
ISource<T> (or RunInBackground)
▼
Bounded Channel (or Rendezvous Channel)
▼
BackpressureStrategy (P-controller: continuous throttling)
▼
DeduplicationFilter (Bloom, O(1)) + HyperLogLogEstimator
▼
AdaptiveParallelism (P-controller with dead zone + anti-windup)
▼
CircuitBreaker (Lock-free, Closed→Open→HalfOpen + Isolated)
│
▼
MiddlewareTransformer (Func<T,T>) + ITransformer (ValueTask)
▼
RetryQueue (Jitter + Exponential Backoff)
▼
Bounded Channel
▼
ISink<T> (Logger, DeadLetter, HealthChecks)
▼
AsChannelReader() → SignalR/gRPC
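The core of the flow above (source, bounded channel, transform, bounded channel, sink) can be approximated with nothing but System.Threading.Channels. The following is a minimal sketch under stated assumptions, not SmartPipe's implementation: the `MiniPipeline` class, its `RunAsync` method, and the default capacity of 8 are hypothetical names and values chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of SmartPipe.
public static class MiniPipeline
{
    // Pushes `source` through a bounded channel, applies `transform`,
    // and collects the results in arrival order.
    public static async Task<List<TOut>> RunAsync<TIn, TOut>(
        IEnumerable<TIn> source, Func<TIn, TOut> transform, int capacity = 8)
    {
        var input = Channel.CreateBounded<TIn>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // a full channel makes the producer wait
            SingleReader = true,
            SingleWriter = true
        });
        var output = Channel.CreateBounded<TOut>(capacity);

        // Stage 1: source -> input channel.
        var produce = Task.Run(async () =>
        {
            try
            {
                foreach (var item in source)
                    await input.Writer.WriteAsync(item);
            }
            finally { input.Writer.TryComplete(); }
        });

        // Stage 2: input channel -> transform -> output channel.
        var process = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in input.Reader.ReadAllAsync())
                    await output.Writer.WriteAsync(transform(item));
            }
            finally
            {
                output.Writer.TryComplete();
                input.Writer.TryComplete(); // unblocks the producer if the transform failed
            }
        });

        // Stage 3: the sink drains the output channel.
        var results = new List<TOut>();
        await foreach (var item in output.Reader.ReadAllAsync())
            results.Add(item);

        await Task.WhenAll(produce, process); // rethrows any stage failure
        return results;
    }
}
```

With a single transform worker the output order matches the input order; running several workers in parallel, as `MaxDegreeOfParallelism` suggests, would generally give up that guarantee.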
Resilience Pipeline Order
- TotalRequestTimeout: maximum time for entire pipeline
- CircuitBreaker: stops processing on high failure rate
- RetryQueue: delays and retries transient errors
- AttemptTimeout: per-transformer timeout
- DeadLetterSink: captures exhausted retries for later replay
- LivenessCheck: detects stalled pipeline
- ReadinessCheck: detects overloaded pipeline
- DefaultRetryPolicy: per-pipeline default retry configuration
- RetryBudget: per-item retry budget control
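To make the retry and per-attempt timeout entries in the list above concrete, here is a rough sketch of exponential backoff with jitter wrapped around a per-attempt timeout. It assumes the common "full jitter" variant; `RetrySketch`, `BackoffDelay`, and `ExecuteAsync` are hypothetical names, and SmartPipe's RetryQueue may compute its delays differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of SmartPipe.
public static class RetrySketch
{
    // Delay before retrying after a 0-based attempt: a random value in
    // [0, min(maxDelay, baseDelay * 2^attempt)), i.e. "full jitter".
    public static TimeSpan BackoffDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay, Random rng)
    {
        double ceilingMs = Math.Min(maxDelay.TotalMilliseconds,
                                    baseDelay.TotalMilliseconds * Math.Pow(2, attempt));
        return TimeSpan.FromMilliseconds(rng.NextDouble() * ceilingMs);
    }

    // Runs `operation` under a per-attempt timeout and retries failures
    // up to `maxRetries` times. The last failure propagates to the caller,
    // which could then route the item to a dead letter store.
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        int maxRetries, TimeSpan attemptTimeout, TimeSpan baseDelay, TimeSpan maxDelay,
        CancellationToken ct = default)
    {
        for (int attempt = 0; ; attempt++)
        {
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(attemptTimeout); // per-attempt timeout
            try
            {
                return await operation(cts.Token);
            }
            catch (Exception) when (attempt < maxRetries && !ct.IsCancellationRequested)
            {
                // Failure or attempt timeout: back off, then try again.
                await Task.Delay(BackoffDelay(attempt, baseDelay, maxDelay, Random.Shared), ct);
            }
        }
    }
}
```

The jitter spreads retries out so that many items failing at the same moment do not all retry at the same moment.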
Component Overview
| Component | Type | Memory | Performance |
|---|---|---|---|
| DeduplicationFilter | Bloom filter | O(1) | 20.65 ns |
| ObjectPool | Lock-free | O(n) | 15.55 ns |
| CircuitBreaker | Lock-free (Interlocked) | O(n) | 29.30 ns |
| RetryQueue | Lock-free (Channel) | O(n) | 69.16 ns |
| ExponentialHistogram | Percentiles | O(log² n) | < 100 ns |
| JumpHash | Sharding | O(1) | < 10 ns |
| CuckooFilter | Dedup + delete | O(1) | < 50 ns |
| ReservoirSampler | Sampling | O(k) | < 10 ns |
| HyperLogLogEstimator | Count-Distinct | O(1) | < 50 ns |
| DeadLetterSink | Error persistence | O(n) | n/a |
| ChannelMerge | Stream merging | O(n) | n/a |
| AdaptiveMetrics (Update) | Double EMA | O(1) | 20.25 ns |
| AdaptiveMetrics (Predict) | Double EMA | O(1) | 0.16 ns |
| IClock | Time abstraction | O(1) | < 5 ns |
| AtomicHelper | Lock-free double ops | O(1) | < 10 ns |
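The JumpHash row in the table lists sharding with O(1) memory. Assuming it refers to the jump consistent hash algorithm published by Lamping and Veach, a direct C# port looks roughly like this; the class and method names are hypothetical and SmartPipe's own signature may differ.

```csharp
using System;

// Hypothetical helper for illustration; not part of SmartPipe.
public static class JumpHashSketch
{
    // Maps a 64-bit key to a bucket in [0, numBuckets) using no lookup table.
    // When numBuckets grows from n to n + 1, a key either keeps its bucket
    // or moves to the new bucket n.
    public static int Bucket(ulong key, int numBuckets)
    {
        if (numBuckets <= 0) throw new ArgumentOutOfRangeException(nameof(numBuckets));

        long b = -1, j = 0;
        while (j < numBuckets)
        {
            b = j;
            // Linear congruential step; overflow wrap-around is intended.
            unchecked { key = key * 2862933555777941757UL + 1; }
            j = (long)((b + 1) * ((double)(1L << 31) / ((key >> 33) + 1)));
        }
        return (int)b;
    }
}
```

A typical use is picking a shard for an item: hash the item's identifier to a `ulong` with any stable hash function, then call `Bucket(hash, shardCount)`.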
Extension Architecture
Extensions follow the Selection Pattern, in which a single package holds categorized components:
- Selectors: data sources (Http, EF Core, Dapper, CSV, JSON, DeadLetter)
- Transforms: data transformers (JSON, CSV, Mapster, Compression, Polly, Filter, Validation, Conditional, Composite)
- Sinks: data destinations (Logger, DeadLetter, Http, Db, CSV, JSON)
- Health: Kubernetes probes (Liveness, Readiness)
- Streaming: ChannelMerge, RunInBackground, AsChannelReader
Instead of 12 separate NuGet packages, SmartPipe uses a single SmartPipe.Extensions package with the Selection Pattern:
SmartPipe.Extensions/
├── Selectors/ - Data sources
│   ├── HttpSelector - REST API client
│   ├── EfCoreSelector - Entity Framework streaming
│   ├── DapperSelector - High-performance SQL
│   ├── CsvFileSource - CSV file reader
│   ├── JsonFileSource - JSON array & NDJSON reader
│   └── DeadLetterSource - Replay failed items
├── Transforms/ - Data transformers
│   ├── JsonTransform - JSON serialization
│   ├── CsvTransform - CSV parsing
│   ├── MapsterTransform - Object mapping
│   ├── CompressionTransform - Brotli/GZip
│   ├── PollyResilienceTransform - Retry/CB/Hedging
│   ├── FilterTransform - Predicate filtering
│   ├── ValidationTransform - DataAnnotations validation
│   ├── ConditionalTransform - Conditional execution
│   ├── CompositeTransform - Chain transforms
│   └── FilterValidationExtensions - ToFilter() conversion
├── Sinks/ - Data destinations
│   ├── LoggerSink - Structured logging
│   ├── DeadLetterSink - Failed items persistence
│   ├── HttpSink - REST API client
│   ├── DbSink - Database insert
│   ├── CsvFileSink - CSV file writer
│   └── JsonFileSink - JSON file writer
├── Hosting/ - ASP.NET Core integration
│   ├── SmartPipeHostedService - BackgroundService
│   ├── SmartPipeServiceCollectionExtensions - AddSmartPipe DI
│   └── SmartPipeResilienceExtensions - Polly registration
├── Health/ - Kubernetes probes
│   ├── SmartPipeLivenessCheck
│   └── SmartPipeReadinessCheck
└── Streaming/ - Stream utilities
    └── ChannelMerge - Merge two channels
One package. All integrations.
Requirements
- .NET 10.0+
- SmartPipe.Core: 0 dependencies
- SmartPipe.Extensions: Polly, EF Core, Dapper, Mapster, CsvHelper
- 598 tests, 89.2% code coverage
What's New in v1.0.6
- Thread safety: CuckooFilter, DeduplicationFilter, ReservoirSampler now fully thread-safe
- ObjectPool max capacity: prevents unbounded pool growth under sustained load
- DeduplicationFilter TTL: automatic entry expiration for long-running pipelines
- JsonFileSink periodic flushing: NDJSON batch writes, prevents OOM on large datasets
- RetryQueue polling optimization: single CancellationTokenSource per call, reduced allocations
- DrainAsync + WithTimeoutAsync: CancellationToken support
- PipelineDashboard: readonly record struct, PipelineDashboard.Empty
- TransformWithTimeoutAsync: catch-all exception handling prevents consumer crashes
- SecretScanner: disabled by default, explicit opt-in via EnableFeature("SecretScanner")
- ExponentialHistogram: Volatile.Read for percentile reads, P50/P95/P99 caching
- AdaptiveMetrics: Stopwatch.GetTimestamp() instead of TickCount64
- DbSink: async ExecuteAsync, no thread pool blocking
- DapperSelector: try/finally reader disposal
- ChannelMerge: optional BoundedChannelOptions
What's New in v1.0.5
- 598 tests, 95.8% code coverage
- DefaultRetryPolicy: per-pipeline retry configuration in SmartPipeChannelOptions
- RetryBudget: per-item retry budget in RetryQueue, auto-routes exhausted items to DeadLetterSink
- DisposeAsync(CancellationToken): graceful cancellation during pipeline disposal
- AddSmartPipe DI: service collection extensions for ASP.NET Core integration
- IClock integration: time abstraction for testability, replaces DateTime.UtcNow
- AtomicHelper: lock-free CompareExchange loop utility
- SecretScanner evasion detection: Base64/URL decoding, MaxRecursionDepth=3, 164 tests
- DeadLetterSink retry: IOException recovery with exponential backoff
- AdaptiveParallelism adaptive alpha: faster response to latency changes
- CircuitBreaker CleanupWindow: thread-safe via TryDequeue+check
- ObjectPool ABA protection: version stamps prevent race conditions
- CuckooFilter Merge: combine multiple filters
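The AtomicHelper entry in the list above mentions a lock-free CompareExchange loop. The general technique for a `double` looks roughly like the sketch below; `AtomicDouble.Add` is a hypothetical name for illustration and is not AtomicHelper's actual API.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of SmartPipe.
public static class AtomicDouble
{
    // Atomically adds `value` to `target` without taking a lock
    // and returns the new total.
    public static double Add(ref double target, double value)
    {
        double current = Volatile.Read(ref target);
        while (true)
        {
            double updated = current + value;
            // Store `updated` only if `target` still holds `current`;
            // the call returns whatever value was actually there.
            double seen = Interlocked.CompareExchange(ref target, updated, current);

            // Compare bit patterns rather than values so a NaN cannot spin forever.
            if (BitConverter.DoubleToInt64Bits(seen) == BitConverter.DoubleToInt64Bits(current))
                return updated;

            current = seen; // another thread won the race; retry with its value
        }
    }
}
```

Under contention a thread may loop a few times, but no thread ever blocks, which is the usual reason to prefer this pattern over a lock for small numeric updates.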
What's New in v1.0.4
- 22 new features (243 tests, 96.4% coverage)
- P-Controller Parallelism: smooth thread scaling, no binary jumps
- Double EMA + Prediction: velocity tracking + one-step latency forecast
- Hybrid CircuitBreaker: EWMA early warning + Sliding window decisions
- P-Controller Backpressure: continuous throttling, no oscillation
- PipelineState + Cancel(): lifecycle management with events
- Progress reporting: OnProgress with ETA calculation
- Auto DeadLetter routing: exhausted retries → DeadLetterSink
- 12 new Extensions: CsvFileSource/Sink, JsonFileSource/Sink, FilterTransform, ValidationTransform, DbSink, HttpSink, ConditionalTransform, DeadLetterSource, CompositeTransform
- Metrics.Export(): JSON + Prometheus format
- 4 new OWASP patterns in SecretScanner
- 12% faster ValueTask_Transform (69.12 ns)
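The P-Controller Parallelism entry in the list above describes smooth scaling instead of binary jumps. As an illustration only, a proportional controller with a dead zone might look like the sketch below. The class name, the latency-based error signal, the gain, and the choice to add workers when latency is below target are all assumptions; the source does not describe SmartPipe's actual control law.

```csharp
using System;

// Hypothetical controller for illustration; not part of SmartPipe. Not thread-safe.
public sealed class ParallelismController
{
    private readonly double _targetLatencyMs, _gain, _deadZone;
    private readonly int _min, _max;

    public int Current { get; private set; }

    public ParallelismController(double targetLatencyMs, double gain, double deadZone, int min, int max)
    {
        _targetLatencyMs = targetLatencyMs;
        _gain = gain;
        _deadZone = deadZone;
        _min = min;
        _max = max;
        Current = min;
    }

    // Returns the new degree of parallelism for an observed latency.
    public int Update(double observedLatencyMs)
    {
        // Relative error: positive when faster than target, negative when slower.
        double error = (_targetLatencyMs - observedLatencyMs) / _targetLatencyMs;

        // Dead zone: ignore small deviations so the worker count does not flap.
        if (Math.Abs(error) <= _deadZone) return Current;

        // Proportional step: the adjustment scales with the size of the error.
        int step = (int)Math.Round(_gain * error);

        // Saturate at the configured bounds.
        Current = Math.Clamp(Current + step, _min, _max);
        return Current;
    }
}
```

For example, with a target of 100 ms, a gain of 4, and a dead zone of 0.1, an observed latency of 50 ms raises the worker count by 2, while 95 ms leaves it unchanged.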
What's New in v1.0.3
- 13 new features (215 tests, 96.3% coverage)
- Middleware Transformer: Func<T,T> as lightweight ITransformer
- Rendezvous Channel: BoundedCapacity=0
- HyperLogLogEstimator: Count-Distinct with O(1) memory
- Dual-threshold Watermark: Pause/Resume prevents oscillation
- Liveness/Readiness Health Checks: Kubernetes-native
- DeadLetterSink: failed items persistence
- Data Lineage: provenance tracking in Metadata
- ChannelMerge: merge two streams
- RunInBackground(): streaming pipeline consumption
- Hybrid Queue: FullMode option (Wait/DropOldest)
- AsChannelReader(): SignalR/gRPC integration
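The ChannelMerge entry in the list above merges two streams into one. One plausible way to do that on top of System.Threading.Channels is sketched below; `MergeSketch.Merge` and the default capacity of 64 are hypothetical, and the real ChannelMerge may differ in signature and ordering guarantees.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of SmartPipe.
public static class MergeSketch
{
    // Returns a reader that yields every item from both inputs.
    // Items from one input keep their relative order; interleaving
    // between the two inputs is not deterministic.
    public static ChannelReader<T> Merge<T>(
        ChannelReader<T> first, ChannelReader<T> second,
        int capacity = 64, CancellationToken ct = default)
    {
        var output = Channel.CreateBounded<T>(capacity);

        // Copies one input into the shared output channel.
        async Task PumpAsync(ChannelReader<T> reader)
        {
            await foreach (var item in reader.ReadAllAsync(ct))
                await output.Writer.WriteAsync(item, ct);
        }

        _ = Task.Run(async () =>
        {
            try
            {
                await Task.WhenAll(PumpAsync(first), PumpAsync(second));
                output.Writer.TryComplete(); // both inputs are finished
            }
            catch (Exception ex)
            {
                output.Writer.TryComplete(ex); // surface the failure to the consumer
            }
        });

        return output.Reader;
    }
}
```

Because the output channel is bounded, a slow consumer slows both pumps down, so the merge does not buffer without limit.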
What's New in v1.0.2
- Lock-free RetryQueue
- Lock-free CircuitBreaker
- SmartPipeEventSource: monitor via dotnet-counters
- SmartPipeHostedService: native ASP.NET Core integration
- SmartPipeHealthCheck: pipeline health for YARP/Kubernetes
- Adaptive EMA: dynamic α for spike detection
- Dynamic Watermark: throughput-based backpressure
- 96.3% code coverage (up from 86.5%)
- 47 new tests, 0 regressions in benchmarks
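The lock-free CircuitBreaker entry in the list above can be illustrated with a small state machine driven by `Interlocked` operations. This sketch is deliberately simplified: it trips on a count of failures since the last success, whereas the README describes a failure-rate breaker with an EWMA and a sliding window, and it omits the Isolated state. All names are hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical breaker for illustration; not part of SmartPipe.
public sealed class CircuitBreakerSketch
{
    private const int Closed = 0, Open = 1, HalfOpen = 2;

    private readonly int _failureThreshold;
    private readonly long _openTicks;
    private int _state = Closed;
    private int _failures;
    private long _openedAt;

    public CircuitBreakerSketch(int failureThreshold, TimeSpan openDuration)
    {
        _failureThreshold = failureThreshold;
        _openTicks = (long)(openDuration.TotalSeconds * Stopwatch.Frequency);
    }

    public bool IsOpen => Volatile.Read(ref _state) == Open;

    // Returns true if the caller may proceed with the protected operation.
    public bool TryEnter()
    {
        int state = Volatile.Read(ref _state);
        if (state == Closed) return true;

        if (state == Open &&
            Stopwatch.GetTimestamp() - Volatile.Read(ref _openedAt) >= _openTicks)
        {
            // Exactly one caller wins Open -> HalfOpen and becomes the probe.
            return Interlocked.CompareExchange(ref _state, HalfOpen, Open) == Open;
        }
        return false; // still open, or a probe is already in flight
    }

    public void OnSuccess()
    {
        Interlocked.Exchange(ref _failures, 0);
        // Only a successful probe closes the breaker.
        Interlocked.CompareExchange(ref _state, Closed, HalfOpen);
    }

    public void OnFailure()
    {
        int failures = Interlocked.Increment(ref _failures);
        if (Volatile.Read(ref _state) == HalfOpen || failures >= _failureThreshold)
        {
            Volatile.Write(ref _openedAt, Stopwatch.GetTimestamp());
            Interlocked.Exchange(ref _state, Open);
        }
    }
}
```

Callers wrap each operation as `if (breaker.TryEnter()) { try { ...; breaker.OnSuccess(); } catch { breaker.OnFailure(); throw; } }` and skip or reroute the item when `TryEnter` returns false.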
Documentation
- Complete Feature Reference: all 24 components in detail
- Architecture Overview: pipeline flow and design
- API Reference: interfaces and configuration
- Contributing Guide
- Security Policy
- Changelog
Acknowledgements
SmartPipe is built on ideas and research from:
- Polly: resilience patterns for .NET (github.com/App-vNext/Polly)
- System.Threading.Channels: lock-free producer/consumer infrastructure by Microsoft
- OpenTelemetry: observability framework for cloud-native software
- Little's Law: queue theory applied to adaptive parallelism (ACM Queue, 2025)
- Bloom & Cuckoo Filters: probabilistic data structures for deduplication
- ReTraced: three-level retry model inspiration
- TheCodeMan: production Channel pipeline patterns
- Microsoft.Extensions.Resilience: resilience pipeline integration
- OWASP: security patterns for secret detection
- BenchmarkDotNet: performance measurement framework
- Control Theory (P-controllers): applied to AdaptiveParallelism and BackpressureStrategy
- HyperLogLog (Flajolet et al.): cardinality estimation algorithm
License
MIT License. See LICENSE for details.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - Microsoft.Extensions.Logging.Abstractions (>= 9.0.0)
NuGet packages (4)
Showing the top 4 NuGet packages that depend on SmartPipe.Core:
| Package | Downloads |
|---|---|
| SmartPipe.Extensions: Extensions for SmartPipe.Core with 28 integrations (Http, EF Core, Dapper, JSON, CSV, Mapster, Compression, Polly, Filter, Validation, Conditional, Composite, DeadLetter). One package instead of 12. | |
| SmartPipe.Memory.Health: Predictive analytics and health monitoring for SmartPipe.Memory graph stores. HealthVector, BottleneckPredictor, InsightGenerator, CognitiveConsolidation, MemoryDecayPolicy, and background monitoring agents. | |
| SmartPipe.Memory: Embedded graph memory layer for the SmartPipe ecosystem. Type-safe Fluent API, in-memory traversals, SQLite WAL persistence, time-travel queries, predictive analytics, and OpenTelemetry support. | |
| SmartPipe.Memory.Extensions: Integration library connecting SmartPipe.Memory with SmartPipe.Core ETL pipelines. Provides UseMemory(), AsGraphSource(), ToGraphSink(), TransformToEdges() and automatic pipeline topology registration. | |
GitHub repositories
This package is not used by any popular GitHub repositories.