FS.StreamFlow.RabbitMQ 9.0.0-alpha.20250720-0108

This is a prerelease version of FS.StreamFlow.RabbitMQ.
dotnet add package FS.StreamFlow.RabbitMQ --version 9.0.0-alpha.20250720-0108
                    
NuGet\Install-Package FS.StreamFlow.RabbitMQ -Version 9.0.0-alpha.20250720-0108
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="FS.StreamFlow.RabbitMQ" Version="9.0.0-alpha.20250720-0108" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="FS.StreamFlow.RabbitMQ" Version="9.0.0-alpha.20250720-0108" />
                    
Directory.Packages.props
<PackageReference Include="FS.StreamFlow.RabbitMQ" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add FS.StreamFlow.RabbitMQ --version 9.0.0-alpha.20250720-0108
                    
#r "nuget: FS.StreamFlow.RabbitMQ, 9.0.0-alpha.20250720-0108"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package FS.StreamFlow.RabbitMQ@9.0.0-alpha.20250720-0108
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=FS.StreamFlow.RabbitMQ&version=9.0.0-alpha.20250720-0108&prerelease
                    
Install as a Cake Addin
#tool nuget:?package=FS.StreamFlow.RabbitMQ&version=9.0.0-alpha.20250720-0108&prerelease
                    
Install as a Cake Tool

FS.StreamFlow

Core NuGet Version RabbitMQ NuGet Version NuGet Downloads GitHub License GitHub Stars

A comprehensive, production-ready messaging framework for .NET 9 with enterprise-grade features, automatic recovery, and event-driven architecture support.

FS.StreamFlow isn't just another messaging client wrapper. It's a complete messaging solution for building scalable, resilient applications with sophisticated event-driven capabilities. Whether you're building microservices, implementing CQRS patterns, or creating real-time applications, FS.StreamFlow provides everything you need in one cohesive framework.

πŸ—οΈ Architecture

FS.StreamFlow follows a layered architecture with clear separation of concerns:

  • FS.StreamFlow.Core: Provider-agnostic abstractions, interfaces, and models
  • FS.StreamFlow.RabbitMQ: Production-ready RabbitMQ implementation
  • Future Providers: Extensible to other messaging providers (Azure Service Bus, Apache Kafka, etc.)

✨ Why FS.StreamFlow?

Imagine you're building a modern distributed application that needs to handle thousands of messages per second, process complex event flows, and remain resilient under pressure. Traditional messaging clients give you basic publish/subscribe functionality, but when you need enterprise-grade features like automatic recovery, event sourcing, saga orchestration, and comprehensive error handling, you're left building everything from scratch.

FS.StreamFlow bridges this gap by providing everything you need in one production-ready framework:

  • πŸ”„ Automatic Recovery: Intelligent connection recovery with exponential backoff
  • πŸ“¨ Enterprise Messaging: Producer/Consumer patterns with confirmations and transactions
  • 🎯 Event-Driven Architecture: Built-in support for domain and integration events
  • πŸ“š Event Sourcing: Complete event store implementation with snapshots
  • πŸ”€ Saga Orchestration: Long-running workflow management
  • πŸ›‘οΈ Comprehensive Error Handling: Dead letter queues, retry policies, and circuit breakers
  • πŸ“Š Production Monitoring: Health checks, metrics, and performance tracking
  • ⚑ High Performance: Async-first API with connection pooling
  • πŸ”Œ Provider Agnostic: Extensible architecture for multiple messaging providers

πŸš€ Quick Start

Installation

Choose your messaging provider:

# For RabbitMQ support
dotnet add package FS.StreamFlow.RabbitMQ

# Core abstractions (automatically included with providers)
dotnet add package FS.StreamFlow.Core

Basic Setup

// Program.cs
using FS.StreamFlow.RabbitMQ.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

// Add FS.StreamFlow with RabbitMQ
builder.Services.AddRabbitMQStreamFlow(options =>
{
    // Client configuration
    options.ClientConfiguration.ClientName = "My Application";
    options.ClientConfiguration.EnableAutoRecovery = true;
    options.ClientConfiguration.EnableHeartbeat = true;
    options.ClientConfiguration.HeartbeatInterval = TimeSpan.FromSeconds(60);
    
    // Connection settings
    options.ConnectionSettings.Host = "localhost";
    options.ConnectionSettings.Port = 5672;
    options.ConnectionSettings.Username = "guest";
    options.ConnectionSettings.Password = "guest";
    options.ConnectionSettings.VirtualHost = "/";
    options.ConnectionSettings.ConnectionTimeout = TimeSpan.FromSeconds(30);
    
    // Producer settings
    options.ProducerSettings.EnablePublisherConfirms = true;
    options.ProducerSettings.ConfirmationTimeout = TimeSpan.FromSeconds(10);
    options.ProducerSettings.MaxConcurrentPublishes = 100;
    
    // Consumer settings
    options.ConsumerSettings.PrefetchCount = 50;
    options.ConsumerSettings.AutoAcknowledge = false;
    options.ConsumerSettings.MaxConcurrentConsumers = 5;
});

var app = builder.Build();

// Initialize the client
var streamFlow = app.Services.GetRequiredService<IStreamFlowClient>();
await streamFlow.InitializeAsync();

Your First Message

// Inject the StreamFlow client
public class OrderService
{
    private readonly IStreamFlowClient _streamFlow;
    
    public OrderService(IStreamFlowClient streamFlow) => _streamFlow = streamFlow;
    
    public async Task CreateOrderAsync(Order order)
    {
        // Initialize the client first
        await _streamFlow.InitializeAsync();
        
        // Create exchange and queue with fluent API
        await _streamFlow.ExchangeManager.Exchange("orders")
            .AsTopic()
            .WithDurable(true)
            .DeclareAsync();
            
        await _streamFlow.QueueManager.Queue("order-processing")
            .WithDurable(true)
            .WithDeadLetterExchange("dlx")
            .BindToExchange("orders", "order.created")
            .DeclareAsync();
        
        // Publish a message with fluent API
        // Exchange must be declared before publishing!
        await _streamFlow.ExchangeManager.Exchange("orders")
            .AsTopic()
            .WithDurable(true)
            .DeclareAsync();

        // Option 1: With pre-configured message (can use PublishAsync() without parameters)
        await _streamFlow.Producer.Message(order)
            .WithExchange("orders")
            .WithRoutingKey("order.created")
            .WithDeliveryMode(DeliveryMode.Persistent)
            .PublishAsync();

        // Option 2: With generic type (MUST pass message to PublishAsync)
        await _streamFlow.Producer.Message<Order>()
            .WithExchange("orders")
            .WithRoutingKey("order.created")
            .WithDeliveryMode(DeliveryMode.Persistent)
            .PublishAsync(order);
    }
}

// Consume messages with fluent API
public class OrderProcessor
{
    private readonly IStreamFlowClient _streamFlow;
    
    public OrderProcessor(IStreamFlowClient streamFlow) => _streamFlow = streamFlow;
    
    public async Task StartProcessingAsync()
    {
        // Initialize the client first
        await _streamFlow.InitializeAsync();
        
        await _streamFlow.Consumer.Queue<Order>("order-processing")
            .WithConcurrency(5)
            .WithPrefetchCount(100)
            .WithErrorHandler(async (exception, context) =>
            {
                return exception is ConnectFailureException || exception is BrokerUnreachableException;
            })
            .ConsumeAsync(async (order, context) =>
            {
                await ProcessOrderAsync(order);
                return true; // Acknowledge message
            });
    }
}

That's it! You now have a production-ready messaging system with automatic recovery, health monitoring, and comprehensive error handling.

πŸ”§ Troubleshooting

Common Producer Issues

Problem: PublishAsync() throws "Exchange must be configured before publishing"
Solution: Always declare exchanges before publishing:

await _streamFlow.ExchangeManager.Exchange("orders")
    .AsTopic()
    .WithDurable(true)
    .DeclareAsync();

Problem: Message<T>() with .PublishAsync() throws compilation error
Solution: Use the correct overload:

// ❌ Wrong - Message<T>() requires message parameter in PublishAsync
await _streamFlow.Producer.Message<Order>()
    .WithExchange("orders")
    .WithRoutingKey("order.created")
    .PublishAsync(); // COMPILATION ERROR!

// βœ… Correct - Message<T>() MUST pass message to PublishAsync
await _streamFlow.Producer.Message<Order>()
    .WithExchange("orders")
    .WithRoutingKey("order.created")
    .PublishAsync(order); // Correct!

// βœ… Alternative - Message(order) can use PublishAsync() without parameters
await _streamFlow.Producer.Message(order)
    .WithExchange("orders")
    .WithRoutingKey("order.created")
    .PublishAsync(); // Correct!

Common Consumer Issues

Problem: Consumer not receiving messages
Solution: Check queue bindings and routing keys:

await _streamFlow.QueueManager.Queue("order-processing")
    .BindToExchange("orders", "order.created")  // Must match producer routing key
    .DeclareAsync();

Problem: Messages not being acknowledged
Solution: Return true from your message handler:

await _streamFlow.Consumer.Queue<Order>("order-processing")
    .ConsumeAsync(async (order, context) =>
    {
        await ProcessOrderAsync(order);
        return true; // βœ… Acknowledge message
        // return false; // ❌ Reject message
    });

🌟 Key Features

1. Production-Ready Connection Management

Automatic connection recovery with intelligent backoff strategies:

builder.Services.AddRabbitMQStreamFlow(options =>
{
    // Connection settings
    options.ConnectionSettings.Host = "localhost";
    options.ConnectionSettings.Port = 5672;
    options.ConnectionSettings.Username = "guest";
    options.ConnectionSettings.Password = "guest";
    options.ConnectionSettings.VirtualHost = "/";
    options.ConnectionSettings.ConnectionTimeout = TimeSpan.FromSeconds(30);
    
    // Client configuration
    options.ClientConfiguration.EnableAutoRecovery = true;
    options.ClientConfiguration.EnableHeartbeat = true;
    options.ClientConfiguration.HeartbeatInterval = TimeSpan.FromSeconds(60);
});

2. Event-Driven Architecture

Built-in support for domain and integration events:

// Domain Event
public record OrderCreated(Guid OrderId, string CustomerName, decimal Amount) : IDomainEvent
{
    public Guid Id { get; } = Guid.NewGuid();
    public DateTime OccurredOn { get; } = DateTime.UtcNow;
    public int Version { get; } = 1;
    public string EventType => nameof(OrderCreated);
    public string? CorrelationId { get; set; }
    public string? CausationId { get; set; }
    public IDictionary<string, object> Metadata { get; } = new Dictionary<string, object>();
    public string AggregateId => OrderId.ToString();
    public string AggregateType => "Order";
    public long AggregateVersion { get; set; }
    public string? InitiatedBy { get; set; }
}

// Integration Event  
public record OrderShipped(Guid OrderId, string TrackingNumber) : IIntegrationEvent
{
    public Guid Id { get; } = Guid.NewGuid();
    public DateTime OccurredOn { get; } = DateTime.UtcNow;
    public int Version { get; } = 1;
    public string EventType => nameof(OrderShipped);
    public string? CorrelationId { get; set; }
    public string? CausationId { get; set; }
    public IDictionary<string, object> Metadata { get; } = new Dictionary<string, object>();
    public string Source => "order-service";
    public string ExchangeName => "order.shipped";
    public string? Target { get; set; }
    public string SchemaVersion => "1.0";
    public TimeSpan? TimeToLive { get; set; }
}

// Event Handlers
public class OrderCreatedHandler : IAsyncEventHandler<OrderCreated>
{
    public async Task HandleAsync(OrderCreated eventData, EventContext context)
    {
        // Handle domain event
        await _emailService.SendOrderConfirmationAsync(eventData);
    }
}

// Option 1: Direct API (Recommended for production)
// Publishing Domain Events (uses separate exchanges per aggregate)
var orderCreated = new OrderCreated(orderId, customerName, amount)
{
    AggregateType = "Order",
    AggregateId = orderId.ToString(),
    CorrelationId = correlationId,
    Version = "1.0"
};
await _streamFlow.EventBus.PublishDomainEventAsync(orderCreated);

// Publishing Integration Events (uses ExchangeName as exchange name)
var orderShipped = new OrderShipped(orderId, trackingNumber)
{
    ExchangeName = "order-service", // This becomes the exchange name
    CorrelationId = correlationId,
    CausationId = causationId,
    TimeToLive = TimeSpan.FromMinutes(30)
};
await _streamFlow.EventBus.PublishIntegrationEventAsync(orderShipped);

// Option 2: Fluent API (Great for complex configurations)
// Publishing Domain Events with fluent API
await _streamFlow.EventBus.Event<OrderCreated>()
    .WithCorrelationId(correlationId)
    .WithSource("order-service")
    .WithVersion("1.0")
    .WithAggregateId(orderId.ToString())
    .WithAggregateType("Order")
    .PublishAsync(new OrderCreated(orderId, customerName, amount));

// Publishing Integration Events with fluent API
await _streamFlow.EventBus.Event<OrderShipped>()
    .WithCorrelationId(correlationId)
    .WithCausationId(causationId)
    .WithSource("order-service")
    .WithTtl(TimeSpan.FromMinutes(30))
    .WithProperty("priority", "high")
    .PublishAsync(new OrderShipped(orderId, trackingNumber) { ExchangeName = "order-service" });

// Subscribing to Domain Events
await _streamFlow.EventBus.SubscribeToDomainEventAsync<OrderCreated>(
    "Order", // Aggregate type
    new OrderCreatedHandler());

// Subscribing to Integration Events
await _streamFlow.EventBus.SubscribeToIntegrationEventAsync<OrderShipped>(
    "order-service", // Exchange name (matches RoutingKey)
    new OrderShippedHandler());

3. Event Sourcing Support

Complete event store implementation with fluent API and snapshots:

// Store events with fluent API
await _streamFlow.EventStore.Stream($"order-{orderId}")
    .AppendEvent(new OrderCreated(orderId, customerName, amount))
    .AppendEvent(new OrderItemAdded(orderId, itemId, quantity))
    .SaveAsync();

// Read events with fluent API
var events = await _streamFlow.EventStore.Stream($"order-{orderId}")
    .FromVersion(0)
    .WithMaxCount(100)
    .ReadAsync();

// Create snapshots with fluent API
await _streamFlow.EventStore.Stream($"order-{orderId}")
    .WithSnapshot(orderSnapshot, version: 50)
    .SaveAsync();

// Advanced event store operations
await _streamFlow.EventStore.Stream($"order-{orderId}")
    .AppendEventsWithExpectedVersion(events, expectedVersion: 10)
    .WithSnapshot(snapshot, version: 20)
    .SaveAsync();

// Read events backwards
var recentEvents = await _streamFlow.EventStore.Stream($"order-{orderId}")
    .FromVersion(-1)
    .WithMaxCount(10)
    .ReadBackwardAsync();

4. Saga Orchestration

Long-running workflow management:

public class OrderProcessingSaga : ISaga<OrderProcessingSagaState>
{
    public async Task HandleAsync(OrderCreated @event, SagaContext context)
    {
        State.OrderId = @event.OrderId;
        State.Status = "Processing";
        
        // Send command to reserve inventory
        await context.SendAsync(new ReserveInventory(@event.OrderId, @event.Items));
    }
    
    public async Task HandleAsync(InventoryReserved @event, SagaContext context)
    {
        State.Status = "InventoryReserved";
        
        // Send command to process payment
        await context.SendAsync(new ProcessPayment(@event.OrderId, State.Amount));
    }
    
    public async Task HandleAsync(PaymentProcessed @event, SagaContext context)
    {
        State.Status = "Completed";
        
        // Complete the saga
        await context.CompleteAsync();
    }
}

5. Advanced Producer Features

Publisher confirms, transactions, and batch operations:

// Advanced producer with fluent API
await _streamFlow.Producer.Message(order)
    .WithExchange("orders")
    .WithRoutingKey("order.created")
    .WithDeliveryMode(DeliveryMode.Persistent)
    .WithPriority(5)
    .WithExpiration(TimeSpan.FromHours(24))
    .WithHeaders(new Dictionary<string, object>
    {
        ["source"] = "order-service",
        ["version"] = "1.0"
    })
    .WithHeader("priority", "high")
    .WithContentType("application/json")
    .WithContentEncoding("utf-8")
    .WithCorrelationId(correlationId)
    .WithMessageId(Guid.NewGuid().ToString())
    .WithMandatory(true)
    .WithConfirmationTimeout(TimeSpan.FromSeconds(5))
    .PublishAsync();

// Alternative syntax
await _streamFlow.Producer.Message(order)
    .ToExchange("orders")
    .WithRoutingKey("order.created")
    .WithDeliveryMode(DeliveryMode.Persistent)
    .PublishAsync();

// Batch operations
var messages = new[]
{
    new MessageContext("orders", "order.created", order1),
    new MessageContext("orders", "order.updated", order2),
    new MessageContext("orders", "order.shipped", order3)
};

var batchResult = await _streamFlow.Producer.PublishBatchAsync(messages);

6. Advanced Consumer Features

Consumer with comprehensive configuration:

// Advanced consumer with fluent API
await _streamFlow.Consumer.Queue<Order>("order-processing")
    .FromExchange("orders")
    .WithRoutingKey("order.*")
    .WithSettings(settings =>
    {
        settings.PrefetchCount = 100;
        settings.MaxConcurrentConsumers = 5;
        settings.AutoAcknowledge = false;
    })
    .WithConsumerTag("order-processor-1")
    .WithAutoAck(false)
    .WithPrefetchCount(100)
    .WithConcurrency(5)
    .WithErrorHandler(async (exception, context) =>
    {
        if (exception is ConnectFailureException)
            return true; // Requeue
        
        await _deadLetterService.SendAsync(context.Message);
        return false; // Don't requeue
    })
    .WithRetryPolicy(new RetryPolicySettings
    {
        RetryPolicy = RetryPolicyType.ExponentialBackoff,
        MaxRetryAttempts = 3,
        RetryDelay = TimeSpan.FromSeconds(1)
    })
    .WithDeadLetterQueue(new DeadLetterSettings
    {
        DeadLetterExchange = "dlx",
        DeadLetterQueue = "dlq"
    })
    .ConsumeAsync(async (order, context) =>
    {
        await ProcessOrderAsync(order);
        return true;
    });

7. Infrastructure Management

Fluent APIs for queue, exchange, and event stream management:

// Advanced Queue management with fluent API
await _streamFlow.QueueManager.Queue("order-processing")
    .WithDurable(true)
    .WithExclusive(false)
    .WithAutoDelete(false)
    .WithArguments(new Dictionary<string, object>
    {
        ["x-message-ttl"] = 3600000,
        ["x-max-length"] = 10000
    })
    .WithArgument("x-queue-mode", "lazy")
    .WithDeadLetterExchange("dlx")
    .WithDeadLetterRoutingKey("order.failed")
    .WithMessageTtl(TimeSpan.FromHours(24))
    .WithMaxLength(10000)
    .WithMaxLengthBytes(1024 * 1024 * 100) // 100MB
    .WithPriority(5)
    .BindToExchange("orders", "order.created")
    .BindToExchange("orders", "order.updated", new Dictionary<string, object>
    {
        ["x-match"] = "all"
    })
    .DeclareAsync();

// Advanced Exchange management with fluent API
await _streamFlow.ExchangeManager.Exchange("orders")
    .WithType("topic")
    .AsTopic() // Alternative syntax
    .WithDurable(true)
    .WithAutoDelete(false)
    .WithArguments(new Dictionary<string, object>
    {
        ["alternate-exchange"] = "alt-orders"
    })
    .WithArgument("x-delayed-message", true)
    .WithAlternateExchange("alt-orders")
    .WithInternal(false)
    .BindToExchange("master-orders", "order.*")
    .BindToExchange("notifications", "order.created", new Dictionary<string, object>
    {
        ["x-match"] = "any"
    })
    .DeclareAsync();

// Support for all exchange types
await _streamFlow.ExchangeManager.Exchange("direct-orders").AsDirect().DeclareAsync();
await _streamFlow.ExchangeManager.Exchange("fanout-orders").AsFanout().DeclareAsync();
await _streamFlow.ExchangeManager.Exchange("header-orders").AsHeaders().DeclareAsync();

// Event stream management with fluent API
await _streamFlow.EventStore.Stream("order-events")
    .AppendEvents(new[] { event1, event2, event3 })
    .AppendEventsWithExpectedVersion(moreEvents, expectedVersion: 10)
    .WithSnapshot(snapshot, version: 100)
    .SaveAsync();

// Stream operations
await _streamFlow.EventStore.Stream("order-events").CreateAsync();
await _streamFlow.EventStore.Stream("order-events").DeleteAsync();
await _streamFlow.EventStore.Stream("order-events").TruncateAsync(version: 50);

// Snapshot management
var snapshot = await _streamFlow.EventStore.Stream("order-events").GetSnapshotAsync();
await _streamFlow.EventStore.Stream("order-events")
    .WithSnapshot(orderSnapshot, version: 100)
    .SaveAsync();

8. Comprehensive Error Handling

Built-in retry policies, circuit breakers, and dead letter queues:

builder.Services.AddRabbitMQStreamFlow(options =>
{
    // Connection settings
    options.ConnectionSettings.Host = "localhost";
    options.ConnectionSettings.Port = 5672;
    options.ConnectionSettings.Username = "guest";
    options.ConnectionSettings.Password = "guest";
    options.ConnectionSettings.VirtualHost = "/";
    

});

πŸ“‹ Comprehensive Examples

Example 1: E-commerce Order Processing

// Complete order processing workflow with fluent APIs
public class OrderProcessingService
{
    private readonly IStreamFlowClient _streamFlow;
    
    public OrderProcessingService(IStreamFlowClient streamFlow) => _streamFlow = streamFlow;
    
    public async Task ProcessOrderAsync(Order order)
    {
        // 1. Setup infrastructure with fluent APIs
        await _streamFlow.ExchangeManager.Exchange("orders")
            .AsTopic()
            .WithDurable(true)
            .WithAlternateExchange("alt-orders")
            .DeclareAsync();
            
        await _streamFlow.QueueManager.Queue("order-processing")
            .WithDurable(true)
            .WithDeadLetterExchange("dlx")
            .WithMessageTtl(TimeSpan.FromHours(24))
            .WithMaxLength(10000)
            .WithPriority(5)
            .BindToExchange("orders", "order.*")
            .DeclareAsync();
        
        // 2. Publish domain event with new API
        var orderCreated = new OrderCreated(order.Id, order.CustomerName, order.Total)
        {
            AggregateType = "Order",
            AggregateId = order.Id.ToString(),
            CorrelationId = Guid.NewGuid().ToString(),
            Version = "1.0",
            Source = "order-service"
        };
        await _streamFlow.EventBus.PublishDomainEventAsync(orderCreated);
        
        // 3. Store event in event store with fluent API
        await _streamFlow.EventStore.Stream($"order-{order.Id}")
            .AppendEvent(new OrderCreated(order.Id, order.CustomerName, order.Total))
            .AppendEvent(new OrderItemsAdded(order.Id, order.Items))
            .SaveAsync();
        
        // 4. Start saga for order processing
        await _streamFlow.SagaOrchestrator.StartSagaAsync<OrderProcessingSaga>(
            sagaId: order.Id,
            initialEvent: new OrderCreated(order.Id, order.CustomerName, order.Total));
    }
}

// Event handlers with automatic retry and error handling
public class OrderCreatedHandler : IAsyncEventHandler<OrderCreated>
{
    private readonly IStreamFlowClient _streamFlow;
    
    public async Task HandleAsync(OrderCreated @event, EventContext context)
    {
        try
        {
            // Business logic with automatic retry on failure
            await _inventoryService.ReserveItemsAsync(@event.OrderId);
            
            // Publish follow-up events
            // Publish integration event to inventory service
            var inventoryRequested = new InventoryRequested(@event.OrderId, @event.Items)
            {
                RoutingKey = "inventory-service", // Exchange name
                CorrelationId = context.CorrelationId,
                CausationId = context.EventId,
                Source = "order-service"
            };
            await _streamFlow.EventBus.PublishIntegrationEventAsync(inventoryRequested);
        }
        catch (Exception ex)
        {
            // Automatic error handling with dead letter queue
            throw new BusinessException("Failed to process order", ex);
        }
    }
}

Example 2: Microservices Integration

// Service A - Order Service
public class OrderService
{
    private readonly IStreamFlowClient _streamFlow;
    
    public async Task CreateOrderAsync(CreateOrderRequest request)
    {
        var order = new Order(request.CustomerId, request.Items);
        
        // Save to database
        await _orderRepository.SaveAsync(order);
        
        // Publish integration event for other services
        var orderCreated = new OrderCreated(order.Id, order.CustomerId, order.Items)
        {
            ExchangeName = "order-service", // Exchange name
            CorrelationId = Guid.NewGuid().ToString(),
            Source = "order-service",
            Version = "1.0"
        };
        await _streamFlow.EventBus.PublishIntegrationEventAsync(orderCreated);
    }
}

// Service B - Inventory Service
public class InventoryService
{
    private readonly IStreamFlowClient _streamFlow;
    
    public async Task StartAsync()
    {
        // Subscribe to order events
        await _streamFlow.Consumer.Queue<OrderCreated>("inventory-service")
            .WithConcurrency(3)
            .WithPrefetchCount(50)
            .WithErrorHandler(async (exception, context) =>
            {
                // Custom error handling
                return exception is ConnectFailureException;
            })
            .ConsumeAsync(async (orderCreated, context) =>
            {
                // Reserve inventory
                var reserved = await ReserveInventoryAsync(orderCreated.Items);
                
                // Publish inventory reserved event
                var inventoryReserved = new InventoryReserved(orderCreated.OrderId, reserved)
                {
                    RoutingKey = "inventory-service", // Exchange name
                    CorrelationId = context.CorrelationId,
                    CausationId = context.MessageId,
                    Source = "inventory-service"
                };
                await _streamFlow.EventBus.PublishIntegrationEventAsync(inventoryReserved);
                
                return true; // Acknowledge message
            });
    }
}

// Service C - Payment Service
public class PaymentService
{
    private readonly IStreamFlowClient _streamFlow;
    
    public async Task StartAsync()
    {
        await _streamFlow.Consumer.Queue<InventoryReserved>("payment-service")
            .WithConcurrency(2)
            .WithPrefetchCount(20)
            .WithRetryPolicy(new RetryPolicySettings
            {
                RetryPolicy = RetryPolicyType.ExponentialBackoff,
                MaxRetryAttempts = 3,
                RetryDelay = TimeSpan.FromSeconds(1)
            })
            .ConsumeAsync(async (inventoryReserved, context) =>
            {
                // Process payment
                var payment = await ProcessPaymentAsync(inventoryReserved.OrderId);
                
                // Publish payment processed event
                var paymentProcessed = new PaymentProcessed(inventoryReserved.OrderId, payment.TransactionId)
                {
                    RoutingKey = "payment-service", // Exchange name
                    CorrelationId = context.CorrelationId,
                    CausationId = context.MessageId,
                    Source = "payment-service"
                };
                await _streamFlow.EventBus.PublishIntegrationEventAsync(paymentProcessed);
                
                return true;
            });
    }
}

Example 3: Real-time Analytics

// High-throughput analytics processing
public class AnalyticsProcessor
{
    private readonly IStreamFlowClient _streamFlow;
    
    public async Task StartProcessingAsync()
    {
        // Process user events in real-time
        await _streamFlow.Consumer.Queue<UserEvent>("analytics-processing")
            .WithConcurrency(10)
            .WithPrefetchCount(1000)
            .WithAutoAck(true) // High-throughput, less durability
            .WithErrorHandler(async (exception, context) =>
            {
                // Log and continue processing
                _logger.LogError(exception, "Error processing user event");
                return false; // Don't requeue analytics events
            })
            .ConsumeAsync(async (userEvent, context) =>
            {
                // Process analytics
                await ProcessUserEventAsync(userEvent);
                
                // Batch analytics data
                await _analyticsBuffer.AddAsync(userEvent);
                
                return true;
            });
    }
}

// Configuration for high-throughput scenarios
builder.Services.AddRabbitMQStreamFlow(options =>
{
    // Client configuration
    options.ClientConfiguration.ClientName = "High-Throughput Application";
    options.ClientConfiguration.EnableAutoRecovery = true;
    options.ClientConfiguration.EnableHeartbeat = true;
    options.ClientConfiguration.HeartbeatInterval = TimeSpan.FromSeconds(30);
    
    // Connection settings
    options.ConnectionSettings.Host = "localhost";
    options.ConnectionSettings.Port = 5672;
    options.ConnectionSettings.Username = "guest";
    options.ConnectionSettings.Password = "guest";
    options.ConnectionSettings.VirtualHost = "/";
    options.ConnectionSettings.ConnectionTimeout = TimeSpan.FromSeconds(30);
    
    // Producer settings
    options.ProducerSettings.EnablePublisherConfirms = true;
    options.ProducerSettings.MaxConcurrentPublishes = 1000;
    
    // Consumer settings
    options.ConsumerSettings.PrefetchCount = 100;
    options.ConsumerSettings.MaxConcurrentConsumers = 10;
    options.ConsumerSettings.AutoAcknowledge = false;
});

βš™οΈ Configuration Reference

Fluent Configuration API

Complete configuration with the options pattern:

builder.Services.AddRabbitMQStreamFlow(options =>
{
    // Client configuration
    options.ClientConfiguration.ClientName = "My Application";
    options.ClientConfiguration.EnableAutoRecovery = true;
    options.ClientConfiguration.EnableHeartbeat = true;
    options.ClientConfiguration.HeartbeatInterval = TimeSpan.FromSeconds(60);
    
    // Connection settings
    options.ConnectionSettings.Host = "localhost";
    options.ConnectionSettings.Port = 5672;
    options.ConnectionSettings.Username = "guest";
    options.ConnectionSettings.Password = "guest";
    options.ConnectionSettings.VirtualHost = "/";
    options.ConnectionSettings.ConnectionTimeout = TimeSpan.FromSeconds(30);
    
    // Producer settings
    options.ProducerSettings.EnablePublisherConfirms = true;
    options.ProducerSettings.ConfirmationTimeout = TimeSpan.FromSeconds(5);
    options.ProducerSettings.MaxConcurrentPublishes = 100;
    
    // Consumer settings
    options.ConsumerSettings.PrefetchCount = 50;
    options.ConsumerSettings.AutoAcknowledge = false;
    options.ConsumerSettings.MaxConcurrentConsumers = 5;
    

});

Configuration via appsettings.json

{
    "RabbitMQ": {
    "ClientConfiguration": {
      "ClientName": "My Application",
      "EnableAutoRecovery": true,
      "EnableHeartbeat": true,
      "HeartbeatInterval": "00:01:00"
    },
    "ConnectionSettings": {
      "Host": "localhost",
      "Port": 5672,
      "Username": "guest",
      "Password": "guest",
      "VirtualHost": "/",
      "ConnectionTimeout": "00:00:30",
      "RequestTimeout": "00:00:30",
      "UseSsl": false
      },
    "ProducerSettings": {
      "EnablePublisherConfirms": true,
      "ConfirmationTimeout": "00:00:10",
      "MaxConcurrentPublishes": 100,
      "PublishTimeout": "00:00:30",
      "DefaultExchange": "",
      "DefaultRoutingKey": ""
      },
    "ConsumerSettings": {
      "PrefetchCount": 50,
      "AutoAcknowledge": false,
      "MaxConcurrentConsumers": 5,
      "ConsumerTimeout": "00:00:30",
      "EnableDeadLetterQueue": true
    }
  }
}
// Use configuration from appsettings.json
builder.Services.Configure<RabbitMQStreamFlowOptions>(
    builder.Configuration.GetSection("StreamFlow:RabbitMQ"));

πŸ”§ Advanced Features

Fluent Infrastructure Management

Set up your messaging infrastructure with intuitive fluent APIs:

// Complete infrastructure setup
public class InfrastructureSetup
{
    private readonly IStreamFlowClient _streamFlow;
    
    public async Task SetupAsync()
    {
        // Create exchanges with fluent API
        await _streamFlow.ExchangeManager.Exchange("orders")
            .AsTopic()
            .WithDurable(true)
            .WithAlternateExchange("alt-orders")
            .DeclareAsync();
            
        await _streamFlow.ExchangeManager.Exchange("notifications")
            .AsFanout()
            .WithDurable(true)
            .DeclareAsync();
        
        // Create queues with fluent API
        await _streamFlow.QueueManager.Queue("order-processing")
            .WithDurable(true)
            .WithDeadLetterExchange("dlx")
            .WithDeadLetterRoutingKey("order.failed")
            .WithMessageTtl(TimeSpan.FromHours(24))
            .WithMaxLength(10000)
            .WithPriority(5)
            .BindToExchange("orders", "order.created")
            .BindToExchange("orders", "order.updated")
            .DeclareAsync();
            
        await _streamFlow.QueueManager.Queue("email-notifications")
            .WithDurable(true)
            .BindToExchange("notifications", "")
            .DeclareAsync();
            
        // Create event streams with fluent API
        await _streamFlow.EventStore.Stream("order-events")
            .CreateAsync();
            
        await _streamFlow.EventStore.Stream("user-events")
            .CreateAsync();
    }
}

Fluent Producer API

Chain operations with the comprehensive fluent API:

// Advanced message publishing with fluent API
await _streamFlow.Producer.Message(order)
    .WithExchange("orders")
    .WithRoutingKey("order.created")
    .WithDeliveryMode(DeliveryMode.Persistent)
    .WithExpiration(TimeSpan.FromHours(24))
    .WithPriority(5)
    .WithHeaders(new Dictionary<string, object>
    {
        ["source"] = "order-service",
        ["version"] = "1.0",
        ["correlation-id"] = Guid.NewGuid().ToString()
    })
    .WithHeader("priority", "high")
    .WithMandatory(true)
    .WithConfirmationTimeout(TimeSpan.FromSeconds(5))
    .PublishAsync();

// Alternative syntax
await _streamFlow.Producer.Message(order)
    .ToExchange("orders")
    .WithRoutingKey("order.created")
    .WithDeliveryMode(DeliveryMode.Persistent)
    .PublishAsync();

Fluent Consumer API

Configure consumers with comprehensive fluent syntax:

// Advanced consumer configuration with fluent API
await _streamFlow.Consumer.Queue<Order>("order-processing")
    .FromExchange("orders")
    .WithRoutingKey("order.*")
    .WithSettings(settings =>
    {
        settings.PrefetchCount = 100;
        settings.MaxConcurrentConsumers = 5;
        settings.AutoAcknowledge = false;
    })
    .WithConsumerTag("order-processor-1")
    .WithAutoAck(false)
    .WithPrefetchCount(100)
    .WithConcurrency(5)
    .WithErrorHandler(async (exception, context) =>
    {
        if (exception is ConnectFailureException)
            return true; // Requeue
        
        await _deadLetterService.SendAsync(context.Message);
        return false; // Don't requeue
    })
    .WithRetryPolicy(new RetryPolicySettings
    {
        RetryPolicy = RetryPolicyType.ExponentialBackoff,
        MaxRetryAttempts = 3,
        RetryDelay = TimeSpan.FromSeconds(1)
    })
    .WithDeadLetterQueue(new DeadLetterSettings
    {
        DeadLetterExchange = "dlx",
        DeadLetterQueue = "dlq"
    })
    .ConsumeAsync(async (order, context) =>
    {
        await ProcessOrderAsync(order);
        return true;
    });

Fluent Event Bus API

FS.StreamFlow provides both Direct API and Fluent API approaches for event publishing:

  • Direct API: Recommended for production use. More explicit, better performance, easier to debug.
  • Fluent API: Great for complex configurations, prototyping, and when you need chainable operations.

Manage events with comprehensive fluent operations:

// Fluent API for Domain Events
await _streamFlow.EventBus.Event<OrderCreated>()
    .WithCorrelationId(correlationId)
    .WithCausationId(causationId)
    .WithSource("order-service")
    .WithVersion("1.0")
    .WithAggregateId(orderId.ToString())
    .WithAggregateType("Order")
    .WithProperty("priority", "high")
    .PublishAsync(new OrderCreated(orderId, customerName, amount));

// Fluent API for Integration Events
await _streamFlow.EventBus.Event<OrderShipped>()
    .WithCorrelationId(correlationId)
    .WithCausationId(causationId)
    .WithSource("order-service")
    .WithVersion("1.0")
    .WithTtl(TimeSpan.FromMinutes(30))
    .WithProperty("tracking-number", trackingNumber)
    .WithProperty("priority", "urgent")
    .PublishAsync(new OrderShipped(orderId, trackingNumber) { ExchangeName = "order-service" });

// Advanced fluent configuration
await _streamFlow.EventBus.Event<OrderCreated>()
    .WithMetadata(metadata =>
    {
        metadata.CorrelationId = correlationId;
        metadata.Source = "order-service";
        metadata.Version = 1;
        metadata.Aggregate = new AggregateMetadata
        {
            Id = orderId.ToString(),
            Type = "Order"
        };
    })
    .WithProperties(new Dictionary<string, object>
    {
        ["priority"] = "high",
        ["region"] = "us-east-1",
        ["customer-tier"] = "premium"
    })
    .PublishAsync(new OrderCreated(orderId, customerName, amount));

// Subscribe to events (same as direct API)
await _streamFlow.EventBus.SubscribeToDomainEventAsync<OrderCreated>(
    "Order", // Aggregate type
    new OrderCreatedHandler());

await _streamFlow.EventBus.SubscribeToIntegrationEventAsync<OrderShipped>(
    "order-service", // Exchange name
    new OrderShippedHandler());

Fluent Event Store API

Manage event streams with comprehensive fluent operations:

// Store events with fluent API
await _streamFlow.EventStore.Stream($"order-{orderId}")
    .AppendEvent(new OrderCreated(orderId, customerName, amount))
    .AppendEvent(new OrderItemAdded(orderId, itemId, quantity))
    .WithSnapshot(orderSnapshot, version: 50)
    .SaveAsync();

// Read events with fluent API
var events = await _streamFlow.EventStore.Stream($"order-{orderId}")
    .FromVersion(0)
    .WithMaxCount(100)
    .ReadAsync();

// Read events backwards
var recentEvents = await _streamFlow.EventStore.Stream($"order-{orderId}")
    .FromVersion(-1)
    .WithMaxCount(10)
    .ReadBackwardAsync();

// Stream management
await _streamFlow.EventStore.Stream("order-events").CreateAsync();
await _streamFlow.EventStore.Stream("order-events").DeleteAsync();
await _streamFlow.EventStore.Stream("order-events").TruncateAsync(version: 50);

// Snapshot management
var snapshot = await _streamFlow.EventStore.Stream("order-events").GetSnapshotAsync();
await _streamFlow.EventStore.Stream("order-events")
    .WithSnapshot(orderSnapshot, version: 100)
    .SaveSnapshotAsync();

Comprehensive Error Handling

Built-in retry policies, circuit breakers, and dead letter queues:

builder.Services.AddRabbitMQStreamFlow(options =>
{
    // Connection settings
    options.ConnectionSettings.Host = "localhost";
    options.ConnectionSettings.Port = 5672;
    options.ConnectionSettings.Username = "guest";
    options.ConnectionSettings.Password = "guest";
    options.ConnectionSettings.VirtualHost = "/";
    options.ConnectionSettings.ConnectionTimeout = TimeSpan.FromSeconds(30);
    
    // Client configuration
    options.ClientConfiguration.EnableAutoRecovery = true;
    options.ClientConfiguration.EnableHeartbeat = true;
    options.ClientConfiguration.HeartbeatInterval = TimeSpan.FromSeconds(60);
    
    // Producer settings
    options.ProducerSettings.EnablePublisherConfirms = true;
    options.ProducerSettings.ConfirmationTimeout = TimeSpan.FromSeconds(5);
    options.ProducerSettings.MaxConcurrentPublishes = 100;
    
    // Consumer settings
    options.ConsumerSettings.PrefetchCount = 50;
    options.ConsumerSettings.AutoAcknowledge = false;
    options.ConsumerSettings.MaxConcurrentConsumers = 5;
    options.ConsumerSettings.EnableDeadLetterQueue = true;
    options.ConsumerSettings.DeadLetterSettings = new DeadLetterSettings
    {
        ExchangeName = "dlx",
        RoutingKey = "failed",
        Enabled = true,
        MaxRetries = 3,
        MessageTtl = TimeSpan.FromHours(24)
    };
    
    // Retry policy settings
    options.ClientConfiguration.RetryPolicy = new RetryPolicySettings
    {
        MaxRetryAttempts = 3,
        InitialRetryDelay = TimeSpan.FromSeconds(1),
        MaxRetryDelay = TimeSpan.FromSeconds(30),
        RetryDelayMultiplier = 2.0,
        UseExponentialBackoff = true,
        UseJitter = true
    };
    
    // Error handling settings
    options.ConsumerSettings.ErrorHandling = new ErrorHandlingSettings
    {
        Strategy = ErrorHandlingStrategy.Requeue,
        MaxRetries = 3,
        RetryDelay = TimeSpan.FromSeconds(1),
        UseExponentialBackoff = true,
        LogErrors = true,
        ContinueOnError = true
    };
});

Production Monitoring and Health Checks

Comprehensive monitoring and observability:

// Health check integration
builder.Services.AddHealthChecks()
    .AddCheck<RabbitMQHealthCheck>("rabbitmq", tags: new[] { "messaging" });

// Custom health check implementation
public class RabbitMQHealthCheck : IHealthCheck
{
    private readonly IStreamFlowClient _streamFlow;
    
    public RabbitMQHealthCheck(IStreamFlowClient streamFlow) => _streamFlow = streamFlow;
    
    public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        try
        {
            var healthResult = await _streamFlow.HealthChecker.CheckHealthAsync(cancellationToken);
            
            if (healthResult.IsHealthy)
            {
                return HealthCheckResult.Healthy("RabbitMQ connection is healthy", 
                    new Dictionary<string, object>
                    {
                        ["connection_state"] = _streamFlow.ConnectionManager.State,
                        ["messages_sent"] = _streamFlow.Producer.Statistics.TotalMessagesPublished,
                        ["messages_received"] = _streamFlow.Consumer.Statistics.TotalMessagesConsumed
                    });
            }
            
            return HealthCheckResult.Unhealthy("RabbitMQ connection is unhealthy", 
                healthResult.Exception);
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy("RabbitMQ health check failed", ex);
        }
    }
}

// Metrics collection
public class MetricsService
{
    private readonly IStreamFlowClient _streamFlow;
    private readonly ILogger<MetricsService> _logger;
    
    public MetricsService(IStreamFlowClient streamFlow, ILogger<MetricsService> logger)
    {
        _streamFlow = streamFlow;
        _logger = logger;
    }
    
    public async Task CollectMetricsAsync()
    {
        var clientStats = _streamFlow.MetricsCollector.GetClientStatistics();
        var connectionStats = _streamFlow.ConnectionManager.Statistics;
        var producerStats = _streamFlow.Producer.Statistics;
        var consumerStats = _streamFlow.Consumer.Statistics;
        
        _logger.LogInformation("Client Statistics: {ClientStats}", clientStats);
        _logger.LogInformation("Connection Statistics: {ConnectionStats}", connectionStats);
        _logger.LogInformation("Producer Statistics: {ProducerStats}", producerStats);
        _logger.LogInformation("Consumer Statistics: {ConsumerStats}", consumerStats);
        
        // Send metrics to monitoring system
        await SendMetricsToMonitoringSystemAsync(clientStats, connectionStats, producerStats, consumerStats);
    }
}

Saga Orchestration

Long-running workflow management with compensation:

// Saga implementation
public class OrderProcessingSaga : ISaga<OrderProcessingSagaState>
{
    private readonly IStreamFlowClient _streamFlow;
    private readonly ILogger<OrderProcessingSaga> _logger;
    
    public OrderProcessingSaga(IStreamFlowClient streamFlow, ILogger<OrderProcessingSaga> logger)
    {
        _streamFlow = streamFlow;
        _logger = logger;
    }
    
    public async Task HandleAsync(OrderCreated @event, SagaContext context)
    {
        State.OrderId = @event.OrderId;
        State.Status = "Processing";
        
        // Send command to reserve inventory
        await context.SendAsync(new ReserveInventory(@event.OrderId, @event.Items));
    }
    
    public async Task HandleAsync(InventoryReserved @event, SagaContext context)
    {
        State.Status = "InventoryReserved";
        
        // Send command to process payment
        await context.SendAsync(new ProcessPayment(@event.OrderId, State.Amount));
    }
    
    public async Task HandleAsync(PaymentProcessed @event, SagaContext context)
    {
        State.Status = "Completed";
        
        // Complete the saga
        await context.CompleteAsync();
    }
    
    public async Task CompensateAsync(SagaContext context)
    {
        _logger.LogWarning("Compensating order processing saga for order {OrderId}", State.OrderId);
        
        // Implement compensation logic
        if (State.Status == "InventoryReserved")
        {
            await context.SendAsync(new ReleaseInventory(State.OrderId));
        }
        
        if (State.Status == "PaymentProcessed")
        {
            await context.SendAsync(new RefundPayment(State.OrderId));
        }
    }
}

// Saga state
public class OrderProcessingSagaState
{
    public Guid OrderId { get; set; }
    public string Status { get; set; } = string.Empty;
    public decimal Amount { get; set; }
    public List<OrderItem> Items { get; set; } = new();
}

// Start saga
await _streamFlow.SagaOrchestrator.StartSagaAsync<OrderProcessingSaga>(
    sagaId: order.Id,
    initialEvent: new OrderCreated(order.Id, order.CustomerName, order.Total));

Message Serialization and Compression

Support for multiple serialization formats and compression:

// Configure serialization settings
builder.Services.AddRabbitMQStreamFlow(options =>
{
    // Serialization settings
    options.ClientConfiguration.Serialization = new SerializationSettings
    {
        Format = SerializationFormat.Json,
        EnableCompression = true,
        CompressionAlgorithm = CompressionAlgorithm.Gzip,
        CompressionThreshold = 1024,
        IncludeTypeInformation = true
    };
});

// Custom serialization
var serializer = _streamFlow.SerializerFactory.CreateSerializer(SerializationFormat.Json);
var messageBytes = await serializer.SerializeAsync(order, cancellationToken);
var deserializedOrder = await serializer.DeserializeAsync<Order>(messageBytes, cancellationToken);

Connection Management and Recovery

Automatic connection recovery with intelligent backoff:

// Connection event handling
_streamFlow.ConnectionManager.Connected += (sender, e) =>
{
    _logger.LogInformation("Connected to RabbitMQ at {Timestamp}", e.Timestamp);
};

_streamFlow.ConnectionManager.Disconnected += (sender, e) =>
{
    _logger.LogWarning("Disconnected from RabbitMQ: {Reason}", e.Reason);
};

_streamFlow.ConnectionManager.Recovering += (sender, e) =>
{
    _logger.LogInformation("Attempting to reconnect to RabbitMQ...");
};

_streamFlow.ConnectionManager.Recovered += (sender, e) =>
{
    _logger.LogInformation("Successfully reconnected to RabbitMQ");
};

// Manual connection management
await _streamFlow.ConnectionManager.ConnectAsync(cancellationToken);
var isConnected = _streamFlow.ConnectionManager.IsConnected;
var connectionStats = _streamFlow.ConnectionManager.Statistics;

Queue and Exchange Management

Comprehensive infrastructure management:

// Queue management
await _streamFlow.QueueManager.Queue("order-processing")
    .WithDurable(true)
    .WithExclusive(false)
    .WithAutoDelete(false)
    .WithArguments(new Dictionary<string, object>
    {
        ["x-message-ttl"] = 3600000,
        ["x-max-length"] = 10000
    })
    .WithDeadLetterExchange("dlx")
    .WithDeadLetterRoutingKey("order.failed")
    .WithMessageTtl(TimeSpan.FromHours(24))
    .WithMaxLength(10000)
    .WithMaxLengthBytes(1024 * 1024 * 100) // 100MB
    .WithPriority(5)
    .BindToExchange("orders", "order.created")
    .DeclareAsync();

// Exchange management
await _streamFlow.ExchangeManager.Exchange("orders")
    .WithType("topic")
    .AsTopic()
    .WithDurable(true)
    .WithAutoDelete(false)
    .WithArguments(new Dictionary<string, object>
    {
        ["alternate-exchange"] = "alt-orders"
    })
    .WithAlternateExchange("alt-orders")
    .WithInternal(false)
    .BindToExchange("master-orders", "order.*")
    .DeclareAsync();

// Support for all exchange types
await _streamFlow.ExchangeManager.Exchange("direct-orders").AsDirect().DeclareAsync();
await _streamFlow.ExchangeManager.Exchange("fanout-orders").AsFanout().DeclareAsync();
await _streamFlow.ExchangeManager.Exchange("header-orders").AsHeaders().DeclareAsync();

πŸ“Š Performance and Monitoring

Performance Optimization

// High-throughput configuration
builder.Services.AddRabbitMQStreamFlow(options =>
{
    // Client configuration
    options.ClientConfiguration.ClientName = "High-Throughput Application";
    options.ClientConfiguration.EnableAutoRecovery = true;
    options.ClientConfiguration.EnableHeartbeat = true;
    options.ClientConfiguration.HeartbeatInterval = TimeSpan.FromSeconds(30);
    
    // Connection settings
    options.ConnectionSettings.Host = "localhost";
    options.ConnectionSettings.Port = 5672;
    options.ConnectionSettings.Username = "guest";
    options.ConnectionSettings.Password = "guest";
    options.ConnectionSettings.VirtualHost = "/";
    options.ConnectionSettings.ConnectionTimeout = TimeSpan.FromSeconds(30);
    
    // Producer settings
    options.ProducerSettings.EnablePublisherConfirms = true;
    options.ProducerSettings.MaxConcurrentPublishes = 1000;
    
    // Consumer settings
    options.ConsumerSettings.PrefetchCount = 100;
    options.ConsumerSettings.MaxConcurrentConsumers = 10;
    options.ConsumerSettings.AutoAcknowledge = false;
});

Monitoring and Observability

// Comprehensive monitoring
public class MonitoringService
{
    private readonly IStreamFlowClient _streamFlow;
    private readonly ILogger<MonitoringService> _logger;
    
    public MonitoringService(IStreamFlowClient streamFlow, ILogger<MonitoringService> logger)
    {
        _streamFlow = streamFlow;
        _logger = logger;
    }
    
    public async Task MonitorSystemAsync()
    {
        // Client status
        var clientStatus = _streamFlow.Status;
        var clientStats = _streamFlow.MetricsCollector.GetClientStatistics();
        
        // Connection monitoring
        var connectionState = _streamFlow.ConnectionManager.State;
        var connectionStats = _streamFlow.ConnectionManager.Statistics;
        
        // Producer monitoring
        var producerStats = _streamFlow.Producer.Statistics;
        var producerSuccessRate = producerStats.PublishSuccessRate;
        
        // Consumer monitoring
        var consumerStats = _streamFlow.Consumer.Statistics;
        var consumerSuccessRate = consumerStats.ProcessingSuccessRate;
        
        // Log metrics
        _logger.LogInformation("System Status: {Status}, Success Rate: {SuccessRate}%", 
            clientStatus, producerSuccessRate);
        
        // Alert on issues
        if (producerSuccessRate < 95)
        {
            _logger.LogWarning("Producer success rate is low: {SuccessRate}%", producerSuccessRate);
        }
        
        if (consumerSuccessRate < 95)
        {
            _logger.LogWarning("Consumer success rate is low: {SuccessRate}%", consumerSuccessRate);
        }
    }
}

πŸš€ Getting Started Examples

Example 1: Simple Message Publishing

public class SimplePublisher
{
    private readonly IStreamFlowClient _streamFlow;
    
    public SimplePublisher(IStreamFlowClient streamFlow) => _streamFlow = streamFlow;
    
    public async Task PublishMessageAsync(string message)
    {
        // Initialize the client first
        await _streamFlow.InitializeAsync();
        
        // Declare exchange
        await _streamFlow.ExchangeManager.Exchange("messages")
            .AsTopic()
            .WithDurable(true)
            .DeclareAsync();
        
        // Publish message
        await _streamFlow.Producer.Message(message)
            .WithExchange("messages")
            .WithRoutingKey("message.created")
            .WithDeliveryMode(DeliveryMode.Persistent)
            .PublishAsync();
    }
}

Example 2: Simple Message Consumption

public class SimpleConsumer
{
    private readonly IStreamFlowClient _streamFlow;
    
    public SimpleConsumer(IStreamFlowClient streamFlow) => _streamFlow = streamFlow;
    
    public async Task StartConsumingAsync()
    {
        // Initialize the client first
        await _streamFlow.InitializeAsync();
        
        // Declare queue
        await _streamFlow.QueueManager.Queue("message-processing")
            .WithDurable(true)
            .BindToExchange("messages", "message.*")
            .DeclareAsync();
        
        // Consume messages
        await _streamFlow.Consumer.Queue<string>("message-processing")
            .WithConcurrency(3)
            .WithPrefetchCount(10)
            .ConsumeAsync(async (message, context) =>
            {
                await ProcessMessageAsync(message);
                return true; // Acknowledge message
            });
    }
    
    private async Task ProcessMessageAsync(string message)
    {
        // Process the message
        Console.WriteLine($"Processing message: {message}");
        await Task.Delay(100); // Simulate processing
    }
}

Example 3: Event-Driven Architecture

// Domain Event
public record OrderCreated(Guid OrderId, string CustomerName, decimal Amount) : IDomainEvent
{
    public Guid Id { get; } = Guid.NewGuid();
    public DateTime OccurredOn { get; } = DateTime.UtcNow;
    public int Version { get; } = 1;
    public string EventType => nameof(OrderCreated);
    public string? CorrelationId { get; set; }
    public string? CausationId { get; set; }
    public IDictionary<string, object> Metadata { get; } = new Dictionary<string, object>();
    public string AggregateId => OrderId.ToString();
    public string AggregateType => "Order";
    public long AggregateVersion { get; set; }
    public string? InitiatedBy { get; set; }
}

// Event Handler
public class OrderCreatedHandler : IAsyncEventHandler<OrderCreated>
{
    private readonly IStreamFlowClient _streamFlow;
    
    public OrderCreatedHandler(IStreamFlowClient streamFlow) => _streamFlow = streamFlow;
    
    public async Task HandleAsync(OrderCreated @event, EventContext context)
    {
        // Handle domain event
        await SendOrderConfirmationAsync(@event);
        
        // Publish integration event
        var orderConfirmed = new OrderConfirmed(@event.OrderId, @event.CustomerName)
        {
            ExchangeName = "order-service", // Exchange name
            CorrelationId = context.CorrelationId,
            CausationId = context.EventId,
            Source = "order-service"
        };
        await _streamFlow.EventBus.PublishIntegrationEventAsync(orderConfirmed);
    }
    
    private async Task SendOrderConfirmationAsync(OrderCreated @event)
    {
        // Send email confirmation
        Console.WriteLine($"Sending confirmation for order {@event.OrderId}");
        await Task.Delay(100); // Simulate email sending
    }
}

πŸ“š Additional Resources

Documentation

Examples

🀝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

Development Setup

# Clone the repository
git clone https://github.com/furkansarikaya/FS.StreamFlow.git
cd FS.StreamFlow

# Restore dependencies
dotnet restore

# Build the solution
dotnet build

# Run tests
dotnet test

# Build packages
dotnet pack

πŸ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.

πŸ™ Acknowledgments

  • Built on top of RabbitMQ.Client
  • Inspired by modern messaging patterns and event-driven architecture
  • Designed for production-ready .NET applications

FS.StreamFlow - Enterprise-grade messaging framework for .NET 9 with comprehensive event-driven capabilities, automatic recovery, and production-ready features.

πŸ“¦ Packages

Package Description NuGet
FS.StreamFlow.Core Core library with all features NuGet
FS.StreamFlow.RabbitMQ RabbitMQ implementation NuGet

🌟 Star History

If you find this library useful, please consider giving it a star on GitHub! It helps others discover the project.

Made with ❀️ by Furkan Sarıkaya

GitHub LinkedIn Medium


Support

If you encounter any issues or have questions:

  1. Search existing GitHub issues
  2. Create a new issue with detailed information
  3. Join our community discussions

Happy coding! πŸš€

Product Compatible and additional computed target framework versions.
.NET net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Alpha Release 9.0.0-alpha.20250720-0108

           🐰 RabbitMQ Implementation:
           β€’ High-performance RabbitMQ client for FS.StreamFlow
           β€’ Production-ready with automatic connection recovery
           β€’ Advanced error handling with retry policies and circuit breaker
           β€’ Publisher confirmations and transactional publishing
           β€’ Dead letter queue management

           🎯 Enterprise Features:
           β€’ Event-driven architecture with domain and integration events
           β€’ Event sourcing with snapshot support
           β€’ Saga orchestration for distributed transactions
           β€’ Comprehensive monitoring and health checks
           β€’ Connection pooling and channel management

           βš‘ Performance Optimized:
           β€’ Batch publishing for high throughput (50k+ msgs/sec)
           β€’ Async-first API design
           β€’ Memory-efficient serialization
           β€’ Connection recovery with exponential backoff

           🔧 Configuration:
           β€’ Fluent configuration API
           β€’ SSL/TLS support
           β€’ Environment-specific settings
           β€’ Integration with Microsoft.Extensions.*

           📚 Complete Documentation:
           Comprehensive guides for producers, consumers, event-driven architecture, performance tuning, and production deployment.

           βš οΈ Alpha Notice:
           This is an alpha release for testing and feedback. Not recommended for production use.