ChunJun(纯钧),耍起来怎么样?
发表于 ・ 资讯中心
上篇文章实测了 Flink CDC 3.0.1,虽然相较去年实测的 3.0.0 有进步,但从总体表现来看,依然是让人失望的。
目前市面上,这种基于实时流的 CDC 工具,除了 Flink CDC 外,其实还有很多,比如上次在直播间,就有小伙伴提到了另一款,也隶属于 Flink 生态的实时数据同步工具。
第一次听到这个名字,给我一种怪怪的感觉,翻看了一下它的历史,发现原来它还有另外一个名字—— FlinkX,我个人觉得这个名字要更好听一点,既好记,又能体现出它跟 Flink 之间的渊源,也不知道为什么要改。
那么这篇文章,就把这款号称已经在一千多家公司和机构使用过的产品,给跑起来看看。
0. 阅读官网
从它的文档内容来看,凭我的直觉,这几乎一定确定以及肯定是个中国程序员写的,极致的简洁,惜字如金的文字描述风格。
而且,有点让人哭笑不得的是,它的文档还假模假式的给你个支持英文的按钮,但实际上呢,
英文版的文档说明
应该是想逗国际友人玩一下吧。
另外,很遗憾的一点,既然它也需要依赖基础的 Flink 环境,但它的官方文档没有像 Flink CDC 那样告诉你(github上倒是有),不同版本的 chunjun,跟不同版本的 Flink 之间的兼容关系。
截止到目前为止,你能看到的,关于这部分的内容,翻遍整个文档,
就这么轻飘飘一句带过,实属有点草率(当然,这个跟社区的繁荣度可能有关)。
Flink 都出 1.19 release 了,还在推荐用 1.12,说明多少有点跟不上时代,我准备先不接受它的推荐,从我当前已经准备就绪的 1.15进行尝试,看看能不能行。
1. 软件准备
1.1 Flink 环境准备
这个不多赘述,之前写跟 Flink 相关的文章已经多次提到,本质就是找一台客户端服务器,将 Flink 的安装包下载下来,解压,再设置个环境变量,完事。
解压后的Flink压缩包
chunjun用户下,设置Flink环境变量
1.2 haodop 环境准备
根据我的习惯,如果想让这个基于 chunjun 的同步任务跑在 yarn 上面,hadoop 环境是必须的,跟上一篇写的 Flink CDC 要求完全一致。
这部分也不赘述,我的集群早已部署了 hadoop,只不过,是 3.1 的版本,而 chunjun 的官网,却推荐 2.7.5.
同样,这次也不接受它的推荐,爱咋咋地。
1.3 chunjun 环境准备
根据官网提供的地址,下载个最新的版本
从提供的版本信息来看,chunjun 最新的 release 版,已经发布了一年多。
将安装包下载到部署有 Flink 环境的机器下,解压之后,是这个样子:
观察了一下它的目录结构,本身已经内置了很多常用 DB 的connector,长得特别像之前用过的 DataX,有没有。
2. 跑起来
先启动 yarn session:
因为我准备用 yarn session 的方式把这个 chunjun 任务给跑起来,所以这里需要先启动 yarn session。
虽然官方文档推荐 Flink 1.12,但我集群环境是 flink 1.15,所以,直接用 Flink 1.15来启动 yarn session。
这里需要注意的是,在启动这个进程的时候,务必需要添加这个 -t 参数,来指定你当前 chunjun 的 home 目录。
否则,后面你提交的任务是成功不了的,我一开始就是不信邪,没有加这个 -t 参数,结果,
撞到南墙了。
PS:如果你用的是 Flink 用户启动这个进程,需要事先设置 hadoop 的环境变量。
再启动 chunjun 任务:
小试牛刀:
根据官网的案例,用命令行提交一个简单的测试任务,
从这个提交命令内容行来看,它明显要比上次测试的 Flink CDC 3.0.1 的提交命令聪明多了,可以把要提交的目标 yarn session 相关信息,直接用命令行来指定(也可以写在 flink-conf.yaml 主配置文件里)。
而 Flink CDC 3.0.1 官网中,同样的信息,你只能写死在 flink-conf.yaml 主配置文件里,就显得很蠢。
提交完成之后,就能看到 Flink UI 界面上的任务信息了,
So easy,能顺利跑起来,说明到目前为止,chunjun 跟当前 Flink,以及 yarn 的环境是兼容的(虽然没有根据官网推荐的来)。
来个正经的案例(导 MySQL 数据到 Doris 失败):
就用上次测试 Flink CDC 3.0.1 的,把 MySQL 的数据,给导入到 Doris。
从官网提供的示例,以及安装包中自带的数据传输案例来看,chunjun 的任务貌似不支持「整库导入」。
那就先找个 MySQL 的单表,导入到 Doris 的另一张表里,对应的 json 配置如下(是不是感觉跟 dataX 的玩法一毛一样):
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "xxx",
"password": "xxxx",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://192.168.221.173:3306/test?useUnicode=true&characterEncoding=utf8"
],
"table": [
"test01"
]
}
],
"column": ["*"],
"customSql": "",
"where": "",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"parameter": {
"batchSize": 1024,
"maxRetries": 3,
"feNodes": ["192.168.221.174:8030"],
"column": ["*"],
"username": "xxx",
"password": "xxxx",
"database": "test",
"table": "test01",
"fieldDelimiter": "\t"
},
"name": "dorisbatchwriter"
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": true
},
"speed": {
"channel": 1
}
}
}
}
这个 json 是我根据安装包里面的案例改的,语法啥的按理说,不应该有问题(而且对应的 connector 当前安装包里已经自带了)。
但是,提交到集群之后没多久,就抛异常了:
这个异常一般来说,是因为根据 hostname 找不到对应的 IP 导致的,但是,我整个配置中都压根就没有出现 hostname,配的都是 IP,所以这个异常抛的我是一脸懵逼,丝毫没有头绪。
为了避免是集群环境造成的,后面我又用「本地模式」试了一次,一样的问题,就很让人很迷。
导 MySQL 数据到 MySQL:
既然 MySQL 到 Doris 不行,那咱换一个,Mysql 到 MySQL,总该可以吧。
于是,再测一次:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"splitPk": "id",
"increColumn": "id",
"startLocation": "1",
"polling": true,
"pollingInterval": 3000,
"queryTimeOut": 1000,
"username": "xxx",
"password": "xxxx",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://192.168.221.173:3306/test?useUnicode=true&characterEncoding=utf8"
],
"table": [
"test01"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "xxx",
"password": "xxxx",
"connection": [
{
"jdbcUrl":"jdbc:mysql://192.168.221.173:3306/test123?useUnicode=true&characterEncoding=utf8",
"table": [
"test12301"
]
}
],
"writeMode": "insert",
"flushIntervalMills":"3000",
"column": ["*"]
}
}
}
],
"setting": {
"restore": {
"restoreColumnName": "id"
},
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}
这个也是根据安装包中的案例来改的,是一个流式的 CDC 传输方式,只不过,这里的 CDC 原理,用的不是传统的 binlog 方式,而是根据这个自增的 id 字段来判定。
简洁起见,我这里直接说结论:
1. 不报错了,能把源表的所有历史纪录导入成功;
2. 从官方的要求来看,因为这里没有利用 MySQL 的 binlog,而通过自增 id 字段的话,就需要将这部分信息发送给 prometheus 来保存,因此,需要部署 prometheus,个人觉得这个玩法比较扯,所以没有去尝试。因此,也就同步不了后续的增量数据。
先测到这里,说实话,已经没有激情了。
最后
经过3天对 chunjun 的使用体验,不敢说对这玩意有多少深入的理解,但也把它的玩法,大致都了解了个七八成。
给我的感觉就是:它的野心很大,想兼容 DataX,跟 Flink CDC 的所有功能,或者说优点,但是,以当前的能力,还撑不起这个野心。
经过一番体验后发现,它的生态兼容性咱暂且先不说,最让人蛋疼的,是当前支持的一些组件(数据库)的版本,是不是有点太老了。
比如,这次测试的最新版本的 chunjun,它要求的 Doris 版本是 0.14,而我当前测试的 Doris,是2.x版本,莫非是因为版本太高,才出现上面的那个问题?我不知道。
总之,这次给我的使用体验,并不好。
测试了这么多款数据传输工具,给我的感觉就是,目前还没有一种既简单(不用编码),又让人觉得特别好用的开源产品。
尤其是面对数据源场景比较复杂的情况时,想要靠谱好用,根据我的经验,还得需要根据实际情况来编码,比如用 Spark,或者 Flink 的 API。
你觉得呢?

发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。