Rivulet.Http
1.3.0
dotnet add package Rivulet.Http --version 1.3.0
NuGet\Install-Package Rivulet.Http -Version 1.3.0
<PackageReference Include="Rivulet.Http" Version="1.3.0" />
<PackageVersion Include="Rivulet.Http" Version="1.3.0" />
<PackageReference Include="Rivulet.Http" />
paket add Rivulet.Http --version 1.3.0
#r "nuget: Rivulet.Http, 1.3.0"
#:package Rivulet.Http@1.3.0
#addin nuget:?package=Rivulet.Http&version=1.3.0
#tool nuget:?package=Rivulet.Http&version=1.3.0
Rivulet.Http
Parallel HTTP operations with automatic retries, resilient downloads, and HttpClientFactory integration.
Built on top of Rivulet.Core, this package provides HTTP-aware parallel operators that automatically handle transient failures, respect rate limits, and support resumable downloads.
Installation
dotnet add package Rivulet.Http
Requires Rivulet.Core (automatically included).
Quick Start
Parallel HTTP GET Requests
Fetch multiple URLs in parallel with automatic retry for transient HTTP errors:
using Rivulet.Http;
var urls = new[]
{
new Uri("https://api.example.com/users/1"),
new Uri("https://api.example.com/users/2"),
new Uri("https://api.example.com/users/3")
};
var responses = await urls.GetParallelAsync(
httpClient,
new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 10,
MaxRetries = 3
}
});
foreach (var response in responses)
{
var content = await response.Content.ReadAsStringAsync();
Console.WriteLine(content);
response.Dispose();
}
Get String Content Directly
Automatically read response content as strings:
var urls = new[]
{
new Uri("https://api.example.com/data/1"),
new Uri("https://api.example.com/data/2")
};
var contents = await urls.GetStringParallelAsync(httpClient);
foreach (var content in contents)
{
Console.WriteLine(content);
}
Parallel POST Requests
Send multiple POST requests in parallel:
var requests = users.Select(user => (
uri: new Uri("https://api.example.com/users"),
content: new StringContent(
JsonSerializer.Serialize(user),
Encoding.UTF8,
"application/json")
));
var responses = await requests.PostParallelAsync(
httpClient,
new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 5,
ErrorMode = ErrorMode.CollectAndContinue
}
});
HttpClientFactory Integration
Use named or typed HttpClient instances with IHttpClientFactory:
using Microsoft.Extensions.DependencyInjection;
var services = new ServiceCollection();
services.AddHttpClient("api", client =>
{
client.BaseAddress = new Uri("https://api.example.com");
client.DefaultRequestHeaders.Add("Authorization", "Bearer token");
});
var provider = services.BuildServiceProvider();
var factory = provider.GetRequiredService<IHttpClientFactory>();
var urls = new[]
{
new Uri("/endpoint1", UriKind.Relative),
new Uri("/endpoint2", UriKind.Relative)
};
var results = await urls.GetStringParallelAsync(
factory,
clientName: "api",
new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 10
}
});
Resilient Downloads with Resume Support
Download files in parallel with automatic resume on failure:
var downloads = new[]
{
(uri: new Uri("https://example.com/file1.zip"), path: "downloads/file1.zip"),
(uri: new Uri("https://example.com/file2.zip"), path: "downloads/file2.zip"),
(uri: new Uri("https://example.com/file3.zip"), path: "downloads/file3.zip")
};
var results = await downloads.DownloadParallelAsync(
httpClient,
new StreamingDownloadOptions
{
EnableResume = true,
ValidateContentLength = true,
OnProgressAsync = (uri, downloaded, total) =>
{
var percent = total.HasValue ? (downloaded * 100.0 / total.Value) : 0;
Console.WriteLine($"{uri}: {downloaded:N0}/{total:N0} bytes ({percent:F1}%)");
return ValueTask.CompletedTask;
},
HttpOptions = new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 3,
MaxRetries = 5
}
}
});
foreach (var (uri, filePath, bytesDownloaded) in results)
{
Console.WriteLine($"Downloaded {uri} to {filePath}: {bytesDownloaded:N0} bytes");
}
Stream Download to Custom Destination
Download directly to any Stream:
using var memoryStream = new MemoryStream();
var bytesDownloaded = await HttpStreamingExtensions.DownloadToStreamAsync(
new Uri("https://example.com/data.json"),
memoryStream,
httpClient,
new StreamingDownloadOptions
{
BufferSize = 8192,
OnProgressAsync = (uri, downloaded, total) =>
{
Console.WriteLine($"Downloaded: {downloaded:N0} bytes");
return ValueTask.CompletedTask;
}
});
memoryStream.Position = 0;
var data = await JsonSerializer.DeserializeAsync<MyData>(memoryStream);
Automatic Retry Handling
Rivulet.Http automatically retries transient HTTP errors:
- 408 Request Timeout
- 429 Too Many Requests
- 500 Internal Server Error
- 502 Bad Gateway
- 503 Service Unavailable
- 504 Gateway Timeout
Respect Retry-After Headers
Automatically respects Retry-After headers from rate-limited APIs:
var options = new HttpOptions
{
RespectRetryAfterHeader = true, // Default: true
ParallelOptions = new ParallelOptionsRivulet
{
MaxRetries = 5,
BaseDelay = TimeSpan.FromSeconds(1)
}
};
var responses = await urls.GetParallelAsync(httpClient, options);
When a server returns 429 Too Many Requests or 503 Service Unavailable with a Retry-After header, Rivulet.Http will wait the specified duration before retrying instead of using the configured backoff strategy.
Error Handling
HTTP-Specific Error Callbacks
Get notified of HTTP errors with status codes:
var options = new HttpOptions
{
OnHttpErrorAsync = (uri, statusCode, exception) =>
{
Console.WriteLine($"Error fetching {uri}: {statusCode} - {exception.Message}");
return ValueTask.CompletedTask;
},
ParallelOptions = new ParallelOptionsRivulet
{
ErrorMode = ErrorMode.CollectAndContinue,
MaxRetries = 3
}
};
var responses = await urls.GetParallelAsync(httpClient, options);
Custom Retriable Status Codes
Customize which HTTP status codes should trigger retries:
var options = new HttpOptions
{
RetriableStatusCodes = new HashSet<HttpStatusCode>
{
HttpStatusCode.TooManyRequests,
HttpStatusCode.ServiceUnavailable,
HttpStatusCode.RequestTimeout
},
ParallelOptions = new ParallelOptionsRivulet
{
MaxRetries = 3,
BackoffStrategy = BackoffStrategy.ExponentialJitter
}
};
Advanced Features
Rate Limiting
Control request rate to respect API limits:
var options = new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 10,
RateLimit = new RateLimitOptions
{
TokensPerSecond = 100,
BurstCapacity = 200
}
}
};
var responses = await urls.GetParallelAsync(httpClient, options);
Circuit Breaker
Prevent cascading failures with circuit breaker pattern:
var options = new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
CircuitBreaker = new CircuitBreakerOptions
{
FailureThreshold = 5,
SuccessThreshold = 2,
OpenTimeout = TimeSpan.FromSeconds(30)
}
}
};
Progress Tracking
Monitor download progress for long-running operations:
var options = new StreamingDownloadOptions
{
ProgressInterval = TimeSpan.FromSeconds(1),
OnProgressAsync = (uri, downloaded, total) =>
{
var percent = total.HasValue ? (downloaded * 100.0 / total.Value) : 0;
Console.WriteLine($"Progress: {percent:F1}%");
return ValueTask.CompletedTask;
},
OnResumeAsync = (uri, offset) =>
{
Console.WriteLine($"Resuming download from byte {offset:N0}");
return ValueTask.CompletedTask;
},
OnCompleteAsync = (uri, path, bytes) =>
{
Console.WriteLine($"Download complete: {bytes:N0} bytes saved to {path}");
return ValueTask.CompletedTask;
}
};
HTTP Methods Supported
All standard HTTP methods with parallel operators:
- GET:
GetParallelAsync,GetStringParallelAsync,GetByteArrayParallelAsync - POST:
PostParallelAsync - PUT:
PutParallelAsync - DELETE:
DeleteParallelAsync
All methods support both HttpClient and IHttpClientFactory.
Configuration Options
HttpOptions
HTTP-specific configuration:
var options = new HttpOptions
{
RequestTimeout = TimeSpan.FromSeconds(30),
RespectRetryAfterHeader = true,
RetriableStatusCodes = new HashSet<HttpStatusCode> { /* ... */ },
BufferSize = 81920,
OnHttpErrorAsync = async (uri, status, ex) => { /* ... */ },
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 10,
MaxRetries = 3,
BaseDelay = TimeSpan.FromMilliseconds(100),
BackoffStrategy = BackoffStrategy.ExponentialJitter,
ErrorMode = ErrorMode.CollectAndContinue
}
};
StreamingDownloadOptions
Download-specific configuration:
var options = new StreamingDownloadOptions
{
EnableResume = true,
ValidateContentLength = true,
OverwriteExisting = false,
BufferSize = 81920,
ProgressInterval = TimeSpan.FromSeconds(1),
HttpOptions = new HttpOptions { /* ... */ }
};
Best Practices
- Dispose HttpResponseMessage: Always dispose responses when using
GetParallelAsync,PostParallelAsync, etc. - Use String/ByteArray Variants: Use
GetStringParallelAsyncorGetByteArrayParallelAsyncfor automatic disposal - Set MaxDegreeOfParallelism: Always limit concurrency to avoid overwhelming servers
- Enable Resume for Large Files: Use
EnableResume = truefor downloads that might be interrupted - Respect Rate Limits: Use
RespectRetryAfterHeader = trueand configureRateLimitfor APIs - Use HttpClientFactory: Prefer
IHttpClientFactoryover creating HttpClient instances manually
Performance
Rivulet.Http is designed for high-throughput scenarios:
- Bounded Concurrency: Prevents resource exhaustion
- Backpressure: Channel-based buffering prevents memory issues
- Zero Allocations: Uses
ValueTask<T>in hot paths - Streaming Downloads: Efficient memory usage for large files
- Resume Support: Saves bandwidth by resuming interrupted downloads
Examples
See the samples directory for complete working examples including:
- API scraping with rate limiting
- Bulk data export/import
- Image downloading with progress tracking
- Resilient file synchronization
License
MIT License - see LICENSE file for details
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- Microsoft.Extensions.Http (>= 10.0.0)
- Rivulet.Core (>= 1.3.0)
-
net9.0
- Microsoft.Extensions.Http (>= 10.0.0)
- Rivulet.Core (>= 1.3.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.3.0 | 152 | 12/13/2025 |
| 1.3.0-beta | 409 | 12/8/2025 |
| 1.3.0-alpha | 284 | 12/8/2025 |