CSharpDB.Pipelines 2.6.0


CSharpDB.Pipelines

Package contracts and runtime foundation for CSharpDB ETL pipelines.

NuGet | .NET 10 Release | License: MIT

Overview

CSharpDB.Pipelines defines portable pipeline packages and a small orchestration runtime for batch ETL work. A pipeline package describes the source, transformations, destination, execution options, and optional incremental state. The built-in runtime can validate packages, serialize them to JSON, execute them in batches, capture checkpoints, and report rejects and run metrics.

Current boundary:

  • Built-in runtime components currently support CSV and JSON file sources/destinations
  • Built-in transforms support Select, Rename, Cast, Filter, Derive, and Deduplicate
  • CSharpDB table sources/destinations and SQL query sources are modeled in the contracts but are not implemented by DefaultPipelineComponentFactory yet

Features

  • Pipeline package model: strongly typed source, transform, destination, and execution settings
  • Validation: schema-level validation before execution
  • Serialization: save/load pipeline packages as JSON
  • Runtime orchestration: Validate, DryRun, Run, and Resume modes
  • Built-in connectors: CSV and JSON file readers/writers
  • Built-in transforms: select, rename, cast, filter, derive, deduplicate
  • Checkpointing hooks: pluggable checkpoint store and run logger abstractions
  • Batch metrics: rows read/written/rejected plus batch counts

Usage

End-to-End Example

using CSharpDB.Pipelines.Models;
using CSharpDB.Pipelines.Runtime;
using CSharpDB.Pipelines.Runtime.BuiltIns;
using CSharpDB.Pipelines.Serialization;
using CSharpDB.Pipelines.Validation;
using CSharpDB.Primitives;

Directory.CreateDirectory("data");

await File.WriteAllLinesAsync("data/customers.csv",
[
    "id,name,status",
    "1,Alice,active",
    "1,Alice,active",
    "2,Bob,inactive",
    "3,Carol,active",
]);

var package = new PipelinePackageDefinition
{
    Name = "customers-csv-to-json",
    Version = "1.0.0",
    Description = "Import customers from CSV, clean them, and emit JSON.",
    Source = new PipelineSourceDefinition
    {
        Kind = PipelineSourceKind.CsvFile,
        Path = "data/customers.csv",
        HasHeaderRow = true,
    },
    Transforms =
    [
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Select,
            SelectColumns = ["id", "name", "status"],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Rename,
            RenameMappings =
            [
                new PipelineRenameMapping { Source = "name", Target = "full_name" },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Cast,
            CastMappings =
            [
                new PipelineCastMapping { Column = "id", TargetType = DbType.Integer },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Filter,
            FilterExpression = "status == 'active'",
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Derive,
            DerivedColumns =
            [
                new PipelineDerivedColumn { Name = "import_source", Expression = "'csv'" },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Deduplicate,
            DeduplicateKeys = ["id"],
        },
    ],
    Destination = new PipelineDestinationDefinition
    {
        Kind = PipelineDestinationKind.JsonFile,
        Path = "data/customers.cleaned.json",
    },
    Options = new PipelineExecutionOptions
    {
        BatchSize = 2,
        CheckpointInterval = 1,
        ErrorMode = PipelineErrorMode.FailFast,
    },
};

PipelineValidationResult validation = PipelinePackageValidator.Validate(package);
if (!validation.IsValid)
{
    throw new InvalidOperationException(string.Join(
        Environment.NewLine,
        validation.Errors.Select(error => $"{error.Path}: {error.Message}")));
}

await PipelinePackageSerializer.SaveToFileAsync(package, "data/customers.pipeline.json");
PipelinePackageDefinition loadedPackage =
    await PipelinePackageSerializer.LoadFromFileAsync("data/customers.pipeline.json");

var orchestrator = new PipelineOrchestrator(
    new DefaultPipelineComponentFactory(),
    new NullPipelineCheckpointStore(),
    new NullPipelineRunLogger());

PipelineRunResult result = await orchestrator.ExecuteAsync(new PipelineRunRequest
{
    Package = loadedPackage,
    Mode = PipelineExecutionMode.Run,
});

Console.WriteLine(
    $"{result.Status}: {result.Metrics.RowsRead} read, " +
    $"{result.Metrics.RowsWritten} written, " +
    $"{result.Metrics.RowsRejected} rejected");

string outputJson = await File.ReadAllTextAsync("data/customers.cleaned.json");
Console.WriteLine(outputJson);

The output file contains the active customers only, with duplicate IDs removed:

[
  {
    "id": 1,
    "status": "active",
    "full_name": "Alice",
    "import_source": "csv"
  },
  {
    "id": 3,
    "status": "active",
    "full_name": "Carol",
    "import_source": "csv"
  }
]
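
For reference, the file saved by PipelinePackageSerializer.SaveToFileAsync is plain JSON as well. Its exact shape depends on the serializer's naming policy, so the following is only an abridged sketch based on the model above (property casing and enum rendering are assumptions, and most transforms are omitted for brevity):

```json
{
  "name": "customers-csv-to-json",
  "version": "1.0.0",
  "description": "Import customers from CSV, clean them, and emit JSON.",
  "source": { "kind": "CsvFile", "path": "data/customers.csv", "hasHeaderRow": true },
  "transforms": [
    { "kind": "Select", "selectColumns": ["id", "name", "status"] },
    { "kind": "Deduplicate", "deduplicateKeys": ["id"] }
  ],
  "destination": { "kind": "JsonFile", "path": "data/customers.cleaned.json" },
  "options": { "batchSize": 2, "checkpointInterval": 1, "errorMode": "FailFast" }
}
```

Because the package is self-describing JSON, it can be checked into source control and diffed like any other configuration file.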

Execution Modes

  • Validate: validate the package only, without creating components
  • DryRun: open the source and run transforms, but skip destination writes
  • Run: execute the full pipeline and persist checkpoints
  • Resume: continue a previous run from the checkpoint returned by your IPipelineCheckpointStore
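
The modes compose naturally: a DryRun pass can act as a smoke test before a real Run. The sketch below reuses the orchestrator and loaded package from the example above and assumes DryRun still populates run metrics:

```csharp
// Dry-run first: reads the source and applies transforms, but skips destination writes.
PipelineRunResult dryRun = await orchestrator.ExecuteAsync(new PipelineRunRequest
{
    Package = loadedPackage,
    Mode = PipelineExecutionMode.DryRun,
});

// Only perform the real run (which persists checkpoints) if the dry run was clean.
if (dryRun.Metrics.RowsRejected == 0)
{
    PipelineRunResult run = await orchestrator.ExecuteAsync(new PipelineRunRequest
    {
        Package = loadedPackage,
        Mode = PipelineExecutionMode.Run,
    });
    Console.WriteLine($"Run finished: {run.Status}");
}
```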

Notes

  • DefaultPipelineComponentFactory is the ready-to-run built-in factory for file pipelines
  • Use NullPipelineCheckpointStore and NullPipelineRunLogger when you want a minimal in-process setup
  • Relative source file paths are resolved against both the current directory and the app base directory; relative output paths are resolved against the current directory only
  • Derive expressions are intentionally simple today: use a source column name or a literal such as 'csv', 123, true, or null
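
The last note can be illustrated with a Derive definition that stays within that expression subset. This is a sketch using only the model types from the example above; the column names are hypothetical:

```csharp
var derive = new PipelineTransformDefinition
{
    Kind = PipelineTransformKind.Derive,
    DerivedColumns =
    [
        // A bare identifier copies an existing source column.
        new PipelineDerivedColumn { Name = "status_copy", Expression = "status" },

        // Literals: single-quoted string, number, boolean, and null.
        new PipelineDerivedColumn { Name = "import_source", Expression = "'csv'" },
        new PipelineDerivedColumn { Name = "schema_version", Expression = "123" },
        new PipelineDerivedColumn { Name = "is_imported", Expression = "true" },
        new PipelineDerivedColumn { Name = "notes", Expression = "null" },
    ],
};
```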

Installation

dotnet add package CSharpDB.Pipelines

For the recommended all-in-one package:

dotnet add package CSharpDB

Dependencies

  • CSharpDB.Primitives - shared type system used by cast mappings and pipeline contracts

Related packages

  • CSharpDB - Recommended all-in-one package including pipelines, engine, storage, and client APIs
  • CSharpDB.Engine - Embedded database engine for SQL and collection access
  • CSharpDB.Client - Client SDK for database, pipeline, and maintenance operations

License

MIT - see LICENSE for details.

Target frameworks

net10.0 is compatible. net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, and net10.0-windows are computed compatible targets.

NuGet packages (1)

One NuGet package depends on CSharpDB.Pipelines:

  • CSharpDB.Client - Unified CSharpDB client SDK with pluggable transports (Direct, HTTP, gRPC, TCP, Named Pipes)


Version  Downloads  Last updated
3.0.0    47         4/8/2026
2.9.1    51         4/7/2026
2.8.1    94         4/6/2026
2.8.0    95         4/4/2026
2.7.0    89         3/31/2026
2.6.0    95         3/29/2026
2.5.0    188        3/28/2026
2.4.0    92         3/24/2026