Rivulet.Testing 1.3.0


Rivulet.Testing

Testing utilities for Rivulet parallel operations including deterministic schedulers, virtual time, fake channels, and chaos injection.

Features

  • VirtualTimeProvider: Control time in tests without actual delays
  • FakeChannel: Testable channel implementation with operation tracking
  • ChaosInjector: Inject failures and delays for resilience testing
  • ConcurrencyAsserter: Assert and verify concurrency behavior

Installation

dotnet add package Rivulet.Testing

Usage

VirtualTimeProvider

Test time-dependent operations without waiting for real time to pass:

using Rivulet.Testing;
using Xunit;

[Fact]
public async Task TestTimeoutBehavior()
{
    using var timeProvider = new VirtualTimeProvider();

    // Schedule a delay
    var delayTask = timeProvider.CreateDelay(TimeSpan.FromSeconds(10));

    // Fast-forward time
    timeProvider.AdvanceTime(TimeSpan.FromSeconds(5));
    Assert.False(delayTask.IsCompleted); // Only 5 seconds passed

    timeProvider.AdvanceTime(TimeSpan.FromSeconds(5));
    Assert.True(delayTask.IsCompleted); // Now 10 seconds total

    await delayTask; // Await the completed task
}
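
To see why the first AdvanceTime call above leaves the delay pending, it can help to picture how a virtual clock might track delays. The following is a minimal sketch under stated assumptions, not Rivulet's implementation: SketchVirtualClock and its internals are hypothetical names, and only the CreateDelay/AdvanceTime shape is borrowed from the example above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical illustration only; not Rivulet's VirtualTimeProvider.
public sealed class SketchVirtualClock
{
    private readonly object _gate = new();
    private readonly List<(TimeSpan Due, TaskCompletionSource Done)> _pending = new();
    private TimeSpan _now = TimeSpan.Zero;

    // Registers a delay that completes once virtual time reaches now + duration.
    public Task CreateDelay(TimeSpan duration)
    {
        var done = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        lock (_gate)
        {
            _pending.Add((_now + duration, done));
        }
        return done.Task;
    }

    // Moves virtual time forward and completes every delay that is now due.
    public void AdvanceTime(TimeSpan amount)
    {
        var due = new List<TaskCompletionSource>();
        lock (_gate)
        {
            _now += amount;
            for (var i = _pending.Count - 1; i >= 0; i--)
            {
                if (_pending[i].Due <= _now)
                {
                    due.Add(_pending[i].Done);
                    _pending.RemoveAt(i);
                }
            }
        }
        // Complete outside the lock so the lock is never held during completion.
        foreach (var tcs in due) tcs.SetResult();
    }
}

public static class Program
{
    public static void Main()
    {
        var clock = new SketchVirtualClock();
        var delay = clock.CreateDelay(TimeSpan.FromSeconds(10));

        clock.AdvanceTime(TimeSpan.FromSeconds(5));
        Console.WriteLine(delay.IsCompleted); // False

        clock.AdvanceTime(TimeSpan.FromSeconds(5));
        Console.WriteLine(delay.IsCompleted); // True
    }
}
```

In this sketch a delay's due time is fixed when it is created, relative to the virtual time at that moment, so a delay registered after an AdvanceTime call needs a further advance to complete. Whether VirtualTimeProvider behaves the same way is an assumption worth checking against its source.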

FakeChannel

Track channel operations and control capacity:

using Rivulet.Testing;
using System.Threading.Channels;
using Xunit;

[Fact]
public async Task TestChannelOperations()
{
    var channel = new FakeChannel<int>(boundedCapacity: 10);

    // Write items
    await channel.WriteAsync(1);
    await channel.WriteAsync(2);
    await channel.WriteAsync(3);

    Assert.Equal(3, channel.WriteCount);
    Assert.Equal(0, channel.ReadCount);

    // Read items
    var item1 = await channel.ReadAsync();
    var item2 = await channel.ReadAsync();

    Assert.Equal(3, channel.WriteCount);
    Assert.Equal(2, channel.ReadCount);

    channel.Dispose();
}

ChaosInjector

Test resilience by injecting random failures:

using Rivulet.Testing;
using Xunit;

[Fact]
public async Task TestWithChaos()
{
    var chaos = new ChaosInjector(
        failureRate: 0.3, // 30% failure rate
        artificialDelay: TimeSpan.FromMilliseconds(100)
    );

    var retries = 0;
    var maxRetries = 5;

    while (retries < maxRetries)
    {
        try
        {
            var result = await chaos.ExecuteAsync(async () =>
            {
                // Your operation here
                return await SomeOperationAsync();
            });

            // Success!
            break;
        }
        catch (ChaosException ex)
        {
            retries++;
            if (retries >= maxRetries)
                // Keep the last injected failure as the inner exception
                throw new InvalidOperationException("Max retries exceeded", ex);
        }
    }
}
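
To make the failureRate parameter concrete, the sketch below shows one way a failure injector could decide whether to throw. It is an illustration under stated assumptions, not Rivulet's ChaosInjector: SketchChaos, SketchChaosException, and the seed parameter are hypothetical, and the real class may draw its randomness differently.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical illustration only; not Rivulet's ChaosInjector.
public sealed class SketchChaosException : Exception
{
    public SketchChaosException() : base("Injected failure") { }
}

public sealed class SketchChaos
{
    private readonly double _failureRate;
    private readonly Random _random;
    private readonly object _gate = new();

    public SketchChaos(double failureRate, int seed)
    {
        if (failureRate < 0 || failureRate > 1)
            throw new ArgumentOutOfRangeException(nameof(failureRate));
        _failureRate = failureRate;
        _random = new Random(seed); // Seeded so a failing run can be replayed
    }

    // Throws before running the operation with probability failureRate.
    public async Task<T> ExecuteAsync<T>(Func<Task<T>> operation)
    {
        double roll;
        lock (_gate) { roll = _random.NextDouble(); } // Random is not thread-safe
        if (roll < _failureRate) throw new SketchChaosException();
        return await operation();
    }
}

public static class Program
{
    public static async Task Main()
    {
        // A rate of 0 never fails: NextDouble() is always >= 0.
        var calm = new SketchChaos(failureRate: 0.0, seed: 1);
        Console.WriteLine(await calm.ExecuteAsync(() => Task.FromResult(21 * 2))); // 42

        // A rate of 1 always fails: NextDouble() is always < 1.
        var storm = new SketchChaos(failureRate: 1.0, seed: 1);
        try
        {
            await storm.ExecuteAsync(() => Task.FromResult(0));
        }
        catch (SketchChaosException)
        {
            Console.WriteLine("injected"); // injected
        }
    }
}
```

Rates between the two extremes fail a matching fraction of calls on average, which is why a retry loop like the one above needs a retry budget sized to the configured rate.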

ConcurrencyAsserter

Verify that concurrency limits are respected:

using Rivulet.Testing;
using Rivulet.Core;
using Xunit;

[Fact]
public async Task TestMaxDegreeOfParallelism()
{
    var asserter = new ConcurrencyAsserter();
    var maxDegree = 5;

    var items = Enumerable.Range(1, 100);

    await items.ForEachParallelAsync(
        async (item, ct) =>
        {
            using var scope = asserter.Enter();

            // Simulate work
            await Task.Delay(10, ct);
        },
        new ParallelOptionsRivulet
        {
            MaxDegreeOfParallelism = maxDegree
        }
    );

    Assert.True(asserter.MaxConcurrency <= maxDegree);
    Assert.Equal(0, asserter.CurrentConcurrency); // All completed
}
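
The two properties asserted above, MaxConcurrency and CurrentConcurrency, can be understood as a pair of atomic counters. The sketch below shows that idea with hypothetical names (SketchConcurrencyTracker and its nested Scope); it is an assumption about how such a tracker could work, not Rivulet's ConcurrencyAsserter.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration only; not Rivulet's ConcurrencyAsserter.
public sealed class SketchConcurrencyTracker
{
    private int _current;
    private int _max;

    public int CurrentConcurrency => Volatile.Read(ref _current);
    public int MaxConcurrency => Volatile.Read(ref _max);

    // Marks entry into the tracked region; dispose the result on exit.
    public IDisposable Enter()
    {
        var now = Interlocked.Increment(ref _current);

        // Raise the recorded maximum with a compare-and-swap loop.
        int seen;
        while (now > (seen = Volatile.Read(ref _max)))
        {
            if (Interlocked.CompareExchange(ref _max, now, seen) == seen) break;
        }
        return new Scope(this);
    }

    private sealed class Scope : IDisposable
    {
        private readonly SketchConcurrencyTracker _owner;
        private int _disposed;

        public Scope(SketchConcurrencyTracker owner) => _owner = owner;

        public void Dispose()
        {
            // The flag guards against decrementing twice on a double dispose.
            if (Interlocked.Exchange(ref _disposed, 1) == 0)
                Interlocked.Decrement(ref _owner._current);
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var tracker = new SketchConcurrencyTracker();
        using var throttle = new SemaphoreSlim(3, 3);

        var tasks = Enumerable.Range(1, 20).Select(async _ =>
        {
            await throttle.WaitAsync();
            try
            {
                using var scope = tracker.Enter();
                await Task.Delay(20);
            }
            finally
            {
                throttle.Release();
            }
        });
        await Task.WhenAll(tasks);

        Console.WriteLine(tracker.MaxConcurrency <= 3); // True
        Console.WriteLine(tracker.CurrentConcurrency);  // 0
    }
}
```

Because the scope is disposed before the semaphore is released, the tracked count can never exceed the semaphore's limit, so the first printed value does not depend on timing.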

Advanced Examples

Combining VirtualTimeProvider with Parallel Operations

[Fact]
public async Task TestRetryWithVirtualTime()
{
    using var timeProvider = new VirtualTimeProvider();
    var attempts = 0;

    var retryTask = Task.Run(async () =>
    {
        while (attempts < 3)
        {
            attempts++;
            await timeProvider.CreateDelay(TimeSpan.FromSeconds(1));
        }
    });

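    // Each delay is created only after the previous one completes,
    // so advance virtual time one second per attempt
    for (var i = 0; i < 3; i++)
    {
        await Task.Delay(100); // Let the loop register its next delay
        timeProvider.AdvanceTime(TimeSpan.FromSeconds(1));
    }

    await retryTask;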

    Assert.Equal(3, attempts);
}

Testing Channel Backpressure

[Fact]
public async Task TestBoundedChannelBackpressure()
{
    var channel = new FakeChannel<int>(boundedCapacity: 2);

    // Fill the channel
    await channel.WriteAsync(1);
    await channel.WriteAsync(2);

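    // This write waits until space is available.
    // The async lambda makes writeTask track the write itself, whether WriteAsync returns Task or ValueTask.
    var writeTask = Task.Run(async () => await channel.WriteAsync(3));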

    await Task.Delay(100);
    Assert.False(writeTask.IsCompleted); // Still blocked

    // Read one item to make space
    await channel.ReadAsync();

    await writeTask; // Now completes
    Assert.Equal(3, channel.WriteCount);
}
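
The same backpressure behavior can be observed without timing-based waits on the bounded channel from System.Threading.Channels. This is a small sketch for comparison, not a description of how FakeChannel is built; whether FakeChannel wraps this type is an assumption.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class Program
{
    public static async Task Main()
    {
        // Room for two items; writers wait when the channel is full (the default full mode).
        var channel = Channel.CreateBounded<int>(2);

        await channel.Writer.WriteAsync(1);
        await channel.Writer.WriteAsync(2);

        // TryWrite never waits, so it reports the full channel immediately.
        Console.WriteLine(channel.Writer.TryWrite(3)); // False

        // WriteAsync stays pending until a reader frees a slot.
        var pendingWrite = channel.Writer.WriteAsync(3).AsTask();
        Console.WriteLine(pendingWrite.IsCompleted); // False

        await channel.Reader.ReadAsync(); // Frees one slot
        await pendingWrite;               // The waiting write now completes
        Console.WriteLine(channel.Reader.Count); // 2
    }
}
```

Checking TryWrite or the pending task's IsCompleted state directly avoids relying on a fixed Task.Delay to decide that a writer is blocked.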

Chaos Testing with Retry Logic

[Fact]
public async Task TestRetryLogicUnderChaos()
{
    var chaos = new ChaosInjector(failureRate: 0.5);
    var failureCount = 0;

    async Task<int> OperationWithRetry(int value)
    {
        var maxRetries = 10;
        for (int i = 0; i < maxRetries; i++)
        {
            try
            {
                return await chaos.ExecuteAsync(async () =>
                {
                    await Task.Delay(1);
                    return value * 2;
                });
            }
            catch (ChaosException)
            {
                // Operations run in parallel, so count failures atomically
                Interlocked.Increment(ref failureCount);
                if (i == maxRetries - 1)
                    throw;
            }
        }
        return -1;
    }

    var items = Enumerable.Range(1, 10);
    var results = await items.SelectParallelAsync(
        async (item, ct) => await OperationWithRetry(item),
        new ParallelOptionsRivulet { MaxDegreeOfParallelism = 3 }
    );

    Assert.Equal(10, results.Count);
    Assert.True(failureCount > 0); // Some failures occurred
}

Verifying Concurrency Patterns

[Fact]
public async Task TestThrottling()
{
    var asserter = new ConcurrencyAsserter();
    var throttle = new SemaphoreSlim(3, 3); // Max 3 concurrent

    var tasks = Enumerable.Range(1, 20).Select(async i =>
    {
        await throttle.WaitAsync();
        try
        {
            using var scope = asserter.Enter();
            await Task.Delay(50);
        }
        finally
        {
            throttle.Release();
        }
    });

    await Task.WhenAll(tasks);

    Assert.True(asserter.MaxConcurrency <= 3);
}

Integration with Rivulet.Core

These testing utilities work seamlessly with Rivulet.Core parallel operations:

using Rivulet.Core;
using Rivulet.Testing;
using Xunit;

[Fact]
public async Task TestParallelOperationsWithTestingTools()
{
    var channel = new FakeChannel<int>();
    var asserter = new ConcurrencyAsserter();
    var chaos = new ChaosInjector(failureRate: 0.2);

    // Producer
    var producerTask = Task.Run(async () =>
    {
        for (int i = 0; i < 100; i++)
        {
            await channel.WriteAsync(i);
        }
        channel.Complete();
    });

    // Consumer with chaos and concurrency tracking
    var results = new List<int>();
    await channel.Reader.ReadAllAsync()
        .ForEachParallelAsync(
            async (item, ct) =>
            {
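                using var scope = asserter.Enter();

                try
                {
                    var result = await chaos.ExecuteAsync(async () =>
                    {
                        await Task.Delay(1, ct);
                        return item * 2;
                    });

                    lock (results)
                    {
                        results.Add(result);
                    }
                }
                catch (ChaosException)
                {
                    // Injected failure: skip this item so the remaining items are still consumed
                }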
            },
            new ParallelOptionsRivulet { MaxDegreeOfParallelism = 10 }
        );

    await producerTask;

    Assert.Equal(100, channel.WriteCount);
    Assert.Equal(100, channel.ReadCount);
    Assert.True(asserter.MaxConcurrency <= 10);
}

Best Practices

  1. VirtualTimeProvider: Always dispose of the provider to clean up resources
  2. FakeChannel: Use bounded capacity to test backpressure scenarios
  3. ChaosInjector: Start with low failure rates (0.1-0.3) and increase gradually
  4. ConcurrencyAsserter: Reset between test cases to avoid contamination

License

MIT License - see LICENSE file for details


Made with ❤️ by Jeffeek

Target frameworks

Compatible (included in package): net8.0, net9.0.
Computed as compatible: net10.0, plus the android, browser, ios, maccatalyst, macos, tvos, and windows variants of net8.0, net9.0, and net10.0.

Version       Downloads   Last Updated
1.3.0         178         12/13/2025
1.3.0-beta    432         12/8/2025
1.3.0-alpha   304         12/8/2025
1.2.0         413         11/19/2025