ReactiveList 4.0.4
dotnet add package ReactiveList --version 4.0.4
NuGet\Install-Package ReactiveList -Version 4.0.4
<PackageReference Include="ReactiveList" Version="4.0.4" />
<PackageVersion Include="ReactiveList" Version="4.0.4" />
<PackageReference Include="ReactiveList" />
paket add ReactiveList --version 4.0.4
#r "nuget: ReactiveList, 4.0.4"
#:package ReactiveList@4.0.4
#addin nuget:?package=ReactiveList&version=4.0.4
#tool nuget:?package=ReactiveList&version=4.0.4
ReactiveList
A high-performance, thread-safe, observable collection library for .NET that combines the power of reactive extensions with standard list and dictionary operations. ReactiveList provides real-time change notifications, making it ideal for data-binding, reactive programming, and scenarios where collection changes need to be tracked and responded to—especially with continuous live data streams.
Table of Contents
- Features
- Installation
- Quick Start
- Core Concepts
- Collections
- Views
- Extension Methods Reference
- Real-World Examples
- Thread Safety
- Performance Considerations
- Benchmark Results
- License
Features
- Thread-Safe Operations: All public methods are thread-safe with proper synchronization
- Reactive Notifications: Observe additions, removals, updates, moves, and clears in real-time via
IObservable<T> - Batch Operations: Efficient
AddRange,RemoveRange,InsertRange,RemoveMany, andReplaceAllmethods - Edit Transactions: Batch multiple operations with single notification via
Edit() - Views: Create filtered, sorted, grouped, and secondary-indexed views that auto-update
- Change Sets: Fine-grained change tracking with
ChangeSet<T>for DynamicData-compatible processing - Secondary Indices: O(1) lookups by custom keys for high-performance filtering
- AOT Compatible: Supports Native AOT on .NET 8+
- Cross-Platform: Targets .NET 8, .NET 9, .NET 10, and .NET Framework 4.7.2/4.8
- High Performance: QuaternaryList/Dictionary provide 6-17x faster remove operations at scale
Installation
dotnet add package CP.ReactiveList
Required Dependencies:
System.Reactive(automatically included)
Namespaces:
using CP.Reactive; // Extension methods
using CP.Reactive.Collections; // Collections (ReactiveList, Reactive2DList, QuaternaryList, QuaternaryDictionary)
using CP.Reactive.Core; // Core types (CacheNotify, ChangeSet, Change, CacheAction, ChangeReason)
using CP.Reactive.Views; // View types (FilteredReactiveView, SortedReactiveView, etc.)
Quick Start
Basic Usage
using CP.Reactive.Collections;
using System.Reactive.Linq;
// Create a reactive list
var sensorReadings = new ReactiveList<double>();
// Subscribe to additions - great for logging/monitoring
sensorReadings.Added.Subscribe(readings =>
Console.WriteLine($"New readings: {string.Join(", ", readings:F2)}"));
// Subscribe to removals
sensorReadings.Removed.Subscribe(readings =>
Console.WriteLine($"Removed: {string.Join(", ", readings)}"));
// Simulate live sensor data arriving
sensorReadings.Add(23.5);
sensorReadings.AddRange([24.1, 23.8, 24.5]);
// Remove old readings
sensorReadings.Remove(23.5);
Observing All Changes with Stream
The Stream property is the primary way to observe all collection changes:
using CP.Reactive.Collections;
using CP.Reactive.Core;
var orders = new ReactiveList<Order>();
// Subscribe to the unified change stream
orders.Stream.Subscribe(notification =>
{
switch (notification.Action)
{
case CacheAction.Added:
Console.WriteLine($"Order added: {notification.Item.Id}");
break;
case CacheAction.Removed:
Console.WriteLine($"Order removed: {notification.Item.Id}");
break;
case CacheAction.BatchAdded:
Console.WriteLine($"Batch of {notification.Batch?.Count} orders added");
break;
case CacheAction.Cleared:
Console.WriteLine("All orders cleared");
break;
}
});
orders.Add(new Order { Id = 1, Amount = 100 });
orders.AddRange([new Order { Id = 2 }, new Order { Id = 3 }]);
orders.Clear();
Batch Edit Operations
Use Edit() to batch multiple operations and emit a single notification:
var products = new ReactiveList<Product>();
// Multiple operations, single notification at the end
products.Edit(editor =>
{
editor.Add(new Product { Id = 1, Name = "Widget" });
editor.Add(new Product { Id = 2, Name = "Gadget" });
editor.Add(new Product { Id = 3, Name = "Gizmo" });
editor.RemoveAt(0); // Remove first item
});
// Only ONE change notification is emitted after Edit() completes
Core Concepts
The Stream Property
Every reactive collection exposes a Stream property that emits CacheNotify<T> notifications for all changes. This is the most flexible way to observe collection changes.
IObservable<CacheNotify<T>> Stream { get; }
CacheNotify and CacheAction
CacheNotify<T> is a record that describes what changed:
public sealed record CacheNotify<T>(
CacheAction Action, // What happened (Added, Removed, Updated, etc.)
T? Item, // The item involved (for single-item operations)
PooledBatch<T>? Batch = null, // Batch of items (for batch operations)
int CurrentIndex = -1, // Current index of the item
int PreviousIndex = -1, // Previous index (for moves)
T? Previous = default // Previous value (for updates)
);
CacheAction Enumeration:
| Action | Description |
|---|---|
Added |
Single item was added |
Removed |
Single item was removed |
Updated |
Item was updated/replaced |
Moved |
Item was moved to a different index |
Refreshed |
Item was refreshed (re-evaluated) |
Cleared |
Collection was cleared |
BatchOperation |
Multiple mixed operations occurred |
BatchAdded |
Multiple items were added at once |
BatchRemoved |
Multiple items were removed at once |
ChangeSet and Change
For DynamicData-style processing, convert the stream to ChangeSet<T>:
var list = new ReactiveList<string>();
// Convert stream to change sets
list.Stream
.ToChangeSets() // IObservable<ChangeSet<T>>
.Subscribe(changeSet =>
{
Console.WriteLine($"Changes: {changeSet.Adds} adds, {changeSet.Removes} removes");
foreach (var change in changeSet)
{
Console.WriteLine($" {change.Reason}: {change.Current}");
}
});
ChangeReason Enumeration:
| Reason | Description |
|---|---|
Add |
Item was added |
Remove |
Item was removed |
Update |
Item was updated |
Move |
Item was moved |
Refresh |
Item was refreshed |
Clear |
Collection was cleared |
Collections
ReactiveList<T>
A thread-safe, observable list that provides reactive notifications for all changes.
Constructor Overloads
// Empty list
var list = new ReactiveList<string>();
// Initialize with items
var list = new ReactiveList<string>(["apple", "banana", "cherry"]);
// Initialize with single item
var list = new ReactiveList<string>("hello");
Properties
| Property | Type | Description |
|---|---|---|
Count |
int |
Number of items in the list |
Items |
ReadOnlyObservableCollection<T> |
Bindable read-only collection |
ItemsAdded |
ReadOnlyObservableCollection<T> |
Items added during last change |
ItemsRemoved |
ReadOnlyObservableCollection<T> |
Items removed during last change |
ItemsChanged |
ReadOnlyObservableCollection<T> |
Items changed during last change |
Stream |
IObservable<CacheNotify<T>> |
Primary change notification stream |
Added |
IObservable<IEnumerable<T>> |
Observable of added items |
Removed |
IObservable<IEnumerable<T>> |
Observable of removed items |
Changed |
IObservable<IEnumerable<T>> |
Observable of changed items |
CurrentItems |
IObservable<IEnumerable<T>> |
Emits current items on subscription and after changes |
Version |
long |
Incremented on each modification (atomic) |
IsDisposed |
bool |
Whether the list has been disposed |
Methods
Adding Items:
var list = new ReactiveList<Order>();
// Add single item
list.Add(new Order { Id = 1 });
// Add multiple items
list.AddRange([new Order { Id = 2 }, new Order { Id = 3 }]);
// Insert at specific index
list.Insert(0, new Order { Id = 0 });
// Insert range at specific index
list.InsertRange(2, [new Order { Id = 10 }, new Order { Id = 11 }]);
// .NET 6+: Add from span (zero-copy when possible)
ReadOnlySpan<Order> orders = stackalloc[] { new Order { Id = 100 } };
list.AddRange(orders);
Removing Items:
// Remove specific item
bool removed = list.Remove(orderToRemove);
// Remove at index
list.RemoveAt(0);
// Remove range by index and count
list.RemoveRange(startIndex: 2, count: 3);
// Remove multiple items matching predicate
int removedCount = list.RemoveMany(order => order.Amount < 10);
// Remove all items matching condition
list.Remove(list.Where(o => o.IsCancelled));
// Clear all items
list.Clear();
// .NET 6+: Clear without deallocating internal array (performance optimization)
list.ClearWithoutDeallocation();
Updating Items:
// Replace item at index
list[0] = new Order { Id = 999 };
// Update specific item
list.Update(oldOrder, newOrder);
// Move item to new position
list.Move(oldIndex: 0, newIndex: 5);
Batch Operations:
// Edit transaction - single notification for multiple operations
list.Edit(editor =>
{
editor.Add(new Order { Id = 1 });
editor.Add(new Order { Id = 2 });
editor.RemoveAt(0);
editor.Insert(0, new Order { Id = 0 });
});
// Replace all items atomically
list.ReplaceAll([new Order { Id = 100 }, new Order { Id = 200 }]);
Querying:
// Check if item exists
bool exists = list.Contains(order);
// Find index of item
int index = list.IndexOf(order);
// Access by index
Order order = list[0];
// Enumerate (thread-safe snapshot)
foreach (var item in list)
{
Console.WriteLine(item);
}
// .NET 6+: Get as array
Order[] array = list.ToArray();
// .NET 6+: Get as span (WARNING: not thread-safe, must ensure no concurrent modifications)
ReadOnlySpan<Order> span = list.AsSpan();
// .NET 6+: Get as memory (WARNING: not thread-safe)
ReadOnlyMemory<Order> memory = list.AsMemory();
Complete Example: Live Order Processing
using CP.Reactive;
using CP.Reactive.Collections;
using CP.Reactive.Core;
using System.Reactive.Linq;
using System.Reactive.Disposables;
public class OrderProcessor : IDisposable
{
private readonly ReactiveList<Order> _orders = new();
private readonly CompositeDisposable _subscriptions = new();
public OrderProcessor()
{
// Log all new orders
_subscriptions.Add(
_orders.Stream
.WhereAdded()
.SelectAllItems()
.Subscribe(order =>
Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] New order: #{order.Id} - ${order.Amount:F2}"))
);
// Alert on high-value orders
_subscriptions.Add(
_orders.Stream
.OnItemAdded()
.Where(order => order.Amount > 1000)
.Subscribe(order =>
Console.WriteLine($"*** HIGH VALUE ORDER: #{order.Id} - ${order.Amount:F2} ***"))
);
// Track order count changes
_subscriptions.Add(
_orders.CurrentItems
.Select(items => items.Count())
.DistinctUntilChanged()
.Subscribe(count =>
Console.WriteLine($"Total orders: {count}"))
);
}
public void ProcessIncomingOrder(Order order) => _orders.Add(order);
public void ProcessBatch(IEnumerable<Order> orders) => _orders.AddRange(orders);
public void CancelOrder(int orderId)
{
var order = _orders.FirstOrDefault(o => o.Id == orderId);
if (order != null)
_orders.Remove(order);
}
public void Dispose()
{
_subscriptions.Dispose();
_orders.Dispose();
}
}
public record Order
{
public int Id { get; init; }
public decimal Amount { get; init; }
public DateTime CreatedAt { get; init; } = DateTime.UtcNow;
}
Reactive2DList<T>
A two-dimensional reactive list where each element is itself a ReactiveList<T>. Perfect for representing tables, matrices, or hierarchical data.
Constructor Overloads
// Empty 2D list
var grid = new Reactive2DList<int>();
// Initialize from jagged collections
var grid = new Reactive2DList<int>([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
]);
// Initialize from flat collection (each item becomes a row)
var grid = new Reactive2DList<string>(["row1", "row2", "row3"]);
// Initialize with single row
var grid = new Reactive2DList<int>(new ReactiveList<int>([1, 2, 3]));
Unique Methods
| Method | Description |
|---|---|
AddToInner(outerIndex, item) |
Add item to specific inner list |
AddToInner(outerIndex, items) |
Add items to specific inner list |
GetItem(outerIndex, innerIndex) |
Get item at specific position |
SetItem(outerIndex, innerIndex, value) |
Set item at specific position |
RemoveFromInner(outerIndex, innerIndex) |
Remove item from inner list |
ClearInner(outerIndex) |
Clear specific inner list |
Flatten() |
Get all items as flat sequence |
TotalCount() |
Get total count of all items |
Insert(index, items) |
Insert new row with items |
Insert(index, items, innerIndex) |
Insert items into existing row |
Example: Spreadsheet-like Data
using CP.Reactive.Collections;
// Create a spreadsheet-like structure
var spreadsheet = new Reactive2DList<string>();
// Add rows
spreadsheet.Add(new ReactiveList<string>(["Name", "Age", "City"])); // Header row
spreadsheet.Add(new ReactiveList<string>(["Alice", "30", "NYC"]));
spreadsheet.Add(new ReactiveList<string>(["Bob", "25", "LA"]));
// Add data to specific cell (row 1, column 2)
spreadsheet.SetItem(1, 2, "Boston");
// Add new column to all rows
for (int row = 0; row < spreadsheet.Count; row++)
{
spreadsheet.AddToInner(row, row == 0 ? "Country" : "USA");
}
// Get flattened data
var allCells = spreadsheet.Flatten().ToList();
// Get total cell count
int totalCells = spreadsheet.TotalCount();
// Subscribe to changes in any cell
spreadsheet.Stream.Subscribe(notification =>
{
if (notification.Action == CacheAction.Added && notification.Item != null)
{
// A new row was added
notification.Item.Stream.Subscribe(innerNotification =>
{
Console.WriteLine($"Cell changed: {innerNotification.Action}");
});
}
});
QuaternaryList<T> (.NET 8+ Only)
A high-performance, thread-safe list that partitions elements across four internal shards for efficient concurrent access. Optimized for large datasets with frequent remove operations.
Note: Available only on .NET 8 and later.
Key Advantages
- 6-17x faster remove operations compared to SourceList
- 3-4x less memory usage at scale
- Built-in secondary indices for O(1) lookups
- Optimized for parallel operations on large datasets
Constructor
var list = new QuaternaryList<Product>();
Properties
| Property | Type | Description |
|---|---|---|
Count |
int |
Total number of items across all shards |
Stream |
IObservable<CacheNotify<T>> |
Change notification stream |
IsReadOnly |
bool |
Always returns false |
Methods
Basic Operations:
var products = new QuaternaryList<Product>();
// Add items
products.Add(new Product { Id = 1, Category = "Electronics" });
products.AddRange([product1, product2, product3]);
// Remove items
bool removed = products.Remove(product);
products.RemoveRange([product1, product2]);
// Remove by predicate
int removedCount = products.RemoveMany(p => p.Price < 10);
// Check existence
bool exists = products.Contains(product);
// Replace all items atomically
products.ReplaceAll(newProducts);
// Batch edit
products.Edit(collection =>
{
collection.Add(newProduct);
collection.Clear();
// Add more operations...
});
// Get snapshot
IReadOnlyList<Product> snapshot = products.Snapshot();
Secondary Indices:
Secondary indices enable O(1) lookups by custom keys:
var products = new QuaternaryList<Product>();
// Add secondary index by category
products.AddIndex("ByCategory", p => p.Category);
// Add secondary index by price range
products.AddIndex("ByPriceRange", p => p.Price switch
{
< 10 => "Budget",
< 100 => "Standard",
_ => "Premium"
});
// Add products
products.AddRange([
new Product { Id = 1, Category = "Electronics", Price = 299 },
new Product { Id = 2, Category = "Books", Price = 15 },
new Product { Id = 3, Category = "Electronics", Price = 49 }
]);
// O(1) lookup by category
var electronics = products.GetItemsBySecondaryIndex("ByCategory", "Electronics");
// O(1) lookup by price range
var premiumProducts = products.GetItemsBySecondaryIndex("ByPriceRange", "Premium");
// Check if item matches index key
bool isElectronics = products.ItemMatchesSecondaryIndex("ByCategory", product, "Electronics");
Creating Views with Secondary Index:
using CP.Reactive;
using System.Reactive.Concurrency;
// Create a reactive view filtered by secondary index
var electronicsView = products.CreateViewBySecondaryIndex(
indexName: "ByCategory",
key: "Electronics",
scheduler: Scheduler.Default,
throttleMs: 50
);
// View automatically updates when products change
electronicsView.Items.CollectionChanged += (s, e) =>
{
Console.WriteLine($"Electronics view updated: {electronicsView.Count} items");
};
// Create view with multiple keys (OR logic)
var budgetAndStandardView = products.CreateViewBySecondaryIndex(
indexName: "ByPriceRange",
keys: ["Budget", "Standard"],
scheduler: Scheduler.Default
);
Complete Example: Product Inventory System
using CP.Reactive;
using CP.Reactive.Collections;
using CP.Reactive.Core;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Disposables;
public class InventorySystem : IDisposable
{
private readonly QuaternaryList<Product> _inventory = new();
private readonly CompositeDisposable _subscriptions = new();
public InventorySystem()
{
// Setup secondary indices
_inventory.AddIndex("ByCategory", p => p.Category);
_inventory.AddIndex("BySku", p => p.Sku);
_inventory.AddIndex("BySupplier", p => p.SupplierId);
// Subscribe to low stock alerts
_subscriptions.Add(
_inventory.Stream
.OnItemAdded()
.Merge(_inventory.Stream.OnItemUpdated())
.Where(p => p.StockQuantity < 10)
.Subscribe(p =>
Console.WriteLine($"LOW STOCK ALERT: {p.Name} - Only {p.StockQuantity} left!"))
);
// Track inventory value changes
_subscriptions.Add(
_inventory.Stream
.Throttle(TimeSpan.FromSeconds(1))
.Select(_ => _inventory.Sum(p => p.Price * p.StockQuantity))
.DistinctUntilChanged()
.Subscribe(totalValue =>
Console.WriteLine($"Total inventory value: ${totalValue:N2}"))
);
}
// O(1) lookup by SKU
public Product? GetBySku(string sku) =>
_inventory.GetItemsBySecondaryIndex("BySku", sku).FirstOrDefault();
// O(1) lookup by category
public IEnumerable<Product> GetByCategory(string category) =>
_inventory.GetItemsBySecondaryIndex("ByCategory", category);
// Efficient bulk operations
public void RestockFromSupplier(int supplierId, Dictionary<string, int> quantities)
{
var supplierProducts = _inventory.GetItemsBySecondaryIndex("BySupplier", supplierId);
_inventory.Edit(collection =>
{
foreach (var product in supplierProducts)
{
if (quantities.TryGetValue(product.Sku, out int qty))
{
// Update stock (would need to handle immutability appropriately)
collection.Remove(product);
collection.Add(product with { StockQuantity = product.StockQuantity + qty });
}
}
});
}
// Create reactive view for specific category
public ReactiveView<Product> CreateCategoryView(string category) =>
_inventory.CreateViewBySecondaryIndex("ByCategory", category, Scheduler.Default);
public void Dispose()
{
_subscriptions.Dispose();
_inventory.Dispose();
}
}
public record Product
{
public int Id { get; init; }
public string Sku { get; init; } = "";
public string Name { get; init; } = "";
public string Category { get; init; } = "";
public int SupplierId { get; init; }
public decimal Price { get; init; }
public int StockQuantity { get; init; }
}
QuaternaryDictionary<TKey, TValue> (.NET 8+ Only)
A high-performance, thread-safe dictionary that distributes key-value pairs across four internal shards for improved concurrency. Optimized for large datasets with frequent lookups and modifications.
Note: Available only on .NET 8 and later.
Key Advantages
- 3-5x faster than SourceCache for bulk operations
- 3-4x less memory usage at scale
- Built-in secondary value indices for O(1) lookups by value properties
- Thread-safe with fine-grained locking per shard
Constructor
var cache = new QuaternaryDictionary<int, Customer>();
Properties
| Property | Type | Description |
|---|---|---|
Count |
int |
Total number of entries |
Keys |
ICollection<TKey> |
All keys |
Values |
ICollection<TValue> |
All values |
Stream |
IObservable<CacheNotify<KeyValuePair<TKey, TValue>>> |
Change notifications |
Methods
Basic Operations:
var customers = new QuaternaryDictionary<int, Customer>();
// Add items
customers.Add(1, new Customer { Id = 1, Name = "Alice" });
customers.AddRange([
new KeyValuePair<int, Customer>(2, new Customer { Id = 2, Name = "Bob" }),
new KeyValuePair<int, Customer>(3, new Customer { Id = 3, Name = "Charlie" })
]);
// Try add (returns false if key exists)
bool added = customers.TryAdd(4, new Customer { Id = 4, Name = "Diana" });
// Add or update
customers.AddOrUpdate(1, new Customer { Id = 1, Name = "Alice Updated" });
// Use indexer (adds or updates)
customers[5] = new Customer { Id = 5, Name = "Eve" };
// Get value
Customer customer = customers[1];
// Try get value
if (customers.TryGetValue(1, out Customer? found))
{
Console.WriteLine($"Found: {found.Name}");
}
// Lookup (returns tuple)
var (hasValue, value) = customers.Lookup(1);
// Check existence
bool exists = customers.ContainsKey(1);
bool containsPair = customers.Contains(new KeyValuePair<int, Customer>(1, customer));
// Remove
bool removed = customers.Remove(1);
customers.RemoveKeys([2, 3, 4]);
// Remove by predicate
int removedCount = customers.RemoveMany(kvp => kvp.Value.IsInactive);
// Batch edit
customers.Edit(dict =>
{
dict[100] = new Customer { Id = 100, Name = "New Customer" };
dict.Remove(5);
});
// Clear
customers.Clear();
Secondary Value Indices:
Index values by their properties for O(1) lookups:
var customers = new QuaternaryDictionary<int, Customer>();
// Add index by customer region
customers.AddValueIndex("ByRegion", c => c.Region);
// Add index by account manager
customers.AddValueIndex("ByManager", c => c.AccountManagerId);
// Add customers
customers.AddRange([
new(1, new Customer { Id = 1, Name = "Acme Corp", Region = "West", AccountManagerId = 101 }),
new(2, new Customer { Id = 2, Name = "Globex", Region = "East", AccountManagerId = 101 }),
new(3, new Customer { Id = 3, Name = "Initech", Region = "West", AccountManagerId = 102 })
]);
// O(1) lookup by region
var westCustomers = customers.GetValuesBySecondaryIndex("ByRegion", "West");
// O(1) lookup by manager
var manager101Customers = customers.GetValuesBySecondaryIndex("ByManager", 101);
// Check if value matches index
bool isWestCustomer = customers.ValueMatchesSecondaryIndex("ByRegion", customer, "West");
// Create reactive view filtered by secondary index
var westView = customers.CreateViewBySecondaryIndex<string>(
indexName: "ByRegion",
key: "West",
scheduler: Scheduler.Default,
throttleMs: 50
);
Complete Example: Real-Time User Session Cache
using CP.Reactive;
using CP.Reactive.Collections;
using CP.Reactive.Core;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Disposables;
public class SessionCache : IDisposable
{
private readonly QuaternaryDictionary<string, UserSession> _sessions = new();
private readonly CompositeDisposable _subscriptions = new();
public SessionCache()
{
// Setup secondary indices
_sessions.AddValueIndex("ByUserId", s => s.UserId);
_sessions.AddValueIndex("ByRole", s => s.Role);
_sessions.AddValueIndex("ByRegion", s => s.Region);
// Monitor session count
_subscriptions.Add(
_sessions.Stream
.Select(_ => _sessions.Count)
.DistinctUntilChanged()
.Subscribe(count =>
Console.WriteLine($"Active sessions: {count}"))
);
// Alert on admin logins
_subscriptions.Add(
_sessions.Stream
.WhereAdded()
.SelectAllItems()
.Where(kvp => kvp.Value.Role == "Admin")
.Subscribe(kvp =>
Console.WriteLine($"ADMIN LOGIN: User {kvp.Value.UserId} from {kvp.Value.IpAddress}"))
);
// Auto-expire sessions
_subscriptions.Add(
Observable.Interval(TimeSpan.FromMinutes(1))
.Subscribe(_ => ExpireOldSessions())
);
}
public void CreateSession(string sessionId, UserSession session) =>
_sessions.TryAdd(sessionId, session);
public void UpdateSession(string sessionId, UserSession session) =>
_sessions.AddOrUpdate(sessionId, session);
public UserSession? GetSession(string sessionId) =>
_sessions.TryGetValue(sessionId, out var session) ? session : null;
// Get all sessions for a user (O(1) via secondary index)
public IEnumerable<UserSession> GetUserSessions(int userId) =>
_sessions.GetValuesBySecondaryIndex("ByUserId", userId);
// Get all admin sessions
public IEnumerable<UserSession> GetAdminSessions() =>
_sessions.GetValuesBySecondaryIndex("ByRole", "Admin");
// Create reactive view for region
public SecondaryIndexReactiveView<string, UserSession, string> CreateRegionView(string region) =>
_sessions.CreateViewBySecondaryIndex<string>("ByRegion", region, Scheduler.Default);
public void EndSession(string sessionId) =>
_sessions.Remove(sessionId);
private void ExpireOldSessions()
{
var expireBefore = DateTime.UtcNow.AddHours(-24);
int expired = _sessions.RemoveMany(kvp => kvp.Value.LastActivity < expireBefore);
if (expired > 0)
Console.WriteLine($"Expired {expired} inactive sessions");
}
public void Dispose()
{
_subscriptions.Dispose();
_sessions.Dispose();
}
}
public record UserSession
{
public int UserId { get; init; }
public string Role { get; init; } = "User";
public string Region { get; init; } = "";
public string IpAddress { get; init; } = "";
public DateTime CreatedAt { get; init; } = DateTime.UtcNow;
public DateTime LastActivity { get; init; } = DateTime.UtcNow;
}
Views
Views are auto-updating projections of a reactive collection. They implement INotifyCollectionChanged and INotifyPropertyChanged for seamless UI binding.
FilteredReactiveView<T>
Creates a filtered view that automatically updates when the source changes.
using CP.Reactive;
using CP.Reactive.Collections;
using System.Reactive.Concurrency;
var employees = new ReactiveList<Employee>();
employees.AddRange(GetAllEmployees());
// Create filtered view - only active employees
var activeEmployees = employees.CreateView(
filter: e => e.IsActive,
scheduler: Scheduler.Default,
throttleMs: 50
);
// Bind to UI
myListBox.ItemsSource = activeEmployees.Items;
// Or use ToProperty for view models
activeEmployees.ToProperty(items => ActiveEmployeesList = items);
// Or use out parameter pattern
var view = employees.CreateView(e => e.IsActive)
.ToProperty(out ReadOnlyObservableCollection<Employee> activeList);
// View automatically updates when:
// - Items are added/removed from source
// - Item properties change (if tracked)
// Force refresh if needed
activeEmployees.Refresh();
// Don't forget to dispose when done
activeEmployees.Dispose();
SortedReactiveView<T>
Creates a sorted view that maintains sort order as items change.
var products = new ReactiveList<Product>();
// Sort by price ascending
var byPrice = products.SortBy(
comparer: Comparer<Product>.Create((a, b) => a.Price.CompareTo(b.Price)),
scheduler: Scheduler.Default,
throttleMs: 50
);
// Or use key selector
var byName = products.SortBy(
keySelector: p => p.Name,
descending: false,
scheduler: Scheduler.Default
);
// Sort descending by date
var byDateDesc = products.SortBy(
keySelector: p => p.CreatedAt,
descending: true
);
// Bind to UI
productGrid.ItemsSource = byPrice.Items;
GroupedReactiveView<T, TKey>
Groups items by a key and maintains groups as items change.
var tasks = new ReactiveList<TaskItem>();
// Group by status
var byStatus = tasks.GroupBy(
keySelector: t => t.Status,
scheduler: Scheduler.Default,
throttleMs: 50
);
// Access groups
foreach (var group in byStatus.Groups)
{
Console.WriteLine($"{group.Key}: {group.Count} tasks");
foreach (var task in group)
{
Console.WriteLine($" - {task.Title}");
}
}
// Check if group exists
if (byStatus.ContainsKey("Completed"))
{
var completedTasks = byStatus["Completed"];
}
// Try get group
if (byStatus.TryGetValue("InProgress", out var inProgress))
{
// Use the group...
}
// Bind groups to hierarchical UI
treeView.ItemsSource = byStatus.Groups;
DynamicFilteredReactiveView<T>
A filtered view where the filter can change dynamically.
using System.Reactive.Subjects;
var products = new ReactiveList<Product>();
// Create observable filter
var searchFilter = new BehaviorSubject<Func<Product, bool>>(_ => true);
// Create dynamic view
var searchResults = products.CreateView(
filterObservable: searchFilter,
scheduler: Scheduler.Default,
throttleMs: 100
);
// Update filter dynamically (view auto-rebuilds)
searchFilter.OnNext(p => p.Name.Contains("widget", StringComparison.OrdinalIgnoreCase));
// Change filter to show only expensive items
searchFilter.OnNext(p => p.Price > 100);
// Clear filter
searchFilter.OnNext(_ => true);
// Bind to search UI
searchResultsList.ItemsSource = searchResults.Items;
ReactiveView<T>
A general-purpose view for QuaternaryList that supports filtering.
var items = new QuaternaryList<DataPoint>();
// Create unfiltered view
var allItems = items.CreateView(Scheduler.Default, throttleMs: 50);
// Create filtered view
var recentItems = items.CreateView(
filter: dp => dp.Timestamp > DateTime.UtcNow.AddHours(-1),
scheduler: Scheduler.Default,
throttleMs: 50
);
// Create view with dynamic filter
var queryFilter = new BehaviorSubject<Func<DataPoint, bool>>(_ => true);
var dynamicView = items.CreateView(queryFilter, Scheduler.Default);
// Create view with query observable
var queryObservable = textBox.TextChanged.Select(e => e.Text);
var searchView = items.CreateView(
queryObservable: queryObservable,
filter: (query, item) => item.Name.Contains(query, StringComparison.OrdinalIgnoreCase),
scheduler: Scheduler.Default
);
Secondary Index Views
Views filtered by secondary index for efficient large dataset filtering.
// For QuaternaryList
var products = new QuaternaryList<Product>();
products.AddIndex("ByCategory", p => p.Category);
// Static key view
var electronicsView = products.CreateViewBySecondaryIndex(
indexName: "ByCategory",
key: "Electronics",
scheduler: Scheduler.Default
);
// Multiple keys view (OR logic)
var multiCategoryView = products.CreateViewBySecondaryIndex(
indexName: "ByCategory",
keys: ["Electronics", "Computers"],
scheduler: Scheduler.Default
);
// Dynamic key view (key changes over time)
var categorySelector = new BehaviorSubject<string[]>(["Electronics"]);
var dynamicCategoryView = products.CreateDynamicViewBySecondaryIndex(
indexName: "ByCategory",
keysObservable: categorySelector,
scheduler: Scheduler.Default
);
// Change category filter
categorySelector.OnNext(["Books", "Music"]);
Extension Methods Reference
Stream Extensions (CacheNotifyExtensions)
Extensions for working with IObservable<CacheNotify<T>>:
| Method | Description | Example |
|---|---|---|
WhereAction(action) |
Filter by specific action | stream.WhereAction(CacheAction.Added) |
WhereAdded() |
Filter to add notifications (single + batch) | stream.WhereAdded() |
WhereRemoved() |
Filter to remove notifications (single + batch) | stream.WhereRemoved() |
SelectItems() |
Project single items from notifications | stream.SelectItems() |
SelectAllItems() |
Project all items (single + batch) | stream.SelectAllItems() |
OnItemAdded() |
Get added items only | stream.OnItemAdded() |
OnItemRemoved() |
Get removed items only | stream.OnItemRemoved() |
OnItemUpdated() |
Get updated items only | stream.OnItemUpdated() |
OnItemMoved() |
Get moved items with indices | stream.OnItemMoved() |
OnCleared() |
Get clear notifications | stream.OnCleared() |
BufferNotifications(timeSpan) |
Buffer notifications over time | stream.BufferNotifications(TimeSpan.FromSeconds(1)) |
ThrottleNotifications(timeSpan) |
Throttle notification frequency | stream.ThrottleNotifications(TimeSpan.FromMilliseconds(100)) |
ObserveOnScheduler(scheduler) |
Observe on specific scheduler | stream.ObserveOnScheduler(Scheduler.Default) |
TransformItems(selector) |
Transform items | stream.TransformItems(x => x.ToString()) |
FilterItems(predicate) |
Filter items | stream.FilterItems(x => x.IsValid) |
ToChangeSets() |
Convert to ChangeSet stream | stream.ToChangeSets() |
ChangeSet Extensions (ReactiveListExtensions)
Extensions for working with IObservable<ChangeSet<T>>:
| Method | Description | Example |
|---|---|---|
WhereChanges(predicate) |
Filter changes by predicate | changeSets.WhereChanges(c => c.CurrentIndex > 0) |
WhereReason(reason) |
Filter by change reason | changeSets.WhereReason(ChangeReason.Add) |
SelectChanges<TResult>(selector) |
Transform changes | changeSets.SelectChanges(c => c.Current.Name) |
OnAdd() |
Get added items | changeSets.OnAdd() |
OnRemove() |
Get removed items | changeSets.OnRemove() |
OnUpdate() |
Get update tuples (previous, current) | changeSets.OnUpdate() |
OnMove() |
Get move tuples (item, old index, new index) | changeSets.OnMove() |
GroupByChanges(keySelector) |
Group changes by key | changeSets.GroupByChanges(c => c.Category) |
GroupingByChanges(keySelector) |
Group as IGrouping | changeSets.GroupingByChanges(c => c.Type) |
SortBy(keySelector) |
Sort changes by key | changeSets.SortBy(c => c.Name) |
AutoRefresh(propertyName) |
Refresh on property changes | changeSets.AutoRefresh("Price") |
AutoRefresh() |
Refresh on any property change | changeSets.AutoRefresh() |
FilterDynamic(filterObservable) |
Dynamic filtering | stream.FilterDynamic(filterSubject) |
Connect() |
Connect to stream as ChangeSet | source.Connect() |
View Creation Extensions
Extensions for creating views:
| Method | Collection | Description |
|---|---|---|
CreateView(filter, scheduler, throttleMs) |
IReactiveList<T> |
Create filtered view |
CreateView(scheduler, throttleMs) |
IReactiveList<T> |
Create unfiltered view |
CreateView(filterObservable, scheduler, throttleMs) |
IReactiveList<T> |
Create dynamic filtered view |
SortBy(comparer, scheduler, throttleMs) |
IReactiveList<T> |
Create sorted view |
SortBy(keySelector, descending, scheduler, throttleMs) |
IReactiveList<T> |
Create sorted view by key |
GroupBy(keySelector, scheduler, throttleMs) |
IReactiveList<T> |
Create grouped view |
CreateView(filter, scheduler, throttleMs) |
IReactiveSource<T> |
Create view from any source |
CreateView(filterObservable, scheduler, throttleMs) |
IReactiveSource<T> |
Create dynamic view |
CreateView(queryObservable, filter, scheduler, throttleMs) |
IReactiveSource<T> |
Create query-based view |
CreateViewBySecondaryIndex(indexName, key, scheduler, throttleMs) |
QuaternaryList<T> |
View by secondary index |
CreateViewBySecondaryIndex(indexName, keys, scheduler, throttleMs) |
QuaternaryList<T> |
View by multiple keys |
CreateDynamicViewBySecondaryIndex(indexName, keysObservable, scheduler, throttleMs) |
QuaternaryList<T> |
Dynamic secondary index view |
CreateViewBySecondaryIndex(indexName, key, scheduler, throttleMs) |
QuaternaryDictionary<K,V> |
Dictionary secondary index view |
FilterBySecondaryIndex(list, indexName, key) |
Stream | Filter stream by index |
FilterBySecondaryIndex(list, indexName, keys) |
Stream | Filter stream by multiple keys |
Real-World Examples
Live Stock Ticker
A real-time stock price monitoring system:
using CP.Reactive;
using CP.Reactive.Collections;
using CP.Reactive.Core;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
public class StockTicker : IDisposable
{
private readonly QuaternaryDictionary<string, StockQuote> _quotes = new();
private readonly CompositeDisposable _subscriptions = new();
public IObservable<StockQuote> PriceUpdates { get; }
public IObservable<StockQuote> SignificantChanges { get; }
public StockTicker()
{
// Setup indices
_quotes.AddValueIndex("BySector", q => q.Sector);
_quotes.AddValueIndex("ByExchange", q => q.Exchange);
// Create price update stream
PriceUpdates = _quotes.Stream
.WhereAction(CacheAction.Updated)
.SelectItems()
.Select(kvp => kvp.Value);
// Alert on significant price changes (>5%)
SignificantChanges = _quotes.Stream
.Where(n => n.Action == CacheAction.Updated && n.Previous != null)
.Select(n => (Current: n.Item!.Value, Previous: n.Previous!.Value.Value))
.Where(x => Math.Abs((x.Current.Price - x.Previous.Price) / x.Previous.Price) > 0.05m)
.Select(x => x.Current);
// Log all updates
_subscriptions.Add(
_quotes.Stream
.ThrottleNotifications(TimeSpan.FromMilliseconds(100))
.Subscribe(n => Console.WriteLine($"Quote updated: {n.Item?.Key}"))
);
}
// Called from market data feed
public void UpdateQuote(string symbol, decimal price, decimal volume)
{
var quote = new StockQuote
{
Symbol = symbol,
Price = price,
Volume = volume,
Timestamp = DateTime.UtcNow,
Sector = GetSector(symbol),
Exchange = GetExchange(symbol)
};
_quotes.AddOrUpdate(symbol, quote);
}
// Efficient sector lookup
public IEnumerable<StockQuote> GetBySector(string sector) =>
_quotes.GetValuesBySecondaryIndex("BySector", sector);
// Create live view for a sector
public SecondaryIndexReactiveView<string, StockQuote, string> CreateSectorView(string sector) =>
_quotes.CreateViewBySecondaryIndex<string>("BySector", sector, Scheduler.Default);
private string GetSector(string symbol) => symbol[0] switch
{
'A' or 'B' or 'C' => "Technology",
'D' or 'E' or 'F' => "Finance",
_ => "Other"
};
private string GetExchange(string symbol) => "NYSE"; // Simplified
public void Dispose()
{
_subscriptions.Dispose();
_quotes.Dispose();
}
}
public record StockQuote
{
public string Symbol { get; init; } = "";
public decimal Price { get; init; }
public decimal Volume { get; init; }
public DateTime Timestamp { get; init; }
public string Sector { get; init; } = "";
public string Exchange { get; init; } = "";
}
IoT Sensor Dashboard
Monitor sensors with real-time updates and alerting:
using CP.Reactive;
using CP.Reactive.Collections;
using CP.Reactive.Core;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
public class SensorDashboard : IDisposable
{
private readonly ReactiveList<SensorReading> _readings = new();
private readonly CompositeDisposable _subscriptions = new();
// Reactive views for different data needs
public FilteredReactiveView<SensorReading> RecentReadings { get; }
public GroupedReactiveView<SensorReading, string> ReadingsByDevice { get; }
// Alert streams
public IObservable<SensorReading> HighTemperatureAlerts { get; }
public IObservable<SensorReading> LowBatteryAlerts { get; }
public SensorDashboard()
{
// Create view of last hour's readings
RecentReadings = _readings.CreateView(
filter: r => r.Timestamp > DateTime.UtcNow.AddHours(-1),
scheduler: Scheduler.Default,
throttleMs: 100
);
// Group readings by device
ReadingsByDevice = _readings.GroupBy(
keySelector: r => r.DeviceId,
scheduler: Scheduler.Default
);
// High temperature alerts
HighTemperatureAlerts = _readings.Stream
.OnItemAdded()
.Where(r => r.SensorType == "Temperature" && r.Value > 85);
// Low battery alerts (unique per device)
LowBatteryAlerts = _readings.Stream
.OnItemAdded()
.Where(r => r.BatteryLevel < 20)
.GroupBy(r => r.DeviceId)
.SelectMany(g => g.Throttle(TimeSpan.FromMinutes(5)));
// Cleanup old readings periodically
_subscriptions.Add(
Observable.Interval(TimeSpan.FromMinutes(5))
.Subscribe(_ => CleanupOldReadings())
);
// Subscribe to alerts
_subscriptions.Add(
HighTemperatureAlerts.Subscribe(r =>
Console.WriteLine($"🔥 HIGH TEMP: Device {r.DeviceId} = {r.Value}°F"))
);
_subscriptions.Add(
LowBatteryAlerts.Subscribe(r =>
Console.WriteLine($"🔋 LOW BATTERY: Device {r.DeviceId} = {r.BatteryLevel}%"))
);
}
// Called from IoT hub
public void ProcessReading(SensorReading reading)
{
_readings.Add(reading);
}
// Bulk import
public void ProcessBatch(IEnumerable<SensorReading> readings)
{
_readings.AddRange(readings);
}
// Get statistics for a device
public DeviceStats? GetDeviceStats(string deviceId)
{
if (!ReadingsByDevice.ContainsKey(deviceId))
return null;
var readings = ReadingsByDevice[deviceId]
.Where(r => r.Timestamp > DateTime.UtcNow.AddHours(-1))
.ToList();
if (!readings.Any())
return null;
return new DeviceStats
{
DeviceId = deviceId,
AverageValue = readings.Average(r => r.Value),
MinValue = readings.Min(r => r.Value),
MaxValue = readings.Max(r => r.Value),
ReadingCount = readings.Count
};
}
private void CleanupOldReadings()
{
var cutoff = DateTime.UtcNow.AddHours(-24);
int removed = _readings.RemoveMany(r => r.Timestamp < cutoff);
Console.WriteLine($"Cleaned up {removed} old readings");
}
public void Dispose()
{
_subscriptions.Dispose();
RecentReadings.Dispose();
ReadingsByDevice.Dispose();
_readings.Dispose();
}
}
public record SensorReading
{
public string DeviceId { get; init; } = "";
public string SensorType { get; init; } = "";
public double Value { get; init; }
public int BatteryLevel { get; init; } = 100;
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
}
public record DeviceStats
{
public string DeviceId { get; init; } = "";
public double AverageValue { get; init; }
public double MinValue { get; init; }
public double MaxValue { get; init; }
public int ReadingCount { get; init; }
}
Chat Application
Real-time message handling with conversation grouping:
using CP.Reactive;
using CP.Reactive.Collections;
using CP.Reactive.Core;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
public class ChatService : IDisposable
{
private readonly ReactiveList<ChatMessage> _messages = new();
private readonly CompositeDisposable _subscriptions = new();
// Search functionality
private readonly BehaviorSubject<string> _searchQuery = new("");
public DynamicFilteredReactiveView<ChatMessage> SearchResults { get; }
// Grouped by conversation
public GroupedReactiveView<ChatMessage, string> Conversations { get; }
// Sorted by timestamp
public SortedReactiveView<ChatMessage> Timeline { get; }
// Real-time message stream
public IObservable<ChatMessage> NewMessages { get; }
public IObservable<ChatMessage> MentionAlerts { get; }
public ChatService(string currentUserId)
{
// New message notifications
NewMessages = _messages.Stream.OnItemAdded();
// Mentions of current user
MentionAlerts = NewMessages
.Where(m => m.Text.Contains($"@{currentUserId}", StringComparison.OrdinalIgnoreCase));
// Search results with dynamic query
var searchFilter = _searchQuery
.Throttle(TimeSpan.FromMilliseconds(300))
.Select<string, Func<ChatMessage, bool>>(query =>
string.IsNullOrWhiteSpace(query)
? _ => false
: m => m.Text.Contains(query, StringComparison.OrdinalIgnoreCase) ||
m.SenderName.Contains(query, StringComparison.OrdinalIgnoreCase));
SearchResults = _messages.CreateView(searchFilter, Scheduler.Default, 100);
// Group messages by conversation
Conversations = _messages.GroupBy(
m => m.ConversationId,
Scheduler.Default
);
// Timeline view (most recent first)
Timeline = _messages.SortBy(
m => m.Timestamp,
descending: true,
Scheduler.Default
);
// Subscribe to mention alerts
_subscriptions.Add(
MentionAlerts.Subscribe(m =>
Console.WriteLine($"📢 You were mentioned by {m.SenderName}: {m.Text}"))
);
}
// Called when message received
public void ReceiveMessage(ChatMessage message)
{
_messages.Add(message);
}
// Search messages
public void Search(string query)
{
_searchQuery.OnNext(query);
}
// Get messages for conversation
public IReadOnlyList<ChatMessage>? GetConversation(string conversationId)
{
return Conversations.TryGetValue(conversationId, out var messages)
? messages
: null;
}
// Delete old messages
public void PurgeOldMessages(TimeSpan olderThan)
{
var cutoff = DateTime.UtcNow - olderThan;
_messages.RemoveMany(m => m.Timestamp < cutoff);
}
public void Dispose()
{
_subscriptions.Dispose();
SearchResults.Dispose();
Conversations.Dispose();
Timeline.Dispose();
_messages.Dispose();
_searchQuery.Dispose();
}
}
public record ChatMessage
{
public string Id { get; init; } = Guid.NewGuid().ToString();
public string ConversationId { get; init; } = "";
public string SenderId { get; init; } = "";
public string SenderName { get; init; } = "";
public string Text { get; init; } = "";
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
}
WPF Data Binding
using CP.Reactive;
using CP.Reactive.Collections;
using CP.Reactive.Views;
using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Windows.Threading;
public class MainViewModel : INotifyPropertyChanged, IDisposable
{
private readonly ReactiveList<TodoItem> _todos = new();
private readonly FilteredReactiveView<TodoItem> _activeTodosView;
private readonly FilteredReactiveView<TodoItem> _completedTodosView;
private readonly CompositeDisposable _subscriptions = new();
public event PropertyChangedEventHandler? PropertyChanged;
// Bindable collections
public ReadOnlyObservableCollection<TodoItem> AllTodos { get; }
public ReadOnlyObservableCollection<TodoItem> ActiveTodos { get; private set; }
public ReadOnlyObservableCollection<TodoItem> CompletedTodos { get; private set; }
public int TotalCount => _todos.Count;
public int ActiveCount => ActiveTodos?.Count ?? 0;
public MainViewModel()
{
// Use Dispatcher scheduler for WPF thread safety
var wpfScheduler = DispatcherScheduler.Current;
AllTodos = _todos.Items;
// Active todos view
_activeTodosView = _todos.CreateView(
filter: t => !t.IsCompleted,
scheduler: wpfScheduler,
throttleMs: 50
).ToProperty(out ActiveTodos!);
// Completed todos view
_completedTodosView = _todos.CreateView(
filter: t => t.IsCompleted,
scheduler: wpfScheduler,
throttleMs: 50
).ToProperty(out CompletedTodos!);
// Update counts when collection changes
_subscriptions.Add(
_todos.Stream
.ObserveOn(wpfScheduler)
.Subscribe(_ =>
{
PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(TotalCount)));
PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(ActiveCount)));
})
);
}
public void AddTodo(string title)
{
_todos.Add(new TodoItem { Title = title });
}
public void ToggleComplete(TodoItem todo)
{
var index = _todos.IndexOf(todo);
if (index >= 0)
{
_todos[index] = todo with { IsCompleted = !todo.IsCompleted };
}
}
public void RemoveTodo(TodoItem todo)
{
_todos.Remove(todo);
}
public void ClearCompleted()
{
_todos.RemoveMany(t => t.IsCompleted);
}
public void Dispose()
{
_subscriptions.Dispose();
_activeTodosView.Dispose();
_completedTodosView.Dispose();
_todos.Dispose();
}
}
public record TodoItem
{
public string Id { get; init; } = Guid.NewGuid().ToString();
public string Title { get; init; } = "";
public bool IsCompleted { get; init; }
public DateTime CreatedAt { get; init; } = DateTime.Now;
}
XAML:
<Window x:Class="TodoApp.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
Title="Todo App" Height="400" Width="600">
<Grid>
<Grid.RowDefinitions>
<RowDefinition Height="Auto"/>
<RowDefinition Height="*"/>
<RowDefinition Height="Auto"/>
</Grid.RowDefinitions>
<StackPanel Orientation="Horizontal" Margin="10">
<TextBox x:Name="NewTodoText" Width="300" Margin="0,0,10,0"/>
<Button Content="Add" Click="AddTodo_Click"/>
</StackPanel>
<TabControl Grid.Row="1" Margin="10">
<TabItem Header="{Binding TotalCount, StringFormat='All ({0})'}">
<ListBox ItemsSource="{Binding AllTodos}"/>
</TabItem>
<TabItem Header="{Binding ActiveCount, StringFormat='Active ({0})'}">
<ListBox ItemsSource="{Binding ActiveTodos}"/>
</TabItem>
<TabItem Header="Completed">
<ListBox ItemsSource="{Binding CompletedTodos}"/>
</TabItem>
</TabControl>
<Button Grid.Row="2" Content="Clear Completed" Click="ClearCompleted_Click" Margin="10"/>
</Grid>
</Window>
Avalonia UI Data Binding
using CP.Reactive;
using CP.Reactive.Collections;
using CP.Reactive.Views;
using System.Collections.ObjectModel;
using ReactiveUI;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
public class MainViewModel : ReactiveObject, IDisposable
{
private readonly ReactiveList<Product> _products = new();
private readonly CompositeDisposable _subscriptions = new();
private string _searchText = "";
private ReadOnlyObservableCollection<Product>? _filteredProducts;
public string SearchText
{
get => _searchText;
set => this.RaiseAndSetIfChanged(ref _searchText, value);
}
public ReadOnlyObservableCollection<Product>? FilteredProducts
{
get => _filteredProducts;
private set => this.RaiseAndSetIfChanged(ref _filteredProducts, value);
}
public MainViewModel()
{
// Create dynamic search filter
var searchFilter = this.WhenAnyValue(x => x.SearchText)
.Throttle(TimeSpan.FromMilliseconds(300))
.Select<string, Func<Product, bool>>(text =>
string.IsNullOrWhiteSpace(text)
? _ => true
: p => p.Name.Contains(text, StringComparison.OrdinalIgnoreCase) ||
p.Category.Contains(text, StringComparison.OrdinalIgnoreCase));
// Create filtered view with Avalonia's UI thread scheduler
var view = _products.CreateView(
filterObservable: searchFilter,
scheduler: RxApp.MainThreadScheduler,
throttleMs: 50
).ToProperty(out var items);
FilteredProducts = items;
_subscriptions.Add(view);
// Load sample data
LoadSampleData();
}
private void LoadSampleData()
{
_products.AddRange([
new Product { Name = "Laptop", Category = "Electronics", Price = 999.99m },
new Product { Name = "Headphones", Category = "Electronics", Price = 149.99m },
new Product { Name = "Desk Chair", Category = "Furniture", Price = 299.99m },
new Product { Name = "Monitor", Category = "Electronics", Price = 399.99m },
new Product { Name = "Keyboard", Category = "Electronics", Price = 79.99m }
]);
}
public void Dispose()
{
_subscriptions.Dispose();
_products.Dispose();
}
}
public record Product
{
public string Name { get; init; } = "";
public string Category { get; init; } = "";
public decimal Price { get; init; }
}
Avalonia XAML:
<Window xmlns="https://github.com/avaloniaui"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
x:Class="ProductCatalog.MainWindow"
Title="Product Catalog" Width="600" Height="400">
<DockPanel Margin="10">
<TextBox DockPanel.Dock="Top"
Watermark="Search products..."
Text="{Binding SearchText}"
Margin="0,0,0,10"/>
<DataGrid ItemsSource="{Binding FilteredProducts}"
AutoGenerateColumns="False"
IsReadOnly="True">
<DataGrid.Columns>
<DataGridTextColumn Header="Name" Binding="{Binding Name}" Width="*"/>
<DataGridTextColumn Header="Category" Binding="{Binding Category}" Width="*"/>
<DataGridTextColumn Header="Price" Binding="{Binding Price, StringFormat=C}" Width="100"/>
</DataGrid.Columns>
</DataGrid>
</DockPanel>
</Window>
Thread Safety
All ReactiveList collections are designed to be thread-safe:
var list = new ReactiveList<int>();
// Safe to call from multiple threads
Parallel.For(0, 1000, i => list.Add(i));
// Safe concurrent reads and writes
var tasks = new[]
{
Task.Run(() => { for (int i = 0; i < 100; i++) list.Add(i); }),
Task.Run(() => { for (int i = 0; i < 100; i++) list.Remove(i % 50); }),
Task.Run(() => { foreach (var item in list) Console.Write(item); })
};
await Task.WhenAll(tasks);
Key Thread Safety Features:
- ReactiveList<T>: Uses lock-based synchronization
- Reactive2DList<T>: Inherits ReactiveList thread safety
- QuaternaryList<T>: Uses
ReaderWriterLockSlimwith 4 independent shards - QuaternaryDictionary<TKey, TValue>: Uses
ReaderWriterLockSlimwith 4 independent shards
Warning: Some methods like AsSpan() and AsMemory() return direct references without copying and are NOT thread-safe. Use only when you can guarantee no concurrent modifications.
Performance Considerations
Use Edit() for Batch Operations
// ❌ Bad: Multiple notifications
for (int i = 0; i < 1000; i++)
list.Add(i);
// ✅ Good: Single notification
list.Edit(editor =>
{
for (int i = 0; i < 1000; i++)
editor.Add(i);
});
// ✅ Also good: AddRange
list.AddRange(Enumerable.Range(0, 1000));
Use Throttling for Rapid Updates
// Views throttle by default (50ms)
var view = list.CreateView(x => x.IsActive, throttleMs: 100);
// Manual throttling on streams
list.Stream
.ThrottleNotifications(TimeSpan.FromMilliseconds(100))
.Subscribe(HandleChange);
Use QuaternaryList for Large Datasets
// For datasets > 1000 items with frequent removes
var largeList = new QuaternaryList<DataPoint>();
// 6-17x faster removes
largeList.RemoveMany(dp => dp.Timestamp < cutoff);
Use Secondary Indices for Frequent Lookups
// O(n) without index
var result = list.Where(x => x.Category == "Electronics").ToList();
// O(1) with index
list.AddIndex("ByCategory", x => x.Category);
var result = list.GetItemsBySecondaryIndex("ByCategory", "Electronics");
Dispose Views When Done
// Views hold subscriptions - always dispose
using var view = list.CreateView(x => x.IsActive);
// or
view.Dispose();
Benchmark Results
Benchmarks run on Windows 11, 12th Gen Intel Core i7-12650H, .NET 10.0.2
ReactiveList<T> vs SourceList<T> (DynamicData) - .NET 10
| Method | Count | Mean | Allocated |
|---|---|---|---|
| ReactiveList_AddRange | 10,000 | 602,415 ns | 3,462 KB |
| SourceList_AddRange | 10,000 | 76,536 ns | 172.2 KB |
| ReactiveList_Clear | 10,000 | 1,055,010 ns | 5,619 KB |
| SourceList_Clear | 10,000 | 156,889 ns | 252.9 KB |
| ReactiveList_Connect | 10,000 | 1,037,696 ns | 3,990 KB |
| SourceList_Connect | 10,000 | 77,675 ns | 172.8 KB |
| ReactiveList_RemoveMany | 10,000 | 20,227,197 ns | 5,001 KB |
| SourceList_RemoveMany | 10,000 | 10,773,439 ns | 2,759 KB |
Summary: SourceList is faster. ReactiveList provides fine-grained change tracking with higher overhead.
ReactiveList<T> vs List<T> - .NET 10
| Method | Count | Mean | Allocated |
|---|---|---|---|
| List_AddRange | 10,000 | 3,821 ns | 40.1 KB |
| ReactiveList_AddRange | 10,000 | 602,415 ns | 3,462 KB |
| List_Clear | 10,000 | 4,036 ns | 40.1 KB |
| ReactiveList_Clear | 10,000 | 1,055,010 ns | 5,619 KB |
| List_Filter | 10,000 | 7,044 ns | 40.1 KB |
| ReactiveList_Filter | 10,000 | 598,058 ns | 3,462 KB |
Summary: List is ~100x faster for raw operations. Use ReactiveList when you need reactive notifications.
QuaternaryList<T> vs SourceList<T> (DynamicData) - .NET 10
| Method | Count | Mean | Allocated |
|---|---|---|---|
| QuaternaryList_AddRange | 10,000 | 101,599 ns | 72.4 KB |
| SourceList_AddRange | 10,000 | 77,280 ns | 172.3 KB |
| QuaternaryList_RemoveRange | 10,000 | 1,363,441 ns | 75.1 KB |
| SourceList_RemoveRange | 10,000 | 24,111,156 ns | 2,373 KB |
| QuaternaryList_Remove | 10,000 | 5,243,958 ns | 72.4 KB |
| SourceList_Remove | 10,000 | 34,751,282 ns | 1,333 KB |
| QuaternaryList_RemoveMany | 10,000 | 1,294,411 ns | 72.4 KB |
| SourceList_RemoveMany | 10,000 | 10,546,519 ns | 2,759 KB |
| QuaternaryList_MixedOperations | 10,000 | 607,775 ns | 72.4 KB |
| SourceList_MixedOperations | 10,000 | 4,500,532 ns | 1,842 KB |
Summary: QuaternaryList is 6-17x faster for Remove operations and uses 3-4x less memory at scale.
QuaternaryDictionary<TKey, TValue> vs SourceCache<TValue, TKey> (DynamicData) - .NET 10
| Method | Count | Mean | Allocated |
|---|---|---|---|
| QuaternaryDictionary_AddRange | 10,000 | 132.2 us | 327.2 KB |
| SourceCache_AddRange | 10,000 | 602.3 us | 1,155.7 KB |
| QuaternaryDictionary_Clear | 10,000 | 142.4 us | 327.2 KB |
| SourceCache_Clear | 10,000 | 486.7 us | 1,155.7 KB |
| QuaternaryDictionary_Lookup | 10,000 | 133.7 us | 327.2 KB |
| SourceCache_Lookup | 10,000 | 302.4 us | 1,155.6 KB |
| QuaternaryDictionary_Stream_Add | 10,000 | 180.7 us | 455.8 KB |
| SourceCache_Stream_Add | 10,000 | 665.6 us | 2,437.8 KB |
| QuaternaryDictionary_IterateAll | 10,000 | 220.6 us | 327.2 KB |
| SourceCache_IterateAll | 10,000 | 349.1 us | 1,233.9 KB |
Summary: QuaternaryDictionary is 3-5x faster and uses 3-4x less memory than SourceCache at scale.
QuaternaryDictionary<TKey, TValue> vs Dictionary<TKey, TValue> - .NET 10
| Method | Count | Mean | Allocated |
|---|---|---|---|
| Dictionary_AddRange | 10,000 | 235.9 us | 657.6 KB |
| QuaternaryDictionary_AddRange | 10,000 | 132.2 us | 327.2 KB |
| Dictionary_Clear | 10,000 | 113.3 us | 197.5 KB |
| QuaternaryDictionary_Clear | 10,000 | 142.4 us | 327.2 KB |
| Dictionary_TryGetValue | 10,000 | 91.9 us | 197.5 KB |
| QuaternaryDictionary_TryGetValue | 10,000 | 134.3 us | 327.2 KB |
| Dictionary_IterateAll | 10,000 | 87.1 us | 197.5 KB |
| QuaternaryDictionary_IterateAll | 10,000 | 220.6 us | 327.2 KB |
Summary: Dictionary is faster for raw operations. QuaternaryDictionary is 1.8x faster for bulk AddRange and adds thread-safety, reactive notifications, and secondary indices.
When to Use Which Collection
| Scenario | Recommendation |
|---|---|
| Small datasets (<1,000 items) | ReactiveList<T> |
| Large datasets with frequent removes | QuaternaryList<T> (6-17x faster) |
| Large key-value datasets | QuaternaryDictionary<TKey,TValue> (3-5x faster) |
| Memory-constrained environments | QuaternaryList/Dictionary (3-4x less memory) |
| Rich LINQ operators needed | DynamicData SourceList<T> / SourceCache<TValue, TKey> |
| Secondary indices for O(1) lookups | QuaternaryList<T> / QuaternaryDictionary<TKey,TValue> |
| Thread-safe concurrent access | All ReactiveList collections |
| 2D/Matrix data structures | Reactive2DList<T> |
| .NET Framework 4.7.2/4.8 | ReactiveList<T> / Reactive2DList<T> |
License
This project is licensed under the MIT License - see the LICENSE file for details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
ReactiveList - Empowering Reactive Applications with Observable Collections
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net8.0-windows10.0.19041 is compatible. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net9.0-windows10.0.19041 is compatible. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. net10.0-windows10.0.19041 is compatible. |
| .NET Framework | net472 is compatible. net48 is compatible. net481 was computed. |
-
.NETFramework 4.7.2
- System.Buffers (>= 4.6.1)
- System.Memory (>= 4.6.3)
- System.Reactive (>= 6.1.0)
- System.Threading.Channels (>= 10.0.2)
-
.NETFramework 4.8
- System.Buffers (>= 4.6.1)
- System.Memory (>= 4.6.3)
- System.Reactive (>= 6.1.0)
- System.Threading.Channels (>= 10.0.2)
-
net10.0
- System.Reactive (>= 6.1.0)
-
net10.0-windows10.0.19041
- System.Reactive (>= 6.1.0)
-
net8.0
- System.Reactive (>= 6.1.0)
-
net8.0-windows10.0.19041
- System.Reactive (>= 6.1.0)
-
net9.0
- System.Reactive (>= 6.1.0)
-
net9.0-windows10.0.19041
- System.Reactive (>= 6.1.0)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on ReactiveList:
| Package | Downloads |
|---|---|
|
CrissCross.WPF.UI
A Reactive Navigation Framework for ReactiveUI |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 4.0.4 | 62 | 2/5/2026 |
| 4.0.2 | 74 | 2/4/2026 |
| 3.0.5 | 110 | 1/13/2026 |
| 3.0.2 | 102 | 1/11/2026 |
| 2.4.5 | 252 | 12/7/2025 |
| 2.4.4 | 219 | 12/7/2025 |
| 2.3.2 | 303 | 10/6/2025 |
| 2.2.1 | 602 | 6/12/2025 |
| 2.2.0 | 290 | 1/28/2025 |
| 2.1.0 | 360 | 10/13/2024 |
| 2.0.2 | 346 | 5/13/2024 |
| 2.0.1 | 197 | 5/13/2024 |
| 2.0.0 | 182 | 5/2/2024 |
| 1.2.0 | 272 | 4/26/2024 |
| 1.1.3 | 383 | 1/29/2024 |
| 1.1.2 | 204 | 1/29/2024 |
| 1.1.1 | 265 | 12/26/2023 |
| 1.0.1 | 225 | 10/7/2023 |
| 1.0.0 | 213 | 10/7/2023 |
Compatability with Net 8 / 9 / 10 and netstandard2.0