Shuttle.Core.Threading
21.0.1-beta
Provides various classes and interfaces to facilitate thread-based processing using task-based asynchronous patterns.
Installation
dotnet add package Shuttle.Core.Threading
Overview
This library enables you to create thread pools that continuously execute processor implementations. Each processor runs in a loop, performing work and utilizing configurable idle strategies when no work is available. The library uses dependency injection and supports multiple thread pools with different service keys.
Core Components
IProcessor
Implement this interface to define the work that will be executed by processor threads:
public interface IProcessor
{
    ValueTask<bool> ExecuteAsync(CancellationToken cancellationToken = default);
}
The return value indicates whether work was performed (true) or not (false), which is used by the idle strategy to determine thread behavior.
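To make the role of the return value concrete, here is a minimal sketch of a processing loop. It is not the library's implementation: `CountdownProcessor` and `ProcessorLoopSketch` are hypothetical names, the fixed `idleDelay` stands in for an idle strategy, and `IProcessor` is redeclared only so the sample compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Mirrors the IProcessor interface shown above so the sketch compiles on its own.
public interface IProcessor
{
    ValueTask<bool> ExecuteAsync(CancellationToken cancellationToken = default);
}

// Hypothetical processor: reports work for the first few calls, then reports none.
public sealed class CountdownProcessor(int workItems) : IProcessor
{
    private int _remaining = workItems;

    public ValueTask<bool> ExecuteAsync(CancellationToken cancellationToken = default)
    {
        if (_remaining <= 0)
        {
            return ValueTask.FromResult(false); // nothing to do
        }

        _remaining--;
        return ValueTask.FromResult(true); // one item handled
    }
}

public static class ProcessorLoopSketch
{
    // Illustrative loop only: runs up to maxIterations and pauses for
    // idleDelay whenever the processor reports that no work was performed.
    public static async Task<(int Worked, int Idle)> RunAsync(
        IProcessor processor,
        int maxIterations,
        TimeSpan idleDelay,
        CancellationToken cancellationToken = default)
    {
        var worked = 0;
        var idle = 0;

        for (var i = 0; i < maxIterations && !cancellationToken.IsCancellationRequested; i++)
        {
            if (await processor.ExecuteAsync(cancellationToken))
            {
                worked++; // work was done: loop again immediately
            }
            else
            {
                idle++; // no work: back off before retrying
                await Task.Delay(idleDelay, cancellationToken);
            }
        }

        return (worked, idle);
    }
}
```

Running the loop for five iterations over a processor holding three items performs work three times and idles twice, which is the signal an idle strategy would act on.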
IProcessorContext
Inject this interface into your processor implementation to obtain context about the current execution:
public interface IProcessorContext
{
    string ServiceKey { get; }
    int ManagedThreadId { get; }
}
ProcessorThreadPool
Manages a pool of processor threads that execute your IProcessor implementation:
public class ProcessorThreadPool(
    string serviceKey,
    int threadCount,
    IServiceScopeFactory serviceScopeFactory,
    ThreadingOptions threadingOptions,
    IProcessorIdleStrategy processorIdleStrategy
) : IProcessorThreadPool
Parameters:
- serviceKey: Identifier used for keyed service resolution and configuration
- threadCount: Number of processor threads in the pool (must be > 0)
- serviceScopeFactory: Factory for creating service scopes for each processor execution
- threadingOptions: Configuration options including events and timeouts
- processorIdleStrategy: Strategy for handling idle periods when no work is performed
Configuration
Service Registration
Register threading services in your dependency injection container:
services.AddThreading(builder =>
{
    builder.ConfigureThreading(options =>
    {
        options.JoinTimeout = TimeSpan.FromSeconds(30);
    });

    builder.ConfigureProcessorIdle("my-processor", options =>
    {
        options.Durations = new List<TimeSpan>
        {
            TimeSpan.FromMilliseconds(100),
            TimeSpan.FromMilliseconds(500),
            TimeSpan.FromSeconds(1)
        };
    });
});

// Register your processor implementation with a service key
services.AddKeyedScoped<IProcessor, MyProcessor>("my-processor");
ThreadingOptions
| Property | Type | Default | Description |
|---|---|---|---|
| JoinTimeout | TimeSpan | 00:00:15 | Duration to wait for processor threads to stop gracefully |
| ProcessorThreadCreated | AsyncEvent | - | Raised when a processor thread is created |
| ProcessorThreadActive | AsyncEvent | - | Raised when a processor thread becomes active |
| ProcessorThreadStarting | AsyncEvent | - | Raised when a processor thread is starting |
| ProcessorThreadStopping | AsyncEvent | - | Raised when a processor thread is stopping |
| ProcessorThreadStopped | AsyncEvent | - | Raised when a processor thread has stopped |
| ProcessorThreadOperationCanceled | AsyncEvent | - | Raised when a processor operation is canceled |
| ProcessorExecuting | AsyncEvent | - | Raised before processor execution |
| ProcessorExecuted | AsyncEvent | - | Raised after processor execution |
| ProcessorException | AsyncEvent | - | Raised when a processor throws an exception |
ProcessorIdleOptions
Configures the idle strategy behavior when processors return false (no work performed):
public class ProcessorIdleOptions
{
    public List<TimeSpan> Durations { get; set; } = [];
}
The Durations list defines progressive wait times. When no work is performed, the thread waits for increasing durations from this list before retrying.
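The progression can be pictured with a small stand-alone sketch. `ProgressiveBackoff` is a hypothetical class, not part of the library, and two behaviours in it are assumptions the text above does not confirm: that performing work resets the progression to the first entry, and that the last entry is reused once the list is exhausted.

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: one way a progressive wait over a Durations list could behave.
public sealed class ProgressiveBackoff(IReadOnlyList<TimeSpan> durations)
{
    private int _index;

    // Returns the wait to apply after a single ExecuteAsync call.
    public TimeSpan Next(bool workPerformed)
    {
        if (workPerformed || durations.Count == 0)
        {
            _index = 0; // assumption: work resets the progression
            return TimeSpan.Zero;
        }

        var wait = durations[_index];

        // Assumption: stay on the last entry once the list is exhausted.
        if (_index < durations.Count - 1)
        {
            _index++;
        }

        return wait;
    }
}
```

With durations of 100 ms, 500 ms and 1 s, consecutive idle calls under these assumptions yield 100 ms, 500 ms, 1 s, 1 s, and a call that reports work returns the sequence to 100 ms.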
Usage Example
1. Implement IProcessor
public class MyProcessor : IProcessor
{
    private readonly IProcessorContext _context;
    private readonly ILogger<MyProcessor> _logger;
    private readonly IMyWorkQueue _workQueue;

    public MyProcessor(
        IProcessorContext context,
        ILogger<MyProcessor> logger,
        IMyWorkQueue workQueue)
    {
        _context = context;
        _logger = logger;
        _workQueue = workQueue;
    }

    public async ValueTask<bool> ExecuteAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing on thread {ThreadId} with service key {ServiceKey}",
            _context.ManagedThreadId,
            _context.ServiceKey);

        var workItem = await _workQueue.DequeueAsync(cancellationToken);

        if (workItem == null)
        {
            return false; // No work available
        }

        await ProcessWorkItemAsync(workItem, cancellationToken);

        return true; // Work was performed
    }

    private async Task ProcessWorkItemAsync(WorkItem item, CancellationToken cancellationToken)
    {
        // Process the work item
        await Task.Delay(100, cancellationToken);
    }
}
2. Register Services
var builder = Host.CreateApplicationBuilder(args);

builder.Services.AddThreading(threadingBuilder =>
{
    threadingBuilder.ConfigureThreading(options =>
    {
        options.JoinTimeout = TimeSpan.FromSeconds(30);

        // Subscribe to events
        options.ProcessorException.Subscribe(async args =>
        {
            Console.WriteLine($"Processor exception: {args.Exception.Message}");
        });
    });

    threadingBuilder.ConfigureProcessorIdle("my-processor", options =>
    {
        options.Durations = new List<TimeSpan>
        {
            TimeSpan.FromMilliseconds(100),
            TimeSpan.FromMilliseconds(500),
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(5)
        };
    });
});

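builder.Services.AddKeyedScoped<IProcessor, MyProcessor>("my-processor");
builder.Services.AddSingleton<IMyWorkQueue, MyWorkQueue>();
// Register the hosted service from step 3 so the thread pool starts and stops with the host
builder.Services.AddHostedService<MyHostedService>();
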
var host = builder.Build();
await host.RunAsync();
3. Create and Manage Thread Pool
public class MyHostedService : IHostedService
{
    private readonly IServiceScopeFactory _serviceScopeFactory;
    private readonly ThreadingOptions _threadingOptions;
    private readonly IProcessorIdleStrategy _processorIdleStrategy;
    private ProcessorThreadPool? _threadPool;

    public MyHostedService(
        IServiceScopeFactory serviceScopeFactory,
        IOptions<ThreadingOptions> threadingOptions,
        IProcessorIdleStrategy processorIdleStrategy)
    {
        _serviceScopeFactory = serviceScopeFactory;
        _threadingOptions = threadingOptions.Value;
        _processorIdleStrategy = processorIdleStrategy;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _threadPool = new ProcessorThreadPool(
            "my-processor",
            threadCount: 5,
            _serviceScopeFactory,
            _threadingOptions,
            _processorIdleStrategy);

        await _threadPool.StartAsync(cancellationToken);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_threadPool != null)
        {
            await _threadPool.StopAsync(cancellationToken);
            await _threadPool.DisposeAsync();
        }
    }
}
Idle Strategies
IProcessorIdleStrategy
Controls thread behavior when processors return false (no work performed):
public interface IProcessorIdleStrategy
{
    Task SignalAsync(string serviceKey, bool workPerformed, CancellationToken cancellationToken = default);
}
Built-in Implementations:
- DefaultProcessorIdleStrategy: Uses progressive wait durations from ProcessorIdleOptions. When no work is performed, waits for increasing durations before retrying.
- NullProcessorIdleStrategy: No idle behavior; threads immediately retry without waiting.
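Because the interface has a single method, a custom strategy is small to write. The sketch below is a hypothetical `FixedDelayIdleStrategy` that waits the same amount of time after every idle signal. The interface is redeclared only so the sample compiles on its own, and how a custom strategy would be registered with the library is not covered here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Mirrors the IProcessorIdleStrategy interface shown above so the sketch compiles on its own.
public interface IProcessorIdleStrategy
{
    Task SignalAsync(string serviceKey, bool workPerformed, CancellationToken cancellationToken = default);
}

// Hypothetical strategy: waits one fixed delay whenever no work was performed.
public sealed class FixedDelayIdleStrategy(TimeSpan delay) : IProcessorIdleStrategy
{
    private int _idleSignals;

    // Number of idle signals seen so far; read with Volatile because
    // several processor threads may signal the same strategy instance.
    public int IdleSignals => Volatile.Read(ref _idleSignals);

    public Task SignalAsync(string serviceKey, bool workPerformed, CancellationToken cancellationToken = default)
    {
        if (workPerformed)
        {
            return Task.CompletedTask; // work was done: do not wait
        }

        Interlocked.Increment(ref _idleSignals);

        // The service key is ignored here; a real strategy could use it
        // to apply different delays per thread pool.
        return Task.Delay(delay, cancellationToken);
    }
}
```

The counter is incidental and exists only to make the behaviour observable; the part that matters is that the returned task completes immediately after work and only after the delay otherwise.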
Event Handling
Subscribe to events to monitor and react to processor lifecycle and execution:
services.AddThreading(builder =>
{
    builder.ConfigureThreading(options =>
    {
        options.ProcessorExecuting.Subscribe(async args =>
        {
            Console.WriteLine($"Executing processor on thread {args.ManagedThreadId}");
        });

        options.ProcessorExecuted.Subscribe(async args =>
        {
            Console.WriteLine($"Executed processor. Work performed: {args.WorkPerformed}");
        });

        options.ProcessorException.Subscribe(async args =>
        {
            Console.WriteLine($"Exception on thread {args.ManagedThreadId}: {args.Exception.Message}");
        });
    });
});
Advanced Scenarios
Multiple Thread Pools
You can create multiple thread pools with different service keys and configurations:
// Register multiple processors
services.AddKeyedScoped<IProcessor, HighPriorityProcessor>("high-priority");
services.AddKeyedScoped<IProcessor, LowPriorityProcessor>("low-priority");
// Configure different idle strategies
builder.ConfigureProcessorIdle("high-priority", options =>
{
    options.Durations = new List<TimeSpan> { TimeSpan.FromMilliseconds(50) };
});

builder.ConfigureProcessorIdle("low-priority", options =>
{
    options.Durations = new List<TimeSpan> { TimeSpan.FromSeconds(5) };
});

// Create separate thread pools
var highPriorityPool = new ProcessorThreadPool("high-priority", 10, ...);
var lowPriorityPool = new ProcessorThreadPool("low-priority", 2, ...);
Thread Safety
- ProcessorThreadPool uses internal locking (SemaphoreSlim) for thread-safe start/stop operations
- Each processor execution runs in its own service scope
- ProcessorContext is scoped per execution and accessed via ProcessorContextAccessor
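For readers unfamiliar with the pattern named in the first point, the following sketch shows the general way a `SemaphoreSlim` can serialize asynchronous start and stop calls. `GuardedLifecycle` is a hypothetical class written for illustration; it is not taken from the library's source and the real pool may differ in detail.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only: a SemaphoreSlim used as an async-friendly mutex
// so that concurrent StartAsync/StopAsync calls cannot interleave.
public sealed class GuardedLifecycle : IDisposable
{
    private readonly SemaphoreSlim _lock = new(1, 1);
    private bool _started;

    public int StartCount { get; private set; }
    public int StopCount { get; private set; }

    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await _lock.WaitAsync(cancellationToken);
        try
        {
            if (_started)
            {
                return; // already running: repeated starts are no-ops
            }

            _started = true;
            StartCount++;
        }
        finally
        {
            _lock.Release();
        }
    }

    public async Task StopAsync(CancellationToken cancellationToken = default)
    {
        await _lock.WaitAsync(cancellationToken);
        try
        {
            if (!_started)
            {
                return; // not running: nothing to stop
            }

            _started = false;
            StopCount++;
        }
        finally
        {
            _lock.Release();
        }
    }

    public void Dispose() => _lock.Dispose();
}
```

Unlike the `lock` statement, `SemaphoreSlim.WaitAsync` can be awaited, which is why it is the usual choice when the guarded section itself contains asynchronous work.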
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
net10.0 dependencies:
- Microsoft.Extensions.Options (>= 10.0.1)
- Moq (>= 4.20.72)
- Shuttle.Core.Contract (>= 21.0.1-beta)
- Shuttle.Core.Reflection (>= 21.0.1-beta)
- Shuttle.Extensions.Options (>= 21.0.1-beta)
NuGet packages (8)
Showing the top 5 NuGet packages that depend on Shuttle.Core.Threading:
| Package | Description |
|---|---|
| Shuttle.Esb | Contains the core Shuttle.Esb assembly that should always be referenced when building Shuttle.Esb solutions. |
| Shuttle.Recall | Event sourcing mechanism. |
| Shuttle.Core.ServiceHost | Turns your console application into a Windows service. |
| Shuttle.Core.Data.CallContext | IDatabaseConnectionCache implementation for use in async/await scenarios. |
| Shuttle.Core.Data.ThreadDatabaseContextScope | Provides a mechanism to create a new database context scope per processor thread. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 21.0.1-beta | 117 | 2/7/2026 |
| 21.0.0-alpha | 114 | 1/18/2026 |
| 20.0.0 | 4,575 | 2/2/2025 |
| 13.1.0 | 2,655 | 8/5/2024 |
| 13.0.0 | 4,784 | 4/30/2024 |
| 12.0.1 | 17,319 | 12/1/2022 |
| 12.0.0 | 29,638 | 9/4/2022 |
| 11.1.2 | 1,254 | 4/9/2022 |
| 11.1.1 | 42,921 | 1/30/2021 |
| 11.0.2 | 14,451 | 1/17/2021 |
| 11.0.1 | 2,932 | 11/27/2020 |
| 11.0.0 | 43,124 | 6/21/2019 |
| 10.1.0 | 1,422 | 4/27/2019 |
| 10.0.2 | 81,367 | 7/4/2018 |
| 10.0.1 | 2,033 | 7/2/2018 |
| 10.0.0 | 24,608 | 1/3/2018 |