LowCodeHub.SSE 0.0.5

dotnet add package LowCodeHub.SSE --version 0.0.5
                    
NuGet\Install-Package LowCodeHub.SSE -Version 0.0.5
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="LowCodeHub.SSE" Version="0.0.5" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="LowCodeHub.SSE" Version="0.0.5" />
                    
Directory.Packages.props
<PackageReference Include="LowCodeHub.SSE" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add LowCodeHub.SSE --version 0.0.5
                    
#r "nuget: LowCodeHub.SSE, 0.0.5"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package LowCodeHub.SSE@0.0.5
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=LowCodeHub.SSE&version=0.0.5
                    
Install as a Cake Addin
#tool nuget:?package=LowCodeHub.SSE&version=0.0.5
                    
Install as a Cake Tool

LowCodeHub.SSE

A generic SSE pub/sub library for ASP.NET Core. Choose Redis Streams, PostgreSQL, SQL Server, or in-memory storage; bring your own message model and map your own endpoints.

NuGet License: MIT

Why This Library?

Feature LowCodeHub.SSE Raw SSE + backing store SignalR
Transport Native SSE (zero JS deps) Manual plumbing WebSocket / fallback
Persistence Redis Streams, PostgreSQL, SQL Server, or memory Manual None (in-memory)
Replay Built-in Last-Event-ID Manual cursor tracking Not supported
Heartbeats Automatic (configurable interval) Manual Built-in ping
Message model Your own type (generic TMessage) Manual serialization Your own type
History backfill Mixed-source (external reader + live transport) Manual Manual
Payload guardrails Built-in size limits Manual None
Stream trimming Automatic bounded storage for durable transports Manual cleanup N/A

Installation

dotnet add package LowCodeHub.SSE

Quick Start

Define any message type — no base class or interface required:

public sealed record OrderEvent(string OrderId, string Status, decimal Total, DateTimeOffset OccurredAt);

Register one transport and wire up endpoints:

using LowCodeHub.SSE.Contracts;
using LowCodeHub.SSE.Extensions;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSse(options =>
{
    options.ConnectionString = "localhost:6379";
    options.StreamKeyPrefix = "sse:";
    options.MaxStreamLength = 100_000;
});

var app = builder.Build();

// Publish an event
app.MapPost("/orders/{orderId}/events", async (
        string orderId, OrderEvent payload,
        ISsePublisher publisher, CancellationToken ct) =>
    {
        var streamId = await publisher.PublishAsync($"orders:{orderId}", payload, ct);
        return Results.Accepted(value: new { streamId });
    });

// Stream events via SSE
app.MapGet("/orders/{orderId}/events/stream", (
        string orderId, HttpContext http,
        ISseStreamService streamService) =>
    {
        http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
        var stream = streamService.StreamAsync<OrderEvent>(
            $"orders:{orderId}", lastId.FirstOrDefault(), http.RequestAborted);
        return TypedResults.ServerSentEvents(stream);
    });

app.Run();

That's it. Messages are persisted by the selected transport, clients receive them via SSE, replay is handled automatically through Last-Event-ID, and heartbeats keep connections alive.


Table of Contents


Service Registration

Redis Streams

builder.Services.AddSse(options =>
{
    options.ConnectionString = "localhost:6379";
});

PostgreSQL

builder.Services.AddSsePostgreSql(options =>
{
    options.ConnectionString = builder.Configuration.GetConnectionString("PostgreSql")!;
    options.Schema = "public";
    options.Table = "sse_events";
    options.MaxStoredEventsPerChannel = 100_000;
});

SQL Server

builder.Services.AddSseSqlServer(options =>
{
    options.ConnectionString = builder.Configuration.GetConnectionString("SqlServer")!;
    options.Schema = "dbo";
    options.Table = "SseEvents";
    options.MaxStoredEventsPerChannel = 100_000;
});

In Memory

builder.Services.AddSseInMemory();

The SQL transports use embedded schema scripts exposed through LowCodeHub.SSE.Migrations.SqlSse. Run the matching script before publishing or streaming.

With History Reader and Heartbeat Factory

builder.Services.AddSse<NotificationEvent, NotificationHistoryReader, NotificationHeartbeatFactory>(options =>
{
    options.ConnectionString = "localhost:6379";
    options.StreamKeyPrefix = "sse:";
    options.MaxStreamLength = 100_000;
    options.HeartbeatIntervalSeconds = 15;
});

PostgreSQL and SQL Server have matching typed overloads:

builder.Services.AddSsePostgreSql<NotificationEvent, NotificationHistoryReader, NotificationHeartbeatFactory>(options =>
{
    options.ConnectionString = "...";
});

builder.Services.AddSseSqlServer<NotificationEvent, NotificationHistoryReader, NotificationHeartbeatFactory>(options =>
{
    options.ConnectionString = "...";
});

Register History / Heartbeat Independently

builder.Services.AddSse(options => { options.ConnectionString = "..."; });

// Add just a history reader (no heartbeat factory)
builder.Services.AddSseHistoryReader<OrderEvent, OrderEventHistoryReader>();

// Or just a heartbeat factory (no history reader)
builder.Services.AddSseHeartbeatFactory<DashboardUpdate, DashboardHeartbeatFactory>();

With External Redis Connection

If your application already manages an IConnectionMultiplexer, pass it directly to avoid creating a second connection:

builder.Services.AddSse(options =>
{
    options.ConnectionMultiplexer = existingMultiplexer; // package will NOT dispose this
});

Configuration

Redis options are configured through RedisSseOptions:

Option Type Default Description
ConnectionString string "localhost:6379" Redis connection string. Ignored if ConnectionMultiplexer is set.
ConnectionMultiplexer IConnectionMultiplexer? null Reuse an existing multiplexer. The package will not own or dispose it.
StreamKeyPrefix string "sse:" Prefix for Redis Stream keys. Final key: {prefix}{channel}.
MaxStreamLength long 50,000 Max entries per stream. Older entries are auto-trimmed via XTRIM.
UseApproximateTrimming bool true Use MAXLEN ~ for better Redis performance (slightly exceeds the cap).
PollIntervalMs int 300 Polling interval in ms when no new stream entries exist. Min: 20.
ReadBatchSize int 50 Entries read per Redis XREAD call.
ReplayBatchSize int 200 Entries read per replay batch from Redis range queries.
HeartbeatIntervalSeconds int 15 Seconds between heartbeat SSE events on idle connections. Min: 5.
MessageEventType string "message" SSE event: field for published messages.
MaxPayloadBytes int 16,384 (16 KB) Maximum UTF-8 payload size per message. Rejects if exceeded.
RequireAuthenticatedUser bool false Advisory flag for consumer endpoint authorization.

PostgreSQL and SQL Server use PostgreSqlSseOptions / SqlServerSseOptions:

Option Type Default Description
ConnectionString string "" Database connection string.
Schema string "public" / "dbo" Database schema for the event table.
Table string "sse_events" / "SseEvents" Event table name.
PollIntervalMs int 300 Polling interval in ms when no new rows exist. Min: 20.
ReadBatchSize int 50 Rows read per polling cycle.
HeartbeatIntervalSeconds int 15 Seconds between heartbeat SSE events on idle connections. Min: 5.
MessageEventType string "message" SSE event: field for published messages.
MaxPayloadBytes int 16,384 (16 KB) Maximum UTF-8 payload size per message. Rejects if exceeded.
MaxStoredEventsPerChannel int 50,000 Maximum retained rows per channel when trimming is enabled.
TrimOnPublish bool true Deletes older rows for the published channel after a successful publish.

Validation Rules

Options are validated at startup. The service throws InvalidOperationException if:

  • ConnectionString is empty and ConnectionMultiplexer is null
  • ReadBatchSize ≤ 0
  • MaxPayloadBytes ≤ 0
  • MaxStoredEventsPerChannel ≤ 0 for SQL transports
  • PollIntervalMs < 20
  • HeartbeatIntervalSeconds < 5
  • MessageEventType is empty

API Reference

ISsePublisher

public interface ISsePublisher
{
    ValueTask<string> PublishAsync<TMessage>(
        string channel,
        TMessage message,
        CancellationToken cancellationToken = default);
}
Parameter Description
channel Logical channel name (e.g., "orders", "user:123"). Redis maps it to a stream key; SQL transports store it in the event table.
message Any serializable object. Serialized to JSON with camelCase naming.
Returns Transport entry ID (for example Redis "1700000000000-0" or SQL "42").

Throws:

  • ArgumentException — if channel is null/empty.
  • ArgumentNullException — if message is null.
  • InvalidOperationException — if serialized payload exceeds MaxPayloadBytes.

ISseStreamService

public interface ISseStreamService
{
    IAsyncEnumerable<SseItem<TMessage>> StreamAsync<TMessage>(
        string channel,
        string? lastEventId,
        CancellationToken cancellationToken = default);
}
Parameter Description
channel Same channel name used in PublishAsync.
lastEventId The Last-Event-ID header sent by the browser on reconnect. null for fresh connections.
Returns Async stream of SseItem<TMessage> — passed directly to TypedResults.ServerSentEvents().

Behavior:

  1. If an ISseHistoryReader<TMessage> is registered, replays missed events from your external source first.
  2. Resolves a transport cursor from lastEventId. Redis accepts stream IDs such as "1700000000000-0"; SQL transports accept numeric row IDs. If the ID is not valid for the transport, it falls back to the latest entry.
  3. Enters a long-poll loop: reads up to ReadBatchSize entries per cycle, deserializes each payload, and yields SseItem<TMessage>.
  4. When idle (no new entries), emits heartbeat events every HeartbeatIntervalSeconds if an ISseHeartbeatFactory<TMessage> is registered.
  5. Sleeps PollIntervalMs between empty reads.
  6. Runs until cancellationToken is cancelled (client disconnects).

ISseHistoryReader<TMessage>

public interface ISseHistoryReader<TMessage>
{
    IAsyncEnumerable<SseHistoryEntry<TMessage>> ReadAfterAsync(
        string channel,
        string? lastEventId,
        CancellationToken cancellationToken = default);
}

Consumer-implemented. Called once at the start of StreamAsync to replay events from an external source (database, API, archive) before switching to the live transport.

ISseHeartbeatFactory<TMessage>

public interface ISseHeartbeatFactory<TMessage>
{
    TMessage Create(string channel);
}

Consumer-implemented. Creates a typed heartbeat message. The resulting SseItem is emitted with event type "heartbeat" (not MessageEventType).

SseHistoryEntry<TMessage>

public sealed record SseHistoryEntry<TMessage>(
    string EventId,
    TMessage Message,
    string EventType = "message");
Property Description
EventId Unique event identifier — becomes the SSE id: field. Can be any string (e.g., a database PK, a UUID, a Redis stream ID).
Message The deserialized message object.
EventType SSE event: field. Defaults to "message". Override to emit custom event types for history entries.

Registration Extension Methods

// Core — registers ISsePublisher + ISseStreamService
IServiceCollection AddSse(Action<RedisSseOptions> configure);
IServiceCollection AddSsePostgreSql(Action<PostgreSqlSseOptions> configure);
IServiceCollection AddSseSqlServer(Action<SqlServerSseOptions> configure);
IServiceCollection AddSseInMemory();

// Core + history reader + heartbeat factory for a specific TMessage
IServiceCollection AddSse<TMessage, THistoryReader, THeartbeatFactory>(Action<RedisSseOptions> configure);
IServiceCollection AddSsePostgreSql<TMessage, THistoryReader, THeartbeatFactory>(Action<PostgreSqlSseOptions> configure);
IServiceCollection AddSseSqlServer<TMessage, THistoryReader, THeartbeatFactory>(Action<SqlServerSseOptions> configure);

// Register only a history reader (call after AddSse)
IServiceCollection AddSseHistoryReader<TMessage, THistoryReader>();

// Register only a heartbeat factory (call after AddSse)
IServiceCollection AddSseHeartbeatFactory<TMessage, THeartbeatFactory>();

Transport registrations use TryAddSingleton — first registration wins. Register one transport (AddSse, AddSsePostgreSql, AddSseSqlServer, or AddSseInMemory) for the active ISsePublisher and ISseStreamService. The RedisSseService implements IAsyncDisposable and disposes only Redis connections it created.


Publishing Messages

Inject ISsePublisher and call PublishAsync:

public class NotificationService(ISsePublisher publisher)
{
    public async Task<string> SendAsync(string userId, NotificationEvent evt, CancellationToken ct)
    {
        // Channel can be any string — use it to scope streams per user, entity, topic, etc.
        string streamId = await publisher.PublishAsync($"user:{userId}", evt, ct);
        return streamId; // Transport entry ID, e.g. Redis "1700000000000-0" or SQL "42"
    }
}

What happens internally:

  1. Your message is serialized to JSON using System.Text.Json with camelCase naming (JsonSerializerDefaults.Web + JsonNamingPolicy.CamelCase).
  2. UTF-8 byte count is checked against MaxPayloadBytes. If it exceeds the limit, an InvalidOperationException is thrown — the message is not sent.
  3. The serialized payload is written to the selected transport (XADD for Redis, INSERT for PostgreSQL/SQL Server, or in-process fan-out for memory).
  4. Durable transports trim old entries according to their configured retention settings.
  5. Returns the transport event ID.

Streaming Events (SSE)

Inject ISseStreamService and return TypedResults.ServerSentEvents:

app.MapGet("/events/{channel}/stream", (
        string channel, HttpContext http,
        ISseStreamService streamService) =>
    {
        http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);

        var stream = streamService.StreamAsync<NotificationEvent>(
            channel,
            lastId.FirstOrDefault(),   // null on first connect, set on reconnect
            http.RequestAborted);       // cancelled when client disconnects

        return TypedResults.ServerSentEvents(stream);
    });

Stream lifecycle:

Client connects (GET /events/orders/stream)
       │
       ▼
┌─ History phase (if ISseHistoryReader<T> registered) ─┐
│  ReadAfterAsync(channel, lastEventId)                │
│  Yields SseHistoryEntry<T> → SseItem<T>              │
└──────────────────────┬───────────────────────────────┘
                       ▼
┌─ Live phase ─────────────────────────────────────────┐
│  Resolve Redis cursor from lastEventId               │
│     │                                                │
│     ▼                                                │
│  ┌─► XREAD sse:{channel} after {cursor}              │
│  │   ├─ entries found → deserialize, yield SseItem   │
│  │   └─ no entries ──► heartbeat? → yield heartbeat  │
│  │                     └─ sleep PollIntervalMs       │
│  └───────── loop until client disconnects ───────────│
└──────────────────────────────────────────────────────┘

SSE Wire Format

When a client connects to your SSE endpoint, the HTTP response stream follows the Server-Sent Events specification. Here's exactly what the browser receives:

Published Message

Given this message type and publish call:

public sealed record OrderEvent(string OrderId, string Status, decimal Total, DateTimeOffset OccurredAt);

await publisher.PublishAsync("orders", new OrderEvent("ORD-42", "Shipped", 99.95m, DateTimeOffset.UtcNow));

The SSE stream sends:

event: message
id: 1700000000000-0
data: {"orderId":"ORD-42","status":"Shipped","total":99.95,"occurredAt":"2026-04-23T12:00:00+00:00"}

SSE Field Value Source
event: message RedisSseOptions.MessageEventType (default "message")
id: 1700000000000-0 Redis Stream entry ID — auto-generated by Redis
data: JSON Your TMessage serialized with camelCase naming

Heartbeat Event

When no messages arrive for HeartbeatIntervalSeconds and an ISseHeartbeatFactory<T> is registered:

public sealed record KeepAlive(string Type, DateTimeOffset Timestamp);

public class MyHeartbeatFactory : ISseHeartbeatFactory<KeepAlive>
{
    public KeepAlive Create(string channel) =>
        new("heartbeat", DateTimeOffset.UtcNow);
}

The SSE stream sends:

event: heartbeat
id: 1700000000000-0
data: {"type":"heartbeat","timestamp":"2026-04-23T12:00:15+00:00"}

SSE Field Value Source
event: heartbeat Always "heartbeat" (hardcoded, not configurable)
id: 1700000000000-0 The current Redis cursor (last known stream position)
data: JSON Your heartbeat factory's Create() return value

History Replay Event

When an ISseHistoryReader<T> provides entries, each yields:

event: message
id: db-evt-1001
data: {"orderId":"ORD-40","status":"Created","total":50.00,"occurredAt":"2026-04-22T08:00:00+00:00"}

event: message
id: db-evt-1002
data: {"orderId":"ORD-41","status":"Paid","total":75.00,"occurredAt":"2026-04-22T09:30:00+00:00"}

SSE Field Value Source
event: message SseHistoryEntry.EventType (default "message", overridable per entry)
id: db-evt-1001 SseHistoryEntry.EventId — any string (database PK, UUID, etc.)
data: JSON SseHistoryEntry.Message

History events are sent first, before any live transport entries. Once history replay completes, the stream seamlessly transitions to the live poll loop.

Reconnection

When a client disconnects and reconnects, the browser automatically sends the Last-Event-ID header with the id: of the last received event. The stream service uses this to:

  1. Replay any history entries after that ID (if ISseHistoryReader is registered).
  2. Resume the live transport from that cursor position — only new entries after the last-seen ID are delivered.

This is automatic and requires no consumer-side code. The browser's EventSource API handles it natively.


Replay and History Backfill

Implement ISseHistoryReader<TMessage> to replay missed events from a database, API, or any external source before switching to the live transport:

public class OrderEventHistoryReader(AppDbContext db) : ISseHistoryReader<OrderEvent>
{
    public async IAsyncEnumerable<SseHistoryEntry<OrderEvent>> ReadAfterAsync(
        string channel,
        string? lastEventId,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var query = db.OrderEvents
            .Where(e => e.Channel == channel)
            .OrderBy(e => e.CreatedAt)
            .AsQueryable();

        if (lastEventId is not null)
            query = query.Where(e => e.Id.CompareTo(lastEventId) > 0);

        await foreach (var evt in query.AsAsyncEnumerable().WithCancellation(cancellationToken))
        {
            yield return new SseHistoryEntry<OrderEvent>(
                EventId: evt.Id,
                Message: new OrderEvent(evt.OrderId, evt.Status, evt.Total, evt.OccurredAt));
        }
    }
}

Register it:

builder.Services.AddSseHistoryReader<OrderEvent, OrderEventHistoryReader>();

Key points:

  • lastEventId can be a transport ID (for example Redis "1700000000000-0" or SQL "42") or any custom ID your history reader assigned. Your reader decides how to interpret it.
  • History entries with null EventId, null Message, or whitespace-only EventId are silently skipped.
  • If SseHistoryEntry.EventType is null or whitespace, it falls back to the selected transport's MessageEventType.

Heartbeats

Implement ISseHeartbeatFactory<TMessage> to create typed heartbeat messages that keep the SSE connection alive and prevent proxies/load balancers from timing out:

public class DashboardHeartbeatFactory : ISseHeartbeatFactory<DashboardUpdate>
{
    public DashboardUpdate Create(string channel) =>
        new(Type: "heartbeat", Channel: channel, Timestamp: DateTimeOffset.UtcNow, Data: null);
}

Register it:

builder.Services.AddSseHeartbeatFactory<DashboardUpdate, DashboardHeartbeatFactory>();

Behavior:

  • Heartbeats are emitted with SSE event type "heartbeat" — distinct from MessageEventType, so clients can filter them easily.
  • The id: field is set to the current transport cursor (the last known stream position).
  • If no ISseHeartbeatFactory<T> is registered, no heartbeat events are sent. The connection stays open but silent during idle periods. Clients relying on proxy keep-alive may need to set their own timeouts.
  • The heartbeat timer resets after every batch of real messages or after a heartbeat is sent.

Channel Key Building

Channel names are sanitized before being used as Redis keys:

Input Channel Redis Key (with default prefix sse:)
"orders" sse:orders
"user:123" sse:user:123
" My Topic " sse:my topic (trimmed + lowercased)
"" or whitespace sse:default
  • Channels are trimmed and lowercased"Orders" and "orders" map to the same stream.
  • Empty/whitespace channels fall back to "default".
  • The prefix is configurable via RedisSseOptions.StreamKeyPrefix.

Serialization

All messages are serialized/deserialized using System.Text.Json with these settings:

new JsonSerializerOptions(JsonSerializerDefaults.Web)
{
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};

This means:

  • C# OrderIdJSON orderId (camelCase property names)
  • DateTimeOffsetISO 8601 (e.g., "2026-04-23T12:00:00+00:00")
  • decimalJSON number (e.g., 99.95)
  • enuminteger by default (add JsonStringEnumConverter on your type if you want string values)
  • null propertiesomitted by default (Web defaults include DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)

Your TMessage must be serializable and deserializable by System.Text.Json. No attributes or annotations are required — any POCO, record, or class works.


Connection Lifecycle

Redis Connection

  • If RedisSseOptions.ConnectionMultiplexer is provided, the package reuses it and does not dispose it.
  • If only ConnectionString is set, the package creates its own IConnectionMultiplexer with:
    • AbortOnConnectFail = false — won't throw on startup if Redis is temporarily down.
    • ConnectTimeout, SyncTimeout, AsyncTimeout — minimum 15 seconds each.
  • The internal RedisSseService implements IAsyncDisposable. When the DI container disposes singletons at shutdown, the Redis connection is closed (only if the package owns it).

SQL Connections

  • PostgreSQL and SQL Server transports open short-lived pooled ADO.NET connections for publish, poll, and cursor resolution operations.
  • They require the matching event table to exist before use. Embedded scripts are listed by SqlSse.PostgreSqlResources and SqlSse.SqlServerResources.
  • Database-backed streams poll the table, so tune PollIntervalMs and database connection pool limits for your expected concurrent SSE clients.

SSE Connection

  • Each StreamAsync call runs for the lifetime of the HTTP request.
  • When the client disconnects, CancellationToken fires → the async enumerator exits → ASP.NET Core closes the response.
  • There is no server-side connection limit — that's managed by your ASP.NET Core configuration (Kestrel concurrent connections, reverse proxy timeouts, etc.).

Error Handling

Publishing

Scenario Behavior
channel is null/empty ArgumentException thrown
message is null ArgumentNullException thrown
Payload exceeds MaxPayloadBytes InvalidOperationException thrown — message is not sent
Redis or database is down Provider exception propagated to caller

Streaming

Scenario Behavior
channel is null/empty ArgumentException thrown
Redis or database read fails Warning logged, retries after PollIntervalMs — does not break the SSE stream
Payload deserialization fails Warning logged, entry skipped — stream continues with next entry
History entry has null EventId/Message Entry silently skipped
Client disconnects CancellationToken fires, async enumerator exits cleanly

Browser Client

Basic — EventSource API

const source = new EventSource('/events/orders/stream');

// Listen for published messages (event type: "message")
source.addEventListener('message', (e) => {
    const data = JSON.parse(e.data);
    console.log(`[${e.lastEventId}] New event:`, data);
    // data = { orderId: "ORD-42", status: "Shipped", total: 99.95, occurredAt: "..." }
});

// Listen for heartbeats (event type: "heartbeat")
source.addEventListener('heartbeat', (e) => {
    console.log('Connection alive at:', e.lastEventId);
});

// On error / disconnect — browser auto-reconnects with Last-Event-ID
source.onerror = (e) => {
    if (source.readyState === EventSource.CONNECTING) {
        console.log('SSE reconnecting...');
    } else {
        console.error('SSE connection failed');
    }
};

Custom Event Types

If you set MessageEventType to something other than "message":

options.MessageEventType = "order-update";

Then the browser must listen for that specific event type:

// "message" event listener will NOT fire
// Must use the custom event type:
source.addEventListener('order-update', (e) => {
    const data = JSON.parse(e.data);
    console.log('Order update:', data);
});

With Authentication

EventSource does not support custom headers. For authenticated endpoints, pass the token via query string or cookies:

// Option 1: Query string (ensure your endpoint reads it)
const source = new EventSource('/events/stream?access_token=eyJhbG...');

// Option 2: Cookies (if using cookie auth) — works automatically
const source = new EventSource('/events/stream', { withCredentials: true });

Fetch-based SSE (for custom headers)

const response = await fetch('/events/orders/stream', {
    headers: { 'Authorization': 'Bearer eyJhbG...' }
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const text = decoder.decode(value);
    // Parse SSE frames manually or use a library like eventsource-parser
    console.log(text);
}

Use Cases

Notifications Feed

public sealed record Notification(string Title, string Body, string Severity, DateTimeOffset CreatedAt);

builder.Services.AddSse(o => o.ConnectionString = "localhost:6379");

app.MapPost("/notifications/{userId}", async (string userId, Notification n, ISsePublisher pub, CancellationToken ct) =>
{
    await pub.PublishAsync($"user:{userId}", n, ct);
    return Results.Accepted();
});

app.MapGet("/notifications/{userId}/stream", (string userId, HttpContext http, ISseStreamService svc) =>
{
    http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
    return TypedResults.ServerSentEvents(
        svc.StreamAsync<Notification>($"user:{userId}", lastId.FirstOrDefault(), http.RequestAborted));
});

Live Dashboard

public sealed record MetricSnapshot(string Name, double Value, string Unit, DateTimeOffset Timestamp);

app.MapPost("/dashboard/metrics", async (MetricSnapshot snapshot, ISsePublisher pub, CancellationToken ct) =>
{
    await pub.PublishAsync("dashboard:metrics", snapshot, ct);
    return Results.Ok();
});

app.MapGet("/dashboard/metrics/stream", (HttpContext http, ISseStreamService svc) =>
{
    http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
    return TypedResults.ServerSentEvents(
        svc.StreamAsync<MetricSnapshot>("dashboard:metrics", lastId.FirstOrDefault(), http.RequestAborted));
});

Real-Time Chat

public sealed record ChatMessage(string SenderId, string SenderName, string Text, DateTimeOffset SentAt);

builder.Services.AddSse<ChatMessage, ChatHistoryReader, ChatHeartbeatFactory>(o =>
{
    o.ConnectionString = "localhost:6379";
    o.StreamKeyPrefix = "chat:";
});

app.MapPost("/chat/{room}/messages", async (string room, ChatMessage msg, ISsePublisher pub, CancellationToken ct) =>
{
    await pub.PublishAsync(room, msg, ct);
    return Results.Accepted();
});

app.MapGet("/chat/{room}/stream", (string room, HttpContext http, ISseStreamService svc) =>
{
    http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
    return TypedResults.ServerSentEvents(
        svc.StreamAsync<ChatMessage>(room, lastId.FirstOrDefault(), http.RequestAborted));
});

Audit Log Stream

public sealed record AuditEntry(string UserId, string Action, string Resource, DateTimeOffset Timestamp);

app.MapPost("/audit", async (AuditEntry entry, ISsePublisher pub, CancellationToken ct) =>
{
    await pub.PublishAsync("audit:all", entry, ct);
    return Results.Ok();
});

app.MapGet("/audit/stream", (HttpContext http, ISseStreamService svc) =>
{
    http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
    return TypedResults.ServerSentEvents(
        svc.StreamAsync<AuditEntry>("audit:all", lastId.FirstOrDefault(), http.RequestAborted));
});

How It Works

┌─────────────────────────────────────────────────────────┐
│  Publisher                                              │
│                                                         │
│  publisher.PublishAsync("orders", myEvent)              │
│       │                                                 │
│       ▼                                                 │
│  ┌──────────────┐    XADD sse:orders                   │
│  │ JSON serialize│──────────────────► Redis Stream      │
│  │ (camelCase)   │    MAXLEN ~ 50000                   │
│  └──────────────┘                                      │
│                                                         │
│  Returns: "1700000000000-0" (stream entry ID)           │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│  Stream Service                                         │
│                                                         │
│  GET /events/orders/stream                              │
│  Last-Event-ID: 1700000000000-0                         │
│       │                                                 │
│       ▼                                                 │
│  ┌──────────────────┐                                   │
│  │ History Reader    │ ◄── Optional: DB/API backfill    │
│  │ (ISseHistoryReader)│    Yields SseHistoryEntry<T>    │
│  └──────┬───────────┘                                   │
│         ▼                                               │
│  ┌──────────────────┐                                   │
│  │ Redis XREAD loop │ ◄── Poll every 300ms (default)   │
│  │   ├─ entries     │     Read up to 50 per cycle      │
│  │   │  └─ deser.   │                                  │
│  │   └─ idle        │                                  │
│  │      └─ heartbeat│ ◄── Every 15s if factory exists  │
│  └──────┬───────────┘                                   │
│         ▼                                               │
│  ┌──────────────────┐                                   │
│  │ SseItem<TMessage>│ ──► TypedResults.ServerSentEvents │
│  └──────────────────┘                                   │
│                                                         │
│  SSE output:                                            │
│  event: message                                         │
│  id: 1700000000001-0                                    │
│  data: {"orderId":"ORD-42","status":"Shipped",...}      │
└─────────────────────────────────────────────────────────┘

Requirements

  • .NET 10+
  • Redis 5.0+ for Redis Streams transport
  • PostgreSQL or SQL Server for SQL-backed transports
  • ASP.NET Core (for TypedResults.ServerSentEvents)

License

MIT

Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on LowCodeHub.SSE:

Package Downloads
LowCodeHub.Jobs

Durable, distributed background-jobs library for ASP.NET Core: typed handlers, retries, recurring/delayed scheduling, cooperative cancellation, and Redis-Streams-backed SSE status streaming.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.0.5 99 5/20/2026
0.0.4 97 5/18/2026
0.0.3 138 5/12/2026
0.0.1 106 4/23/2026