Rivulet.Core
1.3.0
Safe, async-first parallel operators with bounded concurrency, retries, and backpressure for I/O-heavy workloads.
Transform collections in parallel while maintaining control over concurrency, errors, and resource usage.
Installation
dotnet add package Rivulet.Core
Quick Start
Parallel Transformation
    using Rivulet.Core;

    // Reuse a single HttpClient for all requests.
    using var httpClient = new HttpClient();
    var urls = new[] { "https://api.example.com/1", "https://api.example.com/2", /* ... */ };

    var results = await urls.SelectParallelAsync(
        async (url, ct) =>
        {
            using var response = await httpClient.GetAsync(url, ct);
            response.EnsureSuccessStatusCode();
            return await response.Content.ReadAsStringAsync(ct);
        },
        new ParallelOptionsRivulet
        {
            MaxDegreeOfParallelism = 32,
            MaxRetries = 3,
            IsTransient = ex => ex is HttpRequestException or TaskCanceledException,
            ErrorMode = ErrorMode.CollectAndContinue
        });
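To make "bounded concurrency" concrete, here is a minimal sketch of the general technique using only the .NET base class library. It is not Rivulet's implementation: `RunBoundedAsync` is a hypothetical helper, and it leaves out retries and error modes entirely. A `SemaphoreSlim` acts as the gate that limits how many operations are in flight.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of Rivulet): runs `work` for every item while
// allowing at most `maxParallel` invocations to be in flight at once.
static async Task<TResult[]> RunBoundedAsync<T, TResult>(
    IEnumerable<T> items,
    Func<T, CancellationToken, Task<TResult>> work,
    int maxParallel,
    CancellationToken ct = default)
{
    using var gate = new SemaphoreSlim(maxParallel);

    async Task<TResult> RunOneAsync(T item)
    {
        await gate.WaitAsync(ct);              // wait for a free slot
        try { return await work(item, ct); }
        finally { gate.Release(); }            // free the slot even on failure
    }

    // Task.WhenAll returns results in the order the tasks were created.
    return await Task.WhenAll(items.Select(item => RunOneAsync(item)));
}

var squares = await RunBoundedAsync(
    Enumerable.Range(1, 5),
    async (n, token) => { await Task.Delay(10, token); return n * n; },
    maxParallel: 2);

Console.WriteLine(string.Join(", ", squares)); // 1, 4, 9, 16, 25
```

This sketch creates one task per item up front, which is fine for small inputs but is one reason to prefer a purpose-built operator for large or unbounded sources.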
Streaming Results
Process each result as soon as it completes instead of waiting for the whole collection to finish:
    await foreach (var result in source.SelectParallelStreamAsync(
        async (item, ct) => await ProcessAsync(item, ct),
        new ParallelOptionsRivulet { MaxDegreeOfParallelism = 16 }))
    {
        // Handle result immediately as it completes
        Console.WriteLine(result);
    }
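Streaming goes hand in hand with backpressure: if the consumer is slower than the producers, a bounded buffer stops results from piling up in memory. The sketch below shows that behavior with a bounded channel from `System.Threading.Channels`. Whether Rivulet's `ChannelCapacity` option maps onto a bounded channel in exactly this way is an assumption, not something the README states.

```csharp
using System;
using System.Threading.Channels;

// A bounded channel holds at most `capacity` undelivered items.
var channel = Channel.CreateBounded<int>(capacity: 2);

Console.WriteLine(channel.Writer.TryWrite(1)); // True
Console.WriteLine(channel.Writer.TryWrite(2)); // True

// The buffer is full: TryWrite refuses, and WriteAsync would wait here.
Console.WriteLine(channel.Writer.TryWrite(3)); // False

// Reading one item frees a slot, so the producer can make progress again.
channel.Reader.TryRead(out var first);
Console.WriteLine(first);                      // 1
Console.WriteLine(channel.Writer.TryWrite(3)); // True
```

A smaller capacity caps memory use more tightly but makes producers wait sooner. A larger one smooths over bursts at the cost of holding more results in memory.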
Parallel Side Effects
Execute actions in parallel without collecting results:
    await items.ForEachParallelAsync(
        async (item, ct) => await SaveToDbAsync(item, ct),
        new ParallelOptionsRivulet
        {
            MaxDegreeOfParallelism = 10,
            ErrorMode = ErrorMode.FailFast
        });
Batch Processing
Process items in batches for efficient bulk operations:
    // Materialize results - process 100 items at a time
    var results = await items.BatchParallelAsync(
        batchSize: 100,
        async (batch, ct) =>
        {
            // Process entire batch together (e.g., bulk database insert)
            await BulkInsertAsync(batch, ct);
            return batch.Count();
        },
        new ParallelOptionsRivulet
        {
            MaxDegreeOfParallelism = 5
        });

    // Streaming results - process batches as they're ready
    await foreach (var batchResult in items.BatchParallelStreamAsync(
        batchSize: 50,
        async (batch, ct) =>
        {
            await ProcessBatchAsync(batch, ct);
            return batch.Count();
        },
        new ParallelOptionsRivulet { MaxDegreeOfParallelism = 10 },
        batchTimeout: TimeSpan.FromSeconds(5))) // Flush partial batch after timeout
    {
        Console.WriteLine($"Processed batch: {batchResult} items");
    }
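As a rough illustration of what fixed-size batching does to a sequence, the sketch below uses `Enumerable.Chunk` from the base class library (.NET 6 and later). It assumes Rivulet groups items in a similar way, with a final partial batch when the item count is not a multiple of `batchSize`. The README does not spell that out.

```csharp
using System;
using System.Linq;

// Split 250 items into batches of at most 100 and record each batch's size.
var batchSizes = Enumerable.Range(1, 250)
    .Chunk(100)
    .Select(batch => batch.Length)
    .ToArray();

Console.WriteLine(string.Join(", ", batchSizes)); // 100, 100, 50
```

The last batch is smaller than the rest, so a callback that returns the batch's item count, as in the examples above, reports a different number for it.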
Ordered Output
Maintain input order for sequence-sensitive operations:
    // ETL pipeline where order matters for downstream processing
    var orderedResults = await records.SelectParallelAsync(
        async (record, ct) => await TransformAsync(record, ct),
        new ParallelOptionsRivulet
        {
            MaxDegreeOfParallelism = 32,
            OrderedOutput = true // Results match input order despite parallel processing
        });

    // Streaming with ordered output
    await foreach (var result in source.SelectParallelStreamAsync(
        async (x, ct) => await ProcessAsync(x, ct),
        new ParallelOptionsRivulet { OrderedOutput = true }))
    {
        // Results arrive in input sequence order
    }
Key Features
- ✅ Bounded Concurrency - Control max parallel operations with backpressure
- ✅ Adaptive Concurrency - Auto-scale workers based on latency and success rate (AIMD algorithm)
- ✅ Retry Policies - Automatic retries with exponential backoff for transient errors
- ✅ Circuit Breaker - Prevent cascading failures with automatic service protection
- ✅ Rate Limiting - Token bucket algorithm for controlling operation rates
- ✅ Error Handling Modes - FailFast, CollectAndContinue, or BestEffort
- ✅ Streaming Support - Process results incrementally via `IAsyncEnumerable<T>`
- ✅ Ordered Output - Maintain input sequence order when needed
- ✅ Runtime Metrics - Built-in monitoring via EventCounters and custom callbacks
- ✅ Cancellation - Full `CancellationToken` support throughout
- ✅ Lifecycle Hooks - OnStart, OnComplete, OnError, OnThrottle callbacks
- ✅ Per-Item Timeouts - Enforce timeouts for individual operations
- ✅ Works with both `IEnumerable<T>` and `IAsyncEnumerable<T>`
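The adaptive concurrency feature names the AIMD (additive increase, multiplicative decrease) algorithm. The following sketch shows the core update rule in isolation. The step of one worker, the halving factor, and the `NextConcurrency` helper are illustrative assumptions, not Rivulet's actual tuning. Here "healthy" stands for whatever signal the library derives from latency and success rate.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical AIMD step: grow slowly while things go well, shrink fast on trouble.
static int NextConcurrency(int current, bool healthy, int min, int max)
{
    // Additive increase probes for spare capacity one worker at a time.
    // Multiplicative decrease halves the worker count to relieve pressure quickly.
    var next = healthy ? current + 1 : current / 2;
    return Math.Clamp(next, min, max);
}

var level = 8;
var history = new List<int>();
foreach (var ok in new[] { true, true, false, true })
{
    level = NextConcurrency(level, ok, min: 1, max: 32);
    history.Add(level);
}

Console.WriteLine(string.Join(", ", history)); // 9, 10, 5, 6
```

The asymmetry is the point of the design: recovering from overload takes one step, while climbing back to the previous level takes many, which keeps the system from oscillating around its limit.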
Configuration Options
    new ParallelOptionsRivulet
    {
        // Concurrency control
        MaxDegreeOfParallelism = 32,   // Max concurrent operations (default: CPU cores)
        ChannelCapacity = 1024,        // Backpressure buffer size (streaming only)
        OrderedOutput = false,         // Return results in input order (default: false)

        // Adaptive concurrency (auto-scale workers based on performance)
        AdaptiveConcurrency = new AdaptiveConcurrencyOptions
        {
            MinConcurrency = 1,
            MaxConcurrency = 32,
            TargetLatency = TimeSpan.FromMilliseconds(100),
            MinSuccessRate = 0.95
        },

        // Error handling
        ErrorMode = ErrorMode.CollectAndContinue,   // How to handle failures
        OnErrorAsync = async (index, ex) => { /* ... */ return true; },

        // Retry policy
        MaxRetries = 3,                                   // Number of retry attempts
        IsTransient = ex => ex is HttpRequestException,   // Which errors to retry
        BaseDelay = TimeSpan.FromMilliseconds(100),       // Exponential backoff base
        BackoffStrategy = BackoffStrategy.ExponentialJitter,

        // Circuit breaker (fail-fast when service is unhealthy)
        CircuitBreaker = new CircuitBreakerOptions
        {
            FailureThreshold = 5,
            SuccessThreshold = 2,
            OpenTimeout = TimeSpan.FromSeconds(30)
        },

        // Rate limiting (token bucket algorithm)
        RateLimit = new RateLimitOptions
        {
            TokensPerSecond = 100,
            BurstCapacity = 200
        },

        // Timeouts
        PerItemTimeout = TimeSpan.FromSeconds(30),   // Timeout per item

        // Lifecycle hooks
        OnStartItemAsync = async (index) => { /* ... */ },
        OnCompleteItemAsync = async (index) => { /* ... */ },
        OnThrottleAsync = async (count) => { /* ... */ },
        OnDrainAsync = async (count) => { /* ... */ }
    }
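To give a feel for how `BaseDelay` and an exponential strategy with jitter interact, here is a sketch of one common variant, often called "full jitter": the delay before each retry is drawn uniformly between zero and an exponentially growing cap. The README does not state the exact formula behind `BackoffStrategy.ExponentialJitter`, so both helpers below are hypothetical and the doubling factor is an assumption.

```csharp
using System;

// Hypothetical helper: the largest delay allowed before retry number `attempt` (1-based).
static TimeSpan ExponentialCap(TimeSpan baseDelay, int attempt) =>
    TimeSpan.FromMilliseconds(baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1));

// Full jitter: pick uniformly between zero and the cap so that many clients
// retrying at once do not all hit the service at the same instant.
static TimeSpan JitteredDelay(TimeSpan baseDelay, int attempt, Random rng) =>
    TimeSpan.FromMilliseconds(
        rng.NextDouble() * ExponentialCap(baseDelay, attempt).TotalMilliseconds);

var initial = TimeSpan.FromMilliseconds(100);
for (var retry = 1; retry <= 3; retry++)
{
    Console.WriteLine($"retry {retry}: up to {ExponentialCap(initial, retry).TotalMilliseconds} ms");
}
// retry 1: up to 100 ms
// retry 2: up to 200 ms
// retry 3: up to 400 ms
```

With the values from the configuration above (`MaxRetries = 3`, `BaseDelay` of 100 ms), this variant would spend at most 700 ms waiting across all retries of a single item.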
Error Modes
- FailFast - Stop immediately on first error and throw
- CollectAndContinue - Continue processing, collect all errors, throw `AggregateException` at end
- BestEffort - Continue processing, return successful results only, suppress errors
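The difference between the three modes is easiest to see on a tiny sequential example. The sketch below is not Rivulet code: `ParseAll` is a hypothetical helper, it processes items one at a time, and plain strings stand in for the `ErrorMode` values so the sketch stays a single block of top-level statements.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that mimics the three error modes while parsing integers.
static List<int> ParseAll(IEnumerable<string> inputs, string mode)
{
    var parsed = new List<int>();
    var errors = new List<Exception>();

    foreach (var input in inputs)
    {
        try
        {
            parsed.Add(int.Parse(input));
        }
        catch (FormatException ex)
        {
            if (mode == "FailFast") throw;   // stop at the first failure
            errors.Add(ex);                  // otherwise remember it and keep going
        }
    }

    if (mode == "CollectAndContinue" && errors.Count > 0)
        throw new AggregateException(errors); // surface every failure at the end

    return parsed;                            // BestEffort: successes only
}

var raw = new[] { "1", "x", "3", "y" };

Console.WriteLine(string.Join(", ", ParseAll(raw, "BestEffort"))); // 1, 3

try
{
    ParseAll(raw, "CollectAndContinue");
}
catch (AggregateException aggregate)
{
    Console.WriteLine(aggregate.InnerExceptions.Count); // 2
}
```

In this sketch CollectAndContinue and BestEffort do the same amount of work. They differ only in whether the caller hears about the failures.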
Framework Support
- .NET 8.0
- .NET 9.0
Documentation & Source
- GitHub Repository: https://github.com/Jeffeek/Rivulet
- Report Issues: https://github.com/Jeffeek/Rivulet/issues
- License: MIT
Performance Tips
- Choose appropriate parallelism - Start with `MaxDegreeOfParallelism = 32` for I/O-bound work
- Use streaming for large datasets - `SelectParallelStreamAsync` reduces memory usage
- Set per-item timeouts - Prevent hung operations from blocking the pipeline
- Configure backpressure - Adjust `ChannelCapacity` based on memory constraints
- Handle transient errors - Use `IsTransient` and `MaxRetries` for flaky APIs
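For readers wondering what a per-item timeout amounts to, the sketch below shows the standard .NET pattern: link a new `CancellationTokenSource` to the caller's token and schedule its cancellation. `WithTimeoutAsync` is a hypothetical helper for illustration. How Rivulet implements `PerItemTimeout` internally is not described in this README.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs `work` with a token that is cancelled either when the
// caller cancels or when `timeout` elapses, whichever happens first.
static async Task<T> WithTimeoutAsync<T>(
    Func<CancellationToken, Task<T>> work,
    TimeSpan timeout,
    CancellationToken ct = default)
{
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    cts.CancelAfter(timeout);
    return await work(cts.Token);
}

// A fast operation completes normally.
var fast = await WithTimeoutAsync(token => Task.FromResult(42), TimeSpan.FromSeconds(1));
Console.WriteLine(fast); // 42

// An operation that never finishes is cancelled once the timeout elapses.
try
{
    await WithTimeoutAsync<int>(
        async token => { await Task.Delay(Timeout.Infinite, token); return 0; },
        TimeSpan.FromMilliseconds(50));
}
catch (OperationCanceledException)
{
    Console.WriteLine("timed out");
}
```

The pattern only works if the operation actually observes the token it is given, which is why the delegates in the examples above pass `ct` through to every awaited call.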
License
MIT License - see LICENSE file for details
Dependencies
- net8.0
  - System.Linq.Async (>= 7.0.0)
  - System.Threading.Channels (>= 10.0.0)
- net9.0
  - System.Linq.Async (>= 7.0.0)
  - System.Threading.Channels (>= 10.0.0)
NuGet packages (8)
Showing the top 5 NuGet packages that depend on Rivulet.Core:
| Package | Description |
|---|---|
| Rivulet.Diagnostics | Enterprise observability for Rivulet.Core with EventListener wrappers, metric aggregators, and health check integration. |
| Rivulet.Testing | Testing utilities for Rivulet parallel operations including deterministic schedulers, virtual time, fake channels, and chaos injection. |
| Rivulet.Diagnostics.OpenTelemetry | OpenTelemetry integration for Rivulet.Core with distributed tracing, metrics, and logging support. |
| Rivulet.Hosting | Integration with Microsoft.Extensions.Hosting for dependency injection, configuration, and hosted services using Rivulet parallel operations. |
| Rivulet.Sql | Safe parallel SQL operations with connection pooling awareness and bulk operations for Rivulet.Core. |