Syncing MongoDB to Elasticsearch with DataX


There are many data synchronization tools around; here we take a look at Alibaba's open-source tool DataX. DataX is already in use inside Alibaba Cloud's DataWorks and supports transfers between many mainstream storage services. The experiments below cover two sync paths, MongoDB to Elasticsearch and MongoDB to MySQL, along with some problems I hit on the way that are worth recording: when I searched online I couldn't find a single comprehensive write-up, or even reasonably clear answers to the problems, so I stepped into plenty of pitfalls. None of that is the point, though; the point is to understand the tool and be able to use it. Below is a fairly detailed record of my experiments.

Supported services

See the official repository: https://github.com/alibaba/DataX

Installation

Environment requirements:

1. JDK 1.6+
2. Maven 3.x+
3. Python 2.6+

In my setup, MongoDB is a 4.0 sharded cluster, Elasticsearch is 6.7, and MySQL is 5.6.
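Before building, it's worth confirming the toolchain with a quick sanity check (the prompts below just mirror the ones used in this article):

[root@wulaoer ~]# java -version    # 1.8.x recommended (see the JDK note below)
[root@wulaoer ~]# mvn -v           # Maven 3.x
[root@wulaoer ~]# python -V        # Python 2.x; datax.py is a Python 2 script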

Download the source

git clone https://github.com/alibaba/DataX.git

Enter the source directory and build the package:

cd DataX
mvn -U clean package assembly:assembly -Dmaven.test.skip=true

Note: JDK 1.8 is recommended; the build failed under 1.7 but went through cleanly with 1.8.

This builds all of the plugins. You can also edit the modules section of pom.xml to keep only the plugins you need and delete the rest:

[root@wulaoer DataX]# pwd
/root/DataX
[root@wulaoer DataX]# vim pom.xml
......................
        <!-- reader -->
        <module>mongodbreader</module>

        <!-- writer -->
        <module>elasticsearchwriter</module>
.....................

Here we only build the MongoDB reader plugin and the Elasticsearch writer plugin, so only those two modules are kept.

Building DataX

Now run the build:

[root@wulaoer DataX]# mvn -U clean package assembly:assembly -Dmaven.test.skip=true

A successful build ends with output like the following:

...............
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for datax-all 0.0.1-SNAPSHOT:
[INFO] 
[INFO] datax-all .......................................... SUCCESS [ 11.335 s]
[INFO] datax-common ....................................... SUCCESS [04:04 min]
[INFO] datax-transformer .................................. SUCCESS [01:02 min]
[INFO] datax-core ......................................... SUCCESS [ 14.754 s]
[INFO] plugin-unstructured-storage-util ................... SUCCESS [ 46.987 s]
[INFO] mongodbreader ...................................... SUCCESS [  4.547 s]
[INFO] elasticsearchwriter ................................ SUCCESS [  8.606 s]
[INFO] plugin-rdbms-util .................................. SUCCESS [  3.649 s]
[INFO] hbase20xsqlreader .................................. SUCCESS [  9.678 s]
[INFO] hbase20xsqlwriter .................................. SUCCESS [  1.053 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  07:47 min
[INFO] Finished at: 2019-10-22T14:55:47+08:00
[INFO] ------------------------------------------------------------------------

By default the build does not generate a job template JSON for the elasticsearch writer plugin, so we have to create one by hand in the following format:

{
    "name": "elasticsearchwriter",
    "parameter": {
        "endpoint": "http://xxx:9999",
        "accessId": "xxxx",
        "accessKey": "xxxx",
        "index": "test-1",
        "indextype": "default",
        "cleanup": true,
        "settings": {"index": {"number_of_shards": 1, "number_of_replicas": 0}},
        "discovery": false,
        "batchSize": 1000,
        "splitter": ",",
        "column": []
    },
    "speed": {
        "concurrent": 4,
        "throttle": true,
        "mbps": "2"
    }
}

Write the above content into the template file:

[root@wulaoer DataX]# vim target/datax/datax/plugin/writer/elasticsearchwriter/plugin_job_template.json

With the template in place, generate a job configuration skeleton:

[root@wulaoer bin]# pwd
/root/DataX/target/datax/datax/bin
[root@ks-allinone bin]# python datax.py -r mongodbreader -w elasticsearchwriter
# usage: python datax.py -r <source reader plugin> -w <target writer plugin>
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mongodbreader document:
     https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md 

Please refer to the elasticsearchwriter document:
     https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md 

Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader", 
                    "parameter": {
                        "address": [], 
                        "collectionName": "", 
                        "column": [], 
                        "dbName": "", 
                        "userName": "", 
                        "userPassword": ""
                    }
                }, 
                "writer": {
                    "name": "elasticsearchwriter", 
                    "parameter": {
                        "accessId": "xxxx", 
                        "accessKey": "xxxx", 
                        "batchSize": 1000, 
                        "cleanup": true, 
                        "column": [], 
                        "discovery": false, 
                        "endpoint": "http://xxx:9999", 
                        "index": "test-1", 
                        "indextype": "default", 
                        "settings": {
                            "index": {
                                "number_of_replicas": 0, 
                                "number_of_shards": 1
                            }
                        }, 
                        "splitter": ","
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

The output above says to consult the mongodbreader and elasticsearchwriter documentation and to fill the parameters in with values from your own environment, keeping the JSON structure as generated. I ran two experiments with it: syncing MongoDB to MySQL and syncing MongoDB to Elasticsearch.

MongoDB to MySQL

Let's start with MongoDB to MySQL; this is my job file:

{
    "job": {
        "content": [{
            "reader": {
                "name": "mongodbreader",
                "parameter": {
                    "address": ["127.0.0.1:3717"],
                    "collectionName": "集合名称",
                    "column": [{
                            "name": "_class",     
                            "type": "string"
                        },
                        {
                            "name": "_id",
                            "type": "string"
                        },
                        {
                            "name": "clientType",
                            "type": "string"
                        },
                        {
                            "name": "commentNeold",
                            "type": "string"
                        },
                        {
                            "name": "count",
                            "type": "long"
                        },
                        {
                            "name": "createTime",
                            "type": "long"
                        },
                        {
                            "name": "episodeNo",
                            "type": "string"
                        },
                        {
                            "name": "extra_status",
                            "type": "long"
                        },
                        {
                            "name": "modifyTime",
                            "type": "long"
                        },
                        {
                            "name": "neolds",
                            "type": "string"
                        },
                        {
                            "name": "status",
                            "type": "long"
                        },
                        {
                            "name": "titleNo",
                            "type": "string"
                        },
                        {
                            "name": "type",
                            "type": "long"
                        }
                    ],
                    "dbName": "mongodb库名",
                    "userName": "用户名",
                    "userPassword": "密码"
                }
            },
            "writer": {
                "name": "mysqlwriter",
                "parameter": {
                    "column": [
                        "_class", "_id", "clientType", "commentNeold", "count", "createTime", "episodeNo", "extra_status", "modifyTime", "neolds", "status", "titleNo", "type"
                    ],
                    "connection": [{
                        "jdbcUrl": "jdbc:mysql://10.211.55.36:3306/库名",
                        "table": ["表名"]
                    }],
                    "password": "密码",
                    "preSql": ["delete from 表名"],
                    "session": ["set session sql_mode='ANSI'"],
                    "username": "用户名",
                    "writeMode": "insert"

                }
            }
        }],
        "setting": {
            "speed": {
                "channel": "10"
            }
        }
    }
}

Note that before syncing you must first create a table in MySQL whose columns match the fields above.
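A minimal table definition might look like the sketch below; your_database, your_table, and the concrete column types are assumptions (string → VARCHAR, long → BIGINT) to adapt to your own data:

[root@wulaoer ~]# mysql -h 10.211.55.36 -u your_username -p your_database <<'SQL'
CREATE TABLE your_table (
    `_class`       VARCHAR(255),
    `_id`          VARCHAR(64),
    `clientType`   VARCHAR(64),
    `commentNeold` VARCHAR(255),
    `count`        BIGINT,
    `createTime`   BIGINT,
    `episodeNo`    VARCHAR(64),
    `extra_status` BIGINT,
    `modifyTime`   BIGINT,
    `neolds`       VARCHAR(255),
    `status`       BIGINT,
    `titleNo`      VARCHAR(64),
    `type`         BIGINT
);
SQL

With the table in place, run the sync: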

[root@wulaoer bin]# python datax.py mongodb_mysql.json 
...............
2019-10-23 11:58:12.829 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-10-23 11:58:12.830 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1 records, 148 bytes | Speed 14B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 3.306s | Percentage 100.00%
2019-10-23 11:58:12.832 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2019-10-23 11:57:56
任务结束时刻                    : 2019-10-23 11:58:12
任务总计耗时                    :                 16s
任务平均流量                    :               14B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   1
读写失败总数                    :                   0

mongodb_mysql.json is my MongoDB-to-MySQL job file. When the job runs it prints the contents of mongodb_mysql.json, with the password masked.
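To sanity-check the result, you can compare row counts on both ends (same hypothetical names as above):

[root@wulaoer ~]# mysql -h 10.211.55.36 -u your_username -p -e 'SELECT COUNT(*) FROM your_database.your_table;'

and compare the number with db.your_collection.count() in the mongo shell.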

MongoDB to Elasticsearch

Syncing to Elasticsearch is run the same way; here is my mongodb_elasticsearch.json:

{
    "job": {
        "content": [{
            "reader": {
                "name": "mongodbreader",
                "parameter": {
                    "address": ["127.0.0.1:3717"],
                    "collectionName": "集合名称",
                    "column": [{
                            "name": "_id",
                            "type": "string"
                        },
                        {
                            "name": "_id",
                            "type": "string"
                        },
                        {
                            "name": "secret",
                            "type": "boolean"
                        }
                    ],
                    "dbName": "库名",
                    "userName": "用户名",
                    "userPassword": "密码"
                }
            },
            "writer": {
                "job": {
                    "content": [{
                        "writer": {
                            "name": "elasticsearchwriter",
                            "parameter": {
                                "accessId": "用户名",
                                "accessKey": "密码",
                                "batchSize": 1000,
                                "cleanup": true,
                                "writeMode": "insert",
                                "column": [{
                                        "name": "pk",
                                        "type": "id"
                                    },
                                    {
                                        "name": "id",
                                        "type": "keywork"
                                    },
                                    {
                                        "name": "secret",
                                        "type": "boolean"
                                    }
                                ],
                                "discovery": false,
                                "endpoint": "http://127.0.0.1:9200",
                                "index": "索引",
                                "settings": {
                                    "index": {
                                        "number_of_replicas": 0,
                                        "number_of_shards": 1
                                    }
                                },
                                "splitter": ",",
                                "type": "类型"
                            }
                        }
                    }],
                    "setting": {
                        "speed": {
                            "channel": 1
                        }
                    }
                }
            }
        }],
        "setting": {
            "speed": {
                "throttle": false,
                "concurrent": 4
            }
        }
    }
}

Here I synced MongoDB's _id into Elasticsearch twice: once as the document _id (through the pk column of type id) and once as an ordinary id field, so _id and id carry the same value. Note that the string type must be written as keyword in the Elasticsearch column list. Also pay attention to the writeMode field, which controls how data is written. There are two modes: insert (append) and overwrite (delete the original data, then insert). When I tested insert mode, the sync still deleted the existing data before inserting; to avoid that, set cleanup to false. How fast each sync runs depends on the concurrency, the network, and the number of processes.
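After a successful run you can confirm the documents arrived and spot-check that _id and id match (your_index is a placeholder):

[root@wulaoer ~]# curl 'http://127.0.0.1:9200/your_index/_count?pretty'
[root@wulaoer ~]# curl 'http://127.0.0.1:9200/your_index/_search?size=1&pretty'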

Errors during packaging

[ERROR] Failed to execute goal on project otsstreamreader: Could not resolve dependencies for project com.alibaba.datax:otsstreamreader:jar:0.0.1-SNAPSHOT: Could not find artifact com.aliyun.openservices:tablestore-streamclient:jar:1.0.0-SNAPSHOT -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR] 
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :otsstreamreader

Solution:

[root@wulaoer DataX]# vim otsstreamreader/pom.xml 
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>tablestore-streamclient</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </dependency>
    replace with:
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>tablestore-streamclient</artifactId>
        <version>1.0.0</version>
    </dependency>
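The root cause is that the 1.0.0-SNAPSHOT artifact is not published in the public Maven repositories, while the 1.0.0 release is. If you want to confirm the release resolves before rebuilding, a direct fetch should work:

[root@wulaoer DataX]# mvn dependency:get -Dartifact=com.aliyun.openservices:tablestore-streamclient:1.0.0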

Rebuild, and this time the build succeeds:

[INFO] Reactor Summary:
[INFO] 
[INFO] datax-all 0.0.1-SNAPSHOT ........................... SUCCESS [02:07 min]
[INFO] datax-common ....................................... SUCCESS [  2.273 s]
[INFO] datax-transformer .................................. SUCCESS [  1.429 s]
[INFO] datax-core ......................................... SUCCESS [  3.002 s]
[INFO] plugin-rdbms-util .................................. SUCCESS [  1.135 s]
[INFO] mysqlreader ........................................ SUCCESS [  1.040 s]
[INFO] drdsreader ......................................... SUCCESS [  1.034 s]
[INFO] sqlserverreader .................................... SUCCESS [  1.002 s]
[INFO] postgresqlreader ................................... SUCCESS [  1.217 s]
[INFO] oraclereader ....................................... SUCCESS [  1.099 s]
[INFO] odpsreader ......................................... SUCCESS [  1.920 s]
[INFO] otsreader .......................................... SUCCESS [  2.478 s]
[INFO] otsstreamreader .................................... SUCCESS [  2.150 s]
[INFO] plugin-unstructured-storage-util ................... SUCCESS [  1.103 s]
[INFO] txtfilereader ...................................... SUCCESS [  3.586 s]
[INFO] hdfsreader ......................................... SUCCESS [ 10.375 s]
[INFO] streamreader ....................................... SUCCESS [  0.991 s]
[INFO] ossreader .......................................... SUCCESS [  3.418 s]
[INFO] ftpreader .......................................... SUCCESS [  2.967 s]
[INFO] mongodbreader ...................................... SUCCESS [  3.235 s]
[INFO] rdbmsreader ........................................ SUCCESS [  1.007 s]
[INFO] hbase11xreader ..................................... SUCCESS [  4.525 s]
[INFO] hbase094xreader .................................... SUCCESS [  3.232 s]
[INFO] opentsdbreader ..................................... SUCCESS [  2.277 s]
[INFO] mysqlwriter ........................................ SUCCESS [  0.776 s]
[INFO] drdswriter ......................................... SUCCESS [  0.774 s]
[INFO] odpswriter ......................................... SUCCESS [  1.597 s]
[INFO] txtfilewriter ...................................... SUCCESS [  2.691 s]
[INFO] ftpwriter .......................................... SUCCESS [  2.783 s]
[INFO] hdfswriter ......................................... SUCCESS [  8.503 s]
[INFO] streamwriter ....................................... SUCCESS [  0.925 s]
[INFO] otswriter .......................................... SUCCESS [  1.739 s]
[INFO] oraclewriter ....................................... SUCCESS [  0.996 s]
[INFO] sqlserverwriter .................................... SUCCESS [  0.969 s]
[INFO] postgresqlwriter ................................... SUCCESS [  2.233 s]
[INFO] osswriter .......................................... SUCCESS [  8.041 s]
[INFO] mongodbwriter ...................................... SUCCESS [ 10.075 s]
[INFO] adswriter .......................................... SUCCESS [  7.073 s]
[INFO] ocswriter .......................................... SUCCESS [  5.983 s]
[INFO] rdbmswriter ........................................ SUCCESS [  2.679 s]
[INFO] hbase11xwriter ..................................... SUCCESS [  7.409 s]
[INFO] hbase094xwriter .................................... SUCCESS [  3.524 s]
[INFO] hbase11xsqlwriter .................................. SUCCESS [ 17.167 s]
[INFO] hbase11xsqlreader .................................. SUCCESS [ 28.462 s]
[INFO] elasticsearchwriter ................................ SUCCESS [  3.873 s]
[INFO] tsdbwriter ......................................... SUCCESS [  2.598 s]
[INFO] adbpgwriter ........................................ SUCCESS [  4.083 s]
[INFO] gdbwriter .......................................... SUCCESS [  5.756 s]
[INFO] hbase20xsqlreader .................................. SUCCESS [  1.134 s]
[INFO] hbase20xsqlwriter 0.0.1-SNAPSHOT ................... SUCCESS [  1.056 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 05:17 min
[INFO] Finished at: 2019-09-18T01:09:39+08:00
[INFO] ------------------------------------------------------------------------

The build above packages the plugins for every service DataX supports. We won't use most of them, so remove the modules you don't need; that prevents avoidable errors and saves some trouble.

I only need the MongoDB reader plugin and, since I'm syncing to Elasticsearch, the Elasticsearch writer plugin; with the modules trimmed down to those two, start the build.
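To see which modules are currently enabled before trimming, a quick grep over pom.xml helps:

[root@wulaoer DataX]# grep -n '<module>' pom.xml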

Problems encountered

The first error appeared when running the sync:

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2019-10-22 18:40:25.828 [main] WARN  ConfigParser - 插件[mongodbreader,null]加载失败,1s后重试... Exception:Code:[Framework-12], Description:[DataX插件初始化错误, 该问题通常是由于DataX安装错误引起,请联系您的运维解决 .].  - 插件加载失败,未完成指定插件加载:[null, mongodbreader] 
2019-10-22 18:40:26.835 [main] ERROR Engine - 

经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Framework-12], Description:[DataX插件初始化错误, 该问题通常是由于DataX安装错误引起,请联系您的运维解决 .].  - 插件加载失败,未完成指定插件加载:[null, mongodbreader]
        at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
        at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:142)
        at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
        at com.alibaba.datax.core.Engine.entry(Engine.java:137)
        at com.alibaba.datax.core.Engine.main(Engine.java:204)

Solution:

The problem was in the configuration. I had used the job.json from https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md as the content of elasticsearchwriter's plugin_job_template.json. Although the error message points at mongodbreader, the MongoDB-to-MySQL sync ran without this error, so I suspected the plugin_job_template.json under elasticsearchwriter was at fault. I changed it to:

{
    "name": "elasticsearchwriter",
    "parameter": {
        "endpoint": "http://xxx:9999",
        "accessId": "xxxx",
        "accessKey": "xxxx",
        "index": "test-1",
        "indextype": "default",
        "cleanup": true,
        "settings": {"index": {"number_of_shards": 1, "number_of_replicas": 0}},
        "discovery": false,
        "batchSize": 1000,
        "splitter": ",",
        "column": []
    }
}

After regenerating the job file, the earlier error was gone. If you run into this kind of problem, I suggest first syncing to a system you are familiar with, and only then moving on to other targets.

Another problem:

2019-10-24 14:20:30.561 [job-53135295] ERROR ESClient - {"root_cause":[{"type":"mapper_parsing_exception","reason":"No handler for type [string] declared on field [categoryImage]"}],"type":"mapper_parsing_exception","reason":"No handler for type [string] declared on field [categoryImage]"}
2019-10-24 14:20:30.566 [job-53135295] ERROR JobContainer - Exception when job run
com.alibaba.datax.common.exception.DataXException: Code:[ESWriter-03], Description:[mappings错误.].  - java.io.IOException: create index or mapping failed
        at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:34) ~[datax-common-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.plugin.writer.elasticsearchwriter.ESWriter$Job.prepare(ESWriter.java:94) ~[elasticsearchwriter-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.job.JobContainer.prepareJobWriter(JobContainer.java:1068) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.job.JobContainer.prepare(JobContainer.java:457) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:213) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.Engine.start(Engine.java:96) [datax-core-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.Engine.entry(Engine.java:246) [datax-core-0.0.1-SNAPSHOT.jar:na]
        at com.alibaba.datax.core.Engine.main(Engine.java:279) [datax-core-0.0.1-SNAPSHOT.jar:na]
2019-10-24 14:20:30.677 [job-53135295] INFO  MetricReportUtil - reportJobMetric is turn off
2019-10-24 14:20:30.678 [job-53135295] INFO  LocalJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 0.00%

This mapper_parsing_exception means Elasticsearch rejected the mapping: Elasticsearch 6.x no longer supports the string type, so any column still declared as string has to be changed to keyword or text in the writer's column list, as noted earlier.

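If you are not sure what mapping the index ended up with, you can inspect it directly (your_index is a placeholder):

[root@wulaoer ~]# curl 'http://127.0.0.1:9200/your_index/_mapping?pretty'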