DataFlow.NET
We make Data fit for C#.
From local files to cloud scale — LINQ all the way down.
Let IntelliSense and the compiler do the work.
- df.filter(pl.col("ammount") > 1000) # Typo? Runtime error.
+ .Where(o => o.Amount > 1000) // Typo? Won't compile. ✓
# Install via NuGet
dotnet add package DataFlow.Net --version 1.1.0
Table of Contents
- Sound Familiar?
- Three Simple Rules
- Everything is a Stream
- Quick Start
- Documentation
- Community & Support
1. Sound Familiar?
.NET developers know the story: you write a clean, type-safe data processor in C#, and it works perfectly on your dev machine. Then reality hits:
The Data Grows:
- 10 MB: List<T> works fine.
- 10 GB: OutOfMemoryException. You rewrite using StreamReader.
- 10 TB: You abandon C# for Spark/SQL. You lose type safety and duplicate logic.
The Logic Tangles:
- New requirements mean new if/else branches.
- You loop over the same data 5 times to handle 5 different cases.
- The code becomes spaghetti, and the data lifecycle becomes a black box.
The Source Fragments:
- Today it's a CSV file. Tomorrow it's a REST API. Next week it's a Kafka Stream.
- For each source, you write different adapter code.
- You end up with a "Code Salad": mixed abstractions, different error handling, and no reuse.
DataFlow.NET was built to stop this cycle:
- ✅ Unified API: Same code for CSV, JSON, Kafka, Spark
- ✅ Constant memory: Stream billions of rows without OutOfMemoryException (see benchmarks)
- ✅ No spaghetti: Declarative Cases pattern replaces nested if/else
- ✅ Pure C#: LINQ all the way down
Define the what. DataFlow.NET handles the how.
2. Three Simple Rules
DataFlow.NET is more than a framework — it defines a pattern to process data.
graph LR
S[**S**ink] --> U[**U**nify]
U --> P[**P**rocess]
P --> R[**R**oute]
R --> A[**A**pply]
style S fill:#f9f,stroke:#333,stroke-width:2px
style A fill:#bbf,stroke:#333,stroke-width:2px
We call this the SUPRA pattern — the name comes from gathering the first letter of each stage: Sink, Unify, Process, Route, Apply.
The SUPRA pattern ensures memory stays constant and items flow one at a time. Read the SUPRA-Pattern Guide →
To achieve the SUPRA pattern, you'll have to follow these rules:
- Sink First: Buffer and normalize at the edge, never in the middle.
- Flow Lazy: Items stream one by one. Constant memory.
- Route Declaratively: No more if/else spaghetti.
DataFlow.NET provides all the ready-to-use blocks to natively apply these rules.
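The "Flow Lazy" rule can be illustrated with plain C#, without any DataFlow.NET types. The sketch below is only an illustration under stated assumptions: `ReadNumbersAsync` and `KeepEven` are hypothetical helpers, not library APIs. It records the order of events to show that each item is read, filtered, and consumed before the next one is requested, which is why memory use does not grow with the input size.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Hypothetical source: yields one item at a time instead of building a list.
async IAsyncEnumerable<int> ReadNumbersAsync(int count)
{
    for (var i = 1; i <= count; i++)
    {
        log.Add($"read {i}");
        await Task.Yield(); // stands in for real I/O
        yield return i;
    }
}

// Hypothetical lazy stage: nothing runs until the consumer asks for an item.
async IAsyncEnumerable<int> KeepEven(IAsyncEnumerable<int> source)
{
    await foreach (var n in source)
    {
        if (n % 2 == 0) yield return n;
    }
}

await foreach (var value in KeepEven(ReadNumbersAsync(4)))
{
    log.Add($"consume {value}");
}

Console.WriteLine(string.Join(", ", log));
// read 1, read 2, consume 2, read 3, read 4, consume 4
```

Item 2 is consumed before item 3 is even read, so at most one item is in flight at any time.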
3. Everything is a Stream
DataFlow.NET provides tools to abstract the source of data from the processing. Use these to make every data source an IAsyncEnumerable<T> stream — the essence of the "Unified API" — same LINQ operators, same processing logic, regardless of origin.
See Integration Patterns Guide →
| Source Type | Pattern | Output |
|---|---|---|
| EF Core (SQL Server, PostgreSQL, etc.) | .AsAsyncEnumerable() | IAsyncEnumerable<T> |
| JSON/CSV/YAML Files | Read.Json<T>() / Read.Csv<T>() | IAsyncEnumerable<T> |
| REST APIs | .Poll() + .SelectMany() | IAsyncEnumerable<T> |
| Kafka / RabbitMQ / WebSocket | Wrap + .WithBoundedBuffer() | IAsyncEnumerable<T> |
| Snowflake (Premium) | Read.SnowflakeTable<T>() | IAsyncEnumerable<T> |
| Apache Spark (Premium) | Read.SparkDataFrame<T>() | SparkQuery<T> |
Any IAsyncEnumerable<T> source integrates natively.
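As a rough illustration of what "any IAsyncEnumerable<T> source" can mean in practice, the sketch below adapts a paged source into a flat stream with a standard C# async iterator. `FetchPageAsync` and `StreamAllAsync` are hypothetical names invented for this example; the paged source simply stands in for a database cursor or REST endpoint.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical paged source: pages 0..2 hold two items each, later pages are empty.
Task<int[]> FetchPageAsync(int page) =>
    Task.FromResult(page < 3 ? new[] { page * 10, page * 10 + 1 } : Array.Empty<int>());

// Adapter: turns the paged source into a flat, lazy stream of items.
async IAsyncEnumerable<int> StreamAllAsync()
{
    for (var page = 0; ; page++)
    {
        var items = await FetchPageAsync(page);
        if (items.Length == 0) yield break; // an empty page ends the stream
        foreach (var item in items) yield return item;
    }
}

var received = new List<int>();
await foreach (var value in StreamAllAsync())
{
    received.Add(value);
}

Console.WriteLine(string.Join(", ", received)); // 0, 1, 10, 11, 20, 21
```

Only one page is held in memory at a time, and the next page is fetched only when the consumer asks for more items.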
Examples
Already using Entity Framework Core? DataFlow.NET plugs right in:
// EF Core — Native support
await dbContext.Orders.AsAsyncEnumerable()
.Where(o => o.Amount > 100)
.WriteCsv("orders.csv");
- ✅ EF Core handles database access
- ✅ DataFlow.NET handles processing logic
- ✅ Works with SQL Server, PostgreSQL, MySQL, SQLite
Need to integrate REST APIs or message queues? Use polling and buffering:
// REST API — Poll and flatten
var orders = (() => httpClient.GetFromJsonAsync<Order[]>("/api/orders"))
.Poll(TimeSpan.FromSeconds(5), token)
.SelectMany(batch => batch.ToAsyncEnumerable());
// Kafka/WebSocket — Wrap in async iterator + buffer
var kafkaStream = ConsumeKafka(token).WithBoundedBuffer(1024);
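To give an idea of what a bounded buffer between a fast producer and a slower consumer can look like, here is a minimal sketch built on System.Threading.Channels. It is not DataFlow.NET's implementation of WithBoundedBuffer(); `BufferAsync` and `Produce` are hypothetical helpers, and cancellation is left out for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: decouples producer and consumer through a bounded channel.
// When the buffer is full, the producer waits instead of growing memory.
async IAsyncEnumerable<T> BufferAsync<T>(IAsyncEnumerable<T> source, int capacity)
{
    var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
    {
        FullMode = BoundedChannelFullMode.Wait,
        SingleReader = true,
        SingleWriter = true
    });

    var pump = Task.Run(async () =>
    {
        try
        {
            await foreach (var produced in source)
                await channel.Writer.WriteAsync(produced);
            channel.Writer.Complete();
        }
        catch (Exception ex)
        {
            channel.Writer.Complete(ex); // surface source failures to the reader
        }
    });

    await foreach (var item in channel.Reader.ReadAllAsync())
        yield return item;

    await pump;
}

// Hypothetical producer standing in for a message-queue consumer loop.
async IAsyncEnumerable<int> Produce()
{
    for (var i = 0; i < 5; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

var seen = new List<int>();
await foreach (var value in BufferAsync(Produce(), capacity: 2))
    seen.Add(value);

Console.WriteLine(string.Join(", ", seen)); // 0, 1, 2, 3, 4
```

With a single writer, order is preserved. A production version would also need to stop the pump when the consumer abandons the stream early.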
High-Performance Streaming File Readers
DataFlow.NET provides high-performance file readers: no reflection on the hot path; expression trees are compiled once and cached.
- 4x faster than standard reflection-based creation (benchmark results →)
- Zero allocation overhead: same 48 bytes as native new() instantiation
- Handles CSV, JSON, and YAML files generically.
We carefully crafted an intuitive, fully featured reader API with advanced error handling, all while streaming row by row.
The streaming row-by-row approach — absent in most other frameworks — is the cornerstone of DataFlow.NET's constant memory usage.
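The "compiled once and cached" idea can be sketched with the standard System.Linq.Expressions API. This is an illustration of the general technique, not the library's actual reader code; the `factories` cache and `GetFactory` helper are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq.Expressions;
using System.Text;

// Hypothetical cache of compiled constructors, keyed by type.
var factories = new ConcurrentDictionary<Type, Func<object>>();

// Builds `() => (object)new T()` as an expression tree and compiles it on first use.
// Later calls for the same type reuse the cached delegate, so no reflection runs per row.
Func<object> GetFactory(Type type) =>
    factories.GetOrAdd(type, t =>
        Expression.Lambda<Func<object>>(
            Expression.Convert(Expression.New(t), typeof(object))).Compile());

var first = GetFactory(typeof(StringBuilder));
var second = GetFactory(typeof(StringBuilder));

var instance = (StringBuilder)first();
instance.Append("row");

Console.WriteLine(ReferenceEquals(first, second)); // True
Console.WriteLine(instance.ToString());            // row
```

The compile step is expensive, but it is paid once per type; every later instantiation is an ordinary delegate call.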
Materialization Quick Reference → | Data Reading Guide →
LINQ Extensions
DataFlow.NET implements additional LINQ extensions to make every data loop composable—even side-effect loops.
- Independent implementation: Re-implemented IAsyncEnumerable methods without depending on System.Linq.Async
- Clear terminal vs non-terminal separation: Terminal methods (Do(), Display()) force execution; non-terminal methods (ForEach(), Select(), Where()) stay lazy
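The terminal vs non-terminal distinction can be shown with a small sketch in plain C#. `TapEach` and `Drain` are hypothetical stand-ins that mimic the described behavior of a lazy side-effect stage and a terminal call; they are not the library's ForEach() and Do().

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

async IAsyncEnumerable<int> Source()
{
    for (var i = 1; i <= 3; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

// Hypothetical lazy side-effect stage: runs the action as items pass through.
async IAsyncEnumerable<T> TapEach<T>(IAsyncEnumerable<T> source, Action<T> action)
{
    await foreach (var item in source)
    {
        action(item);
        yield return item;
    }
}

// Hypothetical terminal stage: enumerates the stream and discards the items.
async Task Drain<T>(IAsyncEnumerable<T> source)
{
    await foreach (var _ in source) { }
}

var pipeline = TapEach(Source(), i => log.Add($"saw {i}"));
var countBefore = log.Count;
Console.WriteLine(countBefore); // 0 (building the pipeline ran nothing)

await Drain(pipeline);
Console.WriteLine(log.Count);   // 3 (the terminal call pulled every item)
```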
See Extension Methods API Reference →
Cases/SelectCase/ForEachCase
We've extended standard LINQ with custom operators for declarative branching. Using Cases, SelectCase, and ForEachCase, you can replace complex nested if/else blocks with an optimized, single-pass dispatch tree — while remaining fully composable.
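The idea of single-pass dispatch can be sketched as follows. This is an assumption-level illustration of the concept, not the library's Cases implementation: the hypothetical `Categorize` helper tags each item with the index of the first predicate it matches, so later stages branch on that index instead of re-testing the item. It uses IEnumerable<T> rather than IAsyncEnumerable<T> to keep the example short.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: tags each item with the index of the first predicate it
// matches, or predicates.Length when none match. One pass, one item at a time.
IEnumerable<(int Category, T Item)> Categorize<T>(
    IEnumerable<T> source, params Func<T, bool>[] predicates)
{
    foreach (var item in source)
    {
        var index = Array.FindIndex(predicates, p => p(item));
        yield return (index < 0 ? predicates.Length : index, item);
    }
}

var amounts = new[] { 1500m, 200m, 50m };
var labels = new List<string>();

foreach (var (category, amount) in Categorize(amounts, a => a > 1000m, a => a > 100m))
{
    // Each branch is selected by category index, not by re-testing the item.
    labels.Add(category switch
    {
        0 => $"high:{amount}",
        1 => $"medium:{amount}",
        _ => $"other:{amount}"
    });
}

Console.WriteLine(string.Join(", ", labels)); // high:1500, medium:200, other:50
```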
Multi-Source Stream Merging
This is the "U" (Unify) step of the SUPRA pattern — "absorb many sources into one stream."
var unifiedStream = new UnifiedStream<Log>()
.Unify(fileLogs, "archive")
.Unify(apiLogs, "live")
.Unify(dbLogs, "backup");
// Result: A single IAsyncEnumerable<Log> you can query
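For intuition, merging several async streams into one can be sketched with a shared channel. This is a simplified illustration, not how UnifiedStream is implemented; `Merge` and `Logs` are hypothetical helpers, and the sketch omits source names, cancellation, and ordering policies.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: pumps every source into one channel and reads the result.
async IAsyncEnumerable<T> Merge<T>(params IAsyncEnumerable<T>[] sources)
{
    var channel = Channel.CreateBounded<T>(16);

    async Task Pump(IAsyncEnumerable<T> source)
    {
        await foreach (var produced in source)
            await channel.Writer.WriteAsync(produced);
    }

    // Complete the channel once all pumps finish, passing along any failure.
    var completion = Task.WhenAll(sources.Select(s => Pump(s)))
        .ContinueWith(t => channel.Writer.Complete(t.Exception));

    await foreach (var item in channel.Reader.ReadAllAsync())
        yield return item;

    await completion;
}

// Hypothetical log source.
async IAsyncEnumerable<string> Logs(string tag, int count)
{
    for (var i = 0; i < count; i++)
    {
        await Task.Yield();
        yield return $"{tag}-{i}";
    }
}

var merged = new List<string>();
await foreach (var line in Merge(Logs("archive", 2), Logs("live", 2)))
    merged.Add(line);

// Arrival order across sources is not deterministic, so sort before printing.
merged.Sort(StringComparer.Ordinal);
Console.WriteLine(string.Join(", ", merged)); // archive-0, archive-1, live-0, live-1
```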
Debug with Spy()
Insert observation points anywhere in your pipeline without changing data flow. Because Spy() is fully composable, you can add or remove traces by simply commenting a line — no code rewriting required.
await data
.Where(...)
.Spy("After filtering") // 👈 See items flow through
.Select(...)
.Spy("After transformation")
.ForEach(...) // 👈 Side-effect iteration, still composable
.Do(); // 👈 Force execution (no output needed)
⚠️ Note: Due to lazy execution, output from multiple Spy() calls appears interleaved (item-by-item), not grouped by stage. This preserves the streaming nature of the pipeline.
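The interleaving is easy to reproduce with standard LINQ. In this sketch, `Trace` is a hypothetical pass-through observer written for the example (it is not Spy() itself); it records each item and forwards it unchanged.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var trace = new List<string>();

// Hypothetical pass-through observer: records each item and forwards it unchanged.
IEnumerable<T> Trace<T>(IEnumerable<T> source, string label)
{
    foreach (var item in source)
    {
        trace.Add($"{label}:{item}");
        yield return item;
    }
}

var filtered = Trace(new[] { 1, 2, 3, 4 }.Where(n => n % 2 == 0), "filtered");
var scaled = Trace(filtered.Select(n => n * 10), "scaled");

foreach (var _ in scaled) { } // force execution

Console.WriteLine(string.Join(" ", trace));
// filtered:2 scaled:20 filtered:4 scaled:40
```

Each item passes through both observation points before the next item enters the pipeline, so the two labels alternate.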
Go Parallel When You Need To
Need to parallelize CPU-intensive or I/O-bound work? DataFlow.NET provides parallel counterparts that work just like their sequential equivalents — still lazy, still composable:
// Parallel sync processing
await data.AsParallel()
.Select(item => ExpensiveCompute(item))
.ForEach(item => WriteToDb(item))
.Do();
// Parallel async processing
await asyncStream.AsParallel()
.WithMaxConcurrency(8)
.Select(async item => await FetchAsync(item))
.Do();
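For comparison, the same bounded-concurrency idea can be expressed with the built-in Parallel.ForEachAsync (available since .NET 6). This sketch does not use DataFlow.NET; `Ids` is a hypothetical source, and the counters exist only to show that the concurrency limit holds.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical source of work items.
async IAsyncEnumerable<int> Ids()
{
    for (var i = 1; i <= 20; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

var gate = new object();
var running = 0;
var peak = 0;
var doubledSum = 0;

await Parallel.ForEachAsync(
    Ids(),
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (id, cancellationToken) =>
    {
        lock (gate) { running++; peak = Math.Max(peak, running); }

        await Task.Delay(10, cancellationToken); // stands in for I/O-bound work

        lock (gate) { running--; doubledSum += id * 2; }
    });

Console.WriteLine(doubledSum); // 420
Console.WriteLine(peak <= 4);  // True
```

Unlike a lazy pipeline stage, Parallel.ForEachAsync is itself terminal: it consumes the whole stream and returns only when all items are processed.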
See ParallelAsyncQuery API Reference → | Parallel Processing Guide → | Extension Methods →
Scale to the cloud (Premium)
If you hit the limit of local computing power, DataFlow.NET lets you seamlessly scale to the cloud with LINQ-to-Spark & Snowflake. Your C# lambda expressions are decompiled at runtime and translated into native Spark/SQL execution plans.
- ✅ No data transfer to client
- ✅ Execution happens on the cluster
- ✅ Full type safety
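To make the translation idea concrete, here is a deliberately tiny sketch that turns a C# lambda, captured as an expression tree, into a SQL-like predicate string. It is not the DataFlow.NET provider: `ToSql`, `Operator`, and the `Order` record are hypothetical, and only a few node types are handled (captured variables and method calls, for instance, are rejected).

```csharp
using System;
using System.Globalization;
using System.Linq.Expressions;

// Hypothetical operator mapping for a handful of node types.
string Operator(ExpressionType type) => type switch
{
    ExpressionType.AndAlso => "AND",
    ExpressionType.OrElse => "OR",
    ExpressionType.Equal => "=",
    ExpressionType.GreaterThan => ">",
    ExpressionType.LessThan => "<",
    _ => throw new NotSupportedException($"Unsupported operator: {type}")
};

// Hypothetical translator: walks the tree and emits a SQL-like predicate.
string ToSql(Expression node) => node switch
{
    LambdaExpression lambda => ToSql(lambda.Body),
    BinaryExpression binary =>
        $"({ToSql(binary.Left)} {Operator(binary.NodeType)} {ToSql(binary.Right)})",
    MemberExpression { Expression: ParameterExpression } member => member.Member.Name,
    ConstantExpression { Value: null } => "NULL",
    ConstantExpression { Value: string text } => "'" + text.Replace("'", "''") + "'",
    ConstantExpression constant =>
        string.Format(CultureInfo.InvariantCulture, "{0}", constant.Value),
    _ => throw new NotSupportedException($"Unsupported node: {node.NodeType}")
};

// The lambda is never executed locally; it is only inspected as data.
Expression<Func<Order, bool>> filter = o => o.Year == 2024 && o.Amount > 10000;

var sql = ToSql(filter);
Console.WriteLine(sql); // ((Year = 2024) AND (Amount > 10000))

// Hypothetical entity used only for this example.
record Order(int Year, int Amount);
```

Because the predicate is shipped as text, the filtering can run where the data lives, and a typo in a property name still fails at compile time.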
LINQ-to-Spark Guide → | LINQ-to-Snowflake Guide →
4. Quick Start
Prerequisites
- .NET 8.0 SDK or later
Installation
Via NuGet (Recommended):
dotnet add package DataFlow.Net --version 1.1.0
Or clone the repository:
git clone https://github.com/improveTheWorld/DataFlow.NET
cd DataFlow.NET
Run the Usage Examples
dotnet run --project DataFlow.Test.UsageExamples/DataFlow.App.UsageExamples.csproj
Or open the full solution in Visual Studio 2022:
DataFlow.Net.sln
Your First Pipeline
using DataFlow;
// A complete, memory-efficient pipeline in under a dozen lines
await Read.Csv<Order>("orders.csv")
.Cases(
o => o.Amount > 1000,
o => o.CustomerType == "VIP"
)
.SelectCase(
highValue => ProcessHighValue(highValue),
vip => ProcessVip(vip)
)
.AllCases()
.WriteJson("output.json");
Advanced: One Logic, Multiple Targets
Your business rule is: "Flag high-value transactions from international customers."
// 1. DEVELOPMENT: Read from a local CSV file
await Read.Csv<Order>("orders.csv")
.Cases(o => o.Amount > 10000, o => o.IsInternational) // 👈 Your Logic
.SelectCase(...)
.AllCases()
.WriteCsv("output.csv");
// 2. PRODUCTION: Merge multiple async streams
await new UnifiedStream<Order>()
.Unify(ordersApi, "api")
.Unify(ordersDb, "db")
.Cases(o => o.Amount > 10000, o => o.IsInternational) // 👈 SAME Logic
.SelectCase(...)
.AllCases()
.WriteJson("output.json");
// 3. CLOUD: Query Snowflake Data Warehouse
// Filters and aggregations execute on the server
await Read.SnowflakeTable<Order>(options, "orders")
.Where(o => o.Year == 2024)
.Cases(o => o.Amount > 10000, o => o.IsInternational) // 👈 SAME Logic
.SelectCase(...)
.ToListAsync();
// 4. SCALE: Run on Apache Spark (Petabyte Scale)
// Translates your C# Expression Tree to native Spark orchestration
Read.SparkDataFrame<Order>(spark, ordersDf)
.Where(o => o.Amount > 10000)
.Cases(o => o.Amount > 50000, o => o.IsInternational) // 👈 SAME Logic
.SelectCase(...)
.AllCases()
.Write().Parquet("s3://data/output");
5. Documentation
| Topic | Description |
|---|---|
| 🏰 SUPRA Pattern | The SUPRA Pattern deep dive |
| 🔀 Cases Pattern | The Cases/SelectCase/ForEachCase Engine |
| 📖 Data Reading | Reading CSV, JSON, YAML, Text |
| 🎯 Materialization Guide | Design classes for CSV, JSON, YAML, Snowflake, Spark |
| ✍️ Data Writing | Writing CSV, JSON, YAML, Text |
| 🌊 Stream Merging | UnifiedStream & Multi-Source Streams |
| 🔄 Polling & Buffering | Data acquisition patterns |
| 🔥 Big Data | Running C# on Apache Spark |
| ❄️ Snowflake | LINQ-to-Snowflake Provider |
| 🚀 Performance | The Zero-Allocation Engine |
| 📋 API Reference | Complete API Documentation |
| 🧩 Extension Methods | IEnumerable/IAsyncEnumerable/Parallel API Matrix |
| 🔌 Integration Patterns | HTTP, Kafka, EF Core, WebSocket examples |
| ⚡ Parallel Processing | ParallelQuery & ParallelAsyncQuery |
| ⚡ ParallelAsyncQuery | Parallel async processing API |
| 🧪 Test Coverage | Coverage Reports (~77% Core) |
| 🗺️ Roadmap | Future Enterprise Connectors |
6. Community & Support
- Issues: GitHub Issues
- Discord: Join the Community
- Email: tecnet.paris@gmail.com
DataFlow.NET — Sink the chaos. Let the rest flow pure. 🚀
v1.1.0: Critical fix for YAML MaxNodeScalarLength infinite loop (BUG-007), YamlDotNet bundled in package. Full notes: https://github.com/improveTheWorld/DataFlow.NET/blob/main/src/docs/RELEASE-NOTES-v1.1.0.md