基本概念
觀(guan)察(cha)者模式(shi)是一種行為型(xing)設計(ji)模式(shi),它定義了一種一對(dui)多的(de)依(yi)賴關系(xi),當一個(ge)對(dui)象的(de)狀態(tai)發生改變(bian)時,其所有依(yi)賴者都會收到(dao)通知并自動更新。
當對(dui)象間存在(zai)一對(dui)多(duo)關(guan)系時(shi)(shi),則(ze)(ze)使用(yong)觀(guan)察(cha)者模式。比如,當一個對(dui)象被修改時(shi)(shi),則(ze)(ze)會(hui)自動(dong)通知依賴它的對(dui)象。觀(guan)察(cha)者模式屬于行為型模式。
在 MongoDB 中,Observer 模式使用在對請求(qiu)的監聽上。當有一條變更操作(zuo)發生時(shi),除了對存儲引擎進行數(shu)據寫入外,還可能執行以下額(e)外的操作(zuo):
- 如果運行在副本集模式下,還要寫 oplog。
- 如果修改了 admin 庫下的 system.users、system.roles 系統表,還需要通知 authManager 進行權限更新。
- 如果修改了 cache.chunks.xxx 路由緩存表,還需要進行本地路由刷新。
- ...
以(yi)(yi)上這些(xie)操(cao)作(zuo)(zuo)都可以(yi)(yi)按(an)功能(neng)模塊劃分出具(ju)體的(de) observer,當(dang)有變更(geng)操(cao)作(zuo)(zuo)發(fa)生時(shi),會(hui)通知這些(xie) observer 執(zhi)行對(dui)應的(de)操(cao)作(zuo)(zuo)。
實現方式
類型定義

OpObServer 是(shi)一個抽象類,其(qi)主要(yao)的實(shi)現包括(kuo):
- OpObserverImpl: 實現了常見的 DDL、DML 的處理方式,包括如何調用接口寫 oplog。
- OpObserverShardingImpl: 繼承自 OpObserverImpl,并實現了 shardSever 之間遷移數據時,如何處理插入、更新、刪除的數據處于正在遷移的 chunk 中的情況。
- AuthOpObserver: 實現了對 system.users、system.roles 等系統表進行變更時的處理流程。
- ConfigServerOpObserver: 實現了在 configServer 節點上對 config 庫執行變更時的處理流程。
- ShardServerOpObserver: 實現了 shardServer 節點上對 cache.chunks.xxx 路由緩存表執行變更時的處理流程。
- OpObserverRegistry: OpObserver 注冊器,內部使用數組存儲,當有監聽的動作發生時,會循環調用數組中每個 OpObserver 的方法。
調用流程
在 mongod 節(jie)點(副(fu)本集、單節(jie)點、shardServer、configServer)啟動時(shi),會根據運行配置(zhi)注冊(ce)多個 OpObserver 到 OpObserverRegistry 中,而 OpObserverRegistry 存放(fang)在全局唯一的 ServerContext 結構中。參考(kao) _initAndListen 的實現(xian):
auto serviceContext = getGlobalServiceContext();
serviceContext->setFastClockSource(FastClockSourceFactory::create(Milliseconds(10)));
// 分配 opObserverRegistry
auto opObserverRegistry = stdx::make_unique<OpObserverRegistry>();
// 注冊 OpObserver
opObserverRegistry->addObserver(stdx::make_unique<OpObserverShardingImpl>());
opObserverRegistry->addObserver(stdx::make_unique<AuthOpObserver>());
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
// shardServer 節點上,注冊 ShardServerOpObserver
opObserverRegistry->addObserver(stdx::make_unique<ShardServerOpObserver>());
} else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
// configServer 節點上,注冊 ConfigServerOpObserver
opObserverRegistry->addObserver(stdx::make_unique<ConfigServerOpObserver>());
}
setupFreeMonitoringOpObserver(opObserverRegistry.get());
serviceContext->setOpObserver(std::move(opObserverRegistry));
當有監(jian)聽的動作發生時,會調用 ServerContext 結構中的 OpObserverRegistry 對應的方法。以(yi) CollectionImpl::insertDocuments 插入動作為例:
getGlobalServiceContext()->getOpObserver()->onInserts(
opCtx, ns(), uuid(), begin, end, fromMigrate);
Observer 操作的一致性
以(yi) insert 操(cao)作為例,對應的 observer 的動作需(xu)要與其保持(chi)一致。即同時提(ti)交或者回(hui)滾。
有些 observer 動作(zuo)是可以(yi)利用存儲引擎的事務原子性(xing)來保證(zheng)的,比如 oplog 的提(ti)交。
有些 observer 動作(zuo)不(bu)會修改(gai)(gai)存(cun)儲引擎數據,只(zhi)是會修改(gai)(gai)內存(cun)狀態,比(bi)如 AuthOpObserver 的(de)權限更新動作(zuo)。對于(yu)這(zhe)些可以利用 WiredTigerRecoveryUnit 的(de) Change 機制,在(zai) WiredTigerRecoveryUnit 進(jin)行事務(wu)提交(jiao)或者回滾之后,調(diao)用 Change 中傳入(ru)的(de) rollback 和 commit 回調(diao)函數完成相應(ying)的(de)操作(zuo)。
不過 Change 機制不和對應的事務一起保證原子性。可能在事務提交之后馬上斷電,沒有執行對應的 Change 回調函數。以 AuthOpObserver 為例,可能修改了 admin.system.users 表并記錄了 oplog,但是沒有來得及更新內存結構就宕機了。
不過這種情況問題不大,因為節點重啟之后,會根據(ju)磁盤上最新(xin)的一致性數據(ju)重新(xin)構建內存數據(ju)。