LowCodeHub.SSE
0.0.4
A generic Redis-Streams-backed SSE pub/sub library for ASP.NET Core. Durable, replayable, heartbeat-aware — bring your own message model and map your own endpoints.
Why This Library?
| Feature | LowCodeHub.SSE | Raw SSE + Redis | SignalR |
|---|---|---|---|
| Transport | Native SSE (zero JS deps) | Manual plumbing | WebSocket / fallback |
| Persistence | Redis Streams (ordered, durable) | Manual | None (in-memory) |
| Replay | Built-in Last-Event-ID | Manual cursor tracking | Not supported |
| Heartbeats | Automatic (configurable interval) | Manual | Built-in ping |
| Message model | Your own type (generic TMessage) | Manual serialization | Your own type |
| History backfill | Mixed-source (DB + Redis) | Manual | Manual |
| Payload guardrails | Built-in size limits | Manual | None |
| Stream trimming | Automatic bounded storage | Manual XTRIM | N/A |
Installation
dotnet add package LowCodeHub.SSE
Quick Start
Define any message type — no base class or interface required:
public sealed record OrderEvent(string OrderId, string Status, decimal Total, DateTimeOffset OccurredAt);
Register services and wire up endpoints:
using LowCodeHub.SSE.Contracts;
using LowCodeHub.SSE.Extensions;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSse(options =>
{
    options.ConnectionString = "localhost:6379";
    options.StreamKeyPrefix = "sse:";
    options.MaxStreamLength = 100_000;
});

var app = builder.Build();

// Publish an event
app.MapPost("/orders/{orderId}/events", async (
    string orderId, OrderEvent payload,
    ISsePublisher publisher, CancellationToken ct) =>
{
    var streamId = await publisher.PublishAsync($"orders:{orderId}", payload, ct);
    return Results.Accepted(value: new { streamId });
});

// Stream events via SSE
app.MapGet("/orders/{orderId}/events/stream", (
    string orderId, HttpContext http,
    ISseStreamService streamService) =>
{
    http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
    var stream = streamService.StreamAsync<OrderEvent>(
        $"orders:{orderId}", lastId.FirstOrDefault(), http.RequestAborted);
    return TypedResults.ServerSentEvents(stream);
});

app.Run();
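That's it. Messages are persisted to Redis Streams, clients receive them via SSE, and replay is handled automatically through Last-Event-ID. Heartbeats are emitted only once an ISseHeartbeatFactory<TMessage> is registered (see Heartbeats); the basic registration above sends none.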
Table of Contents
- Service Registration
- Configuration
- API Reference
- Publishing Messages
- Streaming Events (SSE)
- SSE Wire Format
- Replay and History Backfill
- Heartbeats
- Channel Key Building
- Serialization
- Connection Lifecycle
- Error Handling
- Browser Client
- Use Cases
- How It Works
- Requirements
- License
Service Registration
Basic (no history, no heartbeats)
builder.Services.AddSse(options =>
{
options.ConnectionString = "localhost:6379";
});
With History Reader and Heartbeat Factory
builder.Services.AddSse<NotificationEvent, NotificationHistoryReader, NotificationHeartbeatFactory>(options =>
{
options.ConnectionString = "localhost:6379";
options.StreamKeyPrefix = "sse:";
options.MaxStreamLength = 100_000;
options.HeartbeatIntervalSeconds = 15;
});
Register History / Heartbeat Independently
builder.Services.AddSse(options => { options.ConnectionString = "..."; });
// Add just a history reader (no heartbeat factory)
builder.Services.AddSseHistoryReader<OrderEvent, OrderEventHistoryReader>();
// Or just a heartbeat factory (no history reader)
builder.Services.AddSseHeartbeatFactory<DashboardUpdate, DashboardHeartbeatFactory>();
With External Redis Connection
If your application already manages an IConnectionMultiplexer, pass it directly to avoid creating a second connection:
builder.Services.AddSse(options =>
{
options.ConnectionMultiplexer = existingMultiplexer; // package will NOT dispose this
});
Configuration
All options are configured through RedisSseOptions:
| Option | Type | Default | Description |
|---|---|---|---|
| ConnectionString | string | "localhost:6379" | Redis connection string. Ignored if ConnectionMultiplexer is set. |
| ConnectionMultiplexer | IConnectionMultiplexer? | null | Reuse an existing multiplexer. The package will not own or dispose it. |
| StreamKeyPrefix | string | "sse:" | Prefix for Redis Stream keys. Final key: {prefix}{channel}. |
| MaxStreamLength | long | 50,000 | Max entries per stream. Older entries are auto-trimmed via XTRIM. |
| UseApproximateTrimming | bool | true | Use MAXLEN ~ for better Redis performance (slightly exceeds the cap). |
| PollIntervalMs | int | 300 | Polling interval in ms when no new stream entries exist. Min: 20. |
| ReadBatchSize | int | 50 | Entries read per Redis XREAD call. |
| ReplayBatchSize | int | 200 | Entries read per replay batch from Redis range queries. |
| HeartbeatIntervalSeconds | int | 15 | Seconds between heartbeat SSE events on idle connections. Min: 5. |
| MessageEventType | string | "message" | SSE event: field for published messages. |
| MaxPayloadBytes | int | 16,384 (16 KB) | Maximum UTF-8 payload size per message. Rejects if exceeded. |
| RequireAuthenticatedUser | bool | false | Advisory flag for consumer endpoint authorization. |
Validation Rules
Options are validated at startup. The service throws InvalidOperationException if:
- ConnectionString is empty and ConnectionMultiplexer is null
- ReadBatchSize ≤ 0
- MaxPayloadBytes ≤ 0
- PollIntervalMs < 20
- HeartbeatIntervalSeconds < 5
- MessageEventType is empty
API Reference
ISsePublisher
public interface ISsePublisher
{
ValueTask<string> PublishAsync<TMessage>(
string channel,
TMessage message,
CancellationToken cancellationToken = default);
}
| Parameter | Description |
|---|---|
| channel | Logical channel name (e.g., "orders", "user:123"). Mapped to Redis Stream key {StreamKeyPrefix}{channel}. |
| message | Any serializable object. Serialized to JSON with camelCase naming. |
| Returns | Redis Stream entry ID (e.g., "1700000000000-0"). |
Throws:
- ArgumentException: if channel is null/empty.
- ArgumentNullException: if message is null.
- InvalidOperationException: if the serialized payload exceeds MaxPayloadBytes.
ISseStreamService
public interface ISseStreamService
{
IAsyncEnumerable<SseItem<TMessage>> StreamAsync<TMessage>(
string channel,
string? lastEventId,
CancellationToken cancellationToken = default);
}
| Parameter | Description |
|---|---|
| channel | Same channel name used in PublishAsync. |
| lastEventId | The Last-Event-ID header sent by the browser on reconnect. null for fresh connections. |
| Returns | Async stream of SseItem<TMessage>, passed directly to TypedResults.ServerSentEvents(). |
Behavior:
- If an ISseHistoryReader<TMessage> is registered, replays missed events from your external source first.
- Resolves a Redis cursor from lastEventId (validates that it is in Redis stream ID format, like "1700000000000-0"). Falls back to the latest stream entry.
- Enters a long-poll loop: reads up to ReadBatchSize entries per cycle, deserializes each payload, and yields SseItem<TMessage>.
- When idle (no new entries), emits heartbeat events every HeartbeatIntervalSeconds if an ISseHeartbeatFactory<TMessage> is registered.
- Sleeps PollIntervalMs between empty reads.
- Runs until cancellationToken is cancelled (client disconnects).
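The cursor-resolution step can be illustrated with a small sketch. It is written in JavaScript, like the browser examples in this README, and is not the library's code: the helper names (isRedisStreamId, resolveCursor, compareStreamIds) and the regular expression are assumptions based only on the "milliseconds-sequence" ID shape shown above.

```javascript
// Hypothetical helpers; they mirror the documented behavior, not the library's internals.
const STREAM_ID_PATTERN = /^\d+-\d+$/; // e.g. "1700000000000-0"

function isRedisStreamId(id) {
  return typeof id === "string" && STREAM_ID_PATTERN.test(id);
}

// Use lastEventId as the cursor only when it looks like a Redis stream ID;
// otherwise fall back to the ID of the latest stream entry.
function resolveCursor(lastEventId, latestEntryId) {
  return isRedisStreamId(lastEventId) ? lastEventId : latestEntryId;
}

// Order two stream IDs numerically (timestamp first, then sequence).
// BigInt avoids precision loss on large sequence numbers.
function compareStreamIds(a, b) {
  const [aMs, aSeq] = a.split("-").map((part) => BigInt(part));
  const [bMs, bSeq] = b.split("-").map((part) => BigInt(part));
  if (aMs !== bMs) return aMs < bMs ? -1 : 1;
  if (aSeq !== bSeq) return aSeq < bSeq ? -1 : 1;
  return 0;
}
```

A custom history ID such as "db-evt-1001" fails the format check, so under these assumptions the live phase would start from the latest entry rather than from that ID. Note that stream IDs must be compared numerically: as plain strings, "1700000000000-10" would sort before "1700000000000-2".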
ISseHistoryReader<TMessage>
public interface ISseHistoryReader<TMessage>
{
IAsyncEnumerable<SseHistoryEntry<TMessage>> ReadAfterAsync(
string channel,
string? lastEventId,
CancellationToken cancellationToken = default);
}
Consumer-implemented. Called once at the start of StreamAsync to replay events from an external source (database, API, archive) before switching to the live Redis stream.
ISseHeartbeatFactory<TMessage>
public interface ISseHeartbeatFactory<TMessage>
{
TMessage Create(string channel);
}
Consumer-implemented. Creates a typed heartbeat message. The resulting SseItem is emitted with event type "heartbeat" (not MessageEventType).
SseHistoryEntry<TMessage>
public sealed record SseHistoryEntry<TMessage>(
string EventId,
TMessage Message,
string EventType = "message");
| Property | Description |
|---|---|
| EventId | Unique event identifier; becomes the SSE id: field. Can be any string (e.g., a database PK, a UUID, a Redis stream ID). |
| Message | The deserialized message object. |
| EventType | SSE event: field. Defaults to "message". Override to emit custom event types for history entries. |
Registration Extension Methods
// Core — registers ISsePublisher + ISseStreamService
IServiceCollection AddSse(Action<RedisSseOptions> configure);
// Core + history reader + heartbeat factory for a specific TMessage
IServiceCollection AddSse<TMessage, THistoryReader, THeartbeatFactory>(Action<RedisSseOptions> configure);
// Register only a history reader (call after AddSse)
IServiceCollection AddSseHistoryReader<TMessage, THistoryReader>();
// Register only a heartbeat factory (call after AddSse)
IServiceCollection AddSseHeartbeatFactory<TMessage, THeartbeatFactory>();
All registrations use TryAddSingleton — first registration wins. The RedisSseService is registered as a singleton and implements IAsyncDisposable (disposes the Redis connection if it created one).
Publishing Messages
Inject ISsePublisher and call PublishAsync:
public class NotificationService(ISsePublisher publisher)
{
    public async Task<string> SendAsync(string userId, NotificationEvent evt, CancellationToken ct)
    {
        // Channel can be any string: use it to scope streams per user, entity, topic, etc.
        string streamId = await publisher.PublishAsync($"user:{userId}", evt, ct);
        return streamId; // Redis Stream entry ID, e.g. "1700000000000-0"
    }
}
What happens internally:
- Your message is serialized to JSON using System.Text.Json with camelCase naming (JsonSerializerDefaults.Web + JsonNamingPolicy.CamelCase).
- UTF-8 byte count is checked against MaxPayloadBytes. If it exceeds the limit, an InvalidOperationException is thrown and the message is not sent.
- The serialized payload is written to Redis Stream {StreamKeyPrefix}{sanitized_channel} via XADD.
- Redis auto-trims the stream to MaxStreamLength entries (approximate if UseApproximateTrimming is true).
- Returns the Redis stream entry ID.
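Because the size limit applies to UTF-8 bytes rather than characters, a client that posts events to a publish endpoint may want to pre-check its payload. The following JavaScript sketch shows one way to do that. The function name checkPayload is hypothetical, and the byte count is only an estimate: the server re-serializes the message with its own settings, so the authoritative check remains the one inside PublishAsync.

```javascript
// Mirrors the documented MaxPayloadBytes default (16,384); adjust to your server config.
const MAX_PAYLOAD_BYTES = 16384;

// Hypothetical client-side pre-check: serialize, then count UTF-8 bytes.
function checkPayload(message, maxBytes = MAX_PAYLOAD_BYTES) {
  const json = JSON.stringify(message);
  const bytes = new TextEncoder().encode(json).length;
  // "exceeds" is read as strictly greater than the limit.
  return { json, bytes, ok: bytes <= maxBytes };
}
```

Non-ASCII text is where character counts and byte counts diverge: `{"a":"é"}` is 9 characters but 10 UTF-8 bytes.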
Streaming Events (SSE)
Inject ISseStreamService and return TypedResults.ServerSentEvents:
app.MapGet("/events/{channel}/stream", (
string channel, HttpContext http,
ISseStreamService streamService) =>
{
http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
var stream = streamService.StreamAsync<NotificationEvent>(
channel,
lastId.FirstOrDefault(), // null on first connect, set on reconnect
http.RequestAborted); // cancelled when client disconnects
return TypedResults.ServerSentEvents(stream);
});
Stream lifecycle:
Client connects (GET /events/orders/stream)
│
▼
┌─ History phase (if ISseHistoryReader<T> registered) ─┐
│ ReadAfterAsync(channel, lastEventId) │
│ Yields SseHistoryEntry<T> → SseItem<T> │
└──────────────────────┬───────────────────────────────┘
▼
┌─ Live phase ─────────────────────────────────────────┐
│ Resolve Redis cursor from lastEventId │
│ │ │
│ ▼ │
│ ┌─► XREAD sse:{channel} after {cursor} │
│ │ ├─ entries found → deserialize, yield SseItem │
│ │ └─ no entries ──► heartbeat? → yield heartbeat │
│ │ └─ sleep PollIntervalMs │
│ └───────── loop until client disconnects ───────────│
└──────────────────────────────────────────────────────┘
SSE Wire Format
When a client connects to your SSE endpoint, the HTTP response stream follows the Server-Sent Events specification. Here's exactly what the browser receives:
Published Message
Given this message type and publish call:
public sealed record OrderEvent(string OrderId, string Status, decimal Total, DateTimeOffset OccurredAt);
await publisher.PublishAsync("orders", new OrderEvent("ORD-42", "Shipped", 99.95m, DateTimeOffset.UtcNow));
The SSE stream sends:
event: message
id: 1700000000000-0
data: {"orderId":"ORD-42","status":"Shipped","total":99.95,"occurredAt":"2026-04-23T12:00:00+00:00"}
| SSE Field | Value | Source |
|---|---|---|
| event: | message | RedisSseOptions.MessageEventType (default "message") |
| id: | 1700000000000-0 | Redis Stream entry ID, auto-generated by Redis |
| data: | JSON | Your TMessage serialized with camelCase naming |
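For client-side tests or a mock endpoint, it can help to produce frames in the same shape. The sketch below is a hypothetical helper (formatSseFrame is not part of this package) that follows the SSE framing rules: one field per line, a blank line to end the frame, and one data: line per line of payload.

```javascript
// Hypothetical helper for mocks and tests; not part of LowCodeHub.SSE.
function formatSseFrame({ event = "message", id, data }) {
  const payload = typeof data === "string" ? data : JSON.stringify(data);
  const lines = [`event: ${event}`];
  if (id !== undefined && id !== null) lines.push(`id: ${id}`);
  // Multi-line payloads must be split into one "data:" line per line.
  for (const line of payload.split("\n")) lines.push(`data: ${line}`);
  // A blank line terminates the frame.
  return lines.join("\n") + "\n\n";
}
```

Calling it with id "1700000000000-0" and an order object yields the three-line frame shown above, followed by the terminating blank line.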
Heartbeat Event
When no messages arrive for HeartbeatIntervalSeconds and an ISseHeartbeatFactory<T> is registered:
public sealed record KeepAlive(string Type, DateTimeOffset Timestamp);
public class MyHeartbeatFactory : ISseHeartbeatFactory<KeepAlive>
{
public KeepAlive Create(string channel) =>
new("heartbeat", DateTimeOffset.UtcNow);
}
The SSE stream sends:
event: heartbeat
id: 1700000000000-0
data: {"type":"heartbeat","timestamp":"2026-04-23T12:00:15+00:00"}
| SSE Field | Value | Source |
|---|---|---|
| event: | heartbeat | Always "heartbeat" (hardcoded, not configurable) |
| id: | 1700000000000-0 | The current Redis cursor (last known stream position) |
| data: | JSON | Your heartbeat factory's Create() return value |
History Replay Event
When an ISseHistoryReader<T> provides entries, each yields:
event: message
id: db-evt-1001
data: {"orderId":"ORD-40","status":"Created","total":50.00,"occurredAt":"2026-04-22T08:00:00+00:00"}
event: message
id: db-evt-1002
data: {"orderId":"ORD-41","status":"Paid","total":75.00,"occurredAt":"2026-04-22T09:30:00+00:00"}
| SSE Field | Value | Source |
|---|---|---|
| event: | message | SseHistoryEntry.EventType (default "message", overridable per entry) |
| id: | db-evt-1001 | SseHistoryEntry.EventId: any string (database PK, UUID, etc.) |
| data: | JSON | SseHistoryEntry.Message |
History events are sent first, before any live Redis entries. Once history replay completes, the stream seamlessly transitions to the live poll loop.
Reconnection
When a client disconnects and reconnects, the browser automatically sends the Last-Event-ID header with the id: of the last received event. The stream service uses this to:
- Replay any history entries after that ID (if ISseHistoryReader is registered).
- Resume the Redis stream from that cursor position; only new entries after the last-seen ID are delivered.
This is automatic and requires no consumer-side code. The browser's EventSource API handles it natively.
Replay and History Backfill
Implement ISseHistoryReader<TMessage> to replay missed events from a database, API, or any external source before switching to live Redis Streams:
using System.Runtime.CompilerServices; // [EnumeratorCancellation]
using Microsoft.EntityFrameworkCore;   // AsAsyncEnumerable(); assumes AppDbContext is an EF Core DbContext

public class OrderEventHistoryReader(AppDbContext db) : ISseHistoryReader<OrderEvent>
{
    public async IAsyncEnumerable<SseHistoryEntry<OrderEvent>> ReadAfterAsync(
        string channel,
        string? lastEventId,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Filter first, then order, so the ordering applies to the final result set.
        var query = db.OrderEvents.Where(e => e.Channel == channel);

        if (lastEventId is not null)
            query = query.Where(e => e.Id.CompareTo(lastEventId) > 0);

        await foreach (var evt in query
            .OrderBy(e => e.CreatedAt)
            .AsAsyncEnumerable()
            .WithCancellation(cancellationToken))
        {
            yield return new SseHistoryEntry<OrderEvent>(
                EventId: evt.Id,
                Message: new OrderEvent(evt.OrderId, evt.Status, evt.Total, evt.OccurredAt));
        }
    }
}
Register it:
builder.Services.AddSseHistoryReader<OrderEvent, OrderEventHistoryReader>();
Key points:
- lastEventId can be a Redis stream ID (e.g., "1700000000000-0") or any custom ID your history reader assigned. Your reader decides how to interpret it.
- History entries with null EventId, null Message, or whitespace-only EventId are silently skipped.
- If SseHistoryEntry.EventType is null or whitespace, it falls back to RedisSseOptions.MessageEventType.
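The skip and fallback rules above can be restated as a small function. This is a language-neutral illustration written in JavaScript, not the library's implementation; the name toSseItem and the lower-cased property names are assumptions made for the sketch.

```javascript
// Hypothetical restatement of the documented rules for history entries.
// Returns null when the entry would be skipped.
function toSseItem(entry, defaultEventType = "message") {
  if (entry == null || entry.message == null) return null;
  if (typeof entry.eventId !== "string" || entry.eventId.trim() === "") return null;

  const hasEventType =
    typeof entry.eventType === "string" && entry.eventType.trim() !== "";

  return {
    event: hasEventType ? entry.eventType : defaultEventType, // fallback to MessageEventType
    id: entry.eventId,
    data: entry.message,
  };
}
```

Since skipped entries produce no output and no error, a history reader that accidentally emits blank IDs will simply appear to replay nothing, which is worth keeping in mind when debugging.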
Heartbeats
Implement ISseHeartbeatFactory<TMessage> to create typed heartbeat messages that keep the SSE connection alive and prevent proxies/load balancers from timing out:
public class DashboardHeartbeatFactory : ISseHeartbeatFactory<DashboardUpdate>
{
public DashboardUpdate Create(string channel) =>
new(Type: "heartbeat", Channel: channel, Timestamp: DateTimeOffset.UtcNow, Data: null);
}
Register it:
builder.Services.AddSseHeartbeatFactory<DashboardUpdate, DashboardHeartbeatFactory>();
Behavior:
- Heartbeats are emitted with SSE event type "heartbeat", distinct from MessageEventType, so clients can filter them easily.
- The id: field is set to the current Redis cursor (the last known stream position).
- If no ISseHeartbeatFactory<T> is registered, no heartbeat events are sent. The connection stays open but silent during idle periods. Clients relying on proxy keep-alive may need to set their own timeouts.
- The heartbeat timer resets after every batch of real messages or after a heartbeat is sent.
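On the client, heartbeats can double as a liveness signal. The sketch below is one possible watchdog, assuming the server has a heartbeat factory registered and uses the default 15-second interval. The name createHeartbeatWatchdog, the graceFactor parameter, and the injectable clock are all illustrative choices, not part of this package.

```javascript
// Hypothetical client-side watchdog: flags the connection as stale when
// neither a message nor a heartbeat has arrived within the grace window.
function createHeartbeatWatchdog({
  intervalSeconds = 15, // should match HeartbeatIntervalSeconds on the server
  graceFactor = 2,      // tolerate one missed heartbeat before declaring staleness
  now = Date.now,       // injectable clock, useful for tests
} = {}) {
  let lastSeen = now();
  return {
    // Call from every 'message' and 'heartbeat' listener.
    touch() {
      lastSeen = now();
    },
    // Poll from a timer; when true, close and recreate the EventSource.
    isStale() {
      return now() - lastSeen > intervalSeconds * 1000 * graceFactor;
    },
  };
}
```

In a browser this would typically be wired up by calling touch() inside the existing 'message' and 'heartbeat' listeners and checking isStale() from a setInterval callback.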
Channel Key Building
Channel names are sanitized before being used as Redis keys:
| Input Channel | Redis Key (with default prefix sse:) |
|---|---|
| "orders" | sse:orders |
| "user:123" | sse:user:123 |
| " My Topic " | sse:my topic (trimmed + lowercased) |
| "" or whitespace | sse:default |
- Channels are trimmed and lowercased: "Orders" and "orders" map to the same stream.
- Empty/whitespace channels fall back to "default".
- The prefix is configurable via RedisSseOptions.StreamKeyPrefix.
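When inspecting streams with external tooling, it is handy to predict the Redis key for a given channel. The JavaScript sketch below mirrors the table above. buildStreamKey is a hypothetical name, and JavaScript's toLowerCase may differ from the library's casing rules for some non-ASCII characters. The API reference also lists ArgumentException for null/empty channels, so this covers only the key-building rules, not argument validation.

```javascript
// Hypothetical mirror of the documented key-building rules.
function buildStreamKey(channel, prefix = "sse:") {
  const normalized = (channel ?? "").trim().toLowerCase();
  // Empty or whitespace-only channels map to "default".
  return prefix + (normalized === "" ? "default" : normalized);
}
```

For example, buildStreamKey(" My Topic ") gives "sse:my topic", which is the key to pass to a Redis command such as XLEN when checking how many entries a stream holds.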
Serialization
All messages are serialized/deserialized using System.Text.Json with these settings:
new JsonSerializerOptions(JsonSerializerDefaults.Web)
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
This means:
- C# OrderId → JSON orderId (camelCase property names)
- DateTimeOffset → ISO 8601 (e.g., "2026-04-23T12:00:00+00:00")
- decimal → JSON number (e.g., 99.95)
- enum → integer by default (add JsonStringEnumConverter on your type if you want string values)
- null properties → written as JSON null with the options shown above. JsonSerializerDefaults.Web on its own does not set DefaultIgnoreCondition, so nulls are omitted only if DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull is also configured.
Your TMessage must be serializable and deserializable by System.Text.Json. No attributes or annotations are required — any POCO, record, or class works.
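On the receiving side, the JSON arrives as a string in the event's data field. The sketch below shows one way a browser client might parse it and revive timestamp fields into Date objects. parseEventData and the dateFields parameter are hypothetical names, and "occurredAt" is taken from the OrderEvent example.

```javascript
// Hypothetical client-side parser: revives the listed ISO 8601 fields into Date objects.
function parseEventData(json, dateFields = ["occurredAt"]) {
  return JSON.parse(json, (key, value) =>
    dateFields.includes(key) && typeof value === "string" ? new Date(value) : value
  );
}
```

One caveat: C# decimal values arrive as JSON numbers and are parsed into JavaScript doubles, so values that need exact decimal precision may be better modeled as strings in your message type.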
Connection Lifecycle
Redis Connection
- If RedisSseOptions.ConnectionMultiplexer is provided, the package reuses it and does not dispose it.
- If only ConnectionString is set, the package creates its own IConnectionMultiplexer with:
  - AbortOnConnectFail = false: won't throw on startup if Redis is temporarily down.
  - ConnectTimeout, SyncTimeout, AsyncTimeout: minimum 15 seconds each.
- The internal RedisSseService implements IAsyncDisposable. When the DI container disposes singletons at shutdown, the Redis connection is closed (only if the package owns it).
SSE Connection
- Each StreamAsync call runs for the lifetime of the HTTP request.
- When the client disconnects, CancellationToken fires → the async enumerator exits → ASP.NET Core closes the response.
- There is no server-side connection limit; that's managed by your ASP.NET Core configuration (Kestrel concurrent connections, reverse proxy timeouts, etc.).
Error Handling
Publishing
| Scenario | Behavior |
|---|---|
| channel is null/empty | ArgumentException thrown |
| message is null | ArgumentNullException thrown |
| Payload exceeds MaxPayloadBytes | InvalidOperationException thrown; message is not sent |
| Redis is down | RedisException propagated to caller |
Streaming
| Scenario | Behavior |
|---|---|
| channel is null/empty | ArgumentException thrown |
| Redis read fails | Warning logged, retries after PollIntervalMs — does not break the SSE stream |
| Payload deserialization fails | Warning logged, entry skipped — stream continues with next entry |
| History entry has null EventId/Message | Entry silently skipped |
| Client disconnects | CancellationToken fires, async enumerator exits cleanly |
Browser Client
Basic — EventSource API
const source = new EventSource('/events/orders/stream');

// Listen for published messages (event type: "message")
source.addEventListener('message', (e) => {
  const data = JSON.parse(e.data);
  console.log(`[${e.lastEventId}] New event:`, data);
  // data = { orderId: "ORD-42", status: "Shipped", total: 99.95, occurredAt: "..." }
});

// Listen for heartbeats (event type: "heartbeat")
source.addEventListener('heartbeat', (e) => {
  console.log('Connection alive at:', e.lastEventId);
});

// On error / disconnect: the browser auto-reconnects with Last-Event-ID
source.onerror = (e) => {
  if (source.readyState === EventSource.CONNECTING) {
    console.log('SSE reconnecting...');
  } else {
    console.error('SSE connection failed');
  }
};
Custom Event Types
If you set MessageEventType to something other than "message":
options.MessageEventType = "order-update";
Then the browser must listen for that specific event type:
// "message" event listener will NOT fire
// Must use the custom event type:
source.addEventListener('order-update', (e) => {
const data = JSON.parse(e.data);
console.log('Order update:', data);
});
With Authentication
EventSource does not support custom headers. For authenticated endpoints, pass the token via query string or cookies:
// Option 1: Query string (ensure your endpoint reads it)
const source = new EventSource('/events/stream?access_token=eyJhbG...');
// Option 2: Cookies (if using cookie auth). Same-origin cookies are sent automatically;
// withCredentials: true is only needed for cross-origin requests.
const source = new EventSource('/events/stream', { withCredentials: true });
Fetch-based SSE (for custom headers)
const response = await fetch('/events/orders/stream', {
  headers: { 'Authorization': 'Bearer eyJhbG...' }
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // stream: true keeps multi-byte UTF-8 characters intact when they are split across chunks
  const text = decoder.decode(value, { stream: true });
  // Parse SSE frames manually or use a library like eventsource-parser
  console.log(text);
}
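For the "parse SSE frames manually" route, the following is a minimal incremental parser sketch. createSseParser is a hypothetical helper, not part of this package. It assumes LF or CRLF line endings and handles only the event, id, and data fields, ignoring retry and lone-CR line endings.

```javascript
// Minimal incremental SSE parser (sketch). Feed it decoded text chunks;
// it calls onEvent({ event, id, data }) once per complete frame.
function createSseParser(onEvent) {
  let buffer = "";

  return function feed(chunk) {
    // Normalize CRLF so frames can be split on a blank line ("\n\n").
    buffer = (buffer + chunk).replace(/\r\n/g, "\n");

    let boundary;
    while ((boundary = buffer.indexOf("\n\n")) !== -1) {
      const frame = buffer.slice(0, boundary);
      buffer = buffer.slice(boundary + 2); // keep any partial frame for the next chunk

      let event = "message";
      let id = "";
      const dataLines = [];

      for (const line of frame.split("\n")) {
        if (line === "" || line.startsWith(":")) continue; // skip comment lines
        const colon = line.indexOf(":");
        const field = colon === -1 ? line : line.slice(0, colon);
        let value = colon === -1 ? "" : line.slice(colon + 1);
        if (value.startsWith(" ")) value = value.slice(1); // strip one leading space

        if (field === "event") event = value;
        else if (field === "id") id = value;
        else if (field === "data") dataLines.push(value);
      }

      // Frames without data are not dispatched, matching EventSource behavior.
      if (dataLines.length > 0) onEvent({ event, id, data: dataLines.join("\n") });
    }
  };
}
```

In the fetch loop above, this would replace the console.log call: create the parser once before the loop, then call feed(text) for each decoded chunk. Unlike EventSource, fetch does not reconnect or send Last-Event-ID on its own, so a client using this approach would need to remember the last id it saw and send it as a request header when reconnecting.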
Use Cases
Notifications Feed
public sealed record Notification(string Title, string Body, string Severity, DateTimeOffset CreatedAt);
builder.Services.AddSse(o => o.ConnectionString = "localhost:6379");
app.MapPost("/notifications/{userId}", async (string userId, Notification n, ISsePublisher pub, CancellationToken ct) =>
{
await pub.PublishAsync($"user:{userId}", n, ct);
return Results.Accepted();
});
app.MapGet("/notifications/{userId}/stream", (string userId, HttpContext http, ISseStreamService svc) =>
{
http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
return TypedResults.ServerSentEvents(
svc.StreamAsync<Notification>($"user:{userId}", lastId.FirstOrDefault(), http.RequestAborted));
});
Live Dashboard
public sealed record MetricSnapshot(string Name, double Value, string Unit, DateTimeOffset Timestamp);
app.MapPost("/dashboard/metrics", async (MetricSnapshot snapshot, ISsePublisher pub, CancellationToken ct) =>
{
await pub.PublishAsync("dashboard:metrics", snapshot, ct);
return Results.Ok();
});
app.MapGet("/dashboard/metrics/stream", (HttpContext http, ISseStreamService svc) =>
{
http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
return TypedResults.ServerSentEvents(
svc.StreamAsync<MetricSnapshot>("dashboard:metrics", lastId.FirstOrDefault(), http.RequestAborted));
});
Real-Time Chat
public sealed record ChatMessage(string SenderId, string SenderName, string Text, DateTimeOffset SentAt);
builder.Services.AddSse<ChatMessage, ChatHistoryReader, ChatHeartbeatFactory>(o =>
{
o.ConnectionString = "localhost:6379";
o.StreamKeyPrefix = "chat:";
});
app.MapPost("/chat/{room}/messages", async (string room, ChatMessage msg, ISsePublisher pub, CancellationToken ct) =>
{
await pub.PublishAsync(room, msg, ct);
return Results.Accepted();
});
app.MapGet("/chat/{room}/stream", (string room, HttpContext http, ISseStreamService svc) =>
{
http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
return TypedResults.ServerSentEvents(
svc.StreamAsync<ChatMessage>(room, lastId.FirstOrDefault(), http.RequestAborted));
});
Audit Log Stream
public sealed record AuditEntry(string UserId, string Action, string Resource, DateTimeOffset Timestamp);
app.MapPost("/audit", async (AuditEntry entry, ISsePublisher pub, CancellationToken ct) =>
{
await pub.PublishAsync("audit:all", entry, ct);
return Results.Ok();
});
app.MapGet("/audit/stream", (HttpContext http, ISseStreamService svc) =>
{
http.Request.Headers.TryGetValue("Last-Event-ID", out var lastId);
return TypedResults.ServerSentEvents(
svc.StreamAsync<AuditEntry>("audit:all", lastId.FirstOrDefault(), http.RequestAborted));
});
How It Works
┌─────────────────────────────────────────────────────────┐
│ Publisher │
│ │
│ publisher.PublishAsync("orders", myEvent) │
│ │ │
│ ▼ │
│ ┌──────────────┐ XADD sse:orders │
│ │ JSON serialize│──────────────────► Redis Stream │
│ │ (camelCase) │ MAXLEN ~ 50000 │
│ └──────────────┘ │
│ │
│ Returns: "1700000000000-0" (stream entry ID) │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Stream Service │
│ │
│ GET /events/orders/stream │
│ Last-Event-ID: 1700000000000-0 │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ History Reader │ ◄── Optional: DB/API backfill │
│ │ (ISseHistoryReader)│ Yields SseHistoryEntry<T> │
│ └──────┬───────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Redis XREAD loop │ ◄── Poll every 300ms (default) │
│ │ ├─ entries │ Read up to 50 per cycle │
│ │ │ └─ deser. │ │
│ │ └─ idle │ │
│ │ └─ heartbeat│ ◄── Every 15s if factory exists │
│ └──────┬───────────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ SseItem<TMessage>│ ──► TypedResults.ServerSentEvents │
│ └──────────────────┘ │
│ │
│ SSE output: │
│ event: message │
│ id: 1700000000001-0 │
│ data: {"orderId":"ORD-42","status":"Shipped",...} │
└─────────────────────────────────────────────────────────┘
Requirements
- .NET 10+
- Redis 5.0+ (for Streams support)
- ASP.NET Core (for TypedResults.ServerSentEvents)
License
MIT
Dependencies (net10.0)
- StackExchange.Redis (>= 2.13.1)