Uniphar.ServiceBus.Extensions
2.8.0
dotnet add package Uniphar.ServiceBus.Extensions --version 2.8.0
NuGet\Install-Package Uniphar.ServiceBus.Extensions -Version 2.8.0
<PackageReference Include="Uniphar.ServiceBus.Extensions" Version="2.8.0" />
<PackageVersion Include="Uniphar.ServiceBus.Extensions" Version="2.8.0" />
<PackageReference Include="Uniphar.ServiceBus.Extensions" />
paket add Uniphar.ServiceBus.Extensions --version 2.8.0
#r "nuget: Uniphar.ServiceBus.Extensions, 2.8.0"
#:package Uniphar.ServiceBus.Extensions@2.8.0
#addin nuget:?package=Uniphar.ServiceBus.Extensions&version=2.8.0
#tool nuget:?package=Uniphar.ServiceBus.Extensions&version=2.8.0
ServiceBus Extensions
NuGet package repository for ServiceBus Extensions
Purpose
- Administration of:
- Topics
- Subscriptions
- Usage of:
- Processors
- Senders
Everything can be configured in appsettings.json
. The producer owns the topic, the consumer owns the subscription.
Administration
Register an administration client:
builder.Services.Configure<SubscriberOptions>(builder.Configuration.GetSection(nameof(SubscriberOptions)));
builder.Services.Configure<TopicOptions>(builder.Configuration.GetSection(nameof(TopicOptions)));
builder.Services.AddSingleton(
new ServiceBusAdministrationClient(builder.Configuration["AzureServiceBus:Name"]!,
new DefaultAzureCredential()));
Then use it to initialize topics and subscriptions.
IOptions<TopicOptions> topicOptions;
await topicOptions.InitializeAsync(serviceBusAdministrationClient);
IOptions<SubscriberOptions> subscriberOptions;
await subscriberOptions.InitializeAsync(serviceBusAdministrationClient);
TopicOption
"TopicOptions": {
"Topics": [
{
"Name": "topic-name"
}
]
}
SubscriberOption
"SubscriberOptions": {
"SubscriptionKey": {
"Name": "subscription-name",
"TopicName": "topic-name",
"CorrelationRules": [
{
"Name": "filter-name",
"Subject": "subject-name",
}
]
}
}
Rules
Each Subscription configuration element can contain a set of rules that can be applied to the subscription. There are two types of rules that can be configured a SqlRule and a CorrelationRule. Therefore each CorrelationRule property may be overridden from its default value.
CorrelationRule
A CorrelationRule configuration type contains optional attributes that correspond to the properties of the CorrelationRuleFilter class.
Application Properties
"SubscriberOptions": {
"Subscriptions": [
{
"Name": "subscription-name",
"MaxDeliveryCount": 20,
"CorrelationRules": [
{
"Name": "CorrelationRule",
"Subject ": "MessageSubject",
"ApplicationProperties": {
"object_type": "ObjectType",
"event_type": "Created"
}
}
]
}
]
}
A CorrelationRule may also specify ApplicationProperties that are applied to the CorrelationRuleFilter. These attributes are key value pairs, defined as a dictionary.
SqlRule
"SubscriberOptions": {
"Subscriptions": [
{
"Name": "subscription-name",
"MaxDeliveryCount": 20,
"SqlRules": [
{
"Name": "SqlRule",
"Filter" : "color='blue' AND quantity=10",
"Action": "SET quantity = quantity / 2;"
}
]
}
]
}
A SqlRule configuration type contains optional attributes that correspond to the properties of the SqlRuleFilter class.
The required attributes are Name and Filter the Action attribute is optional.
On
InitializeAsync
the following will happen:
- New rules are created
- Existing rules that are not in the configuration are removed
- Existing rules that are in the configuration are updated to match the configuration
Usage
There is an extension method to register processors and senders:
var serviceBusName = builder.Configuration["AzureServiceBus:Name"]!;
builder.AddServiceBusProcessors(serviceBusName, jsonSerializerOptions);
Sender
Add a SenderOptions
section in appsettings.json
:
"SenderOptions": [
{
"Identifier": "sender-id",
"QueueOrTopicName": "topic-name"
}
]
Then call AddSender(...)
:
builder.AddServiceBusProcessors(...)
.AddSender("sender-id")
The sender then can be injected into your services:
[FromKeyedServices("sender-id")]
ServiceBusSender serviceBusSender
Processor
In each subscriber options add:
"Processor": {
"ProcessingType": "Topic",
"MaxConcurrentCalls": 10
}
Implement an IMessageHandler<T>
for the message type to process. Then call AddProcessor(...)
:
builder.AddServiceBusProcessors(...)
.AddProcessor<MyMessageHandler, MyMessage>("subscription-name")
This calls register a HostedService
that will process messages from the subscription. The MyMessageHandler
will be called for each message of type MyMessage
.
The service handles parsing, retry, dead-letter
queue and more.
A container called
dead-letter
must exist in Azure Storage.
Message Handler
HandlerAsync
expect a IMessageResult
returned.
Use the code below to tell the producer what to do with the message:
MessageResult.Complete();
MessageResult.Abandon();
MessageResult.Timeout("reason");
How it works:
- It receives messages from the subscription.
- It calls the
HandlerAsync
method of the registeredIMessageHandler<T>
. - If
IMessageResult
is completed, the message is completed too. - If
IMessageResult
is abandoned, the message is abandoned and will be redelivered later. - If
IMessageResult
is timed out, the message is added to theDLQ
(dead-letter-queue) with the specified reason.- If three consecutive messages time out (configurable), then the processor stops for 5 minutes (configurable)
- When the DLQ processor receives a message:
- If it has been delivered less than three times (configurable), it re-queues the message in the original subscription.
- Otherwise, it adds the message to the
DLQ
Azure Storage container for manual inspection.
Logs
- ERROR
An error occurred while processing events in {ErrorSource}
- ERROR
An error occurred while deserializing message {MessageId}
- WARN:
Message {MessageId} has been redelivered {DeliveryCount} times by {Processor}
- INFO
Message {MessageId} has been re-queued from the dead-letter queue
- WARN
Message {MessageId} stored in cold storage
Testing
There is a utility class to wait messages. For example:
private readonly MessageTester _messageTester = new(JsonSerializerOptions.Default);
var found = await _messageTester.AssertReceivedAsync<SalesOrderConfirmedEvent>(
fixture.OrderConfirmedReceiver!, ev => ev.OrderId == orderId);
Assert.True(found, $"Order with ID {orderId} was not found in Confirmed Order messages.");
where OrderConfirmedReceiver
is:
_serviceBusClient = new ServiceBusClient(
serviceBusName,
new DefaultAzureCredential(),
serviceBusClientOptions);
OrderConfirmedReceiver = _serviceBusClient.CreateReceiver(
_options.TopicName!,
_options.SalesOrderConfirmedSubscriptionName);
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- Azure.Identity (>= 1.14.2)
- Azure.Messaging.ServiceBus (>= 7.20.1)
- Azure.Storage.Blobs (>= 12.25.0)
- Microsoft.Extensions.Configuration.Binder (>= 8.0.2)
- Microsoft.Extensions.Hosting.Abstractions (>= 8.0.1)
- Microsoft.Extensions.Options (>= 9.0.8)
- Uniphar.PlatformEvents (>= 8.8.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.