Convex.Shared.Messaging
1.0.0
Messaging utilities for Convex microservices using Apache Kafka.
Features
- Kafka Integration: Full Apache Kafka support
- Topic Publishing: Publish messages to Kafka topics
- Consumer Groups: Kafka consumer group support
- Automatic Recovery: Built-in connection recovery
- JSON Serialization: Automatic message serialization
- Error Handling: Robust error handling and retry logic
Installation
<PackageReference Include="Convex.Shared.Messaging" Version="1.0.0" />
Quick Start
1. Register Services
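// In Program.cs, where services is the application's IServiceCollection
// (for example builder.Services in the minimal hosting model)
services.AddConvexMessaging(options =>
{
    options.BootstrapServers = "localhost:9092";
    options.SecurityProtocol = "PLAINTEXT";
    options.ConsumerGroup = "convex-group";
});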
2. Use in Your Service
public class UserService
{
    private readonly IConvexMessageBus _messageBus;
    // IUserRepository stands in for the application's own data access abstraction
    private readonly IUserRepository _userRepository;

    public UserService(IConvexMessageBus messageBus, IUserRepository userRepository)
    {
        _messageBus = messageBus;
        _userRepository = userRepository;
    }

    public async Task<User> CreateUserAsync(User user)
    {
        // Create user in database
        var createdUser = await _userRepository.CreateAsync(user);

        // Publish user created event
        await _messageBus.PublishAsync("user.created", new UserCreatedEvent
        {
            UserId = createdUser.Id,
            Email = createdUser.Email,
            CreatedAt = DateTime.UtcNow
        });

        return createdUser;
    }
}
Topic Publishing
Publish Messages
// Publish to topic
await _messageBus.PublishAsync("user.created", new UserCreatedEvent
{
    UserId = 123,
    Email = "user@example.com",
    CreatedAt = DateTime.UtcNow
});

// Publish a different event type to another topic
// (Amount and Odds are decimal, so the literals need the m suffix)
await _messageBus.PublishAsync("bet.placed", new BetPlacedEvent
{
    BetId = 456,
    UserId = 123,
    Amount = 100.00m,
    Odds = 2.5m,
    PlacedAt = DateTime.UtcNow
});
Subscribe to Topics
// Subscribe to topic
var subscriptionId = await _messageBus.SubscribeAsync<UserCreatedEvent>(
    "user.created",
    async (message) =>
    {
        Console.WriteLine($"User {message.UserId} created with email {message.Email}");
        // Handle user created event
    });

// Unsubscribe
await _messageBus.UnsubscribeAsync(subscriptionId);
Queue Management
Send Messages
// Send to queue
await _messageBus.SendAsync("user.notifications", new NotificationMessage
{
    UserId = 123,
    Type = "Welcome",
    Content = "Welcome to Convex!"
});
Receive Messages
// Receive from queue
await _messageBus.ReceiveAsync<NotificationMessage>(
    "user.notifications",
    async (message) =>
    {
        await _notificationService.SendAsync(message);
    });
Message Types
Event Messages
public class UserCreatedEvent
{
    public int UserId { get; set; }
    public string Email { get; set; } = string.Empty;
    public DateTime CreatedAt { get; set; }
}

public class BetPlacedEvent
{
    public int BetId { get; set; }
    public int UserId { get; set; }
    public decimal Amount { get; set; }
    public decimal Odds { get; set; }
    public DateTime PlacedAt { get; set; }
}
Command Messages
public class SendEmailCommand
{
    public string To { get; set; } = string.Empty;
    public string Subject { get; set; } = string.Empty;
    public string Body { get; set; } = string.Empty;
}

public class ProcessPaymentCommand
{
    public int UserId { get; set; }
    public decimal Amount { get; set; }
    public string Currency { get; set; } = "USD";
}
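The Features list mentions automatic JSON serialization, but the serializer and its settings are not documented here. As a rough illustration only, assuming System.Text.Json with default options (an assumption, not confirmed for this package), an event class could round-trip as follows. `EventJson` is a hypothetical helper and is not part of Convex.Shared.Messaging.

```csharp
using System;
using System.Text.Json;

// Same shape as the UserCreatedEvent shown above, repeated so the sketch is self-contained.
public class UserCreatedEvent
{
    public int UserId { get; set; }
    public string Email { get; set; } = string.Empty;
    public DateTime CreatedAt { get; set; }
}

// Hypothetical helper (assumption): not part of Convex.Shared.Messaging.
public static class EventJson
{
    // Serialize a message to a JSON string using default System.Text.Json options.
    public static string ToJson<T>(T message) where T : class =>
        JsonSerializer.Serialize(message);

    // Deserialize a JSON payload; throws if the payload is the JSON literal null.
    public static T FromJson<T>(string json) where T : class =>
        JsonSerializer.Deserialize<T>(json)
        ?? throw new JsonException("Payload deserialized to null.");
}
```

Public properties with getters and setters, as used by the message classes above, are what System.Text.Json serializes by default, so plain classes like these need no extra attributes under that assumption.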
Configuration
appsettings.json
{
  "ConvexMessaging": {
    "BootstrapServers": "localhost:9092",
    "SecurityProtocol": "PLAINTEXT",
    "SaslMechanism": "PLAIN",
    "SaslUsername": "",
    "SaslPassword": "",
    "SslCaLocation": "",
    "TopicPrefix": "convex",
    "ConsumerGroup": "convex-group",
    "AutoOffsetReset": "earliest",
    "EnableAutoCommit": true,
    "AutoCommitIntervalMs": 5000,
    "SessionTimeoutMs": 30000,
    "RequestTimeoutMs": 30000,
    "MaxRetryAttempts": 3,
    "RetryDelaySeconds": 5,
    "MessageRetentionMs": 604800000
  }
}
Environment Variables
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export KAFKA_SECURITY_PROTOCOL=PLAINTEXT
export KAFKA_CONSUMER_GROUP=convex-group
export KAFKA_TOPIC_PREFIX=convex
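The README does not say whether the package reads these variables itself. If it does not, a minimal sketch of mapping them by hand could look like the following. `KafkaSettings` and `KafkaEnvironment` are hypothetical names introduced for illustration, and the fallback values are taken from the appsettings.json sample above.

```csharp
using System;

// Hypothetical settings holder (assumption); property names mirror the options shown earlier.
public sealed record KafkaSettings(
    string BootstrapServers,
    string SecurityProtocol,
    string ConsumerGroup,
    string TopicPrefix);

// Hypothetical helper (assumption): not part of Convex.Shared.Messaging.
public static class KafkaEnvironment
{
    // Reads the KAFKA_* variables, falling back to the sample defaults
    // when a variable is unset or blank. The lookup function can be
    // replaced, which keeps the method easy to test.
    public static KafkaSettings Load(Func<string, string?>? getVariable = null)
    {
        var lookup = getVariable ?? (name => Environment.GetEnvironmentVariable(name));

        string Read(string name, string fallback)
        {
            var value = lookup(name);
            return string.IsNullOrWhiteSpace(value) ? fallback : value;
        }

        return new KafkaSettings(
            BootstrapServers: Read("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
            SecurityProtocol: Read("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
            ConsumerGroup: Read("KAFKA_CONSUMER_GROUP", "convex-group"),
            TopicPrefix: Read("KAFKA_TOPIC_PREFIX", "convex"));
    }
}
```

The result could then be copied into the `AddConvexMessaging` options callback, for example `options.BootstrapServers = settings.BootstrapServers;`.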
Error Handling
Retry Logic
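public async Task<bool> PublishWithRetryAsync<T>(string topic, T message)
{
    const int maxAttempts = 3;

    for (int attempt = 1; ; attempt++)
    {
        try
        {
            return await _messageBus.PublishAsync(topic, message);
        }
        catch (Exception) when (attempt < maxAttempts)
        {
            // Linear backoff: wait 2 s after the first failure and 4 s after the second.
            // An exception on the final attempt is not caught here and reaches the caller.
            await Task.Delay(TimeSpan.FromSeconds(attempt * 2));
        }
    }
}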
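The loop above waits a linearly growing delay between attempts. A common alternative is capped exponential backoff wrapped in a reusable helper. The following is a minimal sketch of that technique; `RetryPolicy` is a hypothetical name and is not part of Convex.Shared.Messaging. The default of three attempts mirrors `MaxRetryAttempts` in the configuration sample, although whether the package applies that setting internally is not documented here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (assumption): not part of Convex.Shared.Messaging.
public static class RetryPolicy
{
    // Delay to wait after the given failed attempt (1-based):
    // baseDelay * 2^(attempt - 1), capped at maxDelay.
    public static TimeSpan DelayFor(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
        var factor = Math.Pow(2, attempt - 1);
        var ms = Math.Min(baseDelay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);
        return TimeSpan.FromMilliseconds(ms);
    }

    // Runs the operation up to maxAttempts times. Failures before the last
    // attempt are retried after a backoff delay; the last failure propagates.
    public static async Task<T> ExecuteAsync<T>(
        Func<Task<T>> operation,
        int maxAttempts = 3,
        TimeSpan? baseDelay = null,
        TimeSpan? maxDelay = null,
        CancellationToken cancellationToken = default)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        var start = baseDelay ?? TimeSpan.FromSeconds(1);
        var cap = maxDelay ?? TimeSpan.FromSeconds(30);

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (Exception) when (attempt < maxAttempts && !cancellationToken.IsCancellationRequested)
            {
                await Task.Delay(DelayFor(attempt, start, cap), cancellationToken);
            }
        }
    }
}
```

Assuming `PublishAsync` returns `Task<bool>` as in the snippet above, a call could read `await RetryPolicy.ExecuteAsync(() => _messageBus.PublishAsync(topic, message));`. Catching every exception type is a simplification; in practice only transient failures are usually worth retrying.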
Dead Letter Topic
// Name of the dead letter topic that receives messages that could not be processed
var deadLetterTopic = "convex.dlq";

// Inside the catch block of a message handler: ex is the caught exception, and
// originalTopic / originalMessage identify the message that failed
await _messageBus.PublishAsync(deadLetterTopic, new DeadLetterMessage
{
    OriginalTopic = originalTopic,
    OriginalMessage = originalMessage,
    ErrorMessage = ex.Message,
    FailedAt = DateTime.UtcNow
});
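The snippet above uses a `DeadLetterMessage` type that this README never defines. One plausible shape is sketched below, together with a wrapper that catches handler failures and forwards them to the dead letter topic. Both `DeadLetterMessage` as written here and the `DeadLetter` helper are assumptions for illustration. The publish step is passed in as a delegate so the sketch assumes nothing about `IConvexMessageBus` beyond the `PublishAsync(topic, message)` calls shown earlier.

```csharp
using System;
using System.Threading.Tasks;

// Assumed shape: the README uses DeadLetterMessage but does not define it.
public class DeadLetterMessage
{
    public string OriginalTopic { get; set; } = string.Empty;
    public object? OriginalMessage { get; set; }
    public string ErrorMessage { get; set; } = string.Empty;
    public DateTime FailedAt { get; set; }
}

// Hypothetical helper (assumption): not part of Convex.Shared.Messaging.
public static class DeadLetter
{
    // Wraps a handler so that any exception it throws is reported to the
    // dead letter topic through the supplied publish delegate instead of
    // escaping to the caller.
    public static Func<T, Task> Wrap<T>(
        string originalTopic,
        string deadLetterTopic,
        Func<T, Task> handler,
        Func<string, DeadLetterMessage, Task> publish)
    {
        return async message =>
        {
            try
            {
                await handler(message);
            }
            catch (Exception ex)
            {
                await publish(deadLetterTopic, new DeadLetterMessage
                {
                    OriginalTopic = originalTopic,
                    OriginalMessage = message,
                    ErrorMessage = ex.Message,
                    FailedAt = DateTime.UtcNow
                });
            }
        };
    }
}
```

The wrapped delegate could then be passed wherever a handler is expected, with `publish` implemented as `(topic, dead) => _messageBus.PublishAsync(topic, dead)`. Swallowing the exception after forwarding is a design choice: it keeps the consumer moving, at the cost of relying on someone monitoring the dead letter topic.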
Best Practices
- Use Topics for Events: Use Kafka topics for event-driven communication
- Consumer Groups: Use consumer groups for load balancing
- Handle Errors: Implement proper error handling and retry logic
- Monitor Messages: Monitor message flow and performance
- Use Dead Letter Topics: Handle failed messages appropriately
- Partitioning: Consider message partitioning for scalability
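To make the partitioning point concrete: Kafka assigns a keyed message to a partition by hashing its key, so all messages with the same key (for example the same user ID) land on the same partition and keep their relative order. The sketch below illustrates the idea with an FNV-1a hash. It is an illustration only: real Kafka clients use their own hash functions, so the partition numbers computed here will not match a broker's, and this README does not show how to pass a key through `PublishAsync`. `PartitionSketch` is a hypothetical name.

```csharp
using System;
using System.Text;

// Illustration only (assumption): not the hash a Kafka client actually uses.
public static class PartitionSketch
{
    // 32-bit FNV-1a hash over the UTF-8 bytes of the key.
    public static uint Fnv1a(string key)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;

        var hash = offsetBasis;
        unchecked
        {
            foreach (var b in Encoding.UTF8.GetBytes(key))
            {
                hash ^= b;
                hash *= prime; // wraps around on overflow by design
            }
        }
        return hash;
    }

    // Maps a key to a partition index in the range [0, partitionCount).
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount < 1) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        return (int)(Fnv1a(key) % (uint)partitionCount);
    }
}
```

Because the mapping depends on the partition count, adding partitions to an existing topic changes where keys land, which is one reason to choose the count with future scale in mind.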
License
This project is licensed under the MIT License.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net9.0
  - Confluent.Kafka (>= 2.3.0)
  - Microsoft.Extensions.Configuration.Abstractions (>= 9.0.0)
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 9.0.0)
  - Microsoft.Extensions.Hosting.Abstractions (>= 9.0.0)
  - Microsoft.Extensions.Logging.Abstractions (>= 9.0.0)
  - Microsoft.Extensions.Options (>= 9.0.0)
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.0 | 174 | 10/17/2025 |
Initial release of Convex.Shared.Messaging