WebSocket.Rx 0.1.3

This package has a SemVer 2.0.0 package version: 0.1.3+1.
There is a newer version of this package available.
See the version list below for details.
dotnet add package WebSocket.Rx --version 0.1.3
                    
NuGet\Install-Package WebSocket.Rx -Version 0.1.3
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="WebSocket.Rx" Version="0.1.3" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="WebSocket.Rx" Version="0.1.3" />
                    
Directory.Packages.props
<PackageReference Include="WebSocket.Rx" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add WebSocket.Rx --version 0.1.3
                    
#r "nuget: WebSocket.Rx, 0.1.3"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package WebSocket.Rx@0.1.3
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=WebSocket.Rx&version=0.1.3
                    
Install as a Cake Addin
#tool nuget:?package=WebSocket.Rx&version=0.1.3
                    
Install as a Cake Tool

WebSocket.Rx

<div align="center">

<img alt="WebSocket.Rx logo" height="128" src="https://raw.githubusercontent.com/st0o0/WebSocket.Rx/refs/heads/main/docs/logo/logo.png" width="128"/>

A powerful .NET library for reactive WebSocket communication using R3 (Reactive Extensions)

NuGet Build Status License Downloads

</div>


๐Ÿ“‹ Table of Contents

โœจ Features

Client Features

  • ๐Ÿ”„ Automatic Reconnection - Built-in reconnection logic with configurable strategies
  • ๐Ÿ“ก Reactive Streams - Observable sequences for messages, connections, and disconnections
  • ๐Ÿงต Thread-Safe - Safe concurrent message sending and receiving
  • ๐Ÿ“ฆ Message Queuing - Automatic buffering with channel-based send queue
  • โšก High Performance - Built on System.Threading.Channels and ArrayPool<byte>
  • ๐ŸŽฏ Type-Safe - Strong typing with text/binary message support
  • ๐Ÿ”’ Proper Resource Management - Full IAsyncDisposable support with graceful shutdown

Server Features

  • ๐Ÿ‘ฅ Multi-Client Support - Handle multiple WebSocket connections simultaneously
  • ๐Ÿ“Š Client Tracking - Built-in client metadata and connection management
  • ๐Ÿ”” Event Streams - Observables for client connect/disconnect events
  • ๐ŸŽฏ Selective Messaging - Send to specific clients or broadcast to all
  • ๐Ÿ›ก๏ธ Robust Cleanup - Automatic client cleanup on disconnect

๐Ÿ“ฆ Installation

dotnet add package WebSocket.Rx

Requirements: .NET 10.0 or higher

๐Ÿš€ Quick Start

Client Example

using WebSocket.Rx;
using R3;

// Create and configure client
await using var client = new ReactiveWebSocketClient(new Uri("wss://echo.websocket.org"))
{
    IsReconnectionEnabled = true,
    KeepAliveInterval = TimeSpan.FromSeconds(30),
    IsTextMessageConversionEnabled = true
};

// Subscribe to messages
client.MessageReceived
    .Subscribe(msg => Console.WriteLine($"Received: {msg.Text}"));

// Subscribe to connection events
client.ConnectionHappened
    .Subscribe(info => Console.WriteLine($"Connected: {info.Reason}"));

client.DisconnectionHappened
    .Subscribe(info => Console.WriteLine($"Disconnected: {info.Reason}"));

// Connect and send messages
await client.StartAsync();
await client.SendAsTextAsync("Hello WebSocket!");

Server Example

using WebSocket.Rx;
using R3;

// Create and start server
await using var server = new ReactiveWebSocketServer("http://localhost:8080/")
{
    IsTextMessageConversionEnabled = true
};

// Subscribe to client events
server.ClientConnected
    .Subscribe(client => Console.WriteLine($"Client connected: {client.Metadata.Id}"));

server.Messages
    .Subscribe(msg => 
    {
        Console.WriteLine($"From {msg.Metadata.Id}: {msg.Message.Text}");
        // Echo back to sender
        server.SendAsTextAsync(msg.Metadata.Id, $"Echo: {msg.Message.Text}");
    });

await server.StartAsync();
Console.WriteLine($"Server running with {server.ClientCount} clients");

๐ŸŽ“ Core Concepts

Observable Streams

WebSocket.Rx is built around reactive streams using R3:

// Filter and transform messages
client.MessageReceived
    .Where(msg => msg.MessageType == MessageType.Text)
    .Select(msg => msg.Text.ToUpper())
    .Subscribe(text => Console.WriteLine(text));

// Debounce reconnection events
client.ConnectionHappened
    .Throttle(TimeSpan.FromSeconds(1))
    .Subscribe(info => Console.WriteLine("Stable connection established"));

Message Types

// Send text message (queued)
await client.SendAsTextAsync("Hello");

// Send binary message (queued)
await client.SendAsBinaryAsync(new byte[] { 0x01, 0x02 });

// Send instant (bypasses queue)
await client.SendInstantAsync("Urgent message");

// Try send (non-blocking)
bool sent = client.TrySendAsText("Optional message");

Connection Lifecycle

// Start connection
await client.StartAsync();

// Reconnect manually
await client.ReconnectAsync();

// Stop gracefully
await client.StopAsync(WebSocketCloseStatus.NormalClosure, "Goodbye");

// Dispose (automatic cleanup)
await client.DisposeAsync();

๐Ÿ”ง Advanced Usage

Custom Configuration

var client = new ReactiveWebSocketClient(new Uri("wss://example.com"))
{
    // Connection settings
    ConnectTimeout = TimeSpan.FromSeconds(10),
    KeepAliveInterval = TimeSpan.FromSeconds(30),
    KeepAliveTimeout = TimeSpan.FromSeconds(10),
    
    // Reconnection
    IsReconnectionEnabled = true,
    
    // Message handling
    IsTextMessageConversionEnabled = true,
    MessageEncoding = Encoding.UTF8
};

Server Broadcasting

// Broadcast to all clients
foreach (var clientId in server.ConnectedClients.Keys)
{
    await server.SendAsTextAsync(clientId, "Broadcast message");
}

// Send to specific clients
var targetClients = server.ConnectedClients
    .Where(c => c.Value.CustomData?.Contains("premium") == true)
    .Select(c => c.Key);

foreach (var clientId in targetClients)
{
    await server.SendAsTextAsync(clientId, "Premium feature alert!");
}

Error Handling

client.DisconnectionHappened
    .Subscribe(info => 
    {
        Console.WriteLine($"Disconnect reason: {info.Reason}");
        if (info.Exception != null)
        {
            Console.WriteLine($"Error: {info.Exception.Message}");
        }
    });

Combining Observables

// Wait for connection before sending
client.ConnectionHappened
    .Take(1)
    .Subscribe(_ => client.SendAsTextAsync("Connected!"));

// Process messages in batches
client.MessageReceived
    .Buffer(TimeSpan.FromSeconds(1))
    .Where(batch => batch.Count > 0)
    .Subscribe(batch => Console.WriteLine($"Processed {batch.Count} messages"));

๐Ÿ“š API Reference

ReactiveWebSocketClient

Property Type Description
Url Uri WebSocket server URL
IsStarted bool Client started state
IsRunning bool Client running state
IsReconnectionEnabled bool Enable auto-reconnect
MessageReceived Observable<ReceivedMessage> Message stream
ConnectionHappened Observable<Connected> Connection stream
DisconnectionHappened Observable<Disconnected> Disconnection stream

Key Methods:

  • Task StartAsync() - Start the client
  • Task StopAsync(status, description) - Stop gracefully
  • Task ReconnectAsync() - Manual reconnect
  • Task SendAsTextAsync(message) - Send text (queued)
  • Task SendAsBinaryAsync(data) - Send binary (queued)
  • ValueTask DisposeAsync() - Clean up resources

ReactiveWebSocketServer

Property Type Description
IsRunning bool Server running state
ClientCount int Number of connected clients
ConnectedClients IReadOnlyDictionary<Guid, Metadata> Client metadata
ClientConnected Observable<ClientConnected> Client connect stream
ClientDisconnected Observable<ClientDisconnected> Client disconnect stream
Messages Observable<ServerReceivedMessage> Server message stream

Key Methods:

  • Task StartAsync() - Start the server
  • Task<bool> StopAsync(status, description) - Stop server
  • Task<bool> SendAsTextAsync(clientId, message) - Send to client
  • ValueTask DisposeAsync() - Clean up resources

๐Ÿ’ก Inspiration

This library is inspired by and builds upon the excellent work of:

Websocket.Client by Marfusios

WebSocket.Rx takes inspiration from Websocket.Client's elegant reactive approach to WebSocket communication. Key influences include:

  • Reactive-First Design - Using observables for all events and messages
  • Automatic Reconnection - Built-in reconnection logic for robust connections
  • Clean API - Intuitive and easy-to-use interface
What's Different?

While honoring the spirit of Websocket.Client, WebSocket.Rx offers:

  • โœ… R3 Integration - Built on the modern R3 reactive library (successor to Rx.NET)
  • โœ… Server Support - Full-featured WebSocket server implementation
  • โœ… Modern .NET - Built for .NET 10+ with latest performance optimizations
  • โœ… IAsyncDisposable - Proper async resource cleanup
  • โœ… Channel-Based Queuing - High-performance message queue using System.Threading.Channels
  • โœ… Enhanced Memory Management - Uses ArrayPool<byte> and RecyclableMemoryStream

Both libraries share the same core philosophy: WebSocket communication should be simple, reactive, and reliable.

๐Ÿค Contributing

Contributions are welcome! This library grows with the community's needs.

How to Contribute

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/amazing-feature
  3. Write tests for your changes
  4. Ensure all tests pass: dotnet test
  5. Submit a Pull Request

Guidelines

  • โœ… Follow existing code style and conventions
  • โœ… Include unit tests for new features
  • โœ… Update documentation for API changes
  • โœ… Keep PRs focused and atomic
  • โœ… Write meaningful commit messages

Development Setup

git clone https://github.com/st0o0/WebSocket.Rx.git
cd WebSocket.Rx
dotnet restore
dotnet build
dotnet test

๐Ÿ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.


<div align="center">

Built with โค๏ธ for the .NET community

Report Bug ยท Request Feature ยท Documentation

</div>

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.1.6 0 2/13/2026
0.1.5 78 2/9/2026
0.1.4 76 2/7/2026
0.1.3 26 2/6/2026
0.1.2 31 2/6/2026
0.1.1 24 2/6/2026
0.1.0 33 2/3/2026

🎯 WebSocket.Rx v0.1.3

📦 NuGet: https://nuget.org/packages/WebSocket.Rx
🔗 Release: https://github.com/st0o0/WebSocket.Rx/releases/v0.1.3

See release page for full changelog and details!

Built with โค๏ธ