QueueTi.Client
2026.5.5
See the version list below for details.
dotnet add package QueueTi.Client --version 2026.5.5
NuGet\Install-Package QueueTi.Client -Version 2026.5.5
<PackageReference Include="QueueTi.Client" Version="2026.5.5" />
<PackageVersion Include="QueueTi.Client" Version="2026.5.5" />
<PackageReference Include="QueueTi.Client" />
paket add QueueTi.Client --version 2026.5.5
#r "nuget: QueueTi.Client, 2026.5.5"
#:package QueueTi.Client@2026.5.5
#addin nuget:?package=QueueTi.Client&version=2026.5.5
#tool nuget:?package=QueueTi.Client&version=2026.5.5
QueueTi C# Client
A proto-first gRPC client library for the QueueTi distributed message queue service. Targets .NET 10.0.
Installation
dotnet add package QueueTi.Client
Or with the Package Manager:
Install-Package QueueTi.Client
Quick Start
Create a client
using QueueTi;
var client = QueueTiClient.Create("https://queue.example.com", new QueueTiClientOptions
{
BearerToken = "your-jwt-token" // optional
});
Publish a message
using System.Text;
var producer = client.NewProducer();
string messageId = await producer.PublishAsync(
topic: "orders",
payload: Encoding.UTF8.GetBytes("Hello, QueueTi!"),
ct: CancellationToken.None
);
Consume messages (streaming)
var consumer = client.NewConsumer("orders", new ConsumerOptions
{
ConsumerGroup = "billing"
});
await consumer.ConsumeAsync(async (msg, ct) =>
{
Console.WriteLine($"Message {msg.Id}: {Encoding.UTF8.GetString(msg.Payload)}");
// Handler automatically acks on success, nacks on exception.
}, ct);
Clean up
await client.DisposeAsync();
Client Creation
Factory method (recommended)
Use QueueTiClient.Create() to construct a client with a managed gRPC channel:
var client = QueueTiClient.Create("https://queue.example.com", new QueueTiClientOptions
{
BearerToken = "jwt-token", // optional; enables Bearer auth
Insecure = false, // set true for plaintext http:// endpoints
ConfigureChannel = opts => { // optional; configure GrpcChannelOptions
opts.MaxReceiveMessageSize = 16 * 1024 * 1024;
}
});
Manual channel (advanced)
If you need full control over the gRPC channel, pass your own QueueService.QueueServiceClient. In this path you are responsible for wiring any interceptors — BearerToken is not automatically applied:
var channel = GrpcChannel.ForAddress("https://queue.example.com");
var grpcClient = new QueueService.QueueServiceClient(channel);
var client = new QueueTiClient(grpcClient, new QueueTiClientOptions());
To add bearer auth on a manual channel, intercept the invoker yourself:
var store = new TokenStore("jwt-token");
var invoker = channel.Intercept(new BearerTokenInterceptor(store));
var grpcClient = new QueueService.QueueServiceClient(invoker);
var client = new QueueTiClient(grpcClient, new QueueTiClientOptions());
Dependency Injection (ASP.NET Core)
Register the client in your service container using AddQueueTiClient():
builder.Services.AddQueueTiClient("https://queue.example.com", opts =>
{
opts.BearerToken = "initial-token";
opts.TokenRefresher = async ct => await GetFreshTokenAsync(ct);
opts.ConfigureHttpClientBuilder = httpBuilder =>
{
// Configure timeouts, delegating handlers, etc.
httpBuilder.ConfigureHttpClient(c => c.Timeout = TimeSpan.FromSeconds(30));
};
});
// Inject QueueTiClient into your controllers or services
app.MapPost("/orders", async (QueueTiClient client) =>
{
var producer = client.NewProducer();
var id = await producer.PublishAsync("orders", orderPayload);
return Results.Created($"/orders/{id}", new { id });
});
When Insecure = true, the DI path configures the gRPC channel to use plaintext HTTP credentials automatically through the AddGrpcClient code path.
.NET Aspire Integration
The QueueTi.Aspire.Hosting and QueueTi.Client.Aspire packages provide seamless integration with .NET Aspire orchestration for both AppHost and service projects.
Installation
Each package targets a different project in your Aspire solution:
AppHost project:
dotnet add package QueueTi.Aspire.Hosting
Service/worker project:
dotnet add package QueueTi.Client.Aspire
AppHost Setup
Use AddQueueTi() to register QueueTi as a distributed application resource in your AppHost:
// Program.cs (Aspire AppHost project)
var builder = DistributedApplication.CreateBuilder(args);
var postgres = builder.AddPostgres("postgres")
.AddDatabase("queueti-db");
var redis = builder.AddRedis("redis"); // optional — enables rate limiting
var queue = builder.AddQueueTi("queue")
.WithReplicas(3) // optional — defaults to 1
.WithNpgsqlDatabase(postgres)
.WithRedis(redis) // optional — enables rate limiting
.WithAuthentication(
username: "admin",
password: builder.AddParameter("queue-password", secret: true),
jwtSecret: builder.AddParameter("queue-jwt-secret", secret: true))
.WithLogLevel("info");
builder.AddProject<Projects.MyWorker>("worker")
.WithReference(queue)
.WithEnvironment("QueueTi__queue__HttpUrl", queue.GetEndpoint("http"));
builder.Build().Run();
Note: When using Postgres, generate passwords with only alphanumeric characters (
GenerateParameterDefault { Special = false }) to avoid URL encoding issues in the QueueTi server's internalpostgres://URL construction.
Key builder methods:
| Method | Purpose |
|---|---|
AddQueueTi(name, grpcPort?, httpPort?, tag?) |
Adds a QueueTi container resource. Pulls ghcr.io/joessst-dev/queue-ti. Endpoints: grpc (target 50051), http (target 8080). |
WithNpgsqlDatabase(database) |
Wires an Npgsql database resource. Automatically detects container-to-container networking and uses the Docker DNS alias + internal target port. Parses the connection string to extract credentials. Sets QUEUETI_DB_* env vars and adds WaitFor dependency. Throws DistributedApplicationException if the connection string cannot be parsed. |
WithRedis(redis) |
Wires an optional Redis resource for rate limiting. Uses container-to-container addressing when possible (Docker DNS alias + internal target port). Sets QUEUETI_REDIS_* env vars and adds WaitFor dependency. |
WithReplicas(n) |
Runs n instances of QueueTi. All replicas share the same database and Redis resources. |
WithAuthentication(username, password, jwtSecret) |
Configures authentication. Sets QUEUETI_AUTH_ENABLED and related env vars from ParameterResource values. |
WithLogLevel(level) |
Sets QUEUETI_LOG_LEVEL. |
Replicas
WithReplicas(n) starts n identical QueueTi container instances. When referenced by a service project via WithReference, Aspire routes connections across the replicas.
All replicas share the same database and Redis resources — wire those once on the resource builder and each instance picks up the same env vars:
var queue = builder.AddQueueTi("queue")
.WithReplicas(3)
.WithNpgsqlDatabase(postgres)
.WithRedis(redis);
Note:
WithReplicassets a fixed count at startup. For dynamic scaling based on load, configure scaling rules at the deployment target (Azure Container Apps, Kubernetes HPA) rather than in the AppHost.
Service Project Setup
In your worker or web project, call AddQueueTiClient() to register the gRPC client:
// Program.cs (Service project)
builder.AddQueueTiClient("queue");
var app = builder.Build();
app.Run();
The client automatically:
- Reads the connection string from
ConnectionStrings:queue(set by Aspire). - Registers
QueueTiClientin DI (and the underlyingQueueService.QueueServiceClient). - Configures health checks (if
HttpUrlis set; see Health Checks section below). - Instruments outbound gRPC calls with OpenTelemetry tracing.
With custom settings:
builder.AddQueueTiClient("queue", settings =>
{
settings.DisableHealthChecks = true; // if health checks are managed separately
settings.BearerToken = "your-jwt"; // if auth is enabled on the server
});
QueueTiClientSettings
| Property | Type | Default | Description |
|---|---|---|---|
ConnectionString |
string? |
null |
Explicit connection string. If not set, read from ConnectionStrings:{connectionName} or QueueTi:{connectionName} config. |
HttpUrl |
string? |
null |
HTTP endpoint of the QueueTi service. Required to enable health checks. Set via QueueTi:{connectionName}:HttpUrl config or QueueTi__{connectionName}__HttpUrl environment variable. |
DisableHealthChecks |
bool |
false |
Disable automatic health check registration. |
DisableTracing |
bool |
false |
Disable OpenTelemetry instrumentation. |
BearerToken |
string? |
null |
Optional bearer token for authentication. |
TokenRefresher |
Func<CancellationToken, Task<string>>? |
null |
Optional callback to refresh the bearer token at runtime. |
Health Checks
Health checks are only registered when DisableHealthChecks is false (default) and HttpUrl is explicitly set. The integration probes GET /healthz on the provided HTTP endpoint. The check is registered under tags live and queueti and requires no authentication.
Set HttpUrl via configuration:
builder.AddQueueTiClient("queue", settings =>
{
settings.HttpUrl = "http://queue:8080"; // or via QueueTi:queue:HttpUrl config
});
Or via environment variable when using Aspire:
builder.AddProject<Projects.MyService>("service")
.WithReference(queue)
.WithEnvironment("QueueTi__queue__HttpUrl", queue.GetEndpoint("http"));
Distributed Tracing
When DisableTracing is false (default), all outbound gRPC calls are instrumented using OpenTelemetry.Instrumentation.GrpcNetClient. Traces are exported via the Aspire telemetry pipeline.
Publishing Messages
Create a producer and call PublishAsync():
var producer = client.NewProducer();
// Minimal publish
string id = await producer.PublishAsync("orders", payload, ct: ct);
// With routing key and metadata
string id = await producer.PublishAsync("orders", payload, new PublishOptions
{
Key = "order-123", // optional routing/ordering key
Metadata = new Dictionary<string, string>
{
["source"] = "api",
["version"] = "v1"
}
}, ct);
PublishOptions
| Property | Type | Description |
|---|---|---|
Key |
string? |
Optional routing or ordering key for the message. |
Metadata |
IReadOnlyDictionary<string, string>? |
Arbitrary string key-value pairs attached to the message. |
Consuming Messages
Consumer Group Registration
QueueTi requires consumer groups to be explicitly registered before any messages can be delivered. Register a consumer group via the QueueTi HTTP API before calling ConsumeAsync() or ConsumeBatchAsync():
// Use a long-lived shared client or IHttpClientFactory in production
var response = await _httpClient.PostAsync(
"http://queue:8080/api/topics/orders/consumer-groups",
JsonContent.Create(new { consumer_group = "billing" }));
The endpoint returns 201 Created on success. See the samples below for an example of robust group registration with retry logic.
Streaming consumer (real-time processing)
Use ConsumeAsync() for continuous subscription with automatic acknowledgment:
var consumer = client.NewConsumer("orders", new ConsumerOptions
{
ConsumerGroup = "billing",
Concurrency = 4, // process up to 4 messages in parallel
VisibilityTimeoutSeconds = 30 // optional; override server default
});
await consumer.ConsumeAsync(async (msg, ct) =>
{
var orderData = JsonSerializer.Deserialize<Order>(msg.Payload);
await BillingService.ProcessAsync(orderData, ct);
// Automatically acked on return; automatically nacked if an exception is thrown.
}, ct);
Behavior:
- The handler is invoked for each message as it arrives from the server.
- The message is automatically acked on successful handler completion.
- The message is automatically nacked with the exception message if the handler throws.
- Handler invocations are limited by
Concurrencyto control parallelism. - The consumer reconnects with exponential backoff (500 ms → 30 s) on gRPC errors.
- Graceful shutdown (cancellation token) exits without nacking in-flight messages.
Batch consumer (polling)
Use ConsumeBatchAsync() for periodic polling:
var consumer = client.NewConsumer("orders");
await consumer.ConsumeBatchAsync(
batchSize: 10,
handler: async (messages, ct) =>
{
foreach (var msg in messages)
{
try
{
await ProcessMessageAsync(msg, ct);
await msg.AckAsync(ct); // must ack manually
}
catch (Exception ex)
{
await msg.NackAsync(ex.Message, ct); // must nack manually
}
}
},
ct: ct
);
Behavior:
- Polls the server for up to
batchSizemessages at a time. - If no messages are available, waits and retries with exponential backoff (500 ms → 30 s).
- You must explicitly call
AckAsync()orNackAsync()for each message. - Reconnects automatically on gRPC errors with exponential backoff.
ConsumerOptions
| Property | Type | Default | Description |
|---|---|---|---|
ConsumerGroup |
string |
"" |
Consumer group identity for coordinating consumers across instances. |
Concurrency |
int |
1 |
Maximum concurrent handler invocations in streaming mode. Ignored in batch mode. |
VisibilityTimeoutSeconds |
uint? |
null |
Visibility timeout (in seconds) for dequeued messages. If null, uses server default. |
Message Handling
QueueTiMessage
Each message received by the consumer provides:
| Property | Type | Description |
|---|---|---|
Id |
string |
Unique message identifier. |
Topic |
string |
Topic name. |
Payload |
byte[] |
Message body as bytes. |
Metadata |
IReadOnlyDictionary<string, string> |
Key-value metadata attached at publish time. |
CreatedAt |
DateTimeOffset |
Server timestamp when the message was created. |
RetryCount |
int |
Number of times this message has been retried. |
Acknowledgment
// Ack: message processed successfully
await msg.AckAsync(ct);
// Nack: processing failed; mark for retry
await msg.NackAsync("Database connection timeout", ct);
In streaming mode, acks and nacks are called automatically by the consumer:
- Ack on successful handler completion.
- Nack with the exception message on handler failure.
In batch mode, you must call AckAsync() or NackAsync() explicitly for each message.
Bearer Token Authentication
Static token
Provide a bearer token at client creation:
var client = QueueTiClient.Create(address, new QueueTiClientOptions
{
BearerToken = "eyJhbGc..."
});
The token is injected as an Authorization: Bearer <token> header on every request.
Dynamic token refresh
For long-lived clients, provide a TokenRefresher to refresh the token before expiry:
var client = QueueTiClient.Create(address, new QueueTiClientOptions
{
BearerToken = "initial-token",
TokenRefresher = async ct =>
{
var response = await httpClient.GetAsync("/auth/refresh", ct);
var json = await response.Content.ReadAsStringAsync(ct);
var token = JsonDocument.Parse(json).RootElement.GetString("access_token");
return token!;
}
});
Behavior:
- The refresh task runs in the background and calls the refresher ~60 seconds before token expiry.
- On refresh success, the new token is stored and used for all subsequent requests.
- On refresh failure, the task retries with exponential backoff (5 s → 60 s) until success.
- Refresh runs until the client is disposed.
Update token at runtime
client.SetToken("new-jwt-token"); // thread-safe; immediate effect
Throws if no BearerToken was configured at creation time.
Configuration Reference
QueueTiClientOptions
| Property | Type | Default | Description |
|---|---|---|---|
BearerToken |
string? |
null |
Initial JWT for Bearer authentication. If set, enables automatic token injection and refresh capability. |
TokenRefresher |
Func<CancellationToken, Task<string>>? |
null |
Optional async callback to refresh the bearer token. Called ~60 s before token expiry. Requires BearerToken to be set. |
Insecure |
bool |
false |
Use ChannelCredentials.Insecure for plaintext http:// connections. Disable TLS/SSL. |
ConfigureChannel |
Action<GrpcChannelOptions>? |
null |
Callback to configure GrpcChannelOptions before channel creation (e.g., message size limits). |
ConfigureHttpClientBuilder |
Action<IHttpClientBuilder>? |
null |
DI-only: callback to configure the IHttpClientBuilder (e.g., add delegating handlers, set timeouts). |
Disposal
Both Dispose() and DisposeAsync() are supported and idempotent:
// Synchronous
client.Dispose();
// Asynchronous (preferred)
await client.DisposeAsync();
Disposal:
- Cancels the background token refresh task (if running).
- Shuts down the managed gRPC channel (if created via
Create()). - Can be called multiple times safely; subsequent calls have no effect.
Error Handling
gRPC errors
gRPC connection errors in streaming and batch consumers trigger automatic reconnection with exponential backoff. Errors are logged at warning or error level via ILogger<Consumer>.
Handler exceptions
In streaming mode, exceptions thrown by the handler are logged and the message is automatically nacked with the exception message. The stream continues processing subsequent messages.
In batch mode, exceptions are your responsibility to handle. Ack or nack each message explicitly.
Token refresh failures
Token refresh failures are logged and retried with exponential backoff. If refresh fails repeatedly, the client continues operating with the last valid token until the connection fails.
Samples
Three complete sample projects are included in the samples/ directory:
QueueTi.Samples.AppHost — Aspire orchestration project that defines the distributed application:
- Adds Postgres and Redis containers with environment-safe passwords (alphanumeric-only for URL safety).
- Registers a QueueTi container resource and wires it to Postgres and Redis.
- Producer and consumer services reference the queue resource and wait for it to be healthy before starting.
- Injects the HTTP URL (
QueueTi__queue__HttpUrl) into both services so health checks and consumer group registration work correctly.
QueueTi.Samples.Producer — ASP.NET Core minimal API service:
POST /publish/{topic}— Accepts{"message": "...", "metadata": {...}}and publishes to the topic, returning{"id": "..."}.GET /health— Aspire health check endpoint.- Uses
AddQueueTiClient("queue")from the Aspire integration.
QueueTi.Samples.Consumer — Worker Service:
- On startup, registers the consumer group via
POST /api/topics/{topic}/consumer-groupswith exponential backoff retry (up to 30 seconds) until success. - Opens a
Subscribestream usingConsumeAsync()and logs each received message. - Topic and consumer group configurable via
Consumer:TopicandConsumer:Groupenvironment variables. - Uses
AddQueueTiClient("queue")from the Aspire integration.
To run the samples:
cd samples/QueueTi.Samples.AppHost
dotnet run
The AppHost orchestrates Postgres, Redis, and QueueTi containers locally, then runs the producer and consumer services.
Thread Safety
QueueTiClientis safe to share across threads.TokenStoreusesReaderWriterLockSliminternally and disposal is guarded by an atomic flag.ProducerandConsumerare stateless wrappers; they are safe to use concurrently from multiple tasks.SetToken()is thread-safe and updates the interceptor state immediately.Dispose()andDisposeAsync()are thread-safe and idempotent.
Logging
The client uses ILogger<QueueTiClient> and ILogger<Consumer> for diagnostics. When using ASP.NET Core DI, configure logging as usual:
builder.Logging.AddConsole();
builder.Logging.SetMinimumLevel(LogLevel.Debug);
Key events logged:
- Bearer token refresh success / failure.
- Consumer stream reconnection and backoff intervals.
- Handler exceptions (error level).
- Nack failures (error level).
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Google.Protobuf (>= 3.29.3)
- Grpc.Net.Client (>= 2.67.0)
- Grpc.Net.ClientFactory (>= 2.67.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on QueueTi.Client:
| Package | Downloads |
|---|---|
|
QueueTi.Client.Aspire
.NET Aspire client integration for the QueueTi distributed message queue service. |
GitHub repositories
This package is not used by any popular GitHub repositories.