Opossum 0.5.0-preview.1

This is a prerelease version of Opossum.
dotnet add package Opossum --version 0.5.0-preview.1
                    

🦝 Opossum

A file system-based event store for .NET that implements the DCB (Dynamic Consistency Boundaries) specification.

Opossum turns your file system into a fully functional event store with projections, optimistic concurrency control, and tag-based indexing. Perfect for scenarios where simplicity, offline operation, and local data sovereignty matter more than cloud scalability.



🦝 What is Opossum?

Opossum is an event sourcing framework that uses your file system as the database. It's designed for applications that need:

  • ✅ 100% offline operation - No internet required
  • ✅ Complete audit trail - Every state change is an immutable event
  • ✅ Local data ownership - Your data never leaves your server
  • ✅ Optimistic concurrency - Built-in DCB pattern for consistency
  • ✅ Simple deployment - Just files, no database server to manage
  • ✅ Projections - Materialized views that rebuild from events
  • ✅ Tag-based indexing - Fast queries without full scans

What Makes Opossum Different?

Unlike cloud-based event stores (EventStoreDB, Azure Event Hubs) or database-backed solutions, Opossum stores events directly as files in a structured directory hierarchy. This makes it ideal for:

  • 🏒 On-premises applications (POS systems, dealership software)
  • πŸ“΄ Offline-first applications (field service, remote installations)
  • πŸ’Ό SMB solutions (where cloud costs don't make sense)
  • πŸ”’ Data sovereignty requirements (keep data in-country/on-site)
  • πŸ§ͺ Development & testing (no Docker/database setup needed)
  • πŸ–₯️ Multi-workstation deployments (multiple PCs sharing a store on a network drive β€” cross-process append safety via OS file locking)

✅ When to Use Opossum

Perfect Use Cases

| Scenario | Why Opossum Fits |
| --- | --- |
| Automotive dealership sales tracking | Offline operation, complete audit trail, local compliance |
| Point-of-sale systems | Works during internet outages, simple IT management |
| Field service applications | Offline data collection, sync when connected |
| Small business ERP | No cloud costs, data stays on-premises |
| Compliance-heavy industries | Immutable audit log, easy to backup/archive |
| Development & testing | Zero infrastructure setup, just run the app |

Key Characteristics

  • ✅ Single server/small deployment (< 100k events/day)
  • ✅ Offline-first requirements
  • ✅ Simple IT environment (IT staff comfortable with files/folders)
  • ✅ Budget-conscious (avoid monthly cloud fees)
  • ✅ Data residency requirements (legal/compliance)
  • ✅ Complete audit trail needed


❌ When NOT to Use Opossum

Opossum is not designed for:

Don't Use If... Use Instead
❌ Distributed systems across multiple servers EventStoreDB, Kafka
❌ High throughput (> 100k events/day per server) Cloud event stores
❌ Cloud-native microservices Azure Event Hubs, AWS Kinesis
❌ Multi-region replication needed Distributed event stores
❌ Event streaming to multiple consumers Kafka, RabbitMQ
❌ Massive scale (millions of events) Purpose-built event stores

Rule of thumb: If your application runs on a single server (or small cluster) and needs offline capabilities, Opossum is great. If you need cloud-scale distribution, choose a cloud-native solution.


🚀 Quick Start

1. Install the NuGet Package

dotnet add package Opossum

2. Configure Opossum

using Opossum.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

// Add Opossum event store
builder.Services.AddOpossum(options =>
{
    options.RootPath = @"D:\MyAppData\EventStore";  // Where to store events
    options.UseStore("MyApp");                       // Store name
    options.FlushEventsImmediately = true;           // Durability guarantee (recommended)
});

// Add projection system for read models
builder.Services.AddProjections(options =>
{
    options.ScanAssembly(typeof(Program).Assembly);  // Auto-discover projections
});

// Add mediator for command handling (optional but recommended)
builder.Services.AddMediator();

var app = builder.Build();
app.Run();

3. Define Your Events

Events are immutable records that represent state changes:

using Opossum;

public record StudentRegisteredEvent(
    Guid StudentId,
    string FirstName,
    string LastName,
    string Email) : IEvent;

public record StudentEnrolledToCourseEvent(
    Guid StudentId,
    Guid CourseId) : IEvent;

4. Append Events to the Store

using Opossum.Core;
using Opossum.Extensions;

public class RegisterStudentHandler
{
    private readonly IEventStore _eventStore;

    public RegisterStudentHandler(IEventStore eventStore)
    {
        _eventStore = eventStore;
    }

    public async Task<Guid> RegisterStudentAsync(string firstName, string lastName, string email)
    {
        var studentId = Guid.NewGuid();

        // Create event
        var evt = new StudentRegisteredEvent(studentId, firstName, lastName, email)
            .ToDomainEvent()
            .WithTag("studentId", studentId.ToString())
            .WithTag("studentEmail", email)
            .WithTimestamp(DateTimeOffset.UtcNow);

        // Append to event store
        await _eventStore.AppendAsync(evt);

        return studentId;
    }
}

5. Query Events

using Opossum.Core;

// Query all events for a specific student
var query = Query.FromItems(new QueryItem
{
    Tags = [new Tag("studentId", studentId.ToString())]
});

var events = await _eventStore.ReadAsync(query);

foreach (var evt in events)
{
    Console.WriteLine($"[{evt.Position}] {evt.Event.EventType}");
}

6. Create Projections (Read Models)

Projections transform events into queryable views:

using Opossum.Projections;

public record StudentDetails(
    Guid StudentId,
    string FirstName,
    string LastName,
    string Email,
    int EnrolledCoursesCount);

[ProjectionDefinition("StudentDetails")]
public class StudentDetailsProjection : IProjectionDefinition<StudentDetails>
{
    public string ProjectionName => "StudentDetails";

    public string[] EventTypes => new[]
    {
        nameof(StudentRegisteredEvent),
        nameof(StudentEnrolledToCourseEvent)
    };

    public string KeySelector(SequencedEvent evt)
    {
        // Extract student ID from tags
        return evt.Event.Tags.First(t => t.Key == "studentId").Value;
    }

    public StudentDetails? Apply(StudentDetails? current, SequencedEvent evt)
    {
        return evt.Event.Event switch
        {
            StudentRegisteredEvent registered => new StudentDetails(
                registered.StudentId,
                registered.FirstName,
                registered.LastName,
                registered.Email,
                EnrolledCoursesCount: 0),

            StudentEnrolledToCourseEvent enrolled when current != null =>
                current with { EnrolledCoursesCount = current.EnrolledCoursesCount + 1 },

            _ => current
        };
    }
}

7. Query Projections

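using Opossum.Projections;

public class StudentController
{
    private readonly IProjectionStore<StudentDetails> _projectionStore;

    // The typed store is resolved from DI (see IProjectionStore<TState> in the API Reference)
    public StudentController(IProjectionStore<StudentDetails> projectionStore)
    {
        _projectionStore = projectionStore;
    }

    public async Task<StudentDetails?> GetStudentAsync(Guid studentId)
    {
        // The projection key is the student ID produced by KeySelector
        return await _projectionStore.GetAsync(studentId.ToString());
    }
}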

🧠 Core Concepts

Events

Immutable records that represent state changes in your domain. Every event implements IEvent and gets stored permanently.

public record CourseCreatedEvent(Guid CourseId, string Name, int MaxStudents) : IEvent;

NewEvent (Write Side)

What you pass to AppendAsync. Contains the event payload and optional metadata, but no position; the store assigns that during append:

public class NewEvent
{
    public DomainEvent Event { get; set; }  // Your domain event + EventType + Tags
    public Metadata Metadata { get; set; }  // Optional: Timestamp, correlation IDs
}

You rarely construct this directly; use the fluent builder instead (see Extension Methods).

SequencedEvent (Read Side)

What ReadAsync returns. Wraps the original event with a position assigned by the store:

public class SequencedEvent
{
    public long Position { get; set; }      // Global sequence number (assigned by store)
    public DomainEvent Event { get; set; }  // Wrapper containing your domain event + tags
    public Metadata Metadata { get; set; }  // Timestamp, correlation/causation IDs
}

This is the DCB-spec distinction: Event (write input, no position; NewEvent in Opossum) vs SequencedEvent (read output, position assigned by store).

Tags

Domain-specific metadata for fast filtering without full scans:

.WithTag("studentId", studentId.ToString())
.WithTag("courseId", courseId.ToString())
.WithTag("studentEmail", "student@example.com")

Tags are indexed automatically, enabling efficient queries like:

  • "All events for student X"
  • "All course enrollments in January 2024"

Queries

Filter events by EventType and/or Tags:

// All events for a specific student
var studentQuery = Query.FromItems(new QueryItem
{
    Tags = [new Tag("studentId", "123")]
});

// All StudentRegistered events
var registeredQuery = Query.FromItems(new QueryItem
{
    EventTypes = [nameof(StudentRegisteredEvent)]
});

// Combination: StudentEnrolled events for student 123
var enrolledQuery = Query.FromItems(new QueryItem
{
    EventTypes = [nameof(StudentEnrolledToCourseEvent)],
    Tags = [new Tag("studentId", "123")]
});

Projections

Materialized views rebuilt from events. Think of them as denormalized read models:

[ProjectionDefinition("CourseEnrollmentCount")]
public class CourseEnrollmentProjection : IProjectionDefinition<CourseEnrollmentState>
{
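    // ProjectionName, EventTypes, and KeySelector are omitted here for brevity
    public CourseEnrollmentState? Apply(CourseEnrollmentState? current, SequencedEvent evt)
    {
        return evt.Event.Event switch
        {
            // Guard against missing state: `with` on a null record would throw
            StudentEnrolledToCourseEvent when current is not null =>
                current with { EnrollmentCount = current.EnrollmentCount + 1 },
            _ => current
        };
    }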
}

When building a projection's state requires data from events matched by an additional query (events with different types or tags), implement IProjectionWithRelatedEvents<TState> instead of IProjectionDefinition<TState>. The framework calls GetRelatedEventsQuery before Apply, executes that second query, and passes the results in as a third parameter, so there are no N+1 queries and no manual secondary reads.

Example: CourseDetailsProjection needs student names when a StudentEnrolledToCourseEvent arrives, but that data lives in StudentRegisteredEvent under a different tag:

[ProjectionDefinition("CourseDetails")]
public sealed class CourseDetailsProjection : IProjectionWithRelatedEvents<CourseDetails>
{
    public string ProjectionName => "CourseDetails";
    public string[] EventTypes => [nameof(CourseCreatedEvent), nameof(StudentEnrolledToCourseEvent)];
    public string KeySelector(SequencedEvent evt) =>
        evt.Event.Tags.First(t => t.Key == "courseId").Value;

    // Return a query for the extra events needed, or null if this event needs nothing extra.
    public Query? GetRelatedEventsQuery(SequencedEvent evt)
    {
        if (evt.Event.Event is StudentEnrolledToCourseEvent enrolled)
            return Query.FromItems(new QueryItem
            {
                Tags = [new Tag("studentId", enrolled.StudentId.ToString())],
                EventTypes = [nameof(StudentRegisteredEvent)]
            });
        return null;
    }

    // relatedEvents contains the events matched by the query from GetRelatedEventsQuery (empty array when null was returned).
    public CourseDetails? Apply(CourseDetails? current, SequencedEvent evt, SequencedEvent[] relatedEvents) =>
        evt.Event.Event switch
        {
            CourseCreatedEvent created => new CourseDetails(
                CourseId: created.CourseId,
                Name: created.Name,
                MaxStudentCount: created.MaxStudents,
                CurrentEnrollmentCount: 0,
                EnrolledStudents: []),

            StudentEnrolledToCourseEvent when current is not null
                && relatedEvents.FirstOrDefault(e => e.Event.Event is StudentRegisteredEvent) is
                    { Event.Event: StudentRegisteredEvent reg } =>
                current with
                {
                    CurrentEnrollmentCount = current.CurrentEnrollmentCount + 1,
                    EnrolledStudents = [..current.EnrolledStudents,
                        new EnrolledStudentInfo(reg.StudentId, reg.FirstName, reg.LastName, reg.Email)]
                },

            _ => current
        };
}

Tag-Indexed Queries (IProjectionTagProvider<T>)

By default, projections are keyed by a single string (KeySelector) so GetAsync retrieves one instance by key. To also query projections by their current state properties (e.g., "all courses that are not yet full"), attach a tag provider. The index is updated automatically every time a projection is saved.

// 1. Implement the tag provider: return whatever tags should be queryable
public sealed class CourseShortInfoTagProvider : IProjectionTagProvider<CourseShortInfo>
{
    public IEnumerable<Tag> GetTags(CourseShortInfo state)
    {
        yield return new Tag("IsFull", state.IsFull.ToString());
    }
}

// 2. Attach it to the projection with [ProjectionTags]; it is auto-discovered during assembly scanning
[ProjectionDefinition("CourseShortInfo")]
[ProjectionTags(typeof(CourseShortInfoTagProvider))]
public sealed class CourseShortInfoProjection : IProjectionDefinition<CourseShortInfo>
{
    // ... normal IProjectionDefinition<T> implementation
}

// 3. Query by tag: uses the persisted index, no full table scan
IProjectionStore<CourseShortInfo> courseStore = ...;
var availableCourses = await courseStore.QueryByTagsAsync(
    [new Tag("IsFull", "False")]);

Decision Model Projections

Write-side, ephemeral projections used in the DCB read → decide → append pattern. Each projection is a typed in-memory fold that yields state and a pre-built AppendCondition: no persistence, no background services.

Unlike read-side projections, Decision Model projections are:

  • In-memory only - run once per command, result is never stored
  • Strongly typed - each projection owns a single business concern
  • Composable - multiple projections share a single ReadAsync call and produce one AppendCondition that spans all their queries

// Each projection is a self-contained factory method:
IDecisionProjection<MyState?> MyProjection(Guid id) =>
    new DecisionProjection<MyState?>(
        initialState: null,
        query: Query.FromItems(new QueryItem
        {
            EventTypes = [nameof(MyEvent)],
            Tags = [new Tag("id", id.ToString())]
        }),
        apply: (state, evt) => evt.Event.Event switch
        {
            MyEvent e => new MyState(e.Value),
            _ => state
        });

// Compose up to three projections: one read, one atomic AppendCondition
var (state1, state2, state3, appendCondition) =
    await eventStore.BuildDecisionModelAsync(
        MyProjection(id1),
        AnotherProjection(id2),
        YetAnotherProjection(id3));

// Wrap the full cycle with automatic retry on concurrency conflicts:
return await eventStore.ExecuteDecisionAsync(async (store, ct) =>
{
    var (s1, s2, s3, condition) = await store.BuildDecisionModelAsync(p1, p2, p3, ct);
    // ... check invariants using s1, s2, s3 ...
    await store.AppendAsync(newEvent, condition);
    return result;
});
// If all retries are exhausted, ExecuteDecisionAsync re-throws AppendConditionFailedException.

Four BuildDecisionModelAsync overloads are available: single-projection (BuildDecisionModelAsync<T> → DecisionModel<T>), two-projection (returns (T1, T2, AppendCondition)), three-projection (returns (T1, T2, T3, AppendCondition)), and N-ary (IReadOnlyList<IDecisionProjection<TState>> → (IReadOnlyList<TState>, AppendCondition)) for a runtime-variable list of homogeneous projections (e.g. a shopping cart). Use ExecuteDecisionAsync to wrap the entire cycle with automatic exponential-backoff retry.
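The retry behaviour described above can be pictured with a small stand-alone sketch. It does not use Opossum and is not the library's implementation: ConflictException stands in for AppendConditionFailedException, and RetrySketch, maxRetries, and initialDelay are hypothetical names chosen for illustration.

```csharp
using System;
using System.Threading.Tasks;

int attempts = 0;

// Simulated decision cycle: the first two attempts hit a concurrency conflict.
string result = await RetrySketch.ExecuteAsync(async () =>
{
    attempts++;
    await Task.Yield();
    if (attempts < 3) throw new ConflictException();
    return "appended";
}, maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(10));

Console.WriteLine($"{result} after {attempts} attempts"); // prints "appended after 3 attempts"

// Hypothetical stand-in for the store's concurrency-conflict exception.
sealed class ConflictException : Exception { }

static class RetrySketch
{
    public static async Task<T> ExecuteAsync<T>(
        Func<Task<T>> operation, int maxRetries, TimeSpan initialDelay)
    {
        TimeSpan delay = initialDelay;
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                return await operation();
            }
            // Once attempt reaches maxRetries the filter is false and the exception propagates.
            catch (ConflictException) when (attempt < maxRetries)
            {
                await Task.Delay(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2); // double the wait each time
            }
        }
    }
}
```

The whole read, decide, append cycle sits inside the retried delegate, so every retry re-reads the events and re-evaluates the invariants against fresh state.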

See the Full Example section for a complete real-world walkthrough.

Idempotency Tokens: Prevent Record Duplication

Enforce "process this request exactly once" using a client-generated idempotency token stored as a tag. This is the DCB pattern for infrastructure constraints: constraints that are not directly related to the domain.

Full specification: https://dcb.events/examples/prevent-record-duplication/

The key insight: the domain may allow an operation to happen multiple times (a course can have many announcements), but an accidental HTTP retry should not create duplicates. The idempotency token is the sole guard and, unlike a domain uniqueness constraint, it is controlled entirely by the client.

// The projection folds the token's lifecycle: Posted → true, Retracted → false (token freed).
// The query is scoped exclusively to the idempotency tag, so two concurrent requests with
// DIFFERENT tokens produce completely independent AppendConditions; they never block each other.
IDecisionProjection<bool> IdempotencyTokenWasUsed(Guid token) =>
    new DecisionProjection<bool>(
        initialState: false,
        query: Query.FromItems(new QueryItem
        {
            EventTypes = [nameof(AnnouncementPostedEvent), nameof(AnnouncementRetractedEvent)],
            Tags = [new Tag("idempotency", token.ToString())]
        }),
        apply: (_, evt) => evt.Event.Event switch
        {
            AnnouncementPostedEvent    => true,   // token consumed
            AnnouncementRetractedEvent => false,  // token freed, may be reused
            _                          => false
        });

// In the command handler, compose with a business prerequisite:
var (courseExists, tokenWasUsed, appendCondition) = await eventStore.BuildDecisionModelAsync(
    CourseExists(command.CourseId),
    IdempotencyTokenWasUsed(command.IdempotencyToken));

if (!courseExists)   return Fail("Course does not exist.");
if (tokenWasUsed)    return Fail("Re-submission detected.");

await eventStore.AppendAsync(
    new AnnouncementPostedEvent(Guid.NewGuid(), command.CourseId, command.Title, command.IdempotencyToken)
        .ToDomainEvent()
        .WithTag("courseId", command.CourseId.ToString())
        .WithTag("idempotency", command.IdempotencyToken.ToString()),
    appendCondition);

Token reuse after retraction: When an announcement is retracted, the AnnouncementRetractedEvent is stored with the same idempotency tag. On the next post attempt the projection folds both events in sequence order (Posted → true, then Retracted → false) and the final state is false. The token is free again, with no changes to the post handler.
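To make the fold concrete, here is a minimal sketch that replays the same rule over plain strings instead of real events. The event names are reduced to "Posted" and "Retracted" purely for illustration; nothing here calls Opossum.

```csharp
using System;
using System.Linq;

// Same rule as the projection: the latest event decides. Posted => true, anything else => false.
bool Fold(string[] history) => history.Aggregate(false, (_, e) => e == "Posted");

Console.WriteLine(Fold(new[] { "Posted" }));                        // True  (token consumed)
Console.WriteLine(Fold(new[] { "Posted", "Retracted" }));           // False (token freed)
Console.WriteLine(Fold(new[] { "Posted", "Retracted", "Posted" })); // True  (consumed again)
```

Because the previous state is ignored, only the last event for a token matters, which is why a retraction frees the token without any special handling.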

See CourseAnnouncementProjections and CourseAnnouncementRetractionProjection in the sample application for the full implementation.

Contrast with Opt-In tokens: Idempotency tokens are client-generated and protect against accidental retry duplicates. For server-generated single-use tokens that can be issued, redeemed, and revoked, see the Opt-In Token pattern.

Dynamic Consistency Boundaries (DCB)

Enforce optimistic concurrency using append conditions. The raw DCB API is ideal for straightforward global-uniqueness rules (e.g. unique email, the Unique Username example):

// Ensure email is unique across ALL students
var validateEmailQuery = Query.FromItems(new QueryItem
{
    Tags = [new Tag("studentEmail", email)]
});

// This will fail if any event with this email already exists
await _eventStore.AppendAsync(
    evt,
    condition: new AppendCondition 
    { 
        FailIfEventsMatch = validateEmailQuery 
    });

Why this matters: Prevents race conditions without distributed locks. For more complex decisions that need to examine state before deciding, prefer BuildDecisionModelAsync (see Decision Model Projections).

Mediator

Opossum includes a lightweight in-process mediator that automatically discovers command and query handlers, so no manual registration of individual handlers is needed.

Discovery convention: any class whose name ends with Handler (or is marked [MessageHandler]) and has a method named HandleAsync or Handle. The first parameter is the message type; any additional parameters are injected from the DI container.

// 1. Register: auto-scans the calling assembly by default
builder.Services.AddMediator();

// 2. Define the command and its handler (no interface, no DI registration needed)
public sealed record RegisterStudentCommand(Guid StudentId, string FirstName, string LastName, string Email);

public sealed class RegisterStudentCommandHandler
{
    // IEventStore is resolved automatically from DI
    public async Task<CommandResult> HandleAsync(
        RegisterStudentCommand command,
        IEventStore eventStore)
    {
        var evt = new StudentRegisteredEvent(command.StudentId, command.FirstName, command.LastName, command.Email)
            .ToDomainEvent()
            .WithTag("studentId", command.StudentId.ToString())
            .WithTimestamp(DateTimeOffset.UtcNow);

        await eventStore.AppendAsync(evt);
        return CommandResult.Ok();
    }
}

// 3. Dispatch: IMediator routes to the matching handler automatically
app.MapPost("/students", async ([FromBody] RegisterStudentRequest req, IMediator mediator) =>
{
    var command = new RegisterStudentCommand(Guid.NewGuid(), req.FirstName, req.LastName, req.Email);
    var result = await mediator.InvokeAsync<CommandResult>(command);
    return result.Success ? Results.Created() : Results.BadRequest(result.ErrorMessage);
});
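
A name-based discovery convention like the one described above can be approximated with plain reflection. The sketch below is only an illustration of the idea, not Opossum's scanner: DiscoverySketch, Ping, and PingHandler are hypothetical, and the [MessageHandler] attribute path is left out.

```csharp
using System;
using System.Linq;
using System.Reflection;

foreach (var (message, handler) in DiscoverySketch.Discover(typeof(PingHandler).Assembly))
    Console.WriteLine($"{message.Name} -> {handler.Name}");

// Hypothetical message and handler used only to exercise the scan.
sealed record Ping(string Text);

sealed class PingHandler
{
    public string Handle(Ping message) => message.Text;
}

static class DiscoverySketch
{
    // Finds concrete classes named *Handler and maps the first parameter type of
    // each public Handle/HandleAsync method to the class that declares it.
    public static (Type Message, Type Handler)[] Discover(Assembly assembly) =>
        assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract
                        && t.Name.EndsWith("Handler", StringComparison.Ordinal))
            .SelectMany(t => t.GetMethods(BindingFlags.Public | BindingFlags.Instance)
                .Where(m => m.Name is "HandleAsync" or "Handle")
                .Where(m => m.GetParameters().Length > 0)
                .Select(m => (Message: m.GetParameters()[0].ParameterType, Handler: t)))
            .ToArray();
}
```

A real mediator would additionally resolve the remaining method parameters from the DI container at dispatch time; this sketch stops at building the message-to-handler map.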

⚙️ Configuration

OpossumOptions

builder.Services.AddOpossum(options =>
{
    // Root directory for event storage (REQUIRED)
    // Must be an absolute path
    options.RootPath = @"D:\MyApp\EventStore";

    // Store name (REQUIRED): used as a subdirectory under RootPath
    options.UseStore("MyApplicationContext");

    // Flush events to disk immediately (OPTIONAL, default: true)
    // TRUE: Events are durable (survive power failure) but slower (~17ms per single event on SSD)
    //       Includes flushing event, index, and ledger files: the full durability guarantee.
    // FALSE: Faster but events may be lost on power failure (use for testing/dev only)
    options.FlushEventsImmediately = true;

    // Cross-process lock timeout (OPTIONAL, default: 5 seconds)
    // Relevant when multiple application instances share the same store directory over a network drive.
    // Increase this if appends consistently time out behind large batch operations on a slow share.
    options.CrossProcessLockTimeout = TimeSpan.FromSeconds(5);
});

ProjectionOptions

builder.Services.AddProjections(options =>
{
    // Scan assembly for projection definitions
    options.ScanAssembly(typeof(Program).Assembly);

    // Controls startup rebuild behaviour (default: MissingCheckpointsOnly)
    // None                   - no automatic rebuilds on startup
    // MissingCheckpointsOnly - only rebuild projections with no checkpoint file (default, recommended)
    // ForceFullRebuild       - rebuild all projections on every startup (dev / post-migration)
    options.AutoRebuild = AutoRebuildMode.MissingCheckpointsOnly;

    // Maximum projections rebuilt in parallel (default: 4). Increase on NVMe SSDs.
    options.MaxConcurrentRebuilds = 4;

    // Events loaded per batch during rebuild (default: 5 000). Lower = less peak memory.
    options.RebuildBatchSize = 5_000;

    // Events processed between crash-recovery journal flushes (default: 10 000).
    // Lower = more crash-durable; higher = less journal write overhead.
    options.RebuildFlushInterval = 10_000;
});

Configuration via appsettings.json

{
  "Opossum": {
    "RootPath": "D:\\MyApp\\EventStore",
    "StoreName": "MyApp",
    "FlushEventsImmediately": true
  },
  "Projections": {
    "AutoRebuild": "MissingCheckpointsOnly",
    "MaxConcurrentRebuilds": 4,
    "RebuildBatchSize": 5000,
    "RebuildFlushInterval": 10000
  }
}

Then bind in code:

builder.Services.AddOpossum(options =>
{
    builder.Configuration.GetSection("Opossum").Bind(options);

    // StoreName must be set programmatically: UseStore enforces the single-store contract
    var storeName = builder.Configuration["Opossum:StoreName"];
    if (storeName != null)
    {
        options.UseStore(storeName);
    }
});

💾 How Events Are Stored

Opossum creates a file-based database with the following structure:

D:\MyApp\EventStore\                 # RootPath
└── MyApplicationContext\             # Store name (set via UseStore)
    ├── .ledger                       # Ledger file (current sequence position)
    ├── Events\                       # Event files (one per event)
    │   ├── 0000000001.json           # Event at position 1
    │   ├── 0000000002.json           # Event at position 2
    │   └── ...
    └── Indices\                      # Index directories
        ├── EventType\                # Index by event type
        │   ├── StudentRegisteredEvent.idx
        │   └── StudentEnrolledToCourseEvent.idx
        └── Tags\                     # Index by tags
            ├── studentId_123.idx     # All events with tag studentId=123
            └── studentEmail_test@example.com.idx

File Formats

Event File (Events/0000000001.json)

{
  "Event": {
    "EventType": "StudentRegisteredEvent",
    "Data": "{\"StudentId\":\"abc-123\",\"FirstName\":\"John\",\"LastName\":\"Doe\",\"Email\":\"john@example.com\"}",
    "Tags": [
      { "Key": "studentId", "Value": "abc-123" },
      { "Key": "studentEmail", "Value": "john@example.com" }
    ]
  },
  "Position": 1,
  "Metadata": {
    "Timestamp": "2024-01-15T10:30:00Z"
  }
}

Ledger File (.ledger)

Simple text file containing the current sequence position:

42

Index File (Indices/Tags/studentId_abc-123.idx)

Newline-separated list of sequence positions:

1
5
12
27
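
Given this format, a combined "event type AND tag" lookup can be pictured as intersecting two position lists. The sketch below works on throwaway files in a temp directory and is an assumption about how such indexes could be consumed, not Opossum's actual query code; IndexSketch is a hypothetical helper.

```csharp
using System;
using System.Globalization;
using System.IO;
using System.Linq;

// Throwaway demo directory; a real store would live under RootPath.
string dir = Path.Combine(Path.GetTempPath(), "opossum-index-demo-" + Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(dir);

string tagIndex = Path.Combine(dir, "studentId_abc-123.idx");
string typeIndex = Path.Combine(dir, "StudentEnrolledToCourseEvent.idx");
File.WriteAllLines(tagIndex, new[] { "1", "5", "12", "27" });
File.WriteAllLines(typeIndex, new[] { "5", "9", "27", "30" });

long[] matches = IndexSketch.Intersect(tagIndex, typeIndex);
Console.WriteLine(string.Join(", ", matches)); // prints "5, 27"

Directory.Delete(dir, recursive: true);

static class IndexSketch
{
    // Parses one index file: one sequence position per line.
    public static long[] ReadPositions(string path) =>
        File.ReadLines(path)
            .Where(line => !string.IsNullOrWhiteSpace(line))
            .Select(line => long.Parse(line.Trim(), CultureInfo.InvariantCulture))
            .ToArray();

    // Positions present in both indexes, in ascending order.
    public static long[] Intersect(string pathA, string pathB) =>
        ReadPositions(pathA).Intersect(ReadPositions(pathB)).OrderBy(p => p).ToArray();
}
```

Only the event files at the resulting positions then need to be opened, which is what avoids a full scan of the Events directory.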

Why This Works

  • Events are immutable - Once written, never modified
  • Append-only writes - New events just get the next sequence number
  • Simple backup - Just copy the directory
  • Easy debugging - Open event files in any text editor
  • Contained corruption risk - Each event is a separate file, so a damaged file affects only that event
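
As a rough picture of the append-only layout, the following sketch reads the ledger, writes the next zero-padded event file, and then advances the ledger. It is a single-process illustration under stated assumptions: AppendSketch is hypothetical, there is no locking and no index maintenance, and the real store's write order and durability steps may differ.

```csharp
using System;
using System.Globalization;
using System.IO;

// Throwaway demo directory standing in for RootPath\StoreName.
string store = Path.Combine(Path.GetTempPath(), "opossum-append-demo-" + Guid.NewGuid().ToString("N"));

long first = AppendSketch.Append(store, "{\"EventType\":\"StudentRegisteredEvent\"}");
long second = AppendSketch.Append(store, "{\"EventType\":\"StudentEnrolledToCourseEvent\"}");
Console.WriteLine($"{first}, {second}"); // prints "1, 2"

Directory.Delete(store, recursive: true);

static class AppendSketch
{
    public static long Append(string storePath, string eventJson)
    {
        string eventsDir = Path.Combine(storePath, "Events");
        Directory.CreateDirectory(eventsDir);

        // The ledger holds the last assigned position; a missing ledger means an empty store.
        string ledgerPath = Path.Combine(storePath, ".ledger");
        long last = File.Exists(ledgerPath)
            ? long.Parse(File.ReadAllText(ledgerPath).Trim(), CultureInfo.InvariantCulture)
            : 0;
        long next = last + 1;

        // Zero-padded to ten digits, matching names such as 0000000001.json.
        string eventPath = Path.Combine(
            eventsDir, next.ToString("D10", CultureInfo.InvariantCulture) + ".json");

        // CreateNew fails if the file already exists, so an event is never overwritten.
        using (var stream = new FileStream(eventPath, FileMode.CreateNew, FileAccess.Write))
        using (var writer = new StreamWriter(stream))
        {
            writer.Write(eventJson);
            writer.Flush();
            stream.Flush(flushToDisk: true); // ask the OS to persist before moving on
        }

        File.WriteAllText(ledgerPath, next.ToString(CultureInfo.InvariantCulture));
        return next;
    }
}
```

Writing the event file before advancing the ledger means a crash between the two steps leaves an orphaned file rather than a ledger entry pointing at nothing; that ordering is a design choice of this sketch.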

📖 API Reference

IEventStore

Core event store operations:

public interface IEventStore
{
    // Append one or more events (position is assigned by the store)
    Task AppendAsync(NewEvent[] events, AppendCondition? condition = null, CancellationToken ct = default);

    // Read events matching a query (returns sequenced events with positions)
    // fromPosition: when provided, only events with Position > fromPosition are returned
    Task<SequencedEvent[]> ReadAsync(Query query, ReadOption[]? readOptions, long? fromPosition = null);

    // Read the single highest-position event matching a query: O(1) file reads
    // Returns null when no matching events exist
    // Ideal for consecutive-sequence patterns (e.g. invoice numbering)
    Task<SequencedEvent?> ReadLastAsync(Query query, CancellationToken ct = default);
}

// Convenience extension for single-event appends:
await eventStore.AppendAsync(singleEvent);
await eventStore.AppendAsync(singleEvent, condition);

Extension Methods

// Convert a domain event (IEvent) to a fluent DomainEventBuilder, then to NewEvent:
NewEvent evt = new MyEvent(...)
    .ToDomainEvent()                          // IEvent → DomainEventBuilder
    .WithTag("key", "value")                  // add a single tag
    .WithTags(tag1, tag2)                     // add multiple tags
    .WithTimestamp(DateTimeOffset.UtcNow)     // set timestamp
    .WithCorrelationId(correlationId)         // optional: correlation / causation / operation / user IDs
    .WithCausationId(causationId);
                                              // implicit conversion → NewEvent

// Read all matching events (ascending order):
SequencedEvent[] all = await eventStore.ReadAsync(query);

// Read only events appended after a known position (incremental polling):
SequencedEvent[] newEvents = await eventStore.ReadAsync(query, fromPosition: lastCheckpoint);

// Read in descending order (latest first):
SequencedEvent[] desc = await eventStore.ReadAsync(query, ReadOption.Descending);

// Decision model (read + fold + condition in one call):
DecisionModel<TState> model = await eventStore.BuildDecisionModelAsync(projection);

// Compose up to three projections (single ReadAsync, one AppendCondition spanning all):
var (t1, t2, t3, condition) = await eventStore.BuildDecisionModelAsync(p1, p2, p3);

// N-ary overload for a runtime-variable list of homogeneous projections (e.g. shopping cart):
var projections = items.Select(item => PriceProjection(item.BookId)).ToList();
var (states, condition) = await eventStore.BuildDecisionModelAsync(
    (IReadOnlyList<IDecisionProjection<PriceState>>)projections);
// states[i] corresponds to projections[i]

// Execute the full read → decide → append cycle with automatic retry on concurrency conflicts:
TResult result = await eventStore.ExecuteDecisionAsync(async (store, ct) =>
{
    var model = await store.BuildDecisionModelAsync(projection, ct);
    // ... decide, append ...
    return result;
});

Query Building

// All events
Query.All()

// Events matching specific criteria
Query.FromItems(params QueryItem[] items)

// Shorthand: events of the given types
Query.FromEventTypes(nameof(InvoiceCreatedEvent))

// Shorthand: events carrying all of the given tags
Query.FromTags(new Tag("studentId", studentId.ToString()))

// Query items support AND/OR logic
new QueryItem
{
    EventTypes = ["EventA", "EventB"],  // EventA OR EventB
    Tags = [tagA, tagB]                 // AND tagA AND tagB
}
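
The AND/OR rules can be spelled out with a small in-memory matcher. This is a sketch of the semantics as described here, not Opossum's matcher: StoredEvent, Item, and QuerySketch are hypothetical types, and two points are assumptions, namely that an empty EventTypes list matches any type and that multiple items in one query are OR'ed together.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var evt = new StoredEvent(
    "EventA",
    new HashSet<(string Key, string Value)> { ("studentId", "123"), ("courseId", "42") });

var bothTags = new Item(new[] { "EventA", "EventB" }, new[] { ("studentId", "123"), ("courseId", "42") });
var wrongTag = new Item(new[] { "EventA" }, new[] { ("studentId", "999") });
var tagsOnly = new Item(Array.Empty<string>(), new[] { ("courseId", "42") });

Console.WriteLine(QuerySketch.Matches(evt, bothTags));              // True:  type matches, both tags present
Console.WriteLine(QuerySketch.Matches(evt, wrongTag));              // False: a required tag is missing
Console.WriteLine(QuerySketch.Matches(evt, tagsOnly));              // True:  no type filter, tag present
Console.WriteLine(QuerySketch.MatchesAny(evt, wrongTag, tagsOnly)); // True:  one item matching is enough

record StoredEvent(string EventType, HashSet<(string Key, string Value)> Tags);
record Item(string[] EventTypes, (string Key, string Value)[] Tags);

static class QuerySketch
{
    // One item: any listed event type (OR) and every listed tag (AND).
    public static bool Matches(StoredEvent e, Item item) =>
        (item.EventTypes.Length == 0 || item.EventTypes.Contains(e.EventType))
        && item.Tags.All(tag => e.Tags.Contains(tag));

    // Several items: the event matches if at least one item matches.
    public static bool MatchesAny(StoredEvent e, params Item[] items) =>
        items.Any(item => Matches(e, item));
}
```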

AppendCondition (DCB)

new AppendCondition
{
    // Fail if any event matches this query
    FailIfEventsMatch = query,

    // Only check events AFTER this position (optional)
    AfterSequencePosition = 42
}
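
The effect of the two properties can be shown with an in-memory log. The sketch below illustrates the rule "fail if any matching event exists after the given position"; ConditionSketch is hypothetical and the real store evaluates the condition atomically with the append, which this example does not attempt.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var log = new List<(long Position, string Email)>
{
    (1, "a@example.com"),
    (2, "b@example.com"),
};

// No boundary: the existing event at position 1 blocks the append.
Console.WriteLine(ConditionSketch.CanAppend(log, e => e.Email == "a@example.com", after: null)); // False

// Boundary at position 1: only later events are checked, so position 1 is ignored.
Console.WriteLine(ConditionSketch.CanAppend(log, e => e.Email == "a@example.com", after: 1));    // True

static class ConditionSketch
{
    // True when no event matching the predicate exists after the given position.
    public static bool CanAppend(
        IEnumerable<(long Position, string Email)> log,
        Func<(long Position, string Email), bool> failIfEventsMatch,
        long? after) =>
        !log.Any(e => (after is null || e.Position > after) && failIfEventsMatch(e));
}
```

In the decision-model flow, the position boundary is typically the highest position the decision was based on, so only events appended since that read can invalidate it.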

CommandResult and CommandResult<T>

Lightweight return types for command handlers. Use CommandResult when the handler produces no value, or CommandResult<T> when the handler returns a result (e.g. a generated ID or number):

// No return value
public async Task<CommandResult> HandleAsync(RegisterStudentCommand cmd, IEventStore store)
{
    // ...
    // On success:
    return CommandResult.Ok();
    // On a business-rule violation (alternative exit, shown for illustration):
    return CommandResult.Fail("A student with this email already exists.");
}

// With a return value
public async Task<CommandResult<int>> HandleAsync(CreateInvoiceCommand cmd, IEventStore store)
{
    // ...
    // On success:
    return CommandResult<int>.Ok(nextInvoiceNumber);
    // On failure (alternative exit, shown for illustration):
    return CommandResult<int>.Fail("Concurrent update - please retry.");
}

// Consume the result in the API endpoint:
var result = await mediator.InvokeAsync<CommandResult<int>>(command);
if (!result.Success) return Results.BadRequest(result.ErrorMessage);
return Results.Created($"/invoices/{result.Value}", new { invoiceNumber = result.Value });
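For readers who want a mental model of these result types, here is one possible minimal shape. It is a sketch only: ResultSketch and ResultSketch<T> are hypothetical names, and the members simply mirror the ones used in the snippets above (Ok, Fail, Success, ErrorMessage, Value). Opossum's own types may be implemented differently.

```csharp
#nullable enable
using System;

// Hypothetical minimal shapes for illustration; Opossum's own types may differ.
public sealed record ResultSketch(bool Success, string? ErrorMessage = null)
{
    public static ResultSketch Ok() => new(true);

    public static ResultSketch Fail(string errorMessage) => new(false, errorMessage);
}

public sealed record ResultSketch<T>(bool Success, T? Value = default, string? ErrorMessage = null)
{
    // A successful result always carries a value.
    public static ResultSketch<T> Ok(T value) => new(true, value);

    // A failed result carries only the error message; Value stays at its default.
    public static ResultSketch<T> Fail(string errorMessage) => new(false, default, errorMessage);
}
```

The design keeps failure reasons as data rather than exceptions, so an endpoint can branch on Success without a try/catch.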

IProjectionStore<TState>

Query projections (read models). Resolved from DI as IProjectionStore<TState>:

public interface IProjectionStore<TState> where TState : class
{
    // Get a single projection instance by key
    Task<TState?> GetAsync(string key);

    // Get all projection instances
    Task<IReadOnlyList<TState>> GetAllAsync();

    // Filter in-memory with a predicate
    Task<IReadOnlyList<TState>> QueryAsync(Func<TState, bool> predicate);

    // Query by a single tag (index-based; requires [ProjectionTags], see above)
    Task<IReadOnlyList<TState>> QueryByTagAsync(Tag tag);

    // Query by multiple tags with AND logic (index-based; requires [ProjectionTags])
    Task<IReadOnlyList<TState>> QueryByTagsAsync(IEnumerable<Tag> tags);
}

// Inject the typed store directly; no projection name string is needed
public class CourseController(IProjectionStore<CourseShortInfo> courseStore)
{
    public async Task<IReadOnlyList<CourseShortInfo>> GetAvailableAsync() =>
        await courseStore.QueryByTagsAsync([new Tag("IsFull", "False")]);

    public async Task<CourseShortInfo?> GetByIdAsync(Guid courseId) =>
        await courseStore.GetAsync(courseId.ToString());
}

IProjectionManager

Manages the lifecycle of live projections: registration, incremental updates, and checkpoint tracking. In normal use this is handled automatically by the background daemon:

public interface IProjectionManager
{
    // Register a projection definition (called during startup)
    void RegisterProjection<TState>(IProjectionDefinition<TState> definition) where TState : class;

    // Apply new events to all registered projections (called by the daemon)
    Task UpdateAsync(SequencedEvent[] events, CancellationToken cancellationToken = default);

    // Read the last processed event position for a named projection
    Task<long> GetCheckpointAsync(string projectionName, CancellationToken cancellationToken = default);

    // Names of all currently registered projections
    IReadOnlyList<string> GetRegisteredProjections();
}

IProjectionRebuilder

Rebuilds projections from scratch, for example for disaster recovery, after deploying fixes to projection logic, or for post-migration replays. Available from DI alongside IProjectionManager:

public interface IProjectionRebuilder
{
    // Rebuild a single named projection
    Task<ProjectionRebuildResult> RebuildAsync(
        string projectionName,
        CancellationToken cancellationToken = default);

    // Rebuild all registered projections in parallel (respects MaxConcurrentRebuilds)
    // forceRebuild = true:  rebuild every projection regardless of checkpoint
    // forceRebuild = false: only rebuild projections with no checkpoint file
    Task<ProjectionRebuildResult> RebuildAllAsync(
        bool forceRebuild = false,
        CancellationToken cancellationToken = default);

    // Rebuild a specific subset (useful after fixing a bug in one projection)
    Task<ProjectionRebuildResult> RebuildAsync(
        string[] projectionNames,
        CancellationToken cancellationToken = default);

    // Poll current rebuild progress
    Task<ProjectionRebuildStatus> GetRebuildStatusAsync();
}

Expose as an admin endpoint (add proper authentication in production):

app.MapPost("/admin/projections/rebuild", async (IProjectionRebuilder rebuilder) =>
{
    var result = await rebuilder.RebuildAllAsync(forceRebuild: false);
    return result.Success
        ? Results.Ok(result)
        : Results.Problem($"Rebuild failed: {string.Join(", ", result.FailedProjections)}");
})
.RequireAuthorization("Admin");

💡 Full Example

The following example is taken directly from the Course Management sample and shows the full DCB pattern at its most expressive: three independent business invariants enforced atomically through a single read.

Enrolling a student in a course requires checking three separate concerns simultaneously:

  • ✅ Course capacity: the course must exist and have available seats
  • ✅ Student enrollment limit: the student must be registered and below their tier's course limit
  • ✅ Duplicate prevention: the student must not already be enrolled in this course

All three are evaluated from one ReadAsync call. The resulting AppendCondition spans all three queries automatically: a concurrent write matching any of them causes ExecuteDecisionAsync to retry from scratch, with no manual retry logic required.

// ── Step 1: Domain events ─────────────────────────────────────────────────────

public sealed record CourseCreatedEvent(
    Guid CourseId,
    string Name,
    string Description,
    int MaxStudentCount) : IEvent;

public sealed record StudentRegisteredEvent(
    Guid StudentId,
    string FirstName,
    string LastName,
    string Email) : IEvent;

public sealed record StudentEnrolledToCourseEvent(
    Guid CourseId,
    Guid StudentId) : IEvent;

// ── Step 2: Decision state types, one per business concern ───────────────────

// null until the course is created
public sealed record CourseCapacityState(int MaxCapacity, int CurrentEnrollmentCount)
{
    public bool IsFull => CurrentEnrollmentCount >= MaxCapacity;
}

// null until the student is registered
public sealed record StudentEnrollmentLimitState(EnrollmentTier Tier, int CurrentCourseCount)
{
    public int MaxAllowed => GetMaxCoursesByTier(Tier);   // e.g. Basic = 3, Professional = 10
    public bool IsAtLimit => CurrentCourseCount >= MaxAllowed;
}

// ── Step 3: Three focused, ephemeral decision projections ─────────────────────

public static class CourseEnrollmentProjections
{
    // Is the course over capacity?
    public static IDecisionProjection<CourseCapacityState?> CourseCapacity(Guid courseId) =>
        new DecisionProjection<CourseCapacityState?>(
            initialState: null,
            query: Query.FromItems(new QueryItem
            {
                EventTypes =
                [
                    nameof(CourseCreatedEvent),
                    nameof(CourseStudentLimitModifiedEvent),
                    nameof(StudentEnrolledToCourseEvent)
                ],
                Tags = [new Tag("courseId", courseId.ToString())]
            }),
            apply: (state, evt) => evt.Event.Event switch
            {
                CourseCreatedEvent created =>
                    new CourseCapacityState(created.MaxStudentCount, 0),
                CourseStudentLimitModifiedEvent modified when state is not null =>
                    state with { MaxCapacity = modified.NewMaxStudentCount },
                StudentEnrolledToCourseEvent when state is not null =>
                    state with { CurrentEnrollmentCount = state.CurrentEnrollmentCount + 1 },
                _ => state
            });

    // Has the student hit their tier's course limit?
    public static IDecisionProjection<StudentEnrollmentLimitState?> StudentEnrollmentLimit(Guid studentId) =>
        new DecisionProjection<StudentEnrollmentLimitState?>(
            initialState: null,
            query: Query.FromItems(new QueryItem
            {
                EventTypes =
                [
                    nameof(StudentRegisteredEvent),
                    nameof(StudentSubscriptionUpdatedEvent),
                    nameof(StudentEnrolledToCourseEvent)
                ],
                Tags = [new Tag("studentId", studentId.ToString())]
            }),
            apply: (state, evt) => evt.Event.Event switch
            {
                StudentRegisteredEvent =>
                    new StudentEnrollmentLimitState(EnrollmentTier.Basic, 0),
                StudentSubscriptionUpdatedEvent updated when state is not null =>
                    state with { Tier = updated.EnrollmentTier },
                StudentEnrolledToCourseEvent when state is not null =>
                    state with { CurrentCourseCount = state.CurrentCourseCount + 1 },
                _ => state
            });

    // Is this exact student–course pair already enrolled?
    // Both tags are required, so only the precise pair triggers this projection.
    public static IDecisionProjection<bool> AlreadyEnrolled(Guid courseId, Guid studentId) =>
        new DecisionProjection<bool>(
            initialState: false,
            query: Query.FromItems(new QueryItem
            {
                EventTypes = [nameof(StudentEnrolledToCourseEvent)],
                Tags =
                [
                    new Tag("courseId", courseId.ToString()),
                    new Tag("studentId", studentId.ToString())
                ]
            }),
            apply: (_, _) => true);   // any match means already enrolled
}

// ── Step 4: Command + handler: read → decide → append with automatic retry ───

public sealed record EnrollStudentToCourseCommand(Guid CourseId, Guid StudentId);

public sealed class EnrollStudentToCourseCommandHandler
{
    public async Task<CommandResult> HandleAsync(
        EnrollStudentToCourseCommand command,
        IEventStore eventStore)
    {
        try
        {
            return await eventStore.ExecuteDecisionAsync(
                (store, ct) => TryEnrollAsync(command, store));
        }
        catch (AppendConditionFailedException)
        {
            return CommandResult.Fail(
                "Failed to enroll student due to concurrent updates. Please try again.");
        }
    }

    private static async Task<CommandResult> TryEnrollAsync(
        EnrollStudentToCourseCommand command,
        IEventStore eventStore)
    {
        // One ReadAsync call materialises all three projections simultaneously.
        // appendCondition spans all three queries: if a concurrent write matches
        // ANY of them between this read and the append, AppendConditionFailedException
        // is thrown and ExecuteDecisionAsync retries automatically.
        var (courseCapacity, studentLimit, alreadyEnrolled, appendCondition) =
            await eventStore.BuildDecisionModelAsync(
                CourseEnrollmentProjections.CourseCapacity(command.CourseId),
                CourseEnrollmentProjections.StudentEnrollmentLimit(command.StudentId),
                CourseEnrollmentProjections.AlreadyEnrolled(command.CourseId, command.StudentId));

        if (courseCapacity is null)
            return CommandResult.Fail("Course does not exist.");
        if (studentLimit is null)
            return CommandResult.Fail("Student is not registered.");
        if (alreadyEnrolled)
            return CommandResult.Fail("Student is already enrolled in this course.");
        if (courseCapacity.IsFull)
            return CommandResult.Fail($"Course is at maximum capacity ({courseCapacity.MaxCapacity} students).");
        if (studentLimit.IsAtLimit)
            return CommandResult.Fail($"Student has reached their enrollment limit ({studentLimit.MaxAllowed} courses for {studentLimit.Tier} tier).");

        NewEvent enrollmentEvent = new StudentEnrolledToCourseEvent(
            CourseId: command.CourseId,
            StudentId: command.StudentId)
            .ToDomainEvent()
            .WithTag("courseId", command.CourseId.ToString())
            .WithTag("studentId", command.StudentId.ToString())
            .WithTimestamp(DateTimeOffset.UtcNow);

        // appendCondition guarantees all three invariants hold atomically at write time
        await eventStore.AppendAsync(enrollmentEvent, appendCondition);
        return CommandResult.Ok();
    }
}

Why this matters: Three separate business rules, spanning two independent tag-based queries (course events tagged with courseId, student events tagged with studentId), are enforced with a single read and a single atomic append condition. There are no distributed locks, no sagas, and no two-phase commits. The DCB pattern handles concurrent writes through optimistic concurrency, with automatic retry built into ExecuteDecisionAsync.
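The "single read, single condition" idea can be sketched with an in-memory model. Everything here is hypothetical (FoldEvent, FoldProjection, DecisionModelSketch), states are simplified to int counters, and using the position of the last matching event as the condition's lower bound is an assumption, not a statement about how Opossum computes it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for illustration only; these are NOT Opossum's types.
public sealed record FoldEvent(long Position, string CourseId, string StudentId);

// A projection reduced to its essentials: a query predicate plus a fold over an int state.
public sealed record FoldProjection(
    Func<FoldEvent, bool> Matches,
    Func<int, FoldEvent, int> Apply);

public static class DecisionModelSketch
{
    public static (int[] States, Func<FoldEvent, bool> FailIfEventsMatch, long? AfterPosition) Build(
        IEnumerable<FoldEvent> store,
        IReadOnlyList<FoldProjection> projections)
    {
        var states = new int[projections.Count];
        long? lastSeen = null;

        // Single pass: each event is offered to every projection whose query matches it.
        foreach (var evt in store.OrderBy(e => e.Position))
        {
            var matchedAny = false;
            for (var i = 0; i < projections.Count; i++)
            {
                if (!projections[i].Matches(evt)) continue;
                states[i] = projections[i].Apply(states[i], evt);
                matchedAny = true;
            }
            if (matchedAny) lastSeen = evt.Position;
        }

        // The combined condition fails if a later event matches ANY projection's query.
        Func<FoldEvent, bool> union = e => projections.Any(p => p.Matches(e));
        return (states, union, lastSeen);
    }
}
```

Because the combined predicate is the union of the individual queries, an unrelated write (another course, another student) never invalidates the decision, while a write touching either the course or the student does.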

See the Course Management sample for the full working application including read-side projections and API endpoints.


🔄 Event-Sourced Aggregate: Alternative Write-Side Pattern

This is an alternative to the Decision Model pattern shown above, not a required addition. Pick one style and apply it consistently. The sample includes both so you can compare them side by side on the same domain.

Opossum also supports the classic Event-Sourced Aggregate pattern. Instead of stateless ephemeral projections, all course state is encapsulated in a reconstituted aggregate object. The DCB insight is that the repository replaces the traditional named-stream lock with a tag-scoped AppendCondition, so no stream concept is needed.

Choosing Between the Two Patterns

|  | DCB Decision Model | Event-Sourced Aggregate |
|---|---|---|
| State lives in | Ephemeral in-memory fold, discarded after each command | Reconstituted aggregate object |
| Invariants span | Multiple entity types in one read (e.g. course capacity AND student tier) | One entity type (course only) |
| Retry handled by | ExecuteDecisionAsync (automatic exponential backoff) | Manual retry loop in the caller |
| Concurrency boundary | Union of all projection queries | All events for a single courseId tag |
| Best for | Cross-cutting business rules | Rich domain models with many entity-internal invariants |
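The "Retry handled by" row mentions automatic exponential backoff. As a rough picture of what such a loop involves, here is a generic sketch. ConflictException, RetrySketch, and the default values are hypothetical, and this is not the implementation behind ExecuteDecisionAsync.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the exception a store throws when an append condition fails.
public sealed class ConflictException : Exception { }

public static class RetrySketch
{
    // Runs the operation, retrying on ConflictException with a delay that
    // doubles after every failed attempt. The final attempt's exception propagates.
    public static async Task<T> ExecuteWithRetryAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        int maxAttempts = 5,
        int initialDelayMs = 10,
        CancellationToken ct = default)
    {
        var delay = TimeSpan.FromMilliseconds(initialDelayMs);
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                // The whole read -> decide -> append cycle lives inside `operation`,
                // so every retry starts from a fresh read.
                return await operation(ct);
            }
            catch (ConflictException) when (attempt < maxAttempts)
            {
                await Task.Delay(delay, ct);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }
}
```

The important design point is that the retried unit is the full cycle, not just the append: a retry that reused stale state would re-validate nothing.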

How the Aggregate Repository Works

// Load: query by tag; no named stream is needed
public async Task<CourseAggregate?> LoadAsync(Guid courseId)
{
    var query = Query.FromTags(new Tag("courseId", courseId.ToString()));
    var events = await eventStore.ReadAsync(query);

    if (events.Length == 0)
        return null;                            // course does not exist

    return CourseAggregate.Reconstitute(events); // replay events into state
}

// Save: append with the DCB tag-scoped optimistic lock
public async Task SaveAsync(CourseAggregate aggregate, CancellationToken ct = default)
{
    var query = Query.FromTags(new Tag("courseId", aggregate.CourseId.ToString()));

    var condition = new AppendCondition
    {
        FailIfEventsMatch = query,
        // null when Version == 0 (new): reject if ANY course event already exists.
        // Otherwise: reject only if a new event appeared after our last read.
        AfterSequencePosition = aggregate.Version == 0 ? null : aggregate.Version
    };

    var newEvents = aggregate.PullRecordedEvents()
        .Select(e => (NewEvent)(e.ToDomainEvent()
            .WithTag("courseId", aggregate.CourseId.ToString())
            .WithTimestamp(DateTimeOffset.UtcNow)))
        .ToArray();

    // Throws AppendConditionFailedException on conflict; the caller reloads and retries.
    await eventStore.AppendAsync(newEvents, condition, ct);
}

The Aggregate Class (pure C#, no Opossum machinery)

public sealed class CourseAggregate
{
    private readonly List<IEvent> _recordedEvents = [];

    public Guid CourseId { get; private set; }
    public int Capacity { get; private set; }
    public int EnrollmentCount { get; private set; }

    // Global store position of the last event seen; used as AfterSequencePosition.
    // Note: this is store-wide monotonic, not a per-aggregate counter.
    public long Version { get; private set; }

    public static CourseAggregate Create(Guid id, string name, string description, int maxStudents)
    {
        var instance = new CourseAggregate();
        instance.RecordEvent(new CourseCreatedEvent(id, name, description, maxStudents));
        return instance;
    }

    public static CourseAggregate Reconstitute(SequencedEvent[] events)
    {
        var instance = new CourseAggregate();
        foreach (var e in events)
        {
            instance.Apply(e.Event.Event);
            instance.Version = e.Position;
        }
        return instance;
    }

    public void ChangeCapacity(int newCapacity)
    {
        if (newCapacity == Capacity)
            throw new InvalidOperationException($"Course already has capacity {newCapacity}.");
        if (newCapacity < EnrollmentCount)
            throw new InvalidOperationException("Can't set capacity below current enrollment.");

        RecordEvent(new CourseStudentLimitModifiedEvent(CourseId, newCapacity));
    }

    public void SubscribeStudent(Guid studentId)
    {
        if (EnrollmentCount >= Capacity)
            throw new InvalidOperationException("Course is already fully booked.");

        RecordEvent(new StudentEnrolledToCourseEvent(CourseId, studentId));
    }

    public IEvent[] PullRecordedEvents()
    {
        var events = _recordedEvents.ToArray();
        _recordedEvents.Clear();
        return events;
    }

    private void RecordEvent(IEvent @event) { _recordedEvents.Add(@event); Apply(@event); }

    private void Apply(IEvent @event)
    {
        switch (@event)
        {
            case CourseCreatedEvent c:   CourseId = c.CourseId; Capacity = c.MaxStudentCount; break;
            case CourseStudentLimitModifiedEvent m: Capacity = m.NewMaxStudentCount; break;
            case StudentEnrolledToCourseEvent:      EnrollmentCount++; break;
        }
    }
}

Retry Pattern in the Endpoint

// Reload → reapply → retry on concurrent write; the last attempt propagates the exception → HTTP 409
for (var attempt = 0; attempt < MaxRetries; attempt++)
{
    var aggregate = await repository.LoadAsync(courseId);
    if (aggregate is null)
        return Results.NotFound();

    try
    {
        aggregate.ChangeCapacity(request.NewCapacity);
        await repository.SaveAsync(aggregate);
        return Results.Ok();
    }
    catch (InvalidOperationException ex)
    {
        return Results.BadRequest(ex.Message);   // invariant violation: no retry
    }
    catch (AppendConditionFailedException) when (attempt < MaxRetries - 1)
    {
        // concurrent write: reload fresh state and try again
    }
}

The full implementation lives in Samples/Opossum.Samples.CourseManagement/CourseAggregate/. Aggregate endpoints are tagged "Aggregate (Event-Sourced)" in the Scalar UI to distinguish them from the Decision Model endpoints tagged "Commands".


🗺️ DCB Examples Coverage

All 7 examples from dcb.events/examples/ are implemented in the Course Management sample:

| DCB Example | Domain Adaptation | Key Pattern | Sample Location |
|---|---|---|---|
| Course Subscriptions | Student enrollment (capacity + tier limit + duplicate check) | BuildDecisionModelAsync (3-projection) | CourseEnrollment/ |
| Unique Username | Student email uniqueness | Raw AppendCondition (direct DCB API) | StudentRegistration/ |
| Invoice Number | Gap-free invoice numbering | ReadLastAsync + AppendCondition | InvoiceCreation/ |
| Dynamic Product Price | Course book prices with grace period & shopping cart | N-ary BuildDecisionModelAsync + TimeProvider | CourseBookPurchase/ |
| Event-Sourced Aggregate | Course aggregate (capacity + enrollment) | DCB tag-scoped AppendCondition in repository | CourseAggregate/ |
| Opt-In Token | Exam registration tokens (issue / redeem / revoke) | Enum-state projection; event store as token registry | ExamRegistration/ |
| Prevent Record Duplication | Course announcements with client idempotency token | BuildDecisionModelAsync (2-projection) + idempotency projection | CourseAnnouncement/ |

🔢 Consecutive Sequences: Invoice Numbers

The Invoice Number example shows how to generate a gap-free, monotonically increasing sequence without a separate sequence table.

The key primitive is ReadLastAsync: it returns the single highest-position event matching a query in O(1) file reads (one index lookup, one file read), regardless of how many total events the store contains.

// The query has NO tag filter: it spans ALL InvoiceCreatedEvents globally.
// Any new invoice created by anyone invalidates the "last number" we just read.
var query = Query.FromEventTypes(nameof(InvoiceCreatedEvent));

// Step 1 (Read): find the most recent invoice (O(1) file reads)
var last = await eventStore.ReadLastAsync(query);

// Step 2 (Decide): next consecutive number
var nextNumber = last is null ? 1 : ((InvoiceCreatedEvent)last.Event.Event).InvoiceNumber + 1;

// Step 3 (Append): use a guard that rejects if ANY InvoiceCreatedEvent appeared since our read.
// AfterSequencePosition = null on the first invoice means "reject if ANY invoice already exists",
// which closes the bootstrap race condition.
var condition = new AppendCondition
{
    FailIfEventsMatch = query,
    AfterSequencePosition = last?.Position
};

NewEvent newEvent = new InvoiceCreatedEvent(nextNumber, customerId, amount)
    .ToDomainEvent()
    .WithTag("invoiceNumber", nextNumber.ToString())
    .WithTag("customerId", customerId.ToString())
    .WithTimestamp(DateTimeOffset.UtcNow);

// Throws AppendConditionFailedException on conflict; when this cycle runs inside
// ExecuteDecisionAsync, it is retried automatically.
await eventStore.AppendAsync(newEvent, condition);

Why this works: The consistency boundary is the entire set of invoice creation events. If any new invoice appears between our read and our append, the condition fires and ExecuteDecisionAsync retries the full cycle automatically.
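The effect of that guard under contention can be demonstrated with a small in-memory simulation. InvoiceEvent, InMemoryInvoiceLog, and InvoiceNumberSketch are hypothetical names, and a lock stands in for the store's atomic append; the point is only to show that read, decide, guarded append, and retry together yield a gap-free sequence.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record InvoiceEvent(long Position, int InvoiceNumber);

// Hypothetical in-memory stand-in for an event store; NOT Opossum's implementation.
public sealed class InMemoryInvoiceLog
{
    private readonly object _gate = new();
    private readonly List<InvoiceEvent> _events = new();

    public InvoiceEvent? ReadLast()
    {
        lock (_gate) { return _events.Count == 0 ? null : _events[^1]; }
    }

    // Appends only if no invoice event exists after `afterPosition`
    // (or no invoice event at all when afterPosition is null).
    public bool TryAppend(int invoiceNumber, long? afterPosition)
    {
        lock (_gate)
        {
            var conflict = _events.Any(e => afterPosition is null || e.Position > afterPosition);
            if (conflict) return false;
            _events.Add(new InvoiceEvent(_events.Count + 1, invoiceNumber));
            return true;
        }
    }

    public IReadOnlyList<int> Numbers()
    {
        lock (_gate) { return _events.Select(e => e.InvoiceNumber).ToList(); }
    }
}

public static class InvoiceNumberSketch
{
    // Read -> decide -> guarded append, repeated until the append succeeds.
    public static int CreateInvoice(InMemoryInvoiceLog log)
    {
        while (true)
        {
            var last = log.ReadLast();
            var next = last is null ? 1 : last.InvoiceNumber + 1;
            if (log.TryAppend(next, last?.Position))
                return next;
        }
    }
}
```

Calling CreateInvoice from many threads at once still produces the numbers 1, 2, 3, ... in order, because a writer that lost the race re-reads the latest invoice before deciding again.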

See InvoiceCreation/CreateInvoice.cs in the sample for the full implementation.


💰 Dynamic Product Price: Course Books

The Dynamic Product Price example shows three progressively more complex features, all implemented as the Course Books feature in the sample.

Feature 1 β€” Current Price (no grace period)

A single DecisionProjection folds the book's defined price. The displayed price must match the stored price exactly at the moment of purchase.

Feature 2 β€” Grace Period

After a price change, both the old and the new price remain valid for a configurable window (default: 30 minutes). The fold function needs wall-clock time, so use the TimeProvider constructor overload to keep the projection unit-testable without sleeping:

new DecisionProjection<CourseBookPriceState>(
    initialState: CourseBookPriceState.Empty,
    query: Query.FromItems(new QueryItem
    {
        EventTypes = [nameof(CourseBookDefinedEvent), nameof(CourseBookPriceChangedEvent)],
        Tags = [new Tag("bookId", bookId.ToString())]
    }),
    apply: (state, evt, timeProvider) =>
    {
        var age = timeProvider.GetUtcNow() - evt.Metadata.Timestamp;
        return evt.Event.Event switch
        {
            CourseBookDefinedEvent e     => state.ApplyDefined(e.Price, age, gracePeriod),
            CourseBookPriceChangedEvent e => state.ApplyPriceChanged(e.NewPrice, age, gracePeriod),
            _ => state
        };
    },
    timeProvider: timeProvider);  // null → TimeProvider.System in production

Testing with time control: Inject FakeTimeProvider (from Microsoft.Extensions.TimeProvider.Testing) to advance time in unit tests without sleeping:

var fakeTime = new FakeTimeProvider(DateTimeOffset.UtcNow);
var projection = CourseBookPriceProjections.PriceWithGracePeriod(bookId, timeProvider: fakeTime);
// ... append a price-changed event, then:
fakeTime.Advance(TimeSpan.FromMinutes(31));  // grace period expires
// projection now accepts only the new price
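The grace-period rule itself can be sketched without any store at all. The types below (PriceStateSketch, ManualTimeProvider, GracePeriodSketch) are hypothetical; the sample's CourseBookPriceState may be shaped differently. For brevity this sketch remembers only one previous price, and it uses a hand-written TimeProvider subclass instead of FakeTimeProvider so it needs no extra package.

```csharp
using System;

// Hypothetical state for illustration; the sample's CourseBookPriceState may differ.
public sealed record PriceStateSketch(decimal? CurrentPrice, decimal? PreviousPrice)
{
    public static readonly PriceStateSketch Empty = new(null, null);

    // `age` is how long ago the price event happened. Within the grace period
    // the old price stays valid next to the new one; afterwards it is dropped.
    public PriceStateSketch ApplyPriceChanged(decimal newPrice, TimeSpan age, TimeSpan gracePeriod) =>
        new(newPrice, age < gracePeriod ? CurrentPrice : null);

    public bool IsValidPrice(decimal displayed) =>
        displayed == CurrentPrice || displayed == PreviousPrice;
}

// Minimal controllable clock built on the BCL TimeProvider base class.
public sealed class ManualTimeProvider : TimeProvider
{
    private DateTimeOffset _now;

    public ManualTimeProvider(DateTimeOffset start) => _now = start;

    public override DateTimeOffset GetUtcNow() => _now;

    public void Advance(TimeSpan by) => _now += by;
}

public static class GracePeriodSketch
{
    // Folds price events into a state, measuring each event's age against the injected clock.
    public static PriceStateSketch Fold(
        (decimal Price, DateTimeOffset Timestamp)[] priceEvents,
        TimeProvider timeProvider,
        TimeSpan gracePeriod)
    {
        var state = PriceStateSketch.Empty;
        foreach (var (price, timestamp) in priceEvents)
        {
            var age = timeProvider.GetUtcNow() - timestamp;
            state = state.ApplyPriceChanged(price, age, gracePeriod);
        }
        return state;
    }
}
```

Folding the same events again after advancing the clock past the grace period yields a state that accepts only the newest price, which is the behaviour the test snippet above relies on.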

Feature 3 β€” Shopping Cart (N-ary overload)

Validate the price of every item in a cart in a single event store read with a single AppendCondition spanning all items:

// Build one projection per cart item
var projections = command.Items
    .Select(item => CourseBookPriceProjections.PriceWithGracePeriod(item.BookId))
    .ToList();

// One ReadAsync call → states[i] corresponds to projections[i]
var (states, appendCondition) = await eventStore.BuildDecisionModelAsync(
    (IReadOnlyList<IDecisionProjection<CourseBookPriceState>>)projections);

for (var i = 0; i < command.Items.Count; i++)
{
    if (states[i].CurrentPrice is null)
        return CommandResult.Fail($"Book '{command.Items[i].BookId}' does not exist.");
    if (!states[i].IsValidPrice(command.Items[i].DisplayedPrice))
        return CommandResult.Fail($"Price changed for book '{command.Items[i].BookId}'. Please refresh.");
}

// appendCondition covers all books atomically: a concurrent price change for any book
// in the cart invalidates the entire order and triggers a retry.
await eventStore.AppendAsync(orderEvent, appendCondition);

See CourseBookPurchase/ and CourseBookManagement/ in the sample for the full implementation.


🎟️ Opt-In Token: Server-Generated Single-Use Tokens

The Opt-In Token example shows how the event store itself can replace a persistent "valid tokens" table entirely.

Contrast with idempotency tokens: Idempotency tokens are client-generated and protect against retry duplicates. Opt-In tokens are server-generated (the instructor creates them), handed out to a specific student, and consumed exactly once.

The key insight: a query scoped to examToken:{tokenId} IS the token registry, so no IProjectionDefinition for token state is needed for correctness. A single enum projection replaces the two-bool pattern (WasIssued + WasRedeemed) and naturally accommodates revocation as an additional state:

public enum ExamTokenStatus { NotIssued, Issued, Revoked, Redeemed }

public sealed record ExamTokenState(ExamTokenStatus Status, Guid ExamId);

IDecisionProjection<ExamTokenState> TokenStatus(Guid tokenId) =>
    new DecisionProjection<ExamTokenState>(
        initialState: new ExamTokenState(ExamTokenStatus.NotIssued, Guid.Empty),
        query: Query.FromItems(new QueryItem
        {
            EventTypes =
            [
                nameof(ExamRegistrationTokenIssuedEvent),
                nameof(ExamRegistrationTokenRevokedEvent),
                nameof(ExamRegistrationTokenRedeemedEvent)
            ],
            Tags = [new Tag("examToken", tokenId.ToString())]
        }),
        apply: (state, evt) => evt.Event.Event switch
        {
            ExamRegistrationTokenIssuedEvent issued => new ExamTokenState(ExamTokenStatus.Issued, issued.ExamId),
            ExamRegistrationTokenRevokedEvent       => state with { Status = ExamTokenStatus.Revoked },
            ExamRegistrationTokenRedeemedEvent      => state with { Status = ExamTokenStatus.Redeemed },
            _                                       => state
        });

// Redeem: pattern-match the status; no if/else chains needed
var model = await eventStore.BuildDecisionModelAsync(TokenStatus(command.TokenId));

return model.State.Status switch
{
    ExamTokenStatus.NotIssued => CommandResult.Fail("Token not found."),
    ExamTokenStatus.Revoked   => CommandResult.Fail("Token has been revoked."),
    ExamTokenStatus.Redeemed  => CommandResult.Fail("Token has already been used."),
    _                         => await AppendRedemptionAsync(command, eventStore, model.State, model.AppendCondition)
};

Concurrency safety: When two requests redeem the same token concurrently, one succeeds; the other reads Redeemed on retry (via ExecuteDecisionAsync) and receives the appropriate error. Different tokens never contend because each query is scoped to a unique examToken tag value.
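That race can be replayed deterministically with a tiny single-threaded model. TokenStatusSketch, TokenLogSketch, and RedeemSketch are hypothetical names, and the afterFirstRead hook exists only to interleave a competing redemption between one caller's read and its append; none of this is Opossum's implementation.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public enum TokenStatusSketch { NotIssued, Issued, Revoked, Redeemed }

// Hypothetical in-memory stand-in for the events of ONE token; NOT Opossum's implementation.
public sealed class TokenLogSketch
{
    private readonly List<TokenStatusSketch> _events = new();

    public void Append(TokenStatusSketch evt) => _events.Add(evt);

    // Returns the folded status (the last event wins) plus how many events were seen.
    public (TokenStatusSketch Status, int Seen) Read() =>
        (_events.Count == 0 ? TokenStatusSketch.NotIssued : _events[^1], _events.Count);

    // Guarded append: rejected if any event was appended after the caller's read.
    public bool TryAppend(TokenStatusSketch evt, int seen)
    {
        if (_events.Count != seen) return false;
        _events.Add(evt);
        return true;
    }
}

public static class RedeemSketch
{
    // afterFirstRead is a test hook that lets a competing redemption run
    // between this caller's first read and its append.
    public static string Redeem(TokenLogSketch log, Action? afterFirstRead = null)
    {
        while (true)
        {
            var (status, seen) = log.Read();
            afterFirstRead?.Invoke();
            afterFirstRead = null;

            switch (status)
            {
                case TokenStatusSketch.NotIssued: return "Token not found.";
                case TokenStatusSketch.Revoked: return "Token has been revoked.";
                case TokenStatusSketch.Redeemed: return "Token has already been used.";
            }

            if (log.TryAppend(TokenStatusSketch.Redeemed, seen)) return "OK";
            // Conflict: loop to re-read the token state and decide again.
        }
    }
}
```

When a second redemption slips in through the hook, it wins; the first caller's guarded append is rejected, it re-reads Redeemed, and it returns the "already used" error instead of redeeming twice.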

See ExamRegistration/ in the sample for issue, redeem, and revoke implementations.


⚡ Performance

Typical Throughput

Benchmarked on Windows 11, .NET 10.0.2, SSD storage (2026-03-11):

| Operation | Throughput | Notes |
|---|---|---|
| Append (FlushImmediately = true, single event) | ~55 events/sec | True durability: event + index files flushed (~18ms/event on SSD) |
| Append (FlushImmediately = true, batch 10) | ~78 events/sec | ~13ms/event when amortised over a batch |
| Append (FlushImmediately = false) | ~185 events/sec | OS page cache only (testing/dev mode; data loss risk on power failure) |
| Tag query (high selectivity) | ~524 μs | Index-based, excellent for targeted queries |
| Tag query (1K events) | ~11.6 ms | Sub-linear scaling |
| ReadLastAsync (100 → 10K events) | 948–1,158 μs | Near-O(1): one index lookup + one file read |
| Read by EventType (10K events) | ~206 ms | Index-based |
| Projection rebuild | ~4.5 ms / 50 events | Write-through; bounded memory regardless of unique key count |
| Incremental projection update | ~4.6 μs / 0 B | ~978× faster than full rebuild; zero allocation |

Query Performance by Selectivity

| Selectivity | 10K Events | Performance |
|---|---|---|
| High (few matches) | ~524 μs | ⭐ Excellent - tag index highly effective |
| Medium (moderate matches) | ~5.3 ms | ✅ Good - typical use case |
| Low (many matches) | ~103 ms | ⚠️ Expected - must deserialize many events |

Optimization Tips

✅ Use SSDs - Flush operations are much faster (10ms vs 50ms+ on HDD)
✅ Use tag-based queries - ~524 μs for high selectivity vs ~5.3 ms for broader queries
✅ Enable parallel projection rebuilding - MaxConcurrentRebuilds config; Concurrency=4 is ~47 % faster than sequential for large datasets (write-through I/O parallelises well)
✅ Use incremental projection updates - ~978× faster than full rebuild; zero allocation
✅ Optimize query selectivity - More specific tags = faster queries
⚠️ Avoid Query.All() for large datasets - Use projections for read models instead
⚠️ Use FlushEventsImmediately = false for testing only (data loss risk on power failure)

Descending Order Queries

✅ Zero performance overhead - Descending order is as fast as ascending (optimized in-place)

Perfect for:

  • Activity feeds (latest first)
  • Recent orders
  • Audit log displays

Scalability Limits

Opossum is designed for single-server deployments:

| Metric | Recommended Limit | Notes |
|---|---|---|
| Events per day | < 100,000 | ~1 event/second average |
| Total events | < 10 million | Performance degrades with file count |
| Projections | < 100 types | More = slower startup |
| Tags per event | < 20 | Affects index write speed |
| Concurrent appends | < 100 simultaneous | File system lock contention |

Beyond these limits? Consider cloud-based event stores (EventStoreDB, Azure Event Hubs).
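To relate the daily limit to the append throughput quoted earlier, the arithmetic can be written out as a small helper. CapacitySketch is a hypothetical name, and the calculation only compares averages; it says nothing about bursts.

```csharp
using System;

public static class CapacitySketch
{
    private const double SecondsPerDay = 86_400.0;

    // Average load implied by a daily event volume.
    public static double AverageEventsPerSecond(long eventsPerDay) =>
        eventsPerDay / SecondsPerDay;

    // How many times the sustained append rate exceeds the average load.
    public static double Headroom(double sustainedEventsPerSecond, long eventsPerDay) =>
        sustainedEventsPerSecond / AverageEventsPerSecond(eventsPerDay);
}
```

For 100,000 events per day the average is about 1.16 events per second, so the ~55 events/sec durable single-event append rate leaves a factor of roughly 47 for peaks.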

Detailed benchmarks: See docs/benchmarking/results/20260311/

Rebuild performance note (0.5.0): The write-through projection rebuild introduced in 0.5.0 writes each SaveAsync call directly to disk during rebuild rather than accumulating state in memory. For large datasets, sequential rebuild of 4 projections takes ~3.7 s; with MaxConcurrentRebuilds = 4 this drops to ~2.0 s (~47 % faster). This is the expected trade-off for bounded memory (no more OOM with 1 M+ unique keys) and crash-recovery durability. Rebuild is a rare background operation, so the memory and safety guarantees outweigh the I/O cost.

IEventStoreAdmin

Administrative operations for store lifecycle management. Resolved from DI as IEventStoreAdmin:

public interface IEventStoreAdmin
{
    // Permanently delete all store data (events, indices, projections, ledger).
    // Write-protected files are handled transparently: read-only attributes are stripped.
    // The store directory is recreated automatically on the next AppendAsync/ReadAsync call.
    Task DeleteStoreAsync();
}

📡 OpenTelemetry

Opossum emits distributed traces via System.Diagnostics.ActivitySource, so no extra packages are required. Register the activity source name with your OpenTelemetry pipeline:

using Opossum.Telemetry;

tracerProviderBuilder.AddSource(OpossumTelemetry.ActivitySourceName); // "Opossum"

Traced operations:

| Activity | Operation Name | Description |
|---|---|---|
| AppendAsync | EventStore.Append | Every append, including batch appends |
| ReadAsync | EventStore.Read | Every query read |
| ReadLastAsync | EventStore.ReadLast | Every last-event read |
| RebuildAsync | Projection.Rebuild | Every projection rebuild |

When no listener is attached the overhead is a single null-check per operation.
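For quick local inspection without an OpenTelemetry pipeline, the BCL ActivityListener can subscribe to a source by name. The helper below is a sketch: TracingSketch and Capture are hypothetical names, and it records Activity.OperationName on the assumption that this is where the operation names in the table above appear.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class TracingSketch
{
    // Collects the operation names of activities that stop on the named
    // source while `work` runs. The listener is detached on return.
    public static List<string> Capture(string sourceName, Action work)
    {
        var seen = new List<string>();

        using var listener = new ActivityListener
        {
            // Subscribe only to the source we care about.
            ShouldListenTo = source => source.Name == sourceName,
            // Ask for fully populated activities.
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllData,
            ActivityStopped = activity => seen.Add(activity.OperationName)
        };
        ActivitySource.AddActivityListener(listener);

        work();
        return seen;
    }
}
```

Passing "Opossum" as the source name (the value noted next to OpossumTelemetry.ActivitySourceName above) around a block of store calls would list the traced operations. With no listener attached, ActivitySource.StartActivity returns null, which is why the idle overhead is so small.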


⚠️ Known Limitations

Single-Server / Single-Context Design

Opossum is designed as a single-context event store: one store name per AddOpossum() call. Multi-tenancy is handled at the application layer (e.g. per-tenant root paths). See ADR-004 for the full rationale.

ProjectionTagIndex Lock Growth (long-running processes, high cardinality)

Each unique projection key that is ever written to a [ProjectionTags]-enabled projection causes a per-key SemaphoreSlim to be allocated and held in memory for the lifetime of the process. For projections with high-cardinality keys (e.g. one projection per order over years), this map grows without bound, at ~48 bytes per key.

Impact: Negligible for typical deployments (< 100 K unique keys = < 5 MB). Relevant only for long-running processes on high-cardinality projections.
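
To make the growth pattern concrete, here is a hypothetical illustration of a per-key lock map. It is not Opossum's actual code; `PerKeyLocks` and its members are names invented for this sketch, and the 48 bytes per key is simply the figure quoted above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of the pattern described above:
// one SemaphoreSlim per key, never removed.
public sealed class PerKeyLocks
{
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new();

    // Number of semaphores currently retained by the map.
    public int Count => _locks.Count;

    // Runs the action while holding the lock for the given key.
    public async Task RunExclusiveAsync(string key, Func<Task> action)
    {
        var gate = _locks.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
        await gate.WaitAsync();
        try
        {
            await action();
        }
        finally
        {
            // The semaphore is released, but its entry stays in the map,
            // so the map grows with every new key.
            gate.Release();
        }
    }

    // Rough footprint estimate: unique keys times bytes per key.
    public static long EstimateBytes(long uniqueKeys, int bytesPerKey = 48) =>
        uniqueKeys * bytesPerKey;
}
```

With these numbers, `PerKeyLocks.EstimateBytes(100_000)` gives 4,800,000 bytes (about 4.8 MB), which matches the "< 5 MB" bound stated above; 10 M keys would be roughly 480 MB.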

Fix target: 0.6.0 (lock pooling or weak-reference cleanup).


📚 Documentation


🤝 Contributing

Contributions are welcome! Please read CONTRIBUTING.md first.


📄 License

This project is licensed under the MIT License - see the LICENSE file for details.


πŸ™ Acknowledgments

  • Inspired by the DCB Specification
  • Built for real-world use cases in automotive retail and SMB applications

Made with ❤️ for developers who value simplicity and local-first data ownership.

Target frameworks: net10.0 is compatible. The platform-specific targets net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, and net10.0-windows were computed as compatible.


| Version | Downloads | Last Updated |
|---|---|---|
| 0.5.0-preview.1 | 36 | 3/11/2026 |
| 0.4.0-preview.3 | 37 | 3/4/2026 |
| 0.4.0-preview.2 | 40 | 3/3/2026 |
| 0.4.0-preview.1 | 43 | 2/26/2026 |
| 0.3.0-preview.1 | 51 | 2/23/2026 |
| 0.2.0-preview.2 | 45 | 2/22/2026 |
| 0.2.0-preview.1 | 51 | 2/21/2026 |
| 0.1.0-preview.1 | 55 | 2/15/2026 |

Breaking changes:
- Rebuild methods (RebuildProjectionAsync, RebuildAllAsync, RebuildMissingProjectionsAsync, ForceRebuildAllAsync) removed from IProjectionManager. Inject IProjectionRebuilder instead.
- ProjectionOptions.EnableAutoRebuild (bool) replaced by AutoRebuild (AutoRebuildMode enum): None | MissingCheckpointsOnly | ForceFullRebuild. Update appsettings.json: "AutoRebuild": "MissingCheckpointsOnly".

Added:
- IProjectionRebuilder: dedicated rebuild orchestrator (RebuildAsync, RebuildAllAsync, GetRebuildStatusAsync).
- AutoRebuildMode enum: None, MissingCheckpointsOnly, ForceFullRebuild.
- RebuildBatchSize option (default 5 000): controls per-batch event load during rebuild.
- RebuildFlushInterval option (default 10 000): crash-recovery journal flush frequency.
- Crash-recovery rebuild journal: rebuild progress survives process crashes; resume from last flush point on next startup.
- ResumeInterruptedRebuildsAsync: automatic resume of interrupted rebuilds on daemon startup.
- Orphaned temp directory cleanup on startup.
- maxCount parameter on IEventStore.ReadAsync for page-by-page iteration.
- Per-batch progress logging during rebuild (events/sec, elapsed time).

Fixed:
- Crash-recovery position collision: orphaned event files from a crash between write and ledger update are no longer silently overwritten. LedgerManager.ReconcileLedgerAsync advances the ledger to match on-disk state on first append after restart; WriteEventAsync is now idempotent (skip if destination exists).
- Corrupt ledger no longer silently resets sequence positions to zero. JsonException now throws InvalidOperationException with recovery guidance instead of returning 0.
- LogReadError nullable StoreName guard.
- Write-through projection rebuild: SaveAsync writes directly to temp directory during rebuild, bounding peak memory to O(batch_size x state_size) regardless of unique key count. Eliminates OOM failures at scale.
- No aggregated metadata index written during rebuild; post-rebuild reads served from per-file embedded metadata.
- Post-rebuild daemon thrashing (UnauthorizedAccessException on Windows) on sparse projections fixed.
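
The "skip if destination exists" behaviour mentioned in the first fix can be sketched with plain file APIs. This is an illustration of the general technique under stated assumptions, not the code behind WriteEventAsync; `IdempotentWriter` and `WriteIfAbsentAsync` are hypothetical names.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class IdempotentWriter
{
    // Writes payload to path unless a file already exists there.
    // Returns true if the file was written, false if it was skipped.
    public static async Task<bool> WriteIfAbsentAsync(string path, byte[] payload)
    {
        FileStream stream;
        try
        {
            // FileMode.CreateNew fails with IOException if the file exists,
            // so the existence check and the create are a single step.
            stream = new FileStream(path, FileMode.CreateNew, FileAccess.Write,
                                    FileShare.None, 4096, useAsync: true);
        }
        catch (IOException) when (File.Exists(path))
        {
            // Destination already present: leave it untouched.
            return false;
        }

        await using (stream)
        {
            await stream.WriteAsync(payload);
        }
        return true;
    }
}
```

Calling it twice with the same path leaves the first payload in place. A production store would likely also guard against partially written files (for example by writing to a temporary file and renaming), which this sketch does not attempt.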

Performance (benchmark baseline 2026-03-11):
- Incremental projection update: ~4.6 µs / 0 B (zero-allocation hot path confirmed).
- Parallel rebuild Concurrency=4: ~47 % faster than sequential (strengthened vs 0.4.0).
- All core read/write/query paths stable vs 0.4.0-preview.3 baseline.

Full changelog: https://github.com/majormartintibor/Opossum/blob/main/CHANGELOG.md