Messaging.Queue.Core
1.0.4
.NET CLI: dotnet add package Messaging.Queue.Core --version 1.0.4
Package Manager: NuGet\Install-Package Messaging.Queue.Core -Version 1.0.4
PackageReference: <PackageReference Include="Messaging.Queue.Core" Version="1.0.4" />
Central Package Management (Directory.Packages.props): <PackageVersion Include="Messaging.Queue.Core" Version="1.0.4" />
Central Package Management (project file): <PackageReference Include="Messaging.Queue.Core" />
Paket CLI: paket add Messaging.Queue.Core --version 1.0.4
Script & Interactive: #r "nuget: Messaging.Queue.Core, 1.0.4"
File-based Apps: #:package Messaging.Queue.Core@1.0.4
Cake Addin: #addin nuget:?package=Messaging.Queue.Core&version=1.0.4
Cake Tool: #tool nuget:?package=Messaging.Queue.Core&version=1.0.4
Messaging.Core
A broker-agnostic messaging template. It provides resilient, observable abstractions over the message broker and currently ships one implementation, for RabbitMQ.
Architecture & Execution Flow
The following diagram illustrates the lifecycle of a message from the broker through the internal execution pipeline, including gates, resilience policies, and behaviors.
flowchart TD
classDef broker fill:#f9f,stroke:#333,stroke-width:2px;
classDef pipeline fill:#fff3e0,stroke:#f57c00,stroke-width:2px;
classDef success fill:#c8e6c9,stroke:#388e3c,stroke-width:2px;
classDef fail fill:#ffcdd2,stroke:#d32f2f,stroke-width:2px;
subgraph Broker [RabbitMQ]
Queue[(Main Queue)]:::broker
DLQ[(Dead Letter Queue)]:::broker
end
subgraph Service [RabbitMqConsumerService]
direction TB
%% Gates
GateMonitor((Gate Monitor)) -->|Poll Interval| Gates{IConsumerGate}
Gates -->|All Open| Consume[Active Consumption]
Gates -->|Any Closed| Pause[Pause Consumption]
Pause -.->|Wait & Retry| GateMonitor
%% Delivery
Queue -->|BasicConsume| Consume
Consume -->|BasicDeliver| Deserialize[Deserialize JSON]
%% Execution
Deserialize --> Polly[Polly Resilience Pipeline]:::pipeline
subgraph Pipeline [Consumer Pipeline]
direction TB
Log[Logging Behavior] --> Trace[Tracing Behavior]
Trace --> Custom[Custom Behaviors...]
Custom --> Handler[[IMessageConsumer<T>]]
end
Polly --> Pipeline
%% Outcomes
Handler -- Success --> Ack[BasicAck]:::success
Handler -. Exception .-> Retry{Max Retries?}
Retry -. Retries left .-> Polly
Retry -. Exhausted .-> Nack[BasicNack]:::fail
end
Ack -->|Complete| Queue
Nack -->|x-dead-letter| DLQ
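The gate branch of the diagram ("All Open" leads to consumption, "Any Closed" leads to a pause) can be sketched in a few lines. This README does not show the real IConsumerGate signature, so the IGateSketch interface, the FixedGate class, and the GateEvaluator helper below are hypothetical names used only for illustration; treat this as a minimal sketch of the decision rule, not the library's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Demo: one open gate and one closed gate, so consumption should pause.
var gates = new List<IGateSketch>
{
    new FixedGate("database", isOpen: true),
    new FixedGate("downstream-api", isOpen: false),
};

bool shouldConsume = await GateEvaluator.AllOpenAsync(gates, CancellationToken.None);
Console.WriteLine(shouldConsume ? "Active consumption" : "Pause consumption");

// Hypothetical gate contract; the real IConsumerGate may look different.
public interface IGateSketch
{
    string Name { get; }
    Task<bool> IsOpenAsync(CancellationToken cancellationToken);
}

// A gate with a fixed state, standing in for a real dependency check.
public sealed class FixedGate(string name, bool isOpen) : IGateSketch
{
    public string Name { get; } = name;

    public Task<bool> IsOpenAsync(CancellationToken cancellationToken) =>
        Task.FromResult(isOpen);
}

public static class GateEvaluator
{
    // Returns true only if every gate reports open; stops at the first closed gate.
    public static async Task<bool> AllOpenAsync(
        IEnumerable<IGateSketch> gates,
        CancellationToken cancellationToken)
    {
        foreach (var gate in gates)
        {
            if (!await gate.IsOpenAsync(cancellationToken))
            {
                return false;
            }
        }

        return true;
    }
}
```

In this sketch a monitor would call AllOpenAsync once per polling interval and pause or resume delivery based on the result.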
Key Features
- Clean Abstractions: IMessageConsumer<T>, IMessage, and IMessagePublisher separate your business logic from the underlying broker implementation.
- Resilience (Polly v8): Configurable exponential backoff retries with full jitter. Automatically routes messages to a Dead Letter Queue (DLQ) after the configured MaxRetryAttempts.
- Consumer Gates (IConsumerGate): Pause consumption at the broker level when external dependencies (e.g., an API or database) are unavailable, and automatically resume delivery when they recover.
- Pipeline Behaviors: Middleware execution pipeline for consumer handlers (IConsumerPipelineBehavior<TMessage>). Built-in behaviors include global LoggingBehavior and TracingBehavior.
- Observability:
  - OpenTelemetry: Creates distributed traces automatically connecting publishers and consumers. The ActivitySource name is provided by the implementer.
  - Structured Logging: High-performance Serilog integration using [LoggerMessage] delegates.
- Graceful Shutdown: Drains in-flight messages cleanly before closing the channel, utilizing a configurable ShutdownTimeoutSeconds.
- K8s-Ready: Fully integrated with AspNetCore.HealthChecks.RabbitMQ, providing automated readiness and liveness probes.
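To make the "middleware execution pipeline" idea concrete, here is a minimal, self-contained sketch of how behaviors can wrap a handler so that each one runs code before and after the next step. The member shape of IConsumerPipelineBehavior<TMessage> is not shown in this README, so IBehaviorSketch<TMessage>, HandlerDelegate, RecordingBehavior, and PipelineSketch are hypothetical stand-ins, and the composition shown is the common middleware pattern rather than this package's confirmed internals.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Demo: two behaviors wrap a handler; the log shows the nesting order.
var log = new List<string>();
var behaviors = new List<IBehaviorSketch<string>>
{
    new RecordingBehavior<string>("logging", log),
    new RecordingBehavior<string>("tracing", log),
};

await PipelineSketch.RunAsync(
    "hello",
    behaviors,
    (message, token) => { log.Add("handler"); return Task.CompletedTask; },
    CancellationToken.None);

// Prints: logging:before > tracing:before > handler > tracing:after > logging:after
Console.WriteLine(string.Join(" > ", log));

// Represents "the rest of the pipeline" from a behavior's point of view.
public delegate Task HandlerDelegate();

// Hypothetical behavior contract; the real interface may differ.
public interface IBehaviorSketch<TMessage>
{
    Task HandleAsync(TMessage message, HandlerDelegate next, CancellationToken cancellationToken);
}

// Records a marker before and after invoking the next step.
public sealed class RecordingBehavior<TMessage>(string name, List<string> log) : IBehaviorSketch<TMessage>
{
    public async Task HandleAsync(TMessage message, HandlerDelegate next, CancellationToken cancellationToken)
    {
        log.Add($"{name}:before");
        await next();
        log.Add($"{name}:after");
    }
}

public static class PipelineSketch
{
    // Wraps the handler from the innermost behavior outward, so the first
    // behavior in the list becomes the outermost layer.
    public static Task RunAsync<TMessage>(
        TMessage message,
        IReadOnlyList<IBehaviorSketch<TMessage>> behaviors,
        Func<TMessage, CancellationToken, Task> handler,
        CancellationToken cancellationToken)
    {
        HandlerDelegate next = () => handler(message, cancellationToken);

        for (int i = behaviors.Count - 1; i >= 0; i--)
        {
            var behavior = behaviors[i];
            var inner = next;
            next = () => behavior.HandleAsync(message, inner, cancellationToken);
        }

        return next();
    }
}
```

Because the first behavior in the list ends up outermost, this sketch runs it first on the way in and last on the way out, which matches the Logging then Tracing then Custom ordering in the diagram.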
Quick Start
Register your broker, publishers, and consumers in Program.cs.
Global behaviors and consumer-specific gates are registered through chained (fluent) calls.
using Messaging.Core.Extensions;
using Messaging.Core.Pipeline;

var builder = WebApplication.CreateBuilder(args);

// 1. Add Broker and Publisher
builder.Services
    .AddRabbitMqBroker(builder.Configuration)
    .AddRabbitMqPublisher();

// 2. Add Global Behaviors (applied to all consumers)
builder.Services
    .AddGlobalConsumerBehavior(typeof(LoggingBehavior<>))
    .AddGlobalConsumerBehavior(typeof(TracingBehavior<>));

// 3. Register Consumers with Gates
builder.Services
    .AddConsumer<SampleMessage, SampleConsumer>("sample-queue")
    .WithGate<SampleDatabaseGate>(); // Consumption pauses if DB is unreachable

// 4. Observability & Health
builder.Services
    .AddConsumerHealthChecks();

builder.Services
    .AddOpenTelemetry()
    .AddConsumerTracing("Messaging.Sample");

var app = builder.Build();
app.MapHealthEndpoint();
await app.RunAsync();
Consumer Example
Consumers simply implement IMessageConsumer<TMessage> and focus purely on business logic:
using Messaging.Core.Abstractions;
public class SampleMessage : IMessage
{
    public Guid MessageId { get; init; } = Guid.NewGuid();
    public string Payload { get; init; } = string.Empty;
}

public class SampleConsumer(ILogger<SampleConsumer> logger) : IMessageConsumer<SampleMessage>
{
    public async Task ConsumeAsync(SampleMessage message, CancellationToken cancellationToken)
    {
        logger.LogInformation("Processing message: {Payload}", message.Payload);
        // Throwing an exception here automatically triggers Polly retries
        // and eventually routing to the DLQ.
    }
}
Configuration
Settings are bound from the RabbitMq and Consumer sections in appsettings.json.
{
  "RabbitMq": {
    "Host": "localhost",
    "Port": 5672,
    "VirtualHost": "/",
    "Username": "guest",
    "Password": "guest", // Should be overridden via Environment Variables or Kubernetes Secrets.
    "EnableDeadLetterQueue": true
  },
  "Consumer": {
    "QueueName": "default-queue",
    "ConcurrencyLimit": 10,
    "MaxRetryAttempts": 3,
    "RetryBaseDelayMs": 100,
    "ShutdownTimeoutSeconds": 30,
    "GatePollingIntervalSeconds": 10
  }
}
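To get a feel for what MaxRetryAttempts and RetryBaseDelayMs could mean in practice, the sketch below computes delays using the generic "exponential backoff with full jitter" formula: a uniform random delay between zero and base * 2^attempt. How the package actually maps these settings onto Polly is not documented here, so the formula, the BackoffSketch class, and its method names are assumptions for illustration only.

```csharp
using System;

// Values taken from the sample configuration above.
const int maxRetryAttempts = 3;
const int retryBaseDelayMs = 100;

var random = new Random();
for (int attempt = 0; attempt < maxRetryAttempts; attempt++)
{
    TimeSpan cap = BackoffSketch.MaxDelay(retryBaseDelayMs, attempt);
    TimeSpan delay = BackoffSketch.FullJitterDelay(retryBaseDelayMs, attempt, random);
    Console.WriteLine(
        $"retry {attempt + 1}: at most {cap.TotalMilliseconds} ms, sampled {delay.TotalMilliseconds:F0} ms");
}

// Hypothetical helper; not part of Messaging.Queue.Core or Polly.
public static class BackoffSketch
{
    // Exponential upper bound for a zero-based attempt: base * 2^attempt.
    public static TimeSpan MaxDelay(int baseDelayMs, int attempt) =>
        TimeSpan.FromMilliseconds(baseDelayMs * Math.Pow(2, attempt));

    // Full jitter: pick uniformly between zero and the exponential upper bound.
    public static TimeSpan FullJitterDelay(int baseDelayMs, int attempt, Random random) =>
        TimeSpan.FromMilliseconds(
            random.NextDouble() * MaxDelay(baseDelayMs, attempt).TotalMilliseconds);
}
```

Under these assumptions the three retries would wait at most 100 ms, 200 ms, and 400 ms respectively; the sampled values differ on every run because of the jitter.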
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Dependencies
- net10.0
  - AspNetCore.HealthChecks.RabbitMQ (>= 9.0.0)
  - Microsoft.Extensions.Http.Resilience (>= 9.10.0)
  - OpenTelemetry.Exporter.Console (>= 1.15.1)
  - OpenTelemetry.Exporter.OpenTelemetryProtocol (>= 1.15.1)
  - OpenTelemetry.Extensions.Hosting (>= 1.15.1)
  - OpenTelemetry.Instrumentation.Runtime (>= 1.15.0)
  - Polly.Core (>= 8.6.6)
  - prometheus-net.AspNetCore (>= 8.2.1)
  - RabbitMQ.Client (>= 7.2.1)
  - Serilog.Enrichers.Environment (>= 3.0.1)
  - Serilog.Enrichers.Thread (>= 4.0.0)
  - Serilog.Extensions.Hosting (>= 9.0.0)
  - Serilog.Settings.Configuration (>= 9.0.0)
  - Serilog.Sinks.Console (>= 6.1.1)
  - Serilog.Sinks.Elasticsearch (>= 10.0.0)