TaskProcessor 2.0.0
dotnet add package TaskProcessor --version 2.0.0
NuGet\Install-Package TaskProcessor -Version 2.0.0
<PackageReference Include="TaskProcessor" Version="2.0.0" />
<PackageVersion Include="TaskProcessor" Version="2.0.0" />
<PackageReference Include="TaskProcessor" />
paket add TaskProcessor --version 2.0.0
#r "nuget: TaskProcessor, 2.0.0"
#:package TaskProcessor@2.0.0
#addin nuget:?package=TaskProcessor&version=2.0.0
#tool nuget:?package=TaskProcessor&version=2.0.0
TaskProcessor
A lightweight .NET 8 library for controlled concurrent task processing with built-in exponential backoff retry, fail-fast error handling, ordered results, progress reporting, and full ASP.NET Core DI support.
🚀 Overview
TaskProcessor helps you process large collections of tasks efficiently while preventing system overload.
It provides:
- Bounded parallel execution via a configurable semaphore
- Exponential backoff retry with full jitter
- Per-task timeout enforcement
- Fail-fast or collect-all-errors modes
- Ordered results guaranteed regardless of completion order
- Progress reporting via
IProgress<int> - Structured logging via
ILogger - ASP.NET Core DI integration via
AddTaskProcessor
🎯 Problem It Solves
In real-world backend systems, processing large datasets (thousands of records, API calls, or jobs) can lead to:
- CPU spikes
- Database connection exhaustion
- API throttling
- Unstable performance
Naively running all tasks in parallel (Task.WhenAll) can overwhelm the system.
TaskProcessor solves this by limiting concurrency and adding production-grade reliability controls.
⚙️ Features
| Feature | Detail |
|---|---|
| ✅ Configurable concurrency | MaxDegreeOfParallelism with validation (≥ 1) |
| ✅ Exponential backoff retry | Base delay doubles each attempt, capped at MaxRetryDelay |
| ✅ Full jitter | Random spread prevents thundering-herd on retries |
| ✅ Exception filtering | ShouldRetry predicate — retry only the exceptions you choose |
| ✅ Per-task timeout | TaskTimeout cancels any single slow task |
| ✅ Fail-fast mode | ContinueOnError = false cancels remaining tasks immediately |
| ✅ Ordered results | Transform overload returns results in the same order as input |
| ✅ Progress reporting | IProgress<int> receives a running count of completed items |
| ✅ Observability callbacks | OnError and OnSuccess hooks per item |
| ✅ Structured logging | Accepts ILogger<TaskProcessorService> |
| ✅ DI-friendly | ITaskProcessorService interface + AddTaskProcessor extension |
📦 Installation
dotnet add package TaskProcessor
🧠 Usage
1. ASP.NET Core — Dependency Injection
// Program.cs
builder.Services.AddTaskProcessor(options =>
{
options.MaxDegreeOfParallelism = 5;
options.RetryCount = 3;
options.RetryDelay = TimeSpan.FromSeconds(1);
options.MaxRetryDelay = TimeSpan.FromSeconds(30);
options.UseJitter = true;
options.TaskTimeout = TimeSpan.FromSeconds(10);
options.ContinueOnError = true;
options.ShouldRetry = ex => ex is HttpRequestException;
options.OnError = (item, ex) => Console.WriteLine($"Failed: {ex.Message}");
options.OnSuccess = item => Console.WriteLine("Item succeeded");
});
Inject ITaskProcessorService wherever you need it:
public class MyService(ITaskProcessorService processor)
{
public async Task RunAsync(IEnumerable<Order> orders)
{
await processor.ProcessAsync(orders, async order =>
{
await SendOrderAsync(order);
});
}
}
2. Manual instantiation — Process tasks (no return value)
var processor = new TaskProcessorService(new TaskProcessorOptions
{
MaxDegreeOfParallelism = 3,
RetryCount = 2,
RetryDelay = TimeSpan.FromMilliseconds(500)
});
await processor.ProcessAsync(data, async item =>
{
await ProcessItemAsync(item);
});
3. Transform data (with ordered return values)
Results are returned in the same order as the input collection, regardless of completion order.
var results = await processor.ProcessAsync<Source, Destination>(
sourceData,
async item =>
{
return await mapper.MapAsync(item);
});
4. Progress reporting
var progress = new Progress<int>(count =>
Console.WriteLine($"Completed {count} of {total}"));
await processor.ProcessAsync(items, ProcessItemAsync, progress);
5. Exception filtering — only retry transient errors
var options = new TaskProcessorOptions
{
RetryCount = 3,
ShouldRetry = ex => ex is HttpRequestException or TimeoutException
};
🔁 How It Works
items ──► [SemaphoreSlim] ──► [RetryPolicy] ──► action(item)
│ │ │
max N tasks exponential OnSuccess
at a time backoff + callback
jitter
│
OnError +
ILogger
- A
SemaphoreSlimlimits concurrency toMaxDegreeOfParallelism - Each item runs inside
RetryPolicy.ExecuteAsyncwith exponential backoff and jitter - A linked
CancellationTokenSourceprovides fail-fast cancellation whenContinueOnError = false - Results are written into an index-aligned array to preserve input order
- All exceptions are collected and rethrown as
AggregateException
📊 Use Cases
- Processing large datasets (data migration, ETL)
- Calling external APIs with rate limits or throttling
- Background job processing
- Bulk data transformations
- File or message queue processing
🏗️ Architecture
TaskProcessor/
├── ITaskProcessorService.cs # Public interface (mockable, DI-friendly)
├── TaskProcessorService.cs # Core implementation
├── TaskProcessorOptions.cs # Configuration with validation
├── RetryPolicy.cs # Unified retry: backoff, jitter, filtering
└── ServiceCollectionExtensions.cs # AddTaskProcessor DI extension
TaskProcessor.Tests/
├── RetryPolicyTests.cs # 7 unit tests for retry logic
└── TaskProcessorServiceTests.cs # 11 unit tests for service behaviour
⚠️ Notes
- Results from
ProcessAsync<T, TResult>are always returned in input order - Best suited for I/O-bound, independent tasks
- For CPU-bound workloads, keep
MaxDegreeOfParallelismat or belowEnvironment.ProcessorCount TaskTimeoutapplies per individual task, not to the entire batch
🧩 Design Philosophy
You define the work — TaskProcessor manages how it runs.
Business logic stays in your lambdas. Concurrency, retry, cancellation, and observability are handled transparently by the library, making it reusable across different domains without coupling.
📄 License
MIT License
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.