Rabbitor 0.0.4
dotnet add package Rabbitor --version 0.0.4
Rabbitor
<p align="center"> <img src="https://github.com/marcingolenia/rabbitor/raw/main/logo.png" width="150px"/> </p>
The F#-friendly (I hope so) RabbitMQ client. The goal of this library is to enable quick asynchronous communication with a low-configuration approach. At the moment the library focuses on RabbitMQ. If You need more, check out the awesome https://github.com/pchalamet/fbus.
Contents
- Features
- Coming next
- Recipes
  - Quickstart: async communication between services
  - Increase throughput through parallel consumers
  - Stream consumption (useful when adding a new service or whenever You need to synchronize)
  - Extending the consumer capabilities using decorators
- Explanations
  - Serialization/Deserialization of messages
  - Queues and Exchanges topology convention
  - Connection to RabbitMQ
Features
- Publish-Subscribe messaging
- Automatic creation of queues & exchanges
- Rabbit Streams Support
- Parallel Consumers
- Single-threaded consumers for in-order processing
- Easy decorators for rich message consumptions
Coming next
Upcoming features depend on the author's whims, unless related issues on GitHub get some votes.
- Dead-Letter exchange support for failures
- Streams retention
- More examples
- In-memory transport
- Possibility to pass in custom conventions for queues/exchanges topology
- Kafka equivalent
Recipes
Recipes will help You get started and solve some common problems. There is also a minimal example in the source code with Giraffe if You can't wait.
1. Quickstart: async communication between services
Note: Nothing stops You from using Rabbitor for asynchronous communication within a single service (for instance for inter-bounded contexts communication). The configuration will be almost the same.
Make sure that You have access to a running RabbitMQ instance. For local development You can just take docker-compose.yml from this repository and run docker-compose up -d
Let's assume we have services A and B. You need to establish asynchronous communication between these two services. You have decided to use Rabbitor together with RabbitMQ. Let's start with the definition of the messages that can be emitted from service A.
namespace Service.A.Contracts
open System
type ManKilled = { Name: string }
type ManResurrected = { Name: string; When: DateTime }
type CrimeNotifications =
| ManKilled of ManKilled
| ManResurrected of ManResurrected
The same types (including the namespace) should be placed in Service B. You can copy the file or publish a shared NuGet package (I prefer copying). See 2. Queues and Exchanges topology convention to learn why it is important.
Start the bus near Your application entry-point and init the publisher.
let bus = Bus.connect ["localhost"]
          |> Bus.initPublisher<CrimeNotifications>
If service A is only publishing messages, You are ready. How to manage the bus dependency is Your concern. You can register it as a single instance if You use dependency injection, or just pass it to the composition root to include it as a dependency. There is also a simple example in the repository, where the bus is passed down to the Giraffe HttpHandler. Publishing messages is as easy as:
let notification = CrimeNotifications.ManKilled { Name = "Stasiek" }
notification |> Bus.publish bus
Note that You can use partial application to bind the bus instance to the Bus.publish function and so simplify later usage.
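The partial application mentioned above can be sketched without a broker. This is a minimal sketch, assuming a stand-in `FakeBus` type and a stand-in `publish` function (both hypothetical, they only mirror the argument order of `Bus.publish bus message`):

```fsharp
// Hypothetical stand-in for the bus; the real Bus type comes from Rabbitor.
type FakeBus = { Sent: ResizeArray<string> }

// Hypothetical publish function that takes the bus first and the message second,
// mirroring how `notification |> Bus.publish bus` is used above.
let publish (bus: FakeBus) (message: 'a) =
    bus.Sent.Add(sprintf "%A" message)

let bus = { Sent = ResizeArray<string>() }

// Bind the bus once; callers now only need to know about messages.
// The explicit `message` parameter keeps the function generic.
let publishToBus message = publish bus message

publishToBus "ManKilled: Stasiek"
publishToBus 42
printfn "Published %d messages" bus.Sent.Count
```

With Rabbitor the same idea would presumably read `let publish message = Bus.publish bus message`, so that only `publish` has to be passed down to the code that emits notifications.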
Time to configure the consumer in service B. We need to define a handler that will process the received messages and then configure the Bus instance again:
let handler =
    (fun event ->
        async {
            printfn $"Received: %A{event}"
            return Ok()
        })
A handler is any function with the signature 'a -> Async<Result<unit,'b>>. Let's configure the bus:
let bus = Bus.connect ["localhost"]
          |> Bus.subscribe<CrimeNotifications> handler
That is all! From now on the handler will process every received notification.
You can also play with related tests from the source code: Tests → PubSub.fs
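A handler for the quickstart contracts will usually match on the union cases. The following is a minimal sketch that runs without RabbitMQ; the validation rule and the `string` error type are assumptions made for illustration, and the handler is invoked directly instead of through `Bus.subscribe`:

```fsharp
open System

type ManKilled = { Name: string }
type ManResurrected = { Name: string; When: DateTime }

type CrimeNotifications =
    | ManKilled of ManKilled
    | ManResurrected of ManResurrected

// Fits the shape 'a -> Async<Result<unit,'b>> with 'a = CrimeNotifications, 'b = string.
let handler (event: CrimeNotifications) : Async<Result<unit, string>> =
    async {
        match event with
        | ManKilled man when String.IsNullOrWhiteSpace(man.Name) ->
            // Hypothetical validation rule: report a failure as Error.
            return Error "ManKilled without a name"
        | ManKilled man ->
            printfn "Killed: %s" man.Name
            return Ok()
        | ManResurrected man ->
            printfn "Resurrected: %s at %O" man.Name man.When
            return Ok()
    }

// Type annotations pick the right record, since both records have a Name field.
let killed : ManKilled = { Name = "Stasiek" }
let nameless : ManKilled = { Name = "" }

let okResult = handler (ManKilled killed) |> Async.RunSynchronously
let errorResult = handler (ManKilled nameless) |> Async.RunSynchronously
```

Calling the handler directly like this is also a cheap way to unit test it before wiring it to the bus.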
2. Increase throughput through parallel consumers
By default the consumer (a single subscription) runs on a single thread tied to its channel. This guarantees ordered processing of messages. If You don't need the ordering guarantee (I hope You don't), You can enable parallel consumer processing. Assuming You did everything in step 1, turn on parallel processing by updating the consumer bus setup:
let bus = Bus.connect ["localhost"]
          |> Bus.parallelSubscribe<CrimeNotifications> 5 handler
Alternatively You can subscribe multiple times:
let bus = Bus.connect ["localhost"]
          |> Bus.subscribe<CrimeNotifications> handler
          |> Bus.subscribe<CrimeNotifications> handler
          |> Bus.subscribe<CrimeNotifications> handler
          |> Bus.subscribe<CrimeNotifications> handler
          |> Bus.subscribe<CrimeNotifications> handler
This gives You the flexibility to consume different events in a number of ways:
let bus = Bus.connect ["localhost"]
          |> Bus.parallelSubscribe<WhateverNotifications> 10 handler1
          |> Bus.subscribe<CrimeNotifications> handler2
          |> Bus.parallelSubscribe<OtherNotifications> 5 handler3
It is worth observing the application's behaviour under load and adjusting the degree of parallelism accordingly.
You can also play with related tests from the source code: Tests → ParallelSubscriptions.fs
3. Stream consumption
Stream consumption may be handy when You add a new service and need to synchronise state between services. Streams can also be great when facing failures: You can subscribe to a stream and replay the events (assuming that Your receiver is idempotent).
Rabbitor makes it easy to consume a stream. It is similar to a subscription and looks like this:
Bus.consumeStream<CrimeNotifications> handler offset bus |> ignore
handler will process the stream messages one by one in the background on a thread related to a dedicated channel. Rabbitor knows how many messages were in the stream upon requesting the stream, so it will consume all of them and skip messages which were appended to the stream in the meantime.
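Replaying a stream is only safe when the handler is idempotent. Here is a minimal sketch of one way to achieve that, assuming each message carries a unique id (the hypothetical `EventId` field below; the message types in this README have no such field) and that remembering the ids in memory is good enough for illustration:

```fsharp
open System
open System.Collections.Generic

// Hypothetical message type with a unique id per event.
type PaymentBooked = { EventId: Guid; Amount: decimal }

// Ids that were already handled; a real service would persist these.
let processed = HashSet<Guid>()
// Side effects that were actually applied.
let applied = ResizeArray<decimal>()

let idempotentHandler (event: PaymentBooked) : Async<Result<unit, string>> =
    async {
        // HashSet.Add returns false when the id is already present,
        // so a replayed event is acknowledged without repeating the side effect.
        if processed.Add event.EventId then
            applied.Add(event.Amount)
        return Ok()
    }

let payment = { EventId = Guid.NewGuid(); Amount = 10m }

// Deliver the same event twice, as a stream replay would.
idempotentHandler payment |> Async.RunSynchronously |> ignore
idempotentHandler payment |> Async.RunSynchronously |> ignore
printfn "Applied %d time(s)" applied.Count
```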
You can also play with related tests from the source code: Tests → Streams.fs
4. Extending the consumer capabilities using decorators
Since a handler is any function of type 'a -> Async<Result<unit,'b>>, handlers can be easily composed. Rabbitor comes with one decorator, which retries on exceptions or Error results. It is not applied by default, but it can be added upon subscribing as follows:
let bus = Bus.connect ["localhost"]
          |> Bus.subscribe<CrimeNotifications> (handler |> decorate [ ConsumerDecorators.retry 4 ])
Building a decorator is as easy as this:
open System.Diagnostics

let measureTimeDecorator next event =
    async {
        let stopwatch = Stopwatch.StartNew()
        let! result = next event // call next decorator / handler
        printfn $"Processed within {stopwatch.Elapsed}"
        return result
    }
Now it is enough to add it to the decorators list and pass the list to the decorate function (or compose the functions yourself). You can pass as many decorators as You want; the execution order follows the order of the list, so:
...
|> Bus.subscribe<F.Whatever7> (handler |> decorate [ decorator1; decorator2 ])
means that the execution order is as follows:
- decorator1
- decorator2
- handler
- decorator2
- decorator1
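The order listed above can be reproduced with a small sketch. `decorateSketch` below is a hypothetical stand-in written for this example, not Rabbitor's `decorate`; it only assumes that the first decorator in the list ends up outermost:

```fsharp
// Records the order in which decorators and the handler run.
let trace = ResizeArray<string>()

// A decorator takes the next handler and the event, like measureTimeDecorator.
let tracing (name: string) (next: string -> Async<Result<unit, string>>) (event: string) =
    async {
        trace.Add(name + " before")
        let! result = next event
        trace.Add(name + " after")
        return result
    }

let handler (event: string) : Async<Result<unit, string>> =
    async {
        trace.Add("handler")
        return Ok()
    }

// Hypothetical composition: fold from the right so that the first
// decorator in the list wraps all the others.
let decorateSketch decorators handler =
    List.foldBack (fun decorator next -> decorator next) decorators handler

let decorated =
    handler |> decorateSketch [ tracing "decorator1"; tracing "decorator2" ]

decorated "event" |> Async.RunSynchronously |> ignore
trace |> Seq.iter (printfn "%s")
```

Each decorator sees the event on the way in and the result on the way out, which is why every decorator name appears twice in the order.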
Besides retry, the first use cases You may need decorators for are auditing and metrics.
You can also play with related tests from the source code: Tests → CustomDecorators.fs
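To show what a retrying decorator can look like, here is a hypothetical sketch. It is not the implementation of `ConsumerDecorators.retry`, and it assumes that the number passed in is the total number of attempts, which this document does not specify:

```fsharp
// Hypothetical retry decorator: re-runs the next handler when it
// returns Error or throws, up to maxAttempts attempts in total.
let retrySketch (maxAttempts: int) (next: 'a -> Async<Result<unit, 'e>>) (event: 'a) : Async<Result<unit, 'e>> =
    let rec attempt n =
        async {
            // Async.Catch turns an exception into Choice2Of2 instead of throwing.
            let! outcome = next event |> Async.Catch
            match outcome with
            | Choice1Of2 (Ok ()) -> return Ok()
            | _ when n < maxAttempts -> return! attempt (n + 1)
            | Choice1Of2 lastError -> return lastError
            | Choice2Of2 ex -> return raise ex
        }
    attempt 1

// A handler that fails twice before succeeding, used to exercise the sketch.
let attempts = ResizeArray<int>()

let flaky (event: string) : Async<Result<unit, string>> =
    async {
        attempts.Add(attempts.Count)
        if attempts.Count < 3 then
            return Error "not yet"
        else
            return Ok()
    }

let retried = retrySketch 4 flaky "event" |> Async.RunSynchronously
printfn "Handler was called %d times" attempts.Count
```

A production decorator would likely add a delay between attempts; the sketch leaves that out to keep the control flow visible.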
Explanations
A few words about Rabbitor internals.
1. Serialization/Deserialization of messages
By default Rabbitor uses Newtonsoft.Json for serialization and deserialization. You can easily plug in Your own by using the following Bus functions:
use bus =
    Bus.connect [ "localhost" ]
    |> Bus.initPublisher<CrimeNotifications>
    |> Bus.parallelSubscribeWithDeserializer<CrimeNotifications> myDeserialize 1 handler
// ... And later for publishing:
Bus.serializeAndPublish bus mySerialize event
where
- mySerialize is a function 'a -> string
- myDeserialize is a function string -> 'a
Again, consider using partial application to simplify usage later on.
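As an illustration, a pair of functions with these shapes could be built on System.Text.Json. This is a minimal sketch under that assumption, shown on a plain record and checked with a round trip instead of a real bus:

```fsharp
open System.Text.Json

type ManKilled = { Name: string }

// Shared options; defaults are used here, adjust to taste.
let options = JsonSerializerOptions()

// Shape: 'a -> string
let mySerialize (value: 'a) : string =
    JsonSerializer.Serialize<'a>(value, options)

// Shape: string -> 'a
let myDeserialize<'a> (json: string) : 'a =
    JsonSerializer.Deserialize<'a>(json, options)

let original : ManKilled = { Name = "Stasiek" }
let json = mySerialize original
let roundTripped = myDeserialize<ManKilled> json
printfn "%s" json
```

Discriminated unions such as CrimeNotifications may need an additional converter with System.Text.Json, so a round-trip test on Your real message types is worth writing before plugging such functions into the bus.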
You can also play with related tests from the source code: Tests → CustomJsonSerializer.fs
2. Queues and Exchanges topology convention
To make the topology not Your concern, Rabbitor uses the following convention to set up queues and exchanges for You:
- Exchanges are created upon Bus.initPublisher<'a> using the full type name of 'a.
- Stream queues are created upon Bus.initStreamedPublisher<'a> using the full type name of 'a.
- Plain queues are created upon Bus.subscribe<'a> (or its parallel equivalent) using the executing assembly name and the full type name of 'a, which allows multiple subscriptions per topic.
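The convention also explains why the contract types must live in the same namespace in every service: the names are derived from the full type name. The sketch below only illustrates that idea. Both functions are hypothetical, and the ":" separator is an assumption, since the exact name format is not given here:

```fsharp
// Hypothetical: exchange name taken from the full type name of the message.
let exchangeNameSketch<'a> () : string =
    typeof<'a>.FullName

// Hypothetical: queue name built from the subscriber's assembly name and
// the full type name. The ":" separator is assumed for this sketch only.
let queueNameSketch<'a> (assemblyName: string) : string =
    assemblyName + ":" + typeof<'a>.FullName

// System.String stands in for a message type so the output is predictable.
let exchange = exchangeNameSketch<System.String> ()
let queueB = queueNameSketch<System.String> "Service.B"
let queueC = queueNameSketch<System.String> "Service.C"

printfn "%s" exchange
printfn "%s" queueB
printfn "%s" queueC
```

Two subscribing services get two different queues for the same message type, so each of them receives its own copy of every message, while a type placed in a different namespace would yield a different exchange name and never meet its publisher.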
At the moment it is not possible to override the convention.
I am thinking about removing the exchange creation step, and with it the initPublisher function. When no queue exists, messages aren't delivered anywhere anyway, so the exchange could just as well be created upon queue creation.
3. Connection to RabbitMQ
Rabbitor uses the official RabbitMQ .NET client underneath and tries not to get in Your way too much. If You want, You can pass a custom connection factory using the Bus.customConnect function, which has the signature (unit -> ConnectionFactory) -> string list -> Bus, where ConnectionFactory is a type from the RabbitMQ .NET client library. There You can set the password, configure certificates, and override the connection recovery settings. Otherwise Rabbitor uses the defaults.
Rabbitor uses separate connections for publishing and consuming because:
"Separate the connections for publishers and consumers to achieve high throughput. RabbitMQ can apply back pressure on the TCP connection when the publisher is sending too many messages for the server to handle" [source]
End note
PRs are welcome
Compatible frameworks
- net5.0 is compatible; net5.0-windows, net6.0, net7.0, net8.0 and their platform-specific variants were computed as compatible.
Dependencies (net5.0)
- FSharp.Core (>= 5.0.0)
- Newtonsoft.Json (>= 13.0.1)
- RabbitMQ.Client (>= 6.2.2)
Version | Downloads | Last updated |
---|---|---|
0.0.4 | 363 | 9/25/2021 |
0.0.3 | 280 | 9/15/2021 |
0.0.2-alpha | 214 | 8/30/2021 |
0.0.1-alpha | 201 | 8/30/2021 |
v.0.0.4
- Add README.md to nuget
v.0.0.3
- Publish-Subscribe messaging
- Automatic creation of queues and exchanges
- Rabbit Streams Support
- Parallel Consumers
- Single-threaded consumers for in-order processing
- Easy decorators for rich message consumptions