ScalePad.Databricks.Zerobus
0.0.4
ScalePad.Databricks.Zerobus — .NET SDK
High-performance .NET SDK for streaming data ingestion into Databricks Delta tables using the Zerobus service. Built on the same Rust core as the Go SDK, exposed via P/Invoke (C FFI bindings).
Requirements
- .NET 8 or .NET 10
- Rust toolchain (for building the native zerobus_ffi library from source)
Quick Start
using ScalePad.Databricks.Zerobus;
// 1. Create SDK instance.
using var sdk = new ZerobusSdk(
    "https://your-shard.zerobus.databricks.com",
    "https://your-workspace.databricks.com");
// 2. Configure stream options.
var options = StreamConfigurationOptions.Default with
{
    RecordType = RecordType.Json,
};
// 3. Create stream.
using var stream = sdk.CreateStream(
    new TableProperties("catalog.schema.table"),
    clientId,
    clientSecret,
    options);
// 4. Ingest records.
long offset = stream.IngestRecord("""{"id": 1, "message": "Hello"}""");
// 5. Wait for acknowledgment.
stream.WaitForOffset(offset);
Installation
NuGet (when published)
dotnet add package ScalePad.Databricks.Zerobus
From Source
cd dotnet
dotnet build
The build automatically invokes build_native.sh to compile the Rust FFI shared library and place it in the correct runtimes/<RID>/native/ directory. You need cargo on your PATH (or in ~/.cargo/bin/).
To skip the automatic native build (e.g. when the library is pre-built):
dotnet build -p:SkipNativeBuild=true
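Because the build depends on cargo being reachable, a small preflight check can make a missing toolchain obvious before dotnet build fails partway through. The following is a minimal sketch, not part of the repository: the function name find_cargo is hypothetical, and it only mirrors the two locations mentioned above (PATH and ~/.cargo/bin/).

```shell
#!/bin/sh
# Hypothetical helper (not shipped with the SDK).
# Prints the path of a usable cargo binary, or returns 1 if none is found.
# Checks PATH first, then the ~/.cargo/bin/ location mentioned above.
find_cargo() {
    if command -v cargo >/dev/null 2>&1; then
        command -v cargo
    elif [ -x "$HOME/.cargo/bin/cargo" ]; then
        printf '%s\n' "$HOME/.cargo/bin/cargo"
    else
        return 1
    fi
}
```

One way to use it is to run find_cargo before dotnet build and print a hint when it fails. Passing -p:SkipNativeBuild=true instead is only appropriate when the native library has already been built.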
API Reference
ZerobusSdk
The main entry point. Manages the connection to Zerobus and Unity Catalog.
using var sdk = new ZerobusSdk(zerobusEndpoint, unityCatalogUrl);
CreateStream
Creates a stream with OAuth 2.0 client credentials authentication.
using var stream = sdk.CreateStream(
    new TableProperties("catalog.schema.table"),
    clientId,
    clientSecret,
    options); // optional, defaults if null
CreateStreamWithHeadersProvider
Creates a stream with custom authentication headers.
using var stream = sdk.CreateStreamWithHeadersProvider(
    new TableProperties("catalog.schema.table"),
    new MyHeadersProvider(),
    options); // optional
ZerobusStream
An active bidirectional gRPC stream for record ingestion. Thread-safe.
IngestRecord (sync)
Ingests a single record and returns its offset immediately (acknowledgment happens in background).
// JSON
long offset = stream.IngestRecord("""{"field": "value"}""");
// Protobuf
byte[] protoBytes = myMessage.ToByteArray();
long offset = stream.IngestRecord(protoBytes);
IngestRecordAsync (async)
Asynchronously ingests a single record and waits for server acknowledgment before returning.
// JSON
long offset = await stream.IngestRecordAsync("""{"field": "value"}""");
// Protobuf with cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
long offset = await stream.IngestRecordAsync(protoBytes, cts.Token);
// ReadOnlyMemory<byte> for zero-copy scenarios
ReadOnlyMemory<byte> data = GetData();
long offset = await stream.IngestRecordAsync(data);
IngestRecords (sync)
Ingests a batch of records and returns one offset for the whole batch.
string[] records = [
    """{"device": "sensor-001", "temp": 20}""",
    """{"device": "sensor-002", "temp": 21}""",
];
long batchOffset = stream.IngestRecords(records);
IngestRecordsAsync (async)
Asynchronously ingests a batch of records and waits for acknowledgment.
string[] records = [
    """{"device": "sensor-001", "temp": 20}""",
    """{"device": "sensor-002", "temp": 21}""",
];
long batchOffset = await stream.IngestRecordsAsync(records);
// With cancellation
using var cts = new CancellationTokenSource();
byte[][] protoRecords = GetProtoRecords();
long offset = await stream.IngestRecordsAsync(protoRecords, cts.Token);
WaitForOffset (sync)
Blocks until a specific offset is acknowledged by the server.
stream.WaitForOffset(offset);
WaitForOffsetAsync (async)
Asynchronously waits for a specific offset to be acknowledged.
await stream.WaitForOffsetAsync(offset);
// With cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await stream.WaitForOffsetAsync(offset, cts.Token);
Flush (sync)
Blocks until all pending records are acknowledged.
stream.Flush();
FlushAsync (async)
Asynchronously waits until all pending records are acknowledged.
await stream.FlushAsync();
// With cancellation
using var cts = new CancellationTokenSource();
await stream.FlushAsync(cts.Token);
GetUnackedRecords
Retrieves the records that were never acknowledged. Call it only after the stream has closed or failed.
object[] unacked = stream.GetUnackedRecords();
Close / Dispose
Gracefully closes the stream (flushes first). Called automatically by using.
stream.Close();
// or simply let `using` handle it
IHeadersProvider
Interface for custom authentication.
public class CustomHeadersProvider : IHeadersProvider
{
    public IDictionary<string, string> GetHeaders()
    {
        return new Dictionary<string, string>
        {
            ["authorization"] = "Bearer " + GetToken(),
            ["x-databricks-zerobus-table-name"] = "catalog.schema.table",
        };
    }
}
StreamConfigurationOptions
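StreamConfigurationOptions is a C# record. Start from StreamConfigurationOptions.Default and override individual properties with a with-expression: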
var options = StreamConfigurationOptions.Default with
{
    MaxInflightRequests = 50_000,
    RecordType = RecordType.Json,
    RecoveryRetries = 10,
};
| Property | Default | Description |
|---|---|---|
| MaxInflightRequests | 1,000,000 | Backpressure control |
| Recovery | true | Auto-recovery on failures |
| RecoveryTimeoutMs | 15,000 | Timeout per recovery attempt |
| RecoveryBackoffMs | 2,000 | Delay between retries |
| RecoveryRetries | 4 | Max recovery attempts |
| ServerLackOfAckTimeoutMs | 60,000 | Server ack timeout |
| FlushTimeoutMs | 300,000 | Flush timeout (5 min) |
| RecordType | Proto | Proto / Json / Unspecified |
| StreamPausedMaxWaitTimeMs | null | Graceful close wait time |
Error Handling
Errors throw ZerobusException with an IsRetryable property:
try
{
    long offset = stream.IngestRecord(data);
}
catch (ZerobusException ex) when (ex.IsRetryable)
{
    // Transient error: the SDK auto-recovers when Recovery is enabled.
    Console.WriteLine($"Retryable error: {ex.RawMessage}");
}
catch (ZerobusException ex)
{
    // Fatal error: manual intervention needed.
    Console.WriteLine($"Fatal error: {ex.RawMessage}");
}
Concurrent Ingestion
The stream is thread-safe. Both synchronous and asynchronous methods can be called concurrently.
Using Async Methods (Recommended)
Use the async methods with Task.WhenAll or Parallel.ForEachAsync for concurrent ingestion with automatic acknowledgment:
// Concurrent async ingestion
var tasks = records.Select(record => stream.IngestRecordAsync(record));
long[] offsets = await Task.WhenAll(tasks);
// With Parallel.ForEachAsync
await Parallel.ForEachAsync(records, async (record, ct) =>
{
    long offset = await stream.IngestRecordAsync(record, ct);
    Console.WriteLine($"Record ingested at offset {offset}");
});
Using Sync Methods
For scenarios where you need more control over the ingestion and acknowledgment phases:
await Parallel.ForEachAsync(records, async (record, ct) =>
{
    long offset = stream.IngestRecord(record);
    await stream.WaitForOffsetAsync(offset, ct);
});
Native Library Setup
The native zerobus_ffi shared library is built automatically when you run dotnet build. The MSBuild target invokes build_native.sh, which:
- Detects your OS and architecture
- Runs cargo build --release in the zerobus-ffi crate
- Copies the shared library (.dylib/.so/.dll) to src/Zerobus/runtimes/<RID>/native/
- Skips the rebuild if the library is already up to date
Manual Build
You can also run the script directly:
cd dotnet
./build_native.sh # Build for current platform
./build_native.sh --force # Force rebuild
Runtime Directories
The native library is placed in the standard .NET runtime identifier layout:
| Platform | Path |
|---|---|
| Linux x64 | runtimes/linux-x64/native/libzerobus_ffi.so |
| Linux arm64 | runtimes/linux-arm64/native/libzerobus_ffi.so |
| macOS x64 | runtimes/osx-x64/native/libzerobus_ffi.dylib |
| macOS arm64 | runtimes/osx-arm64/native/libzerobus_ffi.dylib |
| Windows x64 | runtimes/win-x64/native/zerobus_ffi.dll |
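To check which of these paths applies to a given machine, the table can be expressed as a small lookup. This is a sketch only: native_lib_path is a hypothetical helper, not the logic inside build_native.sh (which is not shown here), and the mapping from uname-style values to runtime identifiers is an assumption based on the rows above.

```shell
#!/bin/sh
# Hypothetical helper (not shipped with the SDK).
# Maps `uname -s` / `uname -m` style values to the relative library path
# listed in the table above. Unsupported combinations return 1.
native_lib_path() {
    os="$1"
    arch="$2"
    case "$arch" in
        x86_64|amd64) cpu="x64" ;;
        aarch64|arm64) cpu="arm64" ;;
        *) return 1 ;;
    esac
    case "$os" in
        Linux) echo "runtimes/linux-$cpu/native/libzerobus_ffi.so" ;;
        Darwin) echo "runtimes/osx-$cpu/native/libzerobus_ffi.dylib" ;;
        MINGW*|MSYS*|CYGWIN*)
            # The table lists only x64 for Windows.
            [ "$cpu" = "x64" ] || return 1
            echo "runtimes/win-x64/native/zerobus_ffi.dll" ;;
        *) return 1 ;;
    esac
}
```

Calling native_lib_path "$(uname -s)" "$(uname -m)" prints the expected path for the current machine. In a source checkout, prefix the result with src/Zerobus/ to test whether the library has already been built.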
Testing
Unit Tests
Unit tests are isolated and do not require the native library:
dotnet test tests/Zerobus.Tests
Integration Tests
Integration tests spin up a mock gRPC server (per test) and exercise the full SDK through the native FFI layer. They require the Rust toolchain to build the native library:
dotnet test tests/Zerobus.IntegrationTests
The integration tests cover:
| Test | Scenario |
|---|---|
| SuccessfulStreamCreation | Stream creation succeeds |
| TimeoutedStreamCreation | Timeout when server responds slowly |
| NonRetriableErrorDuringStreamCreation | Non-retriable error (e.g. Unauthenticated) |
| RetriableErrorWithoutRecoveryDuringStreamCreation | Retriable error with recovery disabled |
| GracefulClose | Ingest record then close gracefully |
| IdempotentClose | Multiple Close() calls succeed |
| IngestAfterClose | Ingest after close throws |
| IngestSingleRecord | Single record ingest and ack |
| IngestMultipleRecords | Multiple sequential records with ack |
| IngestBatchRecords | Batch ingest of 5 records |
| IngestRecordsAfterClose | Batch ingest after close throws |
Each test gets its own mock gRPC server on a unique port, so the tests can all run in parallel without interfering with each other.
Running All Tests
dotnet test
Project Structure
dotnet/
├── Zerobus.slnx  # Solution file
├── Directory.Build.props  # Shared build settings
├── build_native.sh  # Rust FFI build script
├── README.md
├── src/
│   └── Zerobus/  # Main SDK library
│       ├── Zerobus.csproj
│       ├── ZerobusSdk.cs  # SDK entry point (IDisposable)
│       ├── ZerobusStream.cs  # Stream for record ingestion (IDisposable)
│       ├── ZerobusException.cs  # Error type with IsRetryable
│       ├── IHeadersProvider.cs  # Custom auth interface
│       ├── RecordType.cs  # Proto / Json / Unspecified enum
│       ├── StreamConfigurationOptions.cs  # Config record with defaults
│       ├── TableProperties.cs  # Table name + optional descriptor
│       ├── Properties/
│       │   └── AssemblyInfo.cs
│       └── Native/  # P/Invoke layer (internal)
│           ├── NativeBindings.cs  # Raw DllImport declarations
│           ├── NativeInterop.cs  # Safe wrappers + marshalling
│           └── HeadersProviderBridge.cs  # Managed→native callback bridge
├── tests/
│   ├── Zerobus.Tests/  # Unit tests (NUnit)
│   └── Zerobus.IntegrationTests/  # Integration tests (NUnit + gRPC mock)
│       ├── Zerobus.IntegrationTests.csproj
│       ├── IntegrationTests.cs  # 11 integration tests
│       ├── MockZerobusServer.cs  # Mock gRPC server
│       ├── TestHelpers.cs  # Fixtures, response builders, interceptor
│       └── Protos/
│           └── zerobus_service.proto  # Proto definition for gRPC stubs
└── examples/
    ├── JsonSingle/  # Single JSON record ingestion
    ├── JsonBatch/  # Batch JSON record ingestion
    ├── ProtoSingle/  # Single protobuf record ingestion
    └── AsyncIngestion/  # Async ingestion patterns
Architecture
.NET SDK (ScalePad.Databricks.Zerobus)
↓ P/Invoke
Rust FFI (zerobus-ffi / libzerobus_ffi)
↓
Rust Core (databricks-zerobus-ingest-sdk)
↓ gRPC
Zerobus Service
License
Apache-2.0. See LICENSE.