Streamon 0.2.1

dotnet add package Streamon --version 0.2.1
                    
NuGet\Install-Package Streamon -Version 0.2.1
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Streamon" Version="0.2.1" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Streamon" Version="0.2.1" />
                    
Directory.Packages.props
<PackageReference Include="Streamon" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Streamon --version 0.2.1
                    
#r "nuget: Streamon, 0.2.1"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Streamon@0.2.1
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Streamon&version=0.2.1
                    
Install as a Cake Addin
#tool nuget:?package=Streamon&version=0.2.1
                    
Install as a Cake Tool

NuGet Version GitHub License .github/workflows/ci.yml .github/workflows/cd.yml

Streamon

Event streaming store platform for real-time data processing and analytics.

Streamon is the attempt to create a common set of abstraction APIs over a variety of event streaming stores, such as Azure Table Storage, Azure Cosmos DB, and others.

The goal is to provide a simple and consistent way to work with event streams, allowing developers to focus on the business logic and not on the underlying storage implementation.

Streamon is not an Event Sourcing library, but it can be used as an Event/Stream store for existing ones.

Providers

Features

  • POCO events, no base classes or inheritance required
  • Event ids and Metadata detection by using both Attribute and Interface markers
  • Customizable serialization and type resolution
  • Flexible stream sorage naming and partitioning, e.g. allowing for multitenancy by using one stream per tenant
  • Optimistic concurrency control
  • Soft and hard deletion modes
  • Global event positioning & tracking
  • Subscriptions
  • Projections (read model generation backed by storage providers)
  • Snapshots and Checkpoints

To Do's

  • Telemetry
  • Relational Stores (Entity Framework?)
  • Claim Checks for large events
  • Stream Sweeper (Archiving and Purging)

Subscriptions

The Streamon.Subscription package provides a pipeline for consuming events from a stream store via polling-based subscriptions. Subscriptions track their progress using checkpoints, so they can resume from where they left off after restarts.

Core Interfaces

Interface Responsibility
IEventHandler<TEvent> Handles a single event type asynchronously
ICheckpointStore Persists and retrieves the last processed position per subscription
ISubscriptionStreamReader Reads events from a position and reports the last global position
IEventHandlerRegistry Discovers and stores handler delegates by event type

Subscription Types

Type Behavior
StreamSubscriptionType.CatchUp Starts from StreamPosition.Start when no checkpoint exists — replays the full history
StreamSubscriptionType.Live Starts from the current end of the stream — only new events are processed
StreamSubscriptionType.InMemory In-memory only, useful for testing and temporary processing

Defining an Event Handler

Implement IEventHandler<TEvent> for each event type you want to handle:

public class OrderCapturedHandler : IEventHandler<OrderCaptured>
{
    public ValueTask HandleAsync(EventHandlerContext<OrderCaptured> context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"Order captured: {context.Payload.Id} at position {context.GlobalPosition}");
        return ValueTask.CompletedTask;
    }
}

public class OrderShippedHandler : IEventHandler<OrderShipped>
{
    public ValueTask HandleAsync(EventHandlerContext<OrderShipped> context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"Order shipped: {context.Payload.Id}, tracking: {context.Payload.Tracking}");
        return ValueTask.CompletedTask;
    }
}

The EventHandlerContext<T> record provides the full event envelope to the handler:

Property Description
SubscriptionId The subscription that received the event
StreamId The stream the event belongs to
EventId Unique event identifier
StreamPosition Position within the stream
GlobalPosition Position across all streams
Timestamp When the event was recorded
BatchId Groups events appended in the same call
Payload The strongly-typed event payload (T)
Metadata Optional per-event metadata dictionary

A single class can implement multiple IEventHandler<T> interfaces to handle several event types:

public class OrderEventHandlers : IEventHandler<OrderCaptured>, IEventHandler<OrderShipped>
{
    public ValueTask HandleAsync(EventHandlerContext<OrderCaptured> context, CancellationToken cancellationToken = default)
    {
        // handle OrderCaptured
        return ValueTask.CompletedTask;
    }

    public ValueTask HandleAsync(EventHandlerContext<OrderShipped> context, CancellationToken cancellationToken = default)
    {
        // handle OrderShipped
        return ValueTask.CompletedTask;
    }
}

Registering Subscriptions (Dependency Injection)

Use AddStreamSubscription to register a subscription and configure its checkpoint store, stream reader, and event handlers via the fluent builder:

services.AddStreamSubscription(SubscriptionId.From("order-processing"), StreamSubscriptionType.CatchUp)
    .UseTableStorageCheckpointStore(connectionString, streamTableName)
    .UseTableStorageSubscriptionStreamReader(connectionString, streamTableName)
    .AddEventHandler<OrderCapturedHandler>()
    .AddEventHandler<OrderShippedHandler>();

Multiple independent subscriptions can be registered in the same DI container — each is stored as a keyed singleton using its SubscriptionId:

services.AddStreamSubscription(SubscriptionId.From("order-processing"), StreamSubscriptionType.CatchUp)
    .UseTableStorageCheckpointStore(connectionString, streamTableName)
    .UseTableStorageSubscriptionStreamReader(connectionString, streamTableName)
    .AddEventHandler<OrderCapturedHandler>();

services.AddStreamSubscription(SubscriptionId.From("analytics"), StreamSubscriptionType.Live)
    .UseTableStorageCheckpointStore(connectionString, streamTableName)
    .UseTableStorageSubscriptionStreamReader(connectionString, streamTableName)
    .AddEventHandler<AnalyticsHandler>();

Polling for Events

The SubscriptionManager drives all registered subscriptions. Call PollAsync periodically (e.g. from a background service) to fetch and process new events:

var manager = sp.GetRequiredService<SubscriptionManager>();

// Poll all subscriptions
await manager.PollAsync(cancellationToken);

// Or poll a specific subscription
var subscription = manager.Get(SubscriptionId.From("order-processing"));
await subscription.PollAsync(cancellationToken);

Checkpoints

Checkpoints track the last successfully processed global position for each subscription. The ICheckpointStore interface supports:

  • GetCheckpointAsync(subscriptionId) — returns the last saved position, or StreamPosition.End if none exists
  • SetCheckpointAsync(subscriptionId, position) — persists the position after processing

The Azure Table Storage provider includes a built-in TableCheckpointStore implementation. For testing, you can implement ICheckpointStore with an in-memory dictionary.

Projectors

For building read models or projections, implement IEventInitialProjector<TEvent, TState> (for creating initial state) and/or IEventProjector<TEvent, TState> (for updating existing state):

public class OrderProjector : 
    IEventInitialProjector<OrderCaptured, OrderSummary>,
    IEventProjector<OrderShipped, OrderSummary>
{
    // Creates state from the first event
    public ValueTask<OrderSummary> ProjectAsync(EventHandlerContext<OrderCaptured> @event, CancellationToken cancellationToken = default)
    {
        return ValueTask.FromResult(new OrderSummary(@event.Payload.Id, @event.Payload.Product, IsShipped: false));
    }

    public string GetIdentity(EventHandlerContext<OrderCaptured> @event, CancellationToken cancellationToken = default) => @event.Payload.Id;

    // Updates existing state
    public ValueTask<OrderSummary> ProjectAsync(OrderSummary state, EventHandlerContext<OrderShipped> @event, CancellationToken cancellationToken = default)
    {
        return ValueTask.FromResult(state with { IsShipped = true });
    }

    public string GetIdentity(EventHandlerContext<OrderShipped> @event, CancellationToken cancellationToken = default) => @event.Payload.Id;
}

public record OrderSummary(string Id, string Product, bool IsShipped);

Note: The projector infrastructure (EventProjectorBase<TProjector, TState>) and its state read/write abstractions are still a work in progress. See the copilot instructions for current status.

Full Example

// 1. Define your events
public record OrderCaptured(string Id, string Product, decimal Price);
public record OrderShipped(string Id, string Tracking);

// 2. Define your handler
public class OrderCapturedHandler : IEventHandler<OrderCaptured>
{
    public ValueTask HandleAsync(EventHandlerContext<OrderCaptured> context, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"Processing order {context.Payload.Id} for {context.Payload.Product}");
        return ValueTask.CompletedTask;
    }
}

// 3. Register services
services.AddStreamon()
    .AddTableStorageStreamStore(connectionString, options =>
    {
        options.StreamTypeProvider = new StreamTypeProvider()
            .RegisterTypes<OrderCaptured>()
            .RegisterTypes<OrderShipped>();
    });

services.AddStreamSubscription(SubscriptionId.From("order-sub"), StreamSubscriptionType.CatchUp)
    .UseTableStorageCheckpointStore(connectionString, "streams")
    .UseTableStorageSubscriptionStreamReader(connectionString, "streams")
    .AddEventHandler<OrderCapturedHandler>();

// 4. Poll in a background service
public class SubscriptionWorker(SubscriptionManager manager) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await manager.PollAsync(stoppingToken);
            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
        }
    }
}

Azure Table Storage Provider Details

The Azure Table Storage provider is a simple implementation of the IStreamStore interface. It uses Azure Table Storage to store events in a single table. The table is partitioned by the stream id and the row key is the event id. The provider uses the Ulid library to generate unique identifiers for events.

Table Storage only support batches of up to 100 entities, trying to write more than that will result in an exception. The responsibility of handling this is left to the caller, as the provider does not implement any batching logic due to the fact that it can't guarantee the consistency of persistence across different batches.

Thanks

For inspiration and ideas, thanks to:

Streamstone Eveneum Eventflow and Eventuous

Icon:

Pipe icons created by srip - Flaticon

Dependencies

Continuous Integration & Deployment

The project is built and tested using GitHub Actions. The build artifacts are published to GitHub Packages.

Github actions are configured trigger a CI build on every push to the main branch. Packages will be published to nuget.org on every release.

Local testing and development of Github actions can be done using the act tool.

Contributing

Let's keep it simple and clean. Feel free to open an issue or a pull request, I'll review it and most likely than not, merge it.

License

MIT License

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Streamon:

Package Downloads
Streamon.Subscription

Abstractions and common logic for Streamon event streams Subscriptions.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.2.1 127 5/11/2026
0.1.1 254 12/3/2024
0.0.1 221 11/29/2024