Davasorus.Utility.DotNet.Messaging 2026.2.3.11

dotnet add package Davasorus.Utility.DotNet.Messaging --version 2026.2.3.11
                    
NuGet\Install-Package Davasorus.Utility.DotNet.Messaging -Version 2026.2.3.11
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Davasorus.Utility.DotNet.Messaging" Version="2026.2.3.11" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Davasorus.Utility.DotNet.Messaging" Version="2026.2.3.11" />
                    
Directory.Packages.props
<PackageReference Include="Davasorus.Utility.DotNet.Messaging" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Davasorus.Utility.DotNet.Messaging --version 2026.2.3.11
                    
#r "nuget: Davasorus.Utility.DotNet.Messaging, 2026.2.3.11"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Davasorus.Utility.DotNet.Messaging@2026.2.3.11
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Davasorus.Utility.DotNet.Messaging&version=2026.2.3.11
                    
Install as a Cake Addin
#tool nuget:?package=Davasorus.Utility.DotNet.Messaging&version=2026.2.3.11
                    
Install as a Cake Tool

Davasorus.Utility.DotNet.Messaging

Overview

Davasorus.Utility.DotNet.Messaging is a provider-agnostic messaging/queueing pillar for .NET applications. Your code depends on a small set of transport-neutral abstractions — publish, consume, idempotency, dead-letter queues — and a concrete backend plugs in behind the IMessageTransport seam. Amazon SQS is the first backend; future backends (Azure Service Bus, ActiveMQ, local/IPC) slot in behind the same seam with no application changes.

The package wires a strict-sequential hosted poller per queue (delete-after-success, visibility renewal for slow handlers, FIFO-preserving), at-least-once delivery with SQL-backed idempotency, FIFO and dead-letter-queue support, and end-to-end OpenTelemetry tracing across the queue hop — all behind a single fluent AddMessaging(...) composition root.

The package multi-targets net8.0 and net10.0.

Features

  • One fluent composition rootAddMessaging(m => ...) selects a transport, serializer, idempotency backend, and per-queue processors in one place.
  • Provider-agnostic — apps depend on IMessagePublisher / IMessageHandler<T> / IDeadLetterQueue; SQS is the first backend behind IMessageTransport.
  • No-throw publishIMessagePublisher.PublishAsync<T> returns MessageResult<string> (IsSuccess / Value / Error) rather than throwing. PublishBatchAsync<T> sends many in one round-trip and returns one result per message (partial failure is per-message); PublishOptions.Delay defers delivery.
  • Strict-sequential hosted consumer per queue — long-poll, delete-after-success, redrive-on-failure, visibility-renewal heartbeat for slow handlers, FIFO ordering preserved.
  • FIFO support — declare a queue FIFO; publishing requires a MessageGroupId and is startup-validated.
  • SQL-backed idempotencyIIdempotencyStore with an atomic claim/lease, plus an IdempotentHandler<TMessage> base class that skips duplicate deliveries automatically. SQL Server, SQLite, or in-memory backends.
  • Dead-letter queues — declare per queue, then PeekAsync / RedriveAsync (FIFO-correct, ack-after-republish) via IDeadLetterQueue; a hosted monitor reports DLQ depth as a metric.
  • Fail-fast startup validation — declared queues (and DLQs) are verified to exist with matching FIFO-ness before the app starts serving.
  • First-class telemetry — producer/consumer spans with W3C trace context propagated across the queue hop, built-in OTel consume metrics on by default (messaging.client.consumed.messages, messaging.process.duration), a EnrichConsumeActivity hook to tag the consume span, and a queue-reachability health check, via the Davasorus Telemetry pillar.

Installation

dotnet add package Davasorus.Utility.DotNet.Messaging

Quick Start

Configure

AddMessaging is the single composition root. Select a transport with UseSqs(...), optionally override the serializer, choose an idempotency backend with UseIdempotency(...), and register one processor per queue with AddQueueProcessor.

using Davasorus.Utility.DotNet.Messaging.Configuration;

services
    .AddMessaging(m => m
        .UseSqs(sqs => sqs.WithRegion("us-east-1"))
        .WithSerializer(o => o.PropertyNameCaseInsensitive = true))   // optional
    .UseIdempotency(i => i.UseSqlServer())                            // or .UseSqlite() / .UseInMemory()
    .AddQueueProcessor<OrderMessage, OrderHandler>("orders.fifo", q => q
        .RequireFifo()
        .WithDeadLetterQueue("orders-dlq.fifo", fifo: true)
        .WithDefaultMessageGroupId("orders"));

// Optional: a queue-reachability health check.
services.AddHealthChecks().AddMessagingQueueHealthCheck();

AddMessaging requires a transport — calling it without UseSqs(...) (or another backend) throws InvalidOperationException.

Publish

Inject IMessagePublisher and call PublishAsync<T>. Publishing never throws; inspect the returned MessageResult<string>.

public sealed class OrderService(IMessagePublisher publisher)
{
    public async Task PlaceAsync(OrderMessage order, CancellationToken ct)
    {
        var result = await publisher.PublishAsync(
            "orders.fifo",
            order,
            new PublishOptions { MessageGroupId = "orders" },
            ct);

        if (!result.IsSuccess)
        {
            // result.Error carries the captured exception; result.Value is the
            // provider message id on success.
            logger.LogError(result.Error, "Failed to publish order");
        }
    }
}

PublishOptions carries MessageGroupId, DeduplicationId, Delay, and extra Attributes. For a FIFO queue MessageGroupId is required — publishing to a FIFO queue without one returns a failed MessageResult (it does not throw).

Delay defers delivery by up to 15 minutes (SQS DelaySeconds). It is honoured on standard queues; SQS rejects a per-message delay on FIFO queues, which surfaces as a failed MessageResult.

Batch publish

PublishBatchAsync<T> sends many messages in one round-trip (SQS SendMessageBatch, chunked at the 10-message limit internally). It returns one MessageResult<string> per input message in order — partial failure is first-class, so inspect each result. Like single publish it never throws.

var results = await publisher.PublishBatchAsync(
    "orders.fifo",
    orders, // IReadOnlyList<OrderMessage>
    new PublishOptions { MessageGroupId = "orders" },
    ct);

for (var i = 0; i < results.Count; i++)
{
    if (!results[i].IsSuccess)
        logger.LogError(results[i].Error, "Failed to publish order {Index}", i);
}

The supplied PublishOptions (group id, dedup id, delay, attributes) apply to every message in the batch. The same FIFO rule holds — a batch to a FIFO queue without a MessageGroupId returns a failed result for every message and sends nothing.

Consume

Implement IMessageHandler<TMessage>. The hosted poller deserializes each message, invokes your handler, and deletes the message only after the handler returns successfully. Throw to trigger redelivery (and eventually DLQ routing if a redrive policy is in place).

using Davasorus.Utility.DotNet.Messaging.Abstractions;

public sealed class OrderHandler : IMessageHandler<OrderMessage>
{
    public async Task HandleAsync(OrderMessage message, MessageContext context, CancellationToken ct)
    {
        // context.MessageId, context.Attributes, context.ProducerActivityContext
        await ProcessOrderAsync(message, ct);
    }
}

MessageContext exposes MessageId, Attributes, and ProducerActivityContext (the producer's parsed W3C trace context — see Telemetry below).

Delivery is at-least-once. The poller guarantees delete-after-success and redrive-on-failure, but a message can still be delivered more than once (e.g. a handler that succeeds but crashes before the delete is acknowledged). Your handler must be idempotent — use IdempotentHandler<TMessage> (below) or your own dedup.

Message envelope & versioning

Messages of a mapped type (registered via MapMessageType) are published as a self-describing envelope carrying a stable logical type name and a schema version:

{ "type": "orders.placed", "v": 2, "data": { "OrderId": "A1", "Coupon": null } }

An unmapped type publishes a bare body (just the serialized payload, no envelope), so existing producers keep working unchanged — mapping is opt-in.

Registering a type + evolution

services.AddMessaging(m => m
    .UseSqs(/* ... */)
    .MapMessageType<OrderPlaced>("orders.placed", version: 2)
        .Upcast<OrderPlacedV1>(fromVersion: 1, v1 => new OrderPlaced(v1.OrderId, coupon: null)));

The logical name ("orders.placed") is the stable wire contract: rename the C# type freely, the name on the wire never changes. On consume, an older-version envelope is deserialized at its version (e.g. OrderPlacedV1) and then run forward through the upcaster chain, so your handler always receives the current shape (OrderPlaced).

Adding a new version is two lines: bump version: and append one .Upcast(...) that maps the previous version forward. Handler code never changes.

Legacy fallback

A body with no envelope — a message published before the type was mapped, or by an un-upgraded producer — is deserialized as the queue's registered type on a single-type queue. This makes rolling out mapping non-breaking: old and new bodies coexist on the same queue.

Heterogeneous queues

One queue can carry multiple message types, routed to their own handlers by the envelope's logical type name:

services.AddMultiTypeQueueProcessor("events", r => r
    .Handle<OrderPlaced, OrderHandler>()
    .Handle<OrderCancelled, CancelHandler>());

Each handled type must be registered via MapMessageType.

Behavior-pipeline limitation. The behavior pipeline (retry/observability) is not yet applied on the multi-type path. If you need behaviors, use the single-type AddQueueProcessor<TMessage, THandler>. This is a documented limitation.

No silent loss

A too-new version, a malformed envelope (one that intends to be an envelope but has a bad/missing v/data or a non-string type), or a type with no handler on a multi-type queue leaves the message un-acked for broker redrive → DLQ (the same fate as a poison message), logged at Warning. Nothing is silently dropped.

Single-type back-compat exception: on a single-type queue, a body that is not an envelope — including a well-formed envelope whose type is simply unregistered (a pre-envelope legacy body, or a producer that never adopted the registry) — is deserialized as the queue's registered message type (the legacy-bare-body fallback) and handled normally. This is intentional non-breaking back-compat: a single-type queue has an unambiguous fallback type, so it does not redrive such messages. A multi-type queue has no fallback, so an unregistered/unknown type there does redrive → DLQ.

Idempotency

IIdempotencyStore provides an atomic claim/lease: exactly one caller wins the right to run a (messageId, scope) side-effect; concurrent deliveries get false until the lease expires or is released. The easiest way to use it is the IdempotentHandler<TMessage> base class, which claims before running your work and marks it complete after — duplicate deliveries are skipped automatically.

using Davasorus.Utility.DotNet.Messaging.Abstractions;

public sealed class OrderHandler : IdempotentHandler<OrderMessage>
{
    public OrderHandler(IIdempotencyStore store)
        : base(store, scope: "order-finalize", leaseTtl: TimeSpan.FromSeconds(60))
    {
    }

    protected override async Task HandleClaimedAsync(
        OrderMessage message, MessageContext context, CancellationToken ct)
    {
        // Runs at most once per (MessageId, scope). A graceful failure here releases
        // the claim immediately so a redelivery can retry.
        await FinalizeOrderAsync(message, ct);
    }
}

Select the backend in the fluent root:

.UseIdempotency(i => i.UseSqlServer())   // or .UseSqlite() / .UseInMemory()

Set leaseTtl no greater than the queue's visibility timeout. A graceful handler failure releases the claim immediately, but a hard process crash leaves the claim live until the lease expires. If the lease outlives the visibility timeout, the redelivered message could find the claim still held, be skipped, and get silently acked as "done" — losing the work. Keeping leaseTtl ≤ visibility timeout guarantees an orphaned claim expires before redelivery.

Idempotency table schema

The SQL-backed store reads and writes an externally-provisioned table — the pillar does not create it. Provision it with the following schema.

SQL Server[Messaging].[IdempotencyLease]:

CREATE TABLE [Messaging].[IdempotencyLease]
(
    [MessageId]      NVARCHAR(256)    NOT NULL,
    [Scope]          NVARCHAR(128)    NOT NULL,
    [ProcessorId]    UNIQUEIDENTIFIER NULL,        -- MUST be nullable
    [LeaseExpiresAt] DATETIME2        NULL,
    [Completed]      BIT              NOT NULL DEFAULT 0,
    CONSTRAINT [PK_IdempotencyLease] PRIMARY KEY ([MessageId], [Scope])
);

SQLiteMessaging_IdempotencyLease:

CREATE TABLE Messaging_IdempotencyLease
(
    MessageId      TEXT    NOT NULL,
    Scope          TEXT    NOT NULL,
    ProcessorId    TEXT    NULL,
    LeaseExpiresAt TEXT    NULL,
    Completed      INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (MessageId, Scope)
);

ProcessorId must be nullable in both schemas — the claim/lease logic depends on ProcessorId IS NULL to detect an unclaimed row.

The SQL-backed store resolves the SQL pillar's services at runtime, so the consumer must also register the SQL pillar (AddSqlServices for SQL Server or AddSqlServicesWithSqlite for SQLite) and logging (AddLogging). The SQL pillar in turn depends on ISqsService, which UseSqs(...) already registers.

Dead-Letter Queues

Declare a DLQ per queue with WithDeadLetterQueue(name, fifo). Inject IDeadLetterQueue to inspect and replay it.

.AddQueueProcessor<OrderMessage, OrderHandler>("orders.fifo", q => q
    .RequireFifo()
    .WithDeadLetterQueue("orders-dlq.fifo", fifo: true)
    .WithDefaultMessageGroupId("orders"));
public sealed class DlqAdmin(IDeadLetterQueue dlq)
{
    public async Task InspectAndRedriveAsync(CancellationToken ct)
    {
        // Peek without permanently removing — peeked messages are briefly reserved.
        var peek = await dlq.PeekAsync<OrderMessage>("orders.fifo", max: 10, ct);
        foreach (var (envelope, body) in peek.Items)
        {
            // body is null if the message failed to deserialize.
            logger.LogWarning("DLQ message {Id}: {Body}", envelope.MessageId, envelope.RawBody);
        }

        // Replay DLQ -> source queue. FIFO-correct, ack-after-republish.
        var outcome = await dlq.RedriveAsync(
            "orders.fifo",
            new RedriveOptions
            {
                MaxMessages = 10,
                Filter = e => true,   // optional predicate; false leaves the message on the DLQ
            },
            ct);

        logger.LogInformation(
            "Redrove {Moved}, skipped {Skipped}", outcome.MovedCount, outcome.SkippedCount);
    }
}

A hosted monitor periodically observes the declared DLQs and reports the most-recently-observed approximate depth as a single process-wide messaging.dlq.depth observable gauge metric (not per-queue; per-queue tagging is a future enhancement), so you can alert on a growing dead-letter backlog. Redrive re-supplies the WithDefaultMessageGroupId value when replaying to a FIFO source queue.

Operational Notes & Prerequisites

These are load-bearing facts for anyone operating the pillar in a real environment.

  1. Infrastructure owns the queues and DLQ. The pillar declares, validates, monitors, and replays queues — it does not create AWS resources. The source queue, the dead-letter queue, and the source queue's redrive policy (which binds the source to its DLQ) are all provisioned by your infrastructure (Terraform/console). The pillar never calls CreateQueue.
  2. Fail-fast startup validation. A hosted validator runs at startup and describes every declared queue (and declared DLQ). If a declared queue does not exist, or its FIFO-ness does not match what you declared (RequireFifo() / RequireStandard()), startup fails fast rather than silently mis-routing messages at runtime.
  3. Delivery is at-least-once; handlers must be idempotent. The poller guarantees delete-after-success and redrive-on-failure, but the handler is responsible for tolerating duplicate delivery. Use IdempotentHandler<TMessage> or your own deduplication.
  4. leaseTtl ≤ queue visibility timeout when using IdempotentHandler — see the Idempotency section above.
  5. Idempotency table is externally provisioned. The SQL-backed store expects the table to already exist with the documented schema; it is not created on first use.
  6. SQL pillar prerequisites. Using a SQL-backed idempotency store requires the SQL pillar (AddSqlServices / AddSqlServicesWithSqlite) and logging to be registered. The SQL pillar depends on ISqsService, which UseSqs(...) provides.

Provider Extensibility

Applications depend on the abstractions (IMessagePublisher, IMessageHandler<T>, IDeadLetterQueue, IIdempotencyStore), never on a specific cloud SDK. SQS is the first backend, implemented behind the IMessageTransport seam. A future backend — Azure Service Bus, ActiveMQ, an in-process/IPC transport for tests and local development — implements IMessageTransport and registers itself the same way UseSqs(...) does. Application code, handlers, publishers, and DLQ administration continue to compile and run unchanged.

Telemetry

Telemetry flows through the Davasorus.Utility.DotNet.Telemetry pillar.

  • Producer/consumer spans. Publishing and consuming emit spans. The producer injects W3C trace context (traceparent) into the message attributes; on receive, the poller parses it and exposes it on MessageContext.ProducerActivityContext. Start your handler's activity with that context as the parent to link your processing spans to the producer's trace across the queue hop.

  • Built-in consume metrics (on by default). Every queue processor emits OTel-standard consume metrics around handler dispatch, via a behavior registered outermost so its timing spans the whole chain (including retries):

    • messaging.client.consumed.messages (counter, unit {message}) — one per dispatch, tagged messaging.destination.name, messaging.system, and messaging.operation.name (process). A failed dispatch additionally carries error.type set to the failure's type name (MessageInProgressException for a message held in-progress by another delivery); a successful dispatch carries no error.type (OTel convention: failures are distinguished by the presence of error.type, not a custom outcome enum).
    • messaging.process.duration (histogram, seconds) — handler-dispatch (processing) duration, same tags.

    When nothing collects the Meter (Davasorus.Utility.Messaging.Consumer), recording is a no-op, so on-by-default costs nothing if you don't collect metrics. Turn it off with .WithoutObservability() on the messaging builder (tracing spans still emit; only the metric counter/histogram are suppressed).

  • Enrich the consume span. Add custom tags to the package's consume span without subclassing:

    services.AddMessaging(m => m
        .UseSqs(/* ... */)
        .EnrichConsumeActivity((activity, ctx) => activity.SetTag("app.tenant", ResolveTenant(ctx))));
    

    The callback receives the live consume Activity and the MessageContext. It is best-effort — a callback that throws is guarded and does not break dispatch. For custom observability logic (your own spans/business metrics), write an IMessageHandlerBehavior<TMessage> — the handler pipeline already supports it; no parallel observer abstraction is added.

  • DLQ depth. The dead-letter-queue monitor emits a single process-wide messaging.dlq.depth observable gauge (unit {message}) reporting the most-recently-observed DLQ depth across all declared DLQs (per-queue tagging is a future enhancement).

  • Health check. AddMessagingQueueHealthCheck() registers a health check that probes every registered queue for reachability and reports Unhealthy if any queue cannot be described.

Dependencies

  • Davasorus.Utility.DotNet.SQS (the first transport backend)
  • Davasorus.Utility.DotNet.SQL (SQL-backed idempotency store)
  • Davasorus.Utility.DotNet.Telemetry
  • Microsoft.Extensions.{DependencyInjection.Abstractions, Hosting.Abstractions, Options, Logging.Abstractions, Diagnostics.HealthChecks}

License

MIT.

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2026.2.3.11 154 6/15/2026
2026.2.3.10 86 6/14/2026
2026.2.3.9 84 6/14/2026
2026.2.3.8 90 6/14/2026
2026.2.3.7 91 6/13/2026
2026.2.3.6 93 6/13/2026
2026.2.3.5 93 6/13/2026
2026.2.3.4 104 6/12/2026
2026.2.3.3 92 6/12/2026
2026.2.3.2 90 6/11/2026
2026.2.3.1 91 6/11/2026