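DataX is an offline data synchronization tool developed by Alibaba. It supports offline synchronization between many data sources, including MySQL, Oracle, SqlServer, HDFS, and HBase.
Installing DataX
We can download the prebuilt package directly: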
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
Alternatively, we can build DataX from source. Because the prebuilt package above is fairly old, building from source is recommended.
git clone https://github.com/alibaba/DataX.git
Because we only need a few specific databases, we can delete the submodules for the databases we don't use from pom.xml and keep only the ones we need.
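Before pruning, it can help to list which submodules the root pom.xml currently declares. A minimal sketch, assuming it is run from the root of the DataX checkout; `list_modules` is a helper name made up for this example, not part of DataX or Maven:

```shell
# Print the name of every <module> entry declared in a Maven pom file.
# (Hypothetical helper; it matches one <module>...</module> per line.)
list_modules() {
  sed -n 's:.*<module>\(.*\)</module>.*:\1:p' "$1"
}

# Only runs when a pom.xml exists in the current directory.
if [ -f pom.xml ]; then
  list_modules pom.xml
fi
```

The jobs later in this post use at least mysqlreader and elasticsearchwriter (both are named in the job log), so those two submodules need to stay.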
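I also ran into a MySQL connection problem while using DataX, which turned out to be most likely caused by the MySQL driver, so the MySQL driver version in pom.xml needs to be changed as well. After these two changes, we can run the build command: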
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
The output is as follows:
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:42 min
[INFO] Finished at: 2021-06-09T14:32:08+08:00
[INFO] ------------------------------------------------------------------------
The build produces the package at target/datax.tar.gz. Extract it:
tar -zxvf datax.tar.gz
cd datax
DataX is now installed. Its directory layout is as follows:
➜ datax tree -L 1
.
├── bin
├── conf
├── job
├── lib
├── plugin
├── script
└── tmp

7 directories, 0 files
Reading MySQL Data and Printing It
Next we create a job that reads data from MySQL and prints it, saved as job/mysql_print.json.
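A job file along the following lines would match the run shown below. It is a sketch rather than the author's exact file: the reader block mirrors the mysqlreader settings echoed in the Elasticsearch job log later in this post (the printed rows have the same four columns), while the streamwriter block with print enabled and the setting block are assumptions. The password is a placeholder, since the log masks it as ****.

```shell
# Run from the datax directory. Writes a hypothetical job/mysql_print.json.
mkdir -p job
cat > job/mysql_print.json <<'EOF'
{
  "job": {
    "setting": {
      "speed": { "channel": 3 },
      "errorLimit": { "record": 0, "percentage": 0.02 }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "****",
            "column": ["id", "name", "update_time", "message"],
            "connection": [
              {
                "table": ["script_version"],
                "jdbcUrl": ["jdbc:mysql://172.19.34.19:3306/bugatti"]
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": { "print": true }
        }
      }
    ]
  }
}
EOF
```

The quoted heredoc delimiter ('EOF') keeps the shell from expanding anything inside the JSON, so the file is written exactly as shown.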
Once the job file is created, run the sync command:
➜ datax python bin/datax.py job/mysql_print.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
...
14 dev 2021-10-09 14:54:46 f9d428b45e882506ce45fb77dbb8e96611554129
15 master 2021-10-09 14:55:41 f9d428b45e882506ce45fb77dbb8e96611554129
20 revert-a8a85b0e 2021-08-03 09:40:09 04b81ff4fbf976328d59d105681b8c05fa4f04b5
22 默认 2021-06-09 10:48:33 04b81ff4fbf976328d59d105681b8c05fa4f04b5
...
2021-06-09 14:39:36.845 [job-0] INFO JobContainer -
任务启动时刻 : 2021-06-09 14:39:25
任务结束时刻 : 2021-06-09 14:39:36
任务总计耗时 : 10s
任务平均流量 : 22B/s
记录写入速度 : 0rec/s
读出记录总数 : 4
读写失败总数 : 0
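As the summary shows (读出记录总数, total records read: 4), the 4 rows in MySQL were successfully read and printed.
Reading from MySQL and Writing to Elasticsearch
We now create a sync job similar to the one above, except that this time the data is written to Elasticsearch instead of being printed. Create the file job/mysql_2_es.json, with the configuration under a top-level "job" key.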
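The configuration that DataX echoes in the job log below makes it possible to reconstruct this file. A sketch, wrapping the echoed content and setting blocks in the top-level job object; the password and accessKey values appear masked in the log (**** and *), so treat them as placeholders for the real credentials:

```shell
# Run from the datax directory. Rebuilds job/mysql_2_es.json from the
# configuration echoed in the job log; credentials are placeholders.
mkdir -p job
cat > job/mysql_2_es.json <<'EOF'
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id", "name", "update_time", "message"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://172.19.34.19:3306/bugatti"],
                "table": ["script_version"]
              }
            ],
            "password": "****",
            "username": "root"
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "accessId": "1",
            "accessKey": "*",
            "batchSize": 2,
            "column": [
              { "name": "id", "type": "id" },
              { "name": "name", "type": "keyword" },
              { "name": "update_time", "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
              { "name": "message", "type": "keyword" }
            ],
            "endpoint": "http://172.19.40.171:9200",
            "index": "script-version",
            "settings": {
              "index": { "number_of_replicas": 1, "number_of_shards": 3 }
            },
            "type": "_doc"
          }
        }
      }
    ],
    "setting": {
      "errorLimit": { "percentage": 0.02, "record": 0 },
      "speed": { "channel": 3 }
    }
  }
}
EOF
```

The column with type id is the one the log's mapping output omits from the index properties, which suggests the writer uses it as the document id rather than as a regular field.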
The output is as follows:
➜ datax python bin/datax.py job/mysql_2_es.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2021-06-09 14:51:45.622 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-06-09 14:51:45.632 [main] INFO Engine - the machine info =>
    osInfo: Oracle Corporation 1.8 25.321-b07
    jvmInfo: Mac OS X x86_64 10.15.7
    cpu num: 4
    totalPhysicalMemory: -0.00G
    freePhysicalMemory: -0.00G
    maxFileDescriptorCount: -1
    currentOpenFileDescriptorCount: -1
    GC Names [PS MarkSweep, PS Scavenge]
    MEMORY_NAME | allocation_size | init_size
    PS Eden Space | 256.00MB | 256.00MB
    Code Cache | 240.00MB | 2.44MB
    Compressed Class Space | 1,024.00MB | 0.00MB
    PS Survivor Space | 42.50MB | 42.50MB
    PS Old Gen | 683.00MB | 683.00MB
    Metaspace | -0.00MB | 0.00MB
2021-06-09 14:51:45.658 [main] INFO Engine -
{
    "content":[
        {
            "reader":{
                "name":"mysqlreader",
                "parameter":{
                    "column":[
                        "id",
                        "name",
                        "update_time",
                        "message"
                    ],
                    "connection":[
                        {
                            "jdbcUrl":[
                                "jdbc:mysql://172.19.34.19:3306/bugatti"
                            ],
                            "table":[
                                "script_version"
                            ]
                        }
                    ],
                    "password":"****",
                    "username":"root"
                }
            },
            "writer":{
                "name":"elasticsearchwriter",
                "parameter":{
                    "accessId":"1",
                    "accessKey":"*",
                    "batchSize":2,
                    "column":[
                        {
                            "name":"id",
                            "type":"id"
                        },
                        {
                            "name":"name",
                            "type":"keyword"
                        },
                        {
                            "format":"yyyy-MM-dd HH:mm:ss",
                            "name":"update_time",
                            "type":"date"
                        },
                        {
                            "name":"message",
                            "type":"keyword"
                        }
                    ],
                    "endpoint":"http://172.19.40.171:9200",
                    "index":"script-version",
                    "settings":{
                        "index":{
                            "number_of_replicas":1,
                            "number_of_shards":3
                        }
                    },
                    "type":"_doc"
                }
            }
        }
    ],
    "setting":{
        "errorLimit":{
            "percentage":0.02,
            "record":0
        },
        "speed":{
            "channel":3
        }
    }
}
2021-06-09 14:51:45.688 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
2021-06-09 14:51:45.690 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-06-09 14:51:45.690 [main] INFO JobContainer - DataX jobContainer starts job.
2021-06-09 14:51:45.693 [main] INFO JobContainer - Set jobId = 0
2021-06-09 14:51:46.257 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2021-06-09 14:51:46.334 [job-0] INFO OriginalConfPretreatmentUtil - table:[script_version] has columns:[id,name,update_time,message].
2021-06-09 14:51:46.345 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2021-06-09 14:51:46.347 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2021-06-09 14:51:46.347 [job-0] INFO JobContainer - DataX Writer.Job [elasticsearchwriter] do prepare work .
2021-06-09 14:51:47.117 [job-0] INFO AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:47.119 [job-0] INFO JestClientFactory - Using single thread/connection supporting basic connection manager
2021-06-09 14:51:47.220 [job-0] INFO JestClientFactory - Using default GSON instance
2021-06-09 14:51:47.220 [job-0] INFO JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:47.220 [job-0] INFO JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:47.220 [job-0] INFO JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:47.245 [job-0] INFO ESWriter$Job - [{"name":"id","type":"id"},{"name":"name","type":"keyword"},{"format":"yyyy-MM-dd HH:mm:ss","name":"update_time","type":"date"},{"name":"message","type":"keyword"}]
2021-06-09 14:51:47.255 [job-0] INFO ESWriter$Job - index:[script-version], type:[_doc], mappings:[{"_doc":{"properties":{"message":{"type":"keyword"},"name":{"type":"keyword"},"update_time":{"type":"date"}}}}]
2021-06-09 14:51:47.458 [job-0] INFO ESClient - create index script-version
2021-06-09 14:51:47.488 [job-0] INFO ESClient - index [script-version] already exists
2021-06-09 14:51:47.489 [job-0] INFO JobContainer - jobContainer starts to do split ...
2021-06-09 14:51:47.490 [job-0] INFO JobContainer - Job set Channel-Number to 3 channels.
2021-06-09 14:51:47.503 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2021-06-09 14:51:47.504 [job-0] INFO JobContainer - DataX Writer.Job [elasticsearchwriter] splits to [1] tasks.
2021-06-09 14:51:47.527 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2021-06-09 14:51:47.536 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-06-09 14:51:47.538 [job-0] INFO JobContainer - Running by standalone Mode.
2021-06-09 14:51:47.549 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-06-09 14:51:47.555 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-06-09 14:51:47.555 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2021-06-09 14:51:47.674 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2021-06-09 14:51:47.687 [0-0-0-writer] INFO AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:47.687 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select id,name,update_time,message from script_version] jdbcUrl:[jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2021-06-09 14:51:47.687 [0-0-0-writer] INFO JestClientFactory - Using multi thread/connection supporting pooling connection manager
2021-06-09 14:51:47.709 [0-0-0-writer] INFO JestClientFactory - Using default GSON instance
2021-06-09 14:51:47.710 [0-0-0-writer] INFO JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:47.710 [0-0-0-writer] INFO JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:47.711 [0-0-0-writer] INFO JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:47.775 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select id,name,update_time,message from script_version] jdbcUrl:[jdbc:mysql://172.19.34.19:3306/bugatti?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2021-06-09 14:51:48.369 [0-0-0-writer] INFO ESWriter$Job - task end, write size :4
2021-06-09 14:51:48.388 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[714]ms
2021-06-09 14:51:48.388 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2021-06-09 14:51:57.722 [job-0] INFO StandAloneJobContainerCommunicator - Total 4 records, 226 bytes | Speed 22B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.074s | Percentage 100.00%
2021-06-09 14:51:57.723 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-06-09 14:51:57.724 [job-0] INFO JobContainer - DataX Writer.Job [elasticsearchwriter] do post work.
2021-06-09 14:51:57.725 [job-0] INFO AbstractJestClient - Setting server pool to a list of 1 servers: [http://172.19.40.171:9200]
2021-06-09 14:51:57.725 [job-0] INFO JestClientFactory - Using single thread/connection supporting basic connection manager
2021-06-09 14:51:57.727 [job-0] INFO JestClientFactory - Using default GSON instance
2021-06-09 14:51:57.727 [job-0] INFO JestClientFactory - Node Discovery disabled...
2021-06-09 14:51:57.727 [job-0] INFO JestClientFactory - Idle connection reaping disabled...
2021-06-09 14:51:57.727 [job-0] INFO JestClientFactory - Authentication cache set for preemptive authentication
2021-06-09 14:51:57.728 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work.
2021-06-09 14:51:57.728 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2021-06-09 14:51:57.729 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /Users/hourui/Downloads/111111/datax/hook
2021-06-09 14:51:57.735 [job-0] INFO JobContainer -
    [total cpu info] =>
    averageCpu | maxDeltaCpu | minDeltaCpu
    -1.00% | -1.00% | -1.00%
    [total gc info] =>
    NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
    PS MarkSweep | 1 | 1 | 1 | 0.060s | 0.060s | 0.060s
    PS Scavenge | 1 | 1 | 1 | 0.031s | 0.031s | 0.031s
2021-06-09 14:51:57.735 [job-0] INFO JobContainer - PerfTrace not enable!
2021-06-09 14:51:57.736 [job-0] INFO StandAloneJobContainerCommunicator - Total 4 records, 226 bytes | Speed 22B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.074s | Percentage 100.00%
2021-06-09 14:51:57.737 [job-0] INFO JobContainer -
任务启动时刻 : 2021-06-09 14:51:45
任务结束时刻 : 2021-06-09 14:51:57
任务总计耗时 : 12s
任务平均流量 : 22B/s
记录写入速度 : 0rec/s
读出记录总数 : 4
读写失败总数 : 0
Once the job above finishes, the data can be queried in Elasticsearch:
curl http://172.19.40.171:9200/script-version/_search
The query result is as follows:
{"took":3,"timed_out":false,"_shards":{"total":2,"successful":2,"skipped":0,"failed":0},"hits":{"total":{"value":4,"relation":"eq"},"max_score":1.0,"hits":[