FluentParallel 1.0.3
FluentParallel
A fluent, composable API for running adaptive parallel and streaming data pipelines in .NET. FluentParallel lets you batch, transform, filter, and stream results as they complete, automatically scaling concurrency up or down based on CPU usage.
✨ Features
- Fluent API to build parallel pipelines step by step.
- Adaptive Concurrency based on CPU usage.
- Batching of items into configurable chunk sizes.
- Synchronous, asynchronous, and streaming transformations.
- Filtering at any stage in the pipeline.
- Error handling with OnError.
- Works with both IEnumerable<T> and IAsyncEnumerable<T> sources.
- Real-time streaming with ExecuteStreamAsync + StreamAsync.
🚀 Installation
dotnet add package FluentParallel
🚀 Before vs After - See the Difference
❌ "Raw" C# (manual batching + concurrency + channel)
using System.Threading.Channels;

// BEFORE: raw C# with batching, parallelism and channel
async Task RunRawExampleAsync()
{
    var customerIds = Enumerable.Range(1, 1000);
    int batchSize = 10;

    // Manual batching: group items by index into arrays of batchSize
    var batches = customerIds
        .Select((id, index) => new { id, index })
        .GroupBy(x => x.index / batchSize)
        .Select(g => g.Select(x => x.id).ToArray())
        .ToList();

    var semaphore = new SemaphoreSlim(4);
    var channel = Channel.CreateUnbounded<string>();
    using var cts = new CancellationTokenSource();

    // Producer
    var producer = Task.Run(async () =>
    {
        foreach (var batch in batches)
        {
            await semaphore.WaitAsync(cts.Token);
            _ = Task.Run(async () =>
            {
                try
                {
                    // Simulate async work
                    await Task.Delay(500);
                    var sum = batch.Sum();
                    await channel.Writer.WriteAsync($"Batch sum: {sum}", cts.Token);
                }
                finally
                {
                    semaphore.Release();
                }
            }, cts.Token);
        }

        // Wait for all batches to finish (reacquire all 4 slots), then complete the channel
        for (int i = 0; i < 4; i++)
            await semaphore.WaitAsync(cts.Token);
        channel.Writer.Complete();
    }, cts.Token);

    // Consumer
    await foreach (var msg in channel.Reader.ReadAllAsync(cts.Token))
    {
        Console.WriteLine(msg);
    }

    await producer;
}
✅ After with FluentParallel
// AFTER: FluentParallel handles batching, parallelism and streaming for you!
async Task RunFluentParallelExampleAsync()
{
    var customerIds = Enumerable.Range(1, 1000);

    await foreach (var sum in ParallelFlow
        .From(customerIds)
        .Batch(10)         // auto-batching
        .WithParallel(4)   // 4 batches in parallel
        .ExecuteStreamAsync(async batch =>
        {
            // Simulate async work
            await Task.Delay(500);
            return batch.Sum();
        })
        .StreamAsync())    // stream results as they complete
    {
        Console.WriteLine($"Batch sum: {sum}");
    }
}
✨ The difference
- Before: ~45 lines of boilerplate with SemaphoreSlim, Channel, manual batching and tasks.
- After: ~10 lines, all in one fluent pipeline.
- You get parallelism, batching, streaming and error handling built-in.
🧩 When to Use
Use FluentParallel when you need to:
- Process large datasets without waiting for all items to complete.
- Combine async tasks and parallelism seamlessly.
- Stream intermediate results as they are ready.
- Handle errors gracefully without stopping the pipeline.
How to use it?
➡️ Synchronous Flow
using FluentParallel;

var numbers = Enumerable.Range(1, 30);

var result = await ParallelFlow
    .From(numbers)
    .Batch(5)          // batches of 5
    .WithParallel(4)   // 4 batches processed in parallel
    .Execute(batch =>
    {
        // The lambda receives one batch at a time
        Console.WriteLine(
            $"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}]");
        return batch;
    })
    .CollectAsync();

Console.WriteLine($"Collected {result.Count} batches");
➡️ Asynchronous Flow
var resultAsync = await ParallelFlow
    .From(numbers)
    .Batch(5)
    .WithParallel(4)
    .ExecuteAsync(async batch =>
    {
        // Simulate async work
        await Task.Delay(2000);
        Console.WriteLine(
            $"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}] | sum: {batch.Sum()}");
        return batch.Sum();
    })
    .Filter(sum => sum < 100) // filter items < 100
    .CollectAsync();

Console.WriteLine($"Collected sums: [{string.Join(",", resultAsync)}]");
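To see which values the asynchronous flow above should collect, the batch sums can be worked out by hand with plain LINQ. This is only a sketch of the arithmetic, not the library's code. It assumes that Batch(5) groups consecutive items the way Enumerable.Chunk(5) does, and that Filter keeps the values matching the predicate the way Where does. Neither assumption is confirmed by the package documentation.

```csharp
using System;
using System.Linq;

// Assumption: Batch(5) behaves like Enumerable.Chunk(5) (consecutive groups of 5),
// and Filter(predicate) keeps matching values like Enumerable.Where.
var numbers = Enumerable.Range(1, 30);

int[] batchSums = numbers
    .Chunk(5)                       // [1..5], [6..10], ..., [26..30]
    .Select(batch => batch.Sum())   // one sum per batch
    .ToArray();

int[] kept = batchSums.Where(sum => sum < 100).ToArray();

Console.WriteLine($"All sums:  [{string.Join(",", batchSums)}]");  // [15,40,65,90,115,140]
Console.WriteLine($"Kept sums: [{string.Join(",", kept)}]");       // [15,40,65,90]
```

Under these assumptions, four of the six batch sums pass the filter. In the parallel pipeline the same values may arrive in a different order, since batches complete independently.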
➡️ Streaming Flow (process results as soon as they are ready!)
await foreach (var result in ParallelFlow
    .From(numbers)
    .Batch(5)
    .WithParallel(4)
    .ExecuteStreamAsync(async batch =>
    {
        await Task.Delay(2000);
        Console.WriteLine(
            $"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}] | sum: {batch.Sum()}");
        return batch.Sum();
    })
    .Filter(sum => sum < 100)
    .StreamAsync()) // yields as results come in
{
    Console.WriteLine($"Got: {result}");
}
➡️ Fixed vs Adaptive Parallelism
Fixed Parallelism
.WithParallel(4)
- Always 4 concurrent tasks
Adaptive Parallelism
.WithAdaptiveParallelism(
    minDegreeOfParallelism: 2,
    maxDegreeOfParallelism: 8,
    cpuThreshold: 60,
    checkInterval: TimeSpan.FromSeconds(1))
- Scales up when CPU usage is below 60%
- Scales down when CPU usage is above 60%
- Never goes below 2 or above 8 concurrent tasks
- Checks CPU usage every second
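The scaling rules above can be read as a small decision function that is evaluated once per check interval. The sketch below is one plausible interpretation, not FluentParallel's actual implementation: NextDegree is a hypothetical helper, and moving one step at a time is an assumption made purely for illustration.

```csharp
using System;

// Hypothetical helper, NOT part of FluentParallel.
// Assumption: the degree of parallelism changes by one step per check.
static int NextDegree(int current, double cpuPercent, int min, int max, double cpuThreshold)
{
    int next = current;
    if (cpuPercent < cpuThreshold) next = current + 1;        // CPU has headroom: scale up
    else if (cpuPercent > cpuThreshold) next = current - 1;   // CPU is busy: scale down
    return Math.Clamp(next, min, max);                        // stay within [min, max]
}

// Simulated CPU readings, one per check interval
int degree = 2;
foreach (double cpu in new[] { 30.0, 45.0, 80.0, 20.0 })
{
    degree = NextDegree(degree, cpu, min: 2, max: 8, cpuThreshold: 60);
    Console.WriteLine($"CPU {cpu}% -> degree {degree}");
}
```

With these simulated readings the degree goes 3, 4, 3, 4. The clamp is what guarantees the "never below 2 or above 8" rule regardless of how long the CPU stays on one side of the threshold.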
➡️ Error Handling Flow
This demonstrates that:
- Exceptions inside steps are caught by your OnError handler.
- Processing continues for other items even if one fails.
await foreach (var result in ParallelFlow
    .From(numbers)
    .Batch(5)
    .WithParallel(4)
    .OnError(ex => Console.WriteLine($"Error: {ex.Message}"))
    .ExecuteStreamAsync(async batch =>
    {
        await Task.Delay(500);
        Console.WriteLine($"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}]");
        // Fail the batch that contains 5 to trigger the OnError handler
        if (batch.Contains(5))
            throw new InvalidOperationException($"Boom! Thread {Environment.CurrentManagedThreadId} error: contains 5!");
        return batch.Sum();
    })
    .StreamAsync())
{
    Console.WriteLine($"Sum: {result}");
}
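To make the "continue on error" behavior concrete, here is a rough equivalent written with plain .NET tasks. It is a sketch only: SumBatchesAsync is a hypothetical stand-in, not how FluentParallel is implemented. It uses Enumerable.Chunk in place of Batch and does not cap concurrency at 4. The point is just that a failing batch is reported to the handler while the other batches still produce results.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the pipeline, NOT FluentParallel's implementation.
static async Task<List<int>> SumBatchesAsync(
    IEnumerable<int[]> batches, Action<Exception> onError)
{
    var results = new ConcurrentBag<int>();

    var tasks = batches.Select(async batch =>
    {
        try
        {
            await Task.Delay(10); // simulate async work
            if (batch.Contains(5))
                throw new InvalidOperationException("Boom! Batch contains 5!");
            results.Add(batch.Sum());
        }
        catch (Exception ex)
        {
            onError(ex); // report the failure; other batches keep running
        }
    });

    await Task.WhenAll(tasks);
    return results.OrderBy(sum => sum).ToList(); // sorted only to make the output stable
}

var errors = new ConcurrentQueue<string>();
List<int> sums = await SumBatchesAsync(
    Enumerable.Range(1, 30).Chunk(5),
    ex => errors.Enqueue(ex.Message));

Console.WriteLine($"Sums: [{string.Join(",", sums)}]");  // [40,65,90,115,140]
Console.WriteLine($"Errors: {errors.Count}");            // 1
```

Only the first batch (1 to 5) fails, so five of the six sums are produced and the handler is called once.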
License
| Product | Compatible and additional computed target framework versions |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Dependencies
- net10.0: System.Linq.Async (>= 7.0.0)
- net8.0: System.Linq.Async (>= 7.0.0)