FluentParallel 0.2.0

There is a newer version of this package available.
See the version list below for details.

FluentParallel

A fluent, composable API for running parallel and streaming data pipelines in .NET.
It allows you to easily batch, transform, filter, and stream results as they complete using a simple fluent interface.


✨ Features

  • Fluent API to build parallel pipelines step by step.
  • Batching of items into configurable chunk sizes.
  • Synchronous, asynchronous, and streaming transformations.
  • Filtering at any stage in the pipeline.
  • Error handling with OnError.
  • Works with both IEnumerable<T> and IAsyncEnumerable<T> sources.
  • Real-time streaming with ExecuteStreamAsync + StreamAsync.

🚀 Installation

dotnet add package FluentParallel

🚀 Before vs After - See the Difference

❌ "Raw" C# (manual batching + concurrency + channel)

// BEFORE: raw C# with batching, parallelism and channel
using System.Threading.Channels; // Channel<T> is not covered by the default implicit usings

async Task RunRawExampleAsync()
{
    var customerIds = Enumerable.Range(1, 1000);
    int batchSize = 10;

    var batches = customerIds
        .Select((id, index) => new { id, index })
        .GroupBy(x => x.index / batchSize)
        .Select(g => g.Select(x => x.id).ToArray())
        .ToList();

    var semaphore = new SemaphoreSlim(4);
    var channel = Channel.CreateUnbounded<string>();
    var cts = new CancellationTokenSource();

    // Producer
    var producer = Task.Run(async () =>
    {
        foreach (var batch in batches)
        {
            await semaphore.WaitAsync(cts.Token);
            _ = Task.Run(async () =>
            {
                try
                {
                    // Simulate async work
                    await Task.Delay(500);
                    var sum = batch.Sum();
                    await channel.Writer.WriteAsync($"Batch sum: {sum}", cts.Token);
                }
                finally
                {
                    semaphore.Release();
                }
            }, cts.Token);
        }

        // Wait for all batches to finish then complete channel
        for (int i = 0; i < 4; i++)
            await semaphore.WaitAsync(cts.Token);

        channel.Writer.Complete();
    }, cts.Token);

    // Consumer
    await foreach (var msg in channel.Reader.ReadAllAsync(cts.Token))
    {
        Console.WriteLine(msg);
    }

    await producer;
}

✅ After with FluentParallel

// AFTER: FluentParallel handles batching, parallelism and streaming for you!
async Task RunFluentParallelExampleAsync()
{
    var customerIds = Enumerable.Range(1, 1000);

    await foreach (var sum in ParallelFlow
        .From(customerIds)
        .Batch(10)                     // auto-batching
        .WithParallel(4)               // 4 batches in parallel
        .ExecuteStreamAsync(async batch =>
        {
            // Simulate async work
            await Task.Delay(500);
            return batch.Sum();
        })
        .StreamAsync())                // stream results as they complete
    {
        Console.WriteLine($"Batch sum: {sum}");
    }
}

✨ The difference

  • Before: ~45 lines of boilerplate with SemaphoreSlim, Channel, manual batching and tasks.
  • After: ~10 lines, all in one fluent pipeline.
  • You get parallelism, batching, streaming, and error handling built in.

🧩 When to Use

Use FluentParallel when you need to:

  • Process large datasets without waiting for all items to complete.
  • Combine async tasks and parallelism seamlessly.
  • Stream intermediate results as they are ready.
  • Handle errors gracefully without stopping the pipeline.

How to use it?

1️⃣ Synchronous Flow

using FluentParallel;

var numbers = Enumerable.Range(1, 30);

var result = await ParallelFlow
    .From(numbers)
    .Batch(5)          // batches of 5
    .WithParallel(4)   // 4 batches processed in parallel
    .Execute(batch =>  // the delegate receives one batch at a time
    {
        Console.WriteLine(
            $"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}]");
        return batch;
    })
    .CollectAsync();

Console.WriteLine($"Collected {result.Count} batches");

2️⃣ Asynchronous Flow

var resultAsync = await ParallelFlow
    .From(numbers)
    .Batch(5)
    .WithParallel(4)
    .ExecuteAsync(async batch =>
    {
        // Simulate async work
        await Task.Delay(2000);
        Console.WriteLine(
            $"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}] | sum: {batch.Sum()}");
        return batch.Sum();
    })
    .Filter(sum => sum < 100) // keep only sums below 100
    .CollectAsync();

Console.WriteLine($"Collected sums: [{string.Join(",", resultAsync)}]");
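
To see which sums the filter above lets through, the batch arithmetic can be reproduced with plain LINQ. This is a minimal standalone sketch that uses only Enumerable.Chunk from the base class library (.NET 6 or later), not FluentParallel itself. It assumes that Batch(5) groups consecutive items the same way Chunk(5) does, which the package description suggests but does not state.

```csharp
using System;
using System.Linq;

// Same input as the examples above: the integers 1..30.
var numbers = Enumerable.Range(1, 30);

// Assumption: Batch(5) groups consecutive items like Enumerable.Chunk(5).
var batchSums = numbers
    .Chunk(5)                      // six batches: [1..5], [6..10], ..., [26..30]
    .Select(batch => batch.Sum())  // 15, 40, 65, 90, 115, 140
    .ToList();

// The same predicate as .Filter(sum => sum < 100).
var kept = batchSums.Where(sum => sum < 100).ToList();

Console.WriteLine($"All sums:  [{string.Join(",", batchSums)}]");
Console.WriteLine($"Kept sums: [{string.Join(",", kept)}]");
```

Under that assumption, four of the six batch sums pass the filter. Because the batches run in parallel, the order in which the pipeline returns them may differ from the sequential order shown here.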

3️⃣ Streaming Flow (process results as soon as they are ready!)

// The loop variable is named batchSum so it does not clash with the
// `result` local from the synchronous example when both live in one file.
await foreach (var batchSum in ParallelFlow
    .From(numbers)
    .Batch(5)
    .WithParallel(4)
    .ExecuteStreamAsync(async batch =>
    {
        await Task.Delay(2000);
        Console.WriteLine(
            $"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}] | sum: {batch.Sum()}");
        return batch.Sum();
    })
    .Filter(sum => sum < 100)
    .StreamAsync()) // yields as results come in
{
    Console.WriteLine($"Got: {batchSum}");
}

4️⃣ Error Handling Flow

This demonstrates that:

  • Exceptions inside steps are caught by your OnError handler.
  • Processing continues for other items even if one fails.

await foreach (var sum in ParallelFlow
    .From(numbers)
    .Batch(5)
    .WithParallel(4)
    .OnError(ex => Console.WriteLine($"Error: {ex.Message}"))
    .ExecuteStreamAsync(async batch =>
    {
        await Task.Delay(500);
        Console.WriteLine($"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}]");

        // The batch that contains 5 fails on purpose; the others complete normally.
        if (batch.Contains(5))
            throw new InvalidOperationException($"Boom! Thread {Environment.CurrentManagedThreadId} error: contains 5!");
        return batch.Sum();
    })
    .StreamAsync())
{
    Console.WriteLine($"Sum: {sum}");
}
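
The "report the error and keep going" behavior described above can be sketched with plain tasks. This is an illustration of the idea only, not FluentParallel's implementation: TryProcessAsync is a hypothetical helper, Enumerable.Chunk stands in for Batch(5) on the assumption that both group consecutive items, and, unlike WithParallel(4), this sketch starts all batches at once without limiting concurrency.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var numbers = Enumerable.Range(1, 30);
var errors = new List<string>();

// Hypothetical helper: processes one batch and returns null if it fails.
async Task<int?> TryProcessAsync(int[] batch, Action<Exception> onError)
{
    try
    {
        await Task.Delay(50); // simulate async work
        if (batch.Contains(5))
            throw new InvalidOperationException("Boom! contains 5!");
        return batch.Sum();
    }
    catch (Exception ex)
    {
        onError(ex);   // report the failure ...
        return null;   // ... and let the remaining batches carry on
    }
}

// Start every batch, then wait for all of them; one failure cancels nothing.
var results = await Task.WhenAll(
    numbers.Chunk(5).Select(batch => TryProcessAsync(batch, ex =>
    {
        lock (errors) errors.Add(ex.Message); // handlers may run on different threads
    })));

// Keep only the batches that produced a value.
var sums = results
    .Where(r => r.HasValue)
    .Select(r => r.GetValueOrDefault())
    .ToList();

Console.WriteLine($"Sums: [{string.Join(",", sums)}]");
Console.WriteLine($"Errors: {errors.Count}");
```

In this sketch only the first batch (1 to 5) fails, so one error is reported and the five remaining sums are still produced.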

📄 License

MIT

Target frameworks

Compatible: net8.0
Computed: net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows, net9.0, net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version  Downloads  Last Updated
1.0.3    129        2/1/2026
0.2.0    133        10/4/2025
0.1.8    202        9/30/2025
0.1.7    192        9/29/2025
0.1.6    194        9/29/2025
0.1.5    198        9/29/2025