DataX 簡介
DataX 是(shi) DataWorks 數(shu)據(ju)集成(cheng) 的(de)(de)開源版本,主要就(jiu)是(shi)用于實(shi)現數(shu)據(ju)間(jian)的(de)(de)離線同步。 DataX 致力于實(shi)現包括關系型數(shu)據(ju)庫(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等 各種異構數(shu)據(ju)源(即不同的(de)(de)數(shu)據(ju)庫) 間(jian)穩定高效的(de)(de)數(shu)據(ju)同步功(gong)能。

-
為了 解決異構數據源同步問題,DataX 將復雜的網狀同步鏈路變成了星型數據鏈路 ,DataX 作為中間傳輸載體負責連接各種數據源; -
當需要接入一個新的數據源時,只需要將此數據源對接到 DataX,便能跟已有的數據源作為無縫數據同步。
DataX3.0 框架設計
DataX 采用 Framework + Plugin 架構,將數據源讀取和寫入(ru)抽象稱為(wei) Reader/Writer 插件,納入(ru)到整個(ge)同步(bu)框架中。

DataX 完成單個(ge)數(shu)據同步的(de)作業,我(wo)們稱為 Job,DataX 接收(shou)到一個(ge) Job 后(hou),將啟動一個(ge)進程來完成整個(ge)作業同步過程。DataX Job 模塊是單個(ge)作業的(de)中樞(shu)管理(li)節點,承擔了數(shu)據清(qing)理(li)、子任務(wu)切(qie)分、TaskGroup 管理(li)等功(gong)能。

-
DataX Job 啟動后,會根據不同源端的切分策略,將 Job 切分成多個小的 Task (子任務),以便于并發執行。 -
接著 DataX Job 會調用 Scheduler 模塊,根據配置的并發數量,將拆分成的 Task 重新組合,組裝成 TaskGroup(任務組) -
每一個 Task 都由 TaskGroup 負責啟動,Task 啟動后,會固定啟動 Reader -->Channel-->Writer 線程來完成任務同步工作。 -
DataX 作業運行啟動后,Job 會對 TaskGroup 進行監控操作,等待所有 TaskGroup 完成后,Job 便會成功退出(異常退出時 值非 0 )
DataX 調度過程:
-
首先 DataX Job 模塊會根據分庫分表切分成若干個 Task,然后根據用戶配置并發數,來計算需要分配多少個 TaskGroup; -
計算過程: Task / Channel = TaskGroup,最后由 TaskGroup 根據分配好的并發數來運行 Task(任務)
使用 DataX 實現數據同步
DataX 基本使用
查看 streamreader \--> streamwriter 的模板:
python /usr/local/datax/bin/datax.py -r streamreader -w streamwriter
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [],
"sliceRecordCount": ""
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
根據模板編寫 json 文件
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [ # 同步的列名 (* 表示所有)
{
"type":"string",
"value":"Hello."
},
{
"type":"string",
"value":"河北彭于晏"
},
],
"sliceRecordCount": "3" # 打印數量
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "utf-8", # 編碼
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": "2" # 并發 (即 sliceRecordCount * channel = 結果)
}
}
}
}
通過 DataX 實 MySQL 數據同步
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader", # 讀取端
"parameter": {
"column": [], # 需要同步的列 (* 表示所有的列)
"connection": [
{
"jdbcUrl": [], # 連接信息
"table": [] # 連接表
}
],
"password": "", # 連接用戶
"username": "", # 連接密碼
"where": "" # 描述篩選條件
}
},
"writer": {
"name": "mysqlwriter", # 寫入端
"parameter": {
"column": [], # 需要同步的列
"connection": [
{
"jdbcUrl": "", # 連接信息
"table": [] # 連接表
}
],
"password": "", # 連接密碼
"preSql": [], # 同步前. 要做的事
"session": [],
"username": "", # 連接用戶
"writeMode": "" # 操作類型
}
}
}
],
"setting": {
"speed": {
"channel": "" # 指定并發數
}
}
}
}
編寫 json 文件:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123123",
"column": ["*"],
"splitPk": "ID",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
],
"table": ["t_member"]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
"table": ["t_member"]
}
],
"password": "123123",
"preSql": [
"truncate t_member"
],
"session": [
"set session sql_mode='ANSI'"
],
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "5"
}
}
}
}
-
上面的方式相當于是完全同步,但是當數據量較大時,同步的時候被中斷,是件很痛苦的事情; -
所以在有些情況下,增量同步還是蠻重要的
使用 DataX 進行增量同步
使用 DataX 進行全量同步和增量同步的唯一區別就是:增量同步需要使用 where 進行條件篩選。
編寫 json 文件:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123123",
"column": ["*"],
"splitPk": "ID",
"where": "ID <= 1888",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
],
"table": ["t_member"]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
"table": ["t_member"]
}
],
"password": "123123",
"preSql": [
"truncate t_member"
],
"session": [
"set session sql_mode='ANSI'"
],
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "5"
}
}
}
}
-
需要注意的部分就是: where(條件篩選) 和preSql(同步前,要做的事) 參數。
驗證:
python /usr/local/data/bin/data.py where.json
輸出
2022-12-16 17:34:38.534 [job-0] INFO JobContainer - PerfTrace not enable!
2022-12-16 17:34:38.534 [job-0] INFO StandAloneJobContainerCommunicator - Total 1888 records, 49543 bytes | Speed 1.61KB/s, 62 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.002s | All Task WaitReaderTime 100.570s | Percentage 100.00%
2022-12-16 17:34:38.537 [job-0] INFO JobContainer -
任務啟動時刻 : 2021-12-16 17:34:06
任務結束時刻 : 2021-12-16 17:34:38
任務總計耗時 : 32s
任務平均流量 : 1.61KB/s
記錄寫入速度 : 62rec/s
讀出記錄總數 : 1888
目標數據庫上查看:

基于上(shang)面數據,再次進行增(zeng)量同(tong)步:
主要是 where 配置:"where": "ID > 1888 AND ID <= 2888" # 通過條件篩選來進行增量同步
同時需要將我上面的 preSql 刪除(因為我上面做的操作時 truncate 表)