There are many data synchronization tools. Here we look at DataX, Alibaba's open-source tool, which is already used inside Alibaba Cloud's DataWorks and supports transfers between many mainstream storage services. The experiments below cover two sync paths: MongoDB to Elasticsearch and MongoDB to MySQL. Some of the problems I hit along the way are worth recording, because when I searched online I could not find a single comprehensive write-up, or even a reasonably clear answer, for these issues, so I stepped into quite a few pitfalls. None of that is the main point, though; the point is to understand the tool and be able to use it. Below is a fairly detailed record of my experiments.
Supported services
See the official repository: https://github.com/alibaba/DataX
Installation
Requirements:
1. JDK 1.6+
2. Maven 3.x+
3. Python 2.6+
My environment: MongoDB is a 4.0 sharded cluster, Elasticsearch is 6.7, and MySQL is 5.6.
Download the source
git clone https://github.com/alibaba/DataX.git
Enter the source directory and build the package:
cd DataX
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
Note: JDK 1.8 is recommended; compiling with 1.7 ran into problems, while 1.8 built without errors.
This builds all plugins. You can also edit the modules section of pom.xml to keep only the plugins you need and delete the rest.
[root@wulaoer DataX]# pwd
/root/DataX
[root@wulaoer DataX]# vim pom.xml
......................
        <!-- reader -->
        <module>mongodbreader</module>
        <!-- writer -->
        <module>elasticsearchwriter</module>
.....................
Here we only build the MongoDB reader plugin and the Elasticsearch writer plugin, so keep just these two modules.
Building DataX
Now run the build:
[root@wulaoer DataX]# mvn -U clean package assembly:assembly -Dmaven.test.skip=true
A successful build ends with output like the following:
...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for datax-all 0.0.1-SNAPSHOT:
[INFO]
[INFO] datax-all .......................................... SUCCESS [ 11.335 s]
[INFO] datax-common ....................................... SUCCESS [04:04 min]
[INFO] datax-transformer .................................. SUCCESS [01:02 min]
[INFO] datax-core ......................................... SUCCESS [ 14.754 s]
[INFO] plugin-unstructured-storage-util ................... SUCCESS [ 46.987 s]
[INFO] mongodbreader ...................................... SUCCESS [  4.547 s]
[INFO] elasticsearchwriter ................................ SUCCESS [  8.606 s]
[INFO] plugin-rdbms-util .................................. SUCCESS [  3.649 s]
[INFO] hbase20xsqlreader .................................. SUCCESS [  9.678 s]
[INFO] hbase20xsqlwriter .................................. SUCCESS [  1.053 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 07:47 min
[INFO] Finished at: 2019-10-22T14:55:47+08:00
[INFO] ------------------------------------------------------------------------
By default the build does not generate a job template JSON file for the Elasticsearch writer plugin, so we need to create one by hand in the following format:
{
    "name": "elasticsearchwriter",
    "parameter": {
        "endpoint": "http://xxx:9999",
        "accessId": "xxxx",
        "accessKey": "xxxx",
        "index": "test-1",
        "indextype": "default",
        "cleanup": true,
        "settings": {"index": {"number_of_shards": 1, "number_of_replicas": 0}},
        "discovery": false,
        "batchSize": 1000,
        "splitter": ",",
        "column": []
    },
    "speed": {
        "concurrent": 4,
        "throttle": true,
        "mbps": "2"
    }
}
Write the content above into the template file:
[root@wulaoer DataX]# vim target/datax/datax/plugin/writer/elasticsearchwriter/plugin_job_template.json
With the template in place, generate a sample job configuration:
[root@wulaoer bin]# pwd
/root/DataX/target/datax/datax/bin
[root@ks-allinone bin]# python datax.py -r mongodbreader -w elasticsearchwriter
# argument order: python datax.py -r <source reader> -w <target writer>
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the mongodbreader document:
     https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md 
Please refer to the elasticsearchwriter document:
     https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader", 
                    "parameter": {
                        "address": [], 
                        "collectionName": "", 
                        "column": [], 
                        "dbName": "", 
                        "userName": "", 
                        "userPassword": ""
                    }
                }, 
                "writer": {
                    "name": "elasticsearchwriter", 
                    "parameter": {
                        "accessId": "xxxx", 
                        "accessKey": "xxxx", 
                        "batchSize": 1000, 
                        "cleanup": true, 
                        "column": [], 
                        "discovery": false, 
                        "endpoint": "http://xxx:9999", 
                        "index": "test-1", 
                        "indextype": "default", 
                        "settings": {
                            "index": {
                                "number_of_replicas": 0, 
                                "number_of_shards": 1
                            }
                        }, 
                        "splitter": ","
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
The output above says to consult the mongodbreader and elasticsearchwriter documents and fill in the parameter values for your own environment, keeping the JSON format intact. I ran two experiments with this: syncing MongoDB to MySQL and syncing MongoDB to Elasticsearch.
Syncing MongoDB to MySQL
Let's start with MongoDB to MySQL. This is my job configuration:
{
    "job": {
        "content": [{
            "reader": {
                "name": "mongodbreader",
                "parameter": {
                    "address": ["127.0.0.1:3717"],
                    "collectionName": "集合名称",
                    "column": [{
                            "name": "_class",     
                            "type": "string"
                        },
                        {
                            "name": "_id",
                            "type": "string"
                        },
                        {
                            "name": "clientType",
                            "type": "string"
                        },
                        {
                            "name": "commentNeold",
                            "type": "string"
                        },
                        {
                            "name": "count",
                            "type": "long"
                        },
                        {
                            "name": "createTime",
                            "type": "long"
                        },
                        {
                            "name": "episodeNo",
                            "type": "string"
                        },
                        {
                            "name": "extra_status",
                            "type": "long"
                        },
                        {
                            "name": "modifyTime",
                            "type": "long"
                        },
                        {
                            "name": "neolds",
                            "type": "string"
                        },
                        {
                            "name": "status",
                            "type": "long"
                        },
                        {
                            "name": "titleNo",
                            "type": "string"
                        },
                        {
                            "name": "type",
                            "type": "long"
                        }
                    ],
                    "dbName": "mongodb库名",
                    "userName": "用户名",
                    "userPassword": "密码"
                }
            },
            "writer": {
                "name": "mysqlwriter",
                "parameter": {
                    "column": [
                        "_class", "_id", "clientType", "commentNeold", "count", "createTime", "episodeNo", "extra_status", "modifyTime", "neolds", "status", "titleNo", "type"
                    ],
                    "connection": [{
                        "jdbcUrl": "jdbc:mysql://10.211.55.36:3306/库名",
                        "table": ["表名"]
                    }],
                    "password": "密码",
                    "preSql": ["delete from 表名"],
                    "session": ["set session sql_mode='ANSI'"],
                    "username": "用户名",
                    "writeMode": "insert"
                }
            }
        }],
        "setting": {
            "speed": {
                "channel": "10"
            }
        }
    }
}
Note: before syncing, you must first create the target table in MySQL with columns matching the reader's column list above.
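As a rough sketch of such a table (the table name comment_stat is hypothetical, and mapping the reader's string type to VARCHAR and long to BIGINT is my assumption; adjust lengths and types to your own data):

-- hypothetical target table for the job above;
-- column types are assumptions derived from the reader's string/long types
CREATE TABLE comment_stat (
    `_class`       VARCHAR(255),
    `_id`          VARCHAR(64),
    `clientType`   VARCHAR(64),
    `commentNeold` VARCHAR(255),
    `count`        BIGINT,
    `createTime`   BIGINT,
    `episodeNo`    VARCHAR(64),
    `extra_status` BIGINT,
    `modifyTime`   BIGINT,
    `neolds`       VARCHAR(255),
    `status`       BIGINT,
    `titleNo`      VARCHAR(64),
    `type`         BIGINT
);

With the table in place, run the sync: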
[root@wulaoer bin]# python datax.py mongodb_mysql.json
...............
2019-10-23 11:58:12.829 [job-0] INFO JobContainer - PerfTrace not enable!
2019-10-23 11:58:12.830 [job-0] INFO StandAloneJobContainerCommunicator - Total 1 records, 148 bytes | Speed 14B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 3.306s | Percentage 100.00%
2019-10-23 11:58:12.832 [job-0] INFO JobContainer -
任务启动时刻 : 2019-10-23 11:57:56
任务结束时刻 : 2019-10-23 11:58:12
任务总计耗时 : 16s
任务平均流量 : 14B/s
记录写入速度 : 0rec/s
读出记录总数 : 1
读写失败总数 : 0
mongodb_mysql.json is my MongoDB-to-MySQL job file. When the job runs, DataX prints the content of mongodb_mysql.json, with the password masked.
Syncing MongoDB to Elasticsearch
Syncing to Elasticsearch is run the same way. Here is the content of my mongodb_elasticsearch.json:
{
    "job": {
        "content": [{
            "reader": {
                "name": "mongodbreader",
                "parameter": {
                    "address": ["127.0.0.1:3717"],
                    "collectionName": "集合名称",
                    "column": [{
                            "name": "_id",
                            "type": "string"
                        },
                        {
                            "name": "_id",
                            "type": "string"
                        },
                        {
                            "name": "secret",
                            "type": "boolean"
                        }
                    ],
                    "dbName": "库名",
                    "userName": "用户名",
                    "userPassword": "密码"
                }
            },
            "writer": {
                "job": {
                    "content": [{
                        "writer": {
                            "name": "elasticsearchwriter",
                            "parameter": {
                                "accessId": "用户名",
                                "accessKey": "密码",
                                "batchSize": 1000,
                                "cleanup": true,
                                "writeMode": "insert",
                                "column": [{
                                        "name": "pk",
                                        "type": "id"
                                    },
                                    {
                                        "name": "id",
                                        "type": "keywork"
                                    },
                                    {
                                        "name": "secret",
                                        "type": "boolean"
                                    }
                                ],
                                "discovery": false,
                                "endpoint": "http://127.0.0.1:9200",
                                "index": "索引",
                                "settings": {
                                    "index": {
                                        "number_of_replicas": 0,
                                        "number_of_shards": 1
                                    }
                                },
                                "splitter": ",",
                                "type": "类型"
                            }
                        }
                    }],
                    "setting": {
                        "speed": {
                            "channel": 1
                        }
                    }
                }
            }
        }],
        "setting": {
            "speed": {
                "throttle": false,
                "concurrent": 4
            }
        }
    }
}
Here I synced MongoDB's _id twice: once into Elasticsearch's _id and once into an id field, so _id and id hold the same data. Note that the string type has to be changed to keyword. Also pay attention to the writeMode field, which controls how records are written. There are two modes: insert into (append) and overwrite into (delete the existing data, then insert). When I tested insert mode, the sync still deleted the existing data before inserting; the fix is to change cleanup to false. How fast each sync runs depends on the concurrency, the network, and the number of processes.
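As a sketch of that change, only the two relevant writer keys are shown below; the other writer parameters from the full config above stay the same:

"writer": {
    "name": "elasticsearchwriter",
    "parameter": {
        "cleanup": false,
        "writeMode": "insert"
    }
}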
Errors during packaging
[ERROR] Failed to execute goal on project otsstreamreader: Could not resolve dependencies for project com.alibaba.datax:otsstreamreader:jar:0.0.1-SNAPSHOT: Could not find artifact com.aliyun.openservices:tablestore-streamclient:jar:1.0.0-SNAPSHOT -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :otsstreamreader
Solution:
[root@wulaoer DataX]# vim otsstreamreader/pom.xml 
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>tablestore-streamclient</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </dependency>
    Replace with:
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>tablestore-streamclient</artifactId>
        <version>1.0.0</version>
    </dependency>
Then rebuild:
[INFO] Reactor Summary:
[INFO]
[INFO] datax-all 0.0.1-SNAPSHOT ........................... SUCCESS [02:07 min]
[INFO] datax-common ....................................... SUCCESS [  2.273 s]
[INFO] datax-transformer .................................. SUCCESS [  1.429 s]
[INFO] datax-core ......................................... SUCCESS [  3.002 s]
[INFO] plugin-rdbms-util .................................. SUCCESS [  1.135 s]
[INFO] mysqlreader ........................................ SUCCESS [  1.040 s]
[INFO] drdsreader ......................................... SUCCESS [  1.034 s]
[INFO] sqlserverreader .................................... SUCCESS [  1.002 s]
[INFO] postgresqlreader ................................... SUCCESS [  1.217 s]
[INFO] oraclereader ....................................... SUCCESS [  1.099 s]
[INFO] odpsreader ......................................... SUCCESS [  1.920 s]
[INFO] otsreader .......................................... SUCCESS [  2.478 s]
[INFO] otsstreamreader .................................... SUCCESS [  2.150 s]
[INFO] plugin-unstructured-storage-util ................... SUCCESS [  1.103 s]
[INFO] txtfilereader ...................................... SUCCESS [  3.586 s]
[INFO] hdfsreader ......................................... SUCCESS [ 10.375 s]
[INFO] streamreader ....................................... SUCCESS [  0.991 s]
[INFO] ossreader .......................................... SUCCESS [  3.418 s]
[INFO] ftpreader .......................................... SUCCESS [  2.967 s]
[INFO] mongodbreader ...................................... SUCCESS [  3.235 s]
[INFO] rdbmsreader ........................................ SUCCESS [  1.007 s]
[INFO] hbase11xreader ..................................... SUCCESS [  4.525 s]
[INFO] hbase094xreader .................................... SUCCESS [  3.232 s]
[INFO] opentsdbreader ..................................... SUCCESS [  2.277 s]
[INFO] mysqlwriter ........................................ SUCCESS [  0.776 s]
[INFO] drdswriter ......................................... SUCCESS [  0.774 s]
[INFO] odpswriter ......................................... SUCCESS [  1.597 s]
[INFO] txtfilewriter ...................................... SUCCESS [  2.691 s]
[INFO] ftpwriter .......................................... SUCCESS [  2.783 s]
[INFO] hdfswriter ......................................... SUCCESS [  8.503 s]
[INFO] streamwriter ....................................... SUCCESS [  0.925 s]
[INFO] otswriter .......................................... SUCCESS [  1.739 s]
[INFO] oraclewriter ....................................... SUCCESS [  0.996 s]
[INFO] sqlserverwriter .................................... SUCCESS [  0.969 s]
[INFO] postgresqlwriter ................................... SUCCESS [  2.233 s]
[INFO] osswriter .......................................... SUCCESS [  8.041 s]
[INFO] mongodbwriter ...................................... SUCCESS [ 10.075 s]
[INFO] adswriter .......................................... SUCCESS [  7.073 s]
[INFO] ocswriter .......................................... SUCCESS [  5.983 s]
[INFO] rdbmswriter ........................................ SUCCESS [  2.679 s]
[INFO] hbase11xwriter ..................................... SUCCESS [  7.409 s]
[INFO] hbase094xwriter .................................... SUCCESS [  3.524 s]
[INFO] hbase11xsqlwriter .................................. SUCCESS [ 17.167 s]
[INFO] hbase11xsqlreader .................................. SUCCESS [ 28.462 s]
[INFO] elasticsearchwriter ................................ SUCCESS [  3.873 s]
[INFO] tsdbwriter ......................................... SUCCESS [  2.598 s]
[INFO] adbpgwriter ........................................ SUCCESS [  4.083 s]
[INFO] gdbwriter .......................................... SUCCESS [  5.756 s]
[INFO] hbase20xsqlreader .................................. SUCCESS [  1.134 s]
[INFO] hbase20xsqlwriter 0.0.1-SNAPSHOT ................... SUCCESS [  1.056 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 05:17 min
[INFO] Finished at: 2019-09-18T01:09:39+08:00
[INFO] ------------------------------------------------------------------------
This build packages the plugins for every service DataX supports. We won't use most of them, so remove the modules you don't need; this prevents build errors and saves unnecessary trouble.
I only use the MongoDB reader plugin and, since I sync to Elasticsearch, I also need the Elasticsearch writer plugin; with the module list trimmed to those, start the build.
Problem: the following error appeared when running the sync:
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2019-10-22 18:40:25.828 [main] WARN  ConfigParser - 插件[mongodbreader,null]加载失败,1s后重试... Exception:Code:[Framework-12], Description:[DataX插件初始化错误, 该问题通常是由于DataX安装错误引起,请联系您的运维解决 .].  - 插件加载失败,未完成指定插件加载:[null, mongodbreader] 
2019-10-22 18:40:26.835 [main] ERROR Engine - 
经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Framework-12], Description:[DataX插件初始化错误, 该问题通常是由于DataX安装错误引起,请联系您的运维解决 .].  - 插件加载失败,未完成指定插件加载:[null, mongodbreader]
        at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
        at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:142)
        at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
        at com.alibaba.datax.core.Engine.entry(Engine.java:137)
        at com.alibaba.datax.core.Engine.main(Engine.java:204)
Solution:
The problem was in the plugin template. I had used the job.json content from https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md as the content of elasticsearchwriter's plugin_job_template.json. Although the error above names mongodbreader, the same MongoDB reader worked fine when syncing to MySQL, so I suspected the broken piece was the plugin_job_template.json of elasticsearchwriter. I changed it to:
{
    "name": "elasticsearchwriter",
    "parameter": {
        "endpoint": "http://xxx:9999",
        "accessId": "xxxx",
        "accessKey": "xxxx",
        "index": "test-1",
        "indextype": "default",
        "cleanup": true,
        "settings": {"index": {"number_of_shards": 1, "number_of_replicas": 0}},
        "discovery": false,
        "batchSize": 1000,
        "splitter": ",",
        "column": []
    }
}
After regenerating the job file, the error was gone. If you run into this kind of problem, I suggest first syncing to a system you are familiar with, and only then syncing to the others.

Another problem:
2019-10-24 14:20:30.561 [job-53135295] ERROR ESClient - {"root_cause":[{"type":"mapper_parsing_exception","reason":"No handler for type [string] declared on field [categoryImage]"}],"type":"mapper_parsing_exception","reason":"No handler for type [string] declared on field [categoryImage]"}
2019-10-24 14:20:30.566 [job-53135295] ERROR JobContainer - Exception when job run
com.alibaba.datax.common.exception.DataXException: Code:[ESWriter-03], Description:[mappings错误.].  - java.io.IOException: create index or mapping failed
        at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:34) ~[datax-common-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.plugin.writer.elasticsearchwriter.ESWriter$Job.prepare(ESWriter.java:94) ~[elasticsearchwriter-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.job.JobContainer.prepareJobWriter(JobContainer.java:1068) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.job.JobContainer.prepare(JobContainer.java:457) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:213) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.Engine.start(Engine.java:96) [datax-core-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.Engine.entry(Engine.java:246) [datax-core-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.Engine.main(Engine.java:279) [datax-core-0.0.1-SNAPSHOT.jar:na]
2019-10-24 14:20:30.677 [job-53135295] INFO  MetricReportUtil - reportJobMetric is turn off
2019-10-24 14:20:30.678 [job-53135295] INFO  LocalJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 0.00%
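This mapping error comes from Elasticsearch itself rather than DataX: the string field type was removed in favor of text and keyword starting with Elasticsearch 5.x, so a column declared as string in the writer's column list (here categoryImage, taken from the error log) makes index and mapping creation fail on 6.7. Declaring the column as keyword (or text, for full-text fields) resolves it. A one-line sketch of the corrected column entry:

{ "name": "categoryImage", "type": "keyword" }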
