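DataX is an offline data synchronization tool developed by Alibaba. It supports offline synchronization between many data sources, including MySQL, Oracle, SqlServer, HDFS, and HBase.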

Installing DataX

We can download the prebuilt package directly:

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

Alternatively, we can build DataX from source. Because the prebuilt package above is fairly old, building from source is recommended.

git clone https://github.com/alibaba/DataX.git

Since we only need a few specific data sources, we can remove the submodules we don't need from pom.xml. The submodules I kept are:


<module>mysqlreader</module>
<module>oraclereader</module>
<module>txtfilereader</module>
<module>streamreader</module>
<module>rdbmsreader</module>

<module>mysqlwriter</module>
<module>tdenginewriter</module>
<module>txtfilewriter</module>
<module>streamwriter</module>
<module>rdbmswriter</module>
<module>elasticsearchwriter</module>
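Trimming the module list by hand is easy to get wrong, so here is a minimal sketch of doing it with a script. It assumes each entry sits on its own line as `<module>name</module>`, and it only filters modules whose names end in `reader` or `writer`, so shared modules (for example `common` or `core`) are left alone. The helper names `filter_plugin_modules` and `rewrite_pom` are hypothetical, not part of DataX.

```python
import re

# Reader/writer plugins kept in this article's build.
KEEP = {
    "mysqlreader", "oraclereader", "txtfilereader", "streamreader", "rdbmsreader",
    "mysqlwriter", "tdenginewriter", "txtfilewriter", "streamwriter",
    "rdbmswriter", "elasticsearchwriter",
}

# Matches a line that holds exactly one <module>...</module> entry.
MODULE_LINE = re.compile(r"^\s*<module>\s*([\w-]+)\s*</module>\s*$")


def filter_plugin_modules(pom_text, keep):
    """Drop reader/writer <module> lines not in `keep`; every other line is kept."""
    out = []
    for line in pom_text.splitlines(keepends=True):
        m = MODULE_LINE.match(line)
        if m:
            name = m.group(1)
            is_plugin = name.endswith("reader") or name.endswith("writer")
            if is_plugin and name not in keep:
                continue  # skip an unwanted plugin module
        out.append(line)
    return "".join(out)


def rewrite_pom(path="pom.xml", keep=KEEP):
    """Hypothetical helper: filter the pom.xml at `path` in place."""
    with open(path, encoding="utf-8") as f:
        text = f.read()
    with open(path, "w", encoding="utf-8") as f:
        f.write(filter_plugin_modules(text, keep))
```

Working line by line rather than through an XML parser keeps the rest of pom.xml (comments, formatting, namespaces) byte-for-byte unchanged.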

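In addition, I ran into a MySQL connection problem while using DataX; after checking, it appeared to be caused by the MySQL driver. So the MySQL driver version in pom.xml also needs to be changed to 5.1.34. After these two changes, run the build command: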

mvn -U clean package assembly:assembly -Dmaven.test.skip=true

The build output looks like this:

...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  01:42 min
[INFO] Finished at: 2021-06-09T14:32:08+08:00
[INFO] ------------------------------------------------------------------------

The build produces the package target/datax.tar.gz. Extract it:

tar -zxvf datax.tar.gz
cd datax

DataX is now installed. Its top-level directory layout is:

➜  datax tree -L 1
.
├── bin
├── conf
├── job
├── lib
├── plugin
├── script
└── tmp

7 directories, 0 files

Reading data from MySQL and printing it

Next, let's create a job that reads data from MySQL and prints it. Create the file job/mysql_print.json:

{
    "job": {
        "setting": {
            "speed": {"channel": 3},
            "errorLimit": {"record": 0, "percentage": 0.02}
        },
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "username": "root",
                    "password": "1234",
                    "column": ["id", "name", "update_time", "message"],
                    "connection": [{
                        "table": ["script_version"],
                        "jdbcUrl": ["jdbc:mysql://172.19.34.19:3306/bugatti"]
                    }]
                }
            },
            "writer": {
                "name": "streamwriter",
                "parameter": {"print": true}
            }
        }]
    }
}
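If you create many similar jobs, generating the file can be less error-prone than editing JSON by hand. The sketch below builds a dict with the same shape as job/mysql_print.json and writes it with the standard `json` module. `build_mysql_print_job` and `write_job` are hypothetical helpers; the parameter values are simply the ones used in this article.

```python
import json


def build_mysql_print_job(jdbc_url, table, columns, username, password, channel=3):
    """Build a mysqlreader -> streamwriter job shaped like job/mysql_print.json."""
    return {
        "job": {
            "setting": {
                "speed": {"channel": channel},
                "errorLimit": {"record": 0, "percentage": 0.02},
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": username,
                        "password": password,
                        "column": list(columns),
                        # As in the file above, table and jdbcUrl are given as lists.
                        "connection": [{"table": [table], "jdbcUrl": [jdbc_url]}],
                    },
                },
                "writer": {"name": "streamwriter", "parameter": {"print": True}},
            }],
        }
    }


def write_job(job, path):
    """Serialize the job as UTF-8 JSON, e.g. to job/mysql_print.json."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(job, f, ensure_ascii=False, indent=2)
```

Calling `write_job(build_mysql_print_job("jdbc:mysql://172.19.34.19:3306/bugatti", "script_version", ["id", "name", "update_time", "message"], "root", "1234"), "job/mysql_print.json")` should produce a file equivalent to the one above.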

Once the job file is created, run the sync command:

➜  datax python bin/datax.py job/mysql_print.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

...
14    dev    2021-10-09 14:54:46    f9d428b45e882506ce45fb77dbb8e96611554129
15    master    2021-10-09 14:55:41    f9d428b45e882506ce45fb77dbb8e96611554129
20    revert-a8a85b0e    2021-08-03 09:40:09    04b81ff4fbf976328d59d105681b8c05fa4f04b5
22    默认    2021-06-09 10:48:33    04b81ff4fbf976328d59d105681b8c05fa4f04b5
...
2021-06-09 14:39:36.845 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-06-09 14:39:25
任务结束时刻                    : 2021-06-09 14:39:36
任务总计耗时                    :                 10s
任务平均流量                    :               22B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   4
读写失败总数                    :                   0

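As shown above, the 4 rows in MySQL were successfully read and printed. The summary at the end reports 4 records read (读出记录总数) and 0 read/write failures (读写失败总数).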
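When jobs are run from a script, it can help to check that summary automatically. Below is a minimal sketch that extracts the 读出记录总数 (records read) and 读写失败总数 (read/write failures) counters from the log text with a regular expression. It assumes the summary format shown above, which may differ between DataX versions; `parse_datax_summary` and `job_succeeded` are hypothetical helpers.

```python
import re

# Labels printed in DataX's end-of-job summary (as seen in the output above).
LABELS = {
    "读出记录总数": "records_read",
    "读写失败总数": "failed_records",
}
PATTERN = re.compile(r"(读出记录总数|读写失败总数)\s*:\s*(\d+)")


def parse_datax_summary(log_text):
    """Return a dict with 'records_read' / 'failed_records' for each label found."""
    result = {}
    for label, value in PATTERN.findall(log_text):
        result[LABELS[label]] = int(value)  # a later occurrence overwrites an earlier one
    return result


def job_succeeded(log_text):
    """Treat the run as clean only if the counters are present and no records failed."""
    stats = parse_datax_summary(log_text)
    return "records_read" in stats and stats.get("failed_records", 1) == 0
```

One way to feed it, assuming the summary is written to stdout as in the terminal session above, is to capture the output of `subprocess.run(["python", "bin/datax.py", "job/mysql_print.json"], capture_output=True, text=True)` and pass its `stdout` to `job_succeeded`.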

Reading data from MySQL and writing it to Elasticsearch

Next, create a job similar to the one above, except that this time, instead of printing the data, we sync it into Elasticsearch. Create the file job/mysql_2_es.json:

{
    "job": {
        "setting": {
            "speed": {"channel": 3},
            "errorLimit": {"record": 0, "percentage": 0.02}
        },
        "content": [{
            "reader": {
                "name": "mysqlreader",
                "parameter": {
                    "username": "root",
                    "password": "1234",
                    "column": ["id", "name", "update_time", "message"],
                    "connection": [{
                        "table": ["script_version"],
                        "jdbcUrl": ["jdbc:mysql://172.19.34.19:3306/bugatti"]
                    }]
                }
            },
            "writer": {
                "name": "elasticsearchwriter",
                "parameter": {
                    "endpoint": "http://172.19.40.171:9200",
                    "accessId": "1",
                    "accessKey": "1",
                    "index": "script-version",
                    "type": "_doc",
                    "settings": {"index": {"number_of_shards": 3, "number_of_replicas": 1}},
                    "batchSize": 2,
                    "column": [
                        {"name": "id", "type": "id"},
                        {"name": "name", "type": "keyword"},
                        {"name": "update_time", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
                        {"name": "message", "type": "keyword"}
                    ]
                }
            }
        }]
    }
}
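As far as I understand DataX's design, the reader passes each record to the writer as an ordered list of columns, so the writer's `column` entries are paired with the reader's columns by position rather than by name. A quick pre-flight check can catch a missing or reordered column. The sketch assumes the layout used in this article (one `content` entry, reader columns as strings, writer columns as objects with a `name`); `check_column_alignment` and `load_job` are hypothetical helpers.

```python
import json


def check_column_alignment(job):
    """List differences found when pairing reader and writer columns by position."""
    content = job["job"]["content"][0]
    reader_cols = content["reader"]["parameter"]["column"]
    writer_cols = [c["name"] for c in content["writer"]["parameter"]["column"]]
    problems = []
    if len(reader_cols) != len(writer_cols):
        problems.append(
            f"reader has {len(reader_cols)} columns, writer has {len(writer_cols)}"
        )
    # zip stops at the shorter list; the length check above reports the remainder.
    for i, (r, w) in enumerate(zip(reader_cols, writer_cols)):
        if r != w:
            problems.append(f"position {i}: reader '{r}' -> writer '{w}'")
    return problems


def load_job(path):
    """Read a job file such as job/mysql_2_es.json."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)
```

A name difference is not necessarily an error (it is how a field gets renamed in the target), so treat those entries as warnings to review; a length mismatch is the more likely mistake.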

Running this job with python bin/datax.py produces the following output:

➜  datax python bin/datax.py job/mysql_2_es.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

2021-06-09 14:51:45.622 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-06-09 14:51:45.632 [main] INFO  Engine - the machine info  =>

    osInfo:    Oracle Corporation 1.8 25.321-b07
    jvmInfo:    Mac OS X x86_64 10.15.7
    cpu num:    4

    totalPhysicalMemory:    -0.00G
    freePhysicalMemory:    -0.00G
    maxFileDescriptorCount:    -1
    currentOpenFileDescriptorCount:    -1

    GC Names    [PS MarkSweep, PS Scavenge]

    MEMORY_NAME                    | allocation_size                | init_size
    PS Eden Space                  | 256.00MB                       | 256.00MB
    Code Cache                     | 240.00MB                       | 2.44MB
    Compressed Class Space         | 1,024.00MB                     | 0.00MB
    PS Survivor Space              | 42.50MB                        | 42.50MB
    PS Old Gen                     | 683.00MB                       | 683.00MB
    Metaspace                      | -0.00MB                        | 0.00MB

2021-06-09 14:51:45.658 [main] INFO  Engine -
{
    "content":[
        {
            "reader":{
                "name":"mysqlreader",
                "parameter":{
                    "column":[
                        "id",
                        "name",
                        "update_time",
                        "message"
                    ],
                    "connection":[
                        {
                            "jdbcUrl":[
                                "jdbc:mysql://172.19.34.19:3306/bugatti"
                            ],
                            "table":[
                                "script_version"
                            ]
                        }
                    ],
                    "password":"****",
                    "username":"root"
                }
            },
            "writer":{
                "name":"elasticsearchwriter",
                "parameter":{
                    "accessId":"1",
                    "accessKey":"*",
                    "batchSize":2,
                    "column":[
                        {
                            "name":"id",
                            "type":"id"
                        },
                        {
                            "name":"name",
                            "type":"keyword"
                        },
                        {
                            "format":"yyyy-MM-dd HH:mm:ss",
                            "name":"update_time",
                            "type":"date"
                        },
                        {
                            "name":"message",
                            "type":"keyword"
                        }
                    ],
                    "endpoint":"http://172.19.40.171:9200",
                    "index":"script-version",
                    "settings":{
                        "index":{
                            "number_of_replicas":1,
                            "number_of_shards":3
                        }
                    },
                    "type":"_doc"
                }
            }
        }
    ],
    "setting":{
        "errorLimit":{
            "percentage":0.02,
            "record":0
        },
        "speed":{
            "channel":3
        }
    }
}

2021-06-09 14:51:45.688 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2021-06-09 14:51:45.690 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-06-09 14:51:45.690 [main] INFO  JobContainer - DataX jobContainer starts job.
2021-06-09 14:51:45.693 [main] INFO  JobContainer - Set jobId = 0
2021-06-09 14:51:46.257 [job-0] INFO  OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2021-06-09 14:51:46.334 [job-0] INFO  OriginalConfPretreatmentUtil - table:[script_version] has columns:[id,name,update_time,message].
2021-06-09 14:51:46.345 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2021-06-09 14:51:46.347 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2021-06-09 14:51:46.347 [job-0] INFO  JobContainer - DataX Writer.Job [elasticsearchwriter] do prepare work .
2021-06-09 14:51:47.117 [job-0] INFO  AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:47.119 [job-0] INFO  JestClientFactory - Using single thread/connection supporting basic connection manager
2021-06-09 14:51:47.220 [job-0] INFO  JestClientFactory - Using default GSON instance
2021-06-09 14:51:47.220 [job-0] INFO  JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:47.220 [job-0] INFO  JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:47.220 [job-0] INFO  JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:47.245 [job-0] INFO  ESWriter$Job - [{"name":"id","type":"id"},{"name":"name","type":"keyword"},{"format":"yyyy-MM-dd HH:mm:ss","name":"update_time","type":"date"},{"name":"message","type":"keyword"}]
2021-06-09 14:51:47.255 [job-0] INFO  ESWriter$Job - index:[script-version], type:[_doc], mappings:[{"_doc":{"properties":{"message":{"type":"keyword"},"name":{"type":"keyword"},"update_time":{"type":"date"}}}}]
2021-06-09 14:51:47.458 [job-0] INFO  ESClient - create index script-version
2021-06-09 14:51:47.488 [job-0] INFO  ESClient - index [script-version] already exists
2021-06-09 14:51:47.489 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2021-06-09 14:51:47.490 [job-0] INFO  JobContainer - Job set Channel-Number to 3 channels.
2021-06-09 14:51:47.503 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2021-06-09 14:51:47.504 [job-0] INFO  JobContainer - DataX Writer.Job [elasticsearchwriter] splits to [1] tasks.
2021-06-09 14:51:47.527 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2021-06-09 14:51:47.536 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2021-06-09 14:51:47.538 [job-0] INFO  JobContainer - Running by standalone Mode.
2021-06-09 14:51:47.549 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-06-09 14:51:47.555 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-06-09 14:51:47.555 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2021-06-09 14:51:47.674 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2021-06-09 14:51:47.687 [0-0-0-writer] INFO  AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:47.687 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Begin to read record by Sql: [select id,name,update_time,message from script_version] jdbcUrl:[jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2021-06-09 14:51:47.687 [0-0-0-writer] INFO  JestClientFactory - Using multi thread/connection supporting pooling connection manager
2021-06-09 14:51:47.709 [0-0-0-writer] INFO  JestClientFactory - Using default GSON instance
2021-06-09 14:51:47.710 [0-0-0-writer] INFO  JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:47.710 [0-0-0-writer] INFO  JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:47.711 [0-0-0-writer] INFO  JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:47.775 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select id,name,update_time,message from script_version] jdbcUrl:[jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2021-06-09 14:51:48.369 [0-0-0-writer] INFO  ESWriter$Job - task end, write size :4
2021-06-09 14:51:48.388 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[714]ms
2021-06-09 14:51:48.388 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2021-06-09 14:51:57.722 [job-0] INFO  StandAloneJobContainerCommunicator - Total 4 records, 226 bytes | Speed 22B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.074s | Percentage 100.00%
2021-06-09 14:51:57.723 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2021-06-09 14:51:57.724 [job-0] INFO  JobContainer - DataX Writer.Job [elasticsearchwriter] do post work.
2021-06-09 14:51:57.725 [job-0] INFO  AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:57.725 [job-0] INFO  JestClientFactory - Using single thread/connection supporting basic connection manager
2021-06-09 14:51:57.727 [job-0] INFO  JestClientFactory - Using default GSON instance
2021-06-09 14:51:57.727 [job-0] INFO  JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:57.727 [job-0] INFO  JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:57.727 [job-0] INFO  JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:57.728 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do post work.
2021-06-09 14:51:57.728 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2021-06-09 14:51:57.729 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /Users/hourui/Downloads/111111/datax/hook
2021-06-09 14:51:57.735 [job-0] INFO  JobContainer -
     [total cpu info] =>
        averageCpu                     | maxDeltaCpu                    | minDeltaCpu
        -1.00%                         | -1.00%                         | -1.00%

     [total gc info] =>
         NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
         PS MarkSweep         | 1                  | 1                  | 1                  | 0.060s             | 0.060s             | 0.060s
         PS Scavenge          | 1                  | 1                  | 1                  | 0.031s             | 0.031s             | 0.031s

2021-06-09 14:51:57.735 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-06-09 14:51:57.736 [job-0] INFO  StandAloneJobContainerCommunicator - Total 4 records, 226 bytes | Speed 22B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.074s | Percentage 100.00%
2021-06-09 14:51:57.737 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-06-09 14:51:45
任务结束时刻                    : 2021-06-09 14:51:57
任务总计耗时                    :                 12s
任务平均流量                    :               22B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   4
读写失败总数                    :                   0

Once the job above has finished, the synced data can be queried in Elasticsearch:

curl http://172.19.40.171:9200/script-version/_search

The query returns:

{"took":3,"timed_out":false,"_shards":{"total":2,"successful":2,"skipped":0,"failed":0},"hits":{"total":{"value":4,"relation":"eq"},"max_score":1.0,"hits":[
{"_index":"script-version","_type":"_doc","_id":"22","_score":1.0,"_source":{"update_time":"2021-06-09T10:48:33.000+08:00","name":"默认","message":"04b81ff4fbf976328d59d105681b8c05fa4f04b5"}},
{"_index":"script-version","_type":"_doc","_id":"14","_score":1.0,"_source":{"update_time":"2021-10-09T14:54:46.000+08:00","name":"dev","message":"f9d428b45e882506ce45fb77dbb8e96611554129"}},
{"_index":"script-version","_type":"_doc","_id":"15","_score":1.0,"_source":{"update_time":"2021-10-09T14:55:41.000+08:00","name":"master","message":"f9d428b45e882506ce45fb77dbb8e96611554129"}},
{"_index":"script-version","_type":"_doc","_id":"20","_score":1.0,"_source":{"update_time":"2021-08-03T09:40:09.000+08:00","name":"revert-a8a85b0e","message":"04b81ff4fbf976328d59d105681b8c05fa4f04b5"}}]}}
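The same check can be done from Python with only the standard library, which is handy for verifying a sync in a script. This is a sketch: `search_index`, `hits_by_id`, and `total_hits` are hypothetical helpers, the request is the same unauthenticated GET as the curl command above, and Elasticsearch returns only the first 10 hits by default, so larger indices need paging or a `size` parameter.

```python
import json
import urllib.request


def search_index(endpoint, index):
    """GET {endpoint}/{index}/_search, the same request as the curl command above."""
    url = f"{endpoint.rstrip('/')}/{index}/_search"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def hits_by_id(response):
    """Map each hit's _id (the MySQL id, via the 'id' column type) to its _source."""
    return {hit["_id"]: hit["_source"] for hit in response["hits"]["hits"]}


def total_hits(response):
    """hits.total is an object in Elasticsearch 7.x and a plain number in 6.x."""
    total = response["hits"]["total"]
    return total["value"] if isinstance(total, dict) else total
```

For example, `total_hits(search_index("http://172.19.40.171:9200", "script-version"))` should report the 4 documents shown in the response above.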