Rivulet.Diagnostics.OpenTelemetry
1.3.0
dotnet add package Rivulet.Diagnostics.OpenTelemetry --version 1.3.0
NuGet\Install-Package Rivulet.Diagnostics.OpenTelemetry -Version 1.3.0
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Rivulet.Diagnostics.OpenTelemetry" Version="1.3.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Rivulet.Diagnostics.OpenTelemetry" Version="1.3.0" />
<PackageReference Include="Rivulet.Diagnostics.OpenTelemetry" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Rivulet.Diagnostics.OpenTelemetry --version 1.3.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Rivulet.Diagnostics.OpenTelemetry, 1.3.0"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Rivulet.Diagnostics.OpenTelemetry@1.3.0
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Rivulet.Diagnostics.OpenTelemetry&version=1.3.0
#tool nuget:?package=Rivulet.Diagnostics.OpenTelemetry&version=1.3.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
Rivulet.Diagnostics.OpenTelemetry
OpenTelemetry integration for Rivulet.Core providing distributed tracing, metrics export, and comprehensive observability.
Installation
dotnet add package Rivulet.Diagnostics.OpenTelemetry
Features
- Distributed Tracing: Automatic activity creation with parent-child relationships
- Metrics Export: Bridge EventCounters to OpenTelemetry Meters
- Retry Tracking: Record retry attempts as activity events
- Circuit Breaker Events: Track circuit state changes in traces
- Adaptive Concurrency: Monitor concurrency adjustments
- Error Correlation: Link errors with retry attempts and transient classification
Quick Start
1. Configure OpenTelemetry
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using OpenTelemetry.Metrics;
using Rivulet.Diagnostics.OpenTelemetry;
// At application startup
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("MyService"))
.AddSource(RivuletActivitySource.SourceName)
.AddJaegerExporter(options =>
{
options.AgentHost = "localhost";
options.AgentPort = 6831;
})
.Build();
using var meterProvider = Sdk.CreateMeterProviderBuilder()
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("MyService"))
.AddMeter(RivuletMetricsExporter.MeterName)
.AddPrometheusExporter()
.Build();
// Create metrics exporter
// IMPORTANT: Must remain alive for the duration of the application
// Bridges EventCounters from Rivulet.Core to OpenTelemetry Meters
// Disposing it stops the metrics export
using var metricsExporter = new RivuletMetricsExporter();
2. Use with Rivulet Operations
using Rivulet.Core;
using Rivulet.Diagnostics.OpenTelemetry;
var urls = new[] { "https://api.example.com/1", "https://api.example.com/2", /* ... */ };
var options = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 32,
MaxRetries = 3,
IsTransient = ex => ex is HttpRequestException,
ErrorMode = ErrorMode.CollectAndContinue
}.WithOpenTelemetryTracing("FetchUrls");
var results = await urls.SelectParallelAsync(
async (url, ct) =>
{
using var response = await httpClient.GetAsync(url, ct);
response.EnsureSuccessStatusCode();
return await response.Content.ReadAsStringAsync(ct);
},
options);
Distributed Tracing
Activity Hierarchy
Each parallel operation creates activities with this structure:
Rivulet.FetchUrls [Root Activity]
├── Rivulet.FetchUrls.Item [Item 0]
│ ├── Tags: rivulet.item_index=0
│ └── Status: Ok
├── Rivulet.FetchUrls.Item [Item 1 - with retry]
│ ├── Tags: rivulet.item_index=1
│ ├── Events:
│ │ └── retry (attempt=1, exception.type=HttpRequestException)
│ └── Status: Ok
└── Rivulet.FetchUrls.Item [Item 2 - failed]
├── Tags: rivulet.item_index=2, rivulet.error.transient=false
├── Exception: InvalidOperationException
└── Status: Error
Activity Tags
| Tag | Description |
|---|---|
rivulet.item_index |
Index of the item being processed |
rivulet.total_items |
Total number of items (on root activity) |
rivulet.retries |
Number of retry attempts |
rivulet.error.transient |
Whether error is transient |
rivulet.items_processed |
Items successfully processed |
rivulet.concurrency.current |
Current concurrency level |
rivulet.circuit_breaker.state |
Circuit breaker state |
Activity Events
| Event | Description | Tags |
|---|---|---|
retry |
Retry attempt occurred | rivulet.retry_attempt, exception.type, exception.message |
circuit_breaker_state_change |
Circuit breaker changed state | rivulet.circuit_breaker.state |
adaptive_concurrency_change |
Concurrency level adjusted | rivulet.concurrency.old, rivulet.concurrency.new |
Metrics Export
The RivuletMetricsExporter bridges Rivulet's EventCounters to OpenTelemetry Meters:
Available Metrics
| Metric | Type | Unit | Description |
|---|---|---|---|
rivulet.items.started |
Gauge | {items} | Total items started |
rivulet.items.completed |
Gauge | {items} | Total items completed |
rivulet.retries.total |
Gauge | {retries} | Total retry attempts |
rivulet.failures.total |
Gauge | {failures} | Total failures after retries |
rivulet.throttle.events |
Gauge | {events} | Backpressure throttle events |
rivulet.drain.events |
Gauge | {events} | Channel drain events |
rivulet.error.rate |
Gauge | {ratio} | Error rate (failures/started) |
Example with Prometheus
using OpenTelemetry;
using OpenTelemetry.Metrics;
using OpenTelemetry.Exporter.Prometheus;
var meterProvider = Sdk.CreateMeterProviderBuilder()
.AddMeter(RivuletMetricsExporter.MeterName)
.AddPrometheusHttpListener(options =>
{
options.UriPrefixes = new[] { "http://localhost:9090/" };
})
.Build();
// Metrics available at http://localhost:9090/metrics
// Create exporter and keep it alive for the application lifetime
// It automatically bridges Rivulet EventCounters to OpenTelemetry
using var exporter = new RivuletMetricsExporter();
// Use Rivulet normally - metrics automatically exported
var results = await items.SelectParallelAsync(processAsync, options);
Advanced Usage
Retry Tracking
Track individual retry attempts in trace spans:
var options = new ParallelOptionsRivulet
{
MaxRetries = 5,
BaseDelay = TimeSpan.FromMilliseconds(100),
BackoffStrategy = BackoffStrategy.ExponentialJitter,
IsTransient = ex => ex is HttpRequestException or TimeoutException
}.WithOpenTelemetryTracingAndRetries("ProcessWithRetries", trackRetries: true);
// Each retry creates an activity event with exception details
var results = await urls.SelectParallelAsync(processAsync, options);
Circuit Breaker Monitoring
Monitor circuit breaker state changes in traces:
var options = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 32,
CircuitBreaker = new CircuitBreakerOptions
{
FailureThreshold = 5,
OpenTimeout = TimeSpan.FromSeconds(30),
OnStateChange = async (oldState, newState) =>
{
// State changes are automatically recorded in current activity
logger.LogWarning($"Circuit breaker: {oldState} → {newState}");
}
}
}.WithOpenTelemetryTracing("ResilientOperation");
var results = await items.SelectParallelAsync(processAsync, options);
Adaptive Concurrency Tracking
Track concurrency adjustments:
var options = new ParallelOptionsRivulet
{
AdaptiveConcurrency = new AdaptiveConcurrencyOptions
{
MinConcurrency = 1,
MaxConcurrency = 64,
TargetLatency = TimeSpan.FromMilliseconds(100),
OnConcurrencyChange = async (oldValue, newValue) =>
{
// Changes automatically recorded in current activity
logger.LogInformation($"Concurrency adjusted: {oldValue} → {newValue}");
}
}
}.WithOpenTelemetryTracing("AdaptiveOperation");
var results = await items.SelectParallelAsync(processAsync, options);
Integration with Observability Platforms
Jaeger
using OpenTelemetry.Trace;
var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(RivuletActivitySource.SourceName)
.AddJaegerExporter(options =>
{
options.AgentHost = "jaeger-host";
options.AgentPort = 6831;
})
.Build();
Azure Monitor (Application Insights)
using Azure.Monitor.OpenTelemetry.Exporter;
var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(RivuletActivitySource.SourceName)
.AddAzureMonitorTraceExporter(options =>
{
options.ConnectionString = "InstrumentationKey=...";
})
.Build();
DataDog
using OpenTelemetry.Exporter;
var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(RivuletActivitySource.SourceName)
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("https://trace.agent.datadoghq.com:4318");
})
.Build();
Zipkin
using OpenTelemetry.Trace;
var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(RivuletActivitySource.SourceName)
.AddZipkinExporter(options =>
{
options.Endpoint = new Uri("http://zipkin-host:9411/api/v2/spans");
})
.Build();
Best Practices
- Configure Once: Set up OpenTelemetry at application startup
- Use Operation Names: Provide meaningful operation names for tracing
- Sample Appropriately: Use sampling for high-throughput scenarios
- Monitor Error Rate: Alert on
rivulet.error.ratemetric - Track Retries: Enable retry tracking for transient error analysis
- Correlate Traces: Use W3C TraceContext for cross-service correlation
- Keep Exporter Alive: RivuletMetricsExporter must remain alive for metrics export
- In web apps: Register as singleton service
- In console apps: Keep reference until application exits
- Disposing stops the EventCounter listener and metrics collection
Performance
- Minimal Overhead: Activities only created when tracing is enabled
- Async-Safe: All operations use
AsyncLocal<T>for proper async context flow - Zero Allocations: When tracing is disabled, no activities are created
- Sampling Friendly: Respects OpenTelemetry sampling decisions
Framework Support
- .NET 8.0
- .NET 9.0
Dependencies
- Rivulet.Core (≥ 1.2.0)
- OpenTelemetry.Api (≥ 1.13.1)
- System.Diagnostics.DiagnosticSource (≥ 9.0.0)
License
MIT License - see LICENSE file for details
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net8.0
- OpenTelemetry.Api (>= 1.14.0)
- Rivulet.Core (>= 1.3.0)
- System.Diagnostics.DiagnosticSource (>= 10.0.0)
-
net9.0
- OpenTelemetry.Api (>= 1.14.0)
- Rivulet.Core (>= 1.3.0)
- System.Diagnostics.DiagnosticSource (>= 10.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.3.0 | 169 | 12/13/2025 |
| 1.3.0-beta | 420 | 12/8/2025 |
| 1.3.0-alpha | 287 | 12/8/2025 |
| 1.2.0 | 404 | 11/19/2025 |