LowCodeHub.SSE
0.0.5
dotnet add package LowCodeHub.SSE --version 0.0.5
NuGet\Install-Package LowCodeHub.SSE -Version 0.0.5
<PackageReference Include="LowCodeHub.SSE" Version="0.0.5" />
<PackageVersion Include="LowCodeHub.SSE" Version="0.0.5" />
<PackageReference Include="LowCodeHub.SSE" />
paket add LowCodeHub.SSE --version 0.0.5
#r "nuget: LowCodeHub.SSE, 0.0.5"
#:package LowCodeHub.SSE@0.0.5
#addin nuget:?package=LowCodeHub.SSE&version=0.0.5
#tool nuget:?package=LowCodeHub.SSE&version=0.0.5
LowCodeHub.SSE
A generic SSE pub/sub library for ASP.NET Core. Choose Redis Streams, PostgreSQL, SQL Server, or in-memory storage; bring your own message model and map your own endpoints.
Why This Library?
| Feature | LowCodeHub.SSE | Raw SSE + backing store | SignalR |
|---|---|---|---|
| Transport | Native SSE (zero JS deps) | Manual plumbing | WebSocket / fallback |
| Persistence | Redis Streams, PostgreSQL, SQL Server, or memory | Manual | None (in-memory) |
| Replay | Built-in Last-Event-ID |
Manual cursor tracking | Not supported |
| Heartbeats | Automatic (configurable interval) | Manual | Built-in ping |
| Message model | Your own type (generic TMessage) |
Manual serialization | Your own type |
| History backfill | Mixed-source (external reader + live transport) | Manual | Manual |
| Payload guardrails | Built-in size limits | Manual | None |
| Stream trimming | Automatic bounded storage for durable transports | Manual cleanup | N/A |
Installation
dotnet add package LowCodeHub.SSE
Quick Start
Define any message type — no base class or interface required:
public sealed record OrderEvent(string OrderId, string Status, decimal Total, DateTimeOffset OccurredAt);
Register one transport and wire up endpoints:
using LowCodeHub.SSE.Contracts;
using LowCodeHub.SSE.Extensions;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSse(options =>
{
options.ConnectionString = "localhost:6379";
options.StreamKeyPrefix = "sse:";
options.MaxStreamLength = 100_000;
});
var app = builder.Build();
// Publish an event
app.MapPost("/orders/{orderId}/events", async (
string orderId, OrderEvent payload,
ISsePublisher publisher, CancellationToken ct) =>
{
var streamId = await publisher.PublishAsync($"orders:{orderId}", payload, ct);
return Results.Accepted(value: new { streamId });
});
// Stream events via SSE
app.MapGet("/orders/{orderId}/events/stream", (
string orderId, HttpContext http,
ISseStreamService streamService) =>
{
http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
var stream = streamService.StreamAsync<OrderEvent>(
$"orders:{orderId}", lastId.FirstOrDefault(), http.RequestAborted);
return TypedResults.ServerSentEvents(stream);
});
app.Run();
That's it. Messages are persisted by the selected transport, clients receive them via SSE, replay is handled automatically through Last-Event-ID, and heartbeats keep connections alive.
Table of Contents
- Service Registration
- Configuration
- API Reference
- Publishing Messages
- Streaming Events (SSE)
- SSE Wire Format
- Replay and History Backfill
- Heartbeats
- Channel Key Building
- Serialization
- Connection Lifecycle
- Error Handling
- Browser Client
- Use Cases
- How It Works
- Requirements
- License
Service Registration
Redis Streams
builder.Services.AddSse(options =>
{
options.ConnectionString = "localhost:6379";
});
PostgreSQL
builder.Services.AddSsePostgreSql(options =>
{
options.ConnectionString = builder.Configuration.GetConnectionString("PostgreSql")!;
options.Schema = "public";
options.Table = "sse_events";
options.MaxStoredEventsPerChannel = 100_000;
});
SQL Server
builder.Services.AddSseSqlServer(options =>
{
options.ConnectionString = builder.Configuration.GetConnectionString("SqlServer")!;
options.Schema = "dbo";
options.Table = "SseEvents";
options.MaxStoredEventsPerChannel = 100_000;
});
In Memory
builder.Services.AddSseInMemory();
The SQL transports use embedded schema scripts exposed through LowCodeHub.SSE.Migrations.SqlSse.
Run the matching script before publishing or streaming.
With History Reader and Heartbeat Factory
builder.Services.AddSse<NotificationEvent, NotificationHistoryReader, NotificationHeartbeatFactory>(options =>
{
options.ConnectionString = "localhost:6379";
options.StreamKeyPrefix = "sse:";
options.MaxStreamLength = 100_000;
options.HeartbeatIntervalSeconds = 15;
});
PostgreSQL and SQL Server have matching typed overloads:
builder.Services.AddSsePostgreSql<NotificationEvent, NotificationHistoryReader, NotificationHeartbeatFactory>(options =>
{
options.ConnectionString = "...";
});
builder.Services.AddSseSqlServer<NotificationEvent, NotificationHistoryReader, NotificationHeartbeatFactory>(options =>
{
options.ConnectionString = "...";
});
Register History / Heartbeat Independently
builder.Services.AddSse(options => { options.ConnectionString = "..."; });
// Add just a history reader (no heartbeat factory)
builder.Services.AddSseHistoryReader<OrderEvent, OrderEventHistoryReader>();
// Or just a heartbeat factory (no history reader)
builder.Services.AddSseHeartbeatFactory<DashboardUpdate, DashboardHeartbeatFactory>();
With External Redis Connection
If your application already manages an IConnectionMultiplexer, pass it directly to avoid creating a second connection:
builder.Services.AddSse(options =>
{
options.ConnectionMultiplexer = existingMultiplexer; // package will NOT dispose this
});
Configuration
Redis options are configured through RedisSseOptions:
| Option | Type | Default | Description |
|---|---|---|---|
ConnectionString |
string |
"localhost:6379" |
Redis connection string. Ignored if ConnectionMultiplexer is set. |
ConnectionMultiplexer |
IConnectionMultiplexer? |
null |
Reuse an existing multiplexer. The package will not own or dispose it. |
StreamKeyPrefix |
string |
"sse:" |
Prefix for Redis Stream keys. Final key: {prefix}{channel}. |
MaxStreamLength |
long |
50,000 |
Max entries per stream. Older entries are auto-trimmed via XTRIM. |
UseApproximateTrimming |
bool |
true |
Use MAXLEN ~ for better Redis performance (slightly exceeds the cap). |
PollIntervalMs |
int |
300 |
Polling interval in ms when no new stream entries exist. Min: 20. |
ReadBatchSize |
int |
50 |
Entries read per Redis XREAD call. |
ReplayBatchSize |
int |
200 |
Entries read per replay batch from Redis range queries. |
HeartbeatIntervalSeconds |
int |
15 |
Seconds between heartbeat SSE events on idle connections. Min: 5. |
MessageEventType |
string |
"message" |
SSE event: field for published messages. |
MaxPayloadBytes |
int |
16,384 (16 KB) |
Maximum UTF-8 payload size per message. Rejects if exceeded. |
RequireAuthenticatedUser |
bool |
false |
Advisory flag for consumer endpoint authorization. |
PostgreSQL and SQL Server use PostgreSqlSseOptions / SqlServerSseOptions:
| Option | Type | Default | Description |
|---|---|---|---|
ConnectionString |
string |
"" |
Database connection string. |
Schema |
string |
"public" / "dbo" |
Database schema for the event table. |
Table |
string |
"sse_events" / "SseEvents" |
Event table name. |
PollIntervalMs |
int |
300 |
Polling interval in ms when no new rows exist. Min: 20. |
ReadBatchSize |
int |
50 |
Rows read per polling cycle. |
HeartbeatIntervalSeconds |
int |
15 |
Seconds between heartbeat SSE events on idle connections. Min: 5. |
MessageEventType |
string |
"message" |
SSE event: field for published messages. |
MaxPayloadBytes |
int |
16,384 (16 KB) |
Maximum UTF-8 payload size per message. Rejects if exceeded. |
MaxStoredEventsPerChannel |
int |
50,000 |
Maximum retained rows per channel when trimming is enabled. |
TrimOnPublish |
bool |
true |
Deletes older rows for the published channel after a successful publish. |
Validation Rules
Options are validated at startup. The service throws InvalidOperationException if:
ConnectionStringis empty andConnectionMultiplexeris nullReadBatchSize≤ 0MaxPayloadBytes≤ 0MaxStoredEventsPerChannel≤ 0 for SQL transportsPollIntervalMs< 20HeartbeatIntervalSeconds< 5MessageEventTypeis empty
API Reference
ISsePublisher
public interface ISsePublisher
{
ValueTask<string> PublishAsync<TMessage>(
string channel,
TMessage message,
CancellationToken cancellationToken = default);
}
| Parameter | Description |
|---|---|
channel |
Logical channel name (e.g., "orders", "user:123"). Redis maps it to a stream key; SQL transports store it in the event table. |
message |
Any serializable object. Serialized to JSON with camelCase naming. |
| Returns | Transport entry ID (for example Redis "1700000000000-0" or SQL "42"). |
Throws:
ArgumentException— ifchannelis null/empty.ArgumentNullException— ifmessageis null.InvalidOperationException— if serialized payload exceedsMaxPayloadBytes.
ISseStreamService
public interface ISseStreamService
{
IAsyncEnumerable<SseItem<TMessage>> StreamAsync<TMessage>(
string channel,
string? lastEventId,
CancellationToken cancellationToken = default);
}
| Parameter | Description |
|---|---|
channel |
Same channel name used in PublishAsync. |
lastEventId |
The Last-Event-ID header sent by the browser on reconnect. null for fresh connections. |
| Returns | Async stream of SseItem<TMessage> — passed directly to TypedResults.ServerSentEvents(). |
Behavior:
- If an
ISseHistoryReader<TMessage>is registered, replays missed events from your external source first. - Resolves a transport cursor from
lastEventId. Redis accepts stream IDs such as"1700000000000-0"; SQL transports accept numeric row IDs. If the ID is not valid for the transport, it falls back to the latest entry. - Enters a long-poll loop: reads up to
ReadBatchSizeentries per cycle, deserializes each payload, and yieldsSseItem<TMessage>. - When idle (no new entries), emits heartbeat events every
HeartbeatIntervalSecondsif anISseHeartbeatFactory<TMessage>is registered. - Sleeps
PollIntervalMsbetween empty reads. - Runs until
cancellationTokenis cancelled (client disconnects).
ISseHistoryReader<TMessage>
public interface ISseHistoryReader<TMessage>
{
IAsyncEnumerable<SseHistoryEntry<TMessage>> ReadAfterAsync(
string channel,
string? lastEventId,
CancellationToken cancellationToken = default);
}
Consumer-implemented. Called once at the start of StreamAsync to replay events from an external source (database, API, archive) before switching to the live transport.
ISseHeartbeatFactory<TMessage>
public interface ISseHeartbeatFactory<TMessage>
{
TMessage Create(string channel);
}
Consumer-implemented. Creates a typed heartbeat message. The resulting SseItem is emitted with event type "heartbeat" (not MessageEventType).
SseHistoryEntry<TMessage>
public sealed record SseHistoryEntry<TMessage>(
string EventId,
TMessage Message,
string EventType = "message");
| Property | Description |
|---|---|
EventId |
Unique event identifier — becomes the SSE id: field. Can be any string (e.g., a database PK, a UUID, a Redis stream ID). |
Message |
The deserialized message object. |
EventType |
SSE event: field. Defaults to "message". Override to emit custom event types for history entries. |
Registration Extension Methods
// Core — registers ISsePublisher + ISseStreamService
IServiceCollection AddSse(Action<RedisSseOptions> configure);
IServiceCollection AddSsePostgreSql(Action<PostgreSqlSseOptions> configure);
IServiceCollection AddSseSqlServer(Action<SqlServerSseOptions> configure);
IServiceCollection AddSseInMemory();
// Core + history reader + heartbeat factory for a specific TMessage
IServiceCollection AddSse<TMessage, THistoryReader, THeartbeatFactory>(Action<RedisSseOptions> configure);
IServiceCollection AddSsePostgreSql<TMessage, THistoryReader, THeartbeatFactory>(Action<PostgreSqlSseOptions> configure);
IServiceCollection AddSseSqlServer<TMessage, THistoryReader, THeartbeatFactory>(Action<SqlServerSseOptions> configure);
// Register only a history reader (call after AddSse)
IServiceCollection AddSseHistoryReader<TMessage, THistoryReader>();
// Register only a heartbeat factory (call after AddSse)
IServiceCollection AddSseHeartbeatFactory<TMessage, THeartbeatFactory>();
Transport registrations use TryAddSingleton — first registration wins. Register one transport (AddSse, AddSsePostgreSql, AddSseSqlServer, or AddSseInMemory) for the active ISsePublisher and ISseStreamService. The RedisSseService implements IAsyncDisposable and disposes only Redis connections it created.
Publishing Messages
Inject ISsePublisher and call PublishAsync:
public class NotificationService(ISsePublisher publisher)
{
public async Task<string> SendAsync(string userId, NotificationEvent evt, CancellationToken ct)
{
// Channel can be any string — use it to scope streams per user, entity, topic, etc.
string streamId = await publisher.PublishAsync($"user:{userId}", evt, ct);
return streamId; // Transport entry ID, e.g. Redis "1700000000000-0" or SQL "42"
}
}
What happens internally:
- Your message is serialized to JSON using
System.Text.Jsonwith camelCase naming (JsonSerializerDefaults.Web+JsonNamingPolicy.CamelCase). - UTF-8 byte count is checked against
MaxPayloadBytes. If it exceeds the limit, anInvalidOperationExceptionis thrown — the message is not sent. - The serialized payload is written to the selected transport (
XADDfor Redis,INSERTfor PostgreSQL/SQL Server, or in-process fan-out for memory). - Durable transports trim old entries according to their configured retention settings.
- Returns the transport event ID.
Streaming Events (SSE)
Inject ISseStreamService and return TypedResults.ServerSentEvents:
app.MapGet("/events/{channel}/stream", (
string channel, HttpContext http,
ISseStreamService streamService) =>
{
http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
var stream = streamService.StreamAsync<NotificationEvent>(
channel,
lastId.FirstOrDefault(), // null on first connect, set on reconnect
http.RequestAborted); // cancelled when client disconnects
return TypedResults.ServerSentEvents(stream);
});
Stream lifecycle:
Client connects (GET /events/orders/stream)
│
▼
┌─ History phase (if ISseHistoryReader<T> registered) ─┐
│ ReadAfterAsync(channel, lastEventId) │
│ Yields SseHistoryEntry<T> → SseItem<T> │
└──────────────────────┬───────────────────────────────┘
▼
┌─ Live phase ─────────────────────────────────────────┐
│ Resolve Redis cursor from lastEventId │
│ │ │
│ ▼ │
│ ┌─► XREAD sse:{channel} after {cursor} │
│ │ ├─ entries found → deserialize, yield SseItem │
│ │ └─ no entries ──► heartbeat? → yield heartbeat │
│ │ └─ sleep PollIntervalMs │
│ └───────── loop until client disconnects ───────────│
└──────────────────────────────────────────────────────┘
SSE Wire Format
When a client connects to your SSE endpoint, the HTTP response stream follows the Server-Sent Events specification. Here's exactly what the browser receives:
Published Message
Given this message type and publish call:
public sealed record OrderEvent(string OrderId, string Status, decimal Total, DateTimeOffset OccurredAt);
await publisher.PublishAsync("orders", new OrderEvent("ORD-42", "Shipped", 99.95m, DateTimeOffset.UtcNow));
The SSE stream sends:
event: message
id: 1700000000000-0
data: {"orderId":"ORD-42","status":"Shipped","total":99.95,"occurredAt":"2026-04-23T12:00:00+00:00"}
| SSE Field | Value | Source |
|---|---|---|
event: |
message |
RedisSseOptions.MessageEventType (default "message") |
id: |
1700000000000-0 |
Redis Stream entry ID — auto-generated by Redis |
data: |
JSON | Your TMessage serialized with camelCase naming |
Heartbeat Event
When no messages arrive for HeartbeatIntervalSeconds and an ISseHeartbeatFactory<T> is registered:
public sealed record KeepAlive(string Type, DateTimeOffset Timestamp);
public class MyHeartbeatFactory : ISseHeartbeatFactory<KeepAlive>
{
public KeepAlive Create(string channel) =>
new("heartbeat", DateTimeOffset.UtcNow);
}
The SSE stream sends:
event: heartbeat
id: 1700000000000-0
data: {"type":"heartbeat","timestamp":"2026-04-23T12:00:15+00:00"}
| SSE Field | Value | Source |
|---|---|---|
event: |
heartbeat |
Always "heartbeat" (hardcoded, not configurable) |
id: |
1700000000000-0 |
The current Redis cursor (last known stream position) |
data: |
JSON | Your heartbeat factory's Create() return value |
History Replay Event
When an ISseHistoryReader<T> provides entries, each yields:
event: message
id: db-evt-1001
data: {"orderId":"ORD-40","status":"Created","total":50.00,"occurredAt":"2026-04-22T08:00:00+00:00"}
event: message
id: db-evt-1002
data: {"orderId":"ORD-41","status":"Paid","total":75.00,"occurredAt":"2026-04-22T09:30:00+00:00"}
| SSE Field | Value | Source |
|---|---|---|
event: |
message |
SseHistoryEntry.EventType (default "message", overridable per entry) |
id: |
db-evt-1001 |
SseHistoryEntry.EventId — any string (database PK, UUID, etc.) |
data: |
JSON | SseHistoryEntry.Message |
History events are sent first, before any live transport entries. Once history replay completes, the stream seamlessly transitions to the live poll loop.
Reconnection
When a client disconnects and reconnects, the browser automatically sends the Last-Event-ID header with the id: of the last received event. The stream service uses this to:
- Replay any history entries after that ID (if
ISseHistoryReaderis registered). - Resume the live transport from that cursor position — only new entries after the last-seen ID are delivered.
This is automatic and requires no consumer-side code. The browser's EventSource API handles it natively.
Replay and History Backfill
Implement ISseHistoryReader<TMessage> to replay missed events from a database, API, or any external source before switching to the live transport:
public class OrderEventHistoryReader(AppDbContext db) : ISseHistoryReader<OrderEvent>
{
public async IAsyncEnumerable<SseHistoryEntry<OrderEvent>> ReadAfterAsync(
string channel,
string? lastEventId,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var query = db.OrderEvents
.Where(e => e.Channel == channel)
.OrderBy(e => e.CreatedAt)
.AsQueryable();
if (lastEventId is not null)
query = query.Where(e => e.Id.CompareTo(lastEventId) > 0);
await foreach (var evt in query.AsAsyncEnumerable().WithCancellation(cancellationToken))
{
yield return new SseHistoryEntry<OrderEvent>(
EventId: evt.Id,
Message: new OrderEvent(evt.OrderId, evt.Status, evt.Total, evt.OccurredAt));
}
}
}
Register it:
builder.Services.AddSseHistoryReader<OrderEvent, OrderEventHistoryReader>();
Key points:
lastEventIdcan be a transport ID (for example Redis"1700000000000-0"or SQL"42") or any custom ID your history reader assigned. Your reader decides how to interpret it.- History entries with null
EventId, nullMessage, or whitespace-onlyEventIdare silently skipped. - If
SseHistoryEntry.EventTypeis null or whitespace, it falls back to the selected transport'sMessageEventType.
Heartbeats
Implement ISseHeartbeatFactory<TMessage> to create typed heartbeat messages that keep the SSE connection alive and prevent proxies/load balancers from timing out:
public class DashboardHeartbeatFactory : ISseHeartbeatFactory<DashboardUpdate>
{
public DashboardUpdate Create(string channel) =>
new(Type: "heartbeat", Channel: channel, Timestamp: DateTimeOffset.UtcNow, Data: null);
}
Register it:
builder.Services.AddSseHeartbeatFactory<DashboardUpdate, DashboardHeartbeatFactory>();
Behavior:
- Heartbeats are emitted with SSE event type
"heartbeat"— distinct fromMessageEventType, so clients can filter them easily. - The
id:field is set to the current transport cursor (the last known stream position). - If no
ISseHeartbeatFactory<T>is registered, no heartbeat events are sent. The connection stays open but silent during idle periods. Clients relying on proxy keep-alive may need to set their own timeouts. - The heartbeat timer resets after every batch of real messages or after a heartbeat is sent.
Channel Key Building
Channel names are sanitized before being used as Redis keys:
| Input Channel | Redis Key (with default prefix sse:) |
|---|---|
"orders" |
sse:orders |
"user:123" |
sse:user:123 |
" My Topic " |
sse:my topic (trimmed + lowercased) |
"" or whitespace |
sse:default |
- Channels are trimmed and lowercased —
"Orders"and"orders"map to the same stream. - Empty/whitespace channels fall back to
"default". - The prefix is configurable via
RedisSseOptions.StreamKeyPrefix.
Serialization
All messages are serialized/deserialized using System.Text.Json with these settings:
new JsonSerializerOptions(JsonSerializerDefaults.Web)
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
This means:
- C#
OrderId→ JSONorderId(camelCase property names) DateTimeOffset→ ISO 8601 (e.g.,"2026-04-23T12:00:00+00:00")decimal→ JSON number (e.g.,99.95)enum→ integer by default (addJsonStringEnumConverteron your type if you want string values)nullproperties → omitted by default (Web defaults includeDefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)
Your TMessage must be serializable and deserializable by System.Text.Json. No attributes or annotations are required — any POCO, record, or class works.
Connection Lifecycle
Redis Connection
- If
RedisSseOptions.ConnectionMultiplexeris provided, the package reuses it and does not dispose it. - If only
ConnectionStringis set, the package creates its ownIConnectionMultiplexerwith:AbortOnConnectFail = false— won't throw on startup if Redis is temporarily down.ConnectTimeout,SyncTimeout,AsyncTimeout— minimum 15 seconds each.
- The internal
RedisSseServiceimplementsIAsyncDisposable. When the DI container disposes singletons at shutdown, the Redis connection is closed (only if the package owns it).
SQL Connections
- PostgreSQL and SQL Server transports open short-lived pooled ADO.NET connections for publish, poll, and cursor resolution operations.
- They require the matching event table to exist before use. Embedded scripts are listed by
SqlSse.PostgreSqlResourcesandSqlSse.SqlServerResources. - Database-backed streams poll the table, so tune
PollIntervalMsand database connection pool limits for your expected concurrent SSE clients.
SSE Connection
- Each
StreamAsynccall runs for the lifetime of the HTTP request. - When the client disconnects,
CancellationTokenfires → the async enumerator exits → ASP.NET Core closes the response. - There is no server-side connection limit — that's managed by your ASP.NET Core configuration (
Kestrelconcurrent connections, reverse proxy timeouts, etc.).
Error Handling
Publishing
| Scenario | Behavior |
|---|---|
channel is null/empty |
ArgumentException thrown |
message is null |
ArgumentNullException thrown |
Payload exceeds MaxPayloadBytes |
InvalidOperationException thrown — message is not sent |
| Redis or database is down | Provider exception propagated to caller |
Streaming
| Scenario | Behavior |
|---|---|
channel is null/empty |
ArgumentException thrown |
| Redis or database read fails | Warning logged, retries after PollIntervalMs — does not break the SSE stream |
| Payload deserialization fails | Warning logged, entry skipped — stream continues with next entry |
History entry has null EventId/Message |
Entry silently skipped |
| Client disconnects | CancellationToken fires, async enumerator exits cleanly |
Browser Client
Basic — EventSource API
const source = new EventSource('/events/orders/stream');
// Listen for published messages (event type: "message")
source.addEventListener('message', (e) => {
const data = JSON.parse(e.data);
console.log(`[${e.lastEventId}] New event:`, data);
// data = { orderId: "ORD-42", status: "Shipped", total: 99.95, occurredAt: "..." }
});
// Listen for heartbeats (event type: "heartbeat")
source.addEventListener('heartbeat', (e) => {
console.log('Connection alive at:', e.lastEventId);
});
// On error / disconnect — browser auto-reconnects with Last-Event-ID
source.onerror = (e) => {
if (source.readyState === EventSource.CONNECTING) {
console.log('SSE reconnecting...');
} else {
console.error('SSE connection failed');
}
};
Custom Event Types
If you set MessageEventType to something other than "message":
options.MessageEventType = "order-update";
Then the browser must listen for that specific event type:
// "message" event listener will NOT fire
// Must use the custom event type:
source.addEventListener('order-update', (e) => {
const data = JSON.parse(e.data);
console.log('Order update:', data);
});
With Authentication
EventSource does not support custom headers. For authenticated endpoints, pass the token via query string or cookies:
// Option 1: Query string (ensure your endpoint reads it)
const source = new EventSource('/events/stream?access_token=eyJhbG...');
// Option 2: Cookies (if using cookie auth) — works automatically
const source = new EventSource('/events/stream', { withCredentials: true });
Fetch-based SSE (for custom headers)
const response = await fetch('/events/orders/stream', {
headers: { 'Authorization': 'Bearer eyJhbG...' }
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
// Parse SSE frames manually or use a library like eventsource-parser
console.log(text);
}
Use Cases
Notifications Feed
public sealed record Notification(string Title, string Body, string Severity, DateTimeOffset CreatedAt);
builder.Services.AddSse(o => o.ConnectionString = "localhost:6379");
app.MapPost("/notifications/{userId}", async (string userId, Notification n, ISsePublisher pub, CancellationToken ct) =>
{
await pub.PublishAsync($"user:{userId}", n, ct);
return Results.Accepted();
});
app.MapGet("/notifications/{userId}/stream", (string userId, HttpContext http, ISseStreamService svc) =>
{
http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
return TypedResults.ServerSentEvents(
svc.StreamAsync<Notification>($"user:{userId}", lastId.FirstOrDefault(), http.RequestAborted));
});
Live Dashboard
public sealed record MetricSnapshot(string Name, double Value, string Unit, DateTimeOffset Timestamp);
app.MapPost("/dashboard/metrics", async (MetricSnapshot snapshot, ISsePublisher pub, CancellationToken ct) =>
{
await pub.PublishAsync("dashboard:metrics", snapshot, ct);
return Results.Ok();
});
app.MapGet("/dashboard/metrics/stream", (HttpContext http, ISseStreamService svc) =>
{
http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
return TypedResults.ServerSentEvents(
svc.StreamAsync<MetricSnapshot>("dashboard:metrics", lastId.FirstOrDefault(), http.RequestAborted));
});
Real-Time Chat
public sealed record ChatMessage(string SenderId, string SenderName, string Text, DateTimeOffset SentAt);
builder.Services.AddSse<ChatMessage, ChatHistoryReader, ChatHeartbeatFactory>(o =>
{
o.ConnectionString = "localhost:6379";
o.StreamKeyPrefix = "chat:";
});
app.MapPost("/chat/{room}/messages", async (string room, ChatMessage msg, ISsePublisher pub, CancellationToken ct) =>
{
await pub.PublishAsync(room, msg, ct);
return Results.Accepted();
});
app.MapGet("/chat/{room}/stream", (string room, HttpContext http, ISseStreamService svc) =>
{
http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
return TypedResults.ServerSentEvents(
svc.StreamAsync<ChatMessage>(room, lastId.FirstOrDefault(), http.RequestAborted));
});
Audit Log Stream
public sealed record AuditEntry(string UserId, string Action, string Resource, DateTimeOffset Timestamp);
app.MapPost("/audit", async (AuditEntry entry, ISsePublisher pub, CancellationToken ct) =>
{
await pub.PublishAsync("audit:all", entry, ct);
return Results.Ok();
});
app.MapGet("/audit/stream", (HttpContext http, ISseStreamService svc) =>
{
http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
return TypedResults.ServerSentEvents(
svc.StreamAsync<AuditEntry>("audit:all", lastId.FirstOrDefault(), http.RequestAborted));
});
How It Works
┌─────────────────────────────────────────────────────────┐
│ Publisher │
│ │
│ publisher.PublishAsync("orders", myEvent) │
│ │ │
│ ▼ │
│ ┌──────────────┐ XADD sse:orders │
│ │ JSON serialize│──────────────────► Redis Stream │
│ │ (camelCase) │ MAXLEN ~ 50000 │
│ └──────────────┘ │
│ │
│ Returns: "1700000000000-0" (stream entry ID) │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Stream Service │
│ │
│ GET /events/orders/stream │
│ Last-Event-ID: 1700000000000-0 │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ History Reader │ ◄── Optional: DB/API backfill │
│ │ (ISseHistoryReader)│ Yields SseHistoryEntry<T> │
│ └──────┬───────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Redis XREAD loop │ ◄── Poll every 300ms (default) │
│ │ ├─ entries │ Read up to 50 per cycle │
│ │ │ └─ deser. │ │
│ │ └─ idle │ │
│ │ └─ heartbeat│ ◄── Every 15s if factory exists │
│ └──────┬───────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ SseItem<TMessage>│ ──► TypedResults.ServerSentEvents │
│ └──────────────────┘ │
│ │
│ SSE output: │
│ event: message │
│ id: 1700000000001-0 │
│ data: {"orderId":"ORD-42","status":"Shipped",...} │
└─────────────────────────────────────────────────────────┘
Requirements
- .NET 10+
- Redis 5.0+ for Redis Streams transport
- PostgreSQL or SQL Server for SQL-backed transports
- ASP.NET Core (for
TypedResults.ServerSentEvents)
License
MIT
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.Data.SqlClient (>= 7.0.1)
- Npgsql (>= 10.0.2)
- StackExchange.Redis (>= 2.13.1)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on LowCodeHub.SSE:
| Package | Downloads |
|---|---|
|
LowCodeHub.Jobs
Durable, distributed background-jobs library for ASP.NET Core: typed handlers, retries, recurring/delayed scheduling, cooperative cancellation, and Redis-Streams-backed SSE status streaming. |
GitHub repositories
This package is not used by any popular GitHub repositories.