一、CDC
CDC(Change Data Capture)是一種用于跟蹤數據庫庫變更事件(插入、更新、刪除)中的行級更改,并將事件以發生的順序通知到其他系統處理。在容災場景下,CDC主要實現的是主備間的數據同步,即從主數據庫到備數據庫的數據實時同步。
source ----------> CDC ----------> sink
二、Seatunne CDC
Seatunnel CDC的數據同步分為兩種:
- 快照讀:讀取表的歷史數據
- 增量跟蹤:讀取表的增量日志更改數據
2.1 無鎖快照同步
無鎖快照同步階段,為什么強調無鎖,是因為現有的CDC平臺在進行歷史數據的同步時可能會進行鎖表操作,例如Debezium。快照讀階段就是對數據庫的歷史數據庫進行同步的過程,其基本概述流程如下:
storage------------->splitEnumerator----------split---------->reader
^ |
| |
\-----------------report------------/
- split劃分:splitEnumerator(split分發器)按照指定的字段(例如表id或唯一鍵)和步長將表數據劃分為多個分片split。
- 并行處理:每個split通過路由算法分配給不同的reader進行并行讀取,一個reader會占用一個連接。
- 事件反饋:每個reader完成split讀取后會向splitEnumerator報告進度。
splitEnumerator會發送給reader一個分片,分片的元數據信息如下:
String splitId 路由id
TableId tableId 表id
SeatunnelRowType splitKeyType 分片基于的字段的類型
Object splitStart 分片讀取起點
Object splitEnd 分片讀取終點
reader收到split信息后會生成相關的sql語句,在此之前會記錄當前split對應到數據庫日志log的開始位置,等處理完當前split后上報report給splitEnumerator,report內容如下:
String splitId 分片id
Offset highWatermark 分片對應log的位置,用于后續的校對
2.2 增量同步
增量同步階段是基于上述快照讀取階段后,在源數據庫發生變化時,實時將變更的數據同步到備數據庫,不同的是,此階段監聽的是數據庫的log日志,例如mysql的bin log。增量跟蹤通常是單線程處理,這樣可以避免重復拉取bin log,減輕對數據庫的壓力,因此該階段只有一個reader工作,只占用一個連接。
data log------------->splitEnumerator----------split---------->reader
^ |
| |
\-----------------report------------/
增量同步會合成快照階段所有split、table,因此只會存在一個split,增量同步階段的split信息如下:
String splitId
Offset startingOffset 所有split中最小的log start
Offset endingOffset log的結束位置,若無則代表是持續的,例如增量階段
List<TableId> tableIds
Map<TableId, Offset> tanleWatermarks 所有split的watermark
List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos 快照階段讀取的split細節信息
其中CompletedSnapshotSplitInfo的具體字段如下:
String splitId
TableId tableId
SeatunnelRowType splitKeyType
Object splitStart
Object splitEnd
Offset watermark 對應了report中的highWatermark
增量階段的split包含了快照階段所有split的watermark,會去從其中選出一個合適的位置進行增量同步,這個合適位置就是最小的watermark。
三、Exactly-once
無論是快照讀還是增量讀,同步的過程中數據庫可能也在經歷變化,如何保證exactly-once?
3.1 快照讀階段
在快照讀階段,例如某個split在同步的過程中,這段split中的數據發生了變換,例如下圖操作,插入一條k3,更新k2,刪除k1,如果在讀的過程中不做任務標識,那么這部分的更新信息就會丟失,seatunnel的做法是:
- 在split讀取之前首先去數據庫查一下bin log位置:low watermark
- 讀取split{start, end}數據
- 再記錄一下高水位high watermark
- 如果high = low 說明在讀取該split期間,該split的數據沒有發生變化;如果(high - low) > 0,說明在處理的過程中發生了數據變化,會進行如下操作:①將讀到的split數據在內存中建立內存表緩存;②將low watermark~high watermark的變更;③按順序、主鍵重放操作到內存表
- 報告report high watermark
insert k3 update k2 delete k1
| | |
v v v
bin log --|---------------------------------------------------|-- log offset
low watermark high watermark
CDC讀到的數據: k1 k3 k4
| 重放
v
真實的數據: k2 k3' k4
3.2 增量階段
在增量階段開始之前首先會對上一個步驟的所有split做校驗,因為在split和split之間的間隙也有可能出現數據更新,例如在split1和split2之間插入了若干條記錄,在快照階段就會遺漏掉,對于這種split之間的數據回撈,seatunnel的做法是:
- 從所有的split的report中找到最小的watermark,作為start watermark,開始讀取log。
- 每讀一條log都去completedSnapshotSplitInfos中找該條數據是否在某個split被處理過了,如果沒有被處理過,說明是split間隙數據,應該被重新修正。
- 當表過濾完后,可以從completedSnapshotSplitInfos中刪除,繼續處理剩余的表。
- 直到所有的split都校驗結束,就進入到了完全的增量階段。
|------------filter split2-----------------|
|----filter split1------|
data log -|-----------------------|------------------|----------------------------------|- log offset
min watermark split1 watermark split2 watermark max watermark
四、斷點續傳
如果做到暫停恢復?分布式快照算法(Chandy-Lamport):
p1 p2
X1:0 X2:4
Y1:0 Y2:2
Z1:0 Z2:3
p1 p2
X1:0 -------marker-------> X2:4
Y1:0 <---------M---------- Y2:2
Z1:0 Z2:3
p1 M p2
X1:0 X2:4
Y1:0 Y2:2
Z1:0 Z2:3
在Seatunnel CDC的過程中,marker同發送給所有的reader、splitEnumerator、writer等節點都會保存自己的內存狀態。