LowCodeHub.Jobs 0.0.3


LowCodeHub.Jobs

Durable, typed, streaming background jobs for ASP.NET Core on .NET 10.

A small Hangfire-class job system designed around three ideas:

  1. Typed everything. Jobs are records implementing IJobRequest<TResult> (or IJobRequest<TResult, TProgress>); handlers are classes implementing IJobHandler<…>. No reflection-heavy dispatch, no string-name routing in user code.
  2. Streaming-first observability. Every handler gets a rich IJobContext that emits progress, structured logs, token chunks (LLM-style), and named metrics — all multiplexed over a single SSE channel and replayable on reconnect via Last-Event-ID.
  3. Batteries included. Delayed scheduling, cron recurring jobs, continuations, idempotency keys, retries with backoff, cooperative cancellation, TTL cleanup, health checks, OpenTelemetry metrics, an embedded admin dashboard (SPA + REST API + auth), and multi-pod leader election with automatic Redis → database fallback.

Persistence: SQL Server or PostgreSQL. Live streaming: Redis Streams via LowCodeHub.SSE.


Install

dotnet add package LowCodeHub.Jobs

Targets net10.0. Brings in LowCodeHub.SSE transitively.


Define a Job

A request is a record. It must expose a static Metadata property — that's how the dashboard, recurring scheduler, and admin UI know about it.

using LowCodeHub.Jobs.Abstractions;
using LowCodeHub.Jobs.Contracts;

public sealed record ExportUsers(Guid TenantId)
    : IJobRequest<ExportResult, ExportProgress>
{
    public static JobMetadata Metadata { get; } = new()
    {
        Name        = "exports.users.v1",   // stable wire name
        DisplayName = "Export users",
        Description = "Streams a tenant's users to a CSV blob.",
        Icon        = "📤",
        Kind        = JobKind.User,         // user-owned (admin dashboard is read-only)
        MaxAttempts = 5,
    };
}

public sealed record ExportProgress(int Done, int Total);
public sealed record ExportResult(string BlobName, int Rows);

public sealed class ExportUsersHandler
    : IJobHandler<ExportUsers, ExportResult, ExportProgress>
{
    public async Task<ExportResult> HandleAsync(
        ExportUsers request,
        IJobContext<ExportProgress> ctx,
        CancellationToken ct)
    {
        await ctx.LogInfoAsync($"Starting export for tenant {request.TenantId}", ct: ct);

        for (var page = 1; page <= 10; page++)
        {
            ct.ThrowIfCancellationRequested();

            // typed progress
            await ctx.ReportAsync(new ExportProgress(page, 10), ct);

            // or the non-typed reporter API
            await ctx.ReportProgressAsync(done: page, total: 10, label: $"Page {page}/10", ct);
            await ctx.SetMetricAsync("rows_written", page * 100, ct);
        }

        return new ExportResult($"exports/{request.TenantId}.csv", Rows: 1000);
    }
}

JobKind: User vs System

| Kind | Meaning | Dashboard behavior |
| --- | --- | --- |
| User | Enqueued by end-user actions | Read-only for admins (history, delete only) |
| System | Platform-owned (maintenance, cron) | Admins can invoke and cancel from the UI |

For System jobs, set Cron on JobMetadata and the library auto-registers a recurring definition at startup — no extra ScheduleRecurringAsync call needed:

public sealed record NightlyCleanup() : IJobRequest<Unit>
{
    public static JobMetadata Metadata { get; } = new()
    {
        Name = "system.nightly-cleanup",
        Kind = JobKind.System,
        Cron = "0 2 * * *",
        TimeZoneId = "Europe/Amsterdam",
    };
}

Configure Services

using LowCodeHub.Jobs.Extensions;
using LowCodeHub.SSE.Extensions;

builder.Services.AddJobs(b => b
    .Configure(o =>
    {
        o.Worker.MaxParallelism    = 16;
        o.Worker.DefaultMaxRetries = 3;
        o.Worker.ResultTtl         = TimeSpan.FromHours(24);
    })
    .WithSqlServer(o =>
    {
        o.ConnectionString = builder.Configuration.GetConnectionString("Jobs")!;
        o.Schema           = "jobs";
    })
    .AddHandler<ExportUsers, ExportResult, ExportProgress, ExportUsersHandler>()
    .AddHandler<NightlyCleanup, Unit, NightlyCleanupHandler>());

builder.Services.AddSse(o =>
{
    o.ConnectionString = builder.Configuration.GetConnectionString("Redis")!;
});

Classic

builder.Services
    .AddJobs(builder.Configuration)              // binds "BackgroundJobs" section
    .AddJobsPostgreSql(o => o.ConnectionString = "…")
    .AddJobHandler<ExportUsers, ExportResult, ExportProgress, ExportUsersHandler>();

Initialize the Database

LowCodeHub.Jobs does not create or migrate its own database schema during service registration. The schema scripts are idempotent and ship as embedded resources in the package, but the consuming application owns when and how they are applied.

Embedded resource prefixes:

| Provider | Embedded resource prefix |
| --- | --- |
| SQL Server | LowCodeHub.Jobs.Repositories.SqlServer.Scripts. |
| PostgreSQL | LowCodeHub.Jobs.Repositories.PostgreSql.Scripts. |

The scripts create Jobs, JobAttempts, JobEvents, and RecurringJobs tables under the default jobs schema. If you use a different Schema option, keep your application-owned migration scripts aligned with that schema.

With LowCodeHub.Migration.SqlServer

Configure the migration package to scan the LowCodeHub.Jobs assembly and include only the SQL Server Jobs scripts:

using LowCodeHub.Jobs.Options;
using LowCodeHub.Migration.SqlServer.Extensions;

builder.Services.AddMigration(o =>
{
    o.ConnectionString = builder.Configuration.GetConnectionString("Jobs")!;
    o.Directories = ["LowCodeHub.Jobs.Repositories.SqlServer.Scripts."];
});

await app.RunDatabaseMigrationAsync<SqlServerJobsOptions>();

With LowCodeHub.Migration.PostgreSql

using LowCodeHub.Jobs.Options;
using LowCodeHub.Migration.PostgreSql.Extensions;

builder.Services.AddMigration(o =>
{
    o.ConnectionString = builder.Configuration.GetConnectionString("Jobs")!;
    o.Directories = ["LowCodeHub.Jobs.Repositories.PostgreSql.Scripts."];
});

await app.RunDatabaseMigrationAsync<PostgreSqlJobsOptions>();

Do not scan the whole LowCodeHub.Jobs assembly without a directory filter, because the package contains scripts for both providers.

With EF Core Migrations

EF Core will not discover these embedded scripts automatically. They are not EF model migrations and they will not appear in dotnet ef migrations list. EF users should create an application-owned EF migration and execute the embedded scripts from the LowCodeHub.Jobs assembly in Up.

Example SQL Server migration:

using LowCodeHub.Jobs.Migrations;
using Microsoft.EntityFrameworkCore.Migrations;

public partial class AddLowCodeHubJobsSchema : Migration
{
    protected override void Up(MigrationBuilder migrationBuilder)
    {
        foreach (var resource in SqlJobs.SqlServerResources)
        {
            migrationBuilder.Sql(SqlJobs.ReadFromResource(resource));
        }
    }

    protected override void Down(MigrationBuilder migrationBuilder)
    {
        // Drop Jobs objects here if your application's migration policy requires reversible migrations.
    }
}

For PostgreSQL, loop over SqlJobs.PostgreSqlResources instead. SqlJobs.ReadFromResource(...) removes SQL Server GO separator lines so the SQL can be passed directly to EF Core's migrationBuilder.Sql(...).
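
GO is a batch separator understood by SQL Server client tools rather than a T-SQL statement, which is why it has to be removed before the text reaches migrationBuilder.Sql(...). The sketch below shows the kind of line filtering this implies, written in JavaScript for brevity. `stripGoSeparators` is a hypothetical name, and the real ReadFromResource may differ in details (for example in how it treats `GO 5` repeat counts).

```javascript
// Hypothetical helper: drops lines that consist only of the GO batch separator.
// Assumption: a separator is a line containing just "GO" (any case), optionally
// surrounded by whitespace. Words that merely start with GO are left alone.
function stripGoSeparators(sql) {
  return sql
    .split(/\r?\n/)
    .filter((line) => !/^\s*GO\s*$/i.test(line))
    .join("\n");
}

const script = [
  "CREATE TABLE jobs.Jobs (JobId uniqueidentifier PRIMARY KEY);",
  "GO",
  "CREATE INDEX IX_Jobs_Status ON jobs.Jobs (JobId);",
  "go  ",
  "-- GOAL: this comment line is kept",
].join("\n");

console.log(stripGoSeparators(script));
```

The table and index names in the sample script are made up; only the separator handling is the point.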


Enqueue Jobs

app.MapPost("/exports/users/{tenantId:guid}", async (
    Guid tenantId,
    IJobScheduler jobs,
    CancellationToken ct) =>
{
    var job = await jobs.EnqueueAsync(
        new ExportUsers(tenantId),
        new ScheduleOptions
        {
            IdempotencyKey = $"export-users:{tenantId}",
            MaxRetries     = 5,
        },
        ct);

    return Results.Accepted($"/jobs/{job.JobId}", job);
});
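
The effect of the IdempotencyKey passed above is easiest to see with a toy model. The following is an in-memory JavaScript sketch of "duplicate enqueues collapse to the existing job", not the library's implementation; `enqueue` and the job shape are invented for illustration, and how long a key stays reserved after the job finishes is not modelled.

```javascript
// Toy in-memory model of idempotent enqueue (illustration only).
let nextId = 1;
const jobsByKey = new Map();

function enqueue(request, options = {}) {
  const key = options.idempotencyKey;
  if (key !== undefined && jobsByKey.has(key)) {
    // Duplicate enqueue: hand back the existing job instead of creating a new one.
    return jobsByKey.get(key);
  }
  const job = { jobId: nextId++, request, status: "Queued" };
  if (key !== undefined) jobsByKey.set(key, job);
  return job;
}

const first = enqueue({ tenantId: "t-42" }, { idempotencyKey: "export-users:t-42" });
const again = enqueue({ tenantId: "t-42" }, { idempotencyKey: "export-users:t-42" });
const other = enqueue({ tenantId: "t-42" }); // no key: always a new job

console.log(first.jobId === again.jobId); // true
console.log(first.jobId === other.jobId); // false
```

In practice this means a double-clicked button or a retried HTTP request does not produce two exports for the same tenant.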

ScheduleOptions

| Field | Effect |
| --- | --- |
| RunAt | Delay until a UTC instant |
| MaxRetries | Override worker default for this enqueue |
| IdempotencyKey | Duplicate enqueues with the same key collapse to the existing job |
| ContinuesJobId | Become Queued only after the parent reaches Succeeded |
| ResultTtlOverride | Keep this job's result longer/shorter than the worker default |

Recurring (programmatic)

await jobs.ScheduleRecurringAsync(
    name:            "nightly-user-export",
    cronExpression:  "0 2 * * *",
    requestTemplate: new ExportUsers(tenantId),
    timeZoneId:      "Europe/Amsterdam",
    cancellationToken: ct);

(Or: just set Cron on JobMetadata for JobKind.System jobs and let the bootstrapper register it.)


Tracking Job Status from User Code

When a user submits a job, the library gives you three complementary mechanisms to let that user (or any downstream caller) observe what's happening: polling a status snapshot, streaming live events over SSE, and reading the full event history. You can use any combination.

Pattern: 202 Accepted + status endpoint

The idiomatic REST pattern is to return 202 Accepted with a Location header pointing to a status resource. The caller can then poll or stream from that URL:

// POST /reports → enqueue and return a 202 with the job URL
app.MapPost("/reports", async (
    ReportRequest req,
    IJobScheduler scheduler,
    CancellationToken ct) =>
{
    var job = await scheduler.EnqueueAsync(
        req,
        new ScheduleOptions { IdempotencyKey = req.UniqueId },
        ct);

    return Results.Accepted($"/reports/{job.JobId}", new { jobId = job.JobId });
});
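
On the calling side, the 202 response carries everything needed to find the status resource. Here is a minimal client sketch in JavaScript, assuming the endpoint above; `submitReport` is a hypothetical helper, the payload shape is invented, and `fetchImpl` is injected so the function can be tried without a server (pass the global `fetch` in a browser or Node 18+).

```javascript
// Client-side sketch: submit a job and resolve the status URL from the 202 response.
async function submitReport(fetchImpl, baseUrl, payload) {
  const res = await fetchImpl(new URL("/reports", baseUrl).toString(), {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  });
  if (res.status !== 202) {
    throw new Error(`Expected 202 Accepted, got ${res.status}`);
  }
  const { jobId } = await res.json();
  // Results.Accepted(uri, value) puts the URI into the Location header;
  // fall back to the conventional path if the header is missing.
  const location = res.headers.get("Location") ?? `/reports/${jobId}`;
  return { jobId, statusUrl: new URL(location, baseUrl).toString() };
}

// Usage (hypothetical host and payload):
// const { statusUrl } = await submitReport(fetch, "https://example.test", { uniqueId: "r-1" });
```

The returned `statusUrl` is what the caller then polls or streams from.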

1. Poll status — IJobStatusReader

IJobStatusReader returns a lightweight JobDescriptor snapshot. Inject it anywhere and call GetAsync with the JobId you got from EnqueueAsync.

// GET /reports/{jobId} → current status snapshot
app.MapGet("/reports/{jobId:guid}", async (
    Guid jobId,
    IJobStatusReader reader,
    CancellationToken ct) =>
{
    var job = await reader.GetAsync(jobId, ct);
    if (job is null) return Results.NotFound();

    return Results.Ok(job);
});
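
A caller of this endpoint typically polls until the job stops changing. The sketch below is one way to do that from JavaScript; `pollUntilTerminal` and `isTerminal` are hypothetical helpers. It assumes the status is serialized as its string name (as in the SSE example later in this document) and treats Succeeded, Dead, and Cancelled as terminal, since a Failed job is still going to be retried.

```javascript
// Statuses after which the job will not change again.
// Failed is excluded: a failed attempt is retried until the job becomes Dead.
const TERMINAL = new Set(["Succeeded", "Dead", "Cancelled"]);

function isTerminal(status) {
  return TERMINAL.has(status);
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// `getJob` is any async function returning the JobDescriptor JSON (or null for 404), e.g.
//   () => fetch(`/reports/${jobId}`).then((r) => (r.ok ? r.json() : null))
async function pollUntilTerminal(getJob, { intervalMs = 1000, maxPolls = 300 } = {}) {
  for (let i = 0; i < maxPolls; i++) {
    const job = await getJob();
    if (job === null) throw new Error("Job not found");
    if (isTerminal(job.status)) return job;
    await sleep(intervalMs);
  }
  throw new Error("Gave up waiting for the job to finish");
}
```

The `maxPolls` cap keeps a stuck job from pinning the client forever; for long-running jobs the SSE stream is usually the better fit.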

The JobDescriptor returned:

| Field | Type | Description |
| --- | --- | --- |
| JobId | Guid | Stable identifier |
| JobType | string | Metadata display name |
| Status | JobStatus | See states below |
| Attempt | int | Current attempt number (1-based) |
| EnqueuedAt | DateTimeOffset | When the job was first created |
| StartedAt | DateTimeOffset? | When execution began |
| CompletedAt | DateTimeOffset? | When it reached a terminal state |
| IdempotencyKey | string? | Your key, if supplied at enqueue time |
| ResultJson | string? | JSON-serialized result on Succeeded |
| ErrorMessage | string? | Last error message on Failed / Dead |

Job status lifecycle

Queued ──► Running ──► Succeeded
   │           │
   │      (retry)
   │           └──► Failed ──► Dead  (max retries exhausted)
   │
   └──────────────────────────► Cancelled
Scheduled (RunAt in the future)

| Status | Meaning |
| --- | --- |
| Queued | Waiting to be picked up |
| Scheduled | Delayed; will become Queued at RunAt |
| Running | Actively executing |
| Succeeded | Completed successfully; ResultJson is populated |
| Failed | Last attempt failed; will be retried |
| Dead | All retries exhausted; requires manual intervention |
| Cancelled | Cancelled cooperatively |

Lookup by idempotency key

If you supply an IdempotencyKey at enqueue time you don't need to store the JobId — look it up by key instead:

var job = await reader.FindByIdempotencyKeyAsync("export-users:tenant-42", ct);

2. Stream live events — IJobStreamService (SSE)

Inject IJobStreamService and expose a Minimal API route that returns a server-sent-events stream. The browser's native EventSource API connects to it and receives real-time events as the job runs.

// GET /reports/{jobId}/stream → live SSE feed
app.MapGet("/reports/{jobId:guid}/stream", (
    Guid jobId,
    HttpContext http,
    IJobStreamService stream,
    CancellationToken ct) =>
{
    // Pass Last-Event-ID so the stream replays missed events on reconnect
    var lastEventId = http.Request.Headers["Last-Event-ID"].ToString();
    return TypedResults.ServerSentEvents(stream.StreamAsync(jobId, lastEventId, ct));
});

Browser-side — filter events by kind using addEventListener:

const es = new EventSource(`/reports/${jobId}/stream`);

// Status transitions: Queued → Running → Succeeded
es.addEventListener('lifecycle', e => {
    const { status } = JSON.parse(e.data);
    console.log('Status changed →', status);
    // Failed is not terminal (the attempt will be retried); Dead and Cancelled are.
    if (status === 'Succeeded' || status === 'Dead' || status === 'Cancelled') {
        es.close(); // terminal: no more events
    }
});

// Progress bar updates
es.addEventListener('progress', e => {
    const { done, total, label } = JSON.parse(e.data);
    progressBar.value = done / total;
    progressLabel.textContent = label;
});

// Structured log lines (TRACE / INFO / WARN / ERROR)
es.addEventListener('log', e => {
    const { level, message, dataJson } = JSON.parse(e.data);
    console[level.toLowerCase()](`[${level}] ${message}`, dataJson ? JSON.parse(dataJson) : '');
});

// LLM-style token streaming / row-by-row results
es.addEventListener('chunk', e => {
    const { value } = JSON.parse(e.data);
    output.textContent += value;
});

// Live counters / badges
es.addEventListener('metric', e => {
    const { name, value } = JSON.parse(e.data);
    document.querySelector(`[data-metric="${name}"]`).textContent = value;
});

// Final result — available once status is Succeeded
es.addEventListener('result', e => {
    const { resultJson } = JSON.parse(e.data);
    const result = JSON.parse(resultJson);
    showResult(result);
    es.close();
});

// Connection/network errors
es.onerror = () => {
    // EventSource reconnects automatically — no action needed unless you want UI feedback
};

The stream is replay-safe: when Last-Event-ID is provided, the server replays all persisted events with a sequence number higher than the last one seen, so a reconnect or page refresh does not miss any persisted event.

Which event kinds are persisted to the database (and thus replayable) is controlled by EventLogOptions.PersistKinds:

"EventLog": {
  "PersistKinds": "Lifecycle, Progress, Result"
}

Add Log, Chunk, or Metric to persist those kinds too (at the cost of higher write volume).
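
How persistence and replay interact can be modelled in a few lines. The sketch below is a toy JavaScript model, not the library's code: `createEventLog`, `record`, and `eventsToReplay` are invented names, event ids are assumed to be the numeric sequence, an empty Last-Event-ID is assumed to mean "replay from the start", and it is also an assumption that non-persisted events consume sequence numbers.

```javascript
// Toy model: only kinds listed in PersistKinds are stored, and only stored
// events can be replayed after a reconnect.
function parsePersistKinds(value) {
  return new Set(value.split(",").map((s) => s.trim().toLowerCase()).filter(Boolean));
}

function createEventLog(persistKinds) {
  const persisted = [];
  let sequence = 0;
  return {
    // Every event gets the next sequence number; only persisted kinds are kept.
    record(kind, data) {
      const event = { sequence: ++sequence, kind, data };
      if (persistKinds.has(kind)) persisted.push(event);
      return event;
    },
    // Events a reconnecting client still needs, given its Last-Event-ID header.
    eventsToReplay(lastEventId) {
      const last = Number(lastEventId || 0);
      if (!Number.isFinite(last)) return persisted.slice(); // unparsable id: replay all
      return persisted.filter((e) => e.sequence > last);
    },
  };
}

const log = createEventLog(parsePersistKinds("Lifecycle, Progress, Result"));
log.record("lifecycle", { status: "Running" }); // sequence 1, persisted
log.record("progress", { done: 1, total: 10 }); // sequence 2, persisted
log.record("chunk", { value: "hello" });        // sequence 3, live only
log.record("progress", { done: 2, total: 10 }); // sequence 4, persisted

// A client that last saw event 2 gets event 4 back; the chunk cannot be replayed.
console.log(log.eventsToReplay("2").map((e) => `${e.sequence}:${e.kind}`));
```

This is the trade-off behind the setting: adding Chunk or Log to PersistKinds makes those events survive a reconnect, at the price of one more write per event.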


3. Read historical events — IJobEventLog

For completed jobs, audit pages, or server-side result processing you can page through the full event history:

// GET /reports/{jobId}/events?afterSequence=0&limit=100
// [FromQuery] requires: using Microsoft.AspNetCore.Mvc;
app.MapGet("/reports/{jobId:guid}/events", async (
    Guid jobId,
    IJobEventLog eventLog,
    [FromQuery] long afterSequence = 0,
    [FromQuery] int limit = 100,
    CancellationToken ct = default) =>
{
    var events = new List<object>();
    await foreach (var row in eventLog.ReadAfterAsync(jobId, afterSequence, limit, ct))
    {
        events.Add(new
        {
            sequence    = row.Sequence,
            kind        = ((JobEventKind)row.Kind).ToString(),
            occurredAt  = row.OccurredAt,
            payload     = row.PayloadJson,
        });
    }
    return Results.Ok(events);
});

ReadAfterAsync uses a stable sequence number, so you can page incrementally by passing the last sequence you received as afterSequence on the next call.
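
From a client, incremental paging against the route above could look like the following JavaScript sketch. `readAllEvents` is a hypothetical helper; it assumes each page is ordered by ascending sequence and that the response items carry a `sequence` property, as in the anonymous object returned above.

```javascript
// Client-side sketch: drain the history endpoint page by page.
// `fetchPage(afterSequence, limit)` is injected; against the route above it could be
//   (after, limit) =>
//     fetch(`/reports/${jobId}/events?afterSequence=${after}&limit=${limit}`)
//       .then((r) => r.json())
async function readAllEvents(fetchPage, limit = 100) {
  const all = [];
  let afterSequence = 0;
  for (;;) {
    const page = await fetchPage(afterSequence, limit);
    if (page.length === 0) break;
    all.push(...page);
    // Resume after the highest sequence seen so far.
    afterSequence = page[page.length - 1].sequence;
    if (page.length < limit) break; // short page: nothing more to read right now
  }
  return all;
}
```

Because the cursor is a sequence number rather than an offset, events appended while paging is in progress are picked up by a later call instead of shifting the pages.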


Putting it together — full async job API

using LowCodeHub.Jobs.Abstractions;
using LowCodeHub.Jobs.Contracts;
using Microsoft.AspNetCore.Mvc; // [FromQuery]

var api = app.MapGroup("/reports");

// Submit
api.MapPost("/", async (ReportRequest req, IJobScheduler scheduler, CancellationToken ct) =>
{
    var job = await scheduler.EnqueueAsync(req, new ScheduleOptions
    {
        IdempotencyKey = req.UniqueId,
        MaxRetries = 3,
    }, ct);
    return Results.Accepted($"/reports/{job.JobId}", new { jobId = job.JobId });
});

// Poll
api.MapGet("/{jobId:guid}", async (Guid jobId, IJobStatusReader reader, CancellationToken ct) =>
{
    var job = await reader.GetAsync(jobId, ct);
    return job is null ? Results.NotFound() : Results.Ok(job);
});

// Stream (live SSE)
api.MapGet("/{jobId:guid}/stream", (Guid jobId, HttpContext http, IJobStreamService stream, CancellationToken ct) =>
{
    var lastId = http.Request.Headers["Last-Event-ID"].ToString();
    return TypedResults.ServerSentEvents(stream.StreamAsync(jobId, lastId, ct));
});

// History (paged)
api.MapGet("/{jobId:guid}/events", async (
    Guid jobId, IJobEventLog log,
    [FromQuery] long afterSequence = 0,
    [FromQuery] int limit = 100,
    CancellationToken ct = default) =>
{
    var items = new List<object>();
    await foreach (var e in log.ReadAfterAsync(jobId, afterSequence, limit, ct))
        items.Add(new { e.Sequence, kind = ((JobEventKind)e.Kind).ToString(), e.OccurredAt, e.PayloadJson });
    return Results.Ok(items);
});

Stream Progress (SSE)

A single endpoint multiplexes all event kinds for a job: lifecycle, progress, log, chunk, metric, result. Resume on reconnect via the Last-Event-ID header.

app.MapGet("/jobs/{jobId:guid}/events", (
    Guid jobId,
    HttpContext http,
    IJobStreamService streams,
    CancellationToken ct) =>
{
    var lastId = http.Request.Headers["Last-Event-ID"].ToString();
    return Results.ServerSentEvents(streams.StreamAsync(jobId, lastId, ct));
});

Or use the strongly-typed overload for IJobRequest<TResult, TProgress> jobs:

return streams.StreamAsync<ExportProgress>(jobId, lastId, ct);

The IJobReporter API exposed via IJobContext:

| Method | Event kind | Use for |
| --- | --- | --- |
| ReportProgressAsync(done, total, label) | progress | Progress bar |
| ReportAsync(TProgress) | progress | Typed progress payload |
| LogTraceAsync / LogInfoAsync / LogWarnAsync / LogErrorAsync | log | Per-item diagnostics |
| EmitChunkAsync(string) / EmitChunkAsync<T>(obj) | chunk | LLM-style token streaming, streaming query rows |
| SetMetricAsync(name, value) | metric | Live counters / badges |

Persistence of events to the database is configurable per kind via EventLogOptions.PersistKinds. The default is Lifecycle | Progress | Result; logs, chunks, and metrics are SSE-only by default because they are high-volume.


Dashboard API

User-facing routes are owned by your app, while the embedded dashboard maps its own authenticated REST API when you call UseJobsDashboard():

using LowCodeHub.Jobs.Dashboard;

builder.Services.AddJobsDashboard(o =>
{
    o.Password = builder.Configuration["Dashboard:Password"]!;
    o.RoutePrefix = "/jobs-ui";
});

app.UseJobsDashboard();

With the default dashboard prefix, the admin API lives under /jobs-ui/api.

| Route | Purpose |
| --- | --- |
| GET /jobs-ui/api/stats | Dashboard counts, worker state, instance identity, coordination state |
| GET /jobs-ui/api/jobs | Filtered/paged admin job list |
| GET /jobs-ui/api/jobs/{jobId} | Admin job details |
| POST /jobs-ui/api/jobs/{jobId}/cancel | Cooperative cancellation |
| POST /jobs-ui/api/jobs/{jobId}/retry | Retry a dead job |
| DELETE /jobs-ui/api/jobs/{jobId} | Hard-delete a job record |
| GET /jobs-ui/api/jobs/{jobId}/events | Persisted event history |
| GET /jobs-ui/api/recurring | Recurring job definitions |

Embedded Admin Dashboard

An SPA + REST API + cookie auth, all packaged inside the assembly. Register it and mount it with two calls:

using LowCodeHub.Jobs.Dashboard;

builder.Services.AddJobsDashboard(o =>
{
    o.Password = builder.Configuration["Dashboard:Password"]!;   // required
    o.Username = "admin";
    o.RoutePrefix = "/jobs-ui";
});

// after app = builder.Build();
app.UseJobsDashboard();

Features:

  • Live status board (counts by status, queue depth, running now).
  • Job list with filters (status, kind, type name, idempotency key).
  • Job detail with attempt history, full event timeline, and live SSE tail.
  • For JobKind.System jobs: Run now, Cancel, Retry dead, enable/disable recurring.
  • For JobKind.User jobs: read-only history + housekeeping delete.
  • Same-origin CSRF check on mutating verbs, HttpOnly SameSite=Strict session cookie.

The dashboard is disabled if Password is empty; a warning is logged when that happens.


Configuration

{
  "Jobs": {
    "Worker": {
      "Enabled": true,
      "BatchSize": 32,
      "MaxParallelism": 8,
      "PollInterval": "00:00:01",
      "PromotionInterval": "00:00:05",
      "LeaseDuration": "00:05:00",
      "DefaultMaxRetries": 3,
      "BaseRetryDelay": "00:00:05",
      "MaxRetryDelay": "00:30:00",
      "ResultTtl": "1.00:00:00",
      "DrainOnShutdown": true,
      "DrainTimeout": "00:00:30",
      "CancellationPollInterval": "00:00:02"
    },
    "Recurring": {
      "Enabled": true,
      "TickInterval": "00:01:00",
      "LookAhead": "00:02:00"
    },
    "Janitor": {
      "Enabled": true,
      "SweepInterval": "00:05:00",
      "BatchSize": 500
    },
    "EventLog": {
      "Enabled": true,
      "PersistKinds": "Lifecycle, Progress, Result",
      "MaxPayloadBytes": 16384
    }
  }
}
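
The durations in this file use the .NET TimeSpan text form [d.]hh:mm:ss, so "1.00:00:00" is one day. To get a feel for what BaseRetryDelay and MaxRetryDelay mean together, the JavaScript sketch below parses those strings and prints a retry schedule. The doubling curve is an assumption: this document only says "retries with backoff", so the library's actual formula (and any jitter) may differ. Both function names are hypothetical.

```javascript
// Parse the "[d.]hh:mm:ss" TimeSpan strings used in the configuration into seconds.
function parseTimeSpan(text) {
  const m = /^(?:(\d+)\.)?(\d{1,2}):(\d{2}):(\d{2})$/.exec(text);
  if (!m) throw new Error(`Unsupported TimeSpan format: ${text}`);
  const [, d = "0", h, min, s] = m;
  return Number(d) * 86400 + Number(h) * 3600 + Number(min) * 60 + Number(s);
}

// ASSUMPTION: exponential backoff, base * 2^(attempt - 1), capped at the maximum.
function retryDelaySeconds(attempt, baseSeconds, maxSeconds) {
  return Math.min(baseSeconds * 2 ** (attempt - 1), maxSeconds);
}

const base = parseTimeSpan("00:00:05"); // BaseRetryDelay
const max = parseTimeSpan("00:30:00");  // MaxRetryDelay

for (let attempt = 1; attempt <= 10; attempt++) {
  console.log(`attempt ${attempt}: wait ${retryDelaySeconds(attempt, base, max)}s`);
}
```

Under that assumption the delay starts at 5 seconds, doubles on each attempt, and stops growing once it reaches the 30-minute cap.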

Health & Metrics

Health checks (auto-registered):

  • jobs-worker — worker liveness
  • jobs-sql — SQL Server reachability (when configured)
  • jobs-pg — PostgreSQL reachability (when configured)

OpenTelemetry meter LowCodeHub.Jobs:

  • jobs.queue_depth
  • jobs.scheduled_pending
  • jobs.running_now

Plus an OpenTelemetry trace span per job execution; the trace id is surfaced on IJobContext.CorrelationId for handler-side correlation.


Running on Multiple Pods

The package is designed for horizontal scale-out. Different responsibilities are coordinated differently:

| Component | Multi-pod model | Notes |
| --- | --- | --- |
| JobExecutionWorker | All pods compete. Each pod claims jobs with a unique workerId + DB lease. Failed pods have their leases expire and another pod re-claims. | This is your throughput knob: more pods = more parallelism. |
| ScheduledJobPromotionWorker | All pods race idempotently. Promotes Scheduled → Queued via a single UPDATE … WHERE Status='Scheduled' AND RunAt <= now. Multiple pods doing this is wasteful but never incorrect. | Naturally safe. |
| RecurringJobScheduler | Cluster singleton via leader lease. Only the leader pod advances cron items. | Opt-in via WithRedisCoordination() / WithDatabaseCoordination() (see below). Without coordination, the data layer's CAS update keeps it correct, just wasteful. |
| ResultJanitor | Cluster singleton via leader lease. Only the leader pod sweeps expired records. | Same. |
| RecurringSystemJobBootstrapper | All pods upsert idempotently at startup. | Cheap, runs once per pod at boot. |

Enable coordination

builder.Services.AddJobs(b => b
    .WithSqlServer(o => o.ConnectionString = sqlCs)
    .WithRedisCoordination(o => o.ConnectionString = redisCs)   // ← recommended
    .AddHandler<…>());

Behavior of WithRedisCoordination:

  1. At DI registration the package performs a synchronous 3-second TCP probe against Redis.
  2. If Redis is reachable, RedisDistributedLock is registered (SET NX PX + Lua CAS release/renew, safe across pods).
  3. If Redis is unreachable, the package logs a warning and falls back to whichever DB you registered:
    • SQL Server → sp_getapplock with session-scoped exclusive locks (auto-released if the pod crashes).
    • PostgreSQL → pg_try_advisory_lock with session-scoped advisory locks (same crash semantics).
  4. If you'd rather not depend on Redis at all, call WithDatabaseCoordination() directly.
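
The Redis pattern named in step 2 can be illustrated without a Redis server. The sketch below is an in-memory JavaScript model of "SET key token NX PX ttl" to acquire, plus owner-checked renew and release (the part done in Lua on a real Redis so the check and the write are atomic). `FakeLockStore` and the key suffix are invented for illustration; this is not RedisDistributedLock itself.

```javascript
// In-memory model of a token-owned lease with expiry. Time is passed in explicitly
// (milliseconds) so the behaviour is deterministic.
class FakeLockStore {
  constructor() {
    this.entries = new Map(); // key -> { token, expiresAt }
  }
  live(key, now) {
    const e = this.entries.get(key);
    return e && e.expiresAt > now ? e : undefined;
  }
  // SET key token NX PX ttlMs: succeeds only if no unexpired value exists.
  tryAcquire(key, token, ttlMs, now) {
    if (this.live(key, now)) return false;
    this.entries.set(key, { token, expiresAt: now + ttlMs });
    return true;
  }
  // Compare-and-extend: renew only if the caller still owns the lease.
  tryRenew(key, token, ttlMs, now) {
    const e = this.live(key, now);
    if (!e || e.token !== token) return false;
    e.expiresAt = now + ttlMs;
    return true;
  }
  // Compare-and-delete: release only if the caller still owns the lease.
  release(key, token, now) {
    const e = this.live(key, now);
    if (!e || e.token !== token) return false;
    this.entries.delete(key);
    return true;
  }
}

const store = new FakeLockStore();
const key = "jobs:leader:recurring"; // hypothetical key name
console.log(store.tryAcquire(key, "pod-a", 30000, 0));     // true: pod-a leads
console.log(store.tryAcquire(key, "pod-b", 30000, 5000));  // false: lease is held
console.log(store.release(key, "pod-b", 6000));            // false: not the owner
console.log(store.tryAcquire(key, "pod-b", 30000, 31000)); // true: lease expired
console.log(store.tryRenew(key, "pod-a", 30000, 32000));   // false: pod-a lost it
```

The owner token is what makes the release safe: a pod whose lease already expired cannot delete a lock that another pod has since acquired.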

Pick the right backend

| Scenario | Use |
| --- | --- |
| You already have Redis in the stack (most apps using LowCodeHub.SSE do) | WithRedisCoordination(...) |
| Small cluster, don't want to operate Redis | WithDatabaseCoordination() |
| Single pod / dev box | Skip both: the default NullDistributedLock is fine and logs a warning at first use |

Tuning

builder.Services.AddJobs(b => b
    .WithSqlServer(...)
    .WithRedisCoordination(o => o.ConnectionString = redisCs)
    .ConfigureCoordination(c =>
    {
        c.KeyPrefix          = "jobs:leader:";
        c.LeaseTtl           = TimeSpan.FromSeconds(30);   // expire if leader dies
        c.RenewInterval      = TimeSpan.FromSeconds(10);   // leader refresh cadence
        c.AcquireRetryInterval = TimeSpan.FromSeconds(5);  // non-leader retry cadence
    }));

The leader-loss path is graceful: if a renewal fails (lease lost, Redis blip, connection killed), the leader logs a warning and stops doing leader-only work. Another pod will detect the absent lease and take over on its next retry — typically within AcquireRetryInterval.
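
These three settings imply rough bounds on how long leader-only work can pause. The JavaScript sketch below is back-of-the-envelope arithmetic, not something the library exposes: `takeoverBounds` is a hypothetical helper, and it assumes that a crashed leader's lease simply runs out, that followers retry on a fixed AcquireRetryInterval, and that network and clock effects can be ignored.

```javascript
// Rough takeover bounds in seconds, under the assumptions stated above.
function takeoverBounds({ leaseTtl, renewInterval, acquireRetryInterval }) {
  if (renewInterval >= leaseTtl) {
    throw new Error("RenewInterval must be shorter than LeaseTtl, or the lease lapses between renewals");
  }
  return {
    // Leader stepped down and the lease is already gone: the next follower retry wins.
    afterStepDown: acquireRetryInterval,
    // Leader crashed just before its next renewal was due and a follower retried
    // at the exact moment the lease ran out.
    afterCrashBest: leaseTtl - renewInterval,
    // Leader crashed right after renewing: wait out the full TTL, then one retry.
    afterCrashWorst: leaseTtl + acquireRetryInterval,
  };
}

console.log(takeoverBounds({ leaseTtl: 30, renewInterval: 10, acquireRetryInterval: 5 }));
```

With the values shown above, a graceful step-down is covered within about 5 seconds, while a hard crash can leave the cluster without a leader for up to roughly 35 seconds. Shortening LeaseTtl narrows that window at the cost of more frequent renewals.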

Reusing the SSE multiplexer

If you already have a Redis IConnectionMultiplexer (e.g. from LowCodeHub.SSE), pass it in to avoid a second connection:

.WithRedisCoordination(o => o.ConnectionMultiplexer = existingMux)

The package will not take ownership and will not dispose it.


Testing

Handlers are plain classes, so unit tests can instantiate the handler and pass a small test double for IJobContext / IJobContext<TProgress>.

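// CapturingJobContext<T> is the test double described above: it records what the handler reports.
var tenantId = Guid.NewGuid();
var ct = CancellationToken.None;

var ctx = new CapturingJobContext<ExportProgress>();
var handler = new ExportUsersHandler();
var result = await handler.HandleAsync(new ExportUsers(tenantId), ctx, ct);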

Assert.Equal(1000, result.Rows);
Assert.Contains(ctx.ProgressEvents, e => e.Done == 10);

Packaging

dotnet pack LowCodeHub.Jobs/LowCodeHub.Jobs.csproj -c Release -o artifacts/nupkg

In this repository, publishing is driven by bumping <PackageVersion> and running the publisher tool:

dotnet run --project tools/NuGetPublisher/NuGetPublisher.csproj -- "G:\LowCodeHub" env --dry-run

Remove --dry-run to push.

| Version | Downloads | Last Updated |
| --- | --- | --- |
| 0.0.3 | 94 | 5/18/2026 |
| 0.0.2 | 99 | 5/13/2026 |
| 0.0.1 | 95 | 5/12/2026 |