Mostlylucid.Ephemeral.Patterns.ReactiveFanOut
1.7.1
🚨🚨 WARNING 🚨🚨 - Though versioned in the 1.x range, THINGS WILL STILL BREAK. This is the lab for developing this concept; when stabilized it'll become the first styloflow release 🚨🚨🚨
Two-stage reactive pipeline that automatically throttles stage 1 when stage 2 signals backpressure.
dotnet add package mostlylucid.ephemeral.patterns.reactivefanout
Quick Start
using Mostlylucid.Ephemeral.Patterns.ReactiveFanOut;
await using var pipeline = new ReactiveFanOutPipeline<Message>(
stage2Work: async (msg, ct) => await SaveToDbAsync(msg, ct),
preStageWork: async (msg, ct) => await ValidateAsync(msg, ct),
stage1MaxConcurrency: 16,
stage1MinConcurrency: 2,
stage2MaxConcurrency: 4,
backpressureThreshold: 100);
await pipeline.EnqueueAsync(message);
await pipeline.DrainAsync();
All Options
new ReactiveFanOutPipeline<T>(
// Required: stage 2 async work body
stage2Work: async (item, ct) => await ProcessAsync(item, ct),
// Optional: pre-stage work (runs in stage 1 before handoff)
// Default: null (no-op)
preStageWork: async (item, ct) => await ValidateAsync(item, ct),
// Stage 1 max concurrency (scales down under pressure)
// Default: 8
stage1MaxConcurrency: 8,
// Stage 1 min concurrency (floor when throttled)
// Default: 1
stage1MinConcurrency: 1,
// Stage 2 max concurrency (fixed)
// Default: 4
stage2MaxConcurrency: 4,
// Stage 2 pending count that triggers backpressure
// Default: 32
backpressureThreshold: 32,
// Stage 2 pending count that clears backpressure
// Default: 8
reliefThreshold: 8,
// Cooldown between concurrency adjustments (ms)
// Default: 200
adjustCooldownMs: 200,
// Optional shared signal sink
// Default: null (creates internal)
sink: signalSink
)
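The options above say that stage 1 concurrency "scales down under pressure" between a max and a min. How the library does this internally is not documented here. As a rough illustration only, one common way to get an adjustable concurrency limit in .NET is a `SemaphoreSlim` whose permits are withheld or returned at runtime. `ResizableGate` and all of its members are hypothetical names for this sketch and are not part of Mostlylucid.Ephemeral.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, NOT part of Mostlylucid.Ephemeral: a concurrency gate
// whose limit can be raised or lowered while work is in flight.
public sealed class ResizableGate
{
    private readonly SemaphoreSlim _permits;
    private readonly object _sync = new object();
    private readonly int _ceiling;
    private int _limit;
    private int _debt; // permits to withhold as in-flight work completes

    public ResizableGate(int initialLimit, int ceiling)
    {
        if (initialLimit < 1 || initialLimit > ceiling)
            throw new ArgumentOutOfRangeException(nameof(initialLimit));
        _permits = new SemaphoreSlim(initialLimit, ceiling);
        _limit = initialLimit;
        _ceiling = ceiling;
    }

    public int Limit { get { lock (_sync) return _limit; } }

    // Wait for a free slot before starting a unit of work.
    public Task EnterAsync(CancellationToken ct = default) => _permits.WaitAsync(ct);

    // Non-blocking variant: returns false when no slot is free right now.
    public bool TryEnter() => _permits.Wait(0);

    // Call once per successful Enter when the unit of work finishes.
    public void Exit()
    {
        lock (_sync)
        {
            if (_debt > 0) { _debt--; return; } // swallow the permit to shrink
            _permits.Release();
        }
    }

    public void SetLimit(int newLimit)
    {
        if (newLimit < 1 || newLimit > _ceiling)
            throw new ArgumentOutOfRangeException(nameof(newLimit));
        lock (_sync)
        {
            int delta = newLimit - _limit;
            _limit = newLimit;
            if (delta > 0)
            {
                // Growing: cancel outstanding debt first, then free new slots.
                int forgiven = Math.Min(delta, _debt);
                _debt -= forgiven;
                if (delta > forgiven) _permits.Release(delta - forgiven);
            }
            else
            {
                // Shrinking: take idle slots now; owe the rest as debt.
                for (int i = 0; i < -delta; i++)
                    if (!_permits.Wait(0)) _debt++;
            }
        }
    }
}
```

In this sketch, lowering the limit never interrupts work that is already running. In-flight items finish normally, and their slots are simply not handed back until the new limit is reached.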
API Reference
// Enqueue work item
await pipeline.EnqueueAsync(item, ct);
// Check current stage 1 concurrency
int stage1Concurrency = pipeline.Stage1CurrentMaxConcurrency;
// Check stage 2 pending count
int stage2Pending = pipeline.Stage2Pending;
// Drain both stages and dispose
await pipeline.DrainAsync(ct);
await pipeline.DisposeAsync();
How It Works
Stage 1 (Validation/Transform)      Stage 2 (Slow I/O)
┌─────────────────────────────┐    ┌─────────────────┐
│ Max: 16, Min: 2             │───>│ Max: 4          │
│ Dynamic based on pressure   │    │ Fixed           │
└─────────────────────────────┘    └─────────────────┘
                                            │
                                            ▼
                                   Pending > 32? ──> Throttle Stage 1
                                   Pending < 8?  ──> Restore Stage 1
Signals emitted:
- `stage2.backpressure` - When stage 2 pending exceeds threshold
- `stage2.failed` - When stage 2 work fails
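The throttle/restore rule in the diagram is a hysteresis band: nothing changes while the stage 2 pending count sits between the relief and backpressure thresholds, and adjustments are rate-limited by the cooldown. The sketch below models that decision in isolation. `HysteresisThrottle` and `Observe` are hypothetical names, not library API, and the choice to jump straight to the min or max (rather than stepping gradually) is an assumption, since this README does not say which the library does.

```csharp
using System;

// Hypothetical model, NOT part of Mostlylucid.Ephemeral: decides the stage 1
// concurrency limit from the stage 2 backlog using two thresholds and a cooldown.
public sealed class HysteresisThrottle
{
    private readonly int _max, _min, _backpressure, _relief;
    private readonly long _cooldownMs;
    private long _lastAdjustMs;
    private bool _hasAdjusted;

    public int Current { get; private set; }

    public HysteresisThrottle(int max = 8, int min = 1,
        int backpressureThreshold = 32, int reliefThreshold = 8, long cooldownMs = 200)
    {
        if (min < 1 || min > max)
            throw new ArgumentOutOfRangeException(nameof(min));
        if (reliefThreshold >= backpressureThreshold)
            throw new ArgumentOutOfRangeException(nameof(reliefThreshold));
        _max = max;
        _min = min;
        _backpressure = backpressureThreshold;
        _relief = reliefThreshold;
        _cooldownMs = cooldownMs;
        Current = max; // start unthrottled
    }

    // pending: current stage 2 backlog; nowMs: a monotonic clock reading in ms.
    public int Observe(int pending, long nowMs)
    {
        // Inside the cooldown window, keep the current limit untouched.
        if (_hasAdjusted && nowMs - _lastAdjustMs < _cooldownMs) return Current;

        int target = Current;
        if (pending > _backpressure) target = _min;  // assumption: drop to the floor
        else if (pending < _relief) target = _max;   // assumption: restore to the ceiling
        // Between the two thresholds the limit is left as-is (the hysteresis band).

        if (target != Current)
        {
            Current = target;
            _lastAdjustMs = nowMs;
            _hasAdjusted = true;
        }
        return Current;
    }
}
```

With the diagram's values (max 16, min 2, thresholds 32 and 8, cooldown 200 ms), a backlog of 40 at t=0 drops the limit to 2. A backlog of 5 at t=150 changes nothing because the cooldown has not elapsed, and the same backlog at t=400 restores the limit to 16. The gap between the two thresholds is what stops the limit from flapping when the backlog hovers near a single cutoff.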
Example: ETL Pipeline
await using var pipeline = new ReactiveFanOutPipeline<Record>(
stage2Work: async (record, ct) =>
{
// Slow database insert
await database.InsertAsync(record, ct);
},
preStageWork: async (record, ct) =>
{
// Fast validation and transform
await ValidateSchema(record, ct);
record.Timestamp = DateTimeOffset.UtcNow;
},
stage1MaxConcurrency: 32,
stage1MinConcurrency: 4,
stage2MaxConcurrency: 8,
backpressureThreshold: 200,
reliefThreshold: 50);
// When DB slows down, Stage 1 throttles automatically
foreach (var record in records)
await pipeline.EnqueueAsync(record);
Example: Monitoring Pipeline State
var sink = new SignalSink();
await using var pipeline = new ReactiveFanOutPipeline<Data>(
stage2Work: ProcessDataAsync,
sink: sink);
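// Monitor in background until monitorCts is cancelled
using var monitorCts = new CancellationTokenSource();
var monitorToken = monitorCts.Token;
_ = Task.Run(async () =>
{
    while (!monitorToken.IsCancellationRequested)
    {
        Console.WriteLine($"Stage1 Concurrency: {pipeline.Stage1CurrentMaxConcurrency}");
        Console.WriteLine($"Stage2 Pending: {pipeline.Stage2Pending}");
        var backpressure = sink.Sense(s => s.Signal == "stage2.backpressure");
        if (backpressure.Any())
            Console.WriteLine("! Backpressure active");
        await Task.Delay(1000, monitorToken);
    }
}, monitorToken);
// ... enqueue and drain work here, then stop the monitor before the pipeline is disposed
monitorCts.Cancel();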
Related Packages
| Package | Description |
|---|---|
| mostlylucid.ephemeral | Core library |
| mostlylucid.ephemeral.patterns.backpressure | Simple backpressure |
| mostlylucid.ephemeral.patterns.dynamicconcurrency | Dynamic concurrency |
| mostlylucid.ephemeral.complete | All in one DLL |
License
Unlicense (public domain)
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 is compatible. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - mostlylucid.ephemeral (>= 1.7.1)
- net6.0
  - mostlylucid.ephemeral (>= 1.7.1)
- net7.0
  - mostlylucid.ephemeral (>= 1.7.1)
- net8.0
  - mostlylucid.ephemeral (>= 1.7.1)
- net9.0
  - mostlylucid.ephemeral (>= 1.7.1)