Icecake.Ingest.Streaming 1.1.0

dotnet add package Icecake.Ingest.Streaming --version 1.1.0
                    
NuGet\Install-Package Icecake.Ingest.Streaming -Version 1.1.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Icecake.Ingest.Streaming" Version="1.1.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Icecake.Ingest.Streaming" Version="1.1.0" />
                    
Directory.Packages.props
<PackageReference Include="Icecake.Ingest.Streaming" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Icecake.Ingest.Streaming --version 1.1.0
                    
#r "nuget: Icecake.Ingest.Streaming, 1.1.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Icecake.Ingest.Streaming@1.1.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Icecake.Ingest.Streaming&version=1.1.0
                    
Install as a Cake Addin
#tool nuget:?package=Icecake.Ingest.Streaming&version=1.1.0
                    
Install as a Cake Tool

<p align="center"> <img src="icon.png" alt="Icecake Streaming Icon" width="150"/> </p>

Icecake.Ingest.Streaming

Unofficial C# client for Snowflake Snowpipe Streaming (V2)

Icecake.Ingest.Streaming is a lightweight, high-performance C# library for sending data to Snowflake Snowpipe Streaming V2.

This project is not affiliated with Snowflake Inc., is not an official SDK, and is not endorsed or supported by Snowflake.

The library is based on the ingestion behaviour exposed through Snowflake’s official client ecosystem (notably the Java/Python SDKs). Although the raw HTTP endpoints are not formally documented (such as with an OpenAPI spec), their structure and semantics are visible through Snowflake’s publicly distributed client libraries. Icecake library intentionally mirrors the conceptual structure of Snowflake’s Java/Python Streaming SDKs (Client + Channel), while offering an idiomatic C# implementation.

The main abstractions are:

  • SnowpipeIngestClient
  • SnowpipeIngestChannel

These map closely to Snowflake’s own ingestion clients in their official language SDKs.

⚠️ Disclaimer This library is community-maintained. API compatibility with Snowflake may require updates as Snowflake evolves their services.


✨ Features

  • ✔ Fully supports Snowpipe Streaming V2 ingestion endpoint
  • ✔ Async batching and ingestion
  • ✔ Automatic:
    • Chunk creation
    • GZIP compression
    • MD5 checksumming
  • ✔ RSA key-pair authentication
  • ✔ Offset token support for resumable ingestion
  • ✔ Multi-target builds:
    • .NET 8.0 — full capability
    • netstandard2.0 — maximum compatibility with polyfills
  • ✔ Dependency-light, efficient.
  • ✔ Provides DI extensions.

📦 Packages

Icecake.Ingest.Streaming

Core ingestion client and abstractions:

dotnet add package Icecake.Ingest.Streaming

Icecake.Ingest.Streaming.Services

Optional helpers for DI, configuration, and hosted execution:

dotnet add package Icecake.Ingest.Streaming.Services

🚀 Example Usage with Dependency Injection

⚙️ Example appsettings.jsonc

{
  "Snowflake": {
    "Account": {
      "OrganizationName": "A1234567890123",
      "AccountName": "HELLO_ACCOUNT"
    },
    "Credentials": {
      "User": "ingestion",
      "PrivateKeyPath": "ingestion_key.p8"
    },
    "Connect": {
      "Database": "HELLO_DB",
      "Schema": "PUBLIC",
      "Role": "HELLO_ROLE",
      "Warehouse": "HELLO_WH"
    },

    // Default client options (change as needed)
    "ClientOptions": {

      // User-Agent header included in every request
      "UserAgent": "snowflake-ingest-dotnet/0.1",

      // Timeout for HTTP operations
      "Timeout": "00:01:00",

      // Base delay for retry backoff
      "RetryBackoffBase": "00:00:00.200",

      // Maximum retry attempts
      "MaxRetries": 5,

      // HTTP proxy configuration (set null if unused)
      "Proxy": null,

      // Validate SSL/TLS certificates
      "ValidateCertificates": true,

      // Enable GZIP compression for ingestion payloads
      "EnableGzipOnAppend": true,

      // Minimum number of bytes before GZIP is applied
      "GzipMinBytes": 4096,

      // GZIP compression level (Fastest | Optimal | etc.)
      "GzipLevel": "Fastest"
    }
  }
}

🏗️ Build Phase (registering the client)

builder.Services.ConfigureRegisterSnowpipeIngestClient(builder.Configuration);

// OR — if using a custom config layout:
builder.Services.AddSnowpipeIngestClient();

🔥 Runtime Example (end-to-end flow)

using var host = builder.Build();
await host.StartAsync();

using var scope = host.Services.CreateScope();

var client  = scope.ServiceProvider.GetRequiredService<SnowpipeIngestClient>();
var helper  = scope.ServiceProvider.GetRequiredService<SnowflakeHelper>();
var logger  = scope.ServiceProvider.GetRequiredService<ILogger<Program>>();
var chLogger = scope.ServiceProvider.GetRequiredService<ILogger<SnowpipeIngestChannel>>();

var table = new SchemaObjectCoords
{
    Database = "HELLO_DB",
    Schema   = "PUBLIC",
    Name     = "TELEMETRIES",
};

var schema = new TableSchema
{
    SchemaObject = table,
    ColumnsByName = new()
    {
        ["ID"]    = new() { Name = "ID",    Type = SnowflakeType.NUMBER },
        ["VALUE"] = new() { Name = "VALUE", Type = SnowflakeType.VARCHAR }
    }
};

// Channel name should remain stable for the same app instance
var channelName = $"IngestionApp_{Environment.MachineName}_{table.Name}_ch";
var pipeName    = $"{table.Name}_PIPE";

// Ensure Snowflake-side pipe exists, that's optional!
await helper.EnsureStreamingPipeForTableAsync(table, pipeName);

// Create ingestion channel
var channel = new SnowpipeIngestChannel(
    channelName,
    pipeName,
    schema,
    client,
    new FlushPolicy(),
    chLogger
);

await channel.OpenAsync();

// Build rows
var rows = new List<Dictionary<string, object?>>();
int id = 1000;

for (int i = 0; i < 10; i++)
{
    rows.Add(new()
    {
        ["ID"]    = id++,
        ["VALUE"] = Guid.NewGuid().ToString()[..10]
    });
}

// Adding an offset token to the next flush.
// The offset token is not strictly required, but it is highly useful:
//   - It tracks how Snowflake processes the data your application ingests
//   - It enables recovery / resume logic for ingestion workflows
//
// Note:
// Icecake behaves differently from Snowflake’s official SDKs here.
// In the Java/Python clients, offset tokens are passed with the Rows ingested.
// In Icecake, the offset token is *explicitly* attached to the next flush,
// that's what happens in practice.
channel.SetOffsetTokenForNextFlush($"{id}");

// Queue rows for ingestion
channel.InsertRows(rows);

// Flush to Snowflake
await channel.FlushAsync(offsetToken: $"{id}");

// Wait for Snowflake to confirm
var committed = await channel.FetchLatestCommittedOffsetAsync(
    timeOutSeconds: 20,
    pollMilliseconds: 250
);
// 20 seconds should be enough!
logger.LogInformation("Committed offset: {Offset}", committed ?? "<pending>");

// Clean shutdown
await channel.DisposeAsync();

// The channel can be reopened at any time.
await channel.OpenAsync();

// Retrieve the latest committed offset again.
// This should match the previously confirmed offset.
var committedOffsetBackAgain = await channel.FetchLatestCommittedOffsetAsync();

// Add more rows
var moreRows = new List<Dictionary<string, object?>>();

for (var i = 0; i < 10; i++)
{
    moreRows.Add(new Dictionary<string, object?>
    {
        ["ID"] = id++,
        ["VALUE"] = Guid.NewGuid().ToString()[..10]
    });
}

// Attach an offset token to the next flush.
// As before, Icecake requires explicitly assigning the offset token
// to the next flush, rather than relying on implicit behavior.
channel.SetOffsetTokenForNextFlush($"{id}");

// Queue and flush the rows
channel.InsertRows(moreRows);
await channel.FlushAsync();

// Dispose the channel locally
await channel.DisposeAsync();

// The channel continues to exist on the Snowflake side unless explicitly dropped.
// For long-running ingestion apps, channels should typically be kept and reused,
// since they allow resuming ingestion and offset tracking across restarts.
//
// Here we drop it just for demonstration.
await channel.DropAsync();

🔐 Authentication

This library supports:

  • RSA key-pair authentication
  • PKCS#1 and PKCS#8 (encrypted or plaintext)
  • Automatic computation of Snowflake’s fingerprint format: SHA256:<Base64Digest>
  • Uses BouncyCastle for compatibility on netstandard2.0

⚠️ Not an official Snowflake SDK

To restate:

  • This package is not created, supported, or endorsed by Snowflake
  • Only public REST APIs are used
  • Behavior is inspired by Snowflake’s Java/Python SDKs but adapted for C#
  • Snowflake updates may require library changes

🤝 Contributing

Issues and PRs are welcome. If Snowflake updates their streaming API, please open an issue so we can track compatibility.


📄 License

MIT License.

Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
.NET Core netcoreapp2.0 was computed.  netcoreapp2.1 was computed.  netcoreapp2.2 was computed.  netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.0 is compatible.  netstandard2.1 was computed. 
.NET Framework net461 was computed.  net462 was computed.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen40 was computed.  tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on Icecake.Ingest.Streaming:

Package Downloads
Icecake.Ingest.Streaming.Services

DI extensions to add core components of Icecake ingest streaming package

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.1.0 500 12/9/2025
1.0.0 299 11/30/2025