Moberg.Warp.Worker 2.0.0

dotnet add package Moberg.Warp.Worker --version 2.0.0
                    
NuGet\Install-Package Moberg.Warp.Worker -Version 2.0.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Moberg.Warp.Worker" Version="2.0.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Moberg.Warp.Worker" Version="2.0.0" />
                    
Directory.Packages.props
<PackageReference Include="Moberg.Warp.Worker" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Moberg.Warp.Worker --version 2.0.0
                    
#r "nuget: Moberg.Warp.Worker, 2.0.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Moberg.Warp.Worker@2.0.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Moberg.Warp.Worker&version=2.0.0
                    
Install as a Cake Addin
#tool nuget:?package=Moberg.Warp.Worker&version=2.0.0
                    
Install as a Cake Tool

Warp

A distributed job processing, message queue, and in-memory mediator library for .NET 10. Five patterns, one unified pipeline, a real-time dashboard.

NuGet NuGet NuGet NuGet Docs

Features

  • Background Jobs — Schedule and orchestrate jobs with retries, continuations, and batch processing.
  • Message Queue — Publish messages with multiple handlers. Each handler runs as an independent, retryable job.
  • In-Memory RequestsIRequest<TResponse> with IMediator.Send() for immediate, typed request/response. No database persistence.
  • In-Memory StreamsIStreamRequest<TResponse> with IMediator.CreateStream() for lazy, item-by-item streaming via IAsyncEnumerable<TResponse>. No database persistence.
  • SagasSaga base class + ISagaHandler<TSaga, TMessage> for long-lived, correlated state across multiple message arrivals. Opt-in via opt.AddSagas(). Distributed mutex serialization, optimistic concurrency defense-in-depth. See website/docs/features/sagas.md.
  • Background ServicesWarpBackgroundService base class for dashboard-visible long-running services. First-class part of Warp (always available); register your services with opt.AddBackgroundService<T>(). Automatic restart-on-fault with exponential backoff, cluster-singleton coordination via database lease, captured log output in the dashboard, and orphan-definition cleanup for renamed services. See website/docs/features/background-services.md.
  • Unified PipelineIPipelineBehavior<T, TResponse> wraps all four patterns. IStreamPipelineBehavior<T, TResponse> adds enumeration-level wrapping for streams.
  • Named Queues — Assign jobs to queues. Workers subscribe to specific queues. Alphabetical order = priority.
  • Execution Logs — ILogger output automatically captured and flushed to the database every ~1 second during handler execution, viewable in dashboard in real time. Each log entry tracks which worker produced it.
  • Unified Activity Log — Single audit trail per job: lifecycle events (Created, Processing, Completed, Failed, Cancelled) + handler logs.
  • Naming Convention Support — Entity configurations respect EF Core naming conventions (e.g., UseSnakeCaseNamingConvention()). All Warp tables default to the warp schema (configurable via WarpConfiguration.Schema).
  • Multi-Database — PostgreSQL and SQL Server with row-level locking for concurrent worker safety.
  • Server Monitoring — Worker registration, heartbeat tracking, orphaned job recovery. Worker detail page shows job activity.
  • Job Retention — Configurable JobExpirationTimeout (default 1 day). Optional MaxExpirableJobCount threshold. Failed jobs persist forever.
  • Time-Series Stats — Hourly succeeded/failed counts for historical graphs.
  • Recurring Jobs — Cron-based scheduled job execution. Immutable execution history via RecurringJobLog.
  • Graceful Cancellation — CancellationMode enum signals handlers to stop. Job stays in Processing with "Cancelling..." badge until handler exits. Handlers that complete despite cancellation are marked Completed.
  • Pause / Resume — Pause and resume job processing at the server or worker group level. Paused workers stop picking up new jobs; in-progress jobs continue to completion.
  • Job Metadata — Attach key-value metadata to jobs via JobParameters.Metadata. Metadata inherited by child jobs, accessible in handlers via IJobContext. Publish pipeline behaviors (IPublishPipelineBehavior<T>) for cross-cutting metadata.
  • Real-time Handler Logs — ILogger output flushed to the database every ~1 second during handler execution, visible in dashboard while the job is still processing.
  • Failed Job Type Filter — Group failed jobs by type, filter, and bulk delete/requeue all of a specific type.
  • Dashboard Auth — Pluggable IWarpAuthorizationFilter with optional redirect URL. Ships with LocalRequestsOnlyAuthorizationFilter.
  • Configurable Handler LoggingEnableHandlerLogging option (default true) to suppress handler ILogger output from the JobLog table when not needed.
  • Dashboard — React-based web UI with realtime graph, historical graph, dark mode, clickable metric cards, bulk actions, batch progress bars, worker detail page.
  • TimeProvider — All production code uses injectable TimeProvider for testability.
  • DB Push (optional) — Opt-in opt.UseDatabasePush() replaces polling wake-up with PostgreSQL LISTEN/NOTIFY or SQL Server Service Broker, cutting dispatcher pickup latency from ~500ms to <50ms without tight polling.
  • Realtime Dashboard Push (optional) — Opt-in opt.AddDashboardPush() adds a SignalR hub at /warp/api/hub so the dashboard refreshes on backend events instead of polling. Frontend probes once at boot and falls back to polling if the addon is not registered. Multi-server fanout reuses the same DB push backbone.

Integration Guide

1. Install packages

Warp ships as a small set of NuGet packages — pick the provider package that matches your database:

Package Purpose
Moberg.Warp.Core Core types (always required)
Moberg.Warp.Worker Worker + background tasks (required for processing)
Moberg.Warp.Provider.PostgreSql PostgreSQL provider (row-lock SQL, LISTEN/NOTIFY, locks)
Moberg.Warp.Provider.SqlServer SQL Server provider (row-lock SQL, Service Broker, locks)
Moberg.Warp.UI Dashboard UI (optional)
Moberg.Warp.Http Expose IRequest<T> / IStreamRequest<T> over HTTP (optional)

You only add the provider package for your database; Warp.Core no longer has a hard dependency on either EF provider.

Pin one coherent version across every Warp package. The packages have inter-dependencies (e.g. Warp.Http requires a matching Warp.Core), so floating different floors per package can resolve to an incompatible mix. Set the same explicit version on all of them:

<PackageReference Include="Moberg.Warp.Core" Version="2.0.0" />
<PackageReference Include="Moberg.Warp.Worker" Version="2.0.0" />
<PackageReference Include="Moberg.Warp.Provider.PostgreSql" Version="2.0.0" />
<PackageReference Include="Moberg.Warp.UI" Version="2.0.0" />
<PackageReference Include="Moberg.Warp.Http" Version="2.0.0" />

Package IDs are Moberg.Warp.*; namespaces are Warp.*. Install Moberg.Warp.Core, but write using Warp.Core;. The public surface is also split across a few namespaces — here's where the common types live:

You want… Types / methods using
Register Warp / server AddWarp, AddWarpServer, AddBackgroundService, IPublisher, IBatchPublisher, IRecurringJobPublisher Warp.Core
Define & handle work IJob, IMessage, IRequest<T>, IStreamRequest<T>, IJobHandler<>, IMessageHandler<>, IRequestHandler<,>, IStreamRequestHandler<,>, IPipelineBehavior<,>, IJobContext, Unit, IMediator Warp.Core.Handlers
Addon builder methods AddRetry / AddConcurrency / AddTimeout / AddRateLimit / AddSagas Warp.Core.Retry / .Concurrency / .Timeout / .RateLimit / .Sagas
Dashboard UseWarpUI Warp.UI.UIMiddleware
HTTP exposure WarpHttpGet/Post/…, AddWarpHttp, MapWarpHttp Warp.Http
Provider opt-in UsePostgreSql / UseSqlServer Warp.Provider.PostgreSql / Warp.Provider.SqlServer

2. Register Services

Register your DbContext as usual — Warp hooks into it automatically when you call AddWarp or AddWarpServer. Opt into a provider from the lambda:

var builder = WebApplication.CreateBuilder(args);

// Register your DbContext (Warp adds interceptors and entity configuration automatically)
builder.Services.AddDbContext<AppDbContext>(options =>
    options.UseNpgsql(builder.Configuration.GetConnectionString("Default")));

// Register a Warp server — runs the job worker by default (includes AddWarp internally).
// opt.UsePostgreSql() comes from Moberg.Warp.Provider.PostgreSql and registers the row-lock SQL,
// distributed lock provider, exception classifier, and the push notification factory.
builder.Services.AddWarpServer<AppDbContext>(opt =>
{
    opt.UsePostgreSql();

    opt.WorkerCount = 5;
    opt.PollingInterval = TimeSpan.FromSeconds(1);
    opt.Queues = ["critical", "default", "low"];
    opt.JobExpirationTimeout = TimeSpan.FromDays(7);

    // Core addons live on the same builder
    opt.AddRetry(r => r.MaxRetries = 3);
    opt.AddConcurrency();   // [Mutex] / [Semaphore]
    opt.AddRateLimit();     // [RateLimit] (Fixed/Sliding windows)
});

// Handlers (IJobHandler<T>, IMessageHandler<T>, IRequestHandler<T,R>,
// IStreamRequestHandler<T,R>) and IPipelineBehavior<,> implementations are discovered and
// registered automatically by the Warp source generator at compile time — there is no
// AddHandlers(...) / AddPipelineBehaviors(...) call to make.

var app = builder.Build();

// Dashboard UI (serves at /warp)
app.UseWarpUI();

app.Run();

If you only need to publish jobs or serve the dashboard (no executing server), use AddWarp instead:

builder.Services.AddWarp<AppDbContext>(opt =>
{
    opt.UsePostgreSql();  // or opt.UseSqlServer()
});

For a service-only server — runs WarpBackgroundService instances but processes no jobs — call opt.DisableWorker():

builder.Services.AddWarpServer<AppDbContext>(opt =>
{
    opt.UsePostgreSql();
    opt.DisableWorker();                 // no job worker; background services + server infra only
    opt.AddBackgroundService<EmailPump>();
});

To bind configuration from appsettings.json, use BindConfiguration inside the lambda — provider opt-in must still be an explicit call since it's a DI registration, not a config field:

builder.Services.AddWarpServer<AppDbContext>(opt =>
{
    opt.BindConfiguration(builder.Configuration.GetSection("Warp"));
    opt.UsePostgreSql();
});

For fine-grained control, use worker groups to assign different queues and polling intervals:

builder.Services.AddWarpServer<AppDbContext>(opt =>
{
    opt.UseSqlServer();

    opt.WorkerCount = 5;
    opt.Queues = ["critical"];
    opt.PollingInterval = TimeSpan.FromMilliseconds(100);

    opt.AddWorkerGroup(group =>
    {
        group.WorkerCount = 2;
        group.Queues = ["reports", "default"];
        group.PollingInterval = TimeSpan.FromSeconds(5);
    });
});

3. Define Messages (Pub/Sub)

public class OrderCreated : IMessage
{
    public int OrderId { get; set; }
    public string CustomerEmail { get; set; }
}

public class SendConfirmationEmail : IMessageHandler<OrderCreated>
{
    private readonly ILogger<SendConfirmationEmail> _logger;

    public SendConfirmationEmail(ILogger<SendConfirmationEmail> logger) => _logger = logger;

    public async Task HandleAsync(OrderCreated message, CancellationToken ct)
    {
        _logger.LogInformation("Sending email for order {OrderId}", message.OrderId);
        // All ILogger calls are captured and viewable in the dashboard
    }
}

public class UpdateInventory : IMessageHandler<OrderCreated>
{
    public async Task HandleAsync(OrderCreated message, CancellationToken ct)
    {
        // Runs independently — if email fails, this still succeeds
    }
}

Publish with outbox pattern:

[HttpPost]
public async Task<IActionResult> CreateOrder(CreateOrderRequest request)
{
    var order = new Order { /* ... */ };
    _context.Orders.Add(order);

    // Same transaction — if SaveChanges fails, no message is sent
    await _publisher.Publish(new OrderCreated { OrderId = order.Id });

    await _context.SaveChangesAsync();
    return Ok(order.Id);
}

4. Define Jobs (Orchestration)

public class GenerateReport : IJob
{
    public int Month { get; set; }
}

public class GenerateReportHandler : IJobHandler<GenerateReport>
{
    private readonly ILogger<GenerateReportHandler> _logger;

    public async Task HandleAsync(GenerateReport message, CancellationToken ct)
    {
        _logger.LogInformation("Generating report for month {Month}", message.Month);
        // Handler logs appear in the job's Activity Log on the dashboard
    }
}

Publishing options:

await publisher.Enqueue(new GenerateReport { Month = 3 });
await publisher.Schedule(new GenerateReport { Month = 3 }, tomorrow);
await publisher.Enqueue(new GenerateReport { Month = 3 }, queue: "reports");
await publisher.Enqueue(new FollowUp(), parentJobId: prepareId); // continuation

5. Define Requests (In-Memory)

public class GetUser : IRequest<UserDto>
{
    public int UserId { get; set; }
}

public class GetUserHandler : IRequestHandler<GetUser, UserDto>
{
    public async Task<UserDto> HandleAsync(GetUser request, CancellationToken ct)
    {
        return await _db.Users.FindAsync(request.UserId, ct);
    }
}

Send via IMediator:

var user = await mediator.Send(new GetUser { UserId = 1 });

Requests execute immediately in-process — no database, no worker, no retries. Errors bubble up to the caller.

6. Define Streams (In-Memory)

public class GetUsers : IStreamRequest<UserDto>
{
    public string Role { get; set; }
}

public class GetUsersHandler : IStreamRequestHandler<GetUsers, UserDto>
{
    public async IAsyncEnumerable<UserDto> HandleAsync(GetUsers request, [EnumeratorCancellation] CancellationToken ct)
    {
        await foreach (var user in _db.Users.AsAsyncEnumerable().WithCancellation(ct))
        {
            yield return new UserDto { Id = user.Id, Name = user.Name };
        }
    }
}

Stream via IMediator:

await foreach (var user in mediator.CreateStream(new GetUsers { Role = "Admin" }))
{
    Console.WriteLine(user.Name);
}

Streams execute lazily in-process — items are yielded one at a time. No database persistence, no worker, no retries.

7. Pipeline Behaviors

The unified pipeline wraps all four patterns — jobs, messages, requests, and streams:

public class LoggingBehavior<T, TResponse> : IPipelineBehavior<T, TResponse>
    where T : IRequest<TResponse>
{
    private readonly ILogger<LoggingBehavior<T, TResponse>> _logger;

    public async Task<TResponse> HandleAsync(T request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
    {
        var sw = Stopwatch.StartNew();
        _logger.LogInformation("Handling {Type}", typeof(T).Name);
        var result = await next();
        _logger.LogInformation("Handled {Type} in {Ms}ms", typeof(T).Name, sw.ElapsedMilliseconds);
        return result;
    }
}

For jobs and messages, TResponse is Unit. For requests, it's your custom response type. For streams, it's IAsyncEnumerable<T>.

Behaviors are registered automatically by the source generator — like handlers, there is no AddPipelineBehaviors(...) call. Open-generic behaviors (e.g. LoggingBehavior<T, TResponse>) are registered as open generics and closed by the DI container per request. The pipeline runs outer → inner in DI registration order, and the addon builder methods (AddRetry, AddTimeout, …) insert in call order — which is why, for example, AddRetry() must be called before AddTimeout().

8. Named Queues

options.Queues = ["a-critical", "b-default", "c-low"];
// Alphabetical order = priority. "a-critical" processed first.

await publisher.Enqueue(new UrgentTask(), queue: "a-critical");
await publisher.Publish(new LowPriorityEvent(), queue: "c-low");

9. Recurring Jobs

await recurringPublisher.AddOrUpdateRecurringJob(
    new CleanupSessions(), name: "session-cleanup", cron: "0 * * * *");

AddOrUpdateRecurringJob registers the definition. The RecurringJobScheduler task creates jobs when the cron time arrives. Execution history is tracked in RecurringJobLog and survives job cleanup.

10. Dashboard Authorization

app.UseWarpUI(options =>
{
    options.Authorization = new MyAuthFilter();
    options.UnauthorizedRedirectUrl = "/login"; // optional, redirects browser requests
});

public class MyAuthFilter : IWarpAuthorizationFilter
{
    public bool Authorize(HttpContext httpContext)
    {
        return httpContext.User.Identity?.IsAuthenticated == true
            && httpContext.User.IsInRole("Admin");
    }
}

Built-in filter for localhost-only access:

options.Authorization = new LocalRequestsOnlyAuthorizationFilter();

10. Configuration

builder.Services.AddWarpServer<AppDbContext>(options =>
{
    // Worker
    options.WorkerCount = 10;
    options.Queues = ["default"];
    options.PollingInterval = TimeSpan.FromSeconds(1);
    options.UseDispatcher = false; // true = batch-fetch mode

    // Cancellation
    options.CancellationCheckInterval = TimeSpan.FromSeconds(5);
    options.InvisibilityTimeout = TimeSpan.FromMinutes(5);

    // Retention
    options.JobExpirationTimeout = TimeSpan.FromDays(1);
    options.MaxExpirableJobCount = 20_000; // null to disable
    options.ExpirationBatchSize = 1000;

    // Background tasks
    options.OrchestrationInterval = TimeSpan.FromSeconds(10);
    options.MessageRoutingInterval = TimeSpan.FromSeconds(1);
    options.HealthCheckInterval = TimeSpan.FromSeconds(10);
    options.CounterAggregationInterval = TimeSpan.FromSeconds(5);

    // Scheduled-job activation. Future-dated jobs (`Schedule(job, at)`) sit in `State.Scheduled`
    // until this task flips them to `Enqueued`. The interval is the worst-case lag between
    // `ScheduleTime` and the job becoming pickup-eligible — e.g., default 5s means a job
    // scheduled for 12:00:00 runs somewhere in [12:00:00, 12:00:05]. Lower this for tighter
    // scheduled-job latency; the task is time-driven (no event trigger exists for "time has
    // passed"), so push notifications don't help here.
    options.ScheduledActivationInterval = TimeSpan.FromSeconds(5);
});

11. DB Push (optional)

Replaces polling wake-up with push notifications — PostgreSQL LISTEN/NOTIFY or SQL Server Service Broker. The dispatcher, MessageRouter, and Orchestrator wake instantly on relevant events instead of waiting for their next poll. Opt-in; default behavior (polling) is unchanged if you don't call opt.UseDatabasePush().

builder.Services.AddWarpServer<AppDbContext>(opt =>
{
    opt.UsePostgreSql();         // or opt.UseSqlServer()

    // Push benefits the dispatcher's batch-fetch path; individual workers still poll.
    opt.UseDispatcher = true;
    opt.PollingInterval = TimeSpan.FromSeconds(5); // loose polling is fine when push is on

    opt.UseDatabasePush();
});

The provider-specific transport is wired by whichever UsePostgreSql() / UseSqlServer() you called. Transports are resilient to connection drops — the listener reconnects with exponential backoff and fires a drain signal on every reconnect so jobs enqueued during the gap are picked up.

Scheduled jobs: push accelerates immediate enqueues. Jobs published via Schedule(job, at) sit in State.Scheduled until ScheduledJobActivation flips them to Enqueued — only then does the JobEnqueued notification fire. Dispatcher pickup after activation is <50ms via push, but the activation itself is time-driven and bounded by ScheduledActivationInterval (default 5s, see §10). If you need sub-second precision on scheduled jobs, lower that interval — polling is the only mechanism, since there's no event for "wall-clock time has advanced."

SQL Server setup requirements: Service Broker must be enabled on the target database. Warp creates the message type / contract / queue / service idempotently on first publish, but it cannot run ALTER DATABASE ... SET ENABLE_BROKER for you (that requires exclusive DB access). If broker isn't enabled, the transport logs an actionable error and degrades silently to polling:

ALTER DATABASE [YourDb] SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE;

Observability: transport failures are logged at Warning and incremented on warp.notifications.publish_failures (OpenTelemetry counter). Successful publishes increment warp.notifications.published. Alert on the failure counter if push health matters to your SLOs.

Upgrading from <0.9: the Scheduled state was introduced alongside DB push. Future-dated jobs published on the old codebase land in Enqueued with ScheduleTime > now and are correctly gated by a defensive predicate in worker queries — but they won't appear in the dashboard's Scheduled list until their time arrives. To migrate them eagerly, run once after upgrade:

UPDATE warp.job
SET    current_state = 7  -- State.Scheduled
WHERE  current_state = 1  -- State.Enqueued
  AND  schedule_time > now();

12. Realtime Dashboard Push (optional)

Replaces dashboard polling with server-pushed SignalR events. With this addon registered, every connected dashboard client receives a JobFinalized event whenever any job reaches a terminal state, and a MessageEnqueued event whenever a Kind=Message job is published.

Each broadcast carries the current DashboardStatistics DTO as the payload — the server fetches it once per coalesce window and fans it out to all connected clients, eliminating per-client GET /api/status round-trips. Stats fetch is best-effort: if the service can't be resolved (or GetWarpStatus() throws) the event still fires without a payload, and the SPA's safety-net interval will refetch via REST. Per-view data (filtered job lists, job detail, logs) is not pushed in the payload — clients refetch those on event because they're per-viewer scoped (different filters, pages, job ids per client).

builder.Services.AddWarp<AppDbContext>(opt =>
{
    opt.UseDatabasePush();      // recommended — see "Multi-server" below
    opt.AddDashboardPush();     // registers hub + broadcaster
});

Optional configuration (defaults shown):

opt.AddDashboardPush(cfg =>
{
    // Collapse signal bursts (e.g., 50-job batch completion) into one broadcast.
    // Set to TimeSpan.Zero to disable coalescing.
    cfg.CoalesceWindow = TimeSpan.FromMilliseconds(100);
});

Addon discovery: the dashboard reads GET /warp/api/addons once at boot. If AddDashboardPush() is not registered the response sets push: false and the SPA keeps using its existing polling fallback (at the safety-net interval of 30 s — coarser than the previous 2–5 s, which is the intended cost of not opting in).

Auth: hub negotiate hits WarpUIMiddleware's existing path-based auth (the hub URL contains /api/, so unauthenticated requests get a 401). No parallel auth code path — the same IWarpAuthorizationFilter (or built-in cookie login) gates the hub.

Multi-server fanout: when opt.UseDatabasePush() is also configured, an event on server A fans out to dashboard clients connected to every server. The DB-push transport already routes JobFinalized and MessageEnqueued notifications to all NotificationListenerTask instances; the dashboard broadcaster on each server picks them up via the same ServerTaskSignals<TContext> surface that wakes the orchestrator. Without UseDatabasePush(), push is single-server only: events from server B reach a client connected to server A only via the safety-net 30 s refetch.

Observability: warp.dashboard.events.broadcast (counter, post-coalesce) and warp.dashboard.connections.active (up/down counter). No new alerts recommended for push — the polling fallback is the correctness floor.

Frontend dependency: the dashboard adds @microsoft/signalr as a client dependency. No new server-side NuGet — SignalR ships with Microsoft.AspNetCore.App.

HTTP Exposure (Warp.Http)

Moberg.Warp.Http is an optional package that exposes Warp IRequest<TResponse> and IStreamRequest<TResponse> handlers as ASP.NET Minimal API endpoints — annotate the handler class, run MapWarpHttp(), you have an HTTP endpoint. Source-generated dispatch (no per-request reflection); independent of Warp.UI. Full docs at features/http.

IJob and IMessage cannot be HTTP-exposed — write a thin IRequest<Guid> wrapper that calls IPublisher.Enqueue if you need "submit a job via HTTP".

using Microsoft.AspNetCore.Mvc;          // [FromRoute], [FromQuery], [FromHeader], [FromBody]
using Warp.Core.Handlers;
using Warp.Http;

// 1. Define request as a public contract; tag the HANDLER with the HTTP method + route.
public sealed record GetOrder([FromRoute] Guid Id) : IRequest<OrderDto>;

[WarpHttpGet("/orders/{id}")]
public sealed class GetOrderHandler : IRequestHandler<GetOrder, OrderDto>
{
    public Task<OrderDto> HandleAsync(GetOrder request, CancellationToken ct) { ... }
}

// 2. Streaming — IStreamRequest<T> becomes a text/event-stream endpoint.
public sealed record OrderFeed([FromQuery] int Count) : IStreamRequest<OrderEvent>;

[WarpHttpGet("/orders/feed")]
public sealed class OrderFeedHandler : IStreamRequestHandler<OrderFeed, OrderEvent> { ... }

// 3. "Submit a job via HTTP" — IRequest<Guid> wrapper around IPublisher.Enqueue.
public sealed record EnqueueReport(Guid TenantId) : IRequest<Guid>;

[WarpHttpPost("/reports/generate")]
public sealed class EnqueueReportHandler(IPublisher publisher) : IRequestHandler<EnqueueReport, Guid>
{
    public async Task<Guid> HandleAsync(EnqueueReport req, CancellationToken ct)
    {
        var jobId = await publisher.Enqueue(new GenerateReportJob(req.TenantId));
        await publisher.SaveChangesAsync(ct);
        return jobId;
    }
}

// 4. Auth via standard ASP.NET attributes on the handler — surfaced as endpoint metadata.
public sealed record CancelOrder(Guid Id) : IRequest<Unit>;

[Authorize(Policy = "OrdersWrite")]
[WarpHttpPost("/orders/cancel")]
public sealed class CancelOrderHandler : IRequestHandler<CancelOrder, Unit> { ... }

// 5. Custom status / Location via IHttpResponseShape on the response type.
public sealed record CreatedOrder(Guid Id) : IHttpResponseShape
{
    public void Apply(HttpContext ctx)
    {
        ctx.Response.StatusCode = StatusCodes.Status201Created;
        ctx.Response.Headers.Location = $"/orders/{Id}";
    }
}

// 6. Registration.
builder.Services.AddWarpHttp();
app.MapWarpHttp();                                              // null-group handlers
app.MapGroup("/api/public").RequireAuthorization("publicPolicy").MapWarpHttp("public");

Binding — handled by ASP.NET Minimal API. Use the standard Microsoft.AspNetCore.Mvc attributes: [FromRoute], [FromQuery], [FromHeader], [FromBody]. ASP.NET handles IParsable<T>, TryParse, query arrays, content negotiation, etc. The whole-body POST DTO case (no per-property attributes) just works — ASP.NET binds TRequest from the JSON body directly.

⚠️ GET/DELETE query params: a non-nullable scalar is REQUIRED, even with a C# default. For non-body verbs, Warp binds the request via ASP.NET's [AsParameters], which decomposes TRequest into per-property query/route arguments. A C# property initializer or parameter default is ignored by the binder, so a non-nullable scalar becomes a required query parameter — a request that omits it returns 400, not your default:

// ✗ Bare GET /api/todos → 400: "Take" and "AsOf" are required query params.
public sealed class ListTodos : IRequest<IReadOnlyList<TodoDto>>
{
    public int Take { get; set; } = 20;        // default ignored by the binder
    public DateTime AsOf { get; set; } = DateTime.UtcNow;
}

// ✓ Make optional params nullable; apply the default inside the handler.
public sealed class ListTodos : IRequest<IReadOnlyList<TodoDto>>
{
    public int? Take { get; set; }
    public DateTime? AsOf { get; set; }
}
// handler: var take = request.Take ?? 20; var asOf = request.AsOf ?? clock.UtcNow;

The WHTTP005 build warning flags exactly this case (non-nullable value-typed query param with a C# default) so you catch it at compile time instead of at the first request.

Mixed route + body (PATCH/PUT with an id in the path and fields in the body). [AsParameters] cannot bind a [FromBody] member, so use a class with a [FromRoute] scalar plus a single [FromBody] nested DTO. Positional record parameters confuse the binder here — prefer a class with settable properties:

public sealed class UpdateTodo
{
    [FromRoute] public Guid Id { get; set; }
    [FromBody]  public UpdateTodoBody Body { get; set; } = default!;
}

public sealed record UpdateTodoBody(string Title, bool Done);

[WarpHttpPatch("/api/todos/{id}")]
public sealed class UpdateTodoHandler : IRequestHandler<UpdateTodo, TodoDto> { ... }

Response semantics

Handler kind Status Body
IRequest<TResponse> 200 JSON of TResponse
IRequest<Unit> 204 empty
IStreamRequest<T> 200 text/event-stream (one data: per item)

Named groups[WarpHttpPost("/x", Group = "public")] on the handler, app.MapWarpHttp("public") to register strictly-matching descriptors. Calling MapWarpHttp(group) twice on the same builder with the same group throws.

Multi-attributeAllowMultiple = true on WarpHttpAttribute; multiple attributes require Name = "..." on each (WHTTP002). Useful for versioning aliases like [WarpHttpPost("/v1/orders", Name="V1"), WarpHttpPost("/v2/orders", Name="V2")].

How It Works

Message Flow

Publish(OrderCreated) → Message (Enqueued)
  ↓ MessageRouter routes
  → Job 1 (SendEmail)       → Completed → stats:succeeded +1
  → Job 2 (UpdateInventory) → Completed → stats:succeeded +1
  → Message Completed → ExpireAt set

Job Flow

Enqueue(GenerateReport) → Job (Enqueued, queue="reports")
  ↓ Worker picks up (queue match + schedule)
  → Pipeline → Handler → ILogger captured → Completed
  → stats:succeeded +1, stats:succeeded:{hour} +1
  → ExpireAt set → eventually cleaned up

Request Flow

mediator.Send(GetUser { Id = 1 })
  → Resolve IRequestHandler<GetUser, UserDto>
  → Pipeline behaviors (logging, validation, etc.)
  → Handler.HandleAsync → UserDto returned
  → No database involved

Stream Flow

mediator.CreateStream(GetUsers { Role = "Admin" })
  → IPipelineBehavior chain (request-level: auth, logging)
  → IStreamPipelineBehavior chain (enumeration-level)
  → IStreamRequestHandler.HandleAsync → IAsyncEnumerable<UserDto>
  → Items yielded lazily on enumeration
  → No database involved

Cancellation Flow

DeleteJob(processingJobId)
  → CancellationMode = Graceful (state stays Processing)
  → RunJobMonitor detects CancellationMode
  → Handler's CancellationToken cancelled
  → If handler stops: state → Deleted
  → If handler completes anyway: state → Completed

Concurrency Safety

  • Job pickup: FOR UPDATE SKIP LOCKED — one worker per job
  • State changes: Transaction + row lock — stats and state atomic
  • Bulk operations: Per-job transactions — failures skip, don't propagate
  • Message routing: Row lock prevents duplicate fan-out

Job Retention

  • Completed/Deleted: auto-expire (configurable via JobExpirationTimeout)
  • Failed: never expire (manual intervention required)
  • Count-based: optional MaxExpirableJobCount deletes oldest by ExpireAt
  • Statistics survive deletion (persistent counters)
  • Hourly stats cleaned up after 7 days
  • Recurring job logs: last 100 per recurring job retained

Dashboard

  • Realtime graph — jobs/second, polling every 2s, rolling 5 minutes
  • Historical graph — succeeded/failed per hour, 24h or 7d view
  • Metric cards — clickable, navigate to corresponding pages
  • Job list — by state, bulk actions (requeue/delete), per-page selector
  • Failed jobs — type filter bar, bulk delete/requeue by type
  • Job detail — colored state cards with duration, handler output, "Cancelling..." badge
  • Messages — list with job count, detail with spawned jobs
  • Batches — stacked green/red progress bar (completed/failed)
  • Recurring jobs — cron schedules, trigger/delete, execution history
  • Servers — health, CPU, memory, clickable workers
  • Worker detail — job activity log, server link, status indicator
  • Dark mode — system preference + toggle
  • Auth — pluggable filter with optional login redirect

Project Structure

src/
├── core/
│   ├── Warp.Core/                      # Entities, handlers, publisher, services, logging
│   ├── Warp.Worker/                    # Worker service, background tasks, dispatcher
│   ├── Warp.UI/                        # Dashboard API endpoints + embedded SPA
│   ├── Warp.SourceGenerator/           # Mediator & worker-dispatch source generator
│   └── providers/
│       ├── Warp.Provider.PostgreSql/   # PG provider (LISTEN/NOTIFY, row-lock SQL, locks)
│       └── Warp.Provider.SqlServer/    # SS provider (Service Broker, row-lock SQL, locks)
├── tests/
│   ├── Warp.Tests/                     # 1,024 tests (xUnit v3 + Shouldly + Testcontainers + Respawn)
│   ├── Warp.Tests.Mutation/            # Stryker mutation-testing config
│   └── Warp.Tests.SourceGenerator/     # Test source generator (emits PG/SS concrete subclasses)
├── demo/
│   ├── Warp.Test.Shared/               # Shared demo handlers
│   ├── Warp.TestApp/                   # Demo web application with login page
│   └── Warp.TestWorker/                # Demo worker service
├── benchmarks/                          # BenchmarkDotNet throughput suite
└── ui/                                  # Vite + React + TypeScript + Tailwind + shadcn/ui

Development

dotnet build Warp.slnx
dotnet test Warp.slnx -- --filter-trait "Category=NoDb"        # No container (~3s)
dotnet test Warp.slnx -- --filter-trait "Category=PostgreSql"  # PG-backed (~1m 10s)
dotnet test Warp.slnx -- --filter-trait "Category=SqlServer"   # SS-backed (~1m 20s)
dotnet test Warp.slnx                                           # Full suite (~1m 30s)
cd src/ui && npm run dev                                         # Dashboard on :5173

Requires Docker for tests (Testcontainers + Respawn).

License

MIT — see LICENSE.

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (2)

Showing the top 2 NuGet packages that depend on Moberg.Warp.Worker:

Package Downloads
Moberg.Warp.Provider.PostgreSql

PostgreSQL provider for Moberg.Warp — notification transport (LISTEN/NOTIFY), row-lock SQL, distributed lock, and exception classification.

Moberg.Warp.Provider.SqlServer

SQL Server provider for Moberg.Warp — notification transport (Service Broker), row-lock SQL, distributed lock, and exception classification.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2.0.0 0 6/10/2026
1.0.1 134 6/3/2026
1.0.0 126 6/3/2026
0.17.2 110 6/2/2026
0.17.1 117 5/26/2026
0.17.0 113 5/25/2026
0.16.0 116 5/25/2026
0.15.3 114 5/22/2026
0.15.2 112 5/18/2026
0.15.1 115 5/18/2026
0.15.0 109 5/18/2026
0.14.1 111 5/16/2026
0.14.0 127 5/14/2026
0.13.0 118 5/11/2026
0.12.0 114 5/7/2026
0.11.0 108 5/6/2026
0.10.0 111 4/27/2026

See https://moberghr.github.io/warp/docs/releases for full release notes.