SourceFlow.Stores.EntityFramework
2.0.0
dotnet add package SourceFlow.Stores.EntityFramework --version 2.0.0
NuGet\Install-Package SourceFlow.Stores.EntityFramework -Version 2.0.0
<PackageReference Include="SourceFlow.Stores.EntityFramework" Version="2.0.0" />
<PackageVersion Include="SourceFlow.Stores.EntityFramework" Version="2.0.0" />
<PackageReference Include="SourceFlow.Stores.EntityFramework" />
paket add SourceFlow.Stores.EntityFramework --version 2.0.0
#r "nuget: SourceFlow.Stores.EntityFramework, 2.0.0"
#:package SourceFlow.Stores.EntityFramework@2.0.0
#addin nuget:?package=SourceFlow.Stores.EntityFramework&version=2.0.0
#tool nuget:?package=SourceFlow.Stores.EntityFramework&version=2.0.0
SourceFlow.Stores.EntityFramework
Entity Framework Core persistence provider for SourceFlow.Net with support for SQL Server, configurable connection strings per store type, and cloud message idempotency for distributed deployments.
Features
- Complete Store Implementations: ICommandStore, IEntityStore, and IViewModelStore
- Cloud Idempotency: SQL-backed duplicate message detection with
EfIdempotencyService,IdempotencyDbContext, and automatic cleanup viaIdempotencyCleanupService— essential for multi-instance cloud deployments - Flexible Configuration: Separate or shared connection strings per store type
- SQL Server Support: Built-in SQL Server database provider with support for PostgreSQL, MySQL, and SQLite via custom providers
- Resilience Policies: Polly-based retry and circuit breaker patterns for database operations
- Observability: OpenTelemetry instrumentation for EF Core queries and store operations
- Multi-Framework Support: .NET 8.0, .NET 9.0, .NET 10.0
Installation
# Install the core package
dotnet add package SourceFlow.Net
# Install the Entity Framework provider
dotnet add package SourceFlow.Stores.EntityFramework
Quick Start
1. Configure Connection Strings
Add connection strings to your appsettings.json:
{
"ConnectionStrings": {
"CommandStore": "Server=localhost;Database=SourceFlowCommands;Trusted_Connection=True;",
"EntityStore": "Server=localhost;Database=SourceFlowEntities;Trusted_Connection=True;",
"ViewModelStore": "Server=localhost;Database=SourceFlowViews;Trusted_Connection=True;"
}
}
Or use a single shared connection string:
{
"ConnectionStrings": {
"DefaultConnection": "Server=localhost;Database=SourceFlow;Trusted_Connection=True;"
}
}
2. Register Services
services.AddSourceFlowStores(configuration, options =>
{
// Use separate databases for each store
options.UseCommandStore("CommandStore");
options.UseEntityStore("EntityStore");
options.UseViewModelStore("ViewModelStore");
// Or use a single shared database
// options.UseSharedConnectionString("DefaultConnection");
});
3. Apply Migrations
The provider automatically creates the necessary database schema when you run your application. For production scenarios, generate and apply migrations:
dotnet ef migrations add InitialCreate --context CommandStoreContext
dotnet ef database update --context CommandStoreContext
Configuration Options
Separate Databases
Configure different databases for commands, entities, and view models:
services.AddSourceFlowStores(configuration, options =>
{
options.UseCommandStore("CommandStoreConnection");
options.UseEntityStore("EntityStoreConnection");
options.UseViewModelStore("ViewModelStoreConnection");
});
Shared Database
Use a single database for all stores:
services.AddSourceFlowStores(configuration, options =>
{
options.UseSharedConnectionString("DefaultConnection");
});
Custom DbContext Options
Apply additional EF Core configuration:
services.AddSourceFlowStores(configuration, options =>
{
options.UseCommandStore("CommandStore", dbOptions =>
{
dbOptions.EnableSensitiveDataLogging();
dbOptions.EnableDetailedErrors();
});
});
Resilience
The provider includes built-in Polly resilience policies for:
- Transient error retry with exponential backoff
- Circuit breaker for database failures
- Automatic reconnection handling
Idempotency Service
The Entity Framework provider includes EfIdempotencyService, a SQL-based implementation of IIdempotencyService designed for multi-instance deployments where in-memory idempotency tracking is insufficient.
Features
- Thread-Safe Duplicate Detection: Uses database transactions to ensure consistency across multiple application instances
- Automatic Expiration: Records expire based on configurable TTL (Time To Live)
- Background Cleanup: Automatic periodic cleanup of expired records
- Statistics: Track total checks, duplicates detected, and cache size
- Database Agnostic: Support for SQL Server, PostgreSQL, MySQL, SQLite, and other EF Core providers
Configuration
SQL Server (Default)
Register the idempotency service with automatic cleanup:
services.AddSourceFlowIdempotency(
connectionString: configuration.GetConnectionString("IdempotencyStore"),
cleanupIntervalMinutes: 60); // Optional, defaults to 60 minutes
Custom Database Provider
Use PostgreSQL, MySQL, SQLite, or any other EF Core provider:
// PostgreSQL
services.AddSourceFlowIdempotencyWithCustomProvider(
configureContext: options => options.UseNpgsql(connectionString),
cleanupIntervalMinutes: 60);
// MySQL
services.AddSourceFlowIdempotencyWithCustomProvider(
configureContext: options => options.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString)),
cleanupIntervalMinutes: 60);
// SQLite
services.AddSourceFlowIdempotencyWithCustomProvider(
configureContext: options => options.UseSqlite(connectionString),
cleanupIntervalMinutes: 60);
Manual Registration (Advanced)
For more control over the registration:
services.AddDbContext<IdempotencyDbContext>(options =>
options.UseSqlServer(configuration.GetConnectionString("IdempotencyStore")));
services.AddScoped<IIdempotencyService, EfIdempotencyService>();
// Optional: Register background cleanup service
services.AddHostedService<IdempotencyCleanupService>(provider =>
new IdempotencyCleanupService(provider, TimeSpan.FromMinutes(60)));
Database Schema
The service uses a single table with the following structure:
CREATE TABLE IdempotencyRecords (
IdempotencyKey NVARCHAR(500) PRIMARY KEY,
ProcessedAt DATETIME2 NOT NULL,
ExpiresAt DATETIME2 NOT NULL
);
CREATE INDEX IX_IdempotencyRecords_ExpiresAt ON IdempotencyRecords(ExpiresAt);
The schema is automatically created when you run migrations or when the application starts (if auto-migration is enabled).
Usage
The service is automatically used by cloud dispatchers when registered:
// Check if message was already processed
if (await idempotencyService.HasProcessedAsync(messageId))
{
// Skip duplicate message
return;
}
// Process message...
// Mark as processed with 24-hour TTL
await idempotencyService.MarkAsProcessedAsync(messageId, TimeSpan.FromHours(24));
Cleanup
The AddSourceFlowIdempotency and AddSourceFlowIdempotencyWithCustomProvider methods automatically register a background service (IdempotencyCleanupService) that periodically cleans up expired records.
Default Behavior:
- Cleanup runs every 60 minutes (configurable)
- Processes up to 1000 expired records per batch
- Runs as a hosted background service
Custom Cleanup Interval:
services.AddSourceFlowIdempotency(
connectionString: configuration.GetConnectionString("IdempotencyStore"),
cleanupIntervalMinutes: 30); // Run cleanup every 30 minutes
Manual Cleanup (Advanced):
If you need to trigger cleanup manually or implement custom cleanup logic:
public class CustomCleanupJob : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
using var scope = _serviceProvider.CreateScope();
var service = scope.ServiceProvider.GetRequiredService<EfIdempotencyService>();
await service.CleanupExpiredRecordsAsync(stoppingToken);
await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
}
}
}
When to Use
- Multi-Instance Deployments: When running multiple application instances that process the same message queues
- Distributed Systems: When messages can be delivered more than once (at-least-once delivery)
- Cloud Messaging: When using AWS SQS or other cloud message queues
For single-instance deployments, consider using InMemoryIdempotencyService from the core framework for better performance.
End-to-End Cloud Integration
Here's a complete example showing how EF idempotency integrates with AWS cloud messaging:
public void ConfigureServices(IServiceCollection services, IConfiguration configuration)
{
// 1. Register SourceFlow core
services.UseSourceFlow(Assembly.GetExecutingAssembly());
// 2. Register EF persistence stores
services.AddSourceFlowStores(configuration, options =>
{
options.UseCommandStore("CommandStore");
options.UseEntityStore("EntityStore");
options.UseViewModelStore("ViewModelStore");
});
// 3. Register SQL-backed idempotency (replaces in-memory default)
services.AddSourceFlowIdempotency(
connectionString: configuration.GetConnectionString("IdempotencyStore"),
cleanupIntervalMinutes: 60);
// 4. Configure AWS cloud messaging
services.UseSourceFlowAws(
options =>
{
options.Region = RegionEndpoint.USEast1;
options.EnableEncryption = true;
options.KmsKeyId = "alias/sourceflow-key";
},
bus => bus
.Send
.Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
.Raise
.Event<OrderCreatedEvent>(t => t.Topic("order-events"))
.Listen.To
.CommandQueue("orders.fifo")
.Subscribe.To
.Topic("order-events"));
}
How it works end-to-end:
- Command dispatched to SQS queue (
orders.fifo) - Listener receives message from SQS
- Idempotency check —
EfIdempotencyService.HasProcessedAsync(messageId)queries the SQL database to detect duplicates across all application instances - Command processed by saga, entity persisted via
IEntityStore, events raised - Message marked processed —
EfIdempotencyService.MarkAsProcessedAsync(messageId, ttl)records the message ID with expiration - Background cleanup —
IdempotencyCleanupServiceperiodically removes expired records
This ensures exactly-once processing semantics even with SQS at-least-once delivery and multiple consumer instances.
Documentation
Support
- Issues: GitHub Issues
- Discussions: GitHub Discussions
License
This project is licensed under the MIT License.
Package Version: 2.0.0 | Last Updated: 2026-03-15
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.EntityFrameworkCore (>= 9.0.0)
- Microsoft.EntityFrameworkCore.Relational (>= 9.0.0)
- Microsoft.EntityFrameworkCore.SqlServer (>= 9.0.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- OpenTelemetry (>= 1.14.0)
- OpenTelemetry.Api (>= 1.14.0)
- OpenTelemetry.Exporter.Console (>= 1.14.0)
- OpenTelemetry.Extensions.Hosting (>= 1.14.0)
- OpenTelemetry.Instrumentation.EntityFrameworkCore (>= 1.0.0-beta.12)
- Polly (>= 8.4.2)
- SourceFlow.Net (>= 2.0.0)
-
net8.0
- Microsoft.EntityFrameworkCore (>= 9.0.0)
- Microsoft.EntityFrameworkCore.Relational (>= 9.0.0)
- Microsoft.EntityFrameworkCore.SqlServer (>= 9.0.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- OpenTelemetry (>= 1.14.0)
- OpenTelemetry.Api (>= 1.14.0)
- OpenTelemetry.Exporter.Console (>= 1.14.0)
- OpenTelemetry.Extensions.Hosting (>= 1.14.0)
- OpenTelemetry.Instrumentation.EntityFrameworkCore (>= 1.0.0-beta.12)
- Polly (>= 8.4.2)
- SourceFlow.Net (>= 2.0.0)
-
net9.0
- Microsoft.EntityFrameworkCore (>= 9.0.0)
- Microsoft.EntityFrameworkCore.Relational (>= 9.0.0)
- Microsoft.EntityFrameworkCore.SqlServer (>= 9.0.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- OpenTelemetry (>= 1.14.0)
- OpenTelemetry.Api (>= 1.14.0)
- OpenTelemetry.Exporter.Console (>= 1.14.0)
- OpenTelemetry.Extensions.Hosting (>= 1.14.0)
- OpenTelemetry.Instrumentation.EntityFrameworkCore (>= 1.0.0-beta.12)
- Polly (>= 8.4.2)
- SourceFlow.Net (>= 2.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
v2.0.0 - Aligned with SourceFlow.Net 2.0.0 core framework.
- Updated to depend on SourceFlow.Net 2.0.0 with consolidated cloud abstractions.
- Entity Framework Core 9.0 persistence: CommandStore, EntityStore, and ViewModelStore.
- Cloud idempotency: EF-backed IdempotencyService with duplicate detection, IdempotencyDbContext, and automatic cleanup via IdempotencyCleanupService.
- Configurable connection strings per store type with SQL Server support.
- Polly-based resilience: retry and circuit breaker policies for database operations.
- OpenTelemetry instrumentation for EF Core queries and store operations.
- Multi-target: .NET 8.0, 9.0, and 10.0.