Ledbim.Messaging
1.2.0
dotnet add package Ledbim.Messaging --version 1.2.0
NuGet\Install-Package Ledbim.Messaging -Version 1.2.0
<PackageReference Include="Ledbim.Messaging" Version="1.2.0" />
<PackageVersion Include="Ledbim.Messaging" Version="1.2.0" />
<PackageReference Include="Ledbim.Messaging" />
paket add Ledbim.Messaging --version 1.2.0
#r "nuget: Ledbim.Messaging, 1.2.0"
#:package Ledbim.Messaging@1.2.0
#addin nuget:?package=Ledbim.Messaging&version=1.2.0
#tool nuget:?package=Ledbim.Messaging&version=1.2.0
Ledbim.Messaging
<p align="center"> <img src="ledbim_messaging_logo.png" alt="Ledbim.Messaging" width="120" /> </p>
<p align="center"> <strong>MassTransit · RabbitMQ · Integration Events · Structured Logging</strong><br/> Ledbim ekosistemi için servisler arası mesajlaşma altyapısı. </p>
Kapsam
Ledbim.Messaging paketi şu bileşenleri sağlar:
| Bileşen | Açıklama |
|---|---|
| IIntegrationEventBus | Transport-agnostic mesaj gönderme arayüzü — MassTransit implementasyonu dahil |
| LoggingConsumerFilter | Her consumer mesajında otomatik structured logging, correlation tracking ve sensitive data masking |
| MassTransit DI Entegrasyonu | AddMassTransitIntegrationEventBus() extension metodu ile tek satır kurulum |
Kurulum
dotnet add package Ledbim.Messaging
Bağımlılıklar
MassTransit.RabbitMQ v8.4.1
Serilog v4.2.0
Ledbim.Core (lokalde project reference)
Domain Events vs Integration Events
Ledbim ekosisteminde iki farklı event mekanizması bulunur:
| Domain Events | Integration Events | |
|---|---|---|
| Taşıyıcı | MediatR INotification |
MassTransit / RabbitMQ |
| Kapsam | Aynı servis içi | Servisler arası |
| İşleme | Senkron, aynı transaction | Asenkron, bağımsız |
| Tetiklenme | Transaction commit sonrası | Handler içinden açıkça |
| Amaç | İç domain mantığı | Dış sistemlere bildirim |
| Interface | INotification |
IIntegrationEventBus |
Örnek: OrderConfirmedEvent bir domain event'tir (aynı servis içi). Buna karşılık OrderConfirmedIntegrationEvent, Notification Service'e veya Billing Service'e gönderilecek bir integration event'tir.
Hızlı Başlangıç
1. appsettings.json
{
"StructuredLoggingOptions": {
"CorrelationHeaderName": "X-Correlation-Id",
"MaskSensitiveData": true,
"MaxRequestBodyBytes": 65536,
"SensitiveFields": ["password", "token", "iban", "creditCard", "email"]
}
}
2. Program.cs
// 1. ÖNCE MassTransit kurulumu
builder.Services.AddMassTransit(x =>
{
// Consumer'ları kaydet
x.AddConsumer<OrderConfirmedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq://localhost", h =>
{
h.Username("guest");
h.Password("guest");
});
// LoggingConsumerFilter global pipeline'a ekle
cfg.UseConsumeFilter(typeof(LoggingConsumerFilter<>), context);
cfg.ConfigureEndpoints(context);
});
});
// 2. SONRA Integration Event Bus (AddMassTransit'ten sonra çağrılmalı)
builder.Services.AddMassTransitIntegrationEventBus();
// 3. Logging ayarları
builder.Services.Configure<StructuredLoggingOptions>(
builder.Configuration.GetSection("StructuredLoggingOptions"));
Kritik:
AddMassTransitIntegrationEventBus(),AddMassTransit()çağrısından sonra gelmelidir. Aksi hâldeIPublishEndpointveISendEndpointProviderhenüz DI'a kayıtlı olmaz.
IIntegrationEventBus
Mesaj gönderme arayüzü — handler'lar bu interface'i inject eder, MassTransit'e doğrudan bağımlılık olmaz.
public interface IIntegrationEventBus
{
// Tüm abone consumer'lara yayınla (pub-sub)
Task PublishAsync<T>(T message, CancellationToken ct = default) where T : class;
// Belirli bir kuyruğa gönder (point-to-point)
Task SendAsync<T>(T message, Uri queueAddress, CancellationToken ct = default) where T : class;
}
PublishAsync — Pub-Sub
Tüm abone consumer'lara fanout ile yayınlar. Hangi consumer'ın mesajı aldığı gönderici tarafından bilinmez.
// Command handler içinde
public class OrderConfirmedCommandHandler(IIntegrationEventBus bus) : IRequestHandler<...>
{
public async Task<Result> Handle(OrderConfirmedCommand request, CancellationToken ct)
{
// ... domain logic ...
await bus.PublishAsync(new OrderConfirmedIntegrationEvent(
OrderId: order.Id,
CustomerId: order.CustomerId,
TotalAmount: order.Total), ct);
return Result.Success(ResultType.Success);
}
}
SendAsync — Point-to-Point
Belirli bir kuyruğa doğrudan gönderir. Yalnızca o kuyruğu dinleyen consumer işler.
var queueAddress = new Uri("rabbitmq://localhost/notification-queue");
await bus.SendAsync(new SendEmailCommand(
To: customer.Email,
Subject: "Siparişiniz onaylandı",
Body: emailBody), queueAddress, ct);
Consumer Tanımlama
Ledbim.Messaging paketi kendi consumer base sınıfı sağlamaz. MassTransit'in IConsumer<T> interface'i doğrudan kullanılır.
public class OrderConfirmedConsumer : IConsumer<OrderConfirmedIntegrationEvent>
{
public async Task Consume(ConsumeContext<OrderConfirmedIntegrationEvent> context)
{
var message = context.Message;
// İşlem mantığı
await notificationService.SendAsync(message.CustomerId, message.OrderId);
}
}
Consumer Kaydı
builder.Services.AddMassTransit(x =>
{
// Tek tek ekle
x.AddConsumer<OrderConfirmedConsumer>();
// veya assembly'den otomatik tarama
x.AddConsumers(typeof(OrderConfirmedConsumer).Assembly);
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
LoggingConsumerFilter
Her MassTransit consumer mesajında otomatik olarak çalışan pipeline filter'ı. Ayrıca her consumer'a eklenmesi gerekmez — global pipeline'a bir kez eklenir.
Ne Yapar
- Gelen mesajdan correlation ID'yi çıkarır (yoksa oluşturur)
- Stopwatch ile işleme süresini ölçer
- Mesaj gövdesini JSON'a serialize eder
SensitiveFieldslistesindeki alanları maskeler- İşlemi
next.Send()ile ilerletir - Başarıda
Information, hatadaErrorseviyesinde Serilog ile structured log yazar
Log Alanları
| Alan | Kaynak |
|---|---|
correlation.id |
Header → CorrelationId → Activity.TraceId → Yeni GUID |
request.name |
typeof(T).Name |
elapsed.ms |
Stopwatch |
request.body |
Serialize edilmiş mesaj JSON'u (maskelenmiş) |
message.source |
"MassTransit" |
Global Ekleme
x.UsingRabbitMq((context, cfg) =>
{
// Tüm consumer'lara otomatik uygulanır
cfg.UseConsumeFilter(typeof(LoggingConsumerFilter<>), context);
cfg.ConfigureEndpoints(context);
});
Tekil Consumer'a Ekleme
x.AddConsumer<OrderConfirmedConsumer>(cfg =>
{
cfg.UseFilter(new LoggingConsumerFilter<OrderConfirmedIntegrationEvent>(options));
});
Integration Event Tanımlama
Integration event'ler sıradan C# record veya class olarak tanımlanır. MassTransit herhangi bir interface gerektirmez.
// Application veya Domain katmanında
public record OrderConfirmedIntegrationEvent(
Guid OrderId,
Guid CustomerId,
decimal TotalAmount,
DateTime ConfirmedAt);
public record UserRegisteredIntegrationEvent(
Guid UserId,
string Email,
string FullName);
Öneri: Integration event'leri Application/IntegrationEvents/ klasöründe, domain event'leri ise Domain/Events/ klasöründe tutun.
Tam Kullanım Örneği
// Integration event
public record OrderShippedIntegrationEvent(Guid OrderId, string TrackingNumber, DateTime ShippedAt);
// Consumer
public class OrderShippedConsumer(INotificationService notifications)
: IConsumer<OrderShippedIntegrationEvent>
{
public async Task Consume(ConsumeContext<OrderShippedIntegrationEvent> context)
{
await notifications.SendShippingNotificationAsync(
context.Message.OrderId,
context.Message.TrackingNumber);
}
}
// Handler'dan yayınlama
public class ShipOrderCommandHandler(
IUnitOfWork uow,
IIntegrationEventBus bus) : IRequestHandler<ShipOrderCommand, Result>
{
public async Task<Result> Handle(ShipOrderCommand request, CancellationToken ct)
{
var order = await uow.OrderRepository.GetAsync(
GenericExpression<Order>.Create(x => x.Id == request.OrderId));
if (order is null)
return Result.Fail(ResultType.NotFound, "Sipariş bulunamadı.");
order.Ship(request.TrackingNumber);
await uow.OrderRepository.UpdateAsync(order);
// Transaction commit'ten sonra integration event gönder
await bus.PublishAsync(new OrderShippedIntegrationEvent(
OrderId: order.Id,
TrackingNumber: request.TrackingNumber,
ShippedAt: DateTime.UtcNow), ct);
return Result.Success(ResultType.Success, "Sipariş kargoya verildi.");
}
}
// Program.cs
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<OrderShippedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(builder.Configuration["RabbitMQ:Host"], h =>
{
h.Username(builder.Configuration["RabbitMQ:Username"]!);
h.Password(builder.Configuration["RabbitMQ:Password"]!);
});
cfg.UseConsumeFilter(typeof(LoggingConsumerFilter<>), context);
cfg.ConfigureEndpoints(context);
});
});
builder.Services.AddMassTransitIntegrationEventBus();
Mimari Notlar
- Transport-agnostic tasarım: Handler'lar
IIntegrationEventBusinterface'ini kullanır, MassTransit'e doğrudan bağımlılık yoktur. Farklı bir transport (Azure Service Bus, Kafka) için yalnızca implementasyon değiştirilir. - Domain event ile koordinasyon: Domain event'ler transaction commit öncesi toplanır ve commit sonrası dispatch edilir. Integration event'ler ise handler içinden açıkça gönderilir — transaction dışındadır. Outbox pattern gerekiyorsa MassTransit'in
UseInboxOutbox()desteği kullanılabilir. - LoggingConsumerFilter bağımlılığı:
StructuredLoggingOptionsLedbim.Core üzerinden gelir. Ayrıca paket eklenmesine gerek yoktur.
Lisans
Bu paket Ledbim Bilişim tarafından geliştirilmektedir. Ticari kullanım için lisans bilgisi için iletişime geçiniz.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Ledbim.Core (>= 1.2.0)
- MassTransit.RabbitMQ (>= 8.4.1)
- Serilog (>= 4.2.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 1.2.0 | 168 | 3/28/2026 |