QueueTi.Client 2026.5.9

dotnet add package QueueTi.Client --version 2026.5.9
                    
NuGet\Install-Package QueueTi.Client -Version 2026.5.9
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="QueueTi.Client" Version="2026.5.9" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="QueueTi.Client" Version="2026.5.9" />
                    
Directory.Packages.props
<PackageReference Include="QueueTi.Client" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add QueueTi.Client --version 2026.5.9
                    
#r "nuget: QueueTi.Client, 2026.5.9"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package QueueTi.Client@2026.5.9
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=QueueTi.Client&version=2026.5.9
                    
Install as a Cake Addin
#tool nuget:?package=QueueTi.Client&version=2026.5.9
                    
Install as a Cake Tool

QueueTi C# Client

NuGet License: MIT

A proto-first gRPC client library for the QueueTi distributed message queue service. Targets .NET 8.0.

Installation

dotnet add package QueueTi.Client

Or with the Package Manager:

Install-Package QueueTi.Client

Quick Start

Create a client

using QueueTi;

var client = QueueTiClient.Create("https://queue.example.com", new QueueTiClientOptions
{
    BearerToken = "your-jwt-token"  // optional
});

Publish a message

using System.Text;

var producer = client.NewProducer();

string messageId = await producer.PublishAsync(
    topic: "orders",
    payload: Encoding.UTF8.GetBytes("Hello, QueueTi!"),
    ct: CancellationToken.None
);

Consume messages (streaming)

var consumer = client.NewConsumer("orders", new ConsumerOptions
{
    ConsumerGroup = "billing"
});

await consumer.ConsumeAsync(async (msg, ct) =>
{
    Console.WriteLine($"Message {msg.Id}: {Encoding.UTF8.GetString(msg.Payload)}");
    // Handler automatically acks on success, nacks on exception.
}, ct);

Clean up

await client.DisposeAsync();

Client Creation

Use QueueTiClient.Create() to construct a client with a managed gRPC channel:

var client = QueueTiClient.Create("https://queue.example.com", new QueueTiClientOptions
{
    BearerToken = "jwt-token",        // optional; enables Bearer auth
    Insecure = false,                 // set true for plaintext http:// endpoints
    ConfigureChannel = opts => {      // optional; configure GrpcChannelOptions
        opts.MaxReceiveMessageSize = 16 * 1024 * 1024;
    }
});

Manual channel (advanced)

If you need full control over the gRPC channel, pass your own QueueService.QueueServiceClient. In this path you are responsible for wiring any interceptors — BearerToken is not automatically applied:

var channel = GrpcChannel.ForAddress("https://queue.example.com");
var grpcClient = new QueueService.QueueServiceClient(channel);
var client = new QueueTiClient(grpcClient, new QueueTiClientOptions());

To add bearer auth on a manual channel, intercept the invoker yourself:

var store = new TokenStore("jwt-token");
var invoker = channel.Intercept(new BearerTokenInterceptor(store));
var grpcClient = new QueueService.QueueServiceClient(invoker);
var client = new QueueTiClient(grpcClient, new QueueTiClientOptions());

Dependency Injection (ASP.NET Core)

Registering QueueTiClient

Register the gRPC client in your service container using AddQueueTiClient():

builder.Services.AddQueueTiClient("https://queue.example.com", opts =>
{
    opts.BearerToken = "initial-token";
    opts.TokenRefresher = async ct => await GetFreshTokenAsync(ct);
    opts.ConfigureHttpClientBuilder = httpBuilder =>
    {
        // Configure timeouts, delegating handlers, etc.
        httpBuilder.ConfigureHttpClient(c => c.Timeout = TimeSpan.FromSeconds(30));
    };
});

// Inject QueueTiClient into your controllers or services
app.MapPost("/orders", async (QueueTiClient client) =>
{
    var producer = client.NewProducer();
    var id = await producer.PublishAsync("orders", orderPayload);
    return Results.Created($"/orders/{id}", new { id });
});

When Insecure = true, the DI path configures the gRPC channel to use plaintext HTTP credentials automatically through the AddGrpcClient code path.

Registering AdminClient

Register the admin REST client using AddQueueTiAdminClient():

builder.Services.AddQueueTiAdminClient("http://queue.example.com", opts =>
{
    opts.BearerToken = "admin-token";
    opts.TokenRefresher = async ct => await GetFreshAdminTokenAsync(ct);
});

// Inject AdminClient into your services
app.MapPost("/admin/topics/{topic}", async (AdminClient admin, string topic) =>
{
    var config = new TopicConfig(Topic: topic, Replayable: true, MaxRetries: 3);
    await admin.UpsertTopicConfigAsync(topic, config);
    return Results.Created($"/admin/topics/{topic}", config);
});

AddQueueTiAdminClient() uses IHttpClientFactory internally. The BearerToken, TokenRefresher, and Insecure options all apply the same way as for QueueTiClient.

.NET Aspire Integration

The QueueTi.Aspire.Hosting and QueueTi.Client.Aspire packages provide seamless integration with .NET Aspire orchestration for both AppHost and service projects.

Installation

Each package targets a different project in your Aspire solution:

AppHost project:

dotnet add package QueueTi.Aspire.Hosting

Service/worker project:

dotnet add package QueueTi.Client.Aspire

AppHost Setup

Use AddQueueTi() to register QueueTi as a distributed application resource in your AppHost:

// Program.cs (Aspire AppHost project)
var builder = DistributedApplication.CreateBuilder(args);

var postgres = builder.AddPostgres("postgres")
    .AddDatabase("queueti-db");

var redis = builder.AddRedis("redis"); // optional — enables rate limiting

var queue = builder.AddQueueTi("queue")
    .WithReplicas(3)                   // optional — defaults to 1
    .WithNpgsqlDatabase(postgres)
    .WithRedis(redis)                  // optional — enables rate limiting
    .WithAuthentication(
        username: "admin",
        password: builder.AddParameter("queue-password", secret: true),
        jwtSecret: builder.AddParameter("queue-jwt-secret", secret: true))
    .WithLogLevel("info");

builder.AddProject<Projects.MyWorker>("worker")
    .WithReference(queue)
    .WithEnvironment("QueueTi__queue__HttpUrl", queue.GetEndpoint("http"));

builder.Build().Run();

Note: When using Postgres, generate passwords with only alphanumeric characters (GenerateParameterDefault { Special = false }) to avoid URL encoding issues in the QueueTi server's internal postgres:// URL construction.

Key builder methods:

Method Purpose
AddQueueTi(name, grpcPort?, httpPort?, tag?) Adds a QueueTi container resource. Pulls ghcr.io/joessst-dev/queue-ti. Endpoints: grpc (target 50051), http (target 8080).
WithNpgsqlDatabase(database) Wires an Npgsql database resource. Automatically detects container-to-container networking and uses the Docker DNS alias + internal target port. Parses the connection string to extract credentials. Sets QUEUETI_DB_* env vars and adds WaitFor dependency. Throws DistributedApplicationException if the connection string cannot be parsed.
WithRedis(redis) Wires an optional Redis resource for rate limiting. Uses container-to-container addressing when possible (Docker DNS alias + internal target port). Sets QUEUETI_REDIS_* env vars and adds WaitFor dependency.
WithReplicas(n) Runs n instances of QueueTi. All replicas share the same database and Redis resources.
WithAuthentication(username, password, jwtSecret) Configures authentication. Sets QUEUETI_AUTH_ENABLED and related env vars from ParameterResource values.
WithLogLevel(level) Sets QUEUETI_LOG_LEVEL.

Replicas

WithReplicas(n) starts n identical QueueTi container instances. When referenced by a service project via WithReference, Aspire routes connections across the replicas.

All replicas share the same database and Redis resources — wire those once on the resource builder and each instance picks up the same env vars:

var queue = builder.AddQueueTi("queue")
    .WithReplicas(3)
    .WithNpgsqlDatabase(postgres)
    .WithRedis(redis);

Note: WithReplicas sets a fixed count at startup. For dynamic scaling based on load, configure scaling rules at the deployment target (Azure Container Apps, Kubernetes HPA) rather than in the AppHost.

Service Project Setup

In your worker or web project, call AddQueueTiClient() to register the gRPC client:

// Program.cs (Service project)
builder.AddQueueTiClient("queue");

var app = builder.Build();
app.Run();

The client automatically:

  • Reads the connection string from ConnectionStrings:queue (set by Aspire).
  • Registers QueueTiClient in DI (and the underlying QueueService.QueueServiceClient).
  • If QueueTiClientSettings.HttpUrl is set, also registers AdminClient in DI and configures health checks (see Health Checks section below).
  • Instruments outbound gRPC calls with OpenTelemetry tracing.

With custom settings:

builder.AddQueueTiClient("queue", settings =>
{
    settings.DisableHealthChecks = true; // if health checks are managed separately
    settings.BearerToken = "your-jwt";   // if auth is enabled on the server
});

QueueTiClientSettings

Property Type Default Description
ConnectionString string? null Explicit connection string. If not set, read from ConnectionStrings:{connectionName} or QueueTi:{connectionName} config.
HttpUrl string? null HTTP endpoint of the QueueTi service. Required to enable health checks. Set via QueueTi:{connectionName}:HttpUrl config or QueueTi__{connectionName}__HttpUrl environment variable.
DisableHealthChecks bool false Disable automatic health check registration.
DisableTracing bool false Disable OpenTelemetry instrumentation.
BearerToken string? null Optional bearer token for authentication.
TokenRefresher Func<CancellationToken, Task<string>>? null Optional callback to refresh the bearer token at runtime.

Health Checks

Health checks are only registered when DisableHealthChecks is false (default) and HttpUrl is explicitly set. The integration probes GET /healthz on the provided HTTP endpoint. The check is registered under tags live and queueti and requires no authentication.

Set HttpUrl via configuration:

builder.AddQueueTiClient("queue", settings =>
{
    settings.HttpUrl = "http://queue:8080";  // or via QueueTi:queue:HttpUrl config
});

Or via environment variable when using Aspire:

builder.AddProject<Projects.MyService>("service")
    .WithReference(queue)
    .WithEnvironment("QueueTi__queue__HttpUrl", queue.GetEndpoint("http"));

Distributed Tracing

When DisableTracing is false (default), all outbound gRPC calls are instrumented using OpenTelemetry.Instrumentation.GrpcNetClient. Traces are exported via the Aspire telemetry pipeline.

Publishing Messages

Create a producer and call PublishAsync():

var producer = client.NewProducer();

// Minimal publish
string id = await producer.PublishAsync("orders", payload, ct: ct);

// With routing key and metadata
string id = await producer.PublishAsync("orders", payload, new PublishOptions
{
    Key = "order-123",  // optional routing/ordering key
    Metadata = new Dictionary<string, string>
    {
        ["source"] = "api",
        ["version"] = "v1"
    }
}, ct);

PublishOptions

Property Type Description
Key string? Optional routing or ordering key for the message.
Metadata IReadOnlyDictionary<string, string>? Arbitrary string key-value pairs attached to the message.

Consuming Messages

Consumer Group Registration

QueueTi requires consumer groups to be explicitly registered before any messages can be delivered. Use AdminClient.RegisterConsumerGroupAsync() before calling ConsumeAsync() or ConsumeBatchAsync():

await admin.RegisterConsumerGroupAsync("orders", "billing");

QueueTiConflictException is thrown if the group is already registered — treat this as success. See the samples below for an example of robust group registration with exponential backoff retry.

Streaming consumer (real-time processing)

Use ConsumeAsync() for continuous subscription with automatic acknowledgment:

var consumer = client.NewConsumer("orders", new ConsumerOptions
{
    ConsumerGroup = "billing",
    Concurrency = 4,                 // process up to 4 messages in parallel
    VisibilityTimeoutSeconds = 30    // optional; override server default
});

await consumer.ConsumeAsync(async (msg, ct) =>
{
    var orderData = JsonSerializer.Deserialize<Order>(msg.Payload);
    await BillingService.ProcessAsync(orderData, ct);
    // Automatically acked on return; automatically nacked if an exception is thrown.
}, ct);

Behavior:

  • The handler is invoked for each message as it arrives from the server.
  • The message is automatically acked on successful handler completion.
  • The message is automatically nacked with the exception message if the handler throws.
  • Handler invocations are limited by Concurrency to control parallelism.
  • The consumer reconnects with exponential backoff (500 ms → 30 s) on gRPC errors.
  • Graceful shutdown (cancellation token) exits without nacking in-flight messages.

Batch consumer (polling)

Use ConsumeBatchAsync() for periodic polling:

var consumer = client.NewConsumer("orders");

await consumer.ConsumeBatchAsync(
    batchSize: 10,
    handler: async (messages, ct) =>
    {
        foreach (var msg in messages)
        {
            try
            {
                await ProcessMessageAsync(msg, ct);
                await msg.AckAsync(ct);  // must ack manually
            }
            catch (Exception ex)
            {
                await msg.NackAsync(ex.Message, ct);  // must nack manually
            }
        }
    },
    ct: ct
);

Behavior:

  • Polls the server for up to batchSize messages at a time.
  • If no messages are available, waits and retries with exponential backoff (500 ms → 30 s).
  • You must explicitly call AckAsync() or NackAsync() for each message.
  • Reconnects automatically on gRPC errors with exponential backoff.

ConsumerOptions

Property Type Default Description
ConsumerGroup string "" Consumer group identity for coordinating consumers across instances.
Concurrency int 1 Maximum concurrent handler invocations in streaming mode. Ignored in batch mode.
VisibilityTimeoutSeconds uint? null Visibility timeout (in seconds) for dequeued messages. If null, uses server default.

Message Handling

QueueTiMessage

Each message received by the consumer provides:

Property Type Description
Id string Unique message identifier.
Topic string Topic name.
Payload byte[] Message body as bytes.
Metadata IReadOnlyDictionary<string, string> Key-value metadata attached at publish time.
CreatedAt DateTimeOffset Server timestamp when the message was created.
RetryCount int Number of times this message has been retried.

Acknowledgment

// Ack: message processed successfully
await msg.AckAsync(ct);

// Nack: processing failed; mark for retry
await msg.NackAsync("Database connection timeout", ct);

In streaming mode, acks and nacks are called automatically by the consumer:

  • Ack on successful handler completion.
  • Nack with the exception message on handler failure.

In batch mode, you must call AckAsync() or NackAsync() explicitly for each message.

Admin Client

The AdminClient wraps the QueueTi admin REST API to manage topic configurations, schemas, and consumer group registration. It is available in the same QueueTi.Client NuGet package as QueueTiClient.

Creating an admin client

Use the factory method for a managed HTTP client:

using QueueTi;

var admin = AdminClient.Create(
    baseUrl: "http://queue.example.com",
    options: new QueueTiClientOptions
    {
        BearerToken = "your-jwt-token"  // optional
    }
);

Or inject via dependency injection (see "Dependency Injection" section above).

For manual HTTP client management (e.g., with IHttpClientFactory), use the constructor directly:

var admin = new AdminClient(httpClient);

When using this constructor, you are responsible for wiring authentication headers yourself (for example via a DelegatingHandler). SetToken() is not available with this constructor — it requires a TokenStore configured through AdminClient.Create or AddQueueTiAdminClient.

Topic Configuration

Manage topic settings:

// List all topic configurations
var configs = await admin.ListTopicConfigsAsync();
foreach (var config in configs)
{
    Console.WriteLine($"Topic: {config.Topic}, Replayable: {config.Replayable}");
}

// Get or create a topic configuration
var created = await admin.UpsertTopicConfigAsync("orders", new TopicConfig(
    Topic: "orders",
    Replayable: true,
    MaxRetries: 3,
    MessageTtlSeconds: 86400,
    MaxDepth: 10000
));

// Delete a topic configuration
await admin.DeleteTopicConfigAsync("orders");

TopicConfig properties:

Property Type Description
Topic string Topic name (immutable).
Replayable bool Whether the topic supports message replay.
MaxRetries int? Maximum retry attempts for a message before it becomes dead-lettered.
MessageTtlSeconds int? Time-to-live for messages (in seconds) before automatic expiry.
MaxDepth int? Maximum number of unacknowledged messages to queue.
ReplayWindowSeconds int? Window during which messages can be replayed.
ThroughputLimit int? Rate limit (messages per second) for this topic.

Topic Schema

Manage topic schemas for schema validation:

// List all topic schemas
var schemas = await admin.ListTopicSchemasAsync();

// Get the current schema for a topic
var schema = await admin.GetTopicSchemaAsync("orders");
Console.WriteLine($"Version: {schema.Version}, Updated: {schema.UpdatedAt}");

// Upsert a schema (JSON schema as a string)
var schemaJson = @"{
    ""type"": ""object"",
    ""properties"": {
        ""order_id"": { ""type"": ""string"" },
        ""amount"": { ""type"": ""number"" }
    },
    ""required"": [""order_id""]
}";

var updated = await admin.UpsertTopicSchemaAsync("orders", schemaJson);

// Delete a topic schema
await admin.DeleteTopicSchemaAsync("orders");

TopicSchema properties:

Property Type Description
Topic string Topic name.
SchemaJson string The schema definition as a JSON string.
Version int Schema version (incremented on each update).
UpdatedAt string ISO 8601 timestamp of the last update.

Consumer Group Management

Register and unregister consumer groups:

// List consumer groups for a topic
var groups = await admin.ListConsumerGroupsAsync("orders");
foreach (var group in groups)
{
    Console.WriteLine($"Group: {group}");
}

// Register a consumer group
await admin.RegisterConsumerGroupAsync("orders", "billing");

// Unregister a consumer group
await admin.UnregisterConsumerGroupAsync("orders", "billing");

Statistics

Retrieve aggregate queue statistics:

var stats = await admin.StatsAsync();
foreach (var topicStat in stats)
{
    Console.WriteLine($"Topic: {topicStat.Topic}, Status: {topicStat.Status}, Count: {topicStat.Count}");
}

TopicStat properties:

Property Type Description
Topic string Topic name.
Status string Status of the topic as reported by the server.
Count int Number of unacknowledged messages on this topic.

Error Handling

The admin client throws specific exceptions for HTTP error responses:

try
{
    await admin.DeleteTopicConfigAsync("nonexistent");
}
catch (QueueTiNotFoundException ex)
{
    Console.WriteLine($"Topic not found: {ex.Message}");
}
catch (QueueTiConflictException ex)
{
    Console.WriteLine($"Conflict (likely already registered): {ex.Message}");
}

Exception types:

  • QueueTiNotFoundException — Thrown on HTTP 404 (resource not found).
  • QueueTiConflictException — Thrown on HTTP 409 (conflict, e.g., topic already exists).
  • Other non-2xx responses throw HttpRequestException.

Cleanup

AdminClient implements IDisposable and IAsyncDisposable:

await admin.DisposeAsync();  // preferred
// or
admin.Dispose();

When created via AdminClient.Create, the client owns the underlying HttpClient and disposes it on cleanup. When constructed with new AdminClient(httpClient) or resolved from DI via AddQueueTiAdminClient, the caller or the IHttpClientFactory owns the HttpClient — the client will not dispose it.

TLS Configuration

TlsOptions controls server certificate validation, mutual TLS, and SNI hostname override for both QueueTiClient and AdminClient. It is mutually exclusive with Insecure.

When Tls is not set and Insecure is false (the default), the system trust store is used with no further configuration required.

Custom CA certificate

Use a custom CA for self-signed or internal PKI servers:

var client = QueueTiClient.Create("https://queue.example.com:50051", new QueueTiClientOptions
{
    Tls = new TlsOptions
    {
        RootCertificates = await File.ReadAllBytesAsync("/path/to/ca.pem"),
    },
});

Mutual TLS (mTLS)

Provide both a client certificate chain and private key for mTLS authentication:

var client = QueueTiClient.Create("https://queue.example.com:50051", new QueueTiClientOptions
{
    Tls = new TlsOptions
    {
        RootCertificates = await File.ReadAllBytesAsync("/path/to/ca.pem"),
        CertificateChain = await File.ReadAllBytesAsync("/path/to/client-cert.pem"),
        PrivateKey = await File.ReadAllBytesAsync("/path/to/client-key.pem"),
    },
});

CertificateChain and PrivateKey must be supplied together — providing only one throws ArgumentException.

Server name override

Use a different SNI hostname than the connection address (e.g., for self-signed certs with a fixed CN):

var client = QueueTiClient.Create("https://192.168.1.100:50051", new QueueTiClientOptions
{
    Tls = new TlsOptions
    {
        RootCertificates = await File.ReadAllBytesAsync("/path/to/ca.pem"),
        ServerNameOverride = "queue.internal",
    },
});

TlsOptions applies identically to AdminClient.Create.

TlsOptions reference

Property Type Description
RootCertificates byte[]? PEM-encoded CA certificate(s). null uses the system trust store.
CertificateChain byte[]? PEM-encoded client certificate chain for mTLS. Requires PrivateKey.
PrivateKey byte[]? PEM-encoded client private key for mTLS. Requires CertificateChain.
ServerNameOverride string? Hostname sent as TLS SNI and used for certificate verification.

Bearer Token Authentication

Obtaining a token

Use QueueTiAuth to check whether the server requires authentication and to log in:

using QueueTi;

const string HttpAddress = "http://queue.example.com:8080";  // REST API
const string GrpcAddress = "http://queue.example.com:50051"; // gRPC

string? bearerToken = null;

if (await QueueTiAuth.GetAuthRequiredAsync(HttpAddress))
{
    bearerToken = await QueueTiAuth.LoginAsync(HttpAddress, username: "admin", password: "secret");
}

var client = QueueTiClient.Create(GrpcAddress, new QueueTiClientOptions
{
    BearerToken = bearerToken  // null when auth is disabled
});

Both methods accept an insecure flag (set true for plain http:// endpoints) and an optional CancellationToken.

LoginAsync throws HttpRequestException on a non-2xx response (e.g., 401 Unauthorized) and InvalidOperationException if the server response is missing the token field.

Static token

Provide a bearer token at client creation:

var client = QueueTiClient.Create(address, new QueueTiClientOptions
{
    BearerToken = "eyJhbGc..."
});

The token is injected as an Authorization: Bearer <token> header on every request.

Dynamic token refresh

For long-lived clients, provide a TokenRefresher to refresh the token before expiry:

var client = QueueTiClient.Create(address, new QueueTiClientOptions
{
    BearerToken = "initial-token",
    TokenRefresher = async ct =>
    {
        var response = await httpClient.GetAsync("/auth/refresh", ct);
        var json = await response.Content.ReadAsStringAsync(ct);
        var token = JsonDocument.Parse(json).RootElement.GetString("access_token");
        return token!;
    }
});

Behavior:

  • The refresh task runs in the background and calls the refresher ~60 seconds before token expiry.
  • On refresh success, the new token is stored and used for all subsequent requests.
  • On refresh failure, the task retries with exponential backoff (5 s → 60 s) until success.
  • Refresh runs until the client is disposed.

Update token at runtime

client.SetToken("new-jwt-token");  // thread-safe; immediate effect

Throws if no BearerToken was configured at creation time.

Configuration Reference

QueueTiClientOptions

Property Type Default Description
BearerToken string? null Initial JWT for Bearer authentication. If set, enables automatic token injection and refresh capability.
TokenRefresher Func<CancellationToken, Task<string>>? null Optional async callback to refresh the bearer token. Called ~60 s before token expiry. Requires BearerToken to be set.
Insecure bool false Use ChannelCredentials.Insecure for plaintext http:// connections. Disable TLS/SSL.
ConfigureChannel Action<GrpcChannelOptions>? null Callback to configure GrpcChannelOptions before channel creation (e.g., message size limits).
ConfigureHttpClientBuilder Action<IHttpClientBuilder>? null DI-only: callback to configure the IHttpClientBuilder (e.g., add delegating handlers, set timeouts).

Disposal

Both Dispose() and DisposeAsync() are supported and idempotent:

// Synchronous
client.Dispose();

// Asynchronous (preferred)
await client.DisposeAsync();

Disposal:

  1. Cancels the background token refresh task (if running).
  2. Shuts down the managed gRPC channel (if created via Create()).
  3. Can be called multiple times safely; subsequent calls have no effect.

Error Handling

gRPC errors

gRPC connection errors in streaming and batch consumers trigger automatic reconnection with exponential backoff. Errors are logged at warning or error level via ILogger<Consumer>.

Handler exceptions

In streaming mode, exceptions thrown by the handler are logged and the message is automatically nacked with the exception message. The stream continues processing subsequent messages.

In batch mode, exceptions are your responsibility to handle. Ack or nack each message explicitly.

Token refresh failures

Token refresh failures are logged and retried with exponential backoff. If refresh fails repeatedly, the client continues operating with the last valid token until the connection fails.

Samples

Three complete sample projects are included in the samples/ directory:

QueueTi.Samples.AppHost — Aspire orchestration project that defines the distributed application:

  • Adds Postgres and Redis containers with environment-safe passwords (alphanumeric-only for URL safety).
  • Registers a QueueTi container resource and wires it to Postgres and Redis.
  • Producer and consumer services reference the queue resource and wait for it to be healthy before starting.
  • Injects the HTTP URL (QueueTi__queue__HttpUrl) into both services so health checks and consumer group registration work correctly.

QueueTi.Samples.Producer — ASP.NET Core minimal API service:

  • POST /publish/{topic} — Accepts {"message": "...", "metadata": {...}} and publishes to the topic, returning {"id": "..."}.
  • GET /health — Aspire health check endpoint.
  • Uses AddQueueTiClient("queue") from the Aspire integration.

QueueTi.Samples.Consumer — Worker Service:

  • On startup, registers the consumer group via AdminClient.RegisterConsumerGroupAsync() with exponential backoff retry (up to 30 seconds) until success. QueueTiConflictException (already registered) is treated as success.
  • Opens a Subscribe stream using ConsumeAsync() and logs each received message.
  • Topic and consumer group configurable via Consumer:Topic and Consumer:Group environment variables.
  • Uses AddQueueTiClient("queue") from the Aspire integration, which also registers AdminClient when the HTTP URL is configured.

To run the samples:

cd samples/QueueTi.Samples.AppHost
dotnet run

The AppHost orchestrates Postgres, Redis, and QueueTi containers locally, then runs the producer and consumer services.

Standalone examples

The examples/ directory contains self-contained console apps that run against a local QueueTi instance with no Aspire dependency.

examples/OrderPipeline/ — end-to-end producer → consumer → DLQ lifecycle:

  • Checks for auth and logs in via QueueTiAuth if required.
  • Upserts topic config (Replayable: false) and registers the consumer group.
  • Publishes five orders; ord-003 carries type=poison metadata.
  • Runs the streaming consumer (Concurrency = 3, VisibilityTimeoutSeconds = 5) and a DLQ monitor concurrently via Task.WhenAll.
  • The DLQ monitor polls for registration until the orders.dlq topic exists (created server-side on first dead-letter), then drains it in 5-second batch windows.
  • Both loops stop cleanly on Ctrl+C.

To run it against a local QueueTi instance (see the QueueTi server repo for setup):

dotnet run --project examples/OrderPipeline/

Credentials default to admin / secret (the docker-compose defaults). Override via environment variables:

QUEUETI_USERNAME=myuser QUEUETI_PASSWORD=mypass dotnet run --project examples/OrderPipeline/

Thread Safety

  • QueueTiClient is safe to share across threads. TokenStore uses ReaderWriterLockSlim internally and disposal is guarded by an atomic flag.
  • AdminClient is safe to share across threads. Underlying HttpClient is thread-safe.
  • Producer and Consumer are stateless wrappers; they are safe to use concurrently from multiple tasks.
  • SetToken() is thread-safe and updates the interceptor state immediately for both clients.
  • Dispose() and DisposeAsync() are thread-safe and idempotent for both clients.

Logging

The clients use ILogger<QueueTiClient>, ILogger<Consumer>, and ILogger<AdminClient> for diagnostics. When using ASP.NET Core DI, configure logging as usual:

builder.Logging.AddConsole();
builder.Logging.SetMinimumLevel(LogLevel.Debug);

Key events logged:

  • Bearer token refresh success / failure (for both clients).
  • Consumer stream reconnection and backoff intervals.
  • Handler exceptions (error level).
  • Nack failures (error level).
  • Admin API errors (error level).
Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on QueueTi.Client:

Package Downloads
QueueTi.Client.Aspire

.NET Aspire client integration for the QueueTi distributed message queue service.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
2026.5.9 32 5/7/2026
2026.5.8 43 5/5/2026
2026.5.7 43 5/5/2026
2026.5.6 34 5/5/2026
2026.5.5 39 5/5/2026
2026.5.4 37 5/5/2026
2026.5.3 46 5/5/2026
2026.5.2 43 5/5/2026
2026.5.1 32 5/5/2026
2026.5.0 31 5/5/2026