ClickHouse.Ado.Dapper 2.0.3

dotnet add package ClickHouse.Ado.Dapper --version 2.0.3                
NuGet\Install-Package ClickHouse.Ado.Dapper -Version 2.0.3                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="ClickHouse.Ado.Dapper" Version="2.0.3" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add ClickHouse.Ado.Dapper --version 2.0.3                
#r "nuget: ClickHouse.Ado.Dapper, 2.0.3"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install ClickHouse.Ado.Dapper as a Cake Addin
#addin nuget:?package=ClickHouse.Ado.Dapper&version=2.0.3

// Install ClickHouse.Ado.Dapper as a Cake Tool
#tool nuget:?package=ClickHouse.Ado.Dapper&version=2.0.3                

https://gitee.com/znyet/clickhouse.ado

        static void Main(string[] args)
        {
            TimeZone.Hour = 8;

            using var conn = new ClickHouseConnection("Host=192.168.238.129;Port=9000;User=default;Database=test;Compress=True;");
            conn.Open();

            //conn.Execute("create database if not exists test");
            var dbList = conn.Query<string>("show databases"); //show databases like 'name'
            Console.WriteLine(JsonSerializer.Serialize(dbList));

            //conn.Execute("drop table if exists people");
            //conn.Execute("create table if not exists people(id Int32,name String,time Date)ENGINE=MergeTree(time,(id),8192)");
            conn.Execute("create table if not exists people(id Int32,name String)ENGINE=Log");

            var tableList = conn.Query<string>("show tables"); //show tables like 'name'
            Console.WriteLine(JsonSerializer.Serialize(tableList));

            //var sql = "insert into people values(@id,@name)";
            //var sql = "insert into people select @id,@name";
            //conn.Execute(sql, new { id = 1, name = "李四" });

            //Stopwatch sw = new Stopwatch();
            //sw.Start();
            //var list2 = new List<people>();
            //for (int i = 0; i < 10000; i++)
            //{
            //    list2.Add(new people { id = i, name = "李四" + i });
            //}
            //var sql2 = "insert into people(id,name) values @list";
            //conn.Execute(sql2, new { list = list2 });
            //sw.Stop();
            //Console.WriteLine(sw.ElapsedMilliseconds);

            var count = conn.QueryFirstOrDefault("select count(1),sum(id),avg(id) from people");
            Console.WriteLine(count);

            var data = conn.Query("select * from people where id=100 limit 1");
            Console.WriteLine(JsonSerializer.Serialize(data));

            Console.WriteLine("Hello World!");
            Console.ReadKey();
        }

        class people
        {
            public int id { get; set; }

            public string name { get; set; }

        }
    }

一、Log Engine Family
1、
Stripelog
This engine belongs to the family of log engines. See the common properties of log engines and their differences in the Log Engine Family article.

Use this engine in scenarios when you need to write many tables with a small amount of data (less than 1 million rows).

Creating a Table
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    column1_name [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    column2_name [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = StripeLog

2、Log

3、TinyLog

二、Memory

三、MergeTree Engine Family

1、MergeTree
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
    INDEX index_name1 expr1 TYPE type1(...) [GRANULARITY value1],
    INDEX index_name2 expr2 TYPE type2(...) [GRANULARITY value2],
    ...
    PROJECTION projection_name_1 (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY]),
    PROJECTION projection_name_2 (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY])
) ENGINE = MergeTree()
ORDER BY expr
[PARTITION BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[TTL expr
    [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx' [, ...] ]
    [WHERE conditions]
    [GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ] ]
[SETTINGS name=value, ...]

-----------------------------------

Example

CREATE TABLE Tes(
	...
)
ENGINE = MergeTree
ORDER BY (Id)
[PARTITION BY toYYYYMMDD(AddTime)]
[PRIMARY KEY Id]

2、ReplacingMergeTree
Creating a Table
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = ReplacingMergeTree([ver [, is_deleted]])
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, clean_deleted_rows=value, ...]

3、SummingMergeTree
Creating a Table
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = SummingMergeTree([columns])
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]


4、AggregatingMergeTree
Creating a Table
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = AggregatingMergeTree()
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[TTL expr]
[SETTINGS name=value, ...]

5、CollapsingMergeTree
Creating a Table
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = CollapsingMergeTree(sign)
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]

ClickHouse 表引擎
ClickHouse 的表引擎是 ClickHouse 服务的核心,它们决定了 ClickHouse 的以下行为:

1.数据的存储方式和位置。
2.支持哪些查询操作以及如何支持。
3.数据的并发访问。
4.数据索引的使用。
5.是否可以支持多线程请求。
6.是否可以支持数据复制。

ClickHouse 包含以下几种常用的引擎类型:

MergeTree 引擎:该系列引擎是执行高负载任务的最通用和最强大的表引擎,它们的特点是可以快速插入数据以及进行后续的数据处理。该系列引擎还同时支持数据复制(使用Replicated的引擎版本),分区 (partition) 以及一些其它引擎不支持的额外功能。
Log 引擎:该系列引擎是具有最小功能的轻量级引擎。当你需要快速写入许多小表(最多约有 100 万行)并在后续任务中整体读取它们时使用该系列引擎是最有效的。
集成引擎:该系列引擎是与其它数据存储以及处理系统集成的引擎,如 Kafka,MySQL 以及 HDFS 等,使用该系列引擎可以直接与其它系统进行交互,但也会有一定的限制,如确有需要,可以尝试一下。
特殊引擎:该系列引擎主要用于一些特定的功能,如 Distributed 用于分布式查询,MaterializedView 用来聚合数据,以及 Dictionary 用来查询字典数据等
MergeTree系列引擎
MergeTree表引擎
MergeTree在写入一批数据时,数据总会以数据片段的形式写入磁盘,且数据片段不可修改。为了避免片段过多,ClickHouse会通过后台线程,定期合并这些数据片段,属于相同分区的数据片段会被合成一个新的片段。这种数据片段往复合并的特点,也正是合并树名称的由来

MergeTree作为家族系列最基础的表引擎,主要有以下特点:

存储的数据按照主键排序:允许创建稀疏索引,从而加快数据查询速度
支持分区,可以通过PRIMARY KEY语句指定分区字段。
支持数据副本
支持数据采样
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
    INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
    INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
) ENGINE = MergeTree()
ORDER BY expr
[PARTITION BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[TTL expr [DELETE|TO DISK 'xxx'|TO VOLUME 'xxx'], ...]
[SETTINGS name=value, ...]
ENGINE:ENGINE = MergeTree(),MergeTree引擎没有参数
ORDER BY:排序字段。比如ORDER BY (Col1, Col2),值得注意的是,如果没有指定主键,默认情况下 sorting key(排序字段)即为主键。如果不需要排序,则可以使用ORDER BY tuple()语法,这样的话,创建的表也就不包含主键。这种情况下,ClickHouse会按照插入的顺序存储数据。必选。
PARTITION BY:分区字段,可选。
PRIMARY KEY:指定主键,如果排序字段与主键不一致,可以单独指定主键字段。否则默认主键是排序字段。可选。
SAMPLE BY:采样字段,如果指定了该字段,那么主键中也必须包含该字段。比如SAMPLE BY intHash32(UserID) ORDER BY (CounterID, EventDate, intHash32(UserID))。可选。
TTL:数据的存活时间。在MergeTree中,可以为某个列字段或整张表设置TTL。当时间到达时,如果是列字段级别的TTL,则会删除这一列的数据;如果是表级别的TTL,则会删除整张表的数据。可选。
SETTINGS:额外的参数配置。可选。
CREATE TABLE emp_mergetree (
  emp_id UInt16 COMMENT '员工id',
  name String COMMENT '员工姓名',
  work_place String COMMENT '工作地点',
  age UInt8 COMMENT '员工年龄',
  depart String COMMENT '部门',
  salary Decimal32(2) COMMENT '工资'
  )ENGINE=MergeTree()
  ORDER BY emp_id
  PARTITION BY work_place
  ;
 -- 插入数据 
INSERT INTO emp_mergetree 
VALUES (1,'tom','上海',25,'技术部',20000),(2,'jack','上海',26,'人事部',10000);
INSERT INTO emp_mergetree 
VALUES (3,'bob','北京',33,'财务部',50000),(4,'tony','杭州',28,'销售事部',50000); 
 
-- 查询数据
-- 按work_place进行分区
select * from emp_mergetree;
 
-- 新插入两条数据
INSERT INTO emp_mergetree
VALUES (5,'robin','北京',35,'财务部',50000),(6,'lilei','北京',38,'销售事部',50000);
 
可以看出,新插入的数据新生成了一个数据块,并没有与原来的分区数据在一起,我们可以执行optimize命令,执行合并操作
 
-- 执行合并操作
 OPTIMIZE TABLE emp_mergetree PARTITION '北京';
-- 插入一条相同主键的数据
 INSERT INTO emp_mergetree
VALUES (1,'sam','杭州',35,'财务部',50000);
-- 会发现该条数据可以插入,由此可知,并不会对主键进行去重
注意

多次插入数据,会生成多个分区文件
在MergeTree中主键并不用于去重,而是用于索引,加快查询速度
ReplacingMergeTree表引擎
上文提到MergeTree表引擎无法对相同主键的数据进行去重,ClickHouse提供了ReplacingMergeTree引擎,可以针对相同主键的数据进行去重,它能够在合并分区时删除重复的数据。值得注意的是,ReplacingMergeTree只是在一定程度上解决了数据重复问题,但是并不能完全保障数据不重复。

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = ReplacingMergeTree([ver])
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
[ver]:可选参数,列的版本,可以是UInt、Date或者DateTime类型的字段作为版本号。该参数决定了数据去重的方式。
当没有指定[ver]参数时,保留最新的数据;如果指定了具体的值,保留最大的版本数据
CREATE TABLE emp_replacingmergetree (
  emp_id UInt16 COMMENT '员工id',
  name String COMMENT '员工姓名',
  work_place String COMMENT '工作地点',
  age UInt8 COMMENT '员工年龄',
  depart String COMMENT '部门',
  salary Decimal32(2) COMMENT '工资'
  )ENGINE=ReplacingMergeTree()
  ORDER BY emp_id
  PRIMARY KEY emp_id
  PARTITION BY work_place
  ;
 -- 插入数据 
INSERT INTO emp_replacingmergetree
VALUES (1,'tom','上海',25,'技术部',20000),(2,'jack','上海',26,'人事部',10000);
INSERT INTO emp_replacingmergetree
VALUES (3,'bob','北京',33,'财务部',50000),(4,'tony','杭州',28,'销售事部',50000);
 
-- 再insert 一条相同主键的 emp_id
INSERT INTO emp_replacingmergetree
VALUES (1,'tom','上海',25,'技术部',50000);
-- 执行合并操作
optimize table emp_replacingmergetree final;
-- 再次查询,相同主键的数据,保留最近插入的数据,旧的数据被清除
select * from emp_replacingmergetree ;
总结

如何判断数据重复
ReplacingMergeTree在去除重复数据时,是以ORDERBY排序键为基准的,而不是PRIMARY KEY。

何时删除重复数据
在执行分区合并时,会触发删除重复数据。optimize的合并操作是在后台执行的,无法预测具体执行时间点,除非是手动执行。

不同分区的重复数据不会被去重
ReplacingMergeTree是以分区为单位删除重复数据的。只有在相同的数据分区内重复的数据才可以被删除,而不同数据分区之间的重复数据依然不能被剔除。

数据去重的策略是什么
如果没有设置[ver]版本号,则保留同一组重复数据中的最新插入的数据;如果设置了[ver]版本号,则保留同一组重复数据中ver字段取值最大的那一行。

optimize命令使用
一般在数据量比较大的情况,尽量不要使用该命令。因为在海量数据场景下,执行optimize要消耗大量时间

SummingMergeTree表引擎
该引擎继承了MergeTree引擎,当合并 SummingMergeTree 表的数据片段时,ClickHouse 会把所有具有相同主键的行合并为一行,该行包含了被合并的行中具有数值数据类型的列的汇总值,即如果存在重复的数据,会对对这些重复的数据进行合并成一条数据,类似于group by的效果。

推荐将该引擎和 MergeTree 一起使用。例如,将完整的数据存储在 MergeTree 表中,并且使用 SummingMergeTree 来存储聚合数据。这种方法可以避免因为使用不正确的主键组合方式而丢失数据。

如果用户只需要查询数据的汇总结果,不关心明细数据,并且数据的汇总条件是预先明确的,即GROUP BY的分组字段是确定的,可以使用该表引擎。

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = SummingMergeTree([columns]) -- 指定合并汇总字段
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
CREATE TABLE emp_summingmergetree (
  emp_id UInt16 COMMENT '员工id',
  name String COMMENT '员工姓名',
  work_place String COMMENT '工作地点',
  age UInt8 COMMENT '员工年龄',
  depart String COMMENT '部门',
  salary Decimal32(2) COMMENT '工资'
  )ENGINE=SummingMergeTree(salary)
  ORDER BY (emp_id,name) -- 注意排序key是两个字段
  PRIMARY KEY emp_id     -- 主键是一个字段
  PARTITION BY work_place
  ;
 -- 插入数据
INSERT INTO emp_summingmergetree
VALUES (1,'tom','上海',25,'技术部',20000),(2,'jack','上海',26,'人事部',10000);
INSERT INTO emp_summingmergetree
VALUES (3,'bob','北京',33,'财务部',50000),(4,'tony','杭州',28,'销售事部',50000);
 
-- 插入相同数据
INSERT INTO emp_summingmergetree
VALUES (1,'tom','上海',25,'信息部',10000),(1,'tom','北京',26,'人事部',10000);
 
optimize table emp_summingmergetree final;
select * from emp_summingmergetree ;
总结

SummingMergeTree是根据什么对两条数据进行合并的
用ORBER BY排序键作为聚合数据的条件Key。即如果排序key是相同的,则会合并成一条数据,并对指定的合并字段进行聚合。
仅对分区内的相同排序key的数据行进行合并
以数据分区为单位来聚合数据。当分区合并时,同一数据分区内聚合Key相同的数据会被合并汇总,而不同分区之间的数据则不会被汇总。
如果没有指定聚合字段,会怎么聚合
如果没有指定聚合字段,则会按照非主键的数值类型字段进行聚合
对于非汇总字段的数据,该保留哪一条
如果两行数据除了排序字段相同,其他的非聚合字段不相同,那么在聚合发生时,会保留最初的那条数据,新插入的数据对应的那个字段值会被舍弃
-- 新插入的数据:        1 │ tom  │ 上海       │  25 │ 信息部 │ 10000.00 
-- 最初的数据 :1 │ tom  │ 上海       │  25 │ 技术部 │ 20000.00

-- 聚合合并的结果:1 │ tom  │ 上海       │  25 │ 技术部 │ 30000.00
Aggregatingmergetree表引擎
该表引擎继承自MergeTree,可以使用 AggregatingMergeTree 表来做增量数据统计聚合。如果要按一组规则来合并减少行数,则使用 AggregatingMergeTree 是合适的。

AggregatingMergeTree是通过预先定义的聚合函数计算数据并通过二进制的格式存入表内。与SummingMergeTree的区别在于:SummingMergeTree对非主键列进行sum聚合,而AggregatingMergeTree则可以指定各种聚合函数。

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = AggregatingMergeTree()
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
CollapsingMergeTree表引擎
CollapsingMergeTree就是一种通过以增代删的思路,支持行级数据修改和删除的表引擎。它通过定义一个sign标记位字段,记录数据行的状态。如果sign标记为1,则表示这是一行有效的数据;如果sign标记为-1,则表示这行数据需要被删除。当CollapsingMergeTree分区合并时,同一数据分区内,sign标记为1和-1的一组数据会被抵消删除。

每次需要新增数据时,写入一行sign标记为1的数据;需要删除数据时,则写入一行sign标记为-1的数据。

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = CollapsingMergeTree(sign)
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
总结

分区合并
分区数据折叠不是实时的,需要后台进行Compaction操作,用户也可以使用手动合并命令,但是效率会很低,一般不推荐在生产环境中使用。
当进行汇总数据操作时,可以通过改变查询方式,来过滤掉被删除的数据
只有相同分区内的数据才有可能被折叠。其实,当我们修改或删除数据时,这些被修改的数据通常是在一个分区内的,所以不会产生影响。

数据写入顺序
值得注意的是:CollapsingMergeTree对于写入数据的顺序有着严格要求,否则导致无法正常折叠。
如果数据的写入程序是单线程执行的,则能够较好地控制写入顺序;如果需要处理的数据量很大,数据的写入程序通常是多线程执行的,那么此时就不能保障数据的写入顺序了。在这种情况下,CollapsingMergeTree的工作机制就会出现问题。但是可以通过VersionedCollapsingMergeTree的表引擎得到解决。

VersionedCollapsingMergeTree表引擎
上面提到CollapsingMergeTree表引擎对于数据写入乱序的情况下,不能够实现数据折叠的效果。VersionedCollapsingMergeTree表引擎的作用与CollapsingMergeTree完全相同,它们的不同之处在于,VersionedCollapsingMergeTree对数据的写入顺序没有要求,在同一个分区内,任意顺序的数据都能够完成折叠操作

VersionedCollapsingMergeTree使用version列来实现乱序情况下的数据折叠
 
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = VersionedCollapsingMergeTree(sign, version)
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
可以看出:该引擎除了需要指定一个sign标识之外,还需要指定一个UInt8类型的version版本号。

GraphiteMergeTree表引擎
该引擎用来对 Graphite数据进行(图数据)'瘦身'及汇总。对于想使用CH来存储Graphite数据的开发者来说可能有用。

如果不需要对Graphite数据做汇总,那么可以使用任意的CH表引擎;但若需要,那就采用 GraphiteMergeTree 引擎。它能减少存储空间,同时能提高Graphite数据的查询效率。

Log系列表引擎
应用场景

Log系列表引擎功能相对简单,主要用于快速写入小表(1百万行左右的表),然后全部读出的场景。即一次写入多次查询。

Log系列表引擎的特点

共性特点

数据存储在磁盘上
当写数据时,将数据追加到文件的末尾
不支持并发读写,当向表中写入数据时,针对这张表的查询会被阻塞,直至写入动作结束
不支持索引
不支持原子写:如果某些操作(异常的服务器关闭)中断了写操作,则可能会获得带有损坏数据的表
不支持ALTER操作(这些操作会修改表设置或数据,比如delete、update等等)
区别

TinyLog是Log系列引擎中功能简单、性能较低的引擎。它的存储结构由数据文件和元数据两部分组成。其中,数据文件是按列独立存储的,也就是说每一个列字段都对应一个文件。除此之外,TinyLog不支持并发数据读取。
StripLog支持并发读取数据文件,当读取数据时,ClickHouse会使用多线程进行读取,每个线程处理一个单独的数据块。另外,StripLog将所有列数据存储在同一个文件中,减少了文件的使用数量。
Log支持并发读取数据文件,当读取数据时,ClickHouse会使用多线程进行读取,每个线程处理一个单独的数据块。Log引擎会将每个列数据单独存储在一个独立文件中。
外部集成表引擎
ClickHouse提供了许多与外部系统集成的方法,包括一些表引擎。这些表引擎与其他类型的表引擎类似,可以用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。

例如直接读取HDFS的文件或者MySQL数据库的表。这些表引擎只负责元数据管理和数据查询,而它们自身通常并不负责数据的写入,数据文件直接由外部系统提供。目前ClickHouse提供了下面的外部集成表引擎:

ODBC:通过指定odbc连接读取数据源
JDBC:通过指定jdbc连接读取数据源;
MySQL:将MySQL作为数据存储,直接查询其数据
HDFS:直接读取HDFS上的特定格式的数据文件;
Kafka:将Kafka数据导入ClickHouse
RabbitMQ:与Kafka类似
Kafka:使用方式
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]
kafka_broker_list :逗号分隔的brokers地址 (localhost:9092).
kafka_topic_list :Kafka 主题列表,多个主题用逗号分隔.
kafka_group_name :消费者组.
kafka_format – Message format. 比如JSONEachRow、JSON、CSV等等
注意点

当我们一旦查询完毕之后,ClickHouse会删除表内的数据,其实Kafka表引擎只是一个数据管道,我们可以通过物化视图的方式访问Kafka中的数据。

首先创建一张Kafka表引擎的表,用于从Kafka中读取数据
然后再创建一张普通表引擎的表,比如MergeTree,面向终端用户使用
最后创建物化视图,用于将Kafka引擎表实时同步到终端用户所使用的表中
示例

--  创建Kafka引擎表
 
CREATE TABLE kafka_table_consumer
(
    `id` UInt16,
    `name` String,
    `password` String,
    `age` UInt16
)
    ENGINE = Kafka() SETTINGS 
    kafka_broker_list = '120.26.126.158:9092'
    ,kafka_topic_list = 'student',
    kafka_group_name = 'group2',
    kafka_format = 'JSONEachRow';
 
-- 创建一张终端用户使用的表
CREATE TABLE kafka_table_mergetree 
(id UInt64 ,name String)ENGINE=MergeTree() ORDER BY id;
  
-- 创建物化视图,同步数据
CREATE MATERIALIZED VIEW consumer TO 
kafka_table_mergetree AS SELECT id,name FROM kafka_table_consumer;
-- 查询,多次查询,已经被查询的数据依然会被输出
select * from kafka_table_mergetree;
其他特殊的表引擎
Memory表引擎

Memory表引擎直接将数据保存在内存中,数据既不会被压缩也不会被格式转换。当ClickHouse服务重启的时候,Memory表内的数据会全部丢失。一般在测试时使用。

CREATE TABLE table_memory (
    id UInt64,
    name String
  ) ENGINE = Memory();
Distributed表引擎

使用方式

Distributed表引擎是分布式表的代名词,它自身不存储任何数据,数据都分散存储在某一个分片上,能够自动路由数据至集群中的各个节点,所以Distributed表引擎需要和其他数据表引擎一起协同工作。

所以,一张分布式表底层会对应多个本地分片数据表,由具体的分片表存储数据,分布式表与分片表是一对多的关系

Distributed(cluster_name, database_name, table_name[, sharding_key])

各个参数的含义分别如下:

cluster_name:集群名称,与集群配置中的自定义名称相对应。
database_name:数据库名称
table_name:表名称
sharding_key:可选的,用于分片的key值,在数据写入的过程中,分布式表会依据分片key的规则,将数据分布到各个节点的本地表。
创建分布式表是读时检查的机制,也就是说对创建分布式表和本地表的顺序并没有强制要求。

同样值得注意的是,在上面的语句中使用了ON CLUSTER分布式DDL,这意味着在集群的每个分片节点上,都会创建一张Distributed表,这样便可以从其中任意一端发起对所有分片的读、写请求
Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 is compatible.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
.NET Core netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.1 is compatible. 
.NET Framework net462 is compatible.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
2.0.3 327 3/1/2024
1.5.5 117 3/1/2024
1.0.10 10,559 4/5/2022