Rivulet.Sql 1.3.0-alpha

This is a prerelease version of Rivulet.Sql.
There is a newer version of this package available.
See the version list below for details.
dotnet add package Rivulet.Sql --version 1.3.0-alpha
                    
NuGet\Install-Package Rivulet.Sql -Version 1.3.0-alpha
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Rivulet.Sql" Version="1.3.0-alpha" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Rivulet.Sql" Version="1.3.0-alpha" />
                    
Directory.Packages.props
<PackageReference Include="Rivulet.Sql" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Rivulet.Sql --version 1.3.0-alpha
                    
#r "nuget: Rivulet.Sql, 1.3.0-alpha"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Rivulet.Sql@1.3.0-alpha
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Rivulet.Sql&version=1.3.0-alpha&prerelease
                    
Install as a Cake Addin
#tool nuget:?package=Rivulet.Sql&version=1.3.0-alpha&prerelease
                    
Install as a Cake Tool

Rivulet.Sql

Safe parallel SQL operations with connection pooling awareness and bulk operations.

Built on top of Rivulet.Core, this package provides SQL-aware parallel operators that automatically handle transient database failures, respect connection pooling limits, and support efficient bulk operations.

Installation

dotnet add package Rivulet.Sql

Requires Rivulet.Core (automatically included).

Quick Start

Parallel SQL Queries

Execute multiple queries in parallel with automatic retry for transient SQL errors:

using Rivulet.Sql;
using System.Data.SqlClient;

var userIds = new[] { 1, 2, 3, 4, 5 };
var queries = userIds.Select(id => $"SELECT * FROM Users WHERE Id = {id}");

var results = await queries.ExecuteQueriesParallelAsync(
    () => new SqlConnection(connectionString),
    reader =>
    {
        var users = new List<User>();
        while (reader.Read())
        {
            users.Add(new User
            {
                Id = reader.GetInt32(0),
                Name = reader.GetString(1),
                Email = reader.GetString(2)
            });
        }
        return users;
    },
    new SqlOptions
    {
        CommandTimeout = 30,
        ParallelOptions = new ParallelOptionsRivulet
        {
            MaxDegreeOfParallelism = 10,
            MaxRetries = 3
        }
    });

foreach (var userList in results)
{
    foreach (var user in userList)
    {
        Console.WriteLine($"{user.Id}: {user.Name}");
    }
}

Parameterized Queries

Use parameters to prevent SQL injection:

var userIds = new[] { 1, 2, 3 };
var queriesWithParams = userIds.Select(id => (
    query: "SELECT * FROM Users WHERE Id = @id",
    configureParams: (Action<IDbCommand>)((cmd) =>
    {
        var param = cmd.CreateParameter();
        param.ParameterName = "@id";
        param.Value = id;
        cmd.Parameters.Add(param);
    })
));

var results = await queriesWithParams.ExecuteQueriesParallelAsync(
    () => new SqlConnection(connectionString),
    reader =>
    {
        var user = new User();
        if (reader.Read())
        {
            user.Id = reader.GetInt32(0);
            user.Name = reader.GetString(1);
        }
        return user;
    });

Parallel SQL Commands

Execute multiple INSERT, UPDATE, or DELETE commands in parallel:

var updates = new[]
{
    "UPDATE Users SET LastLogin = GETDATE() WHERE Id = 1",
    "UPDATE Users SET LastLogin = GETDATE() WHERE Id = 2",
    "UPDATE Users SET LastLogin = GETDATE() WHERE Id = 3"
};

var affectedRows = await updates.ExecuteCommandsParallelAsync(
    () => new SqlConnection(connectionString),
    new SqlOptions
    {
        ParallelOptions = new ParallelOptionsRivulet
        {
            MaxDegreeOfParallelism = 5,
            ErrorMode = ErrorMode.CollectAndContinue
        }
    });

Console.WriteLine($"Total rows affected: {affectedRows.Sum()}");

Parallel Scalar Queries

Execute scalar queries (COUNT, MAX, MIN, etc.) in parallel:

var tableNames = new[] { "Users", "Products", "Orders" };
var queries = tableNames.Select(table => $"SELECT COUNT(*) FROM {table}");

var counts = await queries.ExecuteScalarParallelAsync<int>(
    () => new SqlConnection(connectionString));

for (int i = 0; i < tableNames.Length; i++)
{
    Console.WriteLine($"{tableNames[i]}: {counts[i]} rows");
}

Bulk Operations

Bulk Insert

Efficiently insert thousands of records using batched operations:

var users = Enumerable.Range(1, 10000)
    .Select(i => new User { Name = $"User{i}", Email = $"user{i}@example.com" })
    .ToList();

var totalInserted = await users.BulkInsertAsync(
    () => new SqlConnection(connectionString),
    async (batch, cmd, ct) =>
    {
        var sb = new StringBuilder();
        int paramIndex = 0;

        foreach (var user in batch)
        {
            if (sb.Length > 0) sb.Append("; ");
            sb.Append($"INSERT INTO Users (Name, Email) VALUES (@name{paramIndex}, @email{paramIndex})");

            var nameParam = cmd.CreateParameter();
            nameParam.ParameterName = $"@name{paramIndex}";
            nameParam.Value = user.Name;
            cmd.Parameters.Add(nameParam);

            var emailParam = cmd.CreateParameter();
            emailParam.ParameterName = $"@email{paramIndex}";
            emailParam.Value = user.Email;
            cmd.Parameters.Add(emailParam);

            paramIndex++;
        }

        cmd.CommandText = sb.ToString();
        await Task.CompletedTask;
    },
    new BulkOperationOptions
    {
        BatchSize = 1000,
        UseTransaction = true,
        SqlOptions = new SqlOptions
        {
            ParallelOptions = new ParallelOptionsRivulet
            {
                MaxDegreeOfParallelism = 4
            }
        }
    });

Console.WriteLine($"Inserted {totalInserted} users");

Bulk Update

Update multiple records efficiently:

var users = await GetUsersToUpdate();

var totalUpdated = await users.BulkUpdateAsync(
    () => new SqlConnection(connectionString),
    async (batch, cmd, ct) =>
    {
        var sb = new StringBuilder();
        int paramIndex = 0;

        foreach (var user in batch)
        {
            if (sb.Length > 0) sb.Append("; ");
            sb.Append($"UPDATE Users SET Name = @name{paramIndex}, Email = @email{paramIndex} WHERE Id = @id{paramIndex}");

            var idParam = cmd.CreateParameter();
            idParam.ParameterName = $"@id{paramIndex}";
            idParam.Value = user.Id;
            cmd.Parameters.Add(idParam);

            var nameParam = cmd.CreateParameter();
            nameParam.ParameterName = $"@name{paramIndex}";
            nameParam.Value = user.Name;
            cmd.Parameters.Add(nameParam);

            var emailParam = cmd.CreateParameter();
            emailParam.ParameterName = $"@email{paramIndex}";
            emailParam.Value = user.Email;
            cmd.Parameters.Add(emailParam);

            paramIndex++;
        }

        cmd.CommandText = sb.ToString();
        await Task.CompletedTask;
    },
    new BulkOperationOptions
    {
        BatchSize = 500,
        UseTransaction = true
    });

Bulk Delete

Delete multiple records in batches:

var userIdsToDelete = await GetInactiveUserIds();

var totalDeleted = await userIdsToDelete.BulkDeleteAsync(
    () => new SqlConnection(connectionString),
    async (batch, cmd, ct) =>
    {
        cmd.CommandText = $"DELETE FROM Users WHERE Id IN ({string.Join(",", batch)})";
        await Task.CompletedTask;
    },
    new BulkOperationOptions
    {
        BatchSize = 1000,
        UseTransaction = true
    });

Console.WriteLine($"Deleted {totalDeleted} inactive users");

Automatic Retry Handling

Rivulet.Sql automatically retries transient SQL errors:

SQL Server Transient Errors

  • -2, -1: Connection timeout/broken
  • 53: Connection does not exist
  • 64: Error on server
  • 40197, 40501, 40613: Azure SQL transient errors

PostgreSQL (Npgsql) Transient Errors

  • 08000-08006: Connection exceptions
  • 53300: Too many connections
  • 57P03: Cannot connect now

MySQL Transient Errors

  • 1040: Too many connections
  • 1205: Lock wait timeout
  • 1213: Deadlock found
  • 2006, 2013: Server gone away/lost connection
var options = new SqlOptions
{
    ParallelOptions = new ParallelOptionsRivulet
    {
        MaxRetries = 5,
        BaseDelay = TimeSpan.FromMilliseconds(100),
        BackoffStrategy = BackoffStrategy.ExponentialJitter
    },
    OnSqlErrorAsync = (item, exception, retryAttempt) =>
    {
        Console.WriteLine($"SQL error on retry {retryAttempt}: {exception.Message}");
        return ValueTask.CompletedTask;
    }
};

var results = await queries.ExecuteQueriesParallelAsync(
    () => new SqlConnection(connectionString),
    reader => MapToUser(reader),
    options);

Connection Pool Management

Rivulet.Sql is designed to work with ADO.NET connection pooling:

// Connection string with pooling configuration
var connectionString = "Server=localhost;Database=MyDb;User Id=sa;Password=***;" +
                      "Max Pool Size=100;Min Pool Size=10;";

var options = new SqlOptions
{
    AutoManageConnection = true,  // Automatically opens and closes connections
    ParallelOptions = new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 20  // Don't exceed connection pool size
    }
};

// The factory function creates new connections that participate in pooling
var results = await queries.ExecuteQueriesParallelAsync(
    () => new SqlConnection(connectionString),
    reader => MapToUser(reader),
    options);

Best Practice: Set MaxDegreeOfParallelism to be less than your connection pool's Max Pool Size to avoid pool exhaustion.

Batch Operation Callbacks

Monitor bulk operation progress:

var totalProcessed = 0;
var options = new BulkOperationOptions
{
    BatchSize = 1000,
    OnBatchStartAsync = (batch, batchNum) =>
    {
        Console.WriteLine($"Starting batch {batchNum} with {batch.Count} items");
        return ValueTask.CompletedTask;
    },
    OnBatchCompleteAsync = (batch, batchNum, affectedRows) =>
    {
        totalProcessed += affectedRows;
        Console.WriteLine($"Batch {batchNum} complete: {affectedRows} rows affected");
        Console.WriteLine($"Total processed so far: {totalProcessed}");
        return ValueTask.CompletedTask;
    },
    OnBatchErrorAsync = (batch, batchNum, exception) =>
    {
        Console.WriteLine($"Batch {batchNum} failed: {exception.Message}");
        return ValueTask.CompletedTask;
    }
};

await items.BulkInsertAsync(
    () => new SqlConnection(connectionString),
    BuildInsertCommand,
    options);

Advanced Features

Transaction Isolation Levels

Control transaction isolation for bulk operations:

var options = new BulkOperationOptions
{
    UseTransaction = true,
    SqlOptions = new SqlOptions
    {
        IsolationLevel = IsolationLevel.Serializable  // Highest isolation
    }
};

await users.BulkInsertAsync(
    () => new SqlConnection(connectionString),
    BuildInsertCommand,
    options);

Custom Command Timeout

Set per-operation timeouts:

var options = new SqlOptions
{
    CommandTimeout = 120,  // 2 minutes for long-running queries
    ParallelOptions = new ParallelOptionsRivulet
    {
        PerItemTimeout = TimeSpan.FromSeconds(130)  // Overall timeout per item
    }
};

Provider-Agnostic Code

Works with any ADO.NET provider (SQL Server, PostgreSQL, MySQL, SQLite, etc.):

// SQL Server
var results1 = await queries.ExecuteQueriesParallelAsync(
    () => new SqlConnection(sqlServerConnectionString),
    MapToUser);

// PostgreSQL
var results2 = await queries.ExecuteQueriesParallelAsync(
    () => new NpgsqlConnection(postgresConnectionString),
    MapToUser);

// MySQL
var results3 = await queries.ExecuteQueriesParallelAsync(
    () => new MySqlConnection(mysqlConnectionString),
    MapToUser);

Configuration Options

SqlOptions

SQL-specific configuration:

var options = new SqlOptions
{
    CommandTimeout = 30,              // Command timeout in seconds
    AutoManageConnection = true,       // Auto open/close connections
    IsolationLevel = IsolationLevel.ReadCommitted,  // Transaction isolation
    OnSqlErrorAsync = async (item, ex, retry) => { /* custom logging */ },
    ParallelOptions = new ParallelOptionsRivulet
    {
        MaxDegreeOfParallelism = 10,
        MaxRetries = 3,
        BaseDelay = TimeSpan.FromMilliseconds(100),
        BackoffStrategy = BackoffStrategy.ExponentialJitter,
        ErrorMode = ErrorMode.CollectAndContinue
    }
};

BulkOperationOptions

Bulk operation configuration:

var options = new BulkOperationOptions
{
    BatchSize = 1000,                  // Items per batch
    UseTransaction = true,              // Wrap each batch in transaction
    SqlOptions = new SqlOptions { /* ... */ },
    OnBatchStartAsync = async (batch, num) => { /* ... */ },
    OnBatchCompleteAsync = async (batch, num, affected) => { /* ... */ },
    OnBatchErrorAsync = async (batch, num, ex) => { /* ... */ }
};

Best Practices

  1. Use Parameterized Queries: Always use parameters to prevent SQL injection
  2. Set Appropriate Parallelism: Match MaxDegreeOfParallelism to your connection pool size
  3. Enable AutoManageConnection: Let Rivulet handle connection lifecycle unless you have specific needs
  4. Use Transactions for Bulk Operations: Enable UseTransaction = true for data consistency
  5. Monitor Progress: Use callbacks for long-running bulk operations
  6. Tune Batch Size: Experiment with batch sizes (100-2000) for optimal performance
  7. Handle Provider Differences: Be aware of provider-specific SQL syntax and error codes

Performance

Rivulet.Sql is designed for high-throughput database operations:

  • Connection Pooling Aware: Respects connection pool limits to avoid exhaustion
  • Batched Operations: Reduces round-trips for bulk operations
  • Bounded Concurrency: Prevents overwhelming the database
  • Automatic Retries: Handles transient failures without manual intervention
  • Zero Allocations: Uses ValueTask<T> in hot paths

Examples

See the samples directory for complete working examples including:

  • Parallel report generation from multiple queries
  • Bulk data migration between databases
  • ETL pipelines with SQL sources
  • Database maintenance operations

Multi-Database Support

Works seamlessly with:

  • SQL Server (System.Data.SqlClient, Microsoft.Data.SqlClient)
  • PostgreSQL (Npgsql)
  • MySQL (MySql.Data, MySqlConnector)
  • SQLite (System.Data.SQLite, Microsoft.Data.Sqlite)
  • Oracle (Oracle.ManagedDataAccess)
  • Any ADO.NET provider implementing IDbConnection

License

MIT License - see LICENSE file for details


Made with ❤️ by Jeffeek | NuGet | GitHub

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (3)

Showing the top 3 NuGet packages that depend on Rivulet.Sql:

Package Downloads
Rivulet.Sql.SqlServer

SQL Server-specific optimizations for Rivulet.Sql including SqlBulkCopy integration for 10-100x faster bulk inserts

Rivulet.Sql.PostgreSql

PostgreSQL-specific optimizations for Rivulet.Sql including COPY command integration for 10-100x faster bulk inserts

Rivulet.Sql.MySql

MySQL-specific optimizations for Rivulet.Sql including LOAD DATA INFILE integration for 10-100x faster bulk inserts

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.3.0 197 12/13/2025
1.3.0-beta 453 12/8/2025
1.3.0-alpha 328 12/8/2025