QueueTi.Client
2026.5.8
See the version list below for details.
dotnet add package QueueTi.Client --version 2026.5.8
NuGet\Install-Package QueueTi.Client -Version 2026.5.8
<PackageReference Include="QueueTi.Client" Version="2026.5.8" />
<PackageVersion Include="QueueTi.Client" Version="2026.5.8" />
<PackageReference Include="QueueTi.Client" />
paket add QueueTi.Client --version 2026.5.8
#r "nuget: QueueTi.Client, 2026.5.8"
#:package QueueTi.Client@2026.5.8
#addin nuget:?package=QueueTi.Client&version=2026.5.8
#tool nuget:?package=QueueTi.Client&version=2026.5.8
QueueTi C# Client
A proto-first gRPC client library for the QueueTi distributed message queue service. Targets .NET 8.0.
Installation
dotnet add package QueueTi.Client
Or with the Package Manager:
Install-Package QueueTi.Client
Quick Start
Create a client
using QueueTi;
var client = QueueTiClient.Create("https://queue.example.com", new QueueTiClientOptions
{
BearerToken = "your-jwt-token" // optional
});
Publish a message
using System.Text;
var producer = client.NewProducer();
string messageId = await producer.PublishAsync(
topic: "orders",
payload: Encoding.UTF8.GetBytes("Hello, QueueTi!"),
ct: CancellationToken.None
);
Consume messages (streaming)
var consumer = client.NewConsumer("orders", new ConsumerOptions
{
ConsumerGroup = "billing"
});
await consumer.ConsumeAsync(async (msg, ct) =>
{
Console.WriteLine($"Message {msg.Id}: {Encoding.UTF8.GetString(msg.Payload)}");
// Handler automatically acks on success, nacks on exception.
}, ct);
Clean up
await client.DisposeAsync();
Client Creation
Factory method (recommended)
Use QueueTiClient.Create() to construct a client with a managed gRPC channel:
var client = QueueTiClient.Create("https://queue.example.com", new QueueTiClientOptions
{
BearerToken = "jwt-token", // optional; enables Bearer auth
Insecure = false, // set true for plaintext http:// endpoints
ConfigureChannel = opts => { // optional; configure GrpcChannelOptions
opts.MaxReceiveMessageSize = 16 * 1024 * 1024;
}
});
Manual channel (advanced)
If you need full control over the gRPC channel, pass your own QueueService.QueueServiceClient. In this path you are responsible for wiring any interceptors — BearerToken is not automatically applied:
var channel = GrpcChannel.ForAddress("https://queue.example.com");
var grpcClient = new QueueService.QueueServiceClient(channel);
var client = new QueueTiClient(grpcClient, new QueueTiClientOptions());
To add bearer auth on a manual channel, intercept the invoker yourself:
var store = new TokenStore("jwt-token");
var invoker = channel.Intercept(new BearerTokenInterceptor(store));
var grpcClient = new QueueService.QueueServiceClient(invoker);
var client = new QueueTiClient(grpcClient, new QueueTiClientOptions());
Dependency Injection (ASP.NET Core)
Registering QueueTiClient
Register the gRPC client in your service container using AddQueueTiClient():
builder.Services.AddQueueTiClient("https://queue.example.com", opts =>
{
opts.BearerToken = "initial-token";
opts.TokenRefresher = async ct => await GetFreshTokenAsync(ct);
opts.ConfigureHttpClientBuilder = httpBuilder =>
{
// Configure timeouts, delegating handlers, etc.
httpBuilder.ConfigureHttpClient(c => c.Timeout = TimeSpan.FromSeconds(30));
};
});
// Inject QueueTiClient into your controllers or services
app.MapPost("/orders", async (QueueTiClient client) =>
{
var producer = client.NewProducer();
var id = await producer.PublishAsync("orders", orderPayload);
return Results.Created($"/orders/{id}", new { id });
});
When Insecure = true, the DI path configures the gRPC channel to use plaintext HTTP credentials automatically through the AddGrpcClient code path.
Registering AdminClient
Register the admin REST client using AddQueueTiAdminClient():
builder.Services.AddQueueTiAdminClient("http://queue.example.com", opts =>
{
opts.BearerToken = "admin-token";
opts.TokenRefresher = async ct => await GetFreshAdminTokenAsync(ct);
});
// Inject AdminClient into your services
app.MapPost("/admin/topics/{topic}", async (AdminClient admin, string topic) =>
{
var config = new TopicConfig(Topic: topic, Replayable: true, MaxRetries: 3);
await admin.UpsertTopicConfigAsync(topic, config);
return Results.Created($"/admin/topics/{topic}", config);
});
AddQueueTiAdminClient() uses IHttpClientFactory internally. The BearerToken, TokenRefresher, and Insecure options all apply the same way as for QueueTiClient.
.NET Aspire Integration
The QueueTi.Aspire.Hosting and QueueTi.Client.Aspire packages provide seamless integration with .NET Aspire orchestration for both AppHost and service projects.
Installation
Each package targets a different project in your Aspire solution:
AppHost project:
dotnet add package QueueTi.Aspire.Hosting
Service/worker project:
dotnet add package QueueTi.Client.Aspire
AppHost Setup
Use AddQueueTi() to register QueueTi as a distributed application resource in your AppHost:
// Program.cs (Aspire AppHost project)
var builder = DistributedApplication.CreateBuilder(args);
var postgres = builder.AddPostgres("postgres")
.AddDatabase("queueti-db");
var redis = builder.AddRedis("redis"); // optional — enables rate limiting
var queue = builder.AddQueueTi("queue")
.WithReplicas(3) // optional — defaults to 1
.WithNpgsqlDatabase(postgres)
.WithRedis(redis) // optional — enables rate limiting
.WithAuthentication(
username: "admin",
password: builder.AddParameter("queue-password", secret: true),
jwtSecret: builder.AddParameter("queue-jwt-secret", secret: true))
.WithLogLevel("info");
builder.AddProject<Projects.MyWorker>("worker")
.WithReference(queue)
.WithEnvironment("QueueTi__queue__HttpUrl", queue.GetEndpoint("http"));
builder.Build().Run();
Note: When using Postgres, generate passwords with only alphanumeric characters (
GenerateParameterDefault { Special = false }) to avoid URL encoding issues in the QueueTi server's internalpostgres://URL construction.
Key builder methods:
| Method | Purpose |
|---|---|
AddQueueTi(name, grpcPort?, httpPort?, tag?) |
Adds a QueueTi container resource. Pulls ghcr.io/joessst-dev/queue-ti. Endpoints: grpc (target 50051), http (target 8080). |
WithNpgsqlDatabase(database) |
Wires an Npgsql database resource. Automatically detects container-to-container networking and uses the Docker DNS alias + internal target port. Parses the connection string to extract credentials. Sets QUEUETI_DB_* env vars and adds WaitFor dependency. Throws DistributedApplicationException if the connection string cannot be parsed. |
WithRedis(redis) |
Wires an optional Redis resource for rate limiting. Uses container-to-container addressing when possible (Docker DNS alias + internal target port). Sets QUEUETI_REDIS_* env vars and adds WaitFor dependency. |
WithReplicas(n) |
Runs n instances of QueueTi. All replicas share the same database and Redis resources. |
WithAuthentication(username, password, jwtSecret) |
Configures authentication. Sets QUEUETI_AUTH_ENABLED and related env vars from ParameterResource values. |
WithLogLevel(level) |
Sets QUEUETI_LOG_LEVEL. |
Replicas
WithReplicas(n) starts n identical QueueTi container instances. When referenced by a service project via WithReference, Aspire routes connections across the replicas.
All replicas share the same database and Redis resources — wire those once on the resource builder and each instance picks up the same env vars:
var queue = builder.AddQueueTi("queue")
.WithReplicas(3)
.WithNpgsqlDatabase(postgres)
.WithRedis(redis);
Note:
WithReplicassets a fixed count at startup. For dynamic scaling based on load, configure scaling rules at the deployment target (Azure Container Apps, Kubernetes HPA) rather than in the AppHost.
Service Project Setup
In your worker or web project, call AddQueueTiClient() to register the gRPC client:
// Program.cs (Service project)
builder.AddQueueTiClient("queue");
var app = builder.Build();
app.Run();
The client automatically:
- Reads the connection string from
ConnectionStrings:queue(set by Aspire). - Registers
QueueTiClientin DI (and the underlyingQueueService.QueueServiceClient). - If
QueueTiClientSettings.HttpUrlis set, also registersAdminClientin DI and configures health checks (see Health Checks section below). - Instruments outbound gRPC calls with OpenTelemetry tracing.
With custom settings:
builder.AddQueueTiClient("queue", settings =>
{
settings.DisableHealthChecks = true; // if health checks are managed separately
settings.BearerToken = "your-jwt"; // if auth is enabled on the server
});
QueueTiClientSettings
| Property | Type | Default | Description |
|---|---|---|---|
ConnectionString |
string? |
null |
Explicit connection string. If not set, read from ConnectionStrings:{connectionName} or QueueTi:{connectionName} config. |
HttpUrl |
string? |
null |
HTTP endpoint of the QueueTi service. Required to enable health checks. Set via QueueTi:{connectionName}:HttpUrl config or QueueTi__{connectionName}__HttpUrl environment variable. |
DisableHealthChecks |
bool |
false |
Disable automatic health check registration. |
DisableTracing |
bool |
false |
Disable OpenTelemetry instrumentation. |
BearerToken |
string? |
null |
Optional bearer token for authentication. |
TokenRefresher |
Func<CancellationToken, Task<string>>? |
null |
Optional callback to refresh the bearer token at runtime. |
Health Checks
Health checks are only registered when DisableHealthChecks is false (default) and HttpUrl is explicitly set. The integration probes GET /healthz on the provided HTTP endpoint. The check is registered under tags live and queueti and requires no authentication.
Set HttpUrl via configuration:
builder.AddQueueTiClient("queue", settings =>
{
settings.HttpUrl = "http://queue:8080"; // or via QueueTi:queue:HttpUrl config
});
Or via environment variable when using Aspire:
builder.AddProject<Projects.MyService>("service")
.WithReference(queue)
.WithEnvironment("QueueTi__queue__HttpUrl", queue.GetEndpoint("http"));
Distributed Tracing
When DisableTracing is false (default), all outbound gRPC calls are instrumented using OpenTelemetry.Instrumentation.GrpcNetClient. Traces are exported via the Aspire telemetry pipeline.
Publishing Messages
Create a producer and call PublishAsync():
var producer = client.NewProducer();
// Minimal publish
string id = await producer.PublishAsync("orders", payload, ct: ct);
// With routing key and metadata
string id = await producer.PublishAsync("orders", payload, new PublishOptions
{
Key = "order-123", // optional routing/ordering key
Metadata = new Dictionary<string, string>
{
["source"] = "api",
["version"] = "v1"
}
}, ct);
PublishOptions
| Property | Type | Description |
|---|---|---|
Key |
string? |
Optional routing or ordering key for the message. |
Metadata |
IReadOnlyDictionary<string, string>? |
Arbitrary string key-value pairs attached to the message. |
Consuming Messages
Consumer Group Registration
QueueTi requires consumer groups to be explicitly registered before any messages can be delivered. Use AdminClient.RegisterConsumerGroupAsync() before calling ConsumeAsync() or ConsumeBatchAsync():
await admin.RegisterConsumerGroupAsync("orders", "billing");
QueueTiConflictException is thrown if the group is already registered — treat this as success. See the samples below for an example of robust group registration with exponential backoff retry.
Streaming consumer (real-time processing)
Use ConsumeAsync() for continuous subscription with automatic acknowledgment:
var consumer = client.NewConsumer("orders", new ConsumerOptions
{
ConsumerGroup = "billing",
Concurrency = 4, // process up to 4 messages in parallel
VisibilityTimeoutSeconds = 30 // optional; override server default
});
await consumer.ConsumeAsync(async (msg, ct) =>
{
var orderData = JsonSerializer.Deserialize<Order>(msg.Payload);
await BillingService.ProcessAsync(orderData, ct);
// Automatically acked on return; automatically nacked if an exception is thrown.
}, ct);
Behavior:
- The handler is invoked for each message as it arrives from the server.
- The message is automatically acked on successful handler completion.
- The message is automatically nacked with the exception message if the handler throws.
- Handler invocations are limited by
Concurrencyto control parallelism. - The consumer reconnects with exponential backoff (500 ms → 30 s) on gRPC errors.
- Graceful shutdown (cancellation token) exits without nacking in-flight messages.
Batch consumer (polling)
Use ConsumeBatchAsync() for periodic polling:
var consumer = client.NewConsumer("orders");
await consumer.ConsumeBatchAsync(
batchSize: 10,
handler: async (messages, ct) =>
{
foreach (var msg in messages)
{
try
{
await ProcessMessageAsync(msg, ct);
await msg.AckAsync(ct); // must ack manually
}
catch (Exception ex)
{
await msg.NackAsync(ex.Message, ct); // must nack manually
}
}
},
ct: ct
);
Behavior:
- Polls the server for up to
batchSizemessages at a time. - If no messages are available, waits and retries with exponential backoff (500 ms → 30 s).
- You must explicitly call
AckAsync()orNackAsync()for each message. - Reconnects automatically on gRPC errors with exponential backoff.
ConsumerOptions
| Property | Type | Default | Description |
|---|---|---|---|
ConsumerGroup |
string |
"" |
Consumer group identity for coordinating consumers across instances. |
Concurrency |
int |
1 |
Maximum concurrent handler invocations in streaming mode. Ignored in batch mode. |
VisibilityTimeoutSeconds |
uint? |
null |
Visibility timeout (in seconds) for dequeued messages. If null, uses server default. |
Message Handling
QueueTiMessage
Each message received by the consumer provides:
| Property | Type | Description |
|---|---|---|
Id |
string |
Unique message identifier. |
Topic |
string |
Topic name. |
Payload |
byte[] |
Message body as bytes. |
Metadata |
IReadOnlyDictionary<string, string> |
Key-value metadata attached at publish time. |
CreatedAt |
DateTimeOffset |
Server timestamp when the message was created. |
RetryCount |
int |
Number of times this message has been retried. |
Acknowledgment
// Ack: message processed successfully
await msg.AckAsync(ct);
// Nack: processing failed; mark for retry
await msg.NackAsync("Database connection timeout", ct);
In streaming mode, acks and nacks are called automatically by the consumer:
- Ack on successful handler completion.
- Nack with the exception message on handler failure.
In batch mode, you must call AckAsync() or NackAsync() explicitly for each message.
Admin Client
The AdminClient wraps the QueueTi admin REST API to manage topic configurations, schemas, and consumer group registration. It is available in the same QueueTi.Client NuGet package as QueueTiClient.
Creating an admin client
Use the factory method for a managed HTTP client:
using QueueTi;
var admin = AdminClient.Create(
baseUrl: "http://queue.example.com",
options: new QueueTiClientOptions
{
BearerToken = "your-jwt-token" // optional
}
);
Or inject via dependency injection (see "Dependency Injection" section above).
For manual HTTP client management (e.g., with IHttpClientFactory), use the constructor directly:
var admin = new AdminClient(httpClient);
When using this constructor, you are responsible for wiring authentication headers yourself (for example via a DelegatingHandler). SetToken() is not available with this constructor — it requires a TokenStore configured through AdminClient.Create or AddQueueTiAdminClient.
Topic Configuration
Manage topic settings:
// List all topic configurations
var configs = await admin.ListTopicConfigsAsync();
foreach (var config in configs)
{
Console.WriteLine($"Topic: {config.Topic}, Replayable: {config.Replayable}");
}
// Get or create a topic configuration
var created = await admin.UpsertTopicConfigAsync("orders", new TopicConfig(
Topic: "orders",
Replayable: true,
MaxRetries: 3,
MessageTtlSeconds: 86400,
MaxDepth: 10000
));
// Delete a topic configuration
await admin.DeleteTopicConfigAsync("orders");
TopicConfig properties:
| Property | Type | Description |
|---|---|---|
Topic |
string |
Topic name (immutable). |
Replayable |
bool |
Whether the topic supports message replay. |
MaxRetries |
int? |
Maximum retry attempts for a message before it becomes dead-lettered. |
MessageTtlSeconds |
int? |
Time-to-live for messages (in seconds) before automatic expiry. |
MaxDepth |
int? |
Maximum number of unacknowledged messages to queue. |
ReplayWindowSeconds |
int? |
Window during which messages can be replayed. |
ThroughputLimit |
int? |
Rate limit (messages per second) for this topic. |
Topic Schema
Manage topic schemas for schema validation:
// List all topic schemas
var schemas = await admin.ListTopicSchemasAsync();
// Get the current schema for a topic
var schema = await admin.GetTopicSchemaAsync("orders");
Console.WriteLine($"Version: {schema.Version}, Updated: {schema.UpdatedAt}");
// Upsert a schema (JSON schema as a string)
var schemaJson = @"{
""type"": ""object"",
""properties"": {
""order_id"": { ""type"": ""string"" },
""amount"": { ""type"": ""number"" }
},
""required"": [""order_id""]
}";
var updated = await admin.UpsertTopicSchemaAsync("orders", schemaJson);
// Delete a topic schema
await admin.DeleteTopicSchemaAsync("orders");
TopicSchema properties:
| Property | Type | Description |
|---|---|---|
Topic |
string |
Topic name. |
SchemaJson |
string |
The schema definition as a JSON string. |
Version |
int |
Schema version (incremented on each update). |
UpdatedAt |
string |
ISO 8601 timestamp of the last update. |
Consumer Group Management
Register and unregister consumer groups:
// List consumer groups for a topic
var groups = await admin.ListConsumerGroupsAsync("orders");
foreach (var group in groups)
{
Console.WriteLine($"Group: {group}");
}
// Register a consumer group
await admin.RegisterConsumerGroupAsync("orders", "billing");
// Unregister a consumer group
await admin.UnregisterConsumerGroupAsync("orders", "billing");
Statistics
Retrieve aggregate queue statistics:
var stats = await admin.StatsAsync();
foreach (var topicStat in stats)
{
Console.WriteLine($"Topic: {topicStat.Topic}, Status: {topicStat.Status}, Count: {topicStat.Count}");
}
TopicStat properties:
| Property | Type | Description |
|---|---|---|
Topic |
string |
Topic name. |
Status |
string |
Status of the topic as reported by the server. |
Count |
int |
Number of unacknowledged messages on this topic. |
Error Handling
The admin client throws specific exceptions for HTTP error responses:
try
{
await admin.DeleteTopicConfigAsync("nonexistent");
}
catch (QueueTiNotFoundException ex)
{
Console.WriteLine($"Topic not found: {ex.Message}");
}
catch (QueueTiConflictException ex)
{
Console.WriteLine($"Conflict (likely already registered): {ex.Message}");
}
Exception types:
QueueTiNotFoundException— Thrown on HTTP 404 (resource not found).QueueTiConflictException— Thrown on HTTP 409 (conflict, e.g., topic already exists).- Other non-2xx responses throw
HttpRequestException.
Cleanup
AdminClient implements IDisposable and IAsyncDisposable:
await admin.DisposeAsync(); // preferred
// or
admin.Dispose();
When created via AdminClient.Create, the client owns the underlying HttpClient and disposes it on cleanup. When constructed with new AdminClient(httpClient) or resolved from DI via AddQueueTiAdminClient, the caller or the IHttpClientFactory owns the HttpClient — the client will not dispose it.
Bearer Token Authentication
Obtaining a token
Use QueueTiAuth to check whether the server requires authentication and to log in:
using QueueTi;
const string HttpAddress = "http://queue.example.com:8080"; // REST API
const string GrpcAddress = "http://queue.example.com:50051"; // gRPC
string? bearerToken = null;
if (await QueueTiAuth.GetAuthRequiredAsync(HttpAddress))
{
bearerToken = await QueueTiAuth.LoginAsync(HttpAddress, username: "admin", password: "secret");
}
var client = QueueTiClient.Create(GrpcAddress, new QueueTiClientOptions
{
BearerToken = bearerToken // null when auth is disabled
});
Both methods accept an insecure flag (set true for plain http:// endpoints) and an optional CancellationToken.
LoginAsync throws HttpRequestException on a non-2xx response (e.g., 401 Unauthorized) and InvalidOperationException if the server response is missing the token field.
Static token
Provide a bearer token at client creation:
var client = QueueTiClient.Create(address, new QueueTiClientOptions
{
BearerToken = "eyJhbGc..."
});
The token is injected as an Authorization: Bearer <token> header on every request.
Dynamic token refresh
For long-lived clients, provide a TokenRefresher to refresh the token before expiry:
var client = QueueTiClient.Create(address, new QueueTiClientOptions
{
BearerToken = "initial-token",
TokenRefresher = async ct =>
{
var response = await httpClient.GetAsync("/auth/refresh", ct);
var json = await response.Content.ReadAsStringAsync(ct);
var token = JsonDocument.Parse(json).RootElement.GetString("access_token");
return token!;
}
});
Behavior:
- The refresh task runs in the background and calls the refresher ~60 seconds before token expiry.
- On refresh success, the new token is stored and used for all subsequent requests.
- On refresh failure, the task retries with exponential backoff (5 s → 60 s) until success.
- Refresh runs until the client is disposed.
Update token at runtime
client.SetToken("new-jwt-token"); // thread-safe; immediate effect
Throws if no BearerToken was configured at creation time.
Configuration Reference
QueueTiClientOptions
| Property | Type | Default | Description |
|---|---|---|---|
BearerToken |
string? |
null |
Initial JWT for Bearer authentication. If set, enables automatic token injection and refresh capability. |
TokenRefresher |
Func<CancellationToken, Task<string>>? |
null |
Optional async callback to refresh the bearer token. Called ~60 s before token expiry. Requires BearerToken to be set. |
Insecure |
bool |
false |
Use ChannelCredentials.Insecure for plaintext http:// connections. Disable TLS/SSL. |
ConfigureChannel |
Action<GrpcChannelOptions>? |
null |
Callback to configure GrpcChannelOptions before channel creation (e.g., message size limits). |
ConfigureHttpClientBuilder |
Action<IHttpClientBuilder>? |
null |
DI-only: callback to configure the IHttpClientBuilder (e.g., add delegating handlers, set timeouts). |
Disposal
Both Dispose() and DisposeAsync() are supported and idempotent:
// Synchronous
client.Dispose();
// Asynchronous (preferred)
await client.DisposeAsync();
Disposal:
- Cancels the background token refresh task (if running).
- Shuts down the managed gRPC channel (if created via
Create()). - Can be called multiple times safely; subsequent calls have no effect.
Error Handling
gRPC errors
gRPC connection errors in streaming and batch consumers trigger automatic reconnection with exponential backoff. Errors are logged at warning or error level via ILogger<Consumer>.
Handler exceptions
In streaming mode, exceptions thrown by the handler are logged and the message is automatically nacked with the exception message. The stream continues processing subsequent messages.
In batch mode, exceptions are your responsibility to handle. Ack or nack each message explicitly.
Token refresh failures
Token refresh failures are logged and retried with exponential backoff. If refresh fails repeatedly, the client continues operating with the last valid token until the connection fails.
Samples
Three complete sample projects are included in the samples/ directory:
QueueTi.Samples.AppHost — Aspire orchestration project that defines the distributed application:
- Adds Postgres and Redis containers with environment-safe passwords (alphanumeric-only for URL safety).
- Registers a QueueTi container resource and wires it to Postgres and Redis.
- Producer and consumer services reference the queue resource and wait for it to be healthy before starting.
- Injects the HTTP URL (
QueueTi__queue__HttpUrl) into both services so health checks and consumer group registration work correctly.
QueueTi.Samples.Producer — ASP.NET Core minimal API service:
POST /publish/{topic}— Accepts{"message": "...", "metadata": {...}}and publishes to the topic, returning{"id": "..."}.GET /health— Aspire health check endpoint.- Uses
AddQueueTiClient("queue")from the Aspire integration.
QueueTi.Samples.Consumer — Worker Service:
- On startup, registers the consumer group via
AdminClient.RegisterConsumerGroupAsync()with exponential backoff retry (up to 30 seconds) until success.QueueTiConflictException(already registered) is treated as success. - Opens a
Subscribestream usingConsumeAsync()and logs each received message. - Topic and consumer group configurable via
Consumer:TopicandConsumer:Groupenvironment variables. - Uses
AddQueueTiClient("queue")from the Aspire integration, which also registersAdminClientwhen the HTTP URL is configured.
To run the samples:
cd samples/QueueTi.Samples.AppHost
dotnet run
The AppHost orchestrates Postgres, Redis, and QueueTi containers locally, then runs the producer and consumer services.
Standalone examples
The examples/ directory contains self-contained console apps that run against a local QueueTi instance with no Aspire dependency.
examples/OrderPipeline/ — end-to-end producer → consumer → DLQ lifecycle:
- Checks for auth and logs in via
QueueTiAuthif required. - Upserts topic config (
Replayable: false) and registers the consumer group. - Publishes five orders;
ord-003carriestype=poisonmetadata. - Runs the streaming consumer (
Concurrency = 3,VisibilityTimeoutSeconds = 5) and a DLQ monitor concurrently viaTask.WhenAll. - The DLQ monitor polls for registration until the
orders.dlqtopic exists (created server-side on first dead-letter), then drains it in 5-second batch windows. - Both loops stop cleanly on Ctrl+C.
To run it against a local QueueTi instance (see the QueueTi server repo for setup):
dotnet run --project examples/OrderPipeline/
Credentials default to admin / secret (the docker-compose defaults). Override via environment variables:
QUEUETI_USERNAME=myuser QUEUETI_PASSWORD=mypass dotnet run --project examples/OrderPipeline/
Thread Safety
QueueTiClientis safe to share across threads.TokenStoreusesReaderWriterLockSliminternally and disposal is guarded by an atomic flag.AdminClientis safe to share across threads. UnderlyingHttpClientis thread-safe.ProducerandConsumerare stateless wrappers; they are safe to use concurrently from multiple tasks.SetToken()is thread-safe and updates the interceptor state immediately for both clients.Dispose()andDisposeAsync()are thread-safe and idempotent for both clients.
Logging
The clients use ILogger<QueueTiClient>, ILogger<Consumer>, and ILogger<AdminClient> for diagnostics. When using ASP.NET Core DI, configure logging as usual:
builder.Logging.AddConsole();
builder.Logging.SetMinimumLevel(LogLevel.Debug);
Key events logged:
- Bearer token refresh success / failure (for both clients).
- Consumer stream reconnection and backoff intervals.
- Handler exceptions (error level).
- Nack failures (error level).
- Admin API errors (error level).
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net8.0
- Google.Protobuf (>= 3.29.3)
- Grpc.Net.Client (>= 2.67.0)
- Grpc.Net.ClientFactory (>= 2.67.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on QueueTi.Client:
| Package | Downloads |
|---|---|
|
QueueTi.Client.Aspire
.NET Aspire client integration for the QueueTi distributed message queue service. |
GitHub repositories
This package is not used by any popular GitHub repositories.