Davasorus.Utility.DotNet.Messaging
2026.2.3.11
Overview
Davasorus.Utility.DotNet.Messaging is a provider-agnostic messaging/queueing
pillar for .NET applications. Your code depends on a small set of transport-neutral
abstractions — publish, consume, idempotency, dead-letter queues — and a concrete
backend plugs in behind the IMessageTransport seam. Amazon SQS is the first
backend; future backends (Azure Service Bus, ActiveMQ, local/IPC) slot in behind
the same seam with no application changes.
The package wires a strict-sequential hosted poller per queue (delete-after-success,
visibility renewal for slow handlers, FIFO-preserving), at-least-once delivery with
SQL-backed idempotency, FIFO and dead-letter-queue support, and end-to-end
OpenTelemetry tracing across the queue hop — all behind a single fluent
AddMessaging(...) composition root.
The package multi-targets net8.0 and net10.0.
Features
- One fluent composition root: AddMessaging(m => ...) selects a transport, serializer, idempotency backend, and per-queue processors in one place.
- Provider-agnostic: apps depend on IMessagePublisher / IMessageHandler<T> / IDeadLetterQueue; SQS is the first backend behind IMessageTransport.
- No-throw publish: IMessagePublisher.PublishAsync<T> returns MessageResult<string> (IsSuccess / Value / Error) rather than throwing. PublishBatchAsync<T> sends many in one round-trip and returns one result per message (partial failure is per-message); PublishOptions.Delay defers delivery.
- Strict-sequential hosted consumer per queue: long-poll, delete-after-success, redrive-on-failure, visibility-renewal heartbeat for slow handlers, FIFO ordering preserved.
- FIFO support: declare a queue FIFO; publishing requires a MessageGroupId and is startup-validated.
- SQL-backed idempotency: IIdempotencyStore with an atomic claim/lease, plus an IdempotentHandler<TMessage> base class that skips duplicate deliveries automatically. SQL Server, SQLite, or in-memory backends.
- Dead-letter queues: declare per queue, then PeekAsync / RedriveAsync (FIFO-correct, ack-after-republish) via IDeadLetterQueue; a hosted monitor reports DLQ depth as a metric.
- Fail-fast startup validation: declared queues (and DLQs) are verified to exist with matching FIFO-ness before the app starts serving.
- First-class telemetry: producer/consumer spans with W3C trace context propagated across the queue hop, built-in OTel consume metrics on by default (messaging.client.consumed.messages, messaging.process.duration), an EnrichConsumeActivity hook to tag the consume span, and a queue-reachability health check, via the Davasorus Telemetry pillar.
Installation
dotnet add package Davasorus.Utility.DotNet.Messaging
Quick Start
Configure
AddMessaging is the single composition root. Select a transport with UseSqs(...),
optionally override the serializer, choose an idempotency backend with
UseIdempotency(...), and register one processor per queue with AddQueueProcessor.
    using Davasorus.Utility.DotNet.Messaging.Configuration;

    services
        .AddMessaging(m => m
            .UseSqs(sqs => sqs.WithRegion("us-east-1"))
            .WithSerializer(o => o.PropertyNameCaseInsensitive = true)) // optional
        .UseIdempotency(i => i.UseSqlServer()) // or .UseSqlite() / .UseInMemory()
        .AddQueueProcessor<OrderMessage, OrderHandler>("orders.fifo", q => q
            .RequireFifo()
            .WithDeadLetterQueue("orders-dlq.fifo", fifo: true)
            .WithDefaultMessageGroupId("orders"));

    // Optional: a queue-reachability health check.
    services.AddHealthChecks().AddMessagingQueueHealthCheck();
AddMessaging requires a transport — calling it without UseSqs(...) (or another
backend) throws InvalidOperationException.
Publish
Inject IMessagePublisher and call PublishAsync<T>. Publishing never throws;
inspect the returned MessageResult<string>.
    // The logger is injected alongside the publisher so the failure branch can log.
    public sealed class OrderService(IMessagePublisher publisher, ILogger<OrderService> logger)
    {
        public async Task PlaceAsync(OrderMessage order, CancellationToken ct)
        {
            var result = await publisher.PublishAsync(
                "orders.fifo",
                order,
                new PublishOptions { MessageGroupId = "orders" },
                ct);

            if (!result.IsSuccess)
            {
                // result.Error carries the captured exception; result.Value is the
                // provider message id on success.
                logger.LogError(result.Error, "Failed to publish order");
            }
        }
    }
PublishOptions carries MessageGroupId, DeduplicationId, Delay, and extra
Attributes. For a FIFO queue MessageGroupId is required — publishing to a
FIFO queue without one returns a failed MessageResult (it does not throw).
Delay defers delivery by up to 15 minutes (SQS DelaySeconds). It is honoured on
standard queues; SQS rejects a per-message delay on FIFO queues, which surfaces as a
failed MessageResult.
Batch publish
PublishBatchAsync<T> sends many messages in one round-trip (SQS SendMessageBatch,
chunked at the 10-message limit internally). It returns one MessageResult<string>
per input message in order — partial failure is first-class, so inspect each result.
Like single publish it never throws.
    var results = await publisher.PublishBatchAsync(
        "orders.fifo",
        orders, // IReadOnlyList<OrderMessage>
        new PublishOptions { MessageGroupId = "orders" },
        ct);

    for (var i = 0; i < results.Count; i++)
    {
        if (!results[i].IsSuccess)
            logger.LogError(results[i].Error, "Failed to publish order {Index}", i);
    }
The supplied PublishOptions (group id, dedup id, delay, attributes) apply to every
message in the batch. The same FIFO rule holds — a batch to a FIFO queue without a
MessageGroupId returns a failed result for every message and sends nothing.
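The chunking and the one-result-per-input ordering described above can be pictured with a small stand-alone sketch. It does not call the package or SQS: BatchSketch, PublishAll, and the sendChunk delegate are hypothetical names used only for illustration, and the string outcomes stand in for MessageResult<string>. The only fact taken from the text is the 10-message SendMessageBatch limit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchSketch
{
    // SQS SendMessageBatch accepts at most 10 entries per call.
    public const int MaxBatchSize = 10;

    // Splits the input into chunks of at most 10 and returns one outcome per
    // input message, in input order. sendChunk is a hypothetical stand-in for
    // a transport call and must return one outcome per message in the chunk.
    public static List<string> PublishAll(
        IReadOnlyList<string> messages,
        Func<string[], string[]> sendChunk)
    {
        var results = new List<string>(messages.Count);
        foreach (var chunk in messages.Chunk(MaxBatchSize))
        {
            var outcomes = sendChunk(chunk);
            if (outcomes.Length != chunk.Length)
                throw new InvalidOperationException("Expected one outcome per message.");

            // Appending chunk by chunk keeps results aligned with the input order,
            // so a failure in one message does not shift its neighbours.
            results.AddRange(outcomes);
        }
        return results;
    }
}
```

With 23 input messages this makes three transport calls (10, 10, and 3 messages), and the outcome at index i always describes the message at index i.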
Consume
Implement IMessageHandler<TMessage>. The hosted poller deserializes each message,
invokes your handler, and deletes the message only after the handler returns
successfully. Throw to trigger redelivery (and eventually DLQ routing if a redrive
policy is in place).
    using Davasorus.Utility.DotNet.Messaging.Abstractions;

    public sealed class OrderHandler : IMessageHandler<OrderMessage>
    {
        public async Task HandleAsync(OrderMessage message, MessageContext context, CancellationToken ct)
        {
            // context.MessageId, context.Attributes, context.ProducerActivityContext
            await ProcessOrderAsync(message, ct);
        }
    }
MessageContext exposes MessageId, Attributes, and ProducerActivityContext
(the producer's parsed W3C trace context — see Telemetry below).
Delivery is at-least-once. The poller guarantees delete-after-success and redrive-on-failure, but a message can still be delivered more than once (e.g. a handler that succeeds but crashes before the delete is acknowledged). Your handler must be idempotent: use IdempotentHandler<TMessage> (below) or your own dedup.
Message envelope & versioning
Messages of a mapped type (registered via MapMessageType) are published as a
self-describing envelope carrying a stable logical type name and a schema version:
{ "type": "orders.placed", "v": 2, "data": { "OrderId": "A1", "Coupon": null } }
An unmapped type publishes a bare body (just the serialized payload, no envelope), so existing producers keep working unchanged — mapping is opt-in.
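As a rough illustration of the envelope shape shown above, the following sketch reads the three fields (type, v, data) with System.Text.Json. It is not the package's parser: EnvelopeSketch and Read are hypothetical names, and the sketch only handles a well-formed envelope.

```csharp
using System;
using System.Text.Json;

public static class EnvelopeSketch
{
    // Reads the logical type name, schema version, and raw payload JSON from
    // an envelope of the form { "type": ..., "v": ..., "data": ... }.
    public static (string Type, int Version, string DataJson) Read(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        var type = root.GetProperty("type").GetString()
            ?? throw new FormatException("type must be a string");
        var version = root.GetProperty("v").GetInt32();

        // GetRawText copies the payload JSON, so it stays valid after the
        // JsonDocument is disposed.
        var data = root.GetProperty("data").GetRawText();
        return (type, version, data);
    }
}
```

The payload is kept as raw JSON here because the version has to be known before choosing which C# type to deserialize it into.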
Registering a type + evolution
    services.AddMessaging(m => m
        .UseSqs(/* ... */)
        .MapMessageType<OrderPlaced>("orders.placed", version: 2)
        .Upcast<OrderPlacedV1>(fromVersion: 1, v1 => new OrderPlaced(v1.OrderId, coupon: null)));
The logical name ("orders.placed") is the stable wire contract: rename the C#
type freely, the name on the wire never changes. On consume, an older-version envelope
is deserialized at its version (e.g. OrderPlacedV1) and then run forward through the
upcaster chain, so your handler always receives the current shape (OrderPlaced).
Adding a new version is two lines: bump version: and append one .Upcast(...) that
maps the previous version forward. Handler code never changes.
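The "run forward through the upcaster chain" step can be sketched independently of the package. Everything below is a hypothetical illustration (UpcastChainSketch, Add, ToCurrent, and the two record shapes are assumptions, not the package's types): each step maps one version to the next, and a payload is advanced one version at a time until it reaches the current one.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public sealed record OrderPlacedV1(string OrderId);
public sealed record OrderPlaced(string OrderId, string? Coupon);

public sealed class UpcastChainSketch
{
    private readonly int _currentVersion;

    // Key: the version a step accepts. Value: maps that version to the next one.
    private readonly Dictionary<int, Func<object, object>> _steps = new();

    public UpcastChainSketch(int currentVersion) => _currentVersion = currentVersion;

    public UpcastChainSketch Add<TFrom>(int fromVersion, Func<TFrom, object> step)
    {
        _steps[fromVersion] = payload => step((TFrom)payload);
        return this;
    }

    // Advances the payload one version at a time until it is at the current version.
    public object ToCurrent(object payload, int version)
    {
        if (version > _currentVersion)
            throw new InvalidOperationException(
                $"Version {version} is newer than the current version {_currentVersion}.");

        while (version < _currentVersion)
        {
            if (!_steps.TryGetValue(version, out var step))
                throw new InvalidOperationException(
                    $"No upcaster registered from version {version}.");
            payload = step(payload);
            version++;
        }
        return payload;
    }
}
```

Under this model, adding a version 3 means raising the current version to 3 and registering one more step from version 2, while the step from version 1 stays untouched.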
Legacy fallback
A body with no envelope — a message published before the type was mapped, or by an un-upgraded producer — is deserialized as the queue's registered type on a single-type queue. This makes rolling out mapping non-breaking: old and new bodies coexist on the same queue.
Heterogeneous queues
One queue can carry multiple message types, routed to their own handlers by the envelope's logical type name:
    services.AddMultiTypeQueueProcessor("events", r => r
        .Handle<OrderPlaced, OrderHandler>()
        .Handle<OrderCancelled, CancelHandler>());
Each handled type must be registered via MapMessageType.
Behavior-pipeline limitation. The behavior pipeline (retry/observability) is not yet applied on the multi-type path. If you need behaviors, use the single-type
AddQueueProcessor<TMessage, THandler>. This is a documented limitation.
No silent loss
A too-new version, a malformed envelope (one that intends to be an envelope but has a
bad/missing v/data or a non-string type), or a type with no handler on a multi-type
queue leaves the message un-acked for broker redrive → DLQ (the same fate as a poison
message), logged at Warning. Nothing is silently dropped.
Single-type back-compat exception: on a single-type queue, a body that is not an
envelope — including a well-formed envelope whose type is simply unregistered (a
pre-envelope legacy body, or a producer that never adopted the registry) — is deserialized
as the queue's registered message type (the legacy-bare-body fallback) and handled normally.
This is intentional non-breaking back-compat: a single-type queue has an unambiguous fallback
type, so it does not redrive such messages. A multi-type queue has no fallback, so an
unregistered/unknown type there does redrive → DLQ.
Idempotency
IIdempotencyStore provides an atomic claim/lease: exactly one caller wins the right
to run a (messageId, scope) side-effect; concurrent deliveries get false until
the lease expires or is released. The easiest way to use it is the
IdempotentHandler<TMessage> base class, which claims before running your work and
marks it complete after — duplicate deliveries are skipped automatically.
    using Davasorus.Utility.DotNet.Messaging.Abstractions;

    public sealed class OrderHandler : IdempotentHandler<OrderMessage>
    {
        public OrderHandler(IIdempotencyStore store)
            : base(store, scope: "order-finalize", leaseTtl: TimeSpan.FromSeconds(60))
        {
        }

        protected override async Task HandleClaimedAsync(
            OrderMessage message, MessageContext context, CancellationToken ct)
        {
            // Runs at most once per (MessageId, scope). A graceful failure here releases
            // the claim immediately so a redelivery can retry.
            await FinalizeOrderAsync(message, ct);
        }
    }
Select the backend in the fluent root:
.UseIdempotency(i => i.UseSqlServer()) // or .UseSqlite() / .UseInMemory()
Set leaseTtl no greater than the queue's visibility timeout. A graceful handler failure releases the claim immediately, but a hard process crash leaves the claim live until the lease expires. If the lease outlives the visibility timeout, the redelivered message could find the claim still held, be skipped, and get silently acked as "done", losing the work. Keeping leaseTtl ≤ visibility timeout guarantees an orphaned claim expires before redelivery.
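The interaction between the lease and the visibility timeout is easier to see with a toy in-memory model. The sketch below is not the package's IIdempotencyStore: LeaseSketch and its methods are hypothetical, and the current time is passed in explicitly so the timeline can be replayed deterministically.

```csharp
using System;
using System.Collections.Generic;

public sealed class LeaseSketch
{
    private sealed record Row(DateTimeOffset LeaseExpiresAt, bool Completed);

    private readonly Dictionary<(string MessageId, string Scope), Row> _rows = new();
    private readonly object _gate = new();

    // Returns true if the caller wins the claim. Returns false if the work is
    // already completed or another claim's lease has not yet expired.
    public bool TryClaim(string messageId, string scope, TimeSpan leaseTtl, DateTimeOffset now)
    {
        lock (_gate)
        {
            if (_rows.TryGetValue((messageId, scope), out var row)
                && (row.Completed || row.LeaseExpiresAt > now))
            {
                return false;
            }

            _rows[(messageId, scope)] = new Row(now + leaseTtl, Completed: false);
            return true;
        }
    }

    // Marks the work done so later deliveries are always refused.
    public void Complete(string messageId, string scope)
    {
        lock (_gate)
        {
            _rows[(messageId, scope)] = new Row(DateTimeOffset.MaxValue, Completed: true);
        }
    }

    // Graceful failure: drop an uncompleted claim so a redelivery can retry at once.
    public void Release(string messageId, string scope)
    {
        lock (_gate)
        {
            if (_rows.TryGetValue((messageId, scope), out var row) && !row.Completed)
                _rows.Remove((messageId, scope));
        }
    }
}
```

Assume a 30-second visibility timeout and a process that crashes right after claiming at time t, so neither Complete nor Release runs. With a 60-second lease, the redelivery at t + 30s is refused because the orphaned claim is still live, which is the case the paragraph above warns about. With a 30-second lease, the same redelivery wins the claim and the work is retried.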
Idempotency table schema
The SQL-backed store reads and writes an externally-provisioned table — the pillar does not create it. Provision it with the following schema.
SQL Server — [Messaging].[IdempotencyLease]:
    CREATE TABLE [Messaging].[IdempotencyLease]
    (
        [MessageId]      NVARCHAR(256)    NOT NULL,
        [Scope]          NVARCHAR(128)    NOT NULL,
        [ProcessorId]    UNIQUEIDENTIFIER NULL, -- MUST be nullable
        [LeaseExpiresAt] DATETIME2        NULL,
        [Completed]      BIT              NOT NULL DEFAULT 0,
        CONSTRAINT [PK_IdempotencyLease] PRIMARY KEY ([MessageId], [Scope])
    );
SQLite — Messaging_IdempotencyLease:
    CREATE TABLE Messaging_IdempotencyLease
    (
        MessageId      TEXT    NOT NULL,
        Scope          TEXT    NOT NULL,
        ProcessorId    TEXT    NULL,
        LeaseExpiresAt TEXT    NULL,
        Completed      INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY (MessageId, Scope)
    );
ProcessorId must be nullable in both schemas: the claim/lease logic depends on ProcessorId IS NULL to detect an unclaimed row.
The SQL-backed store resolves the SQL pillar's services at runtime, so the consumer
must also register the SQL pillar (AddSqlServices for SQL Server or
AddSqlServicesWithSqlite for SQLite) and logging (AddLogging). The SQL pillar in
turn depends on ISqsService, which UseSqs(...) already registers.
Dead-Letter Queues
Declare a DLQ per queue with WithDeadLetterQueue(name, fifo). Inject
IDeadLetterQueue to inspect and replay it.
    .AddQueueProcessor<OrderMessage, OrderHandler>("orders.fifo", q => q
        .RequireFifo()
        .WithDeadLetterQueue("orders-dlq.fifo", fifo: true)
        .WithDefaultMessageGroupId("orders"));
    // The logger is injected alongside the DLQ so the calls below can log.
    public sealed class DlqAdmin(IDeadLetterQueue dlq, ILogger<DlqAdmin> logger)
    {
        public async Task InspectAndRedriveAsync(CancellationToken ct)
        {
            // Peek without permanently removing; peeked messages are briefly reserved.
            var peek = await dlq.PeekAsync<OrderMessage>("orders.fifo", max: 10, ct);
            foreach (var (envelope, body) in peek.Items)
            {
                // body is null if the message failed to deserialize.
                logger.LogWarning("DLQ message {Id}: {Body}", envelope.MessageId, envelope.RawBody);
            }

            // Replay DLQ -> source queue. FIFO-correct, ack-after-republish.
            var outcome = await dlq.RedriveAsync(
                "orders.fifo",
                new RedriveOptions
                {
                    MaxMessages = 10,
                    Filter = e => true, // optional predicate; false leaves the message on the DLQ
                },
                ct);

            logger.LogInformation(
                "Redrove {Moved}, skipped {Skipped}", outcome.MovedCount, outcome.SkippedCount);
        }
    }
A hosted monitor periodically observes the declared DLQs and reports the
most-recently-observed approximate depth as a single process-wide
messaging.dlq.depth observable gauge metric (not per-queue; per-queue tagging is a
future enhancement), so you can alert on a growing dead-letter backlog. Redrive
re-supplies the WithDefaultMessageGroupId value when replaying to a FIFO source
queue.
Operational Notes & Prerequisites
These are load-bearing facts for anyone operating the pillar in a real environment.
- Infrastructure owns the queues and DLQ. The pillar declares, validates, monitors, and replays queues; it does not create AWS resources. The source queue, the dead-letter queue, and the source queue's redrive policy (which binds the source to its DLQ) are all provisioned by your infrastructure (Terraform/console). The pillar never calls CreateQueue.
- Fail-fast startup validation. A hosted validator runs at startup and describes every declared queue (and declared DLQ). If a declared queue does not exist, or its FIFO-ness does not match what you declared (RequireFifo() / RequireStandard()), startup fails fast rather than silently mis-routing messages at runtime.
- Delivery is at-least-once; handlers must be idempotent. The poller guarantees delete-after-success and redrive-on-failure, but the handler is responsible for tolerating duplicate delivery. Use IdempotentHandler<TMessage> or your own deduplication.
- leaseTtl ≤ queue visibility timeout when using IdempotentHandler: see the Idempotency section above.
- Idempotency table is externally provisioned. The SQL-backed store expects the table to already exist with the documented schema; it is not created on first use.
- SQL pillar prerequisites. Using a SQL-backed idempotency store requires the SQL pillar (AddSqlServices / AddSqlServicesWithSqlite) and logging to be registered. The SQL pillar depends on ISqsService, which UseSqs(...) provides.
Provider Extensibility
Applications depend on the abstractions (IMessagePublisher, IMessageHandler<T>,
IDeadLetterQueue, IIdempotencyStore), never on a specific cloud SDK. SQS is the
first backend, implemented behind the IMessageTransport seam. A future backend —
Azure Service Bus, ActiveMQ, an in-process/IPC transport for tests and local
development — implements IMessageTransport and registers itself the same way
UseSqs(...) does. Application code, handlers, publishers, and DLQ administration
continue to compile and run unchanged.
Telemetry
Telemetry flows through the Davasorus.Utility.DotNet.Telemetry pillar.
Producer/consumer spans. Publishing and consuming emit spans. The producer injects W3C trace context (traceparent) into the message attributes; on receive, the poller parses it and exposes it on MessageContext.ProducerActivityContext. Start your handler's activity with that context as the parent to link your processing spans to the producer's trace across the queue hop.

Built-in consume metrics (on by default). Every queue processor emits OTel-standard consume metrics around handler dispatch, via a behavior registered outermost so its timing spans the whole chain (including retries):
- messaging.client.consumed.messages (counter, unit {message}): one per dispatch, tagged messaging.destination.name, messaging.system, and messaging.operation.name (process). A failed dispatch additionally carries error.type set to the failure's type name (MessageInProgressException for a message held in-progress by another delivery); a successful dispatch carries no error.type (OTel convention: failures are distinguished by the presence of error.type, not a custom outcome enum).
- messaging.process.duration (histogram, seconds): handler-dispatch (processing) duration, same tags.

When nothing collects the Meter (Davasorus.Utility.Messaging.Consumer), recording is a no-op, so on-by-default costs nothing if you don't collect metrics. Turn it off with .WithoutObservability() on the messaging builder (tracing spans still emit; only the metric counter/histogram are suppressed).

Enrich the consume span. Add custom tags to the package's consume span without subclassing:

    services.AddMessaging(m => m
        .UseSqs(/* ... */)
        .EnrichConsumeActivity((activity, ctx) =>
            activity.SetTag("app.tenant", ResolveTenant(ctx))));

The callback receives the live consume Activity and the MessageContext. It is best-effort: a callback that throws is guarded and does not break dispatch. For custom observability logic (your own spans/business metrics), write an IMessageHandlerBehavior<TMessage>; the handler pipeline already supports it, and no parallel observer abstraction is added.

DLQ depth. The dead-letter-queue monitor emits a single process-wide messaging.dlq.depth observable gauge (unit {message}) reporting the most-recently-observed DLQ depth across all declared DLQs (per-queue tagging is a future enhancement).

Health check. AddMessagingQueueHealthCheck() registers a health check that probes every registered queue for reachability and reports Unhealthy if any queue cannot be described.
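To make the parent-linking step from the producer/consumer spans paragraph concrete, here is a minimal sketch using only System.Diagnostics. TraceLinkSketch, StartLinked, and the source name are hypothetical. The sketch parses a raw traceparent string itself; inside a real handler you would presumably pass MessageContext.ProducerActivityContext as the parent instead, assuming it is a System.Diagnostics.ActivityContext (the text does not state its type).

```csharp
#nullable enable
using System;
using System.Diagnostics;

public static class TraceLinkSketch
{
    // Hypothetical source name; use your application's own ActivitySource.
    private static readonly ActivitySource Source = new("Sketch.OrderHandler");

    // Parses a W3C traceparent value and starts a consumer activity whose
    // parent is the producer's span. Returns null if the header is invalid
    // or if nothing is listening to the ActivitySource.
    public static Activity? StartLinked(string traceparent)
    {
        if (!ActivityContext.TryParse(traceparent, null, out var parent))
            return null;

        return Source.StartActivity("process order", ActivityKind.Consumer, parent);
    }
}
```

The started activity shares the producer's trace id and records the producer's span id as its parent, which is what joins the two sides of the queue hop into one trace. StartActivity returns null when no listener (for example an OpenTelemetry tracer provider) has subscribed to the source, so callers should null-check the result.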
Dependencies
- Davasorus.Utility.DotNet.SQS (the first transport backend)
- Davasorus.Utility.DotNet.SQL (SQL-backed idempotency store)
- Davasorus.Utility.DotNet.Telemetry
- Microsoft.Extensions.{DependencyInjection.Abstractions, Hosting.Abstractions, Options, Logging.Abstractions, Diagnostics.HealthChecks}
License
MIT.
- net10.0
  - Davasorus.Utility.DotNet.SQL (>= 2026.2.3.2)
  - Davasorus.Utility.DotNet.SQS (>= 2026.2.3.3)
  - Davasorus.Utility.DotNet.Telemetry (>= 2026.2.3.2)
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.9)
  - Microsoft.Extensions.Diagnostics.HealthChecks (>= 10.0.9)
  - Microsoft.Extensions.Hosting.Abstractions (>= 10.0.9)
  - Microsoft.Extensions.Logging.Abstractions (>= 10.0.9)
  - Microsoft.Extensions.Options (>= 10.0.9)
- net8.0
  - Davasorus.Utility.DotNet.SQL (>= 2026.2.3.2)
  - Davasorus.Utility.DotNet.SQS (>= 2026.2.3.3)
  - Davasorus.Utility.DotNet.Telemetry (>= 2026.2.3.2)
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.9)
  - Microsoft.Extensions.Diagnostics.HealthChecks (>= 10.0.9)
  - Microsoft.Extensions.Hosting.Abstractions (>= 10.0.9)
  - Microsoft.Extensions.Logging.Abstractions (>= 10.0.9)
  - Microsoft.Extensions.Options (>= 10.0.9)
| Version | Downloads | Last Updated |
|---|---|---|
| 2026.2.3.11 | 154 | 6/15/2026 |
| 2026.2.3.10 | 86 | 6/14/2026 |
| 2026.2.3.9 | 84 | 6/14/2026 |
| 2026.2.3.8 | 90 | 6/14/2026 |
| 2026.2.3.7 | 91 | 6/13/2026 |
| 2026.2.3.6 | 93 | 6/13/2026 |
| 2026.2.3.5 | 93 | 6/13/2026 |
| 2026.2.3.4 | 104 | 6/12/2026 |
| 2026.2.3.3 | 92 | 6/12/2026 |
| 2026.2.3.2 | 90 | 6/11/2026 |
| 2026.2.3.1 | 91 | 6/11/2026 |