應用場景
在(zai)以(yi)下場景,使用(yong)MirrorMaker進行不(bu)同(tong)集(ji)群間的(de)(de)數據同(tong)步,可(ke)以(yi)確保Kafka集(ji)群的(de)(de)可(ke)用(yong)性和可(ke)靠性。
- 備份和容災:企業存在多個數據中心,為了防止其中一個數據中心出現問題,導致業務不可用,會將集群數據同步備份在多個不同的數據中心。
- 集群遷移:當今很多企業將業務遷移上云,遷移過程中需要確保線下集群和云上集群的數據同步,保證業務的連續性。
方案架構
使(shi)用MirrorMaker可以實現將(jiang)源(yuan)集群(qun)中的(de)數據鏡像復制(zhi)到(dao)目標(biao)(biao)集群(qun)中。其原(yuan)理如圖1所(suo)示,MirrorMaker本質上也是生產(chan)消(xiao)費消(xiao)息,首先從源(yuan)集群(qun)中消(xiao)費數據,然后將(jiang)消(xiao)費的(de)數據生產(chan)到(dao)目標(biao)(biao)集群(qun)。如果您需要了解更多關于MirrorMaker的(de)信息,請參見。
圖(tu) MirrorMaker 原理圖(tu)


約束與限制
- 源集群中節點的IP地址和端口號不能和目標集群中節點的IP地址和端口號相同,否則會導致數據在Topic內無限循環復制。
- 使用MirrorMaker同步數據,至少需要有兩個或以上集群,不可在單個集群內部使用MirrorMaker,否則會導致數據在Topic內無限循環復制。
實施步驟
1、 購買一臺彈性云主機,確保彈性云主機與源集群、目標集群網絡互通。具體購買操作,請參考購買彈性云主機。
2、 登錄(lu)彈性(xing)云主機,安(an)裝Java JDK,并配置(zhi)JAVA_HOME與PATH環境(jing)變量,使用執行用戶(hu)在用戶(hu)家目錄(lu)下修改“.bash_profile”,添(tian)加如下行。其(qi)中“/opt/java/jdk1.8.0_151”為JDK的(de)安(an)裝路徑,請根據實際情況修改。
export JAVA_HOME=/opt/java/jdk1.8.0_151
export PATH=**JAVA_HOME/bin:**PATH
執行source .bash_profile命令使修改生效。
說明彈性云主(zhu)機默認自帶的JDK可(ke)能(neng)不符合(he)要求,例(li)如OpenJDK,需要配置為Oracle的JDK,可(ke)至Oracle官方下載頁面。
3、 下載Kafka 2.4.3及以上版本(ben)的(de)二(er)進制軟(ruan)件包。
wget //downloads.apache.org/kafka/ Kafka版本 /二進制軟件包
例如:下載Kafka 2.7.0版本的二進制軟(ruan)件包(bao)。
wget //downloads.apache.org/kafka/2.7.0/kafka_2.12-2.7.0.tgz
4、 解壓二進制(zhi)軟(ruan)件包。
tar -zxvf 二進制軟件包
例如:解壓“kafka_2.12-2.7.0.tgz”。
tar -zxvf kafka_2.12-2.7.0.tgz
5、 進入二進制軟件(jian)包(bao)目錄,修(xiu)改“config”目錄下的(de)“connect-mirror-maker.properties”的(de)配置(zhi)文(wen)件(jian),在配置(zhi)文(wen)件(jian)中指定源集群和(he)目標(biao)集群的(de)IP地址和(he)端口以及其(qi)他配置(zhi)。
# 指定兩個集群
clusters = A, B
A.bootstrap.servers = A_host1:A_port, A_host2:A_port, A_host3:B_port
B.bootstrap.servers = B_host1:B_port, B_host2:B_port, B_host3:B_port
# 指定數據同步方向,可以單向同步也可互相同步
A->B.enabled = true
# 指定同步的Topic,支持正則匹配,默認復制全部Topic,如:"foo-.*"
A->B.topics = .*
# 打開以下兩個配置則表示A、B兩個集群互相復制同步
#B->A.enabled = true
#B->A.topics = .*
# 設置副本個數,如果是要同步多個Topic且副本數各不相同,建議先創建同名同本數的Topic再啟動MirrorMaker
replication.factor=3
############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# 測試環境可以為1,生產環境建議以下配置大于1,比如設為3
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# 測試環境可以為1,生產環境建議以下配置大于1,比如設為3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
6、 在二進制(zhi)軟(ruan)件包目錄下(xia),啟動(dong)MirrorMaker,進行數據同步。
./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
7、 (可選)MirrorMaker開啟后,如果在源集群上新建了Topic,如需對此Topic進行數據同步,則需重啟MirrorMaker,重啟步驟參考6。也可配(pei)置(zhi)自(zi)動同(tong)步(bu)新(xin)增Topic,按需(xu)增加(jia)如表1所示配(pei)置(zhi)后(hou),無(wu)需(xu)重啟(qi)MirrorMaker,即可周期性(xing)同(tong)步(bu)新(xin)增Topic。其(qi)中,“refresh.topics.interval.seconds”為必(bi)選(xuan),其(qi)他(ta)參(can)數根(gen)據實(shi)際(ji)情(qing)況選(xuan)擇。
表 MirrorMaker配置(zhi)參數
| 參數名 | 默認值 | 說明 |
|---|---|---|
| sync.topic.configs.enabled | true | 是否監控源集群的配置更改 |
| sync.topic.acls.enabled | true | 是否監控源集群ACL的更改 |
| emit.heartbeats.enabled | true | 連接器應定期發出心跳 |
| emit.heartbeats.interval.seconds | 5秒 | 心跳頻率 |
| emit.checkpoints.enabled | true | 連接器應定期發出消費端偏移量信息 |
| emit.checkpoints.interval.seconds | 5秒 | 檢查點的頻率 |
| refresh.topics.enabled | true | 連接器應定期檢查新主題 |
| refresh.topics.interval.seconds | 5秒 | 檢查源群集中是否有新主題的頻率 |
| refresh.groups.enabled | true | 連接器應定期檢查新的消費組 |
| refresh.groups.interval.seconds | 5秒 | 檢查源集群新的消費組頻率 |
| replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | 使用LegacyReplicationPolicy模仿舊版MirrorMaker |
| heartbeats.topic.retention.ms | 1天 | 首次創建心跳主題時使用 |
| checkpoints.topic.retention.ms | 1天 | 首次創建檢查點主題時使用 |
| offset.syncs.topic.retention.ms | max long | 首次創建偏移同步主題時使用 |
驗證數據是否同步
1、 在目(mu)標集(ji)群(qun)中(zhong)查(cha)看Topic列(lie)表(biao),確認是否有源集(ji)群(qun)Topic。
說明目標集群中的Topic名稱和源集群相比,多了前綴(如A.),這屬于正常情況,是MirrorMaker 2為了防止Topic循環備份進行的設置。
2、 在源(yuan)集(ji)群(qun)生產并消費消息(xi),在目標集(ji)群(qun)查看消費進(jin)度,確(que)認數據(ju)是否已從源(yuan)集(ji)群(qun)同步到了(le)目標集(ji)群(qun)。
如果目標(biao)集群為Kafka實例的話,在(zai)分布式消息服務(wu)Kafka控制臺的“消費組(zu)管理 > 消費進度”中(zhong),查看(kan)消費進度。