DataFlow.Data 1.0.0

Suggested Alternatives

DataFlow.Net

Additional Details

This package has missing dependencies. Use DataFlow.Net instead for the complete framework.

The owner has unlisted this package. This could mean that the package is deprecated, has security vulnerabilities or shouldn't be used anymore.
dotnet add package DataFlow.Data --version 1.0.0
                    
NuGet\Install-Package DataFlow.Data -Version 1.0.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="DataFlow.Data" Version="1.0.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="DataFlow.Data" Version="1.0.0" />
                    
Directory.Packages.props
<PackageReference Include="DataFlow.Data" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add DataFlow.Data --version 1.0.0
                    
#r "nuget: DataFlow.Data, 1.0.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package DataFlow.Data@1.0.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=DataFlow.Data&version=1.0.0
                    
Install as a Cake Addin
#tool nuget:?package=DataFlow.Data&version=1.0.0
                    
Install as a Cake Tool

DataFlow.NET

We make Data fit for C#.

From local files to cloud scale — LINQ all the way down.
Let IntelliSense and the compiler do the work.

- df.filter(pl.col("ammount") > 1000)   # Typo? Runtime error.
+ .Where(o => o.Amount > 1000)          // Typo? Won't compile. ✓

License Coverage


Table of Contents

  1. Sound Familiar?
  2. Three Simple Rules
  3. Everything is a Stream
  4. Quick Start
  5. Documentation
  6. Community & Support

1. Sound Familiar?

.NET developers know the story — You write a clean, type-safe data processor in C# — It works perfectly on your dev machine — Then reality hits:

  1. The Data Grows:

    • 10 MB: List<T> works fine.
    • 10 GB: OutOfMemoryException. You rewrite using StreamReader.
    • 10 TB: You abandon C# for Spark/SQL. You lose type safety and duplicate logic.
  2. The Logic Tangles:

    • New requirements mean new if/else branches.
    • You loop over the same data 5 times to handle 5 different cases.
    • The code becomes spaghetti, and the data lifecycle becomes a black box.
  3. The Source Fragments:

    • Today it's a CSV file. Tomorrow it's a REST API. Next week it's a Kafka Stream.
    • For each source, you write different adapter code.
    • You end up with a "Code Salad": mixed abstractions, different error handling, and no reuse.

DataFlow.NET was built to stop this cycle:

  • Unified API — Same code for CSV, JSON, Kafka, Spark
  • Constant memory — Stream billions of rows without OutOfMemoryException (see benchmarks)
  • No spaghetti — Declarative Cases pattern replaces nested if/else
  • Pure C# — LINQ all the way down

Define the what. DataFlow.NET handles the how.


2. Three Simple Rules

DataFlow.NET is more than a framework — it defines a pattern to process data.

graph LR
    S[**S**ink] --> U[**U**nify]
    U --> P[**P**rocess]
    P --> R[**R**oute]
    R --> A[**A**pply]
    
    style S fill:#f9f,stroke:#333,stroke-width:2px
    style A fill:#bbf,stroke:#333,stroke-width:2px

We call this the SUPRA pattern — the name comes from gathering the first letter of each stage: Sink, Unify, Process, Route, Apply.

The SUPRA pattern ensures memory stays constant and items flow one at a time. Read the SUPRA-Pattern Guide →

To achieve the SUPRA pattern, you'll have to follow these rules:

  1. Sink First — Buffer and normalize at the edge, never in the middle.
  2. Flow Lazy — Items stream one by one. Constant memory.
  3. Route Declaratively — No more if/else spaghetti.

DataFlow.NET provides all the ready-to-use blocks to natively apply these rules.


3. Everything is a Stream

DataFlow.NET provides tools to abstract the source of data from the processing. Use these to make every data source an IAsyncEnumerable<T> stream — the essence of the "Unified API" — same LINQ operators, same processing logic, regardless of origin.

See Integration Patterns Guide →

Source Type Pattern Output
EF Core (SQL Server, PostgreSQL, etc.) .AsAsyncEnumerable() IAsyncEnumerable<T>
JSON/CSV/YAML Files Read.Json<T>() / Read.Csv<T>() IAsyncEnumerable<T>
REST APIs .Poll() + .SelectMany() IAsyncEnumerable<T>
Kafka / RabbitMQ / WebSocket Wrap + .WithBoundedBuffer() IAsyncEnumerable<T>
Snowflake (Premium) Read.SnowflakeTable<T>() IAsyncEnumerable<T>
Apache Spark (Premium) Read.SparkDataFrame<T>() SparkQuery<T>

Any IAsyncEnumerable<T> source integrates natively.

Examples

Already using Entity Framework Core? DataFlow.NET plugs right in:

// EF Core — Native support
await dbContext.Orders.AsAsyncEnumerable()
    .Where(o => o.Amount > 100)
    .WriteCsv("orders.csv");
  • ✅ EF Core handles database access
  • ✅ DataFlow.NET handles processing logic
  • ✅ Works with SQL Server, PostgreSQL, MySQL, SQLite

Need to integrate REST APIs or message queues? Use polling and buffering:

// REST API — Poll and flatten
var orders = (() => httpClient.GetFromJsonAsync<Order[]>("/api/orders"))
    .Poll(TimeSpan.FromSeconds(5), token)
    .SelectMany(batch => batch.ToAsyncEnumerable());

// Kafka/WebSocket — Wrap in async iterator + buffer
var kafkaStream = ConsumeKafka(token).WithBoundedBuffer(1024);

High-Performance Streaming File Readers

DataFlow.NET provides high-performance file readers: no Reflection on the hot path; expression trees are compiled once and cached.

  • 4x faster than standard reflection-based creation (benchmark results →)
  • Zero allocation overhead — same 48 bytes as native new() instantiation
  • Handles CSV, JSON, and YAML files generically.

We carefully crafted an intuitive, fully-featured readers API with advanced error handling — all while streaming row-by-row.

The streaming row-by-row approach — absent in most other frameworks — is the cornerstone of DataFlow.NET's constant memory usage.

LINQ Extensions

DataFlow.NET implements additional LINQ extensions to make every data loop composable—even side-effect loops.

  • Independent implementation — Re-implemented IAsyncEnumerable methods without depending on System.Linq.Async
  • Clear terminal vs non-terminal separation — Terminal methods (Do(), Display()) force execution; non-terminal methods (ForEach(), Select(), Where()) stay lazy

See Extension Methods API Reference →

Cases/SelectCase/ForEachCase

We've extended standard LINQ with custom operators for declarative branching. Using Cases, SelectCase, and ForEachCase, you can replace complex nested if/else blocks with an optimized, single-pass dispatch tree — while remaining fully composable.

See Cases Pattern Guide →

Multi-Source Stream Merging

This is the "U" (Unify) step of the SUPRA pattern — "absorb many sources into one stream."

var unifiedStream = new UnifiedStream<Log>()
    .Unify(fileLogs, "archive")
    .Unify(apiLogs, "live")
    .Unify(dbLogs, "backup");
// Result: A single IAsyncEnumerable<Log> you can query

See Stream Merging Guide →

Debug with Spy()

Insert observation points anywhere in your pipeline without changing data flow. Because Spy() is fully composable, you can add or remove traces by simply commenting a line — no code rewriting required.

await data
    .Where(...)
    .Spy("After filtering")       // 👈 See items flow through
    .Select(...)
    .Spy("After transformation")
    .ForEach(...)                 // 👈 Side-effect iteration, still composable
    .Do();                        // 👈 Force execution (no output needed)

⚠️ Note: Due to lazy execution, output from multiple Spy() calls appears interleaved (item-by-item), not grouped by stage. This preserves the streaming nature of the pipeline.

Go Parallel When You Need To

Need to parallelize CPU-intensive or I/O-bound work? DataFlow.NET provides parallel counterparts that work just like their sequential equivalents — still lazy, still composable:

// Parallel sync processing
await data.AsParallel()
    .Select(item => ExpensiveCompute(item))
    .ForEach(item => WriteToDb(item))
    .Do();

// Parallel async processing
await asyncStream.AsParallel()
    .WithMaxConcurrency(8)
    .Select(async item => await FetchAsync(item))
    .Do();

See ParallelAsyncQuery API Reference → | Parallel Processing Guide → | Extension Methods →

Scale to the cloud (Premium)

If you hit the limit of local computing power, DataFlow.NET lets you seamlessly scale to the cloud with LINQ-to-Spark & Snowflake. Your C# lambda expressions are decompiled at runtime and translated into native Spark/SQL execution plans.

  • ✅ No data transfer to client
  • ✅ Execution happens on the cluster
  • ✅ Full type safety

LINQ-to-Spark Guide → | LINQ-to-Snowflake Guide →


4. Quick Start

Prerequisites

Installation

git clone https://github.com/improveTheWorld/DataFlow.NET
cd DataFlow.NET

Run the Usage Examples

dotnet run --project DataFlow.Test.UsageExamples/DataFlow.App.UsageExamples.csproj

Or open the full solution in Visual Studio 2022:

DataFlow.Net.sln

Your First Pipeline

using DataFlow;

// A complete, memory-efficient pipeline in 10 lines
await Read.Csv<Order>("orders.csv")
    .Cases(
        o => o.Amount > 1000, 
        o => o.CustomerType == "VIP"
    )
    .SelectCase(
        highValue => ProcessHighValue(highValue),
        vip => ProcessVip(vip)
    )
    .AllCases()
    .WriteJson("output.json");

Advanced: One Logic, Multiple Targets

Your business rule is: "Flag high-value transactions from international customers."

// 1. DEVELOPMENT: Read from a local CSV file
await Read.Csv<Order>("orders.csv")
    .Cases(o => o.Amount > 10000, o => o.IsInternational) // 👈 Your Logic
    .SelectCase(...) 
    .AllCases()
    .WriteCsv("output.csv");

// 2. PRODUCTION: Merge multiple async streams
await new UnifiedStream<Order>()
    .Unify(ordersApi, "api")
    .Unify(ordersDb, "db")
    .Cases(o => o.Amount > 10000, o => o.IsInternational) // 👈 SAME Logic
    .SelectCase(...)
    .AllCases()
    .WriteJson("output.json");

// 3. CLOUD: Query Snowflake Data Warehouse
// Filters and aggregations execute on the server
await Read.SnowflakeTable<Order>(options, "orders")
    .Where(o => o.Year == 2024)
    .Cases(o => o.Amount > 10000, o => o.IsInternational) // 👈 SAME Logic
    .SelectCase(...)
    .ToListAsync();

// 4. SCALE: Run on Apache Spark (Petabyte Scale)
// Translates your C# Expression Tree to native Spark orchestration
Read.SparkDataFrame<Order>(spark, ordersDf)
    .Where(o => o.Amount > 10000)
    .Cases(o => o.Amount > 50000, o => o.IsInternational) // 👈 SAME Logic
    .SelectCase(...)
    .AllCases()
    .Write().Parquet("s3://data/output");

5. Documentation

Topic Description
🏰 SUPRA Pattern The SUPRA Pattern deep dive
🔀 Cases Pattern The Cases/SelectCase/ForEachCase Engine
📖 Data Reading Reading CSV, JSON, YAML, Text
✍️ Data Writing Writing CSV, JSON, YAML, Text
🌊 Stream Merging UnifiedStream & Multi-Source Streams
🔄 Polling & Buffering Data acquisition patterns
🔥 Big Data Running C# on Apache Spark
❄️ Snowflake LINQ-to-Snowflake Provider
🚀 Performance The Zero-Allocation Engine
📋 API Reference Complete API Documentation
🧩 Extension Methods IEnumerable/IAsyncEnumerable/Parallel API Matrix
🔌 Integration Patterns HTTP, Kafka, EF Core, WebSocket examples
Parallel Processing ParallelQuery & ParallelAsyncQuery
ParallelAsyncQuery Parallel async processing API
🧪 Test Coverage Coverage Reports (~77% Core)
🗺️ Roadmap Future Enterprise Connectors

6. Community & Support

DataFlow.NETSink the chaos. Let the rest flow pure. 🚀

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated