EventHorizon.EventStreaming
1.3.2
See the version list below for details.
dotnet add package EventHorizon.EventStreaming --version 1.3.2
NuGet\Install-Package EventHorizon.EventStreaming -Version 1.3.2
<PackageReference Include="EventHorizon.EventStreaming" Version="1.3.2" />
<PackageVersion Include="EventHorizon.EventStreaming" Version="1.3.2" />
<PackageReference Include="EventHorizon.EventStreaming" />
paket add EventHorizon.EventStreaming --version 1.3.2
#r "nuget: EventHorizon.EventStreaming, 1.3.2"
#:package EventHorizon.EventStreaming@1.3.2
#addin nuget:?package=EventHorizon.EventStreaming&version=1.3.2
#tool nuget:?package=EventHorizon.EventStreaming&version=1.3.2
EventHorizon
EventHorizon is a .NET framework for Event Sourcing and Event Streaming, providing a clean abstraction layer over multiple storage and streaming backends. Build event-driven applications with pluggable persistence (MongoDB, Elasticsearch, Apache Ignite, In-Memory) and messaging (Apache Pulsar, Kafka, In-Memory).
Table of Contents
- Features
- Supported Platforms
- NuGet Packages
- Quick Start
- Core Concepts
- Architecture
- Storage Backends
- Streaming Backends
- Configuration
- Testing
- CI/CD
- Samples
- Contributing
- License
Features
- Event Sourcing — Snapshot and View stores with automatic event application
- Event Streaming — Publish/subscribe with topic-based routing
- CQRS — Commands, Events, Requests, and Responses as first-class citizens
- Pluggable Backends — Swap storage and streaming providers without changing business logic
- Aggregate Pattern — Built-in aggregate lifecycle management with locking
- Middleware — Extensible pipeline for aggregate processing
- Multi-Stream Subscriptions — Subscribe to multiple event streams in a single consumer
- Migration Support — Built-in tooling for migrating between state schemas
Supported Platforms
| .NET Version | Support Level | End of Support |
|---|---|---|
| .NET 10 | ✅ LTS | November 2028 |
| .NET 9 | ✅ STS | May 2026 |
| .NET 8 | ✅ LTS | November 2026 |
NuGet Packages
All packages are published to NuGet.org with the Cts. prefix.
| Package | Description |
|---|---|
Cts.EventHorizon.Abstractions |
Core interfaces, models, and attributes |
Cts.EventHorizon.EventStore |
Event store abstractions (CRUD stores, locks) |
Cts.EventHorizon.EventStore.InMemory |
In-memory event store (great for testing) |
Cts.EventHorizon.EventStore.MongoDb |
MongoDB-backed event store |
Cts.EventHorizon.EventStore.ElasticSearch |
Elasticsearch-backed event store |
Cts.EventHorizon.EventStore.Ignite |
Apache Ignite-backed event store |
Cts.EventHorizon.EventStreaming |
Event streaming abstractions |
Cts.EventHorizon.EventStreaming.InMemory |
In-memory streaming (great for testing) |
Cts.EventHorizon.EventStreaming.Pulsar |
Apache Pulsar streaming provider |
Cts.EventHorizon.EventSourcing |
Event sourcing orchestration (aggregates, senders, subscriptions) |
Quick Start
1. Install packages
# Core + In-Memory (for getting started / testing)
dotnet add package Cts.EventHorizon.EventSourcing
dotnet add package Cts.EventHorizon.EventStore.InMemory
dotnet add package Cts.EventHorizon.EventStreaming.InMemory
2. Define your state
using EventHorizon.Abstractions.Attributes;
using EventHorizon.Abstractions.Interfaces;
using EventHorizon.Abstractions.Interfaces.Actions;
using EventHorizon.Abstractions.Interfaces.Handlers;
[SnapshotStore("my_app_accounts")]
[Stream("$type")]
public class Account : IState,
IHandleCommand<CreateAccount>,
IApplyEvent<AccountCreated>
{
public string Id { get; set; }
public string Name { get; set; }
public int Balance { get; set; }
public void Handle(CreateAccount command, AggregateContext context)
{
context.AddEvent(new AccountCreated(command.Name, command.InitialBalance));
}
public void Apply(AccountCreated @event)
{
Name = @event.Name;
Balance = @event.Balance;
}
}
3. Define actions
using EventHorizon.Abstractions.Interfaces.Actions;
public record CreateAccount(string Name, int InitialBalance) : ICommand<Account>;
public record AccountCreated(string Name, int Balance) : IEvent<Account>;
4. Register services
using EventHorizon.Abstractions.Extensions;
using EventHorizon.EventSourcing.Extensions;
using EventHorizon.EventStore.InMemory.Extensions;
using EventHorizon.EventStreaming.InMemory.Extensions;
services.AddEventHorizon(x =>
{
x.AddEventSourcing()
.AddInMemorySnapshotStore()
.AddInMemoryViewStore()
.AddInMemoryEventStream()
.ApplyCommandsToSnapshot<Account>();
});
5. Use the client
using EventHorizon.EventSourcing;
public class AccountService
{
private readonly EventSourcingClient<Account> _client;
public AccountService(EventSourcingClient<Account> client)
{
_client = client;
}
public async Task CreateAccountAsync(string name, int balance)
{
await _client.CreateSender()
.Send(new CreateAccount(name, balance))
.ExecuteAsync();
}
}
Core Concepts
State (IState)
The root entity that represents the current state of your domain object. Must implement IState with an Id property.
Actions
EventHorizon uses a CQRS-style action hierarchy:
| Action | Interface | Purpose |
|---|---|---|
| Command | ICommand<T> |
Mutates state — handled by IHandleCommand<T> on the state class |
| Event | IEvent<T> |
Records what happened — applied by IApplyEvent<T> on the state class |
| Request | IRequest<T, TResponse> |
Query or operation that returns a response |
| Response | IResponse<T> |
Result of a request |
Snapshots and Views
- Snapshot (
Snapshot<T>) — The authoritative persisted state, rebuilt by replaying events - View (
View<T>) — A read-optimized projection derived from events, can combine data from multiple streams
Aggregates
The AggregateBuilder manages the lifecycle of loading state from a store, applying actions, and persisting results. It handles optimistic concurrency via sequence IDs and distributed locking.
Subscriptions
SubscriptionBuilder<T> creates durable consumers that process messages from one or more streams. Implement IStreamConsumer<T> to handle batches of messages:
public class MyConsumer : IStreamConsumer<Event>
{
public Task OnBatch(SubscriptionContext<Event> context)
{
foreach (var message in context.Messages)
{
var payload = message.Data.GetPayload();
// Process event...
}
return Task.CompletedTask;
}
}
Register with:
x.AddSubscription<MyConsumer, Event>(s => s.AddStream<Account>());
Middleware
Aggregate processing supports middleware for cross-cutting concerns:
x.ApplyEventsToView<MyView>(h => h.UseMiddleware<MyMiddleware>());
Architecture
┌─────────────────────────────────────────────────────────────┐
│ Your Application │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ EventSourcingClient│ │ StreamingClient │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
├───────────┼──────────────────────┼──────────────────────────┤
│ │ EventHorizon Core │ │
│ ┌────────▼─────────┐ ┌────────▼─────────┐ │
│ │ AggregateBuilder │ │ SubscriptionBuilder│ │
│ │ SenderBuilder │ │ PublisherBuilder │ │
│ │ ICrudStore<T> │ │ ReaderBuilder │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
├───────────┼──────────────────────┼──────────────────────────┤
│ ┌────────▼─────────┐ ┌────────▼─────────┐ │
│ │ Event Stores │ │ Event Streaming │ │
│ │ ┌─────────────┐ │ │ ┌─────────────┐ │ │
│ │ │ MongoDB │ │ │ │ Pulsar │ │ │
│ │ │ Elastic │ │ │ │ Kafka │ │ │
│ │ │ Ignite │ │ │ │ In-Memory │ │ │
│ │ │ In-Memory │ │ │ └─────────────┘ │ │
│ │ └─────────────┘ │ └──────────────────┘ │
│ └──────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Storage Backends
MongoDB
x.AddMongoDbSnapshotStore(config.GetSection("MongoDb").Bind)
.AddMongoDbViewStore(config.GetSection("MongoDb").Bind);
{
"MongoDb": {
"ConnectionString": "mongodb://localhost:27017",
"Database": "my_database"
}
}
Elasticsearch
x.AddElasticSnapshotStore(config.GetSection("ElasticSearch").Bind)
.AddElasticViewStore(config.GetSection("ElasticSearch").Bind);
{
"ElasticSearch": {
"Uri": "http://localhost:9200"
}
}
Apache Ignite
x.AddIgniteSnapshotStore(config.GetSection("Ignite").Bind)
.AddIgniteViewStore(config.GetSection("Ignite").Bind);
In-Memory
x.AddInMemorySnapshotStore()
.AddInMemoryViewStore();
Best suited for unit/integration testing. No external dependencies required.
Streaming Backends
Apache Pulsar
x.AddPulsarEventStream(config.GetSection("Pulsar").Bind);
{
"Pulsar": {
"ServiceUrl": "pulsar://localhost:6650"
}
}
In-Memory
x.AddInMemoryEventStream();
Best suited for unit/integration testing. No external dependencies required.
Configuration
Attributes
| Attribute | Target | Purpose |
|---|---|---|
[SnapshotStore("bucket_id")] |
Class | Configures the snapshot store bucket/collection name |
[ViewStore("database")] |
Class | Configures the view store database/index name |
[Stream("topic")] |
Class | Maps a type to a streaming topic |
[StreamPartitionKey] |
Property | Designates the property used for stream partitioning |
Docker Compose
Development infrastructure is provided in the compose/ directory:
# Start MongoDB
docker compose -f compose/MongoDb/docker-compose.yml up -d
# Start Elasticsearch
docker compose -f compose/ElasticSearch/docker-compose.yml up -d
# Start Pulsar
docker compose -f compose/Pulsar/docker-compose.yml up -d
# Start Ignite
docker compose -f compose/Ignite/docker-compose.yml up -d
Testing
The test suite uses xUnit with Bogus for data generation.
# Run unit tests only
dotnet test --filter "Category!=Integration"
# Run all tests (requires Docker services)
dotnet test
Writing Tests
Use the in-memory providers for fast, isolated unit tests:
services.AddEventHorizon(x =>
{
x.AddInMemorySnapshotStore()
.AddInMemoryViewStore()
.AddInMemoryEventStream()
.AddEventSourcing();
});
Integration tests use [Collection("Integration")] and require running Docker Compose services.
CI/CD
This project uses GitHub Actions (.github/workflows/ci.yml) with GitVersion for automatic semantic versioning based on the GitFlow branching model.
Versions are derived from git history and tags — no manual version bumping required after initial setup.
| Branch/Tag | Pre-release Label | Example Version |
|---|---|---|
v* tag |
(stable) | 1.3.0 |
master / main |
(stable) | 1.3.0 |
release/* |
rc |
1.3.0-rc.3 |
hotfix/* |
hf |
1.3.1-hf.1 |
develop |
preview |
1.4.0-preview.12 |
feature/* |
{branch} |
1.4.0-my-feature.1 |
How versioning works
- Tag a release on
main/master(e.g.,v1.3.0) to set the version baseline - All subsequent commits on branches derive their version from git tags and merge history
- Commit messages with
+semver: major,+semver: minor, or+semver: fixcontrol version increments - Configuration lives in
GitVersion.ymlat the repo root
All packages are published with the Cts.* prefix (e.g., Cts.EventHorizon.Abstractions).
Trusted Publishing
NuGet packages are published using trusted publishing via GitHub's OIDC tokens — no API keys or secrets required. The trusted publisher is configured on nuget.org to trust the ci.yml workflow in this repository.
Samples
Working examples are in the samples/ directory:
EventHorizon.EventSourcing.Samples— Full event sourcing example with accounts, commands, events, views, and subscriptions using MongoDB + Elasticsearch + PulsarEventHorizon.EventStreaming.Samples— Standalone streaming example with multi-topic subscription and publishing
Run samples with:
# Start required infrastructure
docker compose -f compose/MongoDb/docker-compose.yml up -d
docker compose -f compose/ElasticSearch/docker-compose.yml up -d
docker compose -f compose/Pulsar/docker-compose.yml up -d
# Run the event sourcing sample
dotnet run --project samples/EventHorizon.EventSourcing.Samples
Project Structure
EventHorizon/
├── src/
│ ├── EventHorizon.Abstractions/ # Core interfaces, models, attributes
│ ├── EventHorizon.EventStore/ # Store abstractions (ICrudStore, Lock)
│ ├── EventHorizon.EventStore.InMemory/ # In-memory store implementation
│ ├── EventHorizon.EventStore.MongoDb/ # MongoDB store implementation
│ ├── EventHorizon.EventStore.ElasticSearch/ # Elasticsearch store implementation
│ ├── EventHorizon.EventStore.Ignite/ # Apache Ignite store implementation
│ ├── EventHorizon.EventStreaming/ # Streaming abstractions
│ ├── EventHorizon.EventStreaming.InMemory/ # In-memory streaming
│ ├── EventHorizon.EventStreaming.Pulsar/ # Apache Pulsar streaming
│ └── EventHorizon.EventSourcing/ # Event sourcing orchestration
├── test/ # Unit and integration tests
├── samples/ # Working example applications
├── benchmark/ # Performance benchmarks
├── compose/ # Docker Compose files for local dev
└── charts/ # Helm charts for Kubernetes deployment
Contributing
- Fork the repository
- Create a feature branch (
feature/my-feature) - Commit changes with clear messages
- Open a pull request against
develop
License
This project is licensed under the MIT License.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- EventHorizon.Abstractions (>= 1.3.2)
- Microsoft.Extensions.Configuration (>= 10.0.2)
- Microsoft.Extensions.Configuration.Json (>= 10.0.2)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.2)
- Microsoft.Extensions.Logging (>= 10.0.2)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.2)
- Newtonsoft.Json (>= 13.0.4)
-
net8.0
- EventHorizon.Abstractions (>= 1.3.2)
- Microsoft.Extensions.Configuration (>= 10.0.2)
- Microsoft.Extensions.Configuration.Json (>= 10.0.2)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.2)
- Microsoft.Extensions.Logging (>= 10.0.2)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.2)
- Newtonsoft.Json (>= 13.0.4)
-
net9.0
- EventHorizon.Abstractions (>= 1.3.2)
- Microsoft.Extensions.Configuration (>= 10.0.2)
- Microsoft.Extensions.Configuration.Json (>= 10.0.2)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.2)
- Microsoft.Extensions.Logging (>= 10.0.2)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.2)
- Newtonsoft.Json (>= 13.0.4)
NuGet packages (3)
Showing the top 3 NuGet packages that depend on EventHorizon.EventStreaming:
| Package | Downloads |
|---|---|
|
EventHorizon.EventSourcing
EventHorizon - Event Sourcing and Event Streaming framework for .NET |
|
|
EventHorizon.EventStreaming.Pulsar
EventHorizon - Event Sourcing and Event Streaming framework for .NET |
|
|
EventHorizon.EventStreaming.InMemory
EventHorizon - Event Sourcing and Event Streaming framework for .NET |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.4.0-preview.1 | 50 | 2/6/2026 |
| 1.4.0-preview.0 | 46 | 2/6/2026 |
| 1.3.2 | 374 | 2/6/2026 |