Philiprehberger.BatchProcessor 0.4.0

Philiprehberger.BatchProcessor

Process large collections in configurable batches with progress reporting, error handling, async execution, streaming IAsyncEnumerable support, checkpoint/resume, and adaptive batch sizing.

Installation

dotnet add package Philiprehberger.BatchProcessor

Usage

using Philiprehberger.BatchProcessor;

var items = Enumerable.Range(1, 1000).ToList();

var result = await BatchProcessor.Process(items, batchSize: 100, async batch =>
{
    await ProcessItemsAsync(batch);
});

Console.WriteLine($"Processed {result.SuccessCount} items in {result.TotalDuration.TotalSeconds}s");
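To make the batch arithmetic concrete, here is a minimal sketch that splits the same 1,000 items with LINQ's Chunk (available since .NET 6). It only illustrates how many batches a given batchSize produces; it is not a claim about how the library partitions items internally.

```csharp
using System;
using System.Linq;

// Same input as the usage example: 1,000 items, batches of 100.
var items = Enumerable.Range(1, 1000).ToList();
const int batchSize = 100;

// Chunk yields arrays of at most batchSize items; the last one may be shorter.
int[][] batches = items.Chunk(batchSize).ToArray();

// Ceiling division gives the expected number of batches.
int expectedBatches = (items.Count + batchSize - 1) / batchSize;

Console.WriteLine($"{batches.Length} batches (expected {expectedBatches}), last batch has {batches[^1].Length} items");
// prints "10 batches (expected 10), last batch has 100 items"
```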

Progress Reporting

using Philiprehberger.BatchProcessor;

var items = Enumerable.Range(1, 500).ToList();

var result = await BatchProcessor.Process(items, batchSize: 50, async batch =>
{
    await SendToApiAsync(batch);
}, new BatchOptions
{
    OnProgress = progress =>
    {
        Console.WriteLine($"Progress: {progress.Percent:F1}% ({progress.ProcessedCount}/{progress.TotalCount})");
    }
});

Parallel Execution with Error Handling

using Philiprehberger.BatchProcessor;

var items = Enumerable.Range(1, 1000).ToList();

var result = await BatchProcessor.Process(items, batchSize: 50, async batch =>
{
    await SendToApiAsync(batch);
}, new BatchOptions
{
    MaxDegreeOfParallelism = 4,
    OnBatchError = BatchErrorHandling.Skip,
    RetryCount = 2
});

Console.WriteLine($"Success: {result.SuccessCount}, Failures: {result.FailureCount}");
Console.WriteLine($"Errors: {result.Errors.Count}");
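The combination of RetryCount and Skip can be pictured with the sketch below. RunWithRetryAsync is a hypothetical helper written for illustration, and it assumes RetryCount means extra attempts after the first failure (so RetryCount = 2 allows three attempts in total). The library's actual retry loop may differ.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Hypothetical helper: run one batch, retrying up to retryCount extra times.
// Returns null on success, or the last exception once all attempts have failed.
static async Task<Exception?> RunWithRetryAsync(Func<Task> processBatch, int retryCount)
{
    Exception? last = null;
    for (int attempt = 0; attempt <= retryCount; attempt++)
    {
        try
        {
            await processBatch();
            return null;
        }
        catch (Exception ex)
        {
            last = ex; // remember the failure and try again
        }
    }
    return last;
}

int calls = 0;
Exception? error = await RunWithRetryAsync(() =>
{
    calls++;
    // Fail on the first two attempts, succeed on the third.
    return calls < 3
        ? Task.FromException(new InvalidOperationException("transient"))
        : Task.CompletedTask;
}, retryCount: 2);

Console.WriteLine($"attempts: {calls}, failed: {error is not null}");
// prints "attempts: 3, failed: False"
```

Under a Skip-style policy, a non-null result would be recorded as a batch error and processing would continue with the next batch; under Abort it would stop the run.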

Per-Batch Timeout

using Philiprehberger.BatchProcessor;

var items = Enumerable.Range(1, 1000).ToList();

var result = await BatchProcessor.Process(items, batchSize: 100, async batch =>
{
    await SendToSlowServiceAsync(batch);
}, new BatchOptions
{
    BatchTimeout = TimeSpan.FromSeconds(30),
    OnBatchError = BatchErrorHandling.Skip
});
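As a rough picture of what a per-batch timeout involves, the sketch below uses Task.WaitAsync(TimeSpan) from .NET 6+, which throws TimeoutException when the task does not finish in time. SlowBatchAsync is a hypothetical stand-in for a slow call, and whether the library implements BatchTimeout this way is an assumption.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a slow batch call (5 seconds of work).
static Task SlowBatchAsync() => Task.Delay(TimeSpan.FromSeconds(5));

bool timedOut = false;
try
{
    // Stop waiting after 100 ms; WaitAsync throws TimeoutException on expiry.
    await SlowBatchAsync().WaitAsync(TimeSpan.FromMilliseconds(100));
}
catch (TimeoutException)
{
    timedOut = true;
}

Console.WriteLine($"timed out: {timedOut}");
// prints "timed out: True"
```

Note that WaitAsync only stops waiting; the underlying work keeps running unless it also observes a cancellation token.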

Per-Item Error Tracking

using Philiprehberger.BatchProcessor;

var items = Enumerable.Range(1, 100).ToList();

var result = await BatchProcessor.ProcessAsync(items, batchSize: 10, async batch =>
{
    await ProcessBatchAsync(batch);
});

Console.WriteLine($"Succeeded: {result.SucceededCount}, Failed: {result.FailedCount}");

foreach (var failure in result.Failures)
{
    Console.WriteLine($"Item {failure.Item} failed: {failure.Exception?.Message}");
}

Batch Completed Callback

using Philiprehberger.BatchProcessor;

var items = Enumerable.Range(1, 500).ToList();

var result = await BatchProcessor.Process(items, batchSize: 50, async batch =>
{
    await ProcessBatchAsync(batch);
}, new BatchOptions
{
    OnBatchCompleted = e =>
    {
        Console.WriteLine($"Batch {e.BatchIndex}: {e.SuccessCount} ok, {e.FailureCount} failed in {e.Elapsed.TotalMilliseconds}ms");
    }
});

Cancellation Token

using Philiprehberger.BatchProcessor;

using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
var items = Enumerable.Range(1, 10000).ToList();

var result = await BatchProcessor.Process(items, batchSize: 100, async batch =>
{
    await ProcessBatchAsync(batch);
}, cancellationToken: cts.Token);

Streaming with IAsyncEnumerable

Process items from an async stream without materializing the full collection:

using Philiprehberger.BatchProcessor;

async IAsyncEnumerable<Order> GetOrdersAsync()
{
    await foreach (var order in database.StreamOrdersAsync())
    {
        yield return order;
    }
}

var result = await BatchProcessor.ProcessStreamAsync(GetOrdersAsync(), batchSize: 50, async batch =>
{
    await SendToWarehouseAsync(batch);
}, new BatchOptions
{
    MaxDegreeOfParallelism = 3,
    OnBatchError = BatchErrorHandling.Skip
});

Console.WriteLine($"Streamed {result.SuccessCount} orders, {result.FailureCount} failed");
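To show what batching a stream without materializing it can look like, here is a minimal sketch that groups an IAsyncEnumerable<T> into lists of at most batchSize items. BufferAsync and NumbersAsync are hypothetical names for illustration and are not part of the package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical source: yields 1..count lazily.
static async IAsyncEnumerable<int> NumbersAsync(int count)
{
    for (int i = 1; i <= count; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

// Groups an async stream into batches, buffering only one batch at a time.
static async IAsyncEnumerable<IReadOnlyList<T>> BufferAsync<T>(IAsyncEnumerable<T> source, int batchSize)
{
    var buffer = new List<T>(batchSize);
    await foreach (var item in source)
    {
        buffer.Add(item);
        if (buffer.Count == batchSize)
        {
            yield return buffer;
            buffer = new List<T>(batchSize);
        }
    }
    if (buffer.Count > 0)
    {
        yield return buffer; // final partial batch
    }
}

var sizes = new List<int>();
await foreach (var batch in BufferAsync(NumbersAsync(120), batchSize: 50))
{
    sizes.Add(batch.Count);
}

Console.WriteLine(string.Join(", ", sizes));
// prints "50, 50, 20"
```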

Checkpoint and Resume

Persist progress after each batch and resume from where processing left off:

using Philiprehberger.BatchProcessor;

var items = Enumerable.Range(1, 1000).ToList();
var lastCheckpoint = LoadCheckpointFromDisk(); // zero-based index of the last completed batch, e.g. 5

var result = await BatchProcessor.Process(items, batchSize: 100, async batch =>
{
    await ProcessBatchAsync(batch);
}, new BatchOptions
{
    ResumeFromBatch = lastCheckpoint + 1,
    CheckpointCallback = batchIndex =>
    {
        SaveCheckpointToDisk(batchIndex);
    }
});
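The example leaves LoadCheckpointFromDisk and SaveCheckpointToDisk undefined. One possible file-based version is sketched below; the file path and the -1 "no checkpoint" convention are assumptions chosen so that lastCheckpoint + 1 starts at batch 0 on a first run.

```csharp
using System;
using System.Globalization;
using System.IO;

// Hypothetical checkpoint location.
string checkpointPath = Path.Combine(Path.GetTempPath(), "batch-checkpoint.txt");

// Returns the last completed zero-based batch index, or -1 if no valid checkpoint exists.
int LoadCheckpointFromDisk() =>
    File.Exists(checkpointPath)
    && int.TryParse(File.ReadAllText(checkpointPath), NumberStyles.Integer, CultureInfo.InvariantCulture, out var index)
        ? index
        : -1;

// Overwrites the checkpoint file with the given batch index.
void SaveCheckpointToDisk(int batchIndex) =>
    File.WriteAllText(checkpointPath, batchIndex.ToString(CultureInfo.InvariantCulture));

File.Delete(checkpointPath);                 // start clean for the demo
Console.WriteLine(LoadCheckpointFromDisk()); // prints "-1"
SaveCheckpointToDisk(5);
Console.WriteLine(LoadCheckpointFromDisk()); // prints "5"
```

Storing a single index is only safe if batches complete in order. If you combine checkpoints with MaxDegreeOfParallelism greater than 1, check how the library orders callbacks before relying on this scheme.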

Streaming with Per-Item Results

When you need per-item failure tracking on a streamed source, use ProcessStreamWithItemsAsync:

using Philiprehberger.BatchProcessor;

async IAsyncEnumerable<Order> GetOrdersAsync()
{
    await foreach (var order in database.StreamOrdersAsync())
    {
        yield return order;
    }
}

var result = await BatchProcessor.ProcessStreamWithItemsAsync(GetOrdersAsync(), batchSize: 50, async batch =>
{
    await SendToWarehouseAsync(batch);
}, new BatchOptions
{
    OnBatchError = BatchErrorHandling.Skip
});

foreach (var failure in result.Failures)
{
    Console.WriteLine($"Order {failure.Item} failed: {failure.Exception?.Message}");
}

Adaptive Batch Sizing

Automatically adjust batch sizes based on measured throughput:

using Philiprehberger.BatchProcessor;

var items = Enumerable.Range(1, 10000).ToList();

var result = await BatchProcessor.Process(items, batchSize: 50, async batch =>
{
    await SendToApiAsync(batch);
}, new BatchOptions
{
    AdaptiveBatching = new AdaptiveBatchOptions
    {
        MinBatchSize = 10,
        MaxBatchSize = 500,
        TargetThroughput = 200 // items per second
    },
    OnBatchCompleted = e =>
    {
        Console.WriteLine($"Batch {e.BatchIndex}: {e.ItemCount} items in {e.Elapsed.TotalMilliseconds}ms");
    }
});
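To compare a batch against TargetThroughput, you can derive items per second from the ItemCount and Elapsed values reported by OnBatchCompleted. The helper below is a hypothetical sketch of that arithmetic only; how the library turns the measurement into a new batch size is not documented here.

```csharp
using System;

// Items per second for one completed batch; returns 0 when no time was measured.
static double ItemsPerSecond(int itemCount, TimeSpan elapsed) =>
    elapsed > TimeSpan.Zero ? itemCount / elapsed.TotalSeconds : 0.0;

// A 50-item batch that took 250 ms runs at 200 items per second.
double measured = ItemsPerSecond(50, TimeSpan.FromMilliseconds(250));
Console.WriteLine(measured);
// prints "200"
```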

API

BatchProcessor

Method | Description
Process<T>(items, batchSize, processor, options?, cancellationToken?) | Process items in batches asynchronously. Returns a BatchResult.
ProcessAsync<T>(items, batchSize, processor, options?, cancellationToken?) | Process items in batches with per-item error tracking. Returns a BatchResult<T>.
ProcessStreamAsync<T>(source, batchSize, processor, options?, cancellationToken?) | Process items from an IAsyncEnumerable<T> source in batches. Returns a BatchResult.
ProcessStreamWithItemsAsync<T>(source, batchSize, processor, options?, cancellationToken?) | Process items from an IAsyncEnumerable<T> source with per-item error tracking. Returns a BatchResult<T>.

BatchOptions

Property | Type | Default | Description
MaxDegreeOfParallelism | int | 1 | Maximum number of batches to process concurrently.
OnProgress | Action<BatchProgress>? | null | Callback invoked after each batch completes.
OnBatchError | BatchErrorHandling | Abort | Error handling strategy: Abort or Skip.
RetryCount | int | 0 | Number of times to retry a failed batch.
BatchTimeout | TimeSpan? | null | Timeout per batch. Throws TimeoutException if exceeded.
OnBatchCompleted | Action<BatchCompletedEventArgs>? | null | Callback with batch index, item count, elapsed time, and success/failure counts.
CheckpointCallback | Action<int>? | null | Callback invoked after each batch with the zero-based batch index for checkpoint/resume.
ResumeFromBatch | int | 0 | Zero-based batch index to resume from. Earlier batches are skipped.
AdaptiveBatching | AdaptiveBatchOptions? | null | Adaptive batch sizing configuration.

AdaptiveBatchOptions

Property | Type | Default | Description
MinBatchSize | int | 1 | Minimum batch size for adaptive sizing.
MaxBatchSize | int | 1000 | Maximum batch size for adaptive sizing.
TargetThroughput | double | 100.0 | Target throughput in items per second.

BatchProgress

Property | Type | Description
ProcessedCount | int | Number of items processed so far.
TotalCount | int | Total number of items.
CurrentBatch | int | Current batch number (1-based).
TotalBatches | int | Total number of batches.
Percent | double | Completion percentage (0-100).

BatchResult

Property | Type | Description
SuccessCount | int | Number of successfully processed items.
FailureCount | int | Number of items in failed batches.
TotalDuration | TimeSpan | Total processing duration.
Errors | IReadOnlyList<BatchError> | List of batch errors.

BatchResult<T>

Property | Type | Description
Items | IReadOnlyList<BatchItemResult<T>> | Per-item results for every processed item.
SucceededCount | int | Number of items that succeeded.
FailedCount | int | Number of items that failed.
Failures | IReadOnlyList<BatchItemResult<T>> | Per-item results for failed items only.
TotalDuration | TimeSpan | Total processing duration.
Errors | IReadOnlyList<BatchError> | List of batch-level errors.

BatchItemResult<T>

Property | Type | Description
Item | T | The item that was processed.
Success | bool | Whether the item was processed successfully.
Exception | Exception? | The exception that occurred, or null if successful.

BatchCompletedEventArgs

Property | Type | Description
BatchIndex | int | Zero-based index of the completed batch.
ItemCount | int | Number of items in the batch.
Elapsed | TimeSpan | Time spent processing the batch.
SuccessCount | int | Number of items that succeeded in this batch.
FailureCount | int | Number of items that failed in this batch.

BatchError

Property | Type | Description
BatchIndex | int | Zero-based index of the failed batch.
Exception | Exception | The exception that occurred.

Development

dotnet build src/Philiprehberger.BatchProcessor.csproj --configuration Release

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT

Target Frameworks

Included in package: net8.0 (no dependencies).
Computed as compatible: net9.0 and net10.0, plus the android, browser, ios, maccatalyst, macos, tvos, and windows variants of net8.0, net9.0, and net10.0.

Version | Downloads | Last Updated
0.4.0 | 103 | 4/27/2026
0.3.0 | 109 | 4/1/2026
0.2.0 | 161 | 3/28/2026
0.1.3 | 102 | 3/27/2026
0.1.2 | 105 | 3/25/2026
0.1.1 | 102 | 3/25/2026
0.1.0 | 106 | 3/23/2026