Mostlylucid.Ephemeral.Atoms.PriorityProcessor
2.3.2
dotnet add package Mostlylucid.Ephemeral.Atoms.PriorityProcessor --version 2.3.2
NuGet\Install-Package Mostlylucid.Ephemeral.Atoms.PriorityProcessor -Version 2.3.2
<PackageReference Include="Mostlylucid.Ephemeral.Atoms.PriorityProcessor" Version="2.3.2" />
<PackageVersion Include="Mostlylucid.Ephemeral.Atoms.PriorityProcessor" Version="2.3.2" />
<PackageReference Include="Mostlylucid.Ephemeral.Atoms.PriorityProcessor" />
paket add Mostlylucid.Ephemeral.Atoms.PriorityProcessor --version 2.3.2
#r "nuget: Mostlylucid.Ephemeral.Atoms.PriorityProcessor, 2.3.2"
#:package Mostlylucid.Ephemeral.Atoms.PriorityProcessor@2.3.2
#addin nuget:?package=Mostlylucid.Ephemeral.Atoms.PriorityProcessor&version=2.3.2
#tool nuget:?package=Mostlylucid.Ephemeral.Atoms.PriorityProcessor&version=2.3.2
Mostlylucid.Ephemeral.Atoms.PriorityProcessor
π¨π¨ WARNING π¨π¨ - Though in the 2.x range of version THINGS WILL STILL BREAK. This is the lab for developing this concept when stabilized it'll become the first styloflow release π¨π¨π¨
Priority-based processing with automatic failover, health monitoring, and self-healing recovery.
dotnet add package mostlylucid.ephemeral.atoms.priorityprocessor
Quick Start
using Mostlylucid.Ephemeral.Atoms.PriorityProcessor;
var sink = new SignalSink();
// Priority 1 (primary) processor - handles most work
await using var primary = new PriorityProcessorAtom(
priority: 1,
processFunc: async (id, ct) => await ProcessWidget(id, ct),
signals: sink,
failureThreshold: 3);
// Priority 2 (backup) processor - takes over on failures
await using var backup = new PriorityProcessorAtom(
priority: 2,
processFunc: async (id, ct) => await ProcessWidgetBackup(id, ct),
signals: sink);
// Enqueue work - automatically routes based on health
await primary.EnqueueAsync("widget-123");
// Primary fails 3 times β automatic failover to backup
// Periodic probing restores primary when healthy
await primary.DrainAsync();
await backup.DrainAsync();
Key Features
π Automatic Failover
Primary processor fails repeatedly? Backup processor instantly takes over based on signal-driven health checks.
π₯ Self-Healing
Periodic probing automatically detects when primary processor recovers and restores routing.
π Full Observability
All decisions captured as signals:
processing.started:pri{N}:{id}processing.complete:pri{N}:{id}processing.failed:pri{N}:{id}failover.requested:pri{N}βpri{N+1}probe.testing:pri{N}probe.success:pri{N}
β‘ Zero Overhead
Signal-based coordination with lock-free listener arrays - no polling, no timers during normal operation.
All Options
new PriorityProcessorAtom(
// Required: priority level (1 = highest)
priority: 1,
// Required: async processing function
processFunc: async (itemId, ct) => await Process(itemId, ct),
// Required: shared signal sink
signals: sink,
// Consecutive failures before marking unhealthy
// Default: 3
failureThreshold: 3,
// Max concurrent operations
// Default: Environment.ProcessorCount
maxConcurrency: 4,
// Time window for failure counting
// Default: 10 seconds
failureWindow: TimeSpan.FromSeconds(10),
// Probe interval when unhealthy
// Default: 5 seconds
probeInterval: TimeSpan.FromSeconds(5)
)
Pattern: Dynamic Adaptive Workflow
var globalSink = new SignalSink(maxCapacity: 5000);
var processorHealth = new ConcurrentDictionary<int, bool> { [1] = true, [2] = true };
// Primary processor (30% failure rate for demo)
await using var processor1 = new EphemeralWorkCoordinator<string>(
async (widgetId, ct) =>
{
globalSink.Raise($"processing.started:pri1:{widgetId}");
await Task.Delay(Random.Shared.Next(30, 80), ct);
var success = Random.Shared.NextDouble() > 0.3;
if (success)
{
processorHealth[1] = true;
globalSink.Raise($"processing.complete:pri1:{widgetId}");
}
else
{
globalSink.Raise($"processing.failed:pri1:{widgetId}");
// OPTIMIZED health check using CountRecentByPrefix
var recentFailures = globalSink.CountRecentByPrefix(
"processing.failed:pri1",
DateTimeOffset.UtcNow.AddSeconds(-10));
if (recentFailures >= 3 && processorHealth[1])
{
processorHealth[1] = false;
globalSink.Raise("failover.triggered:pri1βpri2");
}
}
},
new EphemeralOptions { MaxConcurrency = 4, Signals = globalSink }
);
// Backup processor (5% failure rate, reliable)
await using var processor2 = new EphemeralWorkCoordinator<string>(
async (widgetId, ct) =>
{
globalSink.Raise($"processing.started:pri2:{widgetId}");
await Task.Delay(Random.Shared.Next(50, 120), ct);
var success = Random.Shared.NextDouble() > 0.05;
if (success)
{
globalSink.Raise($"processing.complete:pri2:{widgetId}");
}
else
{
globalSink.Raise($"processing.failed:pri2:{widgetId}");
}
},
new EphemeralOptions { MaxConcurrency = 4, Signals = globalSink }
);
// Router coordinator - dynamic routing based on health
await using var router = new EphemeralWorkCoordinator<string>(
async (widgetId, ct) =>
{
var targetPriority = processorHealth[1] ? 1 : 2;
globalSink.Raise($"route.assigned:pri{targetPriority}:{widgetId}");
if (targetPriority == 1)
await processor1.EnqueueAsync(widgetId);
else
await processor2.EnqueueAsync(widgetId);
},
new EphemeralOptions { MaxConcurrency = 16, Signals = globalSink }
);
// Self-healing prober
var proberCts = new CancellationTokenSource();
var probeTask = Task.Run(async () =>
{
while (!proberCts.Token.IsCancellationRequested)
{
await Task.Delay(3000, proberCts.Token);
if (!processorHealth[1])
{
globalSink.Raise("probe.testing:pri1");
var recovered = await TestProcessor1Health();
if (recovered)
{
globalSink.Raise("probe.success:pri1");
processorHealth[1] = true;
}
}
}
}, proberCts.Token);
// Process widgets
for (int i = 0; i < 100; i++)
{
await router.EnqueueAsync($"WIDGET-{i}");
}
// Cleanup
router.Complete();
processor1.Complete();
processor2.Complete();
await router.DrainAsync();
await processor1.DrainAsync();
await processor2.DrainAsync();
proberCts.Cancel();
Performance
Optimized Signal Queries - Uses CountRecentByPrefix() for 70% faster health checks:
- Before (LINQ + Sense): 94.49Β΅s per query | 10,583 queries/sec
- After (CountRecentByPrefix): 55.57Β΅s per query | 17,994 queries/sec
Signal Raising - Lock-free performance:
- 790,000+ signals/sec single-threaded
- 850,000+ signals/sec with 4 concurrent threads
Failover Latency - <10ms from failure detection to routing change
Use Cases
Multi-Tenant SaaS
Route customer requests to healthy region-specific processors with automatic failover.
Edge Computing
Prioritize on-device processing, fall back to cloud with self-healing when device recovers.
Financial Processing
Primary fast-path validation, backup comprehensive audit with automatic recovery.
IoT Data Pipelines
Route sensor data to primary aggregator, failover to backup on network issues.
Microservices
Service mesh routing with health-based failover and recovery.
Related Packages
| Package | Description |
|---|---|
| mostlylucid.ephemeral | Core library |
| mostlylucid.ephemeral.atoms.retry | Retry with backoff |
| mostlylucid.ephemeral.patterns.circuitbreaker | Circuit breaker |
| mostlylucid.ephemeral.complete | All in one DLL |
License
Unlicense (public domain)
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- mostlylucid.ephemeral (>= 2.3.2)
-
net8.0
- mostlylucid.ephemeral (>= 2.3.2)
-
net9.0
- mostlylucid.ephemeral (>= 2.3.2)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on Mostlylucid.Ephemeral.Atoms.PriorityProcessor:
| Package | Downloads |
|---|---|
|
mostlylucid.ephemeral.complete
Meta-package that references all Mostlylucid.Ephemeral packages - bounded async execution with signals, atoms, and patterns. Install this single package to get everything. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 2.3.2 | 106 | 1/9/2026 |
| 2.3.1 | 106 | 1/9/2026 |
| 2.3.1-alpha0 | 104 | 1/9/2026 |
| 2.3.0 | 961 | 1/8/2026 |
| 2.3.0-alpha1 | 104 | 1/8/2026 |
| 2.1.0 | 102 | 1/8/2026 |
| 2.1.0-preview | 103 | 1/8/2026 |
| 2.0.1 | 107 | 1/8/2026 |
| 2.0.0 | 142 | 1/8/2026 |
| 2.0.0-alpha1 | 95 | 1/8/2026 |
| 2.0.0-alpha0 | 98 | 1/8/2026 |
| 1.7.1 | 426 | 12/11/2025 |