FluentParallel 0.2.0
There is a newer version of this package available.
See the version list below for details.
dotnet add package FluentParallel --version 0.2.0
NuGet\Install-Package FluentParallel -Version 0.2.0
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="FluentParallel" Version="0.2.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="FluentParallel" Version="0.2.0" />
<PackageReference Include="FluentParallel" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add FluentParallel --version 0.2.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: FluentParallel, 0.2.0"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package FluentParallel@0.2.0
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=FluentParallel&version=0.2.0
#tool nuget:?package=FluentParallel&version=0.2.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
FluentParallel
A fluent, composable API for running parallel and streaming data pipelines in .NET.
It lets you batch, transform, and filter items, then stream results as they complete, all through a single fluent interface.
✨ Features
- Fluent API to build parallel pipelines step by step.
- Batching of items into configurable chunk sizes.
- Synchronous, asynchronous, and streaming transformations.
- Filtering at any stage in the pipeline.
- Error handling with OnError.
- Works with both IEnumerable<T> and IAsyncEnumerable<T> sources.
- Real-time streaming with ExecuteStreamAsync + StreamAsync.
🚀 Installation
dotnet add package FluentParallel
🚀 Before vs After - See the Difference
❌ "Raw" C# (manual batching + concurrency + channel)
// BEFORE: raw C# with batching, parallelism and channel
using System.Threading.Channels;

async Task RunRawExampleAsync()
{
    var customerIds = Enumerable.Range(1, 1000);
    int batchSize = 10;

    // Manual batching: group items by index / batchSize
    var batches = customerIds
        .Select((id, index) => new { id, index })
        .GroupBy(x => x.index / batchSize)
        .Select(g => g.Select(x => x.id).ToArray())
        .ToList();

    var semaphore = new SemaphoreSlim(4);
    var channel = Channel.CreateUnbounded<string>();
    using var cts = new CancellationTokenSource();

    // Producer
    var producer = Task.Run(async () =>
    {
        foreach (var batch in batches)
        {
            await semaphore.WaitAsync(cts.Token);
            _ = Task.Run(async () =>
            {
                try
                {
                    // Simulate async work
                    await Task.Delay(500);
                    var sum = batch.Sum();
                    await channel.Writer.WriteAsync($"Batch sum: {sum}", cts.Token);
                }
                finally
                {
                    semaphore.Release();
                }
            }, cts.Token);
        }

        // Wait for all batches to finish (reacquire all 4 slots), then complete the channel
        for (int i = 0; i < 4; i++)
            await semaphore.WaitAsync(cts.Token);
        channel.Writer.Complete();
    }, cts.Token);

    // Consumer
    await foreach (var msg in channel.Reader.ReadAllAsync(cts.Token))
    {
        Console.WriteLine(msg);
    }

    await producer;
}
✅ After with FluentParallel
// AFTER: FluentParallel handles batching, parallelism and streaming for you!
async Task RunFluentParallelExampleAsync()
{
    var customerIds = Enumerable.Range(1, 1000);

    await foreach (var sum in ParallelFlow
        .From(customerIds)
        .Batch(10)          // auto-batching
        .WithParallel(4)    // 4 batches in parallel
        .ExecuteStreamAsync(async batch =>
        {
            // Simulate async work
            await Task.Delay(500);
            return batch.Sum();
        })
        .StreamAsync())     // stream results as they complete
    {
        Console.WriteLine($"Batch sum: {sum}");
    }
}
✨ The difference
- Before: ~45 lines of boilerplate with SemaphoreSlim, Channel, manual batching and tasks.
- After: ~10 lines, all in one fluent pipeline.
- You get parallelism, batching, streaming and error handling built-in.
🧩 When to Use
Use FluentParallel when you need to:
- Process large datasets without waiting for all items to complete.
- Combine async tasks and parallelism seamlessly.
- Stream intermediate results as they are ready.
- Handle errors gracefully without stopping the pipeline.
How to use it?
1️⃣ Synchronous Flow
using FluentParallel;

var numbers = Enumerable.Range(1, 30);

var result = await ParallelFlow
    .From(numbers)
    .Batch(5)           // batches of 5
    .WithParallel(4)    // 4 batches processed in parallel
    .Execute(batch =>
    {
        // Each invocation receives one batch
        Console.WriteLine(
            $"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}]");
        return batch;
    })
    .CollectAsync();

Console.WriteLine($"Collected {result.Count} batches");
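To picture what the batching step hands to the callback, here is a minimal sketch that uses plain LINQ instead of FluentParallel. It assumes Batch(5) splits the source into consecutive chunks of five, much like Enumerable.Chunk; that equivalence is an assumption for illustration, not something the package documentation states.

```csharp
using System;
using System.Linq;

// Stand-in for the batching step only; this is not FluentParallel code.
var numbers = Enumerable.Range(1, 30);

// Enumerable.Chunk (available since .NET 6) yields consecutive arrays of up to 5 items.
int[][] batches = numbers.Chunk(5).ToArray();

foreach (var batch in batches)
    Console.WriteLine($"[{string.Join(",", batch)}]");

Console.WriteLine($"Batch count: {batches.Length}"); // 30 items / 5 per batch = 6
```

Under that assumption, the flow above would collect six batches, although the order in which parallel batches are printed can vary from run to run.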
2️⃣ Asynchronous Flow
// Reuses `numbers` from the synchronous example
var resultAsync = await ParallelFlow
    .From(numbers)
    .Batch(5)
    .WithParallel(4)
    .ExecuteAsync(async batch =>
    {
        // Simulate async work
        await Task.Delay(2000);
        Console.WriteLine(
            $"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}] | sum: {batch.Sum()}");
        return batch.Sum();
    })
    .Filter(sum => sum < 100)   // filter on the batch sum (predicate: sum < 100)
    .CollectAsync();

Console.WriteLine($"Collected sums: [{string.Join(",", resultAsync)}]");
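The arithmetic behind this flow can be checked with plain LINQ. The sketch below is not FluentParallel code; it assumes batches are consecutive chunks of five and that Filter keeps the values matching its predicate, the way LINQ's Where does. Both points are assumptions made for illustration.

```csharp
using System;
using System.Linq;

// Sum each consecutive chunk of five numbers from 1..30.
int[] sums = Enumerable.Range(1, 30)
    .Chunk(5)
    .Select(batch => batch.Sum())
    .ToArray();

// Assumed Filter semantics: keep values for which the predicate is true.
int[] kept = sums.Where(sum => sum < 100).ToArray();

Console.WriteLine($"All sums:  {string.Join(",", sums)}"); // 15,40,65,90,115,140
Console.WriteLine($"Kept sums: {string.Join(",", kept)}"); // 15,40,65,90
```

With four batches running in parallel, the collected sums may arrive in a different order than the sequential listing shown here.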
3️⃣ Streaming Flow (process results as soon as they are ready!)
await foreach (var result in ParallelFlow
    .From(numbers)
    .Batch(5)
    .WithParallel(4)
    .ExecuteStreamAsync(async batch =>
    {
        await Task.Delay(2000);
        Console.WriteLine(
            $"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}] | sum: {batch.Sum()}");
        return batch.Sum();
    })
    .Filter(sum => sum < 100)
    .StreamAsync())     // yields as results come in
{
    Console.WriteLine($"Got: {result}");
}
4️⃣ Error Handling Flow
This demonstrates that:
- Exceptions inside steps are caught by your OnError handler.
- Processing continues for other items even if one fails.
await foreach (var result in ParallelFlow
    .From(numbers)
    .Batch(5)
    .WithParallel(4)
    .OnError(ex => Console.WriteLine($"Error: {ex.Message}"))
    .ExecuteStreamAsync(async batch =>
    {
        await Task.Delay(500);
        Console.WriteLine($"Thread {Environment.CurrentManagedThreadId}: [{string.Join(",", batch)}]");

        // Fail the batch that contains the value 5
        if (batch.Contains(5))
            throw new InvalidOperationException($"Boom! Thread {Environment.CurrentManagedThreadId} error: contains 5!");

        return batch.Sum();
    })
    .StreamAsync())
{
    Console.WriteLine($"Sum: {result}");
}
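The two behaviours listed above (exceptions reach a handler, other batches keep going) can be sketched with standard .NET APIs alone. This is an illustration under stated assumptions, not the library's implementation: the onError delegate is a hypothetical stand-in for the OnError handler, and the sketch assumes a failed batch simply produces no result.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var errors = new ConcurrentQueue<string>();
var sums = new ConcurrentQueue<int>();

// Hypothetical stand-in for the handler passed to OnError.
Action<Exception> onError = ex => errors.Enqueue(ex.Message);

await Parallel.ForEachAsync(
    Enumerable.Range(1, 30).Chunk(5),
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (batch, ct) =>
    {
        try
        {
            await Task.Delay(50, ct); // simulate async work
            if (batch.Contains(5))
                throw new InvalidOperationException("Boom! contains 5!");
            sums.Enqueue(batch.Sum());
        }
        catch (InvalidOperationException ex)
        {
            // Report the failure and let the remaining batches continue.
            onError(ex);
        }
    });

Console.WriteLine($"Errors: {errors.Count}");                              // 1
Console.WriteLine($"Sums: {string.Join(",", sums.OrderBy(s => s))}");       // 40,65,90,115,140
```

Only the first batch (1 to 5) contains the value 5, so under these assumptions one error is reported and the other five batches still produce their sums.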
📄 License
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Dependencies (net8.0):
- System.Linq.Async (>= 6.0.3)