SmartPipe.Core
1.0.4
Critical bugs fixed in v1.0.5. Thread-safety, evasion detection, P-controller fixes.
See the version list below for details.
dotnet add package SmartPipe.Core --version 1.0.4
NuGet\Install-Package SmartPipe.Core -Version 1.0.4
<PackageReference Include="SmartPipe.Core" Version="1.0.4" />
<PackageVersion Include="SmartPipe.Core" Version="1.0.4" />
<PackageReference Include="SmartPipe.Core" />
paket add SmartPipe.Core --version 1.0.4
#r "nuget: SmartPipe.Core, 1.0.4"
#:package SmartPipe.Core@1.0.4
#addin nuget:?package=SmartPipe.Core&version=1.0.4
#tool nuget:?package=SmartPipe.Core&version=1.0.4
SmartPipe.Core
Universal streaming pipeline engine for .NET 10 with zero dependencies.
Built on System.Threading.Channels, SmartPipe.Core provides a production-ready pipeline engine for ETL, real-time stream processing, API aggregation, and AI agent integration β all with 0 allocations in hot path.
π Complete Feature Reference β
What is SmartPipe?
SmartPipe is not just another ETL library. It's a universal streaming pipeline engine that handles:
- ETL/ELT β extract from DB/API, transform, load to anywhere
- Real-time stream processing β process events as they arrive
- API aggregation β fan-out requests, aggregate responses
- Data validation pipelines β validate, enrich, route
- AI agent tools β integrate with Semantic Kernel, AutoGen
- Log/sensor processing β process IoT telemetry, application logs
- Error recovery & dead letter β capture failures for later replay
- Stream merging β combine multiple data sources into one pipeline
All in 5 lines of code:
using SmartPipe.Core;
using SmartPipe.Extensions;
var pipeline = PipelineBuilder
.From(new HttpSelector<MyDto>("https://api.example.com/data"))
.Transform(x => x.Enrich()) // Middleware for simple ops
.Transform(new JsonTransform<MyDto, MyEntity>()) // ITransformer for complex
.WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<MyEntity>(logger));
Getting Started | Installation
# Core engine (zero dependencies)
dotnet add package SmartPipe.Core
# Extensions (Http, EF Core, Dapper, JSON, CSV, Mapster, Polly)
dotnet add package SmartPipe.Extensions
Examples by Scenario
Middleware Pattern (5 lines)
var pipeline = PipelineBuilder
.From(new HttpSelector<int>("https://api.example.com/numbers"))
.Transform(x => x * 2) // Middleware!
.Transform(x => x + 1) // Middleware!
.WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<int>(logger));
ETL Pipeline (Database β Transform β API)
var pipeline = PipelineBuilder
.From(new EfCoreSelector<Order>(dbContext).WithQuery(q => q.Where(o => o.Status == "Pending")))
.Transform(new MapsterTransform<Order, OrderDto>())
.Transform(new PollyResilienceTransform<OrderDto>(resiliencePipeline))
.WithOptions(o => o.MaxDegreeOfParallelism = 8);
await pipeline.To(new HttpSink<OrderDto>(httpClient, "https://api.destination.com/orders"));
Real-time Stream Processing (API β Filter β Log)
var pipeline = PipelineBuilder
.From(new HttpSelector<SensorData>("https://iot.example.com/telemetry"))
.Transform(new JsonTransform<SensorData, SensorData>())
.Transform(new MapsterTransform<SensorData, Alert>())
.WithOptions(o => { o.MaxDegreeOfParallelism = 2; o.ContinueOnError = true; });
await pipeline.To(new LoggerSink<Alert>(logger));
AI Agent Tool (Semantic Kernel Integration)
var tool = new PipelineTool<string, string>("summarize", "Summarize text using AI");
tool.AddTransformer(new JsonTransform<string, PromptDto>());
tool.AddTransformer(new HttpTransform<PromptDto, string>(openAiClient));
var result = await tool.ExecuteAsync("Long text to summarize...");
API Aggregation (Fan-out β Aggregate)
var pipeline = PipelineBuilder
.From(new HttpSelector<User>("https://users.api.com"))
.Transform(new MapsterTransform<User, EnrichedUser>())
.Transform(new PollyResilienceTransform<EnrichedUser>(resiliencePipeline));
await pipeline.To(new Sink<EnrichedUser>(user => enrichedUsers.Add(user)));
Error Persistence with DeadLetterSink
var pipeline = PipelineBuilder
.From(new HttpSelector<Order>("https://api.example.com/orders"))
.Transform(new OrderValidator())
.WithOptions(o => o.ContinueOnError = true);
await pipeline.To(new DeadLetterSink<Order>("failed_orders.json"));
First Pipeline (5 lines)
using SmartPipe.Core;
using SmartPipe.Extensions.Selectors;
using SmartPipe.Extensions.Transforms;
using SmartPipe.Extensions.Sinks;
var pipeline = PipelineBuilder
.From(new HttpSelector<MyDto>("https://api.example.com/data"))
.Transform(new JsonTransform<MyDto, MyEntity>())
.WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<MyEntity>(logger));
ASP.NET Core BackgroundService
public class PipelineWorker : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken ct)
{
var pipeline = PipelineBuilder
.From(new EfCoreSelector<Order>(_dbContext))
.Transform(new MapsterTransform<Order, OrderDto>())
.WithOptions(o => o.MaxDegreeOfParallelism = 8);
await pipeline.To(new HttpSink<OrderDto>(_httpClient, "https://api.dest.com"));
}
}
SmartPipe Architecture
Overview
SmartPipe is a streaming pipeline engine built on System.Threading.Channels.
It consists of 24 integrated components organized in a resilience pipeline order.
Pipeline Flow
ISource<T> (or RunInBackground) β βΌ Bounded Channel (or Rendezvous Channel) β βΌ BackpressureStrategy (Dual-threshold: Pause/Resume) β βΌ DeduplicationFilter (Bloom, O(1)) + HyperLogLogEstimator β βΌ AdaptiveParallelism (Little's Law) β βΌ CircuitBreaker (Lock-free, ClosedβOpenβHalfOpen) β βΌ MiddlewareTransformer (Func<T,T>) + ITransformer (ValueTask) β βΌ RetryQueue (Jitter + Exponential Backoff) β βΌ Bounded Channel β βΌ ISink<T> (Logger, DeadLetter, HealthChecks) β βΌ AsChannelReader() β SignalR/gRPC
Resilience Pipeline Order
- TotalRequestTimeout β maximum time for entire pipeline
- CircuitBreaker β stops processing on high failure rate
- RetryQueue β delays and retries transient errors
- AttemptTimeout β per-transformer timeout
- DeadLetterSink β captures exhausted retries for later replay
- LivenessCheck β detects stalled pipeline
- ReadinessCheck β detects overloaded pipeline
Component Overview
| Component | Type | Memory | Performance |
|---|---|---|---|
| DeduplicationFilter | Bloom filter | O(1) | 20.65 ns |
| ObjectPool | Lock-free | O(n) | 15.55 ns |
| CircuitBreaker | Lock-free (Interlocked) | O(n) | 28.10 ns |
| RetryQueue | Lock-free (Channel) | O(n) | 69.16 ns |
| ExponentialHistogram | Percentiles | O(logΒ² n) | < 100 ns |
| JumpHash | Sharding | O(1) | < 10 ns |
| CuckooFilter | Dedup + delete | O(1) | < 50 ns |
| ReservoirSampler | Sampling | O(k) | < 10 ns |
| HyperLogLogEstimator | Count-Distinct | O(1) | < 50 ns |
| DeadLetterSink | Error persistence | O(n) | β |
| ChannelMerge | Stream merging | O(n) | β |
| AdaptiveMetrics (Update) | Double EMA | O(1) | 20.25 ns |
| AdaptiveMetrics (Predict) | Double EMA | O(1) | 0.16 ns |
Extension Architecture
Extensions follow the Selection Pattern β a single package with categorized components:
- Selectors β data sources (Http, EF Core, Dapper, CSV, JSON, DeadLetter)
- Transforms β data transformers (JSON, CSV, Mapster, Compression, Polly, Filter, Validation, Conditional, Composite)
- Sinks β data destinations (Logger, DeadLetter, Http, Db, CSV, JSON)
- Health β Kubernetes probes (Liveness, Readiness)
- Streaming β ChannelMerge, RunInBackground, AsChannelReader
Instead of 12 separate NuGet packages, SmartPipe uses a single SmartPipe.Extensions package with the Selection Pattern:
SmartPipe.Extensions/
βββ Selectors/ β Data sources
β βββ HttpSelector β REST API client
β βββ EfCoreSelector β Entity Framework streaming
β βββ DapperSelector β High-performance SQL
β βββ CsvFileSource β CSV file reader
β βββ JsonFileSource β JSON array & NDJSON reader
β βββ DeadLetterSource β Replay failed items
βββ Transforms/ β Data transformers
β βββ JsonTransform β JSON serialization
β βββ CsvTransform β CSV parsing
β βββ MapsterTransform β Object mapping
β βββ CompressionTransform β Brotli/GZip
β βββ PollyResilienceTransform β Retry/CB/Hedging
β βββ FilterTransform β Predicate filtering
β βββ ValidationTransform β DataAnnotations validation
β βββ ConditionalTransform β Conditional execution
β βββ CompositeTransform β Chain transforms
βββ Sinks/ β Data destinations
βββ LoggerSink β Structured logging
βββ DeadLetterSink β Failed items persistence
βββ HttpSink β REST API client
βββ DbSink β Database insert
βββ CsvFileSink β CSV file writer
βββ JsonFileSink β JSON file writer
One package. All integrations. Zero boilerplate.
Requirements
- .NET 10.0+
- SmartPipe.Core: 0 dependencies
- SmartPipe.Extensions: Polly, EF Core, Dapper, Mapster, CsvHelper
- 243 tests, 96.4% code coverage
What's New in v1.0.4
- 22 new features (243 tests, 96.4% coverage)
- P-Controller Parallelism β smooth thread scaling, no binary jumps
- Double EMA + Prediction β velocity tracking + one-step latency forecast
- Hybrid CircuitBreaker β EWMA early warning + Sliding window decisions
- P-Controller Backpressure β continuous throttling, no oscillation
- PipelineState + Cancel() β lifecycle management with events
- Progress reporting β
OnProgresswith ETA calculation - Auto DeadLetter routing β exhausted retries β DeadLetterSink
- 12 new Extensions β CsvFileSource/Sink, JsonFileSource/Sink, FilterTransform, ValidationTransform, DbSink, HttpSink, ConditionalTransform, DeadLetterSource, CompositeTransform
- Metrics.Export() β JSON + Prometheus format
- 4 new OWASP patterns in SecretScanner
- 12% faster ValueTask_Transform (69.12 ns)
What's New in v1.0.3
- 13 new features (215 tests, 96.3% coverage)
- Middleware Transformer β
Func<T,T>as lightweight ITransformer - Rendezvous Channel β (BoundedCapacity=0)
- HyperLogLogEstimator β Count-Distinct with O(1) memory
- Dual-threshold Watermark β Pause/Resume prevents oscillation
- Liveness/Readiness Health Checks β Kubernetes-native
- DeadLetterSink β failed items persistence
- Data Lineage β provenance tracking in Metadata
- ChannelMerge β merge two streams
- RunInBackground() β streaming pipeline consumption
- Hybrid Queue β FullMode option (Wait/DropOldest)
- AsChannelReader() β SignalR/gRPC integration
What's New in v1.0.2
- Lock-free RetryQueue
- Lock-free CircuitBreaker
- SmartPipeEventSource β monitor via
dotnet-counters - SmartPipeHostedService β native ASP.NET Core integration
- SmartPipeHealthCheck β pipeline health for YARP/Kubernetes
- Adaptive EMA β dynamic Ξ± for spike detection
- Dynamic Watermark β throughput-based backpressure
- 96.3% code coverage (up from 86.5%)
- 47 new tests, 0 regressions in benchmarks
Documentation
- Complete Feature Reference β all 24 components in detail
- Architecture Overview β pipeline flow and design
- API Reference β interfaces and configuration
- Contributing Guide
- Security Policy
- Changelog
Acknowledgements
SmartPipe is built on ideas and research from:
- Polly β resilience patterns for .NET (github.com/App-vNext/Polly)
- System.Threading.Channels β lock-free producer/consumer infrastructure by Microsoft
- OpenTelemetry β observability framework for cloud-native software
- Little's Law β queue theory applied to adaptive parallelism (ACM Queue, 2025)
- Bloom & Cuckoo Filters β probabilistic data structures for deduplication
- ReTraced β three-level retry model inspiration
- TheCodeMan β production Channel pipeline patterns
- Microsoft.Extensions.Resilience β resilience pipeline integration
- OWASP β security patterns for secret detection
- BenchmarkDotNet β performance measurement framework
License MIT License β see LICENSE for details.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- No dependencies.
NuGet packages (4)
Showing the top 4 NuGet packages that depend on SmartPipe.Core:
| Package | Downloads |
|---|---|
|
SmartPipe.Extensions
Extensions for SmartPipe.Core: 28 integrations (Http, EF Core, Dapper, JSON, CSV, Mapster, Compression, Polly, Filter, Validation, Conditional, Composite, DeadLetter). One package instead of 12. |
|
|
SmartPipe.Memory
Embedded graph memory layer for the SmartPipe ecosystem. Type-safe Fluent API, in-memory traversals, SQLite WAL persistence, time-travel queries, predictive analytics, and OpenTelemetry support. |
|
|
SmartPipe.Memory.Health
Predictive analytics and health monitoring for SmartPipe.Memory graph stores. HealthVector, BottleneckPredictor, InsightGenerator, CognitiveConsolidation, MemoryDecayPolicy, and background monitoring agents. |
|
|
SmartPipe.Memory.Extensions
Integration library connecting SmartPipe.Memory with SmartPipe.Core ETL pipelines. Provides UseMemory(), AsGraphSource(), ToGraphSink(), TransformToEdges() and automatic pipeline topology registration. |
GitHub repositories
This package is not used by any popular GitHub repositories.