# CSharpDB.Pipelines
Package contracts and runtime foundation for CSharpDB ETL pipelines.
## Overview
CSharpDB.Pipelines defines portable pipeline packages and a small orchestration
runtime for batch ETL work. A pipeline package describes the source,
transformations, destination, execution options, and optional incremental state.
The built-in runtime can validate packages, serialize them to JSON, execute them
in batches, capture checkpoints, and report rejects and run metrics.
Current boundary:
- Built-in runtime components currently support CSV and JSON file sources/destinations
- Built-in transforms support `Select`, `Rename`, `Cast`, `Filter`, `Derive`, and `Deduplicate`
- CSharpDB table sources/destinations and SQL query sources are modeled in the contracts but are not yet implemented by `DefaultPipelineComponentFactory`
## Features
- Pipeline package model: strongly typed source, transform, destination, and execution settings
- Validation: schema-level validation before execution
- Serialization: save/load pipeline packages as JSON
- Runtime orchestration: `Validate`, `DryRun`, `Run`, and `Resume` modes
- Built-in connectors: CSV and JSON file readers/writers
- Built-in transforms: select, rename, cast, filter, derive, deduplicate
- Checkpointing hooks: pluggable checkpoint store and run logger abstractions
- Batch metrics: rows read/written/rejected plus batch counts
## Usage

### End-to-End Example
```csharp
using CSharpDB.Pipelines.Models;
using CSharpDB.Pipelines.Runtime;
using CSharpDB.Pipelines.Runtime.BuiltIns;
using CSharpDB.Pipelines.Serialization;
using CSharpDB.Pipelines.Validation;
using CSharpDB.Primitives;

Directory.CreateDirectory("data");
await File.WriteAllLinesAsync("data/customers.csv",
[
    "id,name,status",
    "1,Alice,active",
    "1,Alice,active",
    "2,Bob,inactive",
    "3,Carol,active",
]);

var package = new PipelinePackageDefinition
{
    Name = "customers-csv-to-json",
    Version = "1.0.0",
    Description = "Import customers from CSV, clean them, and emit JSON.",
    Source = new PipelineSourceDefinition
    {
        Kind = PipelineSourceKind.CsvFile,
        Path = "data/customers.csv",
        HasHeaderRow = true,
    },
    Transforms =
    [
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Select,
            SelectColumns = ["id", "name", "status"],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Rename,
            RenameMappings =
            [
                new PipelineRenameMapping { Source = "name", Target = "full_name" },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Cast,
            CastMappings =
            [
                new PipelineCastMapping { Column = "id", TargetType = DbType.Integer },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Filter,
            FilterExpression = "status == 'active'",
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Derive,
            DerivedColumns =
            [
                new PipelineDerivedColumn { Name = "import_source", Expression = "'csv'" },
            ],
        },
        new PipelineTransformDefinition
        {
            Kind = PipelineTransformKind.Deduplicate,
            DeduplicateKeys = ["id"],
        },
    ],
    Destination = new PipelineDestinationDefinition
    {
        Kind = PipelineDestinationKind.JsonFile,
        Path = "data/customers.cleaned.json",
    },
    Options = new PipelineExecutionOptions
    {
        BatchSize = 2,
        CheckpointInterval = 1,
        ErrorMode = PipelineErrorMode.FailFast,
    },
};

PipelineValidationResult validation = PipelinePackageValidator.Validate(package);
if (!validation.IsValid)
{
    throw new InvalidOperationException(string.Join(
        Environment.NewLine,
        validation.Errors.Select(error => $"{error.Path}: {error.Message}")));
}

await PipelinePackageSerializer.SaveToFileAsync(package, "data/customers.pipeline.json");
PipelinePackageDefinition loadedPackage =
    await PipelinePackageSerializer.LoadFromFileAsync("data/customers.pipeline.json");

var orchestrator = new PipelineOrchestrator(
    new DefaultPipelineComponentFactory(),
    new NullPipelineCheckpointStore(),
    new NullPipelineRunLogger());

PipelineRunResult result = await orchestrator.ExecuteAsync(new PipelineRunRequest
{
    Package = loadedPackage,
    Mode = PipelineExecutionMode.Run,
});

Console.WriteLine(
    $"{result.Status}: {result.Metrics.RowsRead} read, " +
    $"{result.Metrics.RowsWritten} written, " +
    $"{result.Metrics.RowsRejected} rejected");

string outputJson = await File.ReadAllTextAsync("data/customers.cleaned.json");
Console.WriteLine(outputJson);
```
The output file contains only the active customers, with duplicate IDs removed:
```json
[
  {
    "id": 1,
    "status": "active",
    "full_name": "Alice",
    "import_source": "csv"
  },
  {
    "id": 3,
    "status": "active",
    "full_name": "Carol",
    "import_source": "csv"
  }
]
```
## Execution Modes

- `Validate`: validate the package only, without creating components
- `DryRun`: open the source and run transforms, but skip destination writes
- `Run`: execute the full pipeline and persist checkpoints
- `Resume`: continue a previous run from the checkpoint returned by your `IPipelineCheckpointStore`
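Switching modes only changes the `PipelineRunRequest`; the orchestrator wiring stays the same. A sketch, assuming the `package` variable from the End-to-End Example above is in scope:

```csharp
// Dry-run the package: rows are read and transformed, but destination
// writes are skipped, so no output file is produced.
var orchestrator = new PipelineOrchestrator(
    new DefaultPipelineComponentFactory(),
    new NullPipelineCheckpointStore(),
    new NullPipelineRunLogger());

PipelineRunResult dryRun = await orchestrator.ExecuteAsync(new PipelineRunRequest
{
    Package = package, // the package built in the example above
    Mode = PipelineExecutionMode.DryRun,
});

Console.WriteLine($"{dryRun.Status}: {dryRun.Metrics.RowsRead} read");
```

The same pattern applies to `Resume`, though resuming is only meaningful with a checkpoint store that actually persists checkpoints (`NullPipelineCheckpointStore` does not).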
## Notes

- `DefaultPipelineComponentFactory` is the ready-to-run built-in factory for file pipelines
- Use `NullPipelineCheckpointStore` and `NullPipelineRunLogger` when you want a minimal in-process setup
- Relative source file paths are searched from the current directory and the app base directory; relative output paths are written relative to the current directory
- `Derive` expressions are intentionally simple today: use a source column name or a literal such as `'csv'`, `123`, `true`, or `null`
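Because the checkpoint store is pluggable, a durable store can be swapped in for `Resume` support. The exact shape of `IPipelineCheckpointStore` is not shown in this README, so the following file-backed store is a hypothetical sketch: the member names (`SaveAsync`/`LoadAsync`) and the string checkpoint payload are assumptions; consult the real interface for the actual contract before implementing.

```csharp
// HYPOTHETICAL sketch of a file-backed checkpoint store. Member names and
// payload type are assumptions, not the documented IPipelineCheckpointStore
// contract; the interface implementation is left commented out for that reason.
public sealed class FileCheckpointStore /* : IPipelineCheckpointStore */
{
    private readonly string _directory;

    public FileCheckpointStore(string directory) => _directory = directory;

    public async Task SaveAsync(string pipelineName, string checkpointJson)
    {
        // One checkpoint file per pipeline, overwritten on every save.
        Directory.CreateDirectory(_directory);
        string path = Path.Combine(_directory, $"{pipelineName}.checkpoint.json");
        await File.WriteAllTextAsync(path, checkpointJson);
    }

    public async Task<string?> LoadAsync(string pipelineName)
    {
        // Returns null when no checkpoint exists, signalling a fresh run.
        string path = Path.Combine(_directory, $"{pipelineName}.checkpoint.json");
        return File.Exists(path) ? await File.ReadAllTextAsync(path) : null;
    }
}
```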
## Installation

```shell
dotnet add package CSharpDB.Pipelines
```

For the recommended all-in-one package:

```shell
dotnet add package CSharpDB
```
## Dependencies

- `CSharpDB.Primitives` - shared type system used by cast mappings and pipeline contracts
## Related Packages
| Package | Description |
|---|---|
| CSharpDB | Recommended all-in-one package including pipelines, engine, storage, and client APIs |
| CSharpDB.Engine | Embedded database engine for SQL and collection access |
| CSharpDB.Client | Client SDK for database, pipeline, and maintenance operations |
## License
MIT - see LICENSE for details.
## Compatibility

| Product | Compatible frameworks |
|---|---|
| .NET | net10.0 (platform-specific targets such as net10.0-android, net10.0-ios, and net10.0-windows are computed) |

Dependencies for net10.0:

- CSharpDB.Primitives (>= 2.4.0)