NetCoreBackend.NArchitecture.Core.Outbox 1.0.0

dotnet add package NetCoreBackend.NArchitecture.Core.Outbox --version 1.0.0
                    
NuGet\Install-Package NetCoreBackend.NArchitecture.Core.Outbox -Version 1.0.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="NetCoreBackend.NArchitecture.Core.Outbox" Version="1.0.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="NetCoreBackend.NArchitecture.Core.Outbox" Version="1.0.0" />
                    
Directory.Packages.props
<PackageReference Include="NetCoreBackend.NArchitecture.Core.Outbox" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add NetCoreBackend.NArchitecture.Core.Outbox --version 1.0.0
                    
#r "nuget: NetCoreBackend.NArchitecture.Core.Outbox, 1.0.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package NetCoreBackend.NArchitecture.Core.Outbox@1.0.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=NetCoreBackend.NArchitecture.Core.Outbox&version=1.0.0
                    
Install as a Cake Addin
#tool nuget:?package=NetCoreBackend.NArchitecture.Core.Outbox&version=1.0.0
                    
Install as a Cake Tool

Core.Outbox

Transactional Outbox pattern implementasyonu. Atomik DB-write + event publish'i tek bir transaction'da kilitler. Distributed event göndermek için "publish ile DB-commit arasında crash olursa ne olur?" sorusunu çözer.

TL;DR: RabbitMQ / Kafka / SNS gibi dış sistemlere event gönderiyorsan, "DB commit OK ama event publish FAIL" senaryosu sistemini kalıcı inconsistent state'e sokar. Outbox bu senaryoyu imkânsız kılar.


1. Çözülen problem

❌ Klasik kırık pattern

public async Task PlaceOrderAsync(Order order, CancellationToken ct)
{
    _db.Orders.Add(order);
    await _db.SaveChangesAsync(ct);              // 1. DB commit OK

    // ⚠️ Bu satırda crash olursa (network glitch, container OOM kill, process restart):
    //    - DB'de order var
    //    - RabbitMQ'ya event GİTMEDİ
    //    - Inventory service "OrderPlaced" haberini hiç almıyor
    //    - Sistem kalıcı inconsistent state'te → manuel reconciliation gerekir
    await _rabbit.PublishAsync(new OrderPlacedEvent(order.Id), ct);
}

Tersini denersen (önce publish, sonra DB) bu sefer:

  • Publish OK ama DB commit fail → consumer'lar olmayan order'a tepki verir → OrderNotFound exception fırtınası

try/catch + retry ile düzeltmeye çalışmak da çözüm değil: process tamamen ölürse retry mantığı bile çalışmaz.

✅ Outbox çözümü

public async Task PlaceOrderAsync(Order order, CancellationToken ct)
{
    _db.Orders.Add(order);

    // Outbox satırı DA aynı DbContext'e eklenir → tek transaction
    await _outbox.AppendAsync(new OutboxMessage
    {
        Id            = Guid.NewGuid(),
        EventType     = "Orders.Placed.v1",
        Payload       = JsonSerializer.Serialize(new { order.Id, order.Total, order.CustomerId }),
        CorrelationId = _httpContext.TraceIdentifier,
        OccurredAtUtc = DateTime.UtcNow
    }, ct);

    await _db.SaveChangesAsync(ct);  // Order + outbox ATOMIC commit
}

Sonra arka planda OutboxPublisherWorker outbox tablosunu polling eder ve consumer'ın yazdığı IOutboxPublisher'a teslim eder. Üç senaryo:

Senaryo Sonuç
DB commit'ten önce crash İkisi de yazılmaz → kullanıcı 500 alır, retry eder, no inconsistency
Commit OK, sonra crash Outbox satırı persist'tir → worker restart olunca yakalar, RabbitMQ'ya yollar, ProcessedAtUtc stamp'ler
RabbitMQ down Worker fail eder, AttemptCount++, exponential backoff ile retry, MaxAttempts aşılırsa IsPoisoned = true → operator inceler. Event kaybı yok.

2. Akış diyagramı

┌─────────────────────────────┐
│ HTTP request / Command      │
│   PlaceOrderCommand         │
└──────────────┬──────────────┘
               │
               ▼
┌─────────────────────────────────────────────────┐
│ Handler                                         │
│   _db.Orders.Add(order)                         │
│   _outbox.AppendAsync(orderPlacedMessage)       │
│   await _db.SaveChangesAsync()  ◄── ATOMIC      │
└──────────────┬──────────────────────────────────┘
               │
               ▼
       ┌───────────────┐
       │   Database    │
       │ ┌───────────┐ │       ┌───────────────────────┐
       │ │ Orders    │ │       │ OutboxPublisherWorker │
       │ ├───────────┤ │       │  (BackgroundService)  │
       │ │ Outbox    │◄┼───────┤  - FetchDueAsync      │
       │ └───────────┘ │       │  - PublishAsync       │
       └───────────────┘       │  - MarkProcessed /    │
                               │    RecordFailure      │
                               └──────────┬────────────┘
                                          │
                                          ▼
                               ┌──────────────────────┐
                               │  IOutboxPublisher    │
                               │  (consumer impl)     │
                               │   → RabbitMQ /       │
                               │     Kafka / SNS /    │
                               │     MediatR / vb.    │
                               └──────────────────────┘

3. Tam kurulum — Order servisi örneği

3.1 DbContext'e outbox tablosunu ekle

using Microsoft.EntityFrameworkCore;
using NetCoreBackend.NArchitecture.Core.Outbox.EfPersistence;
using NetCoreBackend.NArchitecture.Core.Outbox.Entities;

public class AppDbContext : DbContext
{
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

    public DbSet<Order> Orders => Set<Order>();
    public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);

        // Outbox tablosu + dispatch index'i (hot-path polling için filtered index)
        modelBuilder.ConfigureOutbox();

        // ... senin diğer entity konfigürasyonların
    }
}

Migration ekle:

dotnet ef migrations add AddOutbox
dotnet ef database update

3.2 RabbitMQ publisher'ı yaz

Framework IOutboxPublisherboş bırakıyor — çünkü her consumer farklı broker / topic / serializer kullanır. Sen kendi şirketinin konvansiyonuna uygun şekilde implement edersin.

using System.Text;
using RabbitMQ.Client;
using NetCoreBackend.NArchitecture.Core.Outbox.Abstractions;
using NetCoreBackend.NArchitecture.Core.Outbox.Entities;

public sealed class RabbitMqOutboxPublisher : IOutboxPublisher, IAsyncDisposable
{
    private const string ExchangeName = "events";  // topic exchange
    private readonly IConnection _connection;
    private readonly ILogger<RabbitMqOutboxPublisher> _logger;

    public RabbitMqOutboxPublisher(IConnection connection, ILogger<RabbitMqOutboxPublisher> logger)
    {
        _connection = connection;
        _logger = logger;
    }

    public async Task PublishAsync(OutboxMessage message, CancellationToken cancellationToken)
    {
        await using IChannel channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);

        // Routing key olarak EventType kullan — "Orders.Placed.v1" → topic.* binding'lerine
        // göre tüketici servislere dağıtılır.
        BasicProperties props = new()
        {
            MessageId     = message.Id.ToString(),
            CorrelationId = message.CorrelationId,
            ContentType   = "application/json",
            Type          = message.EventType,
            Timestamp     = new AmqpTimestamp(new DateTimeOffset(message.OccurredAtUtc).ToUnixTimeSeconds()),
            Persistent    = true  // disk-backed → broker restart'ında kaybolmaz
        };

        await channel.BasicPublishAsync(
            exchange:    ExchangeName,
            routingKey:  message.EventType,
            mandatory:   false,
            basicProperties: props,
            body:        Encoding.UTF8.GetBytes(message.Payload),
            cancellationToken: cancellationToken);

        // Burada throw atarsan OutboxPublisherWorker yakalar, AttemptCount++,
        // exponential backoff ile retry eder. Atmazsan MarkProcessedAsync çağrılır.
    }

    public async ValueTask DisposeAsync()
    {
        await _connection.DisposeAsync();
    }
}

3.3 Program.cs — DI wiring

using NetCoreBackend.NArchitecture.Core.Outbox.Abstractions;
using NetCoreBackend.NArchitecture.Core.Outbox.DependencyInjection;
using RabbitMQ.Client;

var builder = WebApplication.CreateBuilder(args);

// 1. DbContext (SQL Server / Postgres / vb.)
builder.Services.AddDbContext<AppDbContext>(opt =>
    opt.UseSqlServer(builder.Configuration.GetConnectionString("AppDb")));

// 2. Multi-tenancy — multi-tenant SaaS senaryosunda ZORUNLU. AddOutbox'tan önce gelmeli;
// EfOutboxStore.AppendAsync TenantId stamp'i için ITenantEntitySetter resolve eder.
// Setter yoksa + msg.TenantId boşsa append loud error fırlatır (orphan row yok).
builder.Services.AddMultiTenancy();

// 3. Outbox store + worker.
// OutboxOptions startup'ta ValidateOnStart() ile Validate() çağırır —
// BatchSize=0 / MaxRetryDelay<BaseRetryDelay gibi misconfiguration host build'te fail eder.
builder.Services.AddOutbox<AppDbContext>(opt =>
{
    opt.BatchSize      = 100;
    opt.MaxAttempts    = 8;
    opt.IdlePollDelay  = TimeSpan.FromSeconds(2);
    opt.BaseRetryDelay = TimeSpan.FromSeconds(2);
    opt.MaxRetryDelay  = TimeSpan.FromMinutes(10);
});

// 4. RabbitMQ connection — Singleton (connection pahalı, channel ucuz)
builder.Services.AddSingleton<IConnection>(sp =>
{
    var factory = new ConnectionFactory
    {
        Uri = new Uri(builder.Configuration.GetConnectionString("RabbitMq")!),
        AutomaticRecoveryEnabled = true,
        NetworkRecoveryInterval  = TimeSpan.FromSeconds(10)
    };
    return factory.CreateConnectionAsync().GetAwaiter().GetResult();
});

// 5. Senin publisher — Scoped (worker her batch için yeni scope açar)
builder.Services.AddScoped<IOutboxPublisher, RabbitMqOutboxPublisher>();

var app = builder.Build();
app.Run();

3.4 Handler içinde kullanım

public sealed class PlaceOrderHandler : IRequestHandler<PlaceOrderCommand, Guid>
{
    private readonly AppDbContext _db;
    private readonly IOutboxStore _outbox;
    private readonly IHttpContextAccessor _http;

    public PlaceOrderHandler(AppDbContext db, IOutboxStore outbox, IHttpContextAccessor http)
    {
        _db = db;
        _outbox = outbox;
        _http = http;
    }

    public async Task<Guid> Handle(PlaceOrderCommand cmd, CancellationToken ct)
    {
        Order order = new()
        {
            Id         = Guid.NewGuid(),
            CustomerId = cmd.CustomerId,
            Total      = cmd.Total,
            CreatedAt  = DateTime.UtcNow
        };
        _db.Orders.Add(order);

        await _outbox.AppendAsync(new OutboxMessage
        {
            Id            = Guid.NewGuid(),
            EventType     = "Orders.Placed.v1",
            Payload       = JsonSerializer.Serialize(new
            {
                orderId    = order.Id,
                customerId = order.CustomerId,
                total      = order.Total
            }),
            CorrelationId = _http.HttpContext?.TraceIdentifier,
            OccurredAtUtc = DateTime.UtcNow
        }, ct);

        // ATOMIC: order + outbox row tek transaction'da. Crash olursa ikisi de gitmez.
        await _db.SaveChangesAsync(ct);

        return order.Id;
    }
}

Önemli: AppendAsync çağrısı SaveChanges etmiyor — sadece DbContext'e ekliyor. Atomicity'i SaveChangesAsync sağlar. Eğer TransactionScopeBehavior (Core.Application pipeline) kullanıyorsan otomatik transaction da iş görür.


4. Components

Type Rol
OutboxMessage TenantEntity<Guid>EventType, Payload, CorrelationId, retry bookkeeping (AttemptCount, NextAttemptUtc, IsPoisoned, Error)
IOutboxStore Storage soyutlaması — AppendAsync, FetchDueAsync, MarkProcessedAsync, RecordFailureAsync
EfOutboxStore<TDbContext> EF Core implementasyonu — consumer DbContext üzerine binili, default scoped impl
IOutboxPublisher Consumer implementer — gerçek event'i broker'a shipping eden tek interface
OutboxPublisherWorker BackgroundService — polling + retry + poison-pill handling + per-message failure isolation
OutboxOptions BatchSize / IdlePollDelay / MaxAttempts / BaseRetryDelay / MaxRetryDelay
ConfigureOutbox() ModelBuilder extension — outbox tablosu + filtered dispatch index

5. Retry policy

Exponential backoff: attempt n → BaseRetryDelay × 2^(n-1), MaxRetryDelay ile cap'li.

Default ayarlarla (BaseRetryDelay=2s, MaxAttempts=8, MaxRetryDelay=10m) program:

Attempt Delay
1 2s
2 4s
3 8s
4 16s
5 32s
6 1m 4s
7 2m 8s
8 4m 16s
9 POISONED (IsPoisoned = true, dispatch durur, operator inceler)

6. Failure isolation

OutboxPublisherWorker üç katman koruma kullanır:

  1. Per-message try/catch — bir mesajın PublishAsync'te throw atması batch'in geri kalanını etkilemez; sadece o satırın retry counter'ı artar.
  2. Batch-level try/catchFetchDueAsync wholesale fail ederse (DB outage), worker log'lar ve IdlePollDelay kadar bekleyip yeniden dener. CPU spin yok.
  3. Cancellation respectOperationCanceledException host shutdown'ı temsil eder; mesaj penalize edilmez, sıradaki worker run'da yeniden fetch edilir.

7. Worker lifecycle

Host start
   ↓
ExecuteAsync loop:
   ├─ scope = scopeFactory.CreateAsyncScope()
   ├─ store     = scope.GetRequiredService<IOutboxStore>()
   ├─ publisher = scope.GetRequiredService<IOutboxPublisher>()
   ├─ due       = await store.FetchDueAsync(batchSize, ct)
   │
   ├─ if (due.Count == 0)
   │     await Task.Delay(IdlePollDelay)
   │     continue
   │
   ├─ foreach (msg in due)
   │     try { await publisher.PublishAsync(msg); store.MarkProcessed(msg); }
   │     catch { store.RecordFailure(msg, ...); }
   │
   └─ loop back immediately (drain backlog)
   ↓
Host stop → ExecuteAsync exits gracefully

Her batch için yeni DI scope açılır — DbContext per-batch tracker state'i biriktirmez, IOutboxPublisher'ın scoped dependency'leri (RabbitMQ channel gibi) düzgün dispose olur.


7.1 Horizontal scaling — bilinen sınırlama

OutboxPublisherWorker tek replica çalışacak şekilde tasarlandı. Birden çok replica aynı anda FetchDueAsync çağırırsa aynı row'u iki kez publish edebilir (pessimistic lock / FOR UPDATE SKIP LOCKED framework'te yok — provider-specific).

Pratik çözümler

Yaklaşım Açıklama
Tek replica (önerilen) K8s Deployment.replicas: 1 veya bir tek BackgroundService host'u. Worker zaten cheap (polling-based), throughput için BatchSize artırmak çoğu zaman replicas çoğaltmaktan iyi sonuç verir.
Leader election Birden çok host varsa, sadece "leader" worker'ın çalışmasına izin ver (ZooKeeper / Consul / Redis lock). Worker'a IHostedService koşulu ekleyip lock alamayan instance hemen exit eder.
Custom store + FOR UPDATE SKIP LOCKED IOutboxStore'u kendi raw-SQL implementasyonunla değiştir; Postgres/MySQL/SQL Server için SELECT … FOR UPDATE SKIP LOCKED ile multiple worker safe consume sağlar.
Idempotent consumer Publisher'ın throw atmadığı ama mesajın çift teslim edildiği senaryoda consumer tarafında idempotency key kontrolü zaten varsa, duplicate publish veri açısından zararsız olur.

Default kurulum (tek replica) çoğu SaaS için yeterli — outbox iş yükü typically saniyede yüzlerce mesaj seviyesindedir, batch=100 ile rahatlıkla karşılanır.


7.2 Multi-tenant semantik

Outbox TenantEntity — her satır TenantId'ye bağlı, ama iki path farklı davranır:

Write path (handler → AppendAsync)

AppendAsync TenantId == Guid.Empty ise mevcut ITenantEntitySetter'dan stamp eder. Tipik akış:

Request → TenantMiddleware → TenantContext.SetTenant(tenantA)
   → PlaceOrderHandler → _outbox.AppendAsync(msg)   // msg.TenantId boş
   → EfOutboxStore stamp eder: msg.TenantId = tenantA
   → SaveChangesAsync atomic commit

Tenant context yoksa + ITenantEntitySetter register edilmemişse loud error — orphan row asla yazılmaz.

Read path (worker → FetchDueAsync)

OutboxPublisherWorker BackgroundService'tir. HttpContext yok, TenantMiddleware çalışmamış, tenant context boş. Worker by-design cross-tenant — tüm tenant'ların outbox'ını drain etmeli, yoksa hiç event ship'lenmez.

Bu yüzden FetchDueAsync IgnoreQueryFilters() çağırır. Consumer'ın DbContext'inde OutboxMessage üzerinde tenant query filter olsa bile worker tüm satırları görür.

Per-tenant routing istiyorsan IOutboxPublisher.PublishAsync içinde message.TenantId'yi okuyup tenant-specific topic/queue'ya yönlendirebilirsin — TenantId payload'a yazılmasa bile entity'de mevcut.

Özet kontrat

Path Tenant context lazım mı? Filter davranışı
AppendAsync Evet — TenantSetter'dan stamp ya da msg.TenantId explicit set
FetchDueAsync Hayır — worker tüm tenant'ları görür IgnoreQueryFilters()
MarkProcessedAsync / RecordFailureAsync Hayır — entity zaten tracker'da, sadece SaveChanges

7.3 DbContext isolation — bilinen sınırlama

EfOutboxStore.MarkProcessedAsync ve RecordFailureAsync SaveChangesAsync çağırır — bu DbContext'teki TÜM tracked değişiklikleri commit eder, sadece outbox row'unu değil.

  • Worker yolu güvenli: OutboxPublisherWorker her batch için fresh DI scope açar → DbContext sadece outbox row'larını tracker'da tutar. ✅
  • Consumer dikkat etmeli: IOutboxStore'u request-scoped DbContext üzerinden başka bir handler'dan kullanıyorsan, SaveChangesAsync request'te tracked olan diğer entity'leri de commit edebilir.

Pratik öneri: IOutboxStore'u sadece AppendAsync için kullan; MarkProcessed/RecordFailure'ı worker'a bırak. AppendAsync zaten SaveChangesAsync çağırmıyor — sadece DbContext'e Add ediyor, atomicity'i caller'ın transaction'ı sağlıyor.


8. Sana ne zaman lazım?

Senaryo Outbox lazım mı?
Microservices arası event-driven iletişim (RabbitMQ / Kafka / SNS / Azure Service Bus) Kesinlikle
Webhook gönderimi (Stripe-style: kendi consumer'larına notify) ✅ Evet
Email/SMS notification — "user kaydını commit edince mail at" ✅ Evet
Search index sync (PostgreSQL'e yaz → Elasticsearch'e index) ✅ Evet
Audit log shipping (external SIEM'e) ✅ Evet
Sadece monolith, dış sistem yok, in-process MediatR notification ❌ Gereksiz — direkt INotificationPublisher yeterli
"Best-effort" yeterli, event kaybı tolere ediliyor ⚠️ Belki — basit _broker.PublishAsync da iş görür

9. Konfigürasyon

Tam appsettings + connection string örneği için bkz. SETUP.md → Outbox & RabbitMQ Configuration.


10. İlgili dosyalar

  • Entities/OutboxMessage.cs
  • Abstractions/IOutboxStore.cs, Abstractions/IOutboxPublisher.cs
  • EfPersistence/EfOutboxStore.cs, EfPersistence/EfOutboxModelExtensions.cs
  • Worker/OutboxPublisherWorker.cs, Worker/OutboxOptions.cs
  • ../Core.Outbox.DependencyInjection/OutboxServiceRegistration.cs
Product Compatible and additional computed target framework versions.
.NET net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on NetCoreBackend.NArchitecture.Core.Outbox:

Package Downloads
NetCoreBackend.NArchitecture.Core.Outbox.DependencyInjection

DI registration for Core.Outbox.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.0.0 93 6/2/2026