QueueTi.Client
2026.5.2
See the version list below for details.
dotnet add package QueueTi.Client --version 2026.5.2
NuGet\Install-Package QueueTi.Client -Version 2026.5.2
<PackageReference Include="QueueTi.Client" Version="2026.5.2" />
<PackageVersion Include="QueueTi.Client" Version="2026.5.2" />
<PackageReference Include="QueueTi.Client" />
paket add QueueTi.Client --version 2026.5.2
#r "nuget: QueueTi.Client, 2026.5.2"
#:package QueueTi.Client@2026.5.2
#addin nuget:?package=QueueTi.Client&version=2026.5.2
#tool nuget:?package=QueueTi.Client&version=2026.5.2
QueueTi C# Client
A proto-first gRPC client library for the QueueTi distributed message queue service. Targets .NET 10.0.
Installation
dotnet add package QueueTi.Client
Or with the Package Manager:
Install-Package QueueTi.Client
Quick Start
Create a client
using QueueTi;
var client = QueueTiClient.Create("https://queue.example.com", new QueueTiClientOptions
{
BearerToken = "your-jwt-token" // optional
});
Publish a message
using System.Text;
var producer = client.NewProducer();
string messageId = await producer.PublishAsync(
topic: "orders",
payload: Encoding.UTF8.GetBytes("Hello, QueueTi!"),
ct: CancellationToken.None
);
Consume messages (streaming)
var consumer = client.NewConsumer("orders", new ConsumerOptions
{
ConsumerGroup = "billing"
});
await consumer.ConsumeAsync(async (msg, ct) =>
{
Console.WriteLine($"Message {msg.Id}: {Encoding.UTF8.GetString(msg.Payload)}");
// Handler automatically acks on success, nacks on exception.
}, ct);
Clean up
await client.DisposeAsync();
Client Creation
Factory method (recommended)
Use QueueTiClient.Create() to construct a client with a managed gRPC channel:
var client = QueueTiClient.Create("https://queue.example.com", new QueueTiClientOptions
{
BearerToken = "jwt-token", // optional; enables Bearer auth
Insecure = false, // set true for plaintext http:// endpoints
ConfigureChannel = opts => { // optional; configure GrpcChannelOptions
opts.MaxReceiveMessageSize = 16 * 1024 * 1024;
}
});
Manual channel (advanced)
If you need full control over the gRPC channel, pass your own QueueService.QueueServiceClient. In this path you are responsible for wiring any interceptors — BearerToken is not automatically applied:
var channel = GrpcChannel.ForAddress("https://queue.example.com");
var grpcClient = new QueueService.QueueServiceClient(channel);
var client = new QueueTiClient(grpcClient, new QueueTiClientOptions());
To add bearer auth on a manual channel, intercept the invoker yourself:
var store = new TokenStore("jwt-token");
var invoker = channel.Intercept(new BearerTokenInterceptor(store));
var grpcClient = new QueueService.QueueServiceClient(invoker);
var client = new QueueTiClient(grpcClient, new QueueTiClientOptions());
Dependency Injection (ASP.NET Core)
Register the client in your service container using AddQueueTiClient():
builder.Services.AddQueueTiClient("https://queue.example.com", opts =>
{
opts.BearerToken = "initial-token";
opts.TokenRefresher = async ct => await GetFreshTokenAsync(ct);
opts.ConfigureHttpClientBuilder = httpBuilder =>
{
// Configure timeouts, delegating handlers, etc.
httpBuilder.ConfigureHttpClient(c => c.Timeout = TimeSpan.FromSeconds(30));
};
});
// Inject QueueTiClient into your controllers or services
app.MapPost("/orders", async (QueueTiClient client) =>
{
var producer = client.NewProducer();
var id = await producer.PublishAsync("orders", orderPayload);
return Results.Created($"/orders/{id}", new { id });
});
.NET Aspire Integration
The QueueTi.Aspire.Hosting and QueueTi.Client.Aspire packages provide seamless integration with .NET Aspire orchestration for both AppHost and service projects.
Installation
Each package targets a different project in your Aspire solution:
AppHost project:
dotnet add package QueueTi.Aspire.Hosting
Service/worker project:
dotnet add package QueueTi.Client.Aspire
AppHost Setup
Use AddQueueTi() to register QueueTi as a distributed application resource in your AppHost:
// Program.cs (Aspire AppHost project)
var builder = DistributedApplication.CreateBuilder(args);
var postgres = builder.AddPostgres("postgres")
.AddDatabase("queueti-db");
var queue = builder.AddQueueTi("queue")
.WithNpgsqlDatabase(postgres)
.WithAuthentication(
username: "admin",
password: builder.AddParameter("queue-password", secret: true),
jwtSecret: builder.AddParameter("queue-jwt-secret", secret: true))
.WithLogLevel("info");
builder.AddProject<Projects.MyWorker>("worker")
.WithReference(queue);
builder.Build().Run();
Key builder methods:
| Method | Purpose |
|---|---|
AddQueueTi(name, grpcPort?, httpPort?, tag?) |
Adds a QueueTi container resource. Pulls ghcr.io/joessst-dev/queue-ti. Endpoints: grpc (target 50051), http (target 8080). |
WithNpgsqlDatabase(database) |
Wires an Npgsql database resource. Sets QUEUETI_DB_* env vars and adds WaitFor dependency. |
WithAuthentication(username, password, jwtSecret) |
Configures authentication. Sets QUEUETI_AUTH_ENABLED and related env vars from ParameterResource values. |
WithLogLevel(level) |
Sets QUEUETI_LOG_LEVEL. |
Service Project Setup
In your worker or web project, call AddQueueTiClient() to register the gRPC client:
// Program.cs (Service project)
builder.AddQueueTiClient("queue");
var app = builder.Build();
app.Run();
The client automatically:
- Reads the connection string from
ConnectionStrings:queue(set by Aspire). - Registers
QueueTiClientin DI (and the underlyingQueueService.QueueServiceClient). - Configures health checks (HTTP GET to
/healthzon port 8080). - Instruments outbound gRPC calls with OpenTelemetry tracing.
With custom settings:
builder.AddQueueTiClient("queue", settings =>
{
settings.DisableHealthChecks = true; // if health checks are managed separately
settings.BearerToken = "your-jwt"; // if auth is enabled on the server
});
QueueTiClientSettings
| Property | Type | Default | Description |
|---|---|---|---|
ConnectionString |
string? |
null |
Explicit connection string. If not set, read from ConnectionStrings:{connectionName} or QueueTi:{connectionName} config. |
DisableHealthChecks |
bool |
false |
Disable automatic health check registration. |
DisableTracing |
bool |
false |
Disable OpenTelemetry instrumentation. |
BearerToken |
string? |
null |
Optional bearer token for authentication. |
TokenRefresher |
Func<CancellationToken, Task<string>>? |
null |
Optional callback to refresh the bearer token at runtime. |
Health Checks
When DisableHealthChecks is false (default), the integration registers an HTTP health check that probes GET /healthz on the QueueTi service's HTTP port (default 8080). The check is registered under tags live and queueti and requires no authentication.
Distributed Tracing
When DisableTracing is false (default), all outbound gRPC calls are instrumented using OpenTelemetry.Instrumentation.GrpcNetClient. Traces are exported via the Aspire telemetry pipeline.
Publishing Messages
Create a producer and call PublishAsync():
var producer = client.NewProducer();
// Minimal publish
string id = await producer.PublishAsync("orders", payload, ct: ct);
// With routing key and metadata
string id = await producer.PublishAsync("orders", payload, new PublishOptions
{
Key = "order-123", // optional routing/ordering key
Metadata = new Dictionary<string, string>
{
["source"] = "api",
["version"] = "v1"
}
}, ct);
PublishOptions
| Property | Type | Description |
|---|---|---|
Key |
string? |
Optional routing or ordering key for the message. |
Metadata |
IReadOnlyDictionary<string, string>? |
Arbitrary string key-value pairs attached to the message. |
Consuming Messages
Streaming consumer (real-time processing)
Use ConsumeAsync() for continuous subscription with automatic acknowledgment:
var consumer = client.NewConsumer("orders", new ConsumerOptions
{
ConsumerGroup = "billing",
Concurrency = 4, // process up to 4 messages in parallel
VisibilityTimeoutSeconds = 30 // optional; override server default
});
await consumer.ConsumeAsync(async (msg, ct) =>
{
var orderData = JsonSerializer.Deserialize<Order>(msg.Payload);
await BillingService.ProcessAsync(orderData, ct);
// Automatically acked on return; automatically nacked if an exception is thrown.
}, ct);
Behavior:
- The handler is invoked for each message as it arrives from the server.
- The message is automatically acked on successful handler completion.
- The message is automatically nacked with the exception message if the handler throws.
- Handler invocations are limited by
Concurrencyto control parallelism. - The consumer reconnects with exponential backoff (500 ms → 30 s) on gRPC errors.
- Graceful shutdown (cancellation token) exits without nacking in-flight messages.
Batch consumer (polling)
Use ConsumeBatchAsync() for periodic polling:
var consumer = client.NewConsumer("orders");
await consumer.ConsumeBatchAsync(
batchSize: 10,
handler: async (messages, ct) =>
{
foreach (var msg in messages)
{
try
{
await ProcessMessageAsync(msg, ct);
await msg.AckAsync(ct); // must ack manually
}
catch (Exception ex)
{
await msg.NackAsync(ex.Message, ct); // must nack manually
}
}
},
ct: ct
);
Behavior:
- Polls the server for up to
batchSizemessages at a time. - If no messages are available, waits and retries with exponential backoff (500 ms → 30 s).
- You must explicitly call
AckAsync()orNackAsync()for each message. - Reconnects automatically on gRPC errors with exponential backoff.
ConsumerOptions
| Property | Type | Default | Description |
|---|---|---|---|
ConsumerGroup |
string |
"" |
Consumer group identity for coordinating consumers across instances. |
Concurrency |
int |
1 |
Maximum concurrent handler invocations in streaming mode. Ignored in batch mode. |
VisibilityTimeoutSeconds |
uint? |
null |
Visibility timeout (in seconds) for dequeued messages. If null, uses server default. |
Message Handling
QueueTiMessage
Each message received by the consumer provides:
| Property | Type | Description |
|---|---|---|
Id |
string |
Unique message identifier. |
Topic |
string |
Topic name. |
Payload |
byte[] |
Message body as bytes. |
Metadata |
IReadOnlyDictionary<string, string> |
Key-value metadata attached at publish time. |
CreatedAt |
DateTimeOffset |
Server timestamp when the message was created. |
RetryCount |
int |
Number of times this message has been retried. |
Acknowledgment
// Ack: message processed successfully
await msg.AckAsync(ct);
// Nack: processing failed; mark for retry
await msg.NackAsync("Database connection timeout", ct);
In streaming mode, acks and nacks are called automatically by the consumer:
- Ack on successful handler completion.
- Nack with the exception message on handler failure.
In batch mode, you must call AckAsync() or NackAsync() explicitly for each message.
Bearer Token Authentication
Static token
Provide a bearer token at client creation:
var client = QueueTiClient.Create(address, new QueueTiClientOptions
{
BearerToken = "eyJhbGc..."
});
The token is injected as an Authorization: Bearer <token> header on every request.
Dynamic token refresh
For long-lived clients, provide a TokenRefresher to refresh the token before expiry:
var client = QueueTiClient.Create(address, new QueueTiClientOptions
{
BearerToken = "initial-token",
TokenRefresher = async ct =>
{
var response = await httpClient.GetAsync("/auth/refresh", ct);
var json = await response.Content.ReadAsStringAsync(ct);
var token = JsonDocument.Parse(json).RootElement.GetString("access_token");
return token!;
}
});
Behavior:
- The refresh task runs in the background and calls the refresher ~60 seconds before token expiry.
- On refresh success, the new token is stored and used for all subsequent requests.
- On refresh failure, the task retries with exponential backoff (5 s → 60 s) until success.
- Refresh runs until the client is disposed.
Update token at runtime
client.SetToken("new-jwt-token"); // thread-safe; immediate effect
Throws if no BearerToken was configured at creation time.
Configuration Reference
QueueTiClientOptions
| Property | Type | Default | Description |
|---|---|---|---|
BearerToken |
string? |
null |
Initial JWT for Bearer authentication. If set, enables automatic token injection and refresh capability. |
TokenRefresher |
Func<CancellationToken, Task<string>>? |
null |
Optional async callback to refresh the bearer token. Called ~60 s before token expiry. Requires BearerToken to be set. |
Insecure |
bool |
false |
Use ChannelCredentials.Insecure for plaintext http:// connections. Disable TLS/SSL. |
ConfigureChannel |
Action<GrpcChannelOptions>? |
null |
Callback to configure GrpcChannelOptions before channel creation (e.g., message size limits). |
ConfigureHttpClientBuilder |
Action<IHttpClientBuilder>? |
null |
DI-only: callback to configure the IHttpClientBuilder (e.g., add delegating handlers, set timeouts). |
Disposal
Both Dispose() and DisposeAsync() are supported and idempotent:
// Synchronous
client.Dispose();
// Asynchronous (preferred)
await client.DisposeAsync();
Disposal:
- Cancels the background token refresh task (if running).
- Shuts down the managed gRPC channel (if created via
Create()). - Can be called multiple times safely; subsequent calls have no effect.
Error Handling
gRPC errors
gRPC connection errors in streaming and batch consumers trigger automatic reconnection with exponential backoff. Errors are logged at warning or error level via ILogger<Consumer>.
Handler exceptions
In streaming mode, exceptions thrown by the handler are logged and the message is automatically nacked with the exception message. The stream continues processing subsequent messages.
In batch mode, exceptions are your responsibility to handle. Ack or nack each message explicitly.
Token refresh failures
Token refresh failures are logged and retried with exponential backoff. If refresh fails repeatedly, the client continues operating with the last valid token until the connection fails.
Thread Safety
QueueTiClientis safe to share across threads.TokenStoreusesReaderWriterLockSliminternally and disposal is guarded by an atomic flag.ProducerandConsumerare stateless wrappers; they are safe to use concurrently from multiple tasks.SetToken()is thread-safe and updates the interceptor state immediately.Dispose()andDisposeAsync()are thread-safe and idempotent.
Logging
The client uses ILogger<QueueTiClient> and ILogger<Consumer> for diagnostics. When using ASP.NET Core DI, configure logging as usual:
builder.Logging.AddConsole();
builder.Logging.SetMinimumLevel(LogLevel.Debug);
Key events logged:
- Bearer token refresh success / failure.
- Consumer stream reconnection and backoff intervals.
- Handler exceptions (error level).
- Nack failures (error level).
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Google.Protobuf (>= 3.29.3)
- Grpc.Net.Client (>= 2.67.0)
- Grpc.Net.ClientFactory (>= 2.67.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on QueueTi.Client:
| Package | Downloads |
|---|---|
|
QueueTi.Client.Aspire
.NET Aspire client integration for the QueueTi distributed message queue service. |
GitHub repositories
This package is not used by any popular GitHub repositories.