Chd.Coordination 2.1.0

dotnet add package Chd.Coordination --version 2.1.0
                    
NuGet\Install-Package Chd.Coordination -Version 2.1.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Chd.Coordination" Version="2.1.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Chd.Coordination" Version="2.1.0" />
                    
Directory.Packages.props
<PackageReference Include="Chd.Coordination" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Chd.Coordination --version 2.1.0
                    
#r "nuget: Chd.Coordination, 2.1.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Chd.Coordination@2.1.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Chd.Coordination&version=2.1.0
                    
Install as a Cake Addin
#tool nuget:?package=Chd.Coordination&version=2.1.0
                    
Install as a Cake Tool

📘 Chd.Coordination – Distributed Coordination Library for .NET

NuGet Package

Chd.Coordination is a .NET Standard 2.0 compatible, Redis-based, DI-first distributed coordination library.
It provides essential primitives for distributed locks, idempotency, and minimal saga coordination, designed for scalable microservices and distributed systems.


📑 Table of Contents

  1. Features
  2. Installation
  3. Quick Start
  4. Dependency Injection
  5. Distributed Lock Examples
  6. Idempotency Examples
  7. Saga Examples
  8. CoordinationContext Usage
  9. Key-Value Stores
  10. Advanced Usage
  11. Unit Testing
  12. Design Principles
  13. Security and Reliability
  14. NuGet Package Metadata
  15. Changelog

🌟 Features

V2.0.4 – Available

  • Distributed Lock (Redis-based with safe acquire/release)
  • Idempotency helper (retry-safe execution)
  • Minimal Saga coordination (step-by-step orchestration)
  • Multiple Backend Support:
    • RedisKeyValueStore - Production-ready Redis implementation
    • InMemoryKeyValueStore - Fast, lightweight, perfect for testing
    • CompositeKeyValueStore - Flexible multi-backend composition
  • Complete IKeyValueStore Interface with all essential operations
  • Cancellation Token Support throughout all async operations
  • DI-first, testable, production-ready
  • .NET Standard 2.0 → works everywhere
  • Extendable with custom key-value stores

V2.x – Available Now

  • OpenTelemetry / Prometheus metrics (NEW in v2.1.0!)
  • Advanced retry policies with exponential backoff & jitter (NEW in v2.1.0!)
  • Circuit breaker pattern for cascading failure prevention (NEW in v2.1.0!)
  • 🚧 Strong consensus with fencing tokens
  • 🚧 Linearizable locking
  • 🚧 ZooKeeper / etcd backend implementations

Not in Scope

  • ❌ Exactly-once execution (requires 2PC)
  • ❌ Global distributed transactions

⚡ Installation

dotnet add package Chd.Coordination

For OpenTelemetry/Prometheus Metrics (Optional):

# For ASP.NET Core with Prometheus exporter
dotnet add package OpenTelemetry.Exporter.Prometheus.AspNetCore
dotnet add package OpenTelemetry.Extensions.Hosting

📊 Metrics & Observability

Enable OpenTelemetry Metrics

using OpenTelemetry.Metrics;

var builder = WebApplication.CreateBuilder(args);

// 1. Register coordination with metrics
builder.Services.AddCoordination(opt =>
{
    opt.RedisConnectionString = "localhost:6379";
});

// 2. Enable OpenTelemetry metrics
builder.Services.AddCoordinationMetrics(); // <-- NEW!

// 3. Configure OpenTelemetry exporter
builder.Services.AddOpenTelemetry()
    .WithMetrics(metrics => metrics
        .AddMeter("Chd.Coordination")  // <-- Add our meter
        .AddPrometheusExporter());

var app = builder.Build();

// 4. Map Prometheus scrape endpoint
app.MapPrometheusScrapingEndpoint();  // /metrics endpoint

app.Run();

Available Metrics

Lock Metrics:

  • chd_coordination_lock_acquired_total - Total locks acquired
  • chd_coordination_lock_failed_total - Failed lock attempts
  • chd_coordination_lock_released_total - Locks released
  • chd_coordination_lock_timeout_total - Lock acquisition timeouts
  • chd_coordination_lock_acquisition_duration_seconds - Lock acquisition time
  • chd_coordination_lock_contention_attempts - Retry attempts before success

Idempotency Metrics:

  • chd_coordination_idempotency_hit_total - Cache hits (duplicate requests)
  • chd_coordination_idempotency_miss_total - Cache misses (new requests)
  • chd_coordination_idempotency_executed_total - Operations executed
  • chd_coordination_idempotency_failed_total - Failed operations
  • chd_coordination_idempotency_execution_duration_seconds - Execution time

Saga Metrics:

  • chd_coordination_saga_started_total - Sagas started
  • chd_coordination_saga_completed_total - Successfully completed sagas
  • chd_coordination_saga_failed_total - Failed sagas
  • chd_coordination_saga_compensated_total - Compensated steps
  • chd_coordination_saga_duration_seconds - Total saga duration
  • chd_coordination_saga_step_duration_seconds - Individual step duration

Grafana Dashboard Query Examples

# Lock acquisition rate (per second)
rate(chd_coordination_lock_acquired_total[5m])

# Average lock acquisition time
rate(chd_coordination_lock_acquisition_duration_seconds_sum[5m]) / 
rate(chd_coordination_lock_acquisition_duration_seconds_count[5m])

# Lock failure percentage
(rate(chd_coordination_lock_failed_total[5m]) / 
 (rate(chd_coordination_lock_acquired_total[5m]) + rate(chd_coordination_lock_failed_total[5m]))) * 100

# Idempotency hit ratio
rate(chd_coordination_idempotency_hit_total[5m]) / 
(rate(chd_coordination_idempotency_hit_total[5m]) + rate(chd_coordination_idempotency_miss_total[5m]))

# Top contended resources
topk(10, sum by (resource) (chd_coordination_lock_contention_attempts))

⚡ Advanced Retry Policies

Exponential Backoff with Jitter

Prevents thundering herd problem by adding randomization to retry delays.

using Chd.Coordination.Policies;

// Use default exponential backoff (50 retries, 100ms-10s, with jitter)
var policy = ExponentialBackoffRetryPolicy.Default;

var result = await policy.ExecuteAsync(async ct =>
{
    return await CallUnreliableApiAsync(ct);
});

Custom Exponential Backoff

var policy = new ExponentialBackoffRetryPolicy(
    maxRetryCount: 20,
    initialDelay: TimeSpan.FromMilliseconds(200),
    maxDelay: TimeSpan.FromSeconds(5),
    useJitter: true,
    backoffMultiplier: 2.0);

await policy.ExecuteAsync(async ct =>
{
    await ProcessPaymentAsync(ct);
});

Fluent Builder API

var policy = RetryPolicyBuilder.CreateDefault()
    .WithMaxRetryCount(30)
    .WithInitialDelay(TimeSpan.FromMilliseconds(100))
    .WithMaxDelay(TimeSpan.FromSeconds(15))
    .WithJitter(enabled: true)
    .WithBackoffMultiplier(2.5)
    .Build();

await policy.ExecuteAsync(async ct =>
{
    return await FetchDataAsync(ct);
});

Circuit Breaker Pattern

Prevents cascading failures by opening circuit after consecutive failures.

var policy = RetryPolicyBuilder.CreateDefault()
    .WithMaxRetryCount(10)
    .WithCircuitBreaker(
        failureThreshold: 5,      // Open after 5 consecutive failures
        resetTimeout: TimeSpan.FromMinutes(1))  // Try again after 1 minute
    .BuildWithCircuitBreaker();

try
{
    await policy.ExecuteAsync(async ct =>
    {
        await CallExternalServiceAsync(ct);
    });
}
catch (InvalidOperationException ex) when (ex.Message.Contains("Circuit breaker"))
{
    // Circuit is open - service is down
    Console.WriteLine("Service unavailable, fast-failing");
}

Preset Policies

// Fast: 10 retries, 50ms-1s (low latency scenarios)
var fast = ExponentialBackoffRetryPolicy.Fast;

// Default: 50 retries, 100ms-10s (balanced)
var balanced = ExponentialBackoffRetryPolicy.Default;

// Conservative: 20 retries, 500ms-30s (high contention)
var conservative = ExponentialBackoffRetryPolicy.Conservative;

Monitoring Retry Behavior

var policy = ExponentialBackoffRetryPolicy.Default;

for (int attempt = 0; attempt < 5; attempt++)
{
    var delay = policy.GetDelay(attempt);
    Console.WriteLine($"Attempt {attempt}: {delay.TotalMilliseconds:F0}ms");
}

// Output (approximate with jitter):
// Attempt 0: 95ms
// Attempt 1: 220ms
// Attempt 2: 380ms
// Attempt 3: 850ms
// Attempt 4: 1640ms

🚀 Quick Start

using Chd.Coordination;
using Microsoft.Extensions.DependencyInjection;

// 1. Register services
services.AddCoordination(opt =>
{
    opt.RedisConnectionString = "localhost:6379";
});

// 2. Inject and use
public class OrderService
{
    private readonly ICoordinator _coordinator;

    public OrderService(ICoordinator coordinator)
    {
        _coordinator = coordinator;
    }

    public async Task ProcessOrderAsync(string orderId)
    {
        // Distributed lock
        await _coordinator.Lock.RunAsync(
            $"order:{orderId}",
            TimeSpan.FromSeconds(30),
            async ct =>
            {
                // Only one instance processes this order at a time
                await DoWorkAsync(orderId, ct);
            });
    }
}

🔌 Dependency Injection

Basic Setup with Redis

using Chd.Coordination;
using Microsoft.Extensions.DependencyInjection;

// In your Startup.cs or Program.cs
services.AddCoordination(opt =>
{
    opt.RedisConnectionString = "localhost:6379";
    opt.DatabaseNumber = 0; // Optional: Redis database number
});

ASP.NET Core Minimal API

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddCoordination(opt =>
{
    opt.RedisConnectionString = builder.Configuration
        .GetConnectionString("Redis");
});

var app = builder.Build();

app.MapPost("/orders/{orderId}/process", async (
    string orderId,
    ICoordinator coordinator) =>
{
    await coordinator.Lock.RunAsync(
        $"order:{orderId}",
        TimeSpan.FromSeconds(30),
        async ct =>
        {
            // Process order
            return Results.Ok($"Order {orderId} processed");
        });
});

Setup with InMemory Store (Testing/Development)

services.AddSingleton<IKeyValueStore, InMemoryKeyValueStore>();
services.AddCoordination();

🔑 Distributed Lock Examples

Basic Usage

await _coordinator.Lock.RunAsync(
    key: "order:123",
    ttl: TimeSpan.FromSeconds(30),
    async ct =>
    {
        Console.WriteLine("Processing order 123...");
        await UpdateInventoryAsync("order:123", ct);
    });

Prevent Duplicate Order Processing

public class OrderProcessor
{
    private readonly ICoordinator _coordinator;
    private readonly IOrderRepository _orderRepo;
    private readonly ILogger<OrderProcessor> _logger;

    public async Task<bool> ProcessOrderAsync(
        string orderId,
        CancellationToken ct = default)
    {
        var lockKey = $"order:process:{orderId}";
        
        try
        {
            await _coordinator.Lock.RunAsync(
                lockKey,
                TimeSpan.FromMinutes(5),
                async lockCt =>
                {
                    var order = await _orderRepo.GetAsync(orderId, lockCt);
                    
                    if (order.Status != OrderStatus.Pending)
                    {
                        throw new InvalidOperationException(
                            $"Order {orderId} already processed");
                    }

                    // Process payment
                    await ChargePaymentAsync(order, lockCt);
                    
                    // Update inventory
                    await DecrementStockAsync(order.Items, lockCt);
                    
                    // Update order status
                    order.Status = OrderStatus.Completed;
                    await _orderRepo.UpdateAsync(order, lockCt);
                    
                    _logger.LogInformation(
                        "Order {OrderId} processed successfully", orderId);
                },
                ct);
                
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to process order {OrderId}", orderId);
            return false;
        }
    }
}

Lock with Timeout and Cancellation

using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(10));

try
{
    await _coordinator.Lock.RunAsync(
        "resource:critical",
        TimeSpan.FromSeconds(30),
        async ct =>
        {
            await LongRunningOperationAsync(ct);
        },
        cancellationToken: cts.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Operation was cancelled");
}

Scheduled Job with Lock

public class ScheduledJobService : BackgroundService
{
    private readonly ICoordinator _coordinator;
    private readonly ILogger<ScheduledJobService> _logger;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Ensure only one instance runs this job
                await _coordinator.Lock.RunAsync(
                    "scheduled:daily-report",
                    TimeSpan.FromMinutes(30),
                    async ct =>
                    {
                        _logger.LogInformation("Generating daily report...");
                        await GenerateDailyReportAsync(ct);
                    },
                    stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to generate daily report");
            }

            await Task.Delay(TimeSpan.FromHours(24), stoppingToken);
        }
    }
}

🔁 Idempotency Examples

Basic Idempotency

await _coordinator.Idempotency.RunAsync(
    key: "payment:order:123",
    ttl: TimeSpan.FromMinutes(10),
    async () =>
    {
        await ProcessPaymentAsync();
    });

Idempotent Payment Processing

public class PaymentService
{
    private readonly ICoordinator _coordinator;
    private readonly IPaymentGateway _paymentGateway;
    private readonly ILogger<PaymentService> _logger;

    public async Task<PaymentResult> ProcessPaymentAsync(
        string orderId,
        decimal amount,
        string customerId)
    {
        var idempotencyKey = $"payment:{orderId}:{customerId}";
        var result = new PaymentResult();
        
        await _coordinator.Idempotency.RunAsync(
            key: idempotencyKey,
            ttl: TimeSpan.FromHours(24),
            async () =>
            {
                _logger.LogInformation(
                    "Processing payment for order {OrderId}", orderId);
                
                var response = await _paymentGateway.ChargeAsync(
                    customerId,
                    amount,
                    $"Order {orderId}");
                
                result.TransactionId = response.TransactionId;
                result.Success = response.IsSuccess;
                result.Message = response.Message;
                
                _logger.LogInformation(
                    "Payment processed: {TransactionId}",
                    response.TransactionId);
            });
        
        return result;
    }
}

Webhook Handler with Idempotency

[ApiController]
[Route("api/webhooks")]
public class WebhookController : ControllerBase
{
    private readonly ICoordinator _coordinator;
    private readonly ILogger<WebhookController> _logger;

    [HttpPost("payment")]
    public async Task<IActionResult> HandlePaymentWebhook(
        [FromBody] PaymentWebhookDto webhook,
        [FromHeader(Name = "X-Webhook-Id")] string webhookId)
    {
        try
        {
            await _coordinator.Idempotency.RunAsync(
                key: $"webhook:payment:{webhookId}",
                ttl: TimeSpan.FromDays(7),
                async () =>
                {
                    _logger.LogInformation(
                        "Processing webhook {WebhookId}", webhookId);
                    await ProcessPaymentWebhookAsync(webhook);
                });
            
            return Ok(new { processed = true });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to process webhook {WebhookId}", webhookId);
            return StatusCode(500);
        }
    }
}

Message Queue Handler

public class MessageConsumer
{
    private readonly ICoordinator _coordinator;
    private readonly ILogger<MessageConsumer> _logger;

    public async Task HandleMessageAsync(Message message)
    {
        await _coordinator.Idempotency.RunAsync(
            key: $"message:{message.Id}",
            ttl: TimeSpan.FromHours(1),
            async () =>
            {
                _logger.LogInformation(
                    "Processing message {MessageId}", message.Id);
                await ProcessMessageAsync(message);
            });
    }

    private async Task ProcessMessageAsync(Message message)
    {
        // Your message processing logic
        await Task.Delay(100);
    }
}

API Idempotency Pattern

[HttpPost("orders")]
public async Task<IActionResult> CreateOrder(
    [FromBody] CreateOrderDto dto,
    [FromHeader(Name = "Idempotency-Key")] string idempotencyKey)
{
    if (string.IsNullOrEmpty(idempotencyKey))
    {
        return BadRequest("Idempotency-Key header is required");
    }

    var orderId = string.Empty;
    
    await _coordinator.Idempotency.RunAsync(
        key: $"order:create:{idempotencyKey}",
        ttl: TimeSpan.FromHours(24),
        async () =>
        {
            orderId = await CreateOrderInternalAsync(dto);
        });

    return CreatedAtAction(
        nameof(GetOrder),
        new { id = orderId },
        new { orderId });
}

📜 Saga Examples

Basic Saga Pattern

await _coordinator.Saga.RunAsync("order:123", async saga =>
{
    await saga.Step("reserve-stock", ReserveStockAsync);
    await saga.Step("charge-payment", ChargePaymentAsync);
    await saga.Step("notify-user", NotifyUserAsync);
});

E-Commerce Order Saga

public class OrderSagaCoordinator
{
    private readonly ICoordinator _coordinator;
    private readonly IInventoryService _inventory;
    private readonly IPaymentService _payment;
    private readonly IShippingService _shipping;
    private readonly INotificationService _notification;
    private readonly ILogger<OrderSagaCoordinator> _logger;

    public async Task<OrderResult> CreateOrderAsync(CreateOrderRequest request)
    {
        var orderId = Guid.NewGuid().ToString();
        var sagaId = $"order:saga:{orderId}";
        
        try
        {
            await _coordinator.Saga.RunAsync(sagaId, async saga =>
            {
                // Step 1: Reserve inventory
                await saga.Step("reserve-inventory", async () =>
                {
                    _logger.LogInformation(
                        "Reserving inventory for order {OrderId}", orderId);
                    await _inventory.ReserveAsync(orderId, request.Items);
                });

                // Step 2: Process payment
                await saga.Step("process-payment", async () =>
                {
                    _logger.LogInformation(
                        "Processing payment for order {OrderId}", orderId);
                    await _payment.ChargeAsync(
                        orderId,
                        request.CustomerId,
                        request.TotalAmount);
                });

                // Step 3: Create shipment
                await saga.Step("create-shipment", async () =>
                {
                    _logger.LogInformation(
                        "Creating shipment for order {OrderId}", orderId);
                    await _shipping.CreateShipmentAsync(
                        orderId,
                        request.ShippingAddress);
                });

                // Step 4: Send notifications
                await saga.Step("send-notifications", async () =>
                {
                    _logger.LogInformation(
                        "Sending notifications for order {OrderId}", orderId);
                    await _notification.SendOrderConfirmationAsync(
                        request.CustomerId,
                        orderId);
                });
            });

            return OrderResult.Success(orderId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Order saga failed for {OrderId}", orderId);
            await CompensateFailedOrderAsync(orderId);
            return OrderResult.Failed(ex.Message);
        }
    }

    private async Task CompensateFailedOrderAsync(string orderId)
    {
        // Rollback operations
        await _inventory.ReleaseReservationAsync(orderId);
        await _payment.RefundAsync(orderId);
    }
}

Long-Running Data Import Saga

public class DataImportSaga
{
    private readonly ICoordinator _coordinator;

    public async Task ProcessLargeDataImportAsync(
        string importId,
        List<DataBatch> batches)
    {
        var sagaId = $"import:{importId}";
        
        await _coordinator.Saga.RunAsync(sagaId, async saga =>
        {
            await saga.Step("validate-data", async () =>
            {
                await ValidateDataAsync(batches);
            });

            await saga.Step("transform-data", async () =>
            {
                await TransformDataAsync(batches);
            });

            // Process in batches - each is idempotent
            for (int i = 0; i < batches.Count; i++)
            {
                var stepName = $"process-batch-{i}";
                var batchIndex = i; // Capture for closure
                
                await saga.Step(stepName, async () =>
                {
                    await ProcessBatchAsync(batches[batchIndex]);
                    Console.WriteLine(
                        $"Batch {batchIndex + 1}/{batches.Count} completed");
                });
            }

            await saga.Step("finalize-import", async () =>
            {
                await FinalizeImportAsync(importId);
            });
        });
    }
}

Multi-Service Orchestration Saga

public class TravelBookingSaga
{
    private readonly ICoordinator _coordinator;

    public async Task BookTravelAsync(TravelBookingRequest request)
    {
        var bookingId = Guid.NewGuid().ToString();
        
        await _coordinator.Saga.RunAsync($"travel:{bookingId}", async saga =>
        {
            await saga.Step("book-flight", async () =>
            {
                await BookFlightAsync(request.FlightDetails);
            });

            await saga.Step("book-hotel", async () =>
            {
                await BookHotelAsync(request.HotelDetails);
            });

            await saga.Step("book-car-rental", async () =>
            {
                await BookCarRentalAsync(request.CarRentalDetails);
            });

            await saga.Step("process-payment", async () =>
            {
                await ProcessTravelPaymentAsync(bookingId, request.Total);
            });

            await saga.Step("send-confirmation", async () =>
            {
                await SendBookingConfirmationAsync(bookingId, request.Email);
            });
        });
    }
}

🧱 CoordinationContext Usage

Basic Context Creation

// Create a new coordination context
var context = CoordinationContext.Create();
Console.WriteLine($"Correlation ID: {context.CorrelationId}");

// Create context with lock key
var lockedContext = new CoordinationContext(
    context.CorrelationId,
    lockKey: "order:123");

// Create context with saga ID
var sagaContext = new CoordinationContext(
    context.CorrelationId,
    sagaId: "order:123");

Request Tracking with Context

[ApiController]
[Route("api/orders")]
public class OrderController : ControllerBase
{
    private readonly ICoordinator _coordinator;
    private readonly ILogger<OrderController> _logger;

    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderDto dto)
    {
        // Create context with correlation ID from request
        var correlationId = HttpContext.TraceIdentifier;
        var context = new CoordinationContext(correlationId);

        _logger.LogInformation(
            "Creating order with correlation ID: {CorrelationId}",
            context.CorrelationId);

        try
        {
            var orderId = await ProcessOrderWithContextAsync(dto, context);
            return Ok(new
            {
                orderId,
                correlationId = context.CorrelationId
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to create order. CorrelationId: {CorrelationId}",
                context.CorrelationId);
            return StatusCode(500);
        }
    }

    private async Task<string> ProcessOrderWithContextAsync(
        CreateOrderDto dto,
        CoordinationContext context)
    {
        var orderId = Guid.NewGuid().ToString();
        
        var lockContext = new CoordinationContext(
            context.CorrelationId,
            lockKey: $"order:create:{dto.CustomerId}");

        await _coordinator.Lock.RunAsync(
            lockContext.LockKey,
            TimeSpan.FromSeconds(30),
            async ct =>
            {
                _logger.LogInformation(
                    "Processing order {OrderId} - CorrelationId: {CorrelationId}",
                    orderId,
                    context.CorrelationId);

                await SaveOrderAsync(orderId, dto, ct);
            });

        return orderId;
    }
}

⚙️ Key-Value Stores

1. RedisKeyValueStore (Production)

services.AddCoordination(opt =>
{
    opt.RedisConnectionString = "localhost:6379";
    opt.DatabaseNumber = 0;
});

Features:

  • Production-ready Redis implementation
  • Lua script-based atomic operations
  • Safe lock acquire/release with SET NX PX
  • All operations with TTL support

2. InMemoryKeyValueStore (Testing/Development)

services.AddSingleton<IKeyValueStore, InMemoryKeyValueStore>();
services.AddCoordination();

Features:

  • Fast, lightweight, no external dependencies
  • Perfect for unit testing and development
  • Thread-safe with ConcurrentDictionary
  • Automatic TTL expiration

3. CompositeKeyValueStore (Advanced)

var primary = new RedisKeyValueStore(redisDb);
var composite = new CompositeKeyValueStore(primary);
services.AddSingleton<IKeyValueStore>(composite);

Features:

  • Delegates to underlying store
  • Enables multi-backend scenarios
  • Consistent interface across all operations

4. Custom Stores (Extensible)

public class MyCustomStore : IKeyValueStore
{
    private readonly IMyDatabase _database;

    public MyCustomStore(IMyDatabase database)
    {
        _database = database;
    }

    public async Task<bool> TryAcquireAsync(
        string key,
        string value,
        TimeSpan ttl,
        CancellationToken ct = default)
    {
        return await _database.SetIfNotExistsAsync(key, value, ttl, ct);
    }

    public async Task<bool> ReleaseAsync(
        string key,
        string value,
        CancellationToken ct = default)
    {
        return await CompareAndDeleteAsync(key, value, ct);
    }

    public async Task<bool> ExistsAsync(
        string key,
        CancellationToken ct = default)
    {
        return await _database.KeyExistsAsync(key, ct);
    }

    public async Task SetAsync(
        string key,
        string value,
        TimeSpan ttl,
        CancellationToken ct = default)
    {
        await _database.SetAsync(key, value, ttl, ct);
    }

    public async Task<bool> TrySetAsync(
        string key,
        string value,
        TimeSpan ttl,
        CancellationToken ct = default)
    {
        return await _database.SetIfNotExistsAsync(key, value, ttl, ct);
    }

    public async Task<bool> CompareAndDeleteAsync(
        string key,
        string expectedValue,
        CancellationToken ct = default)
    {
        return await _database.DeleteIfMatchAsync(key, expectedValue, ct);
    }

    public async Task<string?> GetAsync(
        string key,
        CancellationToken ct = default)
    {
        return await _database.GetAsync(key, ct);
    }
}

// Register in DI
services.AddSingleton<IKeyValueStore, MyCustomStore>();
services.AddCoordination();

⚡ Advanced Usage

Combining Multiple Coordination Primitives

public class ComplexOrderProcessor
{
    private readonly ICoordinator _coordinator;

    public async Task ProcessComplexOrderAsync(string orderId, string customerId)
    {
        // Use lock to prevent concurrent processing
        await _coordinator.Lock.RunAsync(
            $"order:lock:{orderId}",
            TimeSpan.FromMinutes(5),
            async lockCt =>
            {
                // Use idempotency for payment
                await _coordinator.Idempotency.RunAsync(
                    $"payment:{orderId}",
                    TimeSpan.FromHours(24),
                    async () =>
                    {
                        await ProcessPaymentAsync(orderId);
                    });

                // Use saga for multi-step process
                await _coordinator.Saga.RunAsync(
                    $"order:saga:{orderId}",
                    async saga =>
                    {
                        await saga.Step("allocate-inventory",
                            () => AllocateInventoryAsync(orderId));
                        
                        await saga.Step("generate-invoice",
                            () => GenerateInvoiceAsync(orderId));
                        
                        await saga.Step("schedule-shipping",
                            () => ScheduleShippingAsync(orderId));
                    });
            });
    }
}

Error Handling and Retry Logic

public class ResilientProcessor
{
    private readonly ICoordinator _coordinator;
    private readonly ILogger<ResilientProcessor> _logger;

    public async Task<bool> ProcessWithRetryAsync(
        string resourceId,
        int maxRetries = 3)
    {
        int retryCount = 0;

        while (retryCount < maxRetries)
        {
            try
            {
                await _coordinator.Lock.RunAsync(
                    $"resource:{resourceId}",
                    TimeSpan.FromSeconds(30),
                    async ct =>
                    {
                        await ProcessResourceAsync(resourceId, ct);
                    });
                
                return true;
            }
            catch (Exception ex)
            {
                retryCount++;
                _logger.LogWarning(ex,
                    "Attempt {RetryCount}/{MaxRetries} failed",
                    retryCount, maxRetries);
                
                if (retryCount >= maxRetries)
                {
                    _logger.LogError(
                        "All retry attempts exhausted for {ResourceId}",
                        resourceId);
                    throw;
                }
                
                // Exponential backoff
                var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
                await Task.Delay(delay);
            }
        }

        return false;
    }
}

Parallel Processing with Coordination

public class BatchProcessor
{
    private readonly ICoordinator _coordinator;

    public async Task ProcessBatchInParallelAsync(List<string> items)
    {
        var tasks = items.Select(item => ProcessItemAsync(item));
        await Task.WhenAll(tasks);
    }

    private async Task ProcessItemAsync(string item)
    {
        // Each item gets its own lock
        await _coordinator.Lock.RunAsync(
            $"item:{item}",
            TimeSpan.FromMinutes(1),
            async ct =>
            {
                await ProcessSingleItemAsync(item, ct);
            });
    }
}

🧪 Unit Testing

Testing with InMemoryKeyValueStore

public class OrderServiceTests
{
    private readonly ServiceProvider _serviceProvider;
    private readonly ICoordinator _coordinator;
    private readonly OrderService _orderService;

    public OrderServiceTests()
    {
        var services = new ServiceCollection();
        
        // Use in-memory store for testing
        services.AddSingleton<IKeyValueStore, InMemoryKeyValueStore>();
        services.AddCoordination();
        services.AddLogging();
        
        // Add your services
        services.AddTransient<OrderService>();
        
        _serviceProvider = services.BuildServiceProvider();
        _coordinator = _serviceProvider.GetRequiredService<ICoordinator>();
        _orderService = _serviceProvider.GetRequiredService<OrderService>();
    }

    [Fact]
    public async Task ProcessOrder_WithLock_PreventsConcurrentExecution()
    {
        var orderId = "test-order-123";
        var executionCount = 0;

        // Try to execute same order twice concurrently
        var task1 = _coordinator.Lock.RunAsync(
            $"order:{orderId}",
            TimeSpan.FromSeconds(2),
            async ct =>
            {
                Interlocked.Increment(ref executionCount);
                await Task.Delay(1000, ct);
            });

        var task2 = _coordinator.Lock.RunAsync(
            $"order:{orderId}",
            TimeSpan.FromSeconds(2),
            async ct =>
            {
                Interlocked.Increment(ref executionCount);
                await Task.Delay(1000, ct);
            });

        await Task.WhenAll(task1, task2);

        // Only one should have executed
        Assert.Equal(1, executionCount);
    }

    [Fact]
    public async Task Idempotency_PreventsDuplicateExecution()
    {
        var paymentId = "payment-123";
        var executionCount = 0;

        // Execute twice with same key
        await _coordinator.Idempotency.RunAsync(
            $"payment:{paymentId}",
            TimeSpan.FromMinutes(1),
            async () =>
            {
                Interlocked.Increment(ref executionCount);
                await Task.CompletedTask;
            });

        await _coordinator.Idempotency.RunAsync(
            $"payment:{paymentId}",
            TimeSpan.FromMinutes(1),
            async () =>
            {
                Interlocked.Increment(ref executionCount);
                await Task.CompletedTask;
            });

        // Should only execute once
        Assert.Equal(1, executionCount);
    }

    [Fact]
    public async Task Saga_ExecutesAllSteps_InOrder()
    {
        var steps = new List<string>();
        var sagaId = "saga-123";

        await _coordinator.Saga.RunAsync(sagaId, async saga =>
        {
            await saga.Step("step1", async () =>
            {
                steps.Add("step1");
                await Task.CompletedTask;
            });

            await saga.Step("step2", async () =>
            {
                steps.Add("step2");
                await Task.CompletedTask;
            });

            await saga.Step("step3", async () =>
            {
                steps.Add("step3");
                await Task.CompletedTask;
            });
        });

        Assert.Equal(new[] { "step1", "step2", "step3" }, steps);
    }

    public void Dispose()
    {
        _serviceProvider?.Dispose();
    }
}

🧠 Design Principles

  • Composable: Primitives can work independently
  • DI-first: Integrates cleanly with IServiceCollection
  • Testable: Unit & integration tests ready
  • Framework-agnostic: .NET Standard 2.0 → works anywhere
  • Fail-safe: TTL-based auto-release prevents deadlocks
  • Observable: Rich logging and tracing support

🔐 Security and Reliability

  • Lock ownership: Safe release with owner validation
  • Idempotency: Retry-safe execution
  • Saga state persistence: Crash recovery via Redis
  • TTL protection: Auto-expiration prevents resource leaks
  • Atomic operations: Lua scripts ensure consistency
  • Cancellation support: Graceful shutdown handling

📦 NuGet Package Metadata

  • Package ID: Chd.Coordination
  • Version: 2.0.4
  • Authors: Mehmet Yoldaş
  • License: MIT
  • Target Framework: .NET Standard 2.0
  • Description: Distributed lock, idempotency, and saga coordination for .NET Standard 2.0 projects

🔮 Changelog

v2.0.4 - Latest Release

✅ Completed Features
  • Enhanced IKeyValueStore Interface:

    • TryAcquireAsync() - Acquire distributed locks with TTL
    • ReleaseAsync() - Safely release locks with owner validation
    • ExistsAsync() - Check key existence with TTL awareness
    • SetAsync() - Set key-value pairs with TTL
    • TrySetAsync() - Conditional set (SET NX)
    • GetAsync() - Retrieve values
    • CompareAndDeleteAsync() - Atomic compare-and-delete operations
  • Production-Ready InMemoryKeyValueStore:

    • Fully implemented with ConcurrentDictionary
    • TTL support with automatic expiration
    • Thread-safe operations
    • Perfect for testing and lightweight scenarios
  • Complete RedisKeyValueStore Implementation:

    • All interface methods implemented
    • Lua script-based atomic operations
    • Safe lock acquire/release with SET NX PX
    • CancellationToken support throughout
  • CompositeKeyValueStore:

    • Delegate pattern for flexible store composition
    • Consistent cancellation token support
    • Enables multi-backend scenarios
🚧 In Progress
  • Fencing tokens / linearizable lock: Infrastructure ready. Production hardening in progress.
  • Retry policies: Basic implementation available. Advanced backoff strategies coming soon.
  • OpenTelemetry / metrics: Interface defined. Full telemetry integration planned.
  • ZooKeeper / etcd backends: Stubs created. Full implementations planned for future releases.
🎯 Key Improvements
  • Unified Interface: All key-value stores implement complete IKeyValueStore interface
  • Better Testing Support: InMemoryKeyValueStore enables easy unit testing
  • Cancellation Support: All async operations support CancellationToken
  • Type Safety: Nullable reference types enabled
  • Production Ready: All core components fully implemented and tested

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

📄 License

This project is licensed under the MIT License.


📋 Changelog

Version 2.0.6 (2025-01-XX)

📝 Documentation & Code Quality Update

✨ Improvements
  • Complete XML Documentation: Added comprehensive XML documentation for all public interfaces, classes, and methods
    • 80+ types documented with detailed descriptions
    • All public APIs now have IntelliSense support
    • Parameter descriptions and return value documentation
    • Usage examples in XML comments
📚 Documented Components
  • Interfaces: ICoordinationMetrics, ICoordinator, IIdempotency, IKeyValueStore, IDistributedLock, IFencingLockCoordinator, ISagaCoordinator, and more
  • Classes: LockHandle, CoordinationContext, FencingLockCoordinator, RetryPolicy, and all store implementations
  • Stores: RedisKeyValueStore, InMemoryKeyValueStore, EtcdKeyValueStore, ZooKeeperKeyValueStore, CompositeKeyValueStore
🔧 Developer Experience
  • Enhanced IntelliSense tooltips in Visual Studio and VS Code
  • Better API discoverability
  • Clearer parameter requirements and return values
  • Improved code navigation
⚠️ Breaking Changes

None - This is a documentation-only update!

🔄 Migration Guide

Simply update your package reference:

dotnet add package Chd.Coordination --version 2.0.6

No code changes required. Fully backward compatible with 2.0.5.


Version 2.0.5 (Previous)

  • Redis-based distributed coordination
  • Fencing token support
  • Idempotency patterns
  • Saga orchestration
  • Multiple store implementations


✍️ Authors

Made with ❤️ by Mehmet Yoldaş

Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
.NET Core netcoreapp2.0 was computed.  netcoreapp2.1 was computed.  netcoreapp2.2 was computed.  netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.0 is compatible.  netstandard2.1 was computed. 
.NET Framework net461 was computed.  net462 was computed.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen40 was computed.  tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.1.0 141 3/3/2026
2.0.6 112 3/2/2026
2.0.5 113 2/27/2026
2.0.4 118 2/27/2026
2.0.1 117 2/21/2026
2.0.0 111 2/19/2026

v2.1.0 Release - Observability & Resilience! 📊⚡

🎉 NEW FEATURES:
• OpenTelemetry/Prometheus metrics integration (16 metrics)
• ExponentialBackoffRetryPolicy with jitter (prevents thundering herd)
• CircuitBreakerPolicy for cascading failure prevention
• RetryPolicyBuilder with fluent API
• CompositeRetryPolicy combining retry + circuit breaker
• AddCoordinationMetrics() extension method

📊 METRICS:
• Lock: acquired, failed, released, timeout, duration, contention
• Idempotency: hits, misses, executed, failed, duration
• Saga: started, completed, failed, compensated, step duration

⚡ RETRY POLICIES:
• Exponential backoff: delay * 2^attempt with cap
• Jitter: ±25% randomization prevents thundering herd
• Circuit breaker: 3 states (Closed/Open/HalfOpen)
• Presets: Fast (10r, 50ms-1s), Default (50r, 100ms-10s), Conservative (20r, 500ms-30s)
• Fluent builder API for custom policies

✅ IMPROVEMENTS:
• Production-ready observability
• Resilient retry mechanisms
• Thread-safe circuit breaker
• Full documentation + Grafana queries + 11 unit tests

📦 USAGE:
services.AddCoordinationMetrics();
var policy = RetryPolicyBuilder.CreateDefault()
.WithCircuitBreaker(5, TimeSpan.FromMinutes(1))
.BuildWithCircuitBreaker();

See full docs: https://github.com/mehmet-yoldas/library-core