ScalePad.Databricks.Zerobus 0.0.4

dotnet add package ScalePad.Databricks.Zerobus --version 0.0.4
                    
NuGet\Install-Package ScalePad.Databricks.Zerobus -Version 0.0.4
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="ScalePad.Databricks.Zerobus" Version="0.0.4" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="ScalePad.Databricks.Zerobus" Version="0.0.4" />
                    
Directory.Packages.props
<PackageReference Include="ScalePad.Databricks.Zerobus" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add ScalePad.Databricks.Zerobus --version 0.0.4
                    
#r "nuget: ScalePad.Databricks.Zerobus, 0.0.4"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package ScalePad.Databricks.Zerobus@0.0.4
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=ScalePad.Databricks.Zerobus&version=0.0.4
                    
Install as a Cake Addin
#tool nuget:?package=ScalePad.Databricks.Zerobus&version=0.0.4
                    
Install as a Cake Tool

ScalePad.Databricks.Zerobus — .NET SDK

High-performance .NET SDK for streaming data ingestion into Databricks Delta tables using the Zerobus service. Built on the same Rust core as the Go SDK, exposed via P/Invoke (C FFI bindings).

Requirements

  • .NET 8 or .NET 10
  • Rust toolchain (for building the native zerobus_ffi library from source)

Quick Start

using ScalePad.Databricks.Zerobus;

// 1. Create SDK instance.
using var sdk = new ZerobusSdk(
    "https://your-shard.zerobus.databricks.com",
    "https://your-workspace.databricks.com");

// 2. Configure stream options.
var options = StreamConfigurationOptions.Default with
{
    RecordType = RecordType.Json,
};

// 3. Create stream.
using var stream = sdk.CreateStream(
    new TableProperties("catalog.schema.table"),
    clientId,
    clientSecret,
    options);

// 4. Ingest records.
long offset = stream.IngestRecord("""{"id": 1, "message": "Hello"}""");

// 5. Wait for acknowledgment.
stream.WaitForOffset(offset);

Installation

NuGet (when published)

dotnet add package ScalePad.Databricks.Zerobus

From Source

cd dotnet
dotnet build

The build automatically invokes build_native.sh to compile the Rust FFI shared library and place it in the correct runtimes/<RID>/native/ directory. You need cargo on your PATH (or in ~/.cargo/bin/).

To skip the automatic native build (e.g. when the library is pre-built):

dotnet build -p:SkipNativeBuild=true

API Reference

ZerobusSdk

The main entry point. Manages the connection to Zerobus and Unity Catalog.

using var sdk = new ZerobusSdk(zerobusEndpoint, unityCatalogUrl);
CreateStream

Creates a stream with OAuth 2.0 client credentials authentication.

using var stream = sdk.CreateStream(
    new TableProperties("catalog.schema.table"),
    clientId,
    clientSecret,
    options);  // optional, defaults if null
CreateStreamWithHeadersProvider

Creates a stream with custom authentication headers.

using var stream = sdk.CreateStreamWithHeadersProvider(
    new TableProperties("catalog.schema.table"),
    new MyHeadersProvider(),
    options);  // optional

ZerobusStream

An active bidirectional gRPC stream for record ingestion. Thread-safe.

IngestRecord (sync)

Ingests a single record and returns its offset immediately (acknowledgment happens in background).

// JSON
long offset = stream.IngestRecord("""{"field": "value"}""");

// Protobuf
byte[] protoBytes = myMessage.ToByteArray();
long offset = stream.IngestRecord(protoBytes);
IngestRecordAsync (async)

Asynchronously ingests a single record and waits for server acknowledgment before returning.

// JSON
long offset = await stream.IngestRecordAsync("""{"field": "value"}""");

// Protobuf with cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
long offset = await stream.IngestRecordAsync(protoBytes, cts.Token);

// ReadOnlyMemory<byte> for zero-copy scenarios
ReadOnlyMemory<byte> data = GetData();
long offset = await stream.IngestRecordAsync(data);
IngestRecords (sync)

Ingests a batch of records and returns one offset for the whole batch.

string[] records = [
    """{"device": "sensor-001", "temp": 20}""",
    """{"device": "sensor-002", "temp": 21}""",
];
long batchOffset = stream.IngestRecords(records);
IngestRecordsAsync (async)

Asynchronously ingests a batch of records and waits for acknowledgment.

string[] records = [
    """{"device": "sensor-001", "temp": 20}""",
    """{"device": "sensor-002", "temp": 21}""",
];
long batchOffset = await stream.IngestRecordsAsync(records);

// With cancellation
using var cts = new CancellationTokenSource();
byte[][] protoRecords = GetProtoRecords();
long offset = await stream.IngestRecordsAsync(protoRecords, cts.Token);
WaitForOffset (sync)

Blocks until a specific offset is acknowledged by the server.

stream.WaitForOffset(offset);
WaitForOffsetAsync (async)

Asynchronously waits for a specific offset to be acknowledged.

await stream.WaitForOffsetAsync(offset);

// With cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await stream.WaitForOffsetAsync(offset, cts.Token);
Flush (sync)

Blocks until all pending records are acknowledged.

stream.Flush();
FlushAsync (async)

Asynchronously waits until all pending records are acknowledged.

await stream.FlushAsync();

// With cancellation
using var cts = new CancellationTokenSource();
await stream.FlushAsync(cts.Token);
GetUnackedRecords

Retrieves unacknowledged records after stream failure (call after close/failure only).

object[] unacked = stream.GetUnackedRecords();
Close / Dispose

Gracefully closes the stream (flushes first). Called automatically by using.

stream.Close();
// or simply let `using` handle it

IHeadersProvider

Interface for custom authentication.

public class CustomHeadersProvider : IHeadersProvider
{
    public IDictionary<string, string> GetHeaders()
    {
        return new Dictionary<string, string>
        {
            ["authorization"] = "Bearer " + GetToken(),
            ["x-databricks-zerobus-table-name"] = "catalog.schema.table",
        };
    }
}

StreamConfigurationOptions

Use C# record with expressions to customise:

var options = StreamConfigurationOptions.Default with
{
    MaxInflightRequests = 50_000,
    RecordType = RecordType.Json,
    RecoveryRetries = 10,
};
Property Default Description
MaxInflightRequests 1,000,000 Backpressure control
Recovery true Auto-recovery on failures
RecoveryTimeoutMs 15,000 Timeout per recovery attempt
RecoveryBackoffMs 2,000 Delay between retries
RecoveryRetries 4 Max recovery attempts
ServerLackOfAckTimeoutMs 60,000 Server ack timeout
FlushTimeoutMs 300,000 Flush timeout (5 min)
RecordType Proto Proto / Json / Unspecified
StreamPausedMaxWaitTimeMs null Graceful close wait time

Error Handling

Errors throw ZerobusException with an IsRetryable property:

try
{
    long offset = stream.IngestRecord(data);
}
catch (ZerobusException ex) when (ex.IsRetryable)
{
    // Transient error — SDK auto-recovers when Recovery is enabled.
    Console.WriteLine($"Retryable error: {ex.RawMessage}");
}
catch (ZerobusException ex)
{
    // Fatal error — manual intervention needed.
    Console.WriteLine($"Fatal error: {ex.RawMessage}");
}

Concurrent Ingestion

The stream is thread-safe. Both synchronous and asynchronous methods can be called concurrently.

Use the async methods with Task.WhenAll or Parallel.ForEachAsync for concurrent ingestion with automatic acknowledgment:

// Concurrent async ingestion
var tasks = records.Select(record => stream.IngestRecordAsync(record));
long[] offsets = await Task.WhenAll(tasks);

// With Parallel.ForEachAsync
await Parallel.ForEachAsync(records, async (record, ct) =>
{
    long offset = await stream.IngestRecordAsync(record, ct);
    Console.WriteLine($"Record ingested at offset {offset}");
});

Using Sync Methods

For scenarios where you need more control over the ingestion and acknowledgment phases:

await Parallel.ForEachAsync(records, async (record, ct) =>
{
    long offset = stream.IngestRecord(record);
    await stream.WaitForOffsetAsync(offset, ct);
});

Native Library Setup

The native zerobus_ffi shared library is built automatically when you run dotnet build. The MSBuild target invokes build_native.sh, which:

  1. Detects your OS and architecture
  2. Runs cargo build --release in the zerobus-ffi crate
  3. Copies the shared library (.dylib / .so / .dll) to src/Zerobus/runtimes/<RID>/native/
  4. Skips the rebuild if the library is already up to date

Manual Build

You can also run the script directly:

cd dotnet
./build_native.sh           # Build for current platform
./build_native.sh --force   # Force rebuild

Runtime Directories

The native library is placed in the standard .NET runtime identifier layout:

Platform Path
Linux x64 runtimes/linux-x64/native/libzerobus_ffi.so
Linux arm64 runtimes/linux-arm64/native/libzerobus_ffi.so
macOS x64 runtimes/osx-x64/native/libzerobus_ffi.dylib
macOS arm64 runtimes/osx-arm64/native/libzerobus_ffi.dylib
Windows x64 runtimes/win-x64/native/zerobus_ffi.dll

Testing

Unit Tests

Unit tests are isolated and do not require the native library:

dotnet test tests/Zerobus.Tests

Integration Tests

Integration tests spin up a mock gRPC server (per test) and exercise the full SDK through the native FFI layer. They require the Rust toolchain to build the native library:

dotnet test tests/Zerobus.IntegrationTests

The integration tests cover:

Test Scenario
SuccessfulStreamCreation Stream creation succeeds
TimeoutedStreamCreation Timeout when server responds slowly
NonRetriableErrorDuringStreamCreation Non-retriable error (e.g. Unauthenticated)
RetriableErrorWithoutRecoveryDuringStreamCreation Retriable error with recovery disabled
GracefulClose Ingest record then close gracefully
IdempotentClose Multiple Close() calls succeed
IngestAfterClose Ingest after close throws
IngestSingleRecord Single record ingest and ack
IngestMultipleRecords Multiple sequential records with ack
IngestBatchRecords Batch ingest of 5 records
IngestRecordsAfterClose Batch ingest after close throws

Each test gets its own mock gRPC server on a unique port, so all tests run in parallel.

Running All Tests

dotnet test

Project Structure

dotnet/
├── Zerobus.slnx                              # Solution file
├── Directory.Build.props                      # Shared build settings
├── build_native.sh                            # Rust FFI build script
├── README.md
├── src/
│   └── Zerobus/                               # Main SDK library
│       ├── Zerobus.csproj
│       ├── ZerobusSdk.cs                      # SDK entry point (IDisposable)
│       ├── ZerobusStream.cs                   # Stream for record ingestion (IDisposable)
│       ├── ZerobusException.cs                # Error type with IsRetryable
│       ├── IHeadersProvider.cs                # Custom auth interface
│       ├── RecordType.cs                      # Proto / Json / Unspecified enum
│       ├── StreamConfigurationOptions.cs      # Config record with defaults
│       ├── TableProperties.cs                 # Table name + optional descriptor
│       ├── Properties/
│       │   └── AssemblyInfo.cs
│       └── Native/                            # P/Invoke layer (internal)
│           ├── NativeBindings.cs              # Raw DllImport declarations
│           ├── NativeInterop.cs               # Safe wrappers + marshalling
│           └── HeadersProviderBridge.cs       # Managed→native callback bridge
├── tests/
│   ├── Zerobus.Tests/                         # Unit tests (NUnit)
│   └── Zerobus.IntegrationTests/              # Integration tests (NUnit + gRPC mock)
│       ├── Zerobus.IntegrationTests.csproj
│       ├── IntegrationTests.cs                # 11 integration tests
│       ├── MockZerobusServer.cs               # Mock gRPC server
│       ├── TestHelpers.cs                     # Fixtures, response builders, interceptor
│       └── Protos/
│           └── zerobus_service.proto          # Proto definition for gRPC stubs
└── examples/
    ├── JsonSingle/                            # Single JSON record ingestion
    ├── JsonBatch/                             # Batch JSON record ingestion
    ├── ProtoSingle/                           # Single protobuf record ingestion
    └── AsyncIngestion/                        # Async ingestion patterns

Architecture

.NET SDK (ScalePad.Databricks.Zerobus)
    ↓ P/Invoke
Rust FFI (zerobus-ffi / libzerobus_ffi)
    ↓
Rust Core (databricks-zerobus-ingest-sdk)
    ↓ gRPC
Zerobus Service

License

Apache-2.0. See LICENSE.

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 is compatible.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
  • net10.0

    • No dependencies.
  • net8.0

    • No dependencies.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.0.4 37 3/6/2026
0.0.3 549 2/12/2026