Mostlylucid.Ephemeral.Atoms.PriorityProcessor 2.3.2



🚨🚨 WARNING 🚨🚨 - Although the version is in the 2.x range, THINGS WILL STILL BREAK. This is the lab for developing this concept; once stabilized, it will become the first styloflow release. 🚨🚨🚨

Priority-based processing with automatic failover, health monitoring, and self-healing recovery.

dotnet add package mostlylucid.ephemeral.atoms.priorityprocessor

Quick Start

using Mostlylucid.Ephemeral.Atoms.PriorityProcessor;

var sink = new SignalSink();

// Priority 1 (primary) processor - handles most work
await using var primary = new PriorityProcessorAtom(
    priority: 1,
    processFunc: async (id, ct) => await ProcessWidget(id, ct),
    signals: sink,
    failureThreshold: 3);

// Priority 2 (backup) processor - takes over on failures
await using var backup = new PriorityProcessorAtom(
    priority: 2,
    processFunc: async (id, ct) => await ProcessWidgetBackup(id, ct),
    signals: sink);

// Enqueue work - automatically routes based on health
await primary.EnqueueAsync("widget-123");

// Primary fails 3 times → automatic failover to backup
// Periodic probing restores primary when healthy

await primary.DrainAsync();
await backup.DrainAsync();
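
The Quick Start calls ProcessWidget and ProcessWidgetBackup without defining them. Here is a minimal sketch of what such handlers might look like. The (string, CancellationToken) parameters and the Task return type are inferred from the lambdas above, and the idea that a thrown exception is what counts as a failure is an assumption, not something the package documentation confirms.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical handlers: the names match the Quick Start, the bodies are illustrative only.
// Assumption: a thrown exception is what the atom records as a failure.
async Task ProcessWidget(string id, CancellationToken ct)
{
    await Task.Delay(10, ct); // stand-in for real work (I/O, HTTP call, ...)
    if (id.StartsWith("bad-", StringComparison.Ordinal))
        throw new InvalidOperationException($"Primary could not process {id}");
}

async Task ProcessWidgetBackup(string id, CancellationToken ct)
{
    await Task.Delay(25, ct); // slower but more tolerant path
    Console.WriteLine($"Backup handled {id}");
}

await ProcessWidget("widget-123", CancellationToken.None);
await ProcessWidgetBackup("bad-widget", CancellationToken.None);
```

Both handlers honour the cancellation token, so shutdown should not have to wait for in-flight delays.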

Key Features

🔄 Automatic Failover

When the primary processor fails repeatedly (failureThreshold consecutive failures, 3 by default), the backup processor takes over, driven by signal-based health checks.
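
One way to picture the health check is a streak counter bounded by a time window. The sketch below is a hypothetical tracker, not the package's implementation. It borrows the failureThreshold and failureWindow names and defaults from the options listed under All Options, and assumes a success resets the streak.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical tracker, not the package's implementation.
var failureThreshold = 3;
var failureWindow = TimeSpan.FromSeconds(10);
var failures = new Queue<DateTimeOffset>();

// Records one failure; returns true once the threshold is reached inside the window.
bool RecordFailure(DateTimeOffset now)
{
    failures.Enqueue(now);
    // Drop failures that have fallen out of the window.
    while (failures.Count > 0 && now - failures.Peek() > failureWindow)
        failures.Dequeue();
    return failures.Count >= failureThreshold;
}

// Assumption: a success resets the streak, so only consecutive failures count.
void RecordSuccess() => failures.Clear();

var t0 = DateTimeOffset.UtcNow;
var first = RecordFailure(t0);
var second = RecordFailure(t0.AddSeconds(1));
var third = RecordFailure(t0.AddSeconds(2));
Console.WriteLine($"{first} {second} {third}"); // prints "False False True"
```

Passing the timestamp in, rather than reading the clock inside the function, keeps the logic easy to test.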

πŸ₯ Self-Healing

Periodic probing automatically detects when primary processor recovers and restores routing.
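
A probe loop of this kind can be sketched with PeriodicTimer from the .NET base library. ProbeUntilHealthyAsync and its injected isHealthy delegate are hypothetical names chosen for illustration; how the package actually schedules its probes is not shown in this README.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical probe loop; the health check is injected so the sketch stays self-contained.
async Task<int> ProbeUntilHealthyAsync(
    Func<Task<bool>> isHealthy, TimeSpan probeInterval, CancellationToken ct)
{
    var attempts = 0;
    using var timer = new PeriodicTimer(probeInterval);
    while (await timer.WaitForNextTickAsync(ct))
    {
        attempts++;
        if (await isHealthy())
            return attempts; // a caller would restore routing to the primary here
    }
    return attempts;
}

// Simulated primary that recovers on the third probe.
var calls = 0;
var attemptsNeeded = await ProbeUntilHealthyAsync(
    () => Task.FromResult(++calls >= 3),
    TimeSpan.FromMilliseconds(20),
    CancellationToken.None);
Console.WriteLine($"Recovered after {attemptsNeeded} probes"); // prints "Recovered after 3 probes"
```

The timer only exists while a processor is unhealthy, which fits the "no timers during normal operation" claim.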

📊 Full Observability

All decisions captured as signals:

  • processing.started:pri{N}:{id}
  • processing.complete:pri{N}:{id}
  • processing.failed:pri{N}:{id}
  • failover.requested:pri{N}→pri{N+1}
  • probe.testing:pri{N}
  • probe.success:pri{N}
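
Because the signal names follow an event:pri{N}[:id] shape, they are easy to take apart for logging or dashboards. ParseSignal below is a hypothetical helper written against the naming scheme listed above. It is not part of the package API.

```csharp
using System;
using System.Linq;

// Splits "event:priN[:id]" into its parts; returns null for anything that does not match.
(string Event, int Priority, string ItemId)? ParseSignal(string signal)
{
    var parts = signal.Split(':', 3);
    if (parts.Length < 2 || !parts[1].StartsWith("pri", StringComparison.Ordinal))
        return null;

    // Take the leading digits only, so "pri1→pri2" yields the source priority 1.
    var digits = new string(parts[1].Skip(3).TakeWhile(c => char.IsDigit(c)).ToArray());
    if (!int.TryParse(digits, out var priority))
        return null;

    // Signals without an item id (probe.*, failover.*) get an empty ItemId.
    return (parts[0], priority, parts.Length == 3 ? parts[2] : "");
}

var parsed = ParseSignal("processing.failed:pri1:widget-123");
if (parsed is { } s)
    Console.WriteLine($"{s.Event} on priority {s.Priority} for {s.ItemId}");
```

Splitting with a count of 3 keeps any colons inside the item id intact.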

⚡ Zero Overhead

Signal-based coordination with lock-free listener arrays - no polling, no timers during normal operation.
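
"Lock-free listener arrays" usually means a copy-on-write array that is swapped in atomically. The sketch below shows that general technique with Interlocked.CompareExchange. It is an illustration of the idea, not code taken from the package, and the Subscribe and Raise names here are hypothetical.

```csharp
using System;
using System.Threading;

// Illustrative copy-on-write listener array; not taken from the package source.
Action<string>[] listeners = Array.Empty<Action<string>>();

void Subscribe(Action<string> listener)
{
    while (true)
    {
        var snapshot = Volatile.Read(ref listeners);
        var updated = new Action<string>[snapshot.Length + 1];
        Array.Copy(snapshot, updated, snapshot.Length);
        updated[^1] = listener;
        // Publish only if nobody swapped the array in the meantime; otherwise retry.
        if (Interlocked.CompareExchange(ref listeners, updated, snapshot) == snapshot)
            return;
    }
}

void Raise(string signal)
{
    // Readers take one snapshot and iterate it without taking a lock.
    foreach (var listener in Volatile.Read(ref listeners))
        listener(signal);
}

var seen = 0;
Subscribe(_ => seen++);
Subscribe(s => Console.WriteLine($"got {s}"));
Raise("probe.success:pri1");
```

Subscribing pays for an array copy, while raising a signal is a plain read and a loop. That suits workloads with few subscriptions and many signals.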


All Options

new PriorityProcessorAtom(
    // Required: priority level (1 = highest)
    priority: 1,

    // Required: async processing function
    processFunc: async (itemId, ct) => await Process(itemId, ct),

    // Required: shared signal sink
    signals: sink,

    // Consecutive failures before marking unhealthy
    // Default: 3
    failureThreshold: 3,

    // Max concurrent operations
    // Default: Environment.ProcessorCount
    maxConcurrency: 4,

    // Time window for failure counting
    // Default: 10 seconds
    failureWindow: TimeSpan.FromSeconds(10),

    // Probe interval when unhealthy
    // Default: 5 seconds
    probeInterval: TimeSpan.FromSeconds(5)
)

Pattern: Dynamic Adaptive Workflow

using System.Collections.Concurrent; // ConcurrentDictionary

var globalSink = new SignalSink(maxCapacity: 5000);
var processorHealth = new ConcurrentDictionary<int, bool> { [1] = true, [2] = true };

// Primary processor (30% failure rate for demo)
await using var processor1 = new EphemeralWorkCoordinator<string>(
    async (widgetId, ct) =>
    {
        globalSink.Raise($"processing.started:pri1:{widgetId}");
        await Task.Delay(Random.Shared.Next(30, 80), ct);
        var success = Random.Shared.NextDouble() > 0.3;

        if (success)
        {
            processorHealth[1] = true;
            globalSink.Raise($"processing.complete:pri1:{widgetId}");
        }
        else
        {
            globalSink.Raise($"processing.failed:pri1:{widgetId}");

            // OPTIMIZED health check using CountRecentByPrefix
            var recentFailures = globalSink.CountRecentByPrefix(
                "processing.failed:pri1",
                DateTimeOffset.UtcNow.AddSeconds(-10));

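            // TryUpdate flips healthy -> unhealthy atomically, so concurrent
            // failures raise the failover signal only once
            if (recentFailures >= 3 && processorHealth.TryUpdate(1, false, true))
            {
                globalSink.Raise("failover.triggered:pri1→pri2");
            }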
        }
    },
    new EphemeralOptions { MaxConcurrency = 4, Signals = globalSink }
);

// Backup processor (5% failure rate, reliable)
await using var processor2 = new EphemeralWorkCoordinator<string>(
    async (widgetId, ct) =>
    {
        globalSink.Raise($"processing.started:pri2:{widgetId}");
        await Task.Delay(Random.Shared.Next(50, 120), ct);
        var success = Random.Shared.NextDouble() > 0.05;

        if (success)
        {
            globalSink.Raise($"processing.complete:pri2:{widgetId}");
        }
        else
        {
            globalSink.Raise($"processing.failed:pri2:{widgetId}");
        }
    },
    new EphemeralOptions { MaxConcurrency = 4, Signals = globalSink }
);

// Router coordinator - dynamic routing based on health
await using var router = new EphemeralWorkCoordinator<string>(
    async (widgetId, ct) =>
    {
        var targetPriority = processorHealth[1] ? 1 : 2;
        globalSink.Raise($"route.assigned:pri{targetPriority}:{widgetId}");

        if (targetPriority == 1)
            await processor1.EnqueueAsync(widgetId);
        else
            await processor2.EnqueueAsync(widgetId);
    },
    new EphemeralOptions { MaxConcurrency = 16, Signals = globalSink }
);

// Self-healing prober
var proberCts = new CancellationTokenSource();
var probeTask = Task.Run(async () =>
{
    while (!proberCts.Token.IsCancellationRequested)
    {
        await Task.Delay(3000, proberCts.Token);

        if (!processorHealth[1])
        {
            globalSink.Raise("probe.testing:pri1");

            // TestProcessor1Health is a user-supplied health check (not shown here)
            var recovered = await TestProcessor1Health();
            if (recovered)
            {
                globalSink.Raise("probe.success:pri1");
                processorHealth[1] = true;
            }
        }
    }
}, proberCts.Token);

// Process widgets
for (int i = 0; i < 100; i++)
{
    await router.EnqueueAsync($"WIDGET-{i}");
}

// Cleanup
router.Complete();
processor1.Complete();
processor2.Complete();

await router.DrainAsync();
await processor1.DrainAsync();
await processor2.DrainAsync();

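proberCts.Cancel();
try
{
    await probeTask; // wait for the prober to exit
}
catch (OperationCanceledException)
{
    // expected: Task.Delay throws once the token is cancelled
}
proberCts.Dispose();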
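
The router above hard-codes two tiers. If more priorities are added, the routing decision reduces to "pick the lowest-numbered healthy processor". SelectTarget below is a hypothetical helper sketching that rule over the same kind of health map; it is not part of the package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Returns the lowest-numbered healthy priority, or null when every processor is unhealthy.
int? SelectTarget(IReadOnlyDictionary<int, bool> health) =>
    health.Where(kv => kv.Value)
          .Select(kv => (int?)kv.Key)
          .OrderBy(p => p)
          .FirstOrDefault();

var health = new Dictionary<int, bool> { [1] = false, [2] = true, [3] = true };
Console.WriteLine(SelectTarget(health)); // prints 2
```

ConcurrentDictionary<int, bool> also implements IReadOnlyDictionary<int, bool>, so the processorHealth map from the pattern could be passed in unchanged. The null case leaves the choice of what to do when nothing is healthy to the caller.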

Performance

Optimized Signal Queries - Uses CountRecentByPrefix() for health checks, giving about 1.7x the query throughput (roughly 41% less time per query):

  • Before (LINQ + Sense): 94.49µs per query | 10,583 queries/sec
  • After (CountRecentByPrefix): 55.57µs per query | 17,994 queries/sec
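
The relationship between the per-query latencies and the headline speedup can be cross-checked with a few lines of arithmetic. The two latency values are the ones reported above; everything else is derived from them.

```csharp
using System;

// Latencies as reported above, in microseconds per query.
const double beforeUs = 94.49;
const double afterUs = 55.57;

var speedup = beforeUs / afterUs;               // throughput ratio, about 1.70
var latencyReduction = 1 - afterUs / beforeUs;  // fraction of time saved per query, about 0.41
var beforeQps = 1_000_000 / beforeUs;           // queries per second at the old latency

Console.WriteLine($"speedup: {speedup:F2}x");
Console.WriteLine($"latency reduction: {latencyReduction * 100:F1} percent");
Console.WriteLine($"queries/sec before: {beforeQps:F0}");
```

A 1.7x throughput ratio and a 41 percent latency reduction describe the same measurement, so "70% faster" here refers to throughput.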

Signal Raising - Lock-free performance:

  • 790,000+ signals/sec single-threaded
  • 850,000+ signals/sec with 4 concurrent threads

Failover Latency - <10ms from failure detection to routing change


Use Cases

Multi-Tenant SaaS

Route customer requests to healthy region-specific processors with automatic failover.

Edge Computing

Prioritize on-device processing, fall back to cloud with self-healing when device recovers.

Financial Processing

Primary fast-path validation, backup comprehensive audit with automatic recovery.

IoT Data Pipelines

Route sensor data to primary aggregator, failover to backup on network issues.

Microservices

Service mesh routing with health-based failover and recovery.


Package                                         Description
mostlylucid.ephemeral                           Core library
mostlylucid.ephemeral.atoms.retry               Retry with backoff
mostlylucid.ephemeral.patterns.circuitbreaker   Circuit breaker
mostlylucid.ephemeral.complete                  All in one DLL

License

Unlicense (public domain)

Compatible and additional computed target framework versions

.NET: net8.0, net9.0, and net10.0 are compatible. For each of these, the platform-specific variants (-android, -browser, -ios, -maccatalyst, -macos, -tvos, -windows) were computed.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Mostlylucid.Ephemeral.Atoms.PriorityProcessor:

Package Downloads
mostlylucid.ephemeral.complete

Meta-package that references all Mostlylucid.Ephemeral packages - bounded async execution with signals, atoms, and patterns. Install this single package to get everything.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.3.2 106 1/9/2026
2.3.1 106 1/9/2026
2.3.1-alpha0 104 1/9/2026
2.3.0 961 1/8/2026
2.3.0-alpha1 104 1/8/2026
2.1.0 102 1/8/2026
2.1.0-preview 103 1/8/2026
2.0.1 107 1/8/2026
2.0.0 142 1/8/2026
2.0.0-alpha1 95 1/8/2026
2.0.0-alpha0 98 1/8/2026
1.7.1 426 12/11/2025