亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享

使用DataX實現數據從mysql到mysql的復制

2023-09-25 09:16:08
89
0

DataX 簡介

DataX 是(shi) DataWorks 數(shu)據(ju)集成(cheng) 的(de)(de)開源版本,主要就(jiu)是(shi)用于實(shi)現數(shu)據(ju)間(jian)的(de)(de)離線同步。 DataX 致力于實(shi)現包括關系型數(shu)據(ju)庫(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等 各種異構數(shu)據(ju)源(即不同的(de)(de)數(shu)據(ju)庫) 間(jian)穩定高效的(de)(de)數(shu)據(ju)同步功(gong)能。

  • 為了 解決異構數據源同步問題,DataX 將復雜的網狀同步鏈路變成了星型數據鏈路 ,DataX 作為中間傳輸載體負責連接各種數據源;
  • 當需要接入一個新的數據源時,只需要將此數據源對接到 DataX,便能跟已有的數據源作為無縫數據同步。

DataX3.0 框架設計

DataX 采用 Framework + Plugin 架構,將數據源讀取和寫入(ru)抽象稱為(wei) Reader/Writer 插件,納入(ru)到整個(ge)同步(bu)框架中。

DataX 完成單個(ge)數(shu)據同步的(de)作業,我(wo)們稱為 Job,DataX 接收(shou)到一個(ge) Job 后(hou),將啟動一個(ge)進程來完成整個(ge)作業同步過程。DataX Job 模塊是單個(ge)作業的(de)中樞(shu)管理(li)節點,承擔了數(shu)據清(qing)理(li)、子任務(wu)切(qie)分、TaskGroup 管理(li)等功(gong)能。

  • DataX Job 啟動后,會根據不同源端的切分策略,將 Job 切分成多個小的 Task (子任務),以便于并發執行。
  • 接著 DataX Job 會調用 Scheduler 模塊,根據配置的并發數量,將拆分成的 Task 重新組合,組裝成 TaskGroup(任務組)
  • 每一個 Task 都由 TaskGroup 負責啟動,Task 啟動后,會固定啟動 Reader --> Channel --> Writer 線程來完成任務同步工作。
  • DataX 作業運行啟動后,Job 會對 TaskGroup 進行監控操作,等待所有 TaskGroup 完成后,Job 便會成功退出(異常退出時 值非 0 )

DataX 調度過程:

  1. 首先 DataX Job 模塊會根據分庫分表切分成若干個 Task,然后根據用戶配置并發數,來計算需要分配多少個 TaskGroup;
  2. 計算過程:Task / Channel = TaskGroup,最后由 TaskGroup 根據分配好的并發數來運行 Task(任務)

使用 DataX 實現數據同步

DataX 基本使用

查看 streamreader \--> streamwriter 的模板:

python /usr/local/datax/bin/datax.py -r streamreader -w streamwriter

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根據模板編寫 json 文件

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [        # 同步的列名 (* 表示所有)
       {
           "type":"string",
    "value":"Hello."
       },
       {
           "type":"string",
    "value":"河北彭于晏"
       },
   ], 
                        "sliceRecordCount": "3"     # 打印數量
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "utf-8",     # 編碼
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "2"         # 并發 (即 sliceRecordCount * channel = 結果)
            }
        }
    }
}

通過 DataX 實 MySQL 數據同步

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",       # 讀取端
                    "parameter": {
                        "column": [],         # 需要同步的列 (* 表示所有的列)
                        "connection": [
                            {
                                "jdbcUrl": [],       # 連接信息
                                "table": []       # 連接表
                            }
                        ], 
                        "password": "",        # 連接用戶
                        "username": "",        # 連接密碼
                        "where": ""         # 描述篩選條件
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter",       # 寫入端
                    "parameter": {
                        "column": [],         # 需要同步的列
                        "connection": [
                            {
                                "jdbcUrl": "",       # 連接信息
                                "table": []       # 連接表
                            }
                        ], 
                        "password": "",        # 連接密碼
                        "preSql": [],         # 同步前. 要做的事
                        "session": [], 
                        "username": "",        # 連接用戶 
                        "writeMode": ""        # 操作類型
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""          # 指定并發數
            }
        }
    }
}

編寫 json 文件:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}
  • 上面的方式相當于是完全同步,但是當數據量較大時,同步的時候被中斷,是件很痛苦的事情;
  • 所以在有些情況下,增量同步還是蠻重要的

使用 DataX 進行增量同步

使用 DataX 進行全量同步和增量同步的唯一區別就是:增量同步需要使用 where 進行條件篩選。

編寫 json 文件:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "where": "ID <= 1888",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}
  • 需要注意的部分就是:where(條件篩選) 和 preSql(同步前,要做的事) 參數。

驗證:

python /usr/local/data/bin/data.py where.json

輸出

2022-12-16 17:34:38.534 [job-0] INFO  JobContainer - PerfTrace not enable!
2022-12-16 17:34:38.534 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1888 records, 49543 bytes | Speed 1.61KB/s, 62 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.002s |  All Task WaitReaderTime 100.570s | Percentage 100.00%
2022-12-16 17:34:38.537 [job-0] INFO  JobContainer - 
任務啟動時刻                    : 2021-12-16 17:34:06
任務結束時刻                    : 2021-12-16 17:34:38
任務總計耗時                    :                 32s
任務平均流量                    :            1.61KB/s
記錄寫入速度                    :             62rec/s
讀出記錄總數                    :                1888

目標數據庫上查看:

基于上(shang)面數據,再次進行增(zeng)量同(tong)步:

主要是 where 配置:"where": "ID > 1888 AND ID <= 2888"      # 通過條件篩選來進行增量同步
同時需要將我上面的 preSql 刪除(因為我上面做的操作時 truncate 表)
0條評論
作者已關閉評論
劉****猛
7文章數
0粉(fen)絲(si)數
劉****猛
7 文章 | 0 粉(fen)絲

使用DataX實現數據從mysql到mysql的復制

2023-09-25 09:16:08
89
0

DataX 簡介

DataX 是 DataWorks 數據(ju)集成 的(de)開(kai)源版(ban)本,主(zhu)要就是用于實(shi)現數據(ju)間的(de)離線同(tong)步。 DataX 致力于實(shi)現包括關系型數據(ju)庫(MySQL、Oracle 等(deng))、HDFS、Hive、ODPS、HBase、FTP 等(deng) 各種異構數據(ju)源(即不同(tong)的(de)數據(ju)庫) 間穩定高效的(de)數據(ju)同(tong)步功能。

  • 為了 解決異構數據源同步問題,DataX 將復雜的網狀同步鏈路變成了星型數據鏈路 ,DataX 作為中間傳輸載體負責連接各種數據源;
  • 當需要接入一個新的數據源時,只需要將此數據源對接到 DataX,便能跟已有的數據源作為無縫數據同步。

DataX3.0 框架設計

DataX 采用 Framework + Plugin 架(jia)構,將數據源讀取和寫(xie)入抽象稱為 Reader/Writer 插(cha)件,納入到整個同步(bu)框架(jia)中(zhong)。

DataX 完成(cheng)單個(ge)數據同(tong)步的作(zuo)業,我們稱為(wei) Job,DataX 接收到一個(ge) Job 后(hou),將啟動一個(ge)進程(cheng)來完成(cheng)整個(ge)作(zuo)業同(tong)步過程(cheng)。DataX Job 模塊是單個(ge)作(zuo)業的中樞(shu)管理(li)節點,承擔(dan)了數據清(qing)理(li)、子任務切分、TaskGroup 管理(li)等功(gong)能(neng)。

  • DataX Job 啟動后,會根據不同源端的切分策略,將 Job 切分成多個小的 Task (子任務),以便于并發執行。
  • 接著 DataX Job 會調用 Scheduler 模塊,根據配置的并發數量,將拆分成的 Task 重新組合,組裝成 TaskGroup(任務組)
  • 每一個 Task 都由 TaskGroup 負責啟動,Task 啟動后,會固定啟動 Reader --> Channel --> Writer 線程來完成任務同步工作。
  • DataX 作業運行啟動后,Job 會對 TaskGroup 進行監控操作,等待所有 TaskGroup 完成后,Job 便會成功退出(異常退出時 值非 0 )

DataX 調度過程:

  1. 首先 DataX Job 模塊會根據分庫分表切分成若干個 Task,然后根據用戶配置并發數,來計算需要分配多少個 TaskGroup;
  2. 計算過程:Task / Channel = TaskGroup,最后由 TaskGroup 根據分配好的并發數來運行 Task(任務)

使用 DataX 實現數據同步

DataX 基本使用

查看 streamreader \--> streamwriter 的模(mo)板(ban):

python /usr/local/datax/bin/datax.py -r streamreader&nbsp;-w streamwriter

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根據模板編寫 json 文件

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [        # 同步的列名 (* 表示所有)
       {
           "type":"string",
    "value":"Hello."
       },
       {
           "type":"string",
    "value":"河北彭于晏"
       },
   ], 
                        "sliceRecordCount": "3"     # 打印數量
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "utf-8",     # 編碼
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "2"         # 并發 (即 sliceRecordCount * channel = 結果)
            }
        }
    }
}

通過 DataX 實 MySQL 數據同步

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",       # 讀取端
                    "parameter": {
                        "column": [],         # 需要同步的列 (* 表示所有的列)
                        "connection": [
                            {
                                "jdbcUrl": [],       # 連接信息
                                "table": []       # 連接表
                            }
                        ], 
                        "password": "",        # 連接用戶
                        "username": "",        # 連接密碼
                        "where": ""         # 描述篩選條件
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter",       # 寫入端
                    "parameter": {
                        "column": [],         # 需要同步的列
                        "connection": [
                            {
                                "jdbcUrl": "",       # 連接信息
                                "table": []       # 連接表
                            }
                        ], 
                        "password": "",        # 連接密碼
                        "preSql": [],         # 同步前. 要做的事
                        "session": [], 
                        "username": "",        # 連接用戶 
                        "writeMode": ""        # 操作類型
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""          # 指定并發數
            }
        }
    }
}

編寫 json 文件:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}
  • 上面的方式相當于是完全同步,但是當數據量較大時,同步的時候被中斷,是件很痛苦的事情;
  • 所以在有些情況下,增量同步還是蠻重要的

使用 DataX 進行增量同步

使用 DataX 進行全量同步和增量同步的唯一區別就是:增量同步需要使用 where 進行條件篩選。

編寫 json 文件:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "where": "ID <= 1888",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ], 
                                "table": ["t_member"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": ["*"], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ], 
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ], 
                        "session": [
                            "set session sql_mode='ANSI'"
                        ], 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}
  • 需要注意的部分就是:where(條件篩選) 和 preSql(同步前,要做的事) 參數。

驗證:

python /usr/local/data/bin/data.py where.json

輸出

2022-12-16 17:34:38.534 [job-0] INFO  JobContainer - PerfTrace not enable!
2022-12-16 17:34:38.534 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1888 records, 49543 bytes | Speed 1.61KB/s, 62 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.002s |  All Task WaitReaderTime 100.570s | Percentage 100.00%
2022-12-16 17:34:38.537 [job-0] INFO  JobContainer - 
任務啟動時刻                    : 2021-12-16 17:34:06
任務結束時刻                    : 2021-12-16 17:34:38
任務總計耗時                    :                 32s
任務平均流量                    :            1.61KB/s
記錄寫入速度                    :             62rec/s
讀出記錄總數                    :                1888

目標數據庫上查看:

基于(yu)上面數(shu)據,再(zai)次進行增(zeng)量同步:

主要是 where 配置:"where": "ID > 1888 AND ID <= 2888"      # 通過條件篩選來進行增量同步
同時需要將我上面的 preSql 刪除(因為我上面做的操作時 truncate 表)
文章來自個人專欄
文(wen)章 | 訂閱
0條評論
作者已關閉評論
作者已關閉評論
0
0