NetCoreBackend.NArchitecture.Core.Outbox
1.0.0
dotnet add package NetCoreBackend.NArchitecture.Core.Outbox --version 1.0.0
NuGet\Install-Package NetCoreBackend.NArchitecture.Core.Outbox -Version 1.0.0
<PackageReference Include="NetCoreBackend.NArchitecture.Core.Outbox" Version="1.0.0" />
<PackageVersion Include="NetCoreBackend.NArchitecture.Core.Outbox" Version="1.0.0" />
<PackageReference Include="NetCoreBackend.NArchitecture.Core.Outbox" />
paket add NetCoreBackend.NArchitecture.Core.Outbox --version 1.0.0
#r "nuget: NetCoreBackend.NArchitecture.Core.Outbox, 1.0.0"
#:package NetCoreBackend.NArchitecture.Core.Outbox@1.0.0
#addin nuget:?package=NetCoreBackend.NArchitecture.Core.Outbox&version=1.0.0
#tool nuget:?package=NetCoreBackend.NArchitecture.Core.Outbox&version=1.0.0
Core.Outbox
Transactional Outbox pattern implementasyonu. Atomik DB-write + event publish'i tek bir transaction'da kilitler. Distributed event göndermek için "publish ile DB-commit arasında crash olursa ne olur?" sorusunu çözer.
TL;DR: RabbitMQ / Kafka / SNS gibi dış sistemlere event gönderiyorsan, "DB commit OK ama event publish FAIL" senaryosu sistemini kalıcı inconsistent state'e sokar. Outbox bu senaryoyu imkânsız kılar.
1. Çözülen problem
❌ Klasik kırık pattern
public async Task PlaceOrderAsync(Order order, CancellationToken ct)
{
_db.Orders.Add(order);
await _db.SaveChangesAsync(ct); // 1. DB commit OK
// ⚠️ Bu satırda crash olursa (network glitch, container OOM kill, process restart):
// - DB'de order var
// - RabbitMQ'ya event GİTMEDİ
// - Inventory service "OrderPlaced" haberini hiç almıyor
// - Sistem kalıcı inconsistent state'te → manuel reconciliation gerekir
await _rabbit.PublishAsync(new OrderPlacedEvent(order.Id), ct);
}
Tersini denersen (önce publish, sonra DB) bu sefer:
- Publish OK ama DB commit fail → consumer'lar olmayan order'a tepki verir →
OrderNotFoundexception fırtınası
try/catch + retry ile düzeltmeye çalışmak da çözüm değil: process tamamen ölürse retry mantığı bile çalışmaz.
✅ Outbox çözümü
public async Task PlaceOrderAsync(Order order, CancellationToken ct)
{
_db.Orders.Add(order);
// Outbox satırı DA aynı DbContext'e eklenir → tek transaction
await _outbox.AppendAsync(new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = "Orders.Placed.v1",
Payload = JsonSerializer.Serialize(new { order.Id, order.Total, order.CustomerId }),
CorrelationId = _httpContext.TraceIdentifier,
OccurredAtUtc = DateTime.UtcNow
}, ct);
await _db.SaveChangesAsync(ct); // Order + outbox ATOMIC commit
}
Sonra arka planda OutboxPublisherWorker outbox tablosunu polling eder ve consumer'ın yazdığı IOutboxPublisher'a teslim eder. Üç senaryo:
| Senaryo | Sonuç |
|---|---|
| DB commit'ten önce crash | İkisi de yazılmaz → kullanıcı 500 alır, retry eder, no inconsistency |
| Commit OK, sonra crash | Outbox satırı persist'tir → worker restart olunca yakalar, RabbitMQ'ya yollar, ProcessedAtUtc stamp'ler |
| RabbitMQ down | Worker fail eder, AttemptCount++, exponential backoff ile retry, MaxAttempts aşılırsa IsPoisoned = true → operator inceler. Event kaybı yok. |
2. Akış diyagramı
┌─────────────────────────────┐
│ HTTP request / Command │
│ PlaceOrderCommand │
└──────────────┬──────────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ Handler │
│ _db.Orders.Add(order) │
│ _outbox.AppendAsync(orderPlacedMessage) │
│ await _db.SaveChangesAsync() ◄── ATOMIC │
└──────────────┬──────────────────────────────────┘
│
▼
┌───────────────┐
│ Database │
│ ┌───────────┐ │ ┌───────────────────────┐
│ │ Orders │ │ │ OutboxPublisherWorker │
│ ├───────────┤ │ │ (BackgroundService) │
│ │ Outbox │◄┼───────┤ - FetchDueAsync │
│ └───────────┘ │ │ - PublishAsync │
└───────────────┘ │ - MarkProcessed / │
│ RecordFailure │
└──────────┬────────────┘
│
▼
┌──────────────────────┐
│ IOutboxPublisher │
│ (consumer impl) │
│ → RabbitMQ / │
│ Kafka / SNS / │
│ MediatR / vb. │
└──────────────────────┘
3. Tam kurulum — Order servisi örneği
3.1 DbContext'e outbox tablosunu ekle
using Microsoft.EntityFrameworkCore;
using NetCoreBackend.NArchitecture.Core.Outbox.EfPersistence;
using NetCoreBackend.NArchitecture.Core.Outbox.Entities;
public class AppDbContext : DbContext
{
public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }
public DbSet<Order> Orders => Set<Order>();
public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
// Outbox tablosu + dispatch index'i (hot-path polling için filtered index)
modelBuilder.ConfigureOutbox();
// ... senin diğer entity konfigürasyonların
}
}
Migration ekle:
dotnet ef migrations add AddOutbox
dotnet ef database update
3.2 RabbitMQ publisher'ı yaz
Framework
IOutboxPublisher'ı boş bırakıyor — çünkü her consumer farklı broker / topic / serializer kullanır. Sen kendi şirketinin konvansiyonuna uygun şekilde implement edersin.
using System.Text;
using RabbitMQ.Client;
using NetCoreBackend.NArchitecture.Core.Outbox.Abstractions;
using NetCoreBackend.NArchitecture.Core.Outbox.Entities;
public sealed class RabbitMqOutboxPublisher : IOutboxPublisher, IAsyncDisposable
{
private const string ExchangeName = "events"; // topic exchange
private readonly IConnection _connection;
private readonly ILogger<RabbitMqOutboxPublisher> _logger;
public RabbitMqOutboxPublisher(IConnection connection, ILogger<RabbitMqOutboxPublisher> logger)
{
_connection = connection;
_logger = logger;
}
public async Task PublishAsync(OutboxMessage message, CancellationToken cancellationToken)
{
await using IChannel channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
// Routing key olarak EventType kullan — "Orders.Placed.v1" → topic.* binding'lerine
// göre tüketici servislere dağıtılır.
BasicProperties props = new()
{
MessageId = message.Id.ToString(),
CorrelationId = message.CorrelationId,
ContentType = "application/json",
Type = message.EventType,
Timestamp = new AmqpTimestamp(new DateTimeOffset(message.OccurredAtUtc).ToUnixTimeSeconds()),
Persistent = true // disk-backed → broker restart'ında kaybolmaz
};
await channel.BasicPublishAsync(
exchange: ExchangeName,
routingKey: message.EventType,
mandatory: false,
basicProperties: props,
body: Encoding.UTF8.GetBytes(message.Payload),
cancellationToken: cancellationToken);
// Burada throw atarsan OutboxPublisherWorker yakalar, AttemptCount++,
// exponential backoff ile retry eder. Atmazsan MarkProcessedAsync çağrılır.
}
public async ValueTask DisposeAsync()
{
await _connection.DisposeAsync();
}
}
3.3 Program.cs — DI wiring
using NetCoreBackend.NArchitecture.Core.Outbox.Abstractions;
using NetCoreBackend.NArchitecture.Core.Outbox.DependencyInjection;
using RabbitMQ.Client;
var builder = WebApplication.CreateBuilder(args);
// 1. DbContext (SQL Server / Postgres / vb.)
builder.Services.AddDbContext<AppDbContext>(opt =>
opt.UseSqlServer(builder.Configuration.GetConnectionString("AppDb")));
// 2. Multi-tenancy — multi-tenant SaaS senaryosunda ZORUNLU. AddOutbox'tan önce gelmeli;
// EfOutboxStore.AppendAsync TenantId stamp'i için ITenantEntitySetter resolve eder.
// Setter yoksa + msg.TenantId boşsa append loud error fırlatır (orphan row yok).
builder.Services.AddMultiTenancy();
// 3. Outbox store + worker.
// OutboxOptions startup'ta ValidateOnStart() ile Validate() çağırır —
// BatchSize=0 / MaxRetryDelay<BaseRetryDelay gibi misconfiguration host build'te fail eder.
builder.Services.AddOutbox<AppDbContext>(opt =>
{
opt.BatchSize = 100;
opt.MaxAttempts = 8;
opt.IdlePollDelay = TimeSpan.FromSeconds(2);
opt.BaseRetryDelay = TimeSpan.FromSeconds(2);
opt.MaxRetryDelay = TimeSpan.FromMinutes(10);
});
// 4. RabbitMQ connection — Singleton (connection pahalı, channel ucuz)
builder.Services.AddSingleton<IConnection>(sp =>
{
var factory = new ConnectionFactory
{
Uri = new Uri(builder.Configuration.GetConnectionString("RabbitMq")!),
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};
return factory.CreateConnectionAsync().GetAwaiter().GetResult();
});
// 5. Senin publisher — Scoped (worker her batch için yeni scope açar)
builder.Services.AddScoped<IOutboxPublisher, RabbitMqOutboxPublisher>();
var app = builder.Build();
app.Run();
3.4 Handler içinde kullanım
public sealed class PlaceOrderHandler : IRequestHandler<PlaceOrderCommand, Guid>
{
private readonly AppDbContext _db;
private readonly IOutboxStore _outbox;
private readonly IHttpContextAccessor _http;
public PlaceOrderHandler(AppDbContext db, IOutboxStore outbox, IHttpContextAccessor http)
{
_db = db;
_outbox = outbox;
_http = http;
}
public async Task<Guid> Handle(PlaceOrderCommand cmd, CancellationToken ct)
{
Order order = new()
{
Id = Guid.NewGuid(),
CustomerId = cmd.CustomerId,
Total = cmd.Total,
CreatedAt = DateTime.UtcNow
};
_db.Orders.Add(order);
await _outbox.AppendAsync(new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = "Orders.Placed.v1",
Payload = JsonSerializer.Serialize(new
{
orderId = order.Id,
customerId = order.CustomerId,
total = order.Total
}),
CorrelationId = _http.HttpContext?.TraceIdentifier,
OccurredAtUtc = DateTime.UtcNow
}, ct);
// ATOMIC: order + outbox row tek transaction'da. Crash olursa ikisi de gitmez.
await _db.SaveChangesAsync(ct);
return order.Id;
}
}
Önemli:
AppendAsyncçağrısıSaveChangesetmiyor — sadece DbContext'e ekliyor. Atomicity'iSaveChangesAsyncsağlar. EğerTransactionScopeBehavior(Core.Application pipeline) kullanıyorsan otomatik transaction da iş görür.
4. Components
| Type | Rol |
|---|---|
OutboxMessage |
TenantEntity<Guid> — EventType, Payload, CorrelationId, retry bookkeeping (AttemptCount, NextAttemptUtc, IsPoisoned, Error) |
IOutboxStore |
Storage soyutlaması — AppendAsync, FetchDueAsync, MarkProcessedAsync, RecordFailureAsync |
EfOutboxStore<TDbContext> |
EF Core implementasyonu — consumer DbContext üzerine binili, default scoped impl |
IOutboxPublisher |
Consumer implementer — gerçek event'i broker'a shipping eden tek interface |
OutboxPublisherWorker |
BackgroundService — polling + retry + poison-pill handling + per-message failure isolation |
OutboxOptions |
BatchSize / IdlePollDelay / MaxAttempts / BaseRetryDelay / MaxRetryDelay |
ConfigureOutbox() |
ModelBuilder extension — outbox tablosu + filtered dispatch index |
5. Retry policy
Exponential backoff: attempt n → BaseRetryDelay × 2^(n-1), MaxRetryDelay ile cap'li.
Default ayarlarla (BaseRetryDelay=2s, MaxAttempts=8, MaxRetryDelay=10m) program:
| Attempt | Delay |
|---|---|
| 1 | 2s |
| 2 | 4s |
| 3 | 8s |
| 4 | 16s |
| 5 | 32s |
| 6 | 1m 4s |
| 7 | 2m 8s |
| 8 | 4m 16s |
| 9 | POISONED (IsPoisoned = true, dispatch durur, operator inceler) |
6. Failure isolation
OutboxPublisherWorker üç katman koruma kullanır:
- Per-message try/catch — bir mesajın
PublishAsync'te throw atması batch'in geri kalanını etkilemez; sadece o satırın retry counter'ı artar. - Batch-level try/catch —
FetchDueAsyncwholesale fail ederse (DB outage), worker log'lar veIdlePollDelaykadar bekleyip yeniden dener. CPU spin yok. - Cancellation respect —
OperationCanceledExceptionhost shutdown'ı temsil eder; mesaj penalize edilmez, sıradaki worker run'da yeniden fetch edilir.
7. Worker lifecycle
Host start
↓
ExecuteAsync loop:
├─ scope = scopeFactory.CreateAsyncScope()
├─ store = scope.GetRequiredService<IOutboxStore>()
├─ publisher = scope.GetRequiredService<IOutboxPublisher>()
├─ due = await store.FetchDueAsync(batchSize, ct)
│
├─ if (due.Count == 0)
│ await Task.Delay(IdlePollDelay)
│ continue
│
├─ foreach (msg in due)
│ try { await publisher.PublishAsync(msg); store.MarkProcessed(msg); }
│ catch { store.RecordFailure(msg, ...); }
│
└─ loop back immediately (drain backlog)
↓
Host stop → ExecuteAsync exits gracefully
Her batch için yeni DI scope açılır — DbContext per-batch tracker state'i biriktirmez, IOutboxPublisher'ın scoped dependency'leri (RabbitMQ channel gibi) düzgün dispose olur.
7.1 Horizontal scaling — bilinen sınırlama
OutboxPublisherWorker tek replica çalışacak şekilde tasarlandı. Birden çok replica aynı anda FetchDueAsync çağırırsa aynı row'u iki kez publish edebilir (pessimistic lock / FOR UPDATE SKIP LOCKED framework'te yok — provider-specific).
Pratik çözümler
| Yaklaşım | Açıklama |
|---|---|
| Tek replica (önerilen) | K8s Deployment.replicas: 1 veya bir tek BackgroundService host'u. Worker zaten cheap (polling-based), throughput için BatchSize artırmak çoğu zaman replicas çoğaltmaktan iyi sonuç verir. |
| Leader election | Birden çok host varsa, sadece "leader" worker'ın çalışmasına izin ver (ZooKeeper / Consul / Redis lock). Worker'a IHostedService koşulu ekleyip lock alamayan instance hemen exit eder. |
| Custom store + FOR UPDATE SKIP LOCKED | IOutboxStore'u kendi raw-SQL implementasyonunla değiştir; Postgres/MySQL/SQL Server için SELECT … FOR UPDATE SKIP LOCKED ile multiple worker safe consume sağlar. |
| Idempotent consumer | Publisher'ın throw atmadığı ama mesajın çift teslim edildiği senaryoda consumer tarafında idempotency key kontrolü zaten varsa, duplicate publish veri açısından zararsız olur. |
Default kurulum (tek replica) çoğu SaaS için yeterli — outbox iş yükü typically saniyede yüzlerce mesaj seviyesindedir, batch=100 ile rahatlıkla karşılanır.
7.2 Multi-tenant semantik
Outbox TenantEntity — her satır TenantId'ye bağlı, ama iki path farklı davranır:
Write path (handler → AppendAsync)
AppendAsync TenantId == Guid.Empty ise mevcut ITenantEntitySetter'dan stamp eder. Tipik akış:
Request → TenantMiddleware → TenantContext.SetTenant(tenantA)
→ PlaceOrderHandler → _outbox.AppendAsync(msg) // msg.TenantId boş
→ EfOutboxStore stamp eder: msg.TenantId = tenantA
→ SaveChangesAsync atomic commit
Tenant context yoksa + ITenantEntitySetter register edilmemişse loud error — orphan row asla yazılmaz.
Read path (worker → FetchDueAsync)
OutboxPublisherWorker BackgroundService'tir. HttpContext yok, TenantMiddleware çalışmamış, tenant context boş. Worker by-design cross-tenant — tüm tenant'ların outbox'ını drain etmeli, yoksa hiç event ship'lenmez.
Bu yüzden FetchDueAsync IgnoreQueryFilters() çağırır. Consumer'ın DbContext'inde OutboxMessage üzerinde tenant query filter olsa bile worker tüm satırları görür.
Per-tenant routing istiyorsan
IOutboxPublisher.PublishAsynciçindemessage.TenantId'yi okuyup tenant-specific topic/queue'ya yönlendirebilirsin — TenantId payload'a yazılmasa bile entity'de mevcut.
Özet kontrat
| Path | Tenant context lazım mı? | Filter davranışı |
|---|---|---|
AppendAsync |
Evet — TenantSetter'dan stamp ya da msg.TenantId explicit set |
— |
FetchDueAsync |
Hayır — worker tüm tenant'ları görür | IgnoreQueryFilters() |
MarkProcessedAsync / RecordFailureAsync |
Hayır — entity zaten tracker'da, sadece SaveChanges | — |
7.3 DbContext isolation — bilinen sınırlama
EfOutboxStore.MarkProcessedAsync ve RecordFailureAsync SaveChangesAsync çağırır — bu DbContext'teki TÜM tracked değişiklikleri commit eder, sadece outbox row'unu değil.
- Worker yolu güvenli:
OutboxPublisherWorkerher batch için fresh DI scope açar → DbContext sadece outbox row'larını tracker'da tutar. ✅ - Consumer dikkat etmeli:
IOutboxStore'u request-scoped DbContext üzerinden başka bir handler'dan kullanıyorsan,SaveChangesAsyncrequest'te tracked olan diğer entity'leri de commit edebilir.
Pratik öneri: IOutboxStore'u sadece AppendAsync için kullan; MarkProcessed/RecordFailure'ı worker'a bırak. AppendAsync zaten SaveChangesAsync çağırmıyor — sadece DbContext'e Add ediyor, atomicity'i caller'ın transaction'ı sağlıyor.
8. Sana ne zaman lazım?
| Senaryo | Outbox lazım mı? |
|---|---|
| Microservices arası event-driven iletişim (RabbitMQ / Kafka / SNS / Azure Service Bus) | ✅ Kesinlikle |
| Webhook gönderimi (Stripe-style: kendi consumer'larına notify) | ✅ Evet |
| Email/SMS notification — "user kaydını commit edince mail at" | ✅ Evet |
| Search index sync (PostgreSQL'e yaz → Elasticsearch'e index) | ✅ Evet |
| Audit log shipping (external SIEM'e) | ✅ Evet |
| Sadece monolith, dış sistem yok, in-process MediatR notification | ❌ Gereksiz — direkt INotificationPublisher yeterli |
| "Best-effort" yeterli, event kaybı tolere ediliyor | ⚠️ Belki — basit _broker.PublishAsync da iş görür |
9. Konfigürasyon
Tam appsettings + connection string örneği için bkz. SETUP.md → Outbox & RabbitMQ Configuration.
10. İlgili dosyalar
Entities/OutboxMessage.csAbstractions/IOutboxStore.cs,Abstractions/IOutboxPublisher.csEfPersistence/EfOutboxStore.cs,EfPersistence/EfOutboxModelExtensions.csWorker/OutboxPublisherWorker.cs,Worker/OutboxOptions.cs../Core.Outbox.DependencyInjection/OutboxServiceRegistration.cs
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Microsoft.EntityFrameworkCore (>= 10.0.5)
- Microsoft.EntityFrameworkCore.InMemory (>= 10.0.5)
- Microsoft.EntityFrameworkCore.Relational (>= 10.0.5)
- Microsoft.Extensions.Caching.Abstractions (>= 10.0.5)
- Microsoft.Extensions.Caching.Memory (>= 10.0.5)
- Microsoft.Extensions.Configuration.Abstractions (>= 10.0.5)
- Microsoft.Extensions.DependencyInjection (>= 10.0.5)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.5)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.5)
- Microsoft.Extensions.Logging (>= 10.0.5)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.5)
- NetCoreBackend.NArchitecture.Core.Persistence (>= 1.0.0)
- System.Linq.Dynamic.Core (>= 1.7.1)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on NetCoreBackend.NArchitecture.Core.Outbox:
| Package | Downloads |
|---|---|
|
NetCoreBackend.NArchitecture.Core.Outbox.DependencyInjection
DI registration for Core.Outbox. |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.0 | 93 | 6/2/2026 |