Uniphar.ServiceBus.Extensions 2.8.0

dotnet add package Uniphar.ServiceBus.Extensions --version 2.8.0
                    

ServiceBus Extensions

NuGet package repository for ServiceBus Extensions

Purpose

  • Administration of:
    • Topics
    • Subscriptions
  • Usage of:
    • Processors
    • Senders

Everything can be configured in appsettings.json. The producer owns the topic; the consumer owns the subscription.

Administration

Register an administration client:

builder.Services.Configure<SubscriberOptions>(builder.Configuration.GetSection(nameof(SubscriberOptions)));
builder.Services.Configure<TopicOptions>(builder.Configuration.GetSection(nameof(TopicOptions)));
builder.Services.AddSingleton(
    new ServiceBusAdministrationClient(
        builder.Configuration["AzureServiceBus:Name"]!,
        new DefaultAzureCredential()));

Then use it to initialize topics and subscriptions.

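// Assumes `app` is the built host (var app = builder.Build();) and resolves the registered services from it.
var serviceBusAdministrationClient = app.Services.GetRequiredService<ServiceBusAdministrationClient>();

var topicOptions = app.Services.GetRequiredService<IOptions<TopicOptions>>();
await topicOptions.InitializeAsync(serviceBusAdministrationClient);

var subscriberOptions = app.Services.GetRequiredService<IOptions<SubscriberOptions>>();
await subscriberOptions.InitializeAsync(serviceBusAdministrationClient);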

TopicOptions

Reference with all fields.

"TopicOptions": {
  "Topics": [
    {
      "Name": "topic-name"
    }
  ]
}

SubscriberOptions

Reference with all fields.

"SubscriberOptions": {
  "SubscriptionKey": {
    "Name": "subscription-name",
    "TopicName": "topic-name",
    "CorrelationRules": [
      {
        "Name": "filter-name",
        "Subject": "subject-name"
      }
    ]
  }
}

Rules

Each Subscription configuration element can contain a set of rules that are applied to the subscription. Two types of rules can be configured: a SqlRule and a CorrelationRule. Each CorrelationRule property may be overridden from its default value.

CorrelationRule

A CorrelationRule configuration type contains optional attributes that correspond to the properties of the CorrelationRuleFilter class.

Application Properties

"SubscriberOptions": {
  "Subscriptions": [
    {
      "Name": "subscription-name",
      "MaxDeliveryCount": 20,
      "CorrelationRules": [
        {
          "Name": "CorrelationRule",
          "Subject": "MessageSubject",
          "ApplicationProperties": {
            "object_type": "ObjectType",
            "event_type": "Created"
          }
        }
      ]
    }
  ]
}

A CorrelationRule may also specify ApplicationProperties that are applied to the CorrelationRuleFilter. These attributes are key-value pairs, defined as a dictionary.
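For orientation, the configuration above can be read as the following Azure SDK objects. This is only a sketch of the presumed mapping, not the package's own code: `CorrelationRuleExample` and `Build` are hypothetical names, while `CorrelationRuleFilter` and `CreateRuleOptions` come from Azure.Messaging.ServiceBus.Administration.

```csharp
using Azure.Messaging.ServiceBus.Administration;

public static class CorrelationRuleExample
{
    // Hypothetical helper: builds the rule that the JSON above describes.
    public static CreateRuleOptions Build()
    {
        var filter = new CorrelationRuleFilter
        {
            Subject = "MessageSubject",
        };

        // ApplicationProperties is a get-only dictionary, so entries are added to it.
        filter.ApplicationProperties["object_type"] = "ObjectType";
        filter.ApplicationProperties["event_type"] = "Created";

        return new CreateRuleOptions("CorrelationRule", filter);
    }
}
```

A correlation filter matches a message only when every property that is set on the filter matches, so this rule selects messages with that subject and both application properties.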

SqlRule

"SubscriberOptions": {
  "Subscriptions": [
    {
      "Name": "subscription-name",
      "MaxDeliveryCount": 20,
      "SqlRules": [
        {
          "Name": "SqlRule",
          "Filter": "color='blue' AND quantity=10",
          "Action": "SET quantity = quantity / 2;"
        }
      ]
    }
  ]
}

A SqlRule configuration type contains attributes that correspond to the properties of the SqlRuleFilter class.

The required attributes are Name and Filter; the Action attribute is optional.
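As a rough illustration, the SqlRule configuration shown earlier corresponds to the Azure SDK objects below. The mapping is an assumption about what the package does with the configuration; `SqlRuleExample` and `Build` are hypothetical names, and `SqlRuleFilter`, `SqlRuleAction` and `CreateRuleOptions` are types from Azure.Messaging.ServiceBus.Administration.

```csharp
using Azure.Messaging.ServiceBus.Administration;

public static class SqlRuleExample
{
    // Hypothetical helper: Name and Filter are required, Action is optional.
    public static CreateRuleOptions Build()
    {
        var filter = new SqlRuleFilter("color='blue' AND quantity=10");

        return new CreateRuleOptions("SqlRule", filter)
        {
            // The action rewrites properties on the copy of the message
            // that is delivered to this subscription.
            Action = new SqlRuleAction("SET quantity = quantity / 2;"),
        };
    }
}
```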

On InitializeAsync the following will happen:

  • New rules are created
  • Existing rules that are not in the configuration are removed
  • Existing rules that are in the configuration are updated to match the configuration
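
The three outcomes above amount to a set comparison between configured and existing rule names. The sketch below shows that comparison in isolation; it is not the package's implementation, `RuleSync`, `Plan` and `Compute` are hypothetical names, and comparing names with ordinal (case-sensitive) equality is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RuleSync
{
    // Names of the rules to create, update and delete, each sorted for stable output.
    public sealed record Plan(
        IReadOnlyList<string> ToCreate,
        IReadOnlyList<string> ToUpdate,
        IReadOnlyList<string> ToDelete);

    public static Plan Compute(IEnumerable<string> configured, IEnumerable<string> existing)
    {
        var configuredSet = new HashSet<string>(configured, StringComparer.Ordinal);
        var existingSet = new HashSet<string>(existing, StringComparer.Ordinal);

        return new Plan(
            // In the configuration but not on the subscription yet.
            ToCreate: Sorted(configuredSet.Where(name => !existingSet.Contains(name))),
            // Present on both sides: updated to match the configuration.
            ToUpdate: Sorted(configuredSet.Where(name => existingSet.Contains(name))),
            // On the subscription but no longer configured.
            ToDelete: Sorted(existingSet.Where(name => !configuredSet.Contains(name))));
    }

    private static List<string> Sorted(IEnumerable<string> names) =>
        names.OrderBy(name => name, StringComparer.Ordinal).ToList();
}
```

For example, with configured rules CorrelationRule and SqlRule and existing rules $Default and SqlRule, the plan creates CorrelationRule, updates SqlRule and deletes $Default.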

Usage

There is an extension method to register processors and senders:

var serviceBusName = builder.Configuration["AzureServiceBus:Name"]!;
builder.AddServiceBusProcessors(serviceBusName, jsonSerializerOptions);

Sender

Add a SenderOptions section in appsettings.json:

"SenderOptions": [
  {
    "Identifier": "sender-id",
    "QueueOrTopicName": "topic-name"
  }
]

Then call AddSender(...):

builder.AddServiceBusProcessors(...)
    .AddSender("sender-id")

The sender can then be injected into your services:

[FromKeyedServices("sender-id")] 
ServiceBusSender serviceBusSender
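
A minimal sketch of a service that uses the keyed sender follows. `OrderCreated`, `OrderPublisher` and `ToMessage` are hypothetical names, and the subject and application property values are placeholders. The package may offer its own serialization helpers; none are assumed here, so the sketch serializes the payload itself with `BinaryData.FromObjectAsJson`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical event type used only for this illustration.
public sealed record OrderCreated(string OrderId);

public sealed class OrderPublisher
{
    private readonly ServiceBusSender _sender;

    // The key must match the Identifier configured in SenderOptions.
    public OrderPublisher([FromKeyedServices("sender-id")] ServiceBusSender serviceBusSender)
    {
        _sender = serviceBusSender;
    }

    // Builds the message separately so it can be inspected without sending it.
    public static ServiceBusMessage ToMessage(OrderCreated ev)
    {
        var message = new ServiceBusMessage(BinaryData.FromObjectAsJson(ev))
        {
            Subject = "MessageSubject",
            ContentType = "application/json",
        };
        // Subscription rules can filter on application properties such as this one.
        message.ApplicationProperties["event_type"] = "Created";
        return message;
    }

    public Task PublishAsync(OrderCreated ev, CancellationToken cancellationToken = default) =>
        _sender.SendMessageAsync(ToMessage(ev), cancellationToken);
}
```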

Processor

In each subscriber options entry, add a Processor section:

"Processor": {
  "ProcessingType": "Topic",
  "MaxConcurrentCalls": 10
}

Implement an IMessageHandler<T> for the message type to process. Then call AddProcessor(...):

builder.AddServiceBusProcessors(...)
    .AddProcessor<MyMessageHandler, MyMessage>("subscription-name")

This call registers a hosted service that processes messages from the subscription. MyMessageHandler is called for each message of type MyMessage.

The service handles message parsing, retries, and dead-letter queue processing.

A container called dead-letter must exist in Azure Storage.

Message Handler

HandlerAsync must return an IMessageResult.

Use one of the calls below to tell the processor what to do with the message:

MessageResult.Complete();
MessageResult.Abandon();
MessageResult.Timeout("reason");

How it works:
  • It receives messages from the subscription.
  • It calls the HandlerAsync method of the registered IMessageHandler<T>.
  • If IMessageResult is completed, the message is completed too.
  • If IMessageResult is abandoned, the message is abandoned and will be redelivered later.
  • If IMessageResult is timed out, the message is moved to the dead-letter queue (DLQ) with the specified reason.
    • If three consecutive messages time out (configurable), the processor stops for 5 minutes (configurable).
  • When the DLQ processor receives a message:
    • If it has been delivered fewer than three times (configurable), it re-queues the message in the original subscription.
    • Otherwise, it adds the message to the DLQ Azure Storage container for manual inspection.
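
The pause and re-queue decisions in the list above can be modelled in a few lines. This is a simplified model rather than the package's code: `Outcome`, `DlqAction`, `ProcessorModel` and their members are hypothetical names, and the defaults (three timeouts, five minutes, three deliveries) are the values quoted in the list.

```csharp
using System;

public enum Outcome { Completed, Abandoned, TimedOut }

public enum DlqAction { Requeue, StoreInContainer }

public sealed class ProcessorModel
{
    private readonly int _timeoutThreshold;
    private readonly TimeSpan _pause;
    private int _consecutiveTimeouts;

    public ProcessorModel(int timeoutThreshold = 3, TimeSpan? pause = null)
    {
        _timeoutThreshold = timeoutThreshold;
        _pause = pause ?? TimeSpan.FromMinutes(5);
    }

    // Returns how long the processor should pause after this outcome.
    // TimeSpan.Zero means it keeps processing.
    public TimeSpan Record(Outcome outcome)
    {
        if (outcome != Outcome.TimedOut)
        {
            // Only consecutive timeouts count, so any other outcome resets the streak.
            _consecutiveTimeouts = 0;
            return TimeSpan.Zero;
        }

        _consecutiveTimeouts++;
        if (_consecutiveTimeouts < _timeoutThreshold)
        {
            return TimeSpan.Zero;
        }

        _consecutiveTimeouts = 0;
        return _pause;
    }

    // Decision taken by the DLQ processor for a dead-lettered message.
    public static DlqAction DecideDeadLetter(int deliveryCount, int maxDeliveries = 3) =>
        deliveryCount < maxDeliveries ? DlqAction.Requeue : DlqAction.StoreInContainer;
}
```

In this model a completed or abandoned message between two timeouts restarts the count, which is one reading of "consecutive"; the package may count differently.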

Logs
  • ERROR An error occurred while processing events in {ErrorSource}
  • ERROR An error occurred while deserializing message {MessageId}
  • WARN Message {MessageId} has been redelivered {DeliveryCount} times by {Processor}
  • INFO Message {MessageId} has been re-queued from the dead-letter queue
  • WARN Message {MessageId} stored in cold storage

Testing

There is a utility class, MessageTester, for waiting on messages in tests. For example:

private readonly MessageTester _messageTester = new(JsonSerializerOptions.Default);

var found = await _messageTester.AssertReceivedAsync<SalesOrderConfirmedEvent>(
    fixture.OrderConfirmedReceiver!, ev => ev.OrderId == orderId);
Assert.True(found, $"Order with ID {orderId} was not found in Confirmed Order messages.");

where OrderConfirmedReceiver is:

_serviceBusClient = new ServiceBusClient(
    serviceBusName,
    new DefaultAzureCredential(),
    serviceBusClientOptions);

OrderConfirmedReceiver = _serviceBusClient.CreateReceiver(
    _options.TopicName!,
    _options.SalesOrderConfirmedSubscriptionName);

Target frameworks

net8.0 is compatible. The following were computed: the net8.0 platform variants (android, browser, ios, maccatalyst, macos, tvos, windows), net9.0 and net10.0, and the same platform variants of each.

Version Downloads Last Updated
2.8.0 110 8/28/2025
2.7.0 382 8/26/2025
2.6.0 469 8/19/2025
2.5.0 147 8/19/2025
2.4.2 6,370 4/8/2025
2.4.1 244 4/3/2025
2.4.0 472 3/31/2025
2.3.0 1,574 2/13/2025
2.2.0 351 2/7/2025
2.1.0 210 2/6/2025
2.0.0 135 2/5/2025
1.0.0 121 1/30/2025