Basic Concepts
Flink CDC is a tool that synchronizes data changes (inserts, updates, and deletes) by consuming the database's binlog, enabling dynamic monitoring of data. Its current implementation works by listening to the data source's binlog to detect changes in the data.
Here, we only need to add the relevant dependency to get started, as shown below:
<!-- flink connector cdc -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink.sql.connector.cdc.version}</version>
</dependency>
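The version placeholder `${flink.sql.connector.cdc.version}` is assumed to be defined in the POM's `<properties>` section; for the 2.2.1 release used in this article, that definition might look like:

```xml
<properties>
    <!-- hypothetical property name, matching the placeholder in the dependency above -->
    <flink.sql.connector.cdc.version>2.2.1</flink.sql.connector.cdc.version>
</properties>
```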
Note that Flink CDC is strict about Flink version compatibility. When choosing a CDC version, consult the official compatibility table below. This example uses version 2.2.1 of the MySQL CDC connector for the demonstration.
| Flink® CDC Version | Flink® Version | 
|---|---|
| 1.0.0 | 1.11.* | 
| 1.1.0 | 1.11.* | 
| 1.2.0 | 1.12.* | 
| 1.3.0 | 1.12.* | 
| 1.4.0 | 1.13.* | 
| 2.0.* | 1.13.* | 
| 2.1.* | 1.13.* | 
| 2.2.* | 1.13.*, 1.14.* | 
| 2.3.* | 1.13.*, 1.14.*, 1.15.*, 1.16.0 | 
Flink SQL CDC embeds the Debezium engine and uses its log-based change capture capability to convert the changelog into RowData, the row format Flink SQL understands. Each RowData carries a metadata field called RowKind, which can be insert (+I), update-before (-U), update-after (+U), or delete (-D), a concept very similar to the binlog in the database. A record captured by Debezium contains the old row (before), the new row (after), and source metadata (source). The op field identifies the operation: the values c, u, d, and r correspond to create, update, delete, and read (snapshot) respectively, and ts_ms is the timestamp of the synchronization.
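For illustration, a Debezium update event for the user table used later in this article might look roughly like the following (all field values here are hypothetical):

```json
{
  "before": { "id": 1, "name": "alice", "phone": "13800000000", "sex": 1 },
  "after":  { "id": 1, "name": "alice", "phone": "13900000000", "sex": 1 },
  "source": { "db": "test_cdc_source", "table": "test_user_table" },
  "op": "u",
  "ts_ms": 1681282800000
}
```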
Using the DataStream API
Development with Flink's standard DataStream API can be combined with the CDC feature to capture dynamic data input. The following code implements an example of dynamic CDC reading from MySQL, using the mysql-cdc source dependency introduced above.
package cn.ctyun.demo.api.watermark;

import cn.ctyun.demo.api.utils.TransformUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ViewContentStreamWithoutWaterMark {
    public static DataStream<JSONObject> getViewContentDataStream(StreamExecutionEnvironment env){
        // 1. Create the Flink-MySQL-CDC source
        MySqlSource<String> viewContentSource = MySqlSource.<String>builder()
                .hostname("49.7.189.190")
                .port(3307)
                .username("root")
                .password("Adm@163.comCdc")
                .databaseList("test_cdc_source")
                .tableList("test_cdc_source.view_content")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        // 2. Read data from MySQL with the CDC source
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                viewContentSource,
                WatermarkStrategy.noWatermarks(),
                "ViewContentStreamNoWatermark Source"
        );
        // 3. Convert to the target format
        return mysqlDataStreamSource.map(TransformUtil::formatResult);
    }
}
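TransformUtil is not shown in the article; presumably its formatResult method parses the Debezium JSON string and normalizes it for downstream use. The op-to-RowKind mapping at the heart of such a helper might look like the following standalone sketch (the class and method names are hypothetical, and the fastjson parsing is omitted):

```java
public class RowKindMapper {
    // Map a Debezium op code to the RowKind label used by Flink SQL CDC.
    // "r" (a snapshot read) is treated as an insert, matching the initial-snapshot behavior.
    public static String toRowKind(String op) {
        switch (op) {
            case "c":
            case "r":
                return "+I";
            case "u":
                return "+U"; // update-after; the "before" row corresponds to -U
            case "d":
                return "-D";
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
    }
}
```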
Using Flink SQL
Flink SQL greatly simplifies these operations, as the following code shows. Here we only need to provide a simple SQL statement to read the MySQL source dynamically: specifying 'connector' = 'mysql-cdc' selects the MySQL CDC connector.
package cn.ctyun.demo.flinksql;
import cn.ctyun.demo.flinksql.udf.HashScalarFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * @classname: ReadFromCdc
 * @description: ingest data changes captured via CDC
 * @author: Liu Xinyuan
 * @create: 2023-04-12 15:09
 **/
public class FlinkSqlReadFromCdc {
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        env.disableOperatorChaining();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 1. Create the source table using mysql-cdc; note that the primary key must be declared
        String source_ddl = "CREATE TABLE UserSource (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT, " +
                " primary key (id) not enforced" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = '*******'," +
                " 'port' = '3307'," +
                " 'username' = '" + parameterTool.get("user") + "', " +
                " 'password' = '" + parameterTool.get("passwd") + "', " +
                " 'database-name' = 'test_cdc_source'," +
                " 'table-name' = 'test_user_table'," +
                " 'debezium.log.mining.continuous.mine'='true',"+
                " 'debezium.log.mining.strategy'='online_catalog', " +
                " 'debezium.database.tablename.case.insensitive'='false',"+
                " 'jdbc.properties.useSSL' = 'false' ," +
                " 'scan.startup.mode' = 'initial')";
        tableEnv.executeSql(source_ddl);
        // 2. Create the sink table, written to MySQL via JDBC
        String sink_ddl = "CREATE TABLE UserSink (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT, " +
                " primary key (id) not enforced" +
                ") WITH (" +
                " 'connector.type' = 'jdbc', " +
                " 'connector.url' = 'jdbc:mysql://******:3306/flink_test_sink?useSSL=false', " +
                " 'connector.table' = 'test_user_table', " +
                " 'connector.username' = '" + parameterTool.get("sinkUser") + "', " +
                " 'connector.password' = '" + parameterTool.get("sinkPasswd") + "', " +
                " 'connector.write.flush.max-rows' = '1'" +
                ")";
        tableEnv.executeSql(sink_ddl);
        // 3. Simple data cleansing: hash-mask the phone number
        tableEnv.createTemporarySystemFunction("MyHASH", HashScalarFunction.class);
        Table maskedTable = tableEnv.sqlQuery("SELECT id, name, MyHASH(phone) as phone, sex FROM UserSource");
        tableEnv.createTemporaryView("MaskedUserInfo", maskedTable);
        // 4. Use an INSERT statement to write the output, applying the cleansing above
        String insertSql = "INSERT INTO UserSink SELECT * FROM MaskedUserInfo";
        TableResult tableResult = tableEnv.executeSql(insertSql);
        tableResult.print();
    }
}
The code above defines a simple pipeline that combines data synchronization with phone-number masking. The CDC-related options deserve a closer look; the key one is 'scan.startup.mode' = 'initial', which is the heart of the CDC setup. It controls the startup mode of the MySQL CDC consumer; the valid modes are "initial", "earliest-offset", "latest-offset", "specific-offset", and "timestamp". The initial mode used here performs an initial snapshot of the monitored tables on first startup and then continues reading from the latest binlog position. In other words, it does one full table scan followed by incremental synchronization. This is fine here because the test data set is small; CDC users should choose the mode that fits their own situation.
String source_ddl = "CREATE TABLE UserSource (" +
                " id INT, " +
                " name VARCHAR, " +
                " phone VARCHAR, " +
                " sex INT, " +
                " primary key (id) not enforced" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = '******'," +
                " 'port' = '3307'," +
                " 'username' = '" + parameterTool.get("user") + "', " +
                " 'password' = '" + parameterTool.get("passwd") + "', " +
                " 'database-name' = 'test_cdc_source'," +
                " 'table-name' = 'test_user_table'," +
                " 'debezium.log.mining.continuous.mine'='true',"+
                " 'debezium.log.mining.strategy'='online_catalog', " +
                " 'debezium.database.tablename.case.insensitive'='false',"+
                " 'jdbc.properties.useSSL' = 'false' ," +
                " 'scan.startup.mode' = 'initial')";
Once started, the whole pipeline synchronizes the data incrementally. Because we use the initial mode, the job first performs a full synchronization when it starts, copying all existing records and applying the masking operation to them.
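For comparison, if a job only needs changes from the current binlog position onward, the same source table could be declared with latest-offset instead. This is a sketch; the connection values are placeholders:

```sql
CREATE TABLE UserSource (
  id INT,
  name VARCHAR,
  phone VARCHAR,
  sex INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '******',
  'port' = '3307',
  'username' = '...',  -- placeholder
  'password' = '...',  -- placeholder
  'database-name' = 'test_cdc_source',
  'table-name' = 'test_user_table',
  'scan.startup.mode' = 'latest-offset'  -- skip the snapshot, read new changes only
);
```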


If new records are added later they are synchronized as well, and the same holds for deletions.
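To verify this, one can run statements like the following against the source database and watch the changes propagate to the sink (a hypothetical check against the tables used above):

```sql
-- Insert a new user; it should appear in the sink table with a hashed phone number
INSERT INTO test_cdc_source.test_user_table (id, name, phone, sex)
VALUES (100, 'test_user', '13800000000', 1);

-- Delete it again; the row should disappear from the sink as well
DELETE FROM test_cdc_source.test_user_table WHERE id = 100;
```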


Resumable Synchronization
Resumable synchronization is a feature Flink CDC has gradually rolled out since version 2.0. It allows a job to resume from a savepoint or checkpoint: if we keep a save point along the way, a restarted job will continue synchronizing CDC data from that point, with no data lost in between (unless the binlog is deleted manually). Currently, Flink CDC requires checkpointing to be enabled for resumable synchronization to work. Flink's savepoint and checkpoint features will be covered in detail in later chapters.
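As a sketch, enabling checkpointing in the DataStream job above might look like this; the interval and storage path are assumptions, not values from the article:

```java
// Enable checkpointing so the CDC source can record binlog offsets and resume after a restart.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);  // checkpoint every 60 s (an assumed interval)
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");  // assumed path
```

With checkpoints in place, a stopped job can be resubmitted from the latest checkpoint (or from an explicit savepoint taken with the Flink CLI), and the CDC source continues from the recorded binlog position.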