SourceFlow.Stores.EntityFramework 2.0.0

Entity Framework Core persistence provider for SourceFlow.Net with support for SQL Server, configurable connection strings per store type, and cloud message idempotency for distributed deployments.

Features

  • Complete Store Implementations: ICommandStore, IEntityStore, and IViewModelStore
  • Cloud Idempotency: SQL-backed duplicate message detection with EfIdempotencyService, IdempotencyDbContext, and automatic cleanup via IdempotencyCleanupService — essential for multi-instance cloud deployments
  • Flexible Configuration: Separate or shared connection strings per store type
  • SQL Server Support: Built-in SQL Server provider; PostgreSQL, MySQL, and SQLite are available through custom EF Core providers
  • Resilience Policies: Polly-based retry and circuit breaker patterns for database operations
  • Observability: OpenTelemetry instrumentation for EF Core queries and store operations
  • Multi-Framework Support: .NET 8.0, .NET 9.0, .NET 10.0

Installation

# Install the core package
dotnet add package SourceFlow.Net

# Install the Entity Framework provider
dotnet add package SourceFlow.Stores.EntityFramework

Quick Start

1. Configure Connection Strings

Add connection strings to your appsettings.json:

{
  "ConnectionStrings": {
    "CommandStore": "Server=localhost;Database=SourceFlowCommands;Trusted_Connection=True;",
    "EntityStore": "Server=localhost;Database=SourceFlowEntities;Trusted_Connection=True;",
    "ViewModelStore": "Server=localhost;Database=SourceFlowViews;Trusted_Connection=True;"
  }
}

Or use a single shared connection string:

{
  "ConnectionStrings": {
    "DefaultConnection": "Server=localhost;Database=SourceFlow;Trusted_Connection=True;"
  }
}
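
A startup check can surface a missing connection string before any store is used. The sketch below is only an illustration: `ConnectionStrings.Resolve` is a hypothetical helper, not part of SourceFlow, and this README does not describe how the provider resolves names internally. It prefers a store-specific name and falls back to the shared one, using the standard `IConfiguration.GetConnectionString` extension.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// In-memory configuration that mirrors the shared-connection JSON above.
IConfiguration configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string?>
    {
        ["ConnectionStrings:DefaultConnection"] =
            "Server=localhost;Database=SourceFlow;Trusted_Connection=True;"
    })
    .Build();

// "CommandStore" is not configured here, so the shared name is used.
string commandStore = ConnectionStrings.Resolve(configuration, "CommandStore", "DefaultConnection");
Console.WriteLine(commandStore);

// Hypothetical helper for illustration; not part of SourceFlow.
static class ConnectionStrings
{
    public static string Resolve(IConfiguration configuration, string preferredName, string fallbackName)
    {
        string? value = configuration.GetConnectionString(preferredName)
                        ?? configuration.GetConnectionString(fallbackName);

        if (string.IsNullOrWhiteSpace(value))
        {
            // Fail fast with a message that names both keys that were tried.
            throw new InvalidOperationException(
                $"Neither connection string '{preferredName}' nor '{fallbackName}' is configured.");
        }

        return value;
    }
}
```

Failing at startup keeps a misconfigured deployment from reaching the first database call before the problem is noticed.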

2. Register Services

services.AddSourceFlowStores(configuration, options =>
{
    // Use separate databases for each store
    options.UseCommandStore("CommandStore");
    options.UseEntityStore("EntityStore");
    options.UseViewModelStore("ViewModelStore");

    // Or use a single shared database
    // options.UseSharedConnectionString("DefaultConnection");
});

3. Apply Migrations

The provider automatically creates the necessary database schema when you run your application. For production scenarios, generate and apply migrations:

dotnet ef migrations add InitialCreate --context CommandStoreContext
dotnet ef database update --context CommandStoreContext

Configuration Options

Separate Databases

Configure different databases for commands, entities, and view models:

services.AddSourceFlowStores(configuration, options =>
{
    options.UseCommandStore("CommandStoreConnection");
    options.UseEntityStore("EntityStoreConnection");
    options.UseViewModelStore("ViewModelStoreConnection");
});

Shared Database

Use a single database for all stores:

services.AddSourceFlowStores(configuration, options =>
{
    options.UseSharedConnectionString("DefaultConnection");
});

Custom DbContext Options

Apply additional EF Core configuration:

services.AddSourceFlowStores(configuration, options =>
{
    options.UseCommandStore("CommandStore", dbOptions =>
    {
        // Development only: sensitive data logging writes parameter values to the logs
        dbOptions.EnableSensitiveDataLogging();
        dbOptions.EnableDetailedErrors();
    });
});

Resilience

The provider includes built-in Polly resilience policies for:

  • Transient error retry with exponential backoff
  • Circuit breaker for database failures
  • Automatic reconnection handling
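
To make the first policy concrete, here is a minimal sketch of retry with exponential backoff written against the base class library only. It is not the provider's implementation, which uses Polly. `Backoff.RetryAsync` is a hypothetical helper, and treating `TimeoutException` as the transient error is an assumption made for the demo.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int calls = 0;

// Simulated database call: fails twice with a transient error, then succeeds.
int result = await Backoff.RetryAsync(
    operation: _ =>
    {
        calls++;
        if (calls < 3) throw new TimeoutException("simulated transient failure");
        return Task.FromResult(42);
    },
    maxAttempts: 5,
    baseDelay: TimeSpan.FromMilliseconds(10));

Console.WriteLine($"result={result}, calls={calls}"); // prints "result=42, calls=3"

// Hypothetical helper for illustration; not part of SourceFlow or Polly.
static class Backoff
{
    public static async Task<T> RetryAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        int maxAttempts,
        TimeSpan baseDelay,
        CancellationToken cancellationToken = default)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await operation(cancellationToken);
            }
            catch (TimeoutException) when (attempt < maxAttempts)
            {
                // The delay doubles on each attempt: base, 2x base, 4x base, ...
                TimeSpan delay = TimeSpan.FromTicks(baseDelay.Ticks * (1L << (attempt - 1)));
                await Task.Delay(delay, cancellationToken);
            }
        }
    }
}
```

Once `maxAttempts` is reached the exception filter no longer matches, so the last failure propagates to the caller instead of being swallowed.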

Idempotency Service

The Entity Framework provider includes EfIdempotencyService, a SQL-based implementation of IIdempotencyService designed for multi-instance deployments where in-memory idempotency tracking is insufficient.

Features

  • Thread-Safe Duplicate Detection: Uses database transactions to ensure consistency across multiple application instances
  • Automatic Expiration: Records expire based on configurable TTL (Time To Live)
  • Background Cleanup: Automatic periodic cleanup of expired records
  • Statistics: Track total checks, duplicates detected, and cache size
  • Database Agnostic: Support for SQL Server, PostgreSQL, MySQL, SQLite, and other EF Core providers

Configuration

SQL Server (Default)

Register the idempotency service with automatic cleanup:

services.AddSourceFlowIdempotency(
    connectionString: configuration.GetConnectionString("IdempotencyStore"),
    cleanupIntervalMinutes: 60); // Optional, defaults to 60 minutes

Custom Database Provider

Use PostgreSQL, MySQL, SQLite, or any other EF Core provider:

// PostgreSQL
services.AddSourceFlowIdempotencyWithCustomProvider(
    configureContext: options => options.UseNpgsql(connectionString),
    cleanupIntervalMinutes: 60);

// MySQL
services.AddSourceFlowIdempotencyWithCustomProvider(
    configureContext: options => options.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString)),
    cleanupIntervalMinutes: 60);

// SQLite
services.AddSourceFlowIdempotencyWithCustomProvider(
    configureContext: options => options.UseSqlite(connectionString),
    cleanupIntervalMinutes: 60);

Manual Registration (Advanced)

For more control over the registration:

services.AddDbContext<IdempotencyDbContext>(options =>
    options.UseSqlServer(configuration.GetConnectionString("IdempotencyStore")));

services.AddScoped<IIdempotencyService, EfIdempotencyService>();

// Optional: Register background cleanup service
services.AddHostedService<IdempotencyCleanupService>(provider =>
    new IdempotencyCleanupService(provider, TimeSpan.FromMinutes(60)));

Database Schema

The service uses a single table with the following structure:

CREATE TABLE IdempotencyRecords (
    IdempotencyKey NVARCHAR(500) PRIMARY KEY,
    ProcessedAt DATETIME2 NOT NULL,
    ExpiresAt DATETIME2 NOT NULL
);

CREATE INDEX IX_IdempotencyRecords_ExpiresAt ON IdempotencyRecords(ExpiresAt);

The schema is automatically created when you run migrations or when the application starts (if auto-migration is enabled).
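
For readers who think in EF Core terms, the table above corresponds roughly to the model configuration sketched below. The entity and context names are hypothetical and deliberately differ from the library's own IdempotencyDbContext, whose actual mapping is not shown in this README. The sketch assumes a relational EF Core provider package is referenced.

```csharp
using System;
using Microsoft.EntityFrameworkCore;

// Hypothetical entity; the library's own entity type is not shown in this README.
public sealed class IdempotencyRecordSketch
{
    public string IdempotencyKey { get; set; } = string.Empty;
    public DateTime ProcessedAt { get; set; }   // non-nullable, so NOT NULL
    public DateTime ExpiresAt { get; set; }     // non-nullable, so NOT NULL
}

// Hypothetical context for illustration only.
public sealed class IdempotencySketchContext : DbContext
{
    public IdempotencySketchContext(DbContextOptions<IdempotencySketchContext> options)
        : base(options) { }

    public DbSet<IdempotencyRecordSketch> Records => Set<IdempotencyRecordSketch>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<IdempotencyRecordSketch>(entity =>
        {
            entity.ToTable("IdempotencyRecords");
            entity.HasKey(r => r.IdempotencyKey);                      // primary key
            entity.Property(r => r.IdempotencyKey).HasMaxLength(500);  // NVARCHAR(500) on SQL Server
            entity.HasIndex(r => r.ExpiresAt);                         // supports the expiry cleanup query
        });
    }
}
```

The index on ExpiresAt matters because cleanup filters on that column; without it, each cleanup pass would scan the whole table.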

Usage

Once registered, the service is used automatically by the cloud dispatchers. The same two calls can also be made directly from your own handlers:

// Check if message was already processed
if (await idempotencyService.HasProcessedAsync(messageId))
{
    // Skip duplicate message
    return;
}

// Process message...

// Mark as processed with 24-hour TTL
await idempotencyService.MarkAsProcessedAsync(messageId, TimeSpan.FromHours(24));
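
The check, process, mark sequence above can be tried without a database. The sketch below uses a hypothetical in-memory stand-in, `InMemoryIdempotencySketch`, that mirrors the two method names shown above. It assumes message IDs are strings and is not the library's EfIdempotencyService or InMemoryIdempotencyService.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var idempotency = new InMemoryIdempotencySketch();
int handled = 0;

// Deliver "msg-1" twice, as an at-least-once queue might.
foreach (string messageId in new[] { "msg-1", "msg-1", "msg-2" })
{
    if (await idempotency.HasProcessedAsync(messageId))
    {
        continue; // duplicate delivery: skip
    }

    handled++; // stand-in for real message handling
    await idempotency.MarkAsProcessedAsync(messageId, TimeSpan.FromHours(24));
}

Console.WriteLine($"handled={handled}"); // prints "handled=2"

// Hypothetical stand-in for illustration; keeps expiry times in memory.
sealed class InMemoryIdempotencySketch
{
    private readonly ConcurrentDictionary<string, DateTimeOffset> _expiresAt = new();

    public Task<bool> HasProcessedAsync(string key)
    {
        // A key counts as processed only while its TTL has not elapsed.
        bool seen = _expiresAt.TryGetValue(key, out DateTimeOffset expiry)
                    && expiry > DateTimeOffset.UtcNow;
        return Task.FromResult(seen);
    }

    public Task MarkAsProcessedAsync(string key, TimeSpan ttl)
    {
        _expiresAt[key] = DateTimeOffset.UtcNow.Add(ttl);
        return Task.CompletedTask;
    }
}
```

An in-memory map like this only protects a single process, which is why a shared SQL table is needed once several instances consume the same queue.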

Cleanup

The AddSourceFlowIdempotency and AddSourceFlowIdempotencyWithCustomProvider methods automatically register a background service (IdempotencyCleanupService) that periodically cleans up expired records.

Default Behavior:

  • Cleanup runs every 60 minutes (configurable)
  • Processes up to 1000 expired records per batch
  • Runs as a hosted background service
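
The batch limit can be pictured with a small in-memory sketch. `CleanupSketch.RemoveExpired` is a hypothetical helper that only illustrates the batching logic: it removes at most `batchSize` expired entries per call, oldest first. The real service works against the SQL table, and its exact query is not shown in this README.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var now = DateTimeOffset.UtcNow;
var records = new Dictionary<string, DateTimeOffset>
{
    ["a"] = now.AddMinutes(-10), // expired
    ["b"] = now.AddMinutes(-5),  // expired
    ["c"] = now.AddHours(1),     // still valid
};

int removed = CleanupSketch.RemoveExpired(records, now, batchSize: 1000);
Console.WriteLine($"removed={removed}, remaining={records.Count}"); // prints "removed=2, remaining=1"

// Hypothetical helper for illustration; not part of SourceFlow.
static class CleanupSketch
{
    // Removes at most batchSize records whose expiry is at or before 'now', oldest first.
    public static int RemoveExpired(
        IDictionary<string, DateTimeOffset> records, DateTimeOffset now, int batchSize)
    {
        // Materialize the batch first so the dictionary is not modified while enumerating.
        List<string> batch = records
            .Where(r => r.Value <= now)
            .OrderBy(r => r.Value)
            .Take(batchSize)
            .Select(r => r.Key)
            .ToList();

        foreach (string key in batch)
        {
            records.Remove(key);
        }

        return batch.Count;
    }
}
```

Capping each pass bounds how long a single cleanup run holds database resources; any remaining expired records are picked up on the next interval.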

Custom Cleanup Interval:

services.AddSourceFlowIdempotency(
    connectionString: configuration.GetConnectionString("IdempotencyStore"),
    cleanupIntervalMinutes: 30); // Run cleanup every 30 minutes

Manual Cleanup (Advanced):

If you need to trigger cleanup manually or implement custom cleanup logic:

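using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public class CustomCleanupJob : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;

    // The provider is injected so each run can resolve a fresh scoped service
    public CustomCleanupJob(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Create a scope per run and dispose it before waiting,
            // so the DbContext is not held open during the delay
            using (var scope = _serviceProvider.CreateScope())
            {
                // Requires EfIdempotencyService to be resolvable by its concrete type
                var service = scope.ServiceProvider.GetRequiredService<EfIdempotencyService>();
                await service.CleanupExpiredRecordsAsync(stoppingToken);
            }

            await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
        }
    }
}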

When to Use

  • Multi-Instance Deployments: When running multiple application instances that process the same message queues
  • Distributed Systems: When messages can be delivered more than once (at-least-once delivery)
  • Cloud Messaging: When using AWS SQS or other cloud message queues

For single-instance deployments, consider using InMemoryIdempotencyService from the core framework for better performance.

End-to-End Cloud Integration

Here's a complete example showing how EF idempotency integrates with AWS cloud messaging:

public void ConfigureServices(IServiceCollection services, IConfiguration configuration)
{
    // 1. Register SourceFlow core
    services.UseSourceFlow(Assembly.GetExecutingAssembly());

    // 2. Register EF persistence stores
    services.AddSourceFlowStores(configuration, options =>
    {
        options.UseCommandStore("CommandStore");
        options.UseEntityStore("EntityStore");
        options.UseViewModelStore("ViewModelStore");
    });

    // 3. Register SQL-backed idempotency (replaces in-memory default)
    services.AddSourceFlowIdempotency(
        connectionString: configuration.GetConnectionString("IdempotencyStore"),
        cleanupIntervalMinutes: 60);

    // 4. Configure AWS cloud messaging
    services.UseSourceFlowAws(
        options =>
        {
            options.Region = RegionEndpoint.USEast1;
            options.EnableEncryption = true;
            options.KmsKeyId = "alias/sourceflow-key";
        },
        bus => bus
            .Send
                .Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
            .Raise
                .Event<OrderCreatedEvent>(t => t.Topic("order-events"))
            .Listen.To
                .CommandQueue("orders.fifo")
            .Subscribe.To
                .Topic("order-events"));
}

How it works end-to-end:

  1. Command dispatched to SQS queue (orders.fifo)
  2. Listener receives message from SQS
  3. Idempotency check: EfIdempotencyService.HasProcessedAsync(messageId) queries the SQL database to detect duplicates across all application instances
  4. Command processed by saga, entity persisted via IEntityStore, events raised
  5. Message marked processed: EfIdempotencyService.MarkAsProcessedAsync(messageId, ttl) records the message ID with expiration
  6. Background cleanup: IdempotencyCleanupService periodically removes expired records

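Together, these steps give effectively-once processing even with SQS at-least-once delivery and multiple consumer instances: a redelivered message is detected in step 3 and skipped. Because the check and the mark are separate calls, handlers should still tolerate a retry if an instance fails between steps 4 and 5, unless those steps share a transaction.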

License

This project is licensed under the MIT License.


Package Version: 2.0.0 | Last Updated: 2026-03-15

Version Downloads Last Updated
2.0.0 45 3/15/2026
1.0.0 2,940 11/29/2025

v2.0.0

  • Aligned with SourceFlow.Net 2.0.0 core framework.
  • Updated to depend on SourceFlow.Net 2.0.0 with consolidated cloud abstractions.
  • Entity Framework Core 9.0 persistence: CommandStore, EntityStore, and ViewModelStore.
  • Cloud idempotency: EF-backed IdempotencyService with duplicate detection, IdempotencyDbContext, and automatic cleanup via IdempotencyCleanupService.
  • Configurable connection strings per store type with SQL Server support.
  • Polly-based resilience: retry and circuit breaker policies for database operations.
  • OpenTelemetry instrumentation for EF Core queries and store operations.
  • Multi-target: .NET 8.0, 9.0, and 10.0.