DataX使用总结_datax where-程序员宅基地

技术标签: DataX  MySQL  

简介

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、HDFS、Hive、OceanBase、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX采用了框架 + 插件 的模式,目前已开源,代码托管在github,地址:https://github.com/alibaba/DataX。

 

DataX安装部署


1.下载压缩包:

下载页面地址:https://github.com/alibaba/DataX 在页面中【Quick Start】--->【Download DataX下载地址】进行下载。下载后的包名:datax.tar.gz。解压后{datax}目录下有{bin conf job lib log log_perf plugin script tmp}几个目录。
2.安装

将下载后的压缩包直接解压后可用,前提是对应的java及python环境满足要求。
    JDK(1.6以上,推荐1.6)
    Python(推荐Python2.6.X)一定要为python2,因为后面执行datax.py的时候,里面的python的print会执行不了,导致运行不成功,会提示你print语法要加括号,python2中加不加都行 python3中必须要加,否则报语法错

另外要注意, 不要解压到C:\Program Files目录下或其他名字带空格的目录下,因为在cmd执行时会因为路径有空格导致找不到程序主文件。

DataX支持绝大部分种类数据库的数据转移,其数据转移的主要流程有三步:Reader -->transform-->writer

reader 从数据库读取需要转移的数据,transform在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,这些工作在这里进行,writer将读取并处理后的数据写入目标数据库。

DataX配置文件

DataX是通过读取配置文件进行数据转移,配置文件为json格式,这里以MySQL做示范,MySQL 2 MySQL

{
    "job": {
        
        "content": [
                    //reader过程配置信息
            {
                "reader": {   
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",//数据库用户名
                        "password": "root",
                        "column": [
                            "id",        //要读取的列元素 list
                            "name"
                        ],
                         "where":"",//可以添加筛选条件
                        "splitPk": "db_id",//数据分片
                        "connection": [
                            {
                                "table": [
                                    "table"//要读取的表名list,支持多个表的读取
                                ],
                                "querySql":[     //自定义筛选SQL
                                    "select reflect.id as id from user ,reflect where user.id = reflect.user_id",
                                ]
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database" //要读取的数据库URL 可以加数据库配置信息后缀
                                ]
                            }
                        ]
                    }
                },

                "writer": {  //writer过程配置信息
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                        "id",    //要写入的列list 注意要与读取的列一致
                        "name"
                        ],
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://120.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "root", 
                        "username": "root"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
            }
        ]
    }
}
  • 数据分片:如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。官方文档推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点,要注意目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,MysqlReader将报错!

  • Where:添加where配置可以对要转移的数据进行筛选,比如可以选择只转移今天的数据,"where":"gmt_create > $bizdate "

MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据读取

querySql:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id

类型转换

DataX 内部类型 Mysql 数据类型
Long int, tinyint, smallint, mediumint, int, bigint
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext, year
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

贴上我的一个测试数据转移配置

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                      
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/pshare?characterEncoding=utf-8"], 
                              
                                "querySql":[
                                    "select reflect.id as id,reflect.user_id as user_id,user.mail as comment from user ,reflect where user.id = reflect.user_id",
                                ]
                            }
                        ], 
                        "password": "xxxx", 
                        "username": "root"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "writeMode":"insert ",//必选 控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句 insert/replace/update  默认insert
                        "column": [
                        "id",
                        "user_id",
                        "comment"
                        ],
                        "preSql":"",   //可选 插入数据前执行的SQL
                        "postSql":"",  //可选 插入数据成功后执行的SQL
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://xxx.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

用到了连表查询信息然后作为一个字段值转移到目标数据库,这里要注意查询结果的列名和顺序要和writer里column里一样。

writeMode

  • 描述:控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句

  • 必选:是

  • 所有选项:insert/replace/update

  • 默认值:insert

启动命令

CMD下 进到你解压的dataX/bin目录下

python  datax.py  ..\job\mysql2mysql.json 

Transformer

下面说一下transform的使用,transform用于对读取的数据进行特殊定制化的需求场景,包括裁剪列、转换列等工作,主要使用的是五个对数据进行处理的方法,分别是dx_substr(),dx_pad(),dx_replace(),dx_filter(),dx_groovy(),官方文档对这几个方法的解释如下: 

dx_substr

参数:3个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:目标字段长度。

返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)

举例:

dx_substr(1,"2","5")  column 1的value为“dataxTest”=>"taxTe"

dx_substr(1,"5","10")  column 1的value为“dataxTest”=>"Test"

dx_pad

参数:4个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:"l","r", 指示是在头进行pad,还是尾进行pad。
    • 第三个参数:目标字段长度。
    • 第四个参数:需要pad的字符。

返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符

举例:

         dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz

         dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz

dx_replace

参数:4个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:字段值的开始位置。
    • 第三个参数:需要替换的字段长度。
    • 第四个参数:需要替换的字符串。

返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)

举例:

dx_replace(1,"2","4","****")  column 1的value为“dataxTest”=>"da****est"

dx_replace(1,"5","10","****")  column 1的value为“dataxTest”=>"data****"

dx_filter (关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。)

参数:

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <=
    • 第三个参数:正则表达式(java正则表达式)、值。

返回:

    • 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果.
    • like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。
    • , =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值,其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。
    • 如果目标colunn为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。

举例:

dx_filter(1,"like","dataTest") 

dx_filter(1,">=","10") 

dx_groovy

参数。

    • 第一个参数: groovy code
    • 第二个参数(列表或者为空):extraPackage

备注:

    • dx_groovy只能调用一次。不能多次调用。
    • groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
    • groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
    • 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表 (按需补充):

 transform job示例:

"transformer": [
                    {
                        "name": "dx_substr",
                        "parameter": 
                            {
                            "columnIndex":5,
                            "paras":["1","3"]
                            }  
                    },
                    {
                        "name": "dx_replace",
                        "parameter": 
                            {
                            "columnIndex":4,
                            "paras":["3","4","****"]
                            }  
                    },
                    {
                        "name": "dx_groovy",
                          "parameter": 
                            {
                               "code": "//groovy code//",  
                               "extraPackage":[
                               "import somePackage1;", 
                               "import somePackage2;"
                               ]                      
                            }  
                    }
                ]

要注意  Reader  Writer 和Transform 的配置都要写在content下面,Reader和writer的配置是用大括号,transform是写在[]里,因为transform可以包括多个数据处理,下面贴上一个包括reader writer transform Job ,

{
    "job": {

        "setting": {
            "speed": {
                "channel": "1"
            },
            "errorLimit": {
                "record": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                      "column": [
                        "id",
                        "user_id",
                        "comment",
                        "tel"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/pshare?characterEncoding=utf-8"], 
                              
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "writeMode":"insert ",
                        "column": [
                        "id",
                        "user_id",
                        "comment",
                        "tel"
						], 
                        "connection": [
                            {                                
                                "jdbcUrl": "jdbc:mysql://xxx.78.223.211:3306/pshare?characterEncoding=utf-8", 
                                "table": ["reflect"]
                            }
                        ], 
                        "password": "xxx", 
                        "username": "root"
                    }
                },
                "transformer":[
                    {
                        "name":"dx_substr",
                        "parameter": 
                            {
                            "columnIndex":3,
                            "paras":["1","3"]
                            }  
                    },
                     {
                        "name": "dx_replace",
                        "parameter": 
                            {
                            "columnIndex":2,
                            "paras":["1","4","****"]
                            }  
                    }
                ]
            }
        ]

    }
}

打开CMD 执行命令

结果:

可以看到读取到两条符合筛选条件的数据并全部写入,transform两条数据全部成功,然后看一下数据库:

被读取数据库数据:

写入目标数据库结果:

可以看到第三条数据经过了dx_replace()替换,第四条经过了dx_substr()裁剪,数据转移成功。

PS:CMD中文乱码问题  使用HCP 65001即可

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_42050545/article/details/93883887

智能推荐

51单片机的中断系统_51单片机中断篇-程序员宅基地

文章浏览阅读3.3k次,点赞7次,收藏39次。CPU 执行现行程序的过程中,出现某些急需处理的异常情况或特殊请求,CPU暂时中止现行程序,而转去对异常情况或特殊请求进行处理,处理完毕后再返回现行程序断点处,继续执行原程序。void 函数名(void) interrupt n using m {中断函数内容 //尽量精简 }编译器会把该函数转化为中断函数,表示中断源编号为n,中断源对应一个中断入口地址,而中断入口地址的内容为跳转指令,转入本函数。using m用于指定本函数内部使用的工作寄存器组,m取值为0~3。该修饰符可省略,由编译器自动分配。_51单片机中断篇

oracle项目经验求职,网络工程师简历中的项目经验怎么写-程序员宅基地

文章浏览阅读396次。项目经验(案例一)项目时间:2009-10 - 2009-12项目名称:中驰别克信息化管理整改完善项目描述:项目介绍一,建立中驰别克硬件档案(PC,服务器,网络设备,办公设备等)二,建立中驰别克软件档案(每台PC安装的软件,财务,HR,OA,专用系统等)三,能过建立的档案对中驰别克信息化办公环境优化(合理使用ADSL宽带资源,对域进行调整,对文件服务器进行优化,对共享打印机进行调整)四,优化完成后..._网络工程师项目经历

LVS四层负载均衡集群-程序员宅基地

文章浏览阅读1k次,点赞31次,收藏30次。LVS:Linux Virtual Server,负载调度器,内核集成, 阿里的四层SLB(Server Load Balance)是基于LVS+keepalived实现。NATTUNDR优点端口转换WAN性能最好缺点性能瓶颈服务器支持隧道模式不支持跨网段真实服务器要求anyTunneling支持网络private(私网)LAN/WAN(私网/公网)LAN(私网)真实服务器数量High (100)High (100)真实服务器网关lvs内网地址。

「技术综述」一文道尽传统图像降噪方法_噪声很大的图片可以降噪吗-程序员宅基地

文章浏览阅读899次。https://www.toutiao.com/a6713171323893318151/作者 | 黄小邪/言有三编辑 | 黄小邪/言有三图像预处理算法的好坏直接关系到后续图像处理的效果,如图像分割、目标识别、边缘提取等,为了获取高质量的数字图像,很多时候都需要对图像进行降噪处理,尽可能的保持原始信息完整性(即主要特征)的同时,又能够去除信号中无用的信息。并且,降噪还引出了一..._噪声很大的图片可以降噪吗

Effective Java 【对于所有对象都通用的方法】第13条 谨慎地覆盖clone_为继承设计类有两种选择,但无论选择其中的-程序员宅基地

文章浏览阅读152次。目录谨慎地覆盖cloneCloneable接口并没有包含任何方法,那么它到底有什么作用呢?Object类中的clone()方法如何重写好一个clone()方法1.对于数组类型我可以采用clone()方法的递归2.如果对象是非数组,建议提供拷贝构造器(copy constructor)或者拷贝工厂(copy factory)3.如果为线程安全的类重写clone()方法4.如果为需要被继承的类重写clone()方法总结谨慎地覆盖cloneCloneable接口地目的是作为对象的一个mixin接口(详见第20_为继承设计类有两种选择,但无论选择其中的

毕业设计 基于协同过滤的电影推荐系统-程序员宅基地

文章浏览阅读958次,点赞21次,收藏24次。今天学长向大家分享一个毕业设计项目基于协同过滤的电影推荐系统项目运行效果:项目获取:https://gitee.com/assistant-a/project-sharing21世纪是信息化时代,随着信息技术和网络技术的发展,信息化已经渗透到人们日常生活的各个方面,人们可以随时随地浏览到海量信息,但是这些大量信息千差万别,需要费事费力的筛选、甄别自己喜欢或者感兴趣的数据。对网络电影服务来说,需要用到优秀的协同过滤推荐功能去辅助整个系统。系统基于Python技术,使用UML建模,采用Django框架组合进行设

随便推点

你想要的10G SFP+光模块大全都在这里-程序员宅基地

文章浏览阅读614次。10G SFP+光模块被广泛应用于10G以太网中,在下一代移动网络、固定接入网、城域网、以及数据中心等领域非常常见。下面易天光通信(ETU-LINK)就为大家一一盘点下10G SFP+光模块都有哪些吧。一、10G SFP+双纤光模块10G SFP+双纤光模块是一种常规的光模块,有两个LC光纤接口,传输距离最远可达100公里,常用的10G SFP+双纤光模块有10G SFP+ SR、10G SFP+ LR,其中10G SFP+ SR的传输距离为300米,10G SFP+ LR的传输距离为10公里。_10g sfp+

计算机毕业设计Node.js+Vue基于Web美食网站设计(程序+源码+LW+部署)_基于vue美食网站源码-程序员宅基地

文章浏览阅读239次。该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流项目运行环境配置:项目技术:Express框架 + Node.js+ Vue 等等组成,B/S模式 +Vscode管理+前后端分离等等。环境需要1.运行环境:最好是Nodejs最新版,我们在这个版本上开发的。其他版本理论上也可以。2.开发环境:Vscode或HbuilderX都可以。推荐HbuilderX;3.mysql环境:建议是用5.7版本均可4.硬件环境:windows 7/8/10 1G内存以上;_基于vue美食网站源码

oldwain随便写@hexun-程序员宅基地

文章浏览阅读62次。oldwain随便写@hexun链接:http://oldwain.blog.hexun.com/ ...

渗透测试-SQL注入-SQLMap工具_sqlmap拖库-程序员宅基地

文章浏览阅读843次,点赞16次,收藏22次。用这个工具扫描其它网站时,要注意法律问题,同时也比较慢,所以我们以之前写的登录页面为例子扫描。_sqlmap拖库

origin三图合一_神教程:Origin也能玩转图片拼接组合排版-程序员宅基地

文章浏览阅读1.5w次,点赞5次,收藏38次。Origin也能玩转图片的拼接组合排版谭编(华南师范大学学报编辑部,广州 510631)通常,我们利用Origin软件能非常快捷地绘制出一张单独的绘图。但是,我们在论文的撰写过程中,经常需要将多种科学实验图片(电镜图、示意图、曲线图等)组合在一张图片中。大多数人都是采用PPT、Adobe Illustrator、CorelDraw等软件对多种不同类型的图进行拼接的。那么,利用Origin软件能否实..._origin怎么把三个图做到一张图上

51单片机智能电风扇控制系统proteus仿真设计( 仿真+程序+原理图+报告+讲解视频)_电风扇模拟控制系统设计-程序员宅基地

文章浏览阅读4.2k次,点赞4次,收藏51次。51单片机智能电风扇控制系统仿真设计( proteus仿真+程序+原理图+报告+讲解视频)仿真图proteus7.8及以上 程序编译器:keil 4/keil 5 编程语言:C语言 设计编号:S0042。_电风扇模拟控制系统设计