SourceFlow.Cloud.AWS
2.0.0
AWS cloud integration for distributed command and event processing
Overview
SourceFlow.Cloud.AWS extends the SourceFlow.Net framework with AWS cloud services integration, enabling distributed command and event processing using Amazon SQS, SNS, and KMS. This package provides production-ready dispatchers, listeners, and configuration for building scalable, cloud-native event-sourced applications.
Key Features:
- Amazon SQS command dispatching with FIFO support
- Amazon SNS event publishing with fan-out
- AWS KMS message encryption for sensitive data
- Fluent bus configuration API
- Automatic resource provisioning
- Built-in observability and health checks
- LocalStack integration for local development
Table of Contents
- Installation
- Quick Start
- Configuration
- AWS Services
- Bus Configuration System
- Message Encryption
- Idempotency
- Local Development
- Monitoring
- Best Practices
Installation
NuGet Package
dotnet add package SourceFlow.Cloud.AWS
Prerequisites
- SourceFlow >= 2.0.0
- AWS SDK for .NET
- .NET Standard 2.1, .NET 8.0, .NET 9.0, or .NET 10.0
Quick Start
Basic Setup
using SourceFlow.Cloud.AWS;
using Amazon;

// Configure SourceFlow with AWS integration
services.UseSourceFlow();
services.UseSourceFlowAws(
    options =>
    {
        options.Region = RegionEndpoint.USEast1;
        options.MaxConcurrentCalls = 10;
    },
    bus => bus
        .Send
            .Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
            .Command<ProcessPaymentCommand>(q => q.Queue("payments.fifo"))
        .Raise
            .Event<OrderCreatedEvent>(t => t.Topic("order-events"))
            .Event<PaymentProcessedEvent>(t => t.Topic("payment-events"))
        .Listen.To
            .CommandQueue("orders.fifo")
            .CommandQueue("payments.fifo")
        .Subscribe.To
            .Topic("order-events")
            .Topic("payment-events"));
What This Does
- Registers AWS dispatchers for commands and events
- Configures routing - which commands go to which queues
- Starts listeners - polls SQS queues for messages
- Creates resources - automatically provisions queues, topics, and subscriptions
- Enables idempotency - prevents duplicate message processing
Configuration
Fluent Configuration (Recommended)
services.UseSourceFlowAws(options =>
{
    // Required: AWS Region
    options.Region = RegionEndpoint.USEast1;

    // Optional: Enable/disable features
    options.EnableCommandRouting = true;
    options.EnableEventRouting = true;
    options.EnableCommandListener = true;
    options.EnableEventListener = true;

    // Optional: Concurrency
    options.MaxConcurrentCalls = 10;

    // Optional: Message encryption
    options.EnableEncryption = true;
    options.KmsKeyId = "alias/sourceflow-key";
});
Configuration from appsettings.json
appsettings.json:
{
"SourceFlow": {
"Aws": {
"Region": "us-east-1",
"MaxConcurrentCalls": 10,
"EnableEncryption": true,
"KmsKeyId": "alias/sourceflow-key"
},
"Bus": {
"Commands": {
"CreateOrderCommand": "orders.fifo",
"UpdateOrderCommand": "orders.fifo",
"ProcessPaymentCommand": "payments.fifo"
},
"Events": {
"OrderCreatedEvent": "order-events",
"OrderUpdatedEvent": "order-events",
"PaymentProcessedEvent": "payment-events"
},
"ListenQueues": [
"orders.fifo",
"payments.fifo"
],
"SubscribeTopics": [
"order-events",
"payment-events"
]
}
}
}
Program.cs:
var configuration = builder.Configuration;
services.UseSourceFlowAws(
    options =>
    {
        var awsConfig = configuration.GetSection("SourceFlow:Aws");
        options.Region = RegionEndpoint.GetBySystemName(awsConfig["Region"]);
        options.MaxConcurrentCalls = awsConfig.GetValue<int>("MaxConcurrentCalls", 10);
        options.EnableEncryption = awsConfig.GetValue<bool>("EnableEncryption", false);
        options.KmsKeyId = awsConfig["KmsKeyId"];
    },
    bus =>
    {
        var busConfig = configuration.GetSection("SourceFlow:Bus");

        // Configure command routing from appsettings.
        // Type.GetType returns null unless the key is a namespace-qualified type name
        // (assembly-qualified when the type lives in another assembly), so fail fast.
        var commandsSection = busConfig.GetSection("Commands");
        var sendBuilder = bus.Send;
        foreach (var command in commandsSection.GetChildren())
        {
            var commandType = Type.GetType(command.Key)
                ?? throw new InvalidOperationException($"Unknown command type '{command.Key}'.");
            var queueName = command.Value;
            // Dynamic registration based on configuration
            sendBuilder.Command(commandType, q => q.Queue(queueName));
        }

        // Configure event routing from appsettings
        var eventsSection = busConfig.GetSection("Events");
        var raiseBuilder = bus.Raise;
        foreach (var evt in eventsSection.GetChildren())
        {
            var eventType = Type.GetType(evt.Key)
                ?? throw new InvalidOperationException($"Unknown event type '{evt.Key}'.");
            var topicName = evt.Value;
            // Dynamic registration based on configuration
            raiseBuilder.Event(eventType, t => t.Topic(topicName));
        }

        // Configure listeners from appsettings (Get returns null when the section is missing)
        var listenQueues = busConfig.GetSection("ListenQueues").Get<string[]>() ?? Array.Empty<string>();
        var listenBuilder = bus.Listen.To;
        foreach (var queue in listenQueues)
        {
            listenBuilder.CommandQueue(queue);
        }

        // Configure subscriptions from appsettings
        var subscribeTopics = busConfig.GetSection("SubscribeTopics").Get<string[]>() ?? Array.Empty<string>();
        var subscribeBuilder = bus.Subscribe.To;
        foreach (var topic in subscribeTopics)
        {
            subscribeBuilder.Topic(topic);
        }

        return bus;
    });
Simplified Configuration Helper:
public static class AwsConfigurationExtensions
{
    public static IServiceCollection UseSourceFlowAwsFromConfiguration(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        return services.UseSourceFlowAws(
            options => ConfigureAwsOptions(options, configuration),
            bus => ConfigureBusFromSettings(bus, configuration));
    }

    private static void ConfigureAwsOptions(AwsOptions options, IConfiguration configuration)
    {
        var awsConfig = configuration.GetSection("SourceFlow:Aws");
        options.Region = RegionEndpoint.GetBySystemName(awsConfig["Region"]);
        options.MaxConcurrentCalls = awsConfig.GetValue<int>("MaxConcurrentCalls", 10);
        options.EnableEncryption = awsConfig.GetValue<bool>("EnableEncryption", false);
        options.KmsKeyId = awsConfig["KmsKeyId"];
    }

    private static BusConfigurationBuilder ConfigureBusFromSettings(
        BusConfigurationBuilder bus,
        IConfiguration configuration)
    {
        var busConfig = configuration.GetSection("SourceFlow:Bus");

        // Commands (Get returns null when the section is missing)
        var commands = busConfig.GetSection("Commands").Get<Dictionary<string, string>>()
            ?? new Dictionary<string, string>();
        foreach (var (commandType, queueName) in commands)
        {
            bus.Send.Command(ResolveType(commandType), q => q.Queue(queueName));
        }

        // Events
        var events = busConfig.GetSection("Events").Get<Dictionary<string, string>>()
            ?? new Dictionary<string, string>();
        foreach (var (eventType, topicName) in events)
        {
            bus.Raise.Event(ResolveType(eventType), t => t.Topic(topicName));
        }

        // Listen queues
        var listenQueues = busConfig.GetSection("ListenQueues").Get<string[]>() ?? Array.Empty<string>();
        foreach (var queue in listenQueues)
        {
            bus.Listen.To.CommandQueue(queue);
        }

        // Subscribe topics
        var subscribeTopics = busConfig.GetSection("SubscribeTopics").Get<string[]>() ?? Array.Empty<string>();
        foreach (var topic in subscribeTopics)
        {
            bus.Subscribe.To.Topic(topic);
        }

        return bus;
    }

    // Type.GetType returns null for names it cannot find, so fail with a clear message.
    private static Type ResolveType(string typeName) =>
        Type.GetType(typeName)
            ?? throw new InvalidOperationException(
                $"Type '{typeName}' could not be resolved; use a namespace- or assembly-qualified name.");
}

// Usage
services.UseSourceFlowAwsFromConfiguration(configuration);
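Type.GetType resolves a bare name such as CreateOrderCommand only when the type sits in the global namespace of the calling assembly, so short keys in appsettings.json may need a lookup step. The following is a minimal sketch of one way to do that, assuming command and event classes are public and have unique short names. MessageTypeResolver and its members are hypothetical names and are not part of SourceFlow.Cloud.AWS.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical helper, not part of SourceFlow.Cloud.AWS.
public static class MessageTypeResolver
{
    // Maps short type names (e.g. "CreateOrderCommand") to types found in the given assemblies.
    public static IReadOnlyDictionary<string, Type> Index(params Assembly[] assemblies) =>
        assemblies
            .SelectMany(a => a.GetTypes())
            .Where(t => t.IsClass && !t.IsAbstract && t.IsPublic)
            .GroupBy(t => t.Name)
            .Where(g => g.Count() == 1) // skip ambiguous short names
            .ToDictionary(g => g.Key, g => g.Single());

    // Returns the indexed type or throws with a descriptive message.
    public static Type Resolve(IReadOnlyDictionary<string, Type> index, string shortName) =>
        index.TryGetValue(shortName, out var type)
            ? type
            : throw new InvalidOperationException(
                $"No unique public class named '{shortName}' was found.");
}
```

The index would typically be built once at startup from the assembly that holds the message classes, and Resolve used wherever the configuration code would otherwise call Type.GetType.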
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| Region | RegionEndpoint | Required | AWS region for services |
| EnableCommandRouting | bool | true | Enable command dispatching to SQS |
| EnableEventRouting | bool | true | Enable event publishing to SNS |
| EnableCommandListener | bool | true | Enable SQS command listener |
| EnableEventListener | bool | true | Enable SNS event listener |
| MaxConcurrentCalls | int | 10 | Concurrent message processing |
| EnableEncryption | bool | false | Enable KMS encryption |
| KmsKeyId | string | null | KMS key ID or alias |
AWS Services
Amazon SQS (Simple Queue Service)
Purpose: Command dispatching and queuing
Standard Queues
.Send.Command<SendEmailCommand>(q => q.Queue("notifications"))
Characteristics:
- High throughput (nearly unlimited API calls per second)
- At-least-once delivery
- Best-effort ordering
- Use for independent operations
FIFO Queues
.Send.Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
Characteristics:
- Exactly-once processing
- Strict ordering per entity
- Content-based deduplication
- Use for ordered operations
FIFO Configuration:
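- Queue name must end with .fifo
- MessageGroupId set to entity ID
- MessageDeduplicationId generated from content
- Default limit of 300 API calls per second per action (send, receive, delete) without batching; batching and high-throughput mode raise it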
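The FIFO conventions above can be illustrated with a small sketch. It is not the package's implementation: FifoConventions and its members are hypothetical names. It detects a FIFO queue by its suffix and derives a deduplication ID as the SHA-256 hex digest of the message body, which mirrors how SQS content-based deduplication identifies duplicates.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, not part of SourceFlow.Cloud.AWS.
public static class FifoConventions
{
    // SQS requires FIFO queue names to end with ".fifo".
    public static bool IsFifoQueue(string queueName) =>
        queueName.EndsWith(".fifo", StringComparison.Ordinal);

    // Hex-encoded SHA-256 of the body: identical bodies yield identical IDs.
    public static string DeduplicationId(string messageBody)
    {
        using var sha = SHA256.Create();
        byte[] hash = sha.ComputeHash(Encoding.UTF8.GetBytes(messageBody));
        var sb = new StringBuilder(hash.Length * 2);
        foreach (byte b in hash) sb.Append(b.ToString("x2"));
        return sb.ToString();
    }
}
```

Because the ID depends only on the body, two commands with identical payloads sent within the SQS deduplication window would be treated as one message; payloads that must be processed twice need a distinguishing field.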
Amazon SNS (Simple Notification Service)
Purpose: Event publishing and fan-out
.Raise.Event<OrderCreatedEvent>(t => t.Topic("order-events"))
Characteristics:
- Publish-subscribe pattern
- Fan-out to multiple subscribers
- Topic-to-queue subscriptions
- Message filtering (future)
How It Works:
Event Published
↓
SNS Topic (order-events)
↓
Fan-out to Subscribers
↓
SQS Queue (orders.fifo)
↓
Command Listener
AWS KMS (Key Management Service)
Purpose: Message encryption for sensitive data
services.UseSourceFlowAws(
options =>
{
options.EnableEncryption = true;
options.KmsKeyId = "alias/sourceflow-key";
},
bus => ...);
Encryption Flow:
- Generate data key from KMS
- Encrypt message with data key
- Encrypt data key with KMS master key
- Store encrypted message + encrypted data key
Bus Configuration System
Fluent API
The bus configuration system provides a type-safe, intuitive way to configure message routing.
Send Commands
.Send
.Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
.Command<UpdateOrderCommand>(q => q.Queue("orders.fifo"))
.Command<CancelOrderCommand>(q => q.Queue("orders.fifo"))
Raise Events
.Raise
.Event<OrderCreatedEvent>(t => t.Topic("order-events"))
.Event<OrderUpdatedEvent>(t => t.Topic("order-events"))
.Event<OrderCancelledEvent>(t => t.Topic("order-events"))
Listen to Command Queues
.Listen.To
.CommandQueue("orders.fifo")
.CommandQueue("inventory.fifo")
.CommandQueue("payments.fifo")
Subscribe to Event Topics
.Subscribe.To
.Topic("order-events")
.Topic("payment-events")
.Topic("inventory-events")
Short Name Resolution
Configuration: Provide short names only
.Send.Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
Resolved at Startup:
- Short name: "orders.fifo"
- Resolved URL: https://sqs.us-east-1.amazonaws.com/123456789012/orders.fifo
Benefits:
- No hardcoded account IDs
- Portable across environments
- Easier to read and maintain
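As an illustration of the mapping from short name to URL, here is a minimal sketch that composes the URL from region, account ID, and queue name. QueueUrlSketch is a hypothetical name, and the package itself may resolve names differently (for example through the SQS GetQueueUrl API, which the permissions listed in this document allow).

```csharp
using System;

// Hypothetical helper; shown only to make the URL shape explicit.
public static class QueueUrlSketch
{
    public static string Build(string region, string accountId, string queueName)
    {
        if (string.IsNullOrWhiteSpace(queueName))
            throw new ArgumentException("Queue name is required.", nameof(queueName));

        // Standard SQS endpoint layout for the commercial AWS partition.
        return $"https://sqs.{region}.amazonaws.com/{accountId}/{queueName}";
    }
}
```

Calling QueueUrlSketch.Build("us-east-1", "123456789012", "orders.fifo") produces the resolved URL shown above. Only the region and account ID change between environments, which is why configuration can stay limited to short names.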
Resource Provisioning
The AwsBusBootstrapper automatically creates missing AWS resources at startup:
SQS Queues:
// Standard queue
CreateQueueRequest {
QueueName = "notifications",
Attributes = {
{ "MessageRetentionPeriod", "1209600" }, // 14 days
{ "VisibilityTimeout", "30" }
}
}
// FIFO queue (detected by .fifo suffix)
CreateQueueRequest {
QueueName = "orders.fifo",
Attributes = {
{ "FifoQueue", "true" },
{ "ContentBasedDeduplication", "true" },
{ "MessageRetentionPeriod", "1209600" },
{ "VisibilityTimeout", "30" }
}
}
SNS Topics:
CreateTopicRequest {
Name = "order-events",
Attributes = {
{ "DisplayName", "Order Events Topic" }
}
}
SNS Subscriptions:
// Subscribe queue to topic
SubscribeRequest {
TopicArn = "arn:aws:sns:us-east-1:123456789012:order-events",
Protocol = "sqs",
Endpoint = "arn:aws:sqs:us-east-1:123456789012:orders.fifo",
Attributes = {
{ "RawMessageDelivery", "true" }
}
}
Idempotency: All operations are idempotent - safe to run multiple times.
Message Encryption
KMS Configuration
Enable message encryption for sensitive data using AWS KMS:
services.UseSourceFlowAws(
options =>
{
options.EnableEncryption = true;
options.KmsKeyId = "alias/sourceflow-key"; // or key ID
},
bus => ...);
Encryption Flow
Plaintext Message
↓
Generate Data Key (KMS)
↓
Encrypt Message (Data Key)
↓
Encrypt Data Key (KMS Master Key)
↓
Store: Encrypted Message + Encrypted Data Key
Decryption Flow
Retrieve: Encrypted Message + Encrypted Data Key
↓
Decrypt Data Key (KMS Master Key)
↓
Decrypt Message (Data Key)
↓
Plaintext Message
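The two flows above describe envelope encryption. The sketch below walks through the same steps locally with AES-GCM, assuming .NET 8 or later (for the AesGcm constructor that takes a tag size). It is not the package's code: EnvelopeSketch is a hypothetical name, and the in-memory master key is only a stand-in for KMS, which in a real deployment would generate and decrypt the data key without exposing the master key.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Local stand-in for KMS; a real deployment would call KMS GenerateDataKey and
// Decrypt instead of holding the master key in process memory.
public sealed class EnvelopeSketch
{
    private const int NonceSize = 12; // AES-GCM nonce length in bytes
    private const int TagSize = 16;   // AES-GCM authentication tag length in bytes
    private readonly byte[] _masterKey = RandomNumberGenerator.GetBytes(32);

    public sealed record Envelope(byte[] EncryptedMessage, byte[] EncryptedDataKey);

    public Envelope Encrypt(string plaintext)
    {
        byte[] dataKey = RandomNumberGenerator.GetBytes(32);               // 1. generate data key
        byte[] message = Seal(dataKey, Encoding.UTF8.GetBytes(plaintext)); // 2. encrypt message
        byte[] wrappedKey = Seal(_masterKey, dataKey);                     // 3. encrypt data key
        CryptographicOperations.ZeroMemory(dataKey);
        return new Envelope(message, wrappedKey);                          // 4. store both parts
    }

    public string Decrypt(Envelope envelope)
    {
        byte[] dataKey = Open(_masterKey, envelope.EncryptedDataKey);
        byte[] plaintext = Open(dataKey, envelope.EncryptedMessage);
        CryptographicOperations.ZeroMemory(dataKey);
        return Encoding.UTF8.GetString(plaintext);
    }

    // Output layout: nonce || tag || ciphertext.
    private static byte[] Seal(byte[] key, byte[] plaintext)
    {
        byte[] nonce = RandomNumberGenerator.GetBytes(NonceSize);
        byte[] tag = new byte[TagSize];
        byte[] cipher = new byte[plaintext.Length];
        using var aes = new AesGcm(key, TagSize);
        aes.Encrypt(nonce, plaintext, cipher, tag);

        byte[] output = new byte[NonceSize + TagSize + cipher.Length];
        Buffer.BlockCopy(nonce, 0, output, 0, NonceSize);
        Buffer.BlockCopy(tag, 0, output, NonceSize, TagSize);
        Buffer.BlockCopy(cipher, 0, output, NonceSize + TagSize, cipher.Length);
        return output;
    }

    private static byte[] Open(byte[] key, byte[] sealedData)
    {
        byte[] nonce = sealedData.AsSpan(0, NonceSize).ToArray();
        byte[] tag = sealedData.AsSpan(NonceSize, TagSize).ToArray();
        byte[] cipher = sealedData.AsSpan(NonceSize + TagSize).ToArray();
        byte[] plaintext = new byte[cipher.Length];
        using var aes = new AesGcm(key, TagSize);
        aes.Decrypt(nonce, cipher, tag, plaintext);
        return plaintext;
    }
}
```

Each message gets its own data key, so the master key only ever encrypts 32 bytes per message; with KMS this keeps large payloads out of KMS calls and confines key access to the permissions in the key policy.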
KMS Key Setup
Create KMS Key:
aws kms create-key \
--description "SourceFlow message encryption key" \
--key-usage ENCRYPT_DECRYPT
aws kms create-alias \
--alias-name alias/sourceflow-key \
--target-key-id <key-id>
Key Policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:root"
},
"Action": "kms:*",
"Resource": "*"
},
{
"Sid": "Allow SourceFlow Application",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:role/SourceFlowApplicationRole"
},
"Action": [
"kms:Decrypt",
"kms:Encrypt",
"kms:GenerateDataKey",
"kms:DescribeKey"
],
"Resource": "*"
}
]
}
IAM Permissions
Minimum Required for Bootstrapper and Runtime:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SQSQueueManagement",
"Effect": "Allow",
"Action": [
"sqs:CreateQueue",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue"
],
"Resource": "arn:aws:sqs:*:*:*"
},
{
"Sid": "SQSMessageOperations",
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:DeleteMessage",
"sqs:ChangeMessageVisibility"
],
"Resource": "arn:aws:sqs:*:*:*"
},
{
"Sid": "SNSTopicManagement",
"Effect": "Allow",
"Action": [
"sns:CreateTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:TagResource"
],
"Resource": "arn:aws:sns:*:*:*"
},
{
"Sid": "SNSPublishAndSubscribe",
"Effect": "Allow",
"Action": [
"sns:Subscribe",
"sns:Unsubscribe",
"sns:Publish"
],
"Resource": "arn:aws:sns:*:*:*"
},
{
"Sid": "STSGetCallerIdentity",
"Effect": "Allow",
"Action": [
"sts:GetCallerIdentity"
],
"Resource": "*"
},
{
"Sid": "KMSEncryption",
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:Encrypt",
"kms:GenerateDataKey",
"kms:DescribeKey"
],
"Resource": "arn:aws:kms:*:*:key/*"
}
]
}
Production Best Practice - Restrict Resources:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SQSQueueManagement",
"Effect": "Allow",
"Action": [
"sqs:CreateQueue",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:DeleteMessage",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:us-east-1:123456789012:orders.fifo",
"arn:aws:sqs:us-east-1:123456789012:payments.fifo",
"arn:aws:sqs:us-east-1:123456789012:notifications"
]
},
{
"Sid": "SNSTopicManagement",
"Effect": "Allow",
"Action": [
"sns:CreateTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:TagResource",
"sns:Subscribe",
"sns:Unsubscribe",
"sns:Publish"
],
"Resource": [
"arn:aws:sns:us-east-1:123456789012:order-events",
"arn:aws:sns:us-east-1:123456789012:payment-events"
]
},
{
"Sid": "STSGetCallerIdentity",
"Effect": "Allow",
"Action": [
"sts:GetCallerIdentity"
],
"Resource": "*"
},
{
"Sid": "KMSEncryption",
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:Encrypt",
"kms:GenerateDataKey",
"kms:DescribeKey"
],
"Resource": "arn:aws:kms:us-east-1:123456789012:key/your-key-id"
}
]
}
Idempotency
Default (In-Memory)
Automatically registered for single-instance deployments:
services.UseSourceFlowAws(
options => { options.Region = RegionEndpoint.USEast1; },
bus => ...);
// InMemoryIdempotencyService registered automatically
Multi-Instance (SQL-Based)
For production deployments with multiple instances:
// Install package
// dotnet add package SourceFlow.Stores.EntityFramework
// Register SQL-based idempotency
services.AddSourceFlowIdempotency(
connectionString: "Server=...;Database=...;",
cleanupIntervalMinutes: 60);
// Configure AWS
services.UseSourceFlowAws(
options => { options.Region = RegionEndpoint.USEast1; },
bus => ...);
See: Cloud Message Idempotency Guide for detailed configuration.
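To show why the in-memory option suits only a single instance, here is a minimal sketch of an in-process idempotency check. It is not the package's InMemoryIdempotencyService: the class name, method names, and retention behavior are assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical sketch; not the package's InMemoryIdempotencyService.
public sealed class InMemoryIdempotencySketch
{
    private readonly ConcurrentDictionary<string, DateTimeOffset> _seen = new();
    private readonly TimeSpan _retention;

    public InMemoryIdempotencySketch(TimeSpan retention) => _retention = retention;

    // Returns true only for the first caller that claims this message ID.
    public bool TryBegin(string messageId, DateTimeOffset now) =>
        _seen.TryAdd(messageId, now);

    // Drops entries older than the retention window; returns how many were removed.
    public int Cleanup(DateTimeOffset now)
    {
        int removed = 0;
        foreach (var entry in _seen)
        {
            if (now - entry.Value > _retention && _seen.TryRemove(entry.Key, out _))
                removed++;
        }
        return removed;
    }
}
```

The dictionary lives in one process, so a second application instance would not see IDs recorded by the first. A shared store, such as the SQL-based option above, closes that gap.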
Local Development
LocalStack Integration
LocalStack provides local AWS service emulation for development and testing.
Setup with Script (Recommended)
# PowerShell (Windows)
./tests/SourceFlow.Cloud.AWS.Tests/run-integration-tests.ps1
# Bash (Linux/macOS/WSL)
./tests/SourceFlow.Cloud.AWS.Tests/run-integration-tests.sh
The scripts automatically start a LocalStack Docker container, wait for services, set environment variables, and run the integration tests. Use --keep / -KeepRunning to leave the container running after tests.
Manual Setup
# Start LocalStack via Docker
docker run -d --name sourceflow-localstack \
-p 4566:4566 \
-e SERVICES=sqs,sns,kms \
-e EAGER_SERVICE_LOADING=1 \
localstack/localstack:latest
Configuration
services.UseSourceFlowAws(
options =>
{
options.Region = RegionEndpoint.USEast1;
// LocalStack endpoints
options.ServiceURL = "http://localhost:4566";
},
bus => bus
.Send.Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
.Listen.To.CommandQueue("orders.fifo"));
Environment Variables
# LocalStack endpoints
export AWS_ENDPOINT_URL=http://localhost:4566
# LocalStack uses hardcoded test credentials in test fixtures
# BasicAWSCredentials("test", "test") provides better endpoint compatibility
export AWS_DEFAULT_REGION=us-east-1
Note: LocalStack does not validate AWS credentials. The test infrastructure uses BasicAWSCredentials with dummy "test"/"test" values for better compatibility with AWS SDK endpoint resolution. This approach avoids endpoint override issues that can occur with AnonymousAWSCredentials.
Testing
[Trait("Category", "Integration")]
[Trait("Category", "RequiresLocalStack")]
public class AwsIntegrationTests : LocalStackRequiredTestBase
{
[Fact]
public async Task Should_Process_Command_Through_SQS()
{
// Test implementation
}
}
Run Tests:
# Unit tests only
dotnet test --filter "Category=Unit"
# Integration tests with LocalStack (using script)
./tests/SourceFlow.Cloud.AWS.Tests/run-integration-tests.ps1 # PowerShell
./tests/SourceFlow.Cloud.AWS.Tests/run-integration-tests.sh # Bash
# Integration tests manually (LocalStack must be running)
dotnet test --filter "Category=Integration&Category=RequiresLocalStack"
Monitoring
Health Checks
services.AddHealthChecks()
.AddCheck<AwsHealthCheck>("aws");
Checks:
- SQS connectivity
- SNS connectivity
- KMS access (if encryption enabled)
- Queue/topic existence
Metrics
Command Dispatching:
- sourceflow.aws.command.dispatched - Commands sent to SQS
- sourceflow.aws.command.dispatch_duration - Dispatch latency
- sourceflow.aws.command.dispatch_error - Dispatch failures
Event Publishing:
- sourceflow.aws.event.published - Events published to SNS
- sourceflow.aws.event.publish_duration - Publish latency
- sourceflow.aws.event.publish_error - Publish failures
Message Processing:
- sourceflow.aws.message.received - Messages received from SQS
- sourceflow.aws.message.processed - Messages successfully processed
- sourceflow.aws.message.processing_duration - Processing latency
- sourceflow.aws.message.processing_error - Processing failures
Distributed Tracing
Activity Source: SourceFlow.Cloud.AWS
Spans:
- AwsSqsCommandDispatcher.Dispatch
- AwsSnsEventDispatcher.Dispatch
- AwsSqsCommandListener.ProcessMessage
Trace Context: Propagated via message attributes
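Propagation through message attributes can be sketched with System.Diagnostics alone. The example below is an assumption about the general technique, not the package's wire format: TracePropagationSketch is a hypothetical name, and the attribute key "traceparent" is borrowed from the W3C Trace Context convention.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical helper; the "traceparent" key is an assumed attribute name.
public static class TracePropagationSketch
{
    public const string TraceParentKey = "traceparent";

    // Producer side: copy the activity's W3C ID into the message attributes.
    public static void Inject(Activity activity, IDictionary<string, string> attributes)
    {
        if (activity.IdFormat == ActivityIdFormat.W3C && activity.Id is not null)
            attributes[TraceParentKey] = activity.Id;
    }

    // Consumer side: rebuild the parent context, if one was sent.
    public static bool TryExtract(
        IReadOnlyDictionary<string, string> attributes,
        out ActivityContext parent)
    {
        parent = default;
        return attributes.TryGetValue(TraceParentKey, out var value)
            && ActivityContext.TryParse(value, null, out parent);
    }
}
```

A listener could pass the extracted context as the parent when starting its own activity, so the dispatch and processing spans end up in the same trace.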
Best Practices
Queue Design
Use FIFO queues for ordered operations
.Send.Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
Use standard queues for independent operations
.Send.Command<SendEmailCommand>(q => q.Queue("notifications"))
Group related commands to the same queue
.Send
    .Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
    .Command<UpdateOrderCommand>(q => q.Queue("orders.fifo"))
    .Command<CancelOrderCommand>(q => q.Queue("orders.fifo"))
IAM Permissions
Development Environment (Broad Permissions):
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SQSFullAccess",
"Effect": "Allow",
"Action": [
"sqs:CreateQueue",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:DeleteMessage",
"sqs:ChangeMessageVisibility"
],
"Resource": "arn:aws:sqs:*:*:*"
},
{
"Sid": "SNSFullAccess",
"Effect": "Allow",
"Action": [
"sns:CreateTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:TagResource",
"sns:Subscribe",
"sns:Unsubscribe",
"sns:Publish"
],
"Resource": "arn:aws:sns:*:*:*"
},
{
"Sid": "STSGetCallerIdentity",
"Effect": "Allow",
"Action": [
"sts:GetCallerIdentity"
],
"Resource": "*"
}
]
}
Production Environment (Restricted Resources):
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SQSSpecificQueues",
"Effect": "Allow",
"Action": [
"sqs:CreateQueue",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:DeleteMessage",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:us-east-1:123456789012:orders.fifo",
"arn:aws:sqs:us-east-1:123456789012:payments.fifo",
"arn:aws:sqs:us-east-1:123456789012:inventory.fifo",
"arn:aws:sqs:us-east-1:123456789012:notifications"
]
},
{
"Sid": "SNSSpecificTopics",
"Effect": "Allow",
"Action": [
"sns:CreateTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:TagResource",
"sns:Subscribe",
"sns:Unsubscribe",
"sns:Publish"
],
"Resource": [
"arn:aws:sns:us-east-1:123456789012:order-events",
"arn:aws:sns:us-east-1:123456789012:payment-events",
"arn:aws:sns:us-east-1:123456789012:inventory-events"
]
},
{
"Sid": "STSGetCallerIdentity",
"Effect": "Allow",
"Action": [
"sts:GetCallerIdentity"
],
"Resource": "*"
},
{
"Sid": "KMSSpecificKey",
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:Encrypt",
"kms:GenerateDataKey",
"kms:DescribeKey"
],
"Resource": "arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012"
}
]
}
Explanation of Permissions:
| Permission | Purpose | Required For |
|---|---|---|
| sqs:CreateQueue | Create queues during bootstrapping | Bootstrapper |
| sqs:GetQueueUrl | Resolve queue names to URLs | Bootstrapper, Dispatchers |
| sqs:GetQueueAttributes | Verify queue configuration | Bootstrapper |
| sqs:SetQueueAttributes | Configure queue settings | Bootstrapper |
| sqs:TagQueue | Add tags to queues | Bootstrapper (optional) |
| sqs:ReceiveMessage | Poll messages from queues | Listeners |
| sqs:SendMessage | Send commands to queues | Dispatchers |
| sqs:DeleteMessage | Remove processed messages | Listeners |
| sqs:ChangeMessageVisibility | Extend processing time | Listeners |
| sns:CreateTopic | Create topics during bootstrapping | Bootstrapper |
| sns:GetTopicAttributes | Verify topic configuration | Bootstrapper |
| sns:SetTopicAttributes | Configure topic settings | Bootstrapper |
| sns:TagResource | Add tags to topics | Bootstrapper (optional) |
| sns:Subscribe | Subscribe queues to topics | Bootstrapper |
| sns:Unsubscribe | Remove subscriptions | Bootstrapper (cleanup) |
| sns:Publish | Publish events to topics | Dispatchers |
| sts:GetCallerIdentity | Get AWS account ID | Bootstrapper |
| kms:Decrypt | Decrypt messages | Listeners (if encryption enabled) |
| kms:Encrypt | Encrypt messages | Dispatchers (if encryption enabled) |
| kms:GenerateDataKey | Generate encryption keys | Dispatchers (if encryption enabled) |
| kms:DescribeKey | Verify key configuration | Bootstrapper (if encryption enabled) |
Production Deployment
Use SQL-based idempotency
services.AddSourceFlowIdempotency(connectionString);
Enable encryption for sensitive data
options.EnableEncryption = true;
options.KmsKeyId = "alias/sourceflow-key";
Configure appropriate concurrency
options.MaxConcurrentCalls = 10; // Adjust based on load
Use infrastructure as code
- CloudFormation or Terraform for production
- Let bootstrapper create resources in development
Monitor metrics and health checks
services.AddHealthChecks().AddCheck<AwsHealthCheck>("aws");
Error Handling
Configure dead letter queues
- Automatic for all queues
- Review failed messages regularly
Implement retry policies
- SQS visibility timeout for retries
- Exponential backoff built-in
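One common way to combine the two points above is to lengthen a message's visibility timeout each time it is received again. The sketch below shows that calculation only; RetryBackoffSketch, the base delay, and the doubling factor are assumptions, and the package's built-in backoff may use different parameters.

```csharp
using System;

// Hypothetical helper; the package's built-in backoff may differ.
public static class RetryBackoffSketch
{
    // SQS caps the visibility timeout at 12 hours (43,200 seconds).
    private const int MaxVisibilitySeconds = 43_200;

    // receiveCount is the 1-based number of times the message has been received.
    public static int VisibilityTimeoutSeconds(int receiveCount, int baseSeconds = 30)
    {
        if (receiveCount < 1) throw new ArgumentOutOfRangeException(nameof(receiveCount));
        if (baseSeconds < 1) throw new ArgumentOutOfRangeException(nameof(baseSeconds));

        // Double the delay for each previous attempt; clamp the shift to avoid overflow.
        int shift = Math.Min(receiveCount - 1, 20);
        long seconds = (long)baseSeconds << shift;
        return (int)Math.Min(seconds, MaxVisibilitySeconds);
    }
}
```

With the default base of 30 seconds, the first three receives map to 30, 60, and 120 seconds. A listener could apply the result through the SQS ChangeMessageVisibility operation before abandoning a failed message.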
Monitor processing errors
- Track sourceflow.aws.message.processing_error
- Alert on high error rates
Architecture
Command Flow
Command Published
↓
CommandBus (assigns sequence number)
↓
AwsSqsCommandDispatcher (checks routing)
↓
SQS Queue (message persisted)
↓
AwsSqsCommandListener (polls queue)
↓
CommandBus.Publish (local processing)
↓
Saga Handles Command
Event Flow
Event Published
↓
EventQueue (enqueues event)
↓
AwsSnsEventDispatcher (checks routing)
↓
SNS Topic (message published)
↓
SQS Queue (subscribed to topic)
↓
AwsSqsCommandListener (polls queue)
↓
EventQueue.Enqueue (local processing)
↓
Aggregates/Views Handle Event
Related Documentation
- SourceFlow Core
- AWS Cloud Architecture
- Cloud Message Idempotency Guide
- Cloud Integration Testing
- Entity Framework Stores
Support
- Documentation: GitHub Wiki
- Issues: GitHub Issues
- Discussions: GitHub Discussions
License
MIT License - see LICENSE file for details.
Package Version: 2.0.0
Last Updated: 2026-03-15
Status: Production Ready
Dependencies (identical for .NETStandard 2.1, net8.0, net9.0, and net10.0):
- AWSSDK.Extensions.NETCore.Setup (>= 3.7.301)
- AWSSDK.KeyManagementService (>= 3.7.400)
- AWSSDK.SimpleNotificationService (>= 3.7.400.58)
- AWSSDK.SQS (>= 3.7.400.58)
- Microsoft.Extensions.Caching.Memory (>= 9.0.0)
- Microsoft.Extensions.Diagnostics.HealthChecks (>= 9.0.0)
- Microsoft.Extensions.Hosting (>= 9.0.0)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.0)
- SourceFlow.Net (>= 2.0.0)
| Version | Downloads | Last Updated |
|---|---|---|
| 2.0.0 | 80 | 3/15/2026 |
v2.0.0 - Major release with production-ready AWS integration.
- SQS command dispatching: Standard and FIFO queues with batched send/receive operations.
- SNS event publishing: topic creation, subscription management, and filter policies.
- Bus bootstrapper: IHostedService that auto-provisions queues, topics, and subscriptions at startup.
- Security: KMS envelope encryption for messages at rest, sensitive data masking in logs.
- Resilience: circuit breaker, configurable retry policies, and throttling protection.
- Dead letter queues: automatic DLQ setup and failed message reprocessing.
- Health checks: IHealthCheck implementations for SQS, SNS, and KMS endpoints.
- Observability: OpenTelemetry distributed tracing across command and event flows.
- Breaking change: depends on SourceFlow.Net 2.0.0 (Cloud.Core consolidated into core).