亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原(yuan)創(chuang)

使用 Rust 實現的基礎的List 和 Watch 機制

2024-09-24 10:07:29
43
0

使用 Rust 實現的基礎的List 和 Watch 機制

介紹

在(zai)日(ri)常(chang)的(de)開發(fa)過(guo)(guo)程中,有(you)一個(ge)很(hen)重要(yao)的(de)任務是能夠(gou)通過(guo)(guo)Rust語言(yan)實(shi)現K8s中的(de)各種生態組(zu)件,在(zai)這(zhe)個(ge)過(guo)(guo)程中,既需(xu)(xu)要(yao)能過(guo)(guo)夠(gou)了(le)解K8S的(de)工作原理(li)也需(xu)(xu)要(yao)能夠(gou)知道rust的(de)語言(yan)特性。因此(ci),在(zai)這(zhe)個(ge)過(guo)(guo)程中有(you)很(hen)多值得探(tan)討的(de)知識點。

在這里,第一步,我們將探索如何使用 Rust 實現一個類似于 Kubernetes 的 listwatch 機(ji)制。我(wo)們(men)將通過(guo) WebSocket 實現(xian)實時的(de)消(xiao)息推送,并使用一些關鍵的(de) Rust 異步(bu)編程模型來處(chu)理事件和連接(jie)管理。

我們(men)首先默認大(da)家能夠了解rust語言的基本特性。下文中,將針(zhen)對rust的知(zhi)識點展(zhan)開進(jin)行探討。

目標

  • 理解 WebSocket 連接的建立和管理。
  • 學習如何通過 WebSocket 推送消息。
  • 掌握消息緩存和處理的實現方式。
  • 了解如何使用 Rust 實現一個高效的事件分發系統。
  • 理解K8S中的數據一致性保障方法
  • 了解本機制的不足,以及后續如何進行改進

理解問題

什么是 listwatch

  • List:列出當前所有資源的狀態。
  • Watch:實時監控資源的變化,一旦有資源變化,就會立即通知客戶端。

使用場景

  • 自動化運維:實時監控系統資源狀態,觸發自動化運維操作。
  • 應用監控:實時獲取應用狀態,及時處理異常,在很多的系統設計場景中,能夠減少耦合。
  • K8S中的相應設計:K8S中,對相應資源的通知的基礎即為list and watch機制。本人在學習K8S源碼的第一步就是學習這一套設計架構。

分析問題

\當然,通過簡(jian)單的代(dai)碼(ma)僅僅通過http進行(xing)主動連接也(ye)可實現(xian)這個(ge)(ge)功能。但在目前階段,我們希(xi)望能夠設計一個(ge)(ge)高(gao)效的、穩定的、可擴展(zhan)的list and watch體系,因(yin)此我們需要考慮以下幾個(ge)(ge)關鍵問題。

關鍵問題

  1. 如何建立和管理 我們服務器和客戶端的連接?通過什么方式進行?
  2. 如何實現高效的消息推送機制?
  3. 如何處理消息緩存和訂閱管理?

技術選型

  • 語言:Rust
  • Web 框架:warp框架
  • WebSocket實現和框架:tokio-tungstenite、warp
  • 異步編程:tokio、管道機制

設計代碼結構

針對以(yi)上這(zhe)個(ge)需求(qiu),結合目前kunos-system的需求(qiu)我(wo)們闡釋如下

  • 有以下幾個資源,Node、Task(Task是一個shell命令、鏡像運行命令的載體)、Job(Task的上層資源,一個Job包含多個Task,類似于K8s中的replicaset)我們需要對這幾個資源的狀態進行推送。
  • 能夠在服務器建立起來一個watch and list服務器,能夠推送各種事件
  • 能夠

組件設計

  1. Broker:管理 WebSocket 訂閱者和事件分發。
    pub struct Broker<R: Resource + Clone + Serialize + Send + Sync  + 'static> {
        // 下游的訂閱者列表,用于發送websocket信息
        subscribers: Arc<RwLock<HashMap<Topic, HashMap<Uuid, WsSender>>>>,
        // 事件的緩沖流
        event_sender: UnboundedSender<(Topic, WatchEvent<R>)>,
    }
    
  2. Watcher:對不同資源類型進行管理和操作。
    pub struct Watcher {
        // 為不同的事件建立不同的broker
        pub node_broker: Arc<Broker<Node>>,
        pub task_broker: Arc<Broker<Task>>,
        pub job_broker: Arc<Broker<Job>>,
        pub exec_broker: Arc<Broker<TaskExecRequest>>,
    }
    
  3. WebSocket 客戶端:與服務器交互,接收實時事件。

基本原理

websocket路由入口

let node_subscribe = warp::path!("watch" / "node").and(warp::ws()).map(
    move |ws: warp::ws::Ws| {
        let node_broker_clone = Arc::clone(&node_broker_clone);
        ws.on_upgrade(move |socket| async move {
            node_broker_clone.subscribe("node".to_string(), socket).await;
        })
    },
);
1. warp::path!("watch" / "node")

*這部分代碼定義了一個路徑過濾器,用于匹配路徑 /watch/node 的 HTTP 請求。warp::path!是 Warp 框架提供的一個宏,用于簡化路徑定義。這里的"watch" / "node"表示請求路徑必須是/watch/node` 才能匹配這個(ge)過濾器。

2. .and(warp::ws())

這一部分代碼將路徑過濾器與 WebSocket 協議過濾器組合起來。warp::ws() 過濾器會匹配 WebSocket 握手請求并提取一個 warp::ws::Ws 類型,表(biao)示 WebSocket 配置。這(zhe)表(biao)示我們的這(zhe)個路徑將為一個websocket接(jie)口。

  • warp::ws() 過濾器用于匹配并提取 WebSocket 握手請求,確保該請求是 WebSocket 協議請求。
3. .map(move |ws: warp::ws::Ws| { ... })

.map 方法用于將前面的過濾器組合結果映射到一個新的處理邏輯中。這里的 move |ws: warp::ws::Ws| { ... } 是一(yi)個(ge)閉包,用于處理 WebSocket 請求。

  • move 關鍵字確保閉包捕獲其環境中的所有變量的所有權,因為這些變量將在異步操作中使用。
  • ws: warp::ws::Ws 參數是從前面的 warp::ws() 過濾器中提取的 WebSocket 配置。
4. ws.on_upgrade(move |socket| async move { ... })

ws.on_upgrade 方法(fa)用于將 WebSocket 協(xie)議升(sheng)級請求處理(li)為(wei) WebSocket 連(lian)接。它接受(shou)一(yi)(yi)個(ge)(ge)閉(bi)(bi)包作(zuo)為(wei)參數,當 WebSocket 握手成功(gong)后(hou),這(zhe)(zhe)個(ge)(ge)閉(bi)(bi)包會(hui)被調用。在(zai)官方定(ding)義中,這(zhe)(zhe)個(ge)(ge)方法(fa)主要用于自定(ding)義一(yi)(yi)個(ge)(ge)函(han)數對(dui)建立(li)后(hou)的websocket連(lian)接進(jin)行一(yi)(yi)定(ding)的操作(zuo),因此我們在(zai)這(zhe)(zhe)里將建立(li)連(lian)接后(hou)一(yi)(yi)切操作(zuo),比(bi)如保(bao)持連(lian)接,發送信息等。

/// Finish the upgrade, passing a function to handle the `WebSocket`.
///
/// The passed function must return a `Future`.
pub fn on_upgrade<F, U>(self, func: F) -> impl Reply
where
    F: FnOnce(WebSocket) -> U + Send + 'static,
    U: Future<Output = ()> + Send + 'static,
{
    WsReply {
        ws: self,
        on_upgrade: func,
    }
}
  • move |socket| async move { ... } 是一個異步閉包,它將在 WebSocket 連接成功升級后執行。
  • socket 參數表示已經升級的 WebSocket 連接。
5. node_broker_clone.subscribe("node".to_string(), socket).await;

在異步閉包內部,調用 node_broker_clone subscribe` 方法,將(jiang)新的(de) WebSocket 連接(jie)訂閱到節點(node)主題中。后續我們將(jiang)展開(kai)講解

  • "node".to_string() 將節點主題名稱轉換為字符串。
  • socket 參數表示當前的 WebSocket 連接。
  • await 關鍵字等待異步訂閱操作完成。

websocket連接處理

上面說到,我們通過 ws.on_upgrade(move |socket| async move { ... })這個方法在連(lian)接建(jian)立(li)之后(hou)進行處理(li)(li),其中(zhong)可以(yi)知道,我們處理(li)(li)的方法如(ru)下(xia)所示(shi)。

pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {
        let (ws_sender, mut ws_receiver) = socket.split();
        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
        let subscriber_id = Uuid::new_v4();
?
        {
            let mut subs = self.subscribers.write().await;
            subs.entry(topic.clone()).or_default().insert(subscriber_id, tx);
        }
?
        let subscribers = Arc::clone(&self.subscribers);
        tokio::task::spawn(async move {
            while let Some(result) = ws_receiver.next().await {
                match result {
                    Ok(message) => {
                        // 處理有效的消息
                        if message.is_text() {
                            println!(
                                "Received message from client: {}",
                                message.to_str().unwrap()
                            );
                        }
                    }
                    Err(e) => {
                        // 處理錯誤
                        eprintln!("WebSocket error: {:?}", e);
                        break;
                    }
                }
            }
            println!("WebSocket connection closed");
            subscribers.write().await.get_mut(&topic).map(|subscribers| subscribers.remove(&subscriber_id));
        });
?
        tokio::task::spawn(async move {
            let mut sender = ws_sender;
?
            while let Some(msg) = rx.recv().await {
                let _ = sender.send(msg).await;
            }
        });
    }
  • websocket連接處理 let (ws_sender, mut ws_receiver) = socket.split();這里使用原(yuan)生的(de)(de)代碼,將已經(jing)建立起來的(de)(de)socket進(jin)行分割,因(yin)(yin)為websocket是雙向連接,因(yin)(yin)此獲得針對這個socket的(de)(de)發送端(duan)(ws_sender)和接收端(duan)(ws_receiver)。

  • 建立連接并保存

    let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
    let subscriber_id = Uuid::new_v4();
    ?
    {
        let mut subs = self.subscribers.write().await;
        subs.entry(topic.clone()).or_default().insert(subscriber_id, tx);
    }
    

    在這里,我們建立了個一個管道,并將subscriber的信息進行保存,這里的 mpsc::unbounded_channel::<Message>();類似于golang中的(de)channel,他會(hui)生成一(yi)個發(fa)送(song)(song)者、一(yi)個接收(shou)者,當(dang)往發(fa)送(song)(song)者發(fa)送(song)(song)消息的(de)時候,接收(shou)者會(hui)受(shou)到(dao)該消息并進行(xing)一(yi)定(ding)處理。因此我們將subscriber的(de)發(fa)送(song)(song)者(tx)保存(cun)至內存(cun)里。

  • 建立消息發送機制

    tokio::task::spawn(async move {
                let mut sender = ws_sender;
    ?
                while let Some(msg) = rx.recv().await {
                    let _ = sender.send(msg).await;
                }
            });
    

    這個(ge)就是(shi)很簡單了,通(tong)過如(ru)果rx收(shou)到(dao)了消息(xi),則向websocket的subscriber進行發送。該任(ren)務是(shi)以新協程任(ren)務的方(fang)式啟動的,在后臺持續運行

  • 建立websocket連接保活機制

    let subscribers = Arc::clone(&self.subscribers);
        tokio::task::spawn(async move {
            while let Some(result) = ws_receiver.next().await {
                match result {
                    Ok(message) => {
                        // 處理有效的消息
                        if message.is_text() {
                            println!(
                                "Received message from client: {}",
                                message.to_str().unwrap()
                            );
                        }
                    }
                    Err(e) => {
                        // 處理錯誤
                        eprintln!("WebSocket error: {:?}", e);
                        break;
                    }
                }
            }
            println!("WebSocket connection closed");
            subscribers.write().await.get_mut(&topic).map(|subscribers| subscribers.remove(&subscriber_id));
        });
    

    這里我們仍然在后臺啟動一個守護協程,用于保活websocket連接,一旦發生了連接失效,則注銷消息發(fa)送機制,刪除subscribers緩存(cun)中的(de)訂閱者。

消息推送機制

  • 事件推送
    事件推送(song)(song)時候將允(yun)許調用相關事件的推送(song)(song)地址,向推送(song)(song)端發送(song)(song)消(xiao)息

    pub async fn produce_node_event(&self, event: WatchEvent<Node>) {
            self.node_broker.produce("node".to_string(), event).await;
        }
    
        pub async fn produce_task_event(&self, event: WatchEvent<Task>) {
            self.task_broker.produce("task".to_string(), event).await;
        }
    
        pub async fn produce_job_event(&self, event: WatchEvent<Job>) {
            self.job_broker.produce("job".to_string(), event).await;
        }
    

    當收到消(xiao)(xiao)(xiao)息(xi)的時候,不(bu)直接處理消(xiao)(xiao)(xiao)息(xi),而是將(jiang)放入(ru)緩存隊列中(一個消(xiao)(xiao)(xiao)息(xi)無界流)

    pub async fn produce(&self, topic: Topic, event: WatchEvent<R>) {
            if let Err(e) = self.event_sender.send((topic.clone(), event.clone())) {
                eprintln!("Failed to send event: {}", e);
            }
        }
    
  • 事件分發
    同樣的。將(jiang)啟動一(yi)個(ge)協程,用于從(cong)和event_sender對應的event_receiver中(zhong)獲取消息,推送給訂(ding)閱者。

    • 獲取訂閱者的列表并依次發送
    • 如果發現發送失敗,則將這個訂閱者從緩存中刪除
    fn start_event_dispatcher(broker: Arc<Self>, mut event_receiver: UnboundedReceiver<(Topic, WatchEvent<R>)>) {
            tokio::spawn(async move {
                while let Some((topic, event)) = event_receiver.recv().await {
                    let event_json = serde_json::to_string(&event).unwrap();
                    let subscribers_list;
                    {
                        let subscribers = broker.subscribers.read().await;
                        subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();
                    }
    
                    let mut invalid_subscribers = vec![];
                    for (id, ws_sender) in subscribers_list {
                        if ws_sender.send(warp::ws::Message::text(event_json.clone())).is_err() {
                            invalid_subscribers.push(id);
                        }
                    }
    
                    if !invalid_subscribers.is_empty() {
                        let mut subscribers = broker.subscribers.write().await;
                        if let Some(subscribers) = subscribers.get_mut(&topic) {
                            for id in invalid_subscribers {
                                subscribers.remove(&id);
                            }
                        }
                    }
                }
            });
        }
    

客戶端

客戶端的代碼就是建立起來一個訂閱者關注相關事件的動態。在相應的代碼中,可以使用該方法。本方法最終返回的是一個無界流 Stream<Item = WatchEvent<R>>,用于得到服務器推送過來(lai)的(de)事件類型

pub async fn list_and_watch<R>(api_client: &ApiClient, resource_name: &str) -> impl Stream<Item = WatchEvent<R>>
where
    R: Resource + Clone + DeserializeOwned + 'static + Send,
{

    // 先通過 HTTP 獲取資源列表
    let initial_resources = get_resource_list::<R>(api_client).await;

    // 解析要連接WebSocket服務器的URL
    let url = Url::parse(&*format!("{}/{}", api_client.watch_url, resource_name)).expect("Invalid URL");
    // 連接到WebSocket服務器
    println!("watch url is {}", url);
    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

    println!("Watch client connected");

    let (mut write, read) = ws_stream.split();
    let (tx, rx) = mpsc::unbounded_channel();

    // 先發送初始資源列表
    match initial_resources {
        Ok(res) => tx.send(WatchEvent::Restarted(res)).unwrap(),
        Err(e) => eprintln!("list resource failed, {}", e),
    };

    // 將 WebSocket 讀流轉換為消息事件流
    tokio::spawn(async move {
        read.for_each(|message| async {
            match message {
                Ok(msg) => {
                    if msg.is_text() {
                        let text = msg.to_text().unwrap();
                        match serde_json::from_str::<WatchEvent<R>>(text) {
                            Ok(event) => {
                                tx.send(event).unwrap();
                            }
                            Err(e) => {
                                eprintln!("Failed to parse message: {:?}", e);
                            }
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Error receiving message: {:?}", e);
                }
            }
        }).await;
    });

    // 保持 WebSocket 連接活躍
    tokio::spawn(async move {
        loop {
            if let Err(e) = write.send(WatchMessage::Text(String::new())).await {
                eprintln!("Error sending ping: {:?}", e);
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
        }
    });

    tokio_stream::wrappers::UnboundedReceiverStream::new(rx)
}

使用驗證

不足分析

經過上面的介(jie)紹,我們可以看到(dao)這個基礎的list and watch機制能夠正確(que)運行。但是(shi),和K8S、ETCD中廣泛(fan)使(shi)用(yong)的list and watch相比仍(reng)然缺少(shao)一個機制來保證(zheng)list和watch的一致性。

請考慮(lv)這(zhe)樣(yang)一種情況我(wo)(wo)們的(de)服務器中(zhong)會源源不斷地(di)產生(sheng)數(shu)據d1,d2,d3,...,dn。當我(wo)(wo)們使(shi)(shi)用list時候,能(neng)夠感知到(dao)d1,d2,d3,此時我(wo)(wo)們完成list,開(kai)始建(jian)(jian)立watch。加入在開(kai)始建(jian)(jian)立watch這(zhe)個階段,即(ji)使(shi)(shi)可能(neng)是幾毫秒(miao)的(de)時間但服務器生(sheng)成了d4,而在watch建(jian)(jian)立起來后,只(zhi)能(neng)接(jie)收到(dao)d5,d6,...。這(zhe)就導致了數(shu)據的(de)遺(yi)失。

在 Kubernetes 中,ListWatch 操作結合使用時,需要使用一個revision機制以確保資源的變更不會被遺漏。理解 ListWatch 操作時 revision(即 resourceVersion)的(de)具體含義和管(guan)理方式對于保證一致性(xing)至關重要。revision的(de)存在(zai)有著如(ru)下的(de)意義:

  1. 數據版本控制revision 是 Etcd 的全局遞增計數器,用于標識數據的當前版本。當進行數據的修改、更新操作時候,revision會+1
  2. 一致性視圖:確保返回的數據是一致的快照視圖,表示在該 revision 之前的所有操作都已完成。

revisionListWatch 的關系

  1. List 操作
    • 返回資源列表和當前的全局 revision,作為 resourceVersion
    • 確保獲取到的資源是該 revision 時刻的一致視圖。
  2. Watch 操作
    • 使用 List 操作返回的resourceVersion` 作為起點。
    • 從該 resourceVersion 開始監聽資源的變化,確保在List Watch` 之間的變更不會丟失。

List 操作的 revision

當進行 List 操作時,Kubernetes API Server 從 Etcd 獲取當前資源的狀態及其resourceVersion 。這個 resourceVersion 是 Etcd 當前的全局revision 。它表示在此 revision 之前的所有操作都已經完成,并確保返回的數據是這個revision` 時刻的(de)一致(zhi)視圖。

Watch 操作的 revision

Watch 操作使用 List 操作返回的 resourceVersion 作為起點,從該版本開始監聽資源的變化。這確保了從 ListWatch 之間的變更不會被遺漏。

示例流程

  1. List 操作
    • API Server 從 Etcd 獲取指定資源的當前狀態。
    • Etcd 返回包含所有資源對象的列表和一個全局 revision ,這個 revision 將作為resourceVersion`。
  2. Watch 操作
    • API Server 使用 List 操作返回的 resourceVersion(revision) 作為起點,開始監聽資源的變化。
    • Etcd 返回從指定 revision` 開始的所有變更事件。

總結

  • revision:標識數據版本,確保數據一致性。
  • ListWatchList 獲取資源和 revisionWatch 從該 revision 開始監聽變化,確保變更的連續性和一致性。
0條評論
0 / 1000
l****n
17文章(zhang)數
0粉絲數
l****n
17 文章 | 0 粉絲
原創

使用 Rust 實現的基礎的List 和 Watch 機制

2024-09-24 10:07:29
43
0

使用 Rust 實現的基礎的List 和 Watch 機制

介紹

在日常的開發過程中(zhong),有一個很重要的任務是能夠通過Rust語言實現K8s中(zhong)的各種生態組件(jian),在這(zhe)個過程中(zhong),既需要能過夠了解K8S的工(gong)作原理(li)也(ye)需要能夠知(zhi)道rust的語言特性。因此,在這(zhe)個過程中(zhong)有很多值(zhi)得(de)探討的知(zhi)識點。

在這里,第一步,我們將探索如何使用 Rust 實現一個類似于 Kubernetes 的 listwatch 機(ji)制。我們將通過 WebSocket 實現實時的消息(xi)推送,并使用一些(xie)關(guan)鍵的 Rust 異步編程模型來處(chu)理(li)事(shi)件和連接管(guan)理(li)。

我(wo)們首先默認大家能夠(gou)了解rust語言的(de)基本(ben)特性。下文中,將針(zhen)對rust的(de)知識點展(zhan)開進行(xing)探討。

目標

  • 理解 WebSocket 連接的建立和管理。
  • 學習如何通過 WebSocket 推送消息。
  • 掌握消息緩存和處理的實現方式。
  • 了解如何使用 Rust 實現一個高效的事件分發系統。
  • 理解K8S中的數據一致性保障方法
  • 了解本機制的不足,以及后續如何進行改進

理解問題

什么是 listwatch

  • List:列出當前所有資源的狀態。
  • Watch:實時監控資源的變化,一旦有資源變化,就會立即通知客戶端。

使用場景

  • 自動化運維:實時監控系統資源狀態,觸發自動化運維操作。
  • 應用監控:實時獲取應用狀態,及時處理異常,在很多的系統設計場景中,能夠減少耦合。
  • K8S中的相應設計:K8S中,對相應資源的通知的基礎即為list and watch機制。本人在學習K8S源碼的第一步就是學習這一套設計架構。

分析問題

\當然,通(tong)過簡(jian)單的(de)代碼僅僅通(tong)過http進行(xing)主動連接也可實現這個功(gong)能(neng)。但在目(mu)前階段(duan),我們希望能(neng)夠設計(ji)一個高效的(de)、穩定的(de)、可擴展的(de)list and watch體系(xi),因此我們需(xu)要考慮(lv)以下幾(ji)個關鍵問題。

關鍵問題

  1. 如何建立和管理 我們服務器和客戶端的連接?通過什么方式進行?
  2. 如何實現高效的消息推送機制?
  3. 如何處理消息緩存和訂閱管理?

技術選型

  • 語言:Rust
  • Web 框架:warp框架
  • WebSocket實現和框架:tokio-tungstenite、warp
  • 異步編程:tokio、管道機制

設計代碼結構

針對以上這個需(xu)求(qiu),結合目前kunos-system的(de)需(xu)求(qiu)我們闡釋如下

  • 有以下幾個資源,Node、Task(Task是一個shell命令、鏡像運行命令的載體)、Job(Task的上層資源,一個Job包含多個Task,類似于K8s中的replicaset)我們需要對這幾個資源的狀態進行推送。
  • 能夠在服務器建立起來一個watch and list服務器,能夠推送各種事件
  • 能夠

組件設計

  1. Broker:管理 WebSocket 訂閱者和事件分發。
    pub struct Broker<R: Resource + Clone + Serialize + Send + Sync  + 'static> {
        // 下游的訂閱者列表,用于發送websocket信息
        subscribers: Arc<RwLock<HashMap<Topic, HashMap<Uuid, WsSender>>>>,
        // 事件的緩沖流
        event_sender: UnboundedSender<(Topic, WatchEvent<R>)>,
    }
    
  2. Watcher:對不同資源類型進行管理和操作。
    pub struct Watcher {
        // 為不同的事件建立不同的broker
        pub node_broker: Arc<Broker<Node>>,
        pub task_broker: Arc<Broker<Task>>,
        pub job_broker: Arc<Broker<Job>>,
        pub exec_broker: Arc<Broker<TaskExecRequest>>,
    }
    
  3. WebSocket 客戶端:與服務器交互,接收實時事件。

基本原理

websocket路由入口

let node_subscribe = warp::path!("watch" / "node").and(warp::ws()).map(
    move |ws: warp::ws::Ws| {
        let node_broker_clone = Arc::clone(&node_broker_clone);
        ws.on_upgrade(move |socket| async move {
            node_broker_clone.subscribe("node".to_string(), socket).await;
        })
    },
);
1. warp::path!("watch" / "node")

*這部分代碼定義了一個路徑過濾器,用于匹配路徑 /watch/node 的 HTTP 請求。warp::path!是 Warp 框架提供的一個宏,用于簡化路徑定義。這里的"watch" / "node"表示請求路徑必須是/watch/node` 才(cai)能匹配這個過濾器。

2. .and(warp::ws())

這一部分代碼將路徑過濾器與 WebSocket 協議過濾器組合起來。warp::ws() 過濾器會匹配 WebSocket 握手請求并提取一個 warp::ws::Ws 類型,表示(shi) WebSocket 配置(zhi)。這表示(shi)我們的這個(ge)路(lu)徑將為一(yi)個(ge)websocket接口。

  • warp::ws() 過濾器用于匹配并提取 WebSocket 握手請求,確保該請求是 WebSocket 協議請求。
3. .map(move |ws: warp::ws::Ws| { ... })

.map 方法用于將前面的過濾器組合結果映射到一個新的處理邏輯中。這里的 move |ws: warp::ws::Ws| { ... } 是一個閉(bi)包,用于處理 WebSocket 請求。

  • move 關鍵字確保閉包捕獲其環境中的所有變量的所有權,因為這些變量將在異步操作中使用。
  • ws: warp::ws::Ws 參數是從前面的 warp::ws() 過濾器中提取的 WebSocket 配置。
4. ws.on_upgrade(move |socket| async move { ... })

ws.on_upgrade 方法用(yong)于將 WebSocket 協議升級(ji)請求(qiu)處理為 WebSocket 連接(jie)(jie)。它接(jie)(jie)受一個(ge)(ge)(ge)閉包作(zuo)(zuo)為參(can)數,當(dang) WebSocket 握手(shou)成功后,這個(ge)(ge)(ge)閉包會被(bei)調用(yong)。在官方定(ding)(ding)義(yi)中,這個(ge)(ge)(ge)方法主要用(yong)于自定(ding)(ding)義(yi)一個(ge)(ge)(ge)函數對建(jian)立后的websocket連接(jie)(jie)進行一定(ding)(ding)的操作(zuo)(zuo),因此(ci)我們在這里將建(jian)立連接(jie)(jie)后一切操作(zuo)(zuo),比(bi)如保持連接(jie)(jie),發送信息(xi)等。

/// Finish the upgrade, passing a function to handle the `WebSocket`.
///
/// The passed function must return a `Future`.
pub fn on_upgrade<F, U>(self, func: F) -> impl Reply
where
    F: FnOnce(WebSocket) -> U + Send + 'static,
    U: Future<Output = ()> + Send + 'static,
{
    WsReply {
        ws: self,
        on_upgrade: func,
    }
}
  • move |socket| async move { ... } 是一個異步閉包,它將在 WebSocket 連接成功升級后執行。
  • socket 參數表示已經升級的 WebSocket 連接。
5. node_broker_clone.subscribe("node".to_string(), socket).await;

在異步閉包內部,調用 node_broker_clone subscribe` 方法,將(jiang)新的 WebSocket 連接訂閱到節點(node)主題中。后續我們將(jiang)展開講解

  • "node".to_string() 將節點主題名稱轉換為字符串。
  • socket 參數表示當前的 WebSocket 連接。
  • await 關鍵字等待異步訂閱操作完成。

websocket連接處理

上面說到,我們通過 ws.on_upgrade(move |socket| async move { ... })這個方法在連接(jie)建立之后進行處理(li),其中可以知道,我(wo)們(men)處理(li)的方法如下所示。

pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {
        let (ws_sender, mut ws_receiver) = socket.split();
        let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
        let subscriber_id = Uuid::new_v4();
?
        {
            let mut subs = self.subscribers.write().await;
            subs.entry(topic.clone()).or_default().insert(subscriber_id, tx);
        }
?
        let subscribers = Arc::clone(&self.subscribers);
        tokio::task::spawn(async move {
            while let Some(result) = ws_receiver.next().await {
                match result {
                    Ok(message) => {
                        // 處理有效的消息
                        if message.is_text() {
                            println!(
                                "Received message from client: {}",
                                message.to_str().unwrap()
                            );
                        }
                    }
                    Err(e) => {
                        // 處理錯誤
                        eprintln!("WebSocket error: {:?}", e);
                        break;
                    }
                }
            }
            println!("WebSocket connection closed");
            subscribers.write().await.get_mut(&topic).map(|subscribers| subscribers.remove(&subscriber_id));
        });
?
        tokio::task::spawn(async move {
            let mut sender = ws_sender;
?
            while let Some(msg) = rx.recv().await {
                let _ = sender.send(msg).await;
            }
        });
    }
  • websocket連接處理 let (ws_sender, mut ws_receiver) = socket.split();這(zhe)里使用原生(sheng)的(de)(de)代(dai)碼(ma),將已經建立(li)起來的(de)(de)socket進(jin)行分割,因為websocket是(shi)雙向連接,因此(ci)獲得針(zhen)對這(zhe)個socket的(de)(de)發(fa)送端(duan)(ws_sender)和接收端(duan)(ws_receiver)。

  • 建立連接并保存

    let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
    let subscriber_id = Uuid::new_v4();
    ?
    {
        let mut subs = self.subscribers.write().await;
        subs.entry(topic.clone()).or_default().insert(subscriber_id, tx);
    }
    

    在這里,我們建立了個一個管道,并將subscriber的信息進行保存,這里的 mpsc::unbounded_channel::<Message>();類似于golang中的(de)channel,他會(hui)生成一個發送者(zhe)、一個接(jie)(jie)收者(zhe),當往發送者(zhe)發送消(xiao)(xiao)息的(de)時候,接(jie)(jie)收者(zhe)會(hui)受到該(gai)消(xiao)(xiao)息并進(jin)行一定處理。因(yin)此我們將subscriber的(de)發送者(zhe)(tx)保存至內存里(li)。

  • 建立消息發送機制

    tokio::task::spawn(async move {
                let mut sender = ws_sender;
    ?
                while let Some(msg) = rx.recv().await {
                    let _ = sender.send(msg).await;
                }
            });
    

    這個(ge)就是很簡單(dan)了(le),通過如果rx收(shou)到(dao)了(le)消(xiao)息,則(ze)向websocket的subscriber進行發送。該任務(wu)是以新協程任務(wu)的方式(shi)啟(qi)動的,在后臺持續運(yun)行

  • 建立websocket連接保活機制

    let subscribers = Arc::clone(&self.subscribers);
        tokio::task::spawn(async move {
            while let Some(result) = ws_receiver.next().await {
                match result {
                    Ok(message) => {
                        // 處理有效的消息
                        if message.is_text() {
                            println!(
                                "Received message from client: {}",
                                message.to_str().unwrap()
                            );
                        }
                    }
                    Err(e) => {
                        // 處理錯誤
                        eprintln!("WebSocket error: {:?}", e);
                        break;
                    }
                }
            }
            println!("WebSocket connection closed");
            subscribers.write().await.get_mut(&topic).map(|subscribers| subscribers.remove(&subscriber_id));
        });
    

    這里我們仍然在后臺啟動一個守護協程,用于保活websocket連接,一旦發生了連接失效,則注銷消息發送機制,刪除(chu)subscribers緩存中的(de)訂閱者(zhe)。

消息推送機制

  • 事件推送
    事件推送時候將允許調(diao)用相關事件的推送地址(zhi),向推送端(duan)發送消(xiao)息(xi)

    pub async fn produce_node_event(&self, event: WatchEvent<Node>) {
            self.node_broker.produce("node".to_string(), event).await;
        }
    
        pub async fn produce_task_event(&self, event: WatchEvent<Task>) {
            self.task_broker.produce("task".to_string(), event).await;
        }
    
        pub async fn produce_job_event(&self, event: WatchEvent<Job>) {
            self.job_broker.produce("job".to_string(), event).await;
        }
    

    當(dang)收到消(xiao)息的時候(hou),不直接處(chu)理消(xiao)息,而是將(jiang)放入緩存(cun)隊列中(一個消(xiao)息無界(jie)流)

    pub async fn produce(&self, topic: Topic, event: WatchEvent<R>) {
            if let Err(e) = self.event_sender.send((topic.clone(), event.clone())) {
                eprintln!("Failed to send event: {}", e);
            }
        }
    
  • 事件分發
    同樣(yang)的。將(jiang)啟(qi)動一個協程(cheng),用于(yu)從和(he)event_sender對(dui)應的event_receiver中(zhong)獲(huo)取(qu)消息,推送給訂閱(yue)者。

    • 獲取訂閱者的列表并依次發送
    • 如果發現發送失敗,則將這個訂閱者從緩存中刪除
    fn start_event_dispatcher(broker: Arc<Self>, mut event_receiver: UnboundedReceiver<(Topic, WatchEvent<R>)>) {
            tokio::spawn(async move {
                while let Some((topic, event)) = event_receiver.recv().await {
                    let event_json = serde_json::to_string(&event).unwrap();
                    let subscribers_list;
                    {
                        let subscribers = broker.subscribers.read().await;
                        subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();
                    }
    
                    let mut invalid_subscribers = vec![];
                    for (id, ws_sender) in subscribers_list {
                        if ws_sender.send(warp::ws::Message::text(event_json.clone())).is_err() {
                            invalid_subscribers.push(id);
                        }
                    }
    
                    if !invalid_subscribers.is_empty() {
                        let mut subscribers = broker.subscribers.write().await;
                        if let Some(subscribers) = subscribers.get_mut(&topic) {
                            for id in invalid_subscribers {
                                subscribers.remove(&id);
                            }
                        }
                    }
                }
            });
        }
    

客戶端

客戶端的代碼就是建立起來一個訂閱者關注相關事件的動態。在相應的代碼中,可以使用該方法。本方法最終返回的是一個無界流 Stream<Item = WatchEvent<R>>,用(yong)于得到服務器推送過(guo)來的(de)事件(jian)類型

pub async fn list_and_watch<R>(api_client: &ApiClient, resource_name: &str) -> impl Stream<Item = WatchEvent<R>>
where
    R: Resource + Clone + DeserializeOwned + 'static + Send,
{

    // 先通過 HTTP 獲取資源列表
    let initial_resources = get_resource_list::<R>(api_client).await;

    // 解析要連接WebSocket服務器的URL
    let url = Url::parse(&*format!("{}/{}", api_client.watch_url, resource_name)).expect("Invalid URL");
    // 連接到WebSocket服務器
    println!("watch url is {}", url);
    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

    println!("Watch client connected");

    let (mut write, read) = ws_stream.split();
    let (tx, rx) = mpsc::unbounded_channel();

    // 先發送初始資源列表
    match initial_resources {
        Ok(res) => tx.send(WatchEvent::Restarted(res)).unwrap(),
        Err(e) => eprintln!("list resource failed, {}", e),
    };

    // 將 WebSocket 讀流轉換為消息事件流
    tokio::spawn(async move {
        read.for_each(|message| async {
            match message {
                Ok(msg) => {
                    if msg.is_text() {
                        let text = msg.to_text().unwrap();
                        match serde_json::from_str::<WatchEvent<R>>(text) {
                            Ok(event) => {
                                tx.send(event).unwrap();
                            }
                            Err(e) => {
                                eprintln!("Failed to parse message: {:?}", e);
                            }
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Error receiving message: {:?}", e);
                }
            }
        }).await;
    });

    // 保持 WebSocket 連接活躍
    tokio::spawn(async move {
        loop {
            if let Err(e) = write.send(WatchMessage::Text(String::new())).await {
                eprintln!("Error sending ping: {:?}", e);
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
        }
    });

    tokio_stream::wrappers::UnboundedReceiverStream::new(rx)
}

使用驗證

不足分析

經(jing)過上面的(de)介(jie)紹(shao),我(wo)們可(ke)以(yi)看到這(zhe)個(ge)基礎(chu)的(de)list and watch機(ji)制能夠正(zheng)確(que)運(yun)行(xing)。但是,和K8S、ETCD中廣泛(fan)使(shi)用的(de)list and watch相比仍然(ran)缺少一(yi)個(ge)機(ji)制來保證list和watch的(de)一(yi)致性。

請考慮這樣一種情(qing)況我(wo)們(men)的(de)服(fu)務器中(zhong)會源源不斷地產(chan)生(sheng)數據d1,d2,d3,...,dn。當(dang)我(wo)們(men)使(shi)用(yong)list時候,能(neng)夠感知到d1,d2,d3,此時我(wo)們(men)完(wan)成(cheng)(cheng)list,開(kai)始建(jian)(jian)立watch。加入在開(kai)始建(jian)(jian)立watch這個(ge)階段,即使(shi)可能(neng)是幾毫秒(miao)的(de)時間但服(fu)務器生(sheng)成(cheng)(cheng)了(le)(le)d4,而在watch建(jian)(jian)立起來(lai)后,只能(neng)接收到d5,d6,...。這就導致了(le)(le)數據的(de)遺失。

在 Kubernetes 中,ListWatch 操作結合使用時,需要使用一個revision機制以確保資源的變更不會被遺漏。理解 ListWatch 操作時 revision(即 resourceVersion)的(de)(de)具體含(han)義(yi)和管理方(fang)式對(dui)于保證一致性至關重要(yao)。revision的(de)(de)存在有著如下的(de)(de)意義(yi):

  1. 數據版本控制revision 是 Etcd 的全局遞增計數器,用于標識數據的當前版本。當進行數據的修改、更新操作時候,revision會+1
  2. 一致性視圖:確保返回的數據是一致的快照視圖,表示在該 revision 之前的所有操作都已完成。

revisionListWatch 的關系

  1. List 操作
    • 返回資源列表和當前的全局 revision,作為 resourceVersion
    • 確保獲取到的資源是該 revision 時刻的一致視圖。
  2. Watch 操作
    • 使用 List 操作返回的resourceVersion` 作為起點。
    • 從該 resourceVersion 開始監聽資源的變化,確保在List Watch` 之間的變更不會丟失。

List 操作的 revision

當進行 List 操作時,Kubernetes API Server 從 Etcd 獲取當前資源的狀態及其resourceVersion 。這個 resourceVersion 是 Etcd 當前的全局revision 。它表示在此 revision 之前的所有操作都已經完成,并確保返回的數據是這個revision` 時刻(ke)的一致(zhi)視圖(tu)。

Watch 操作的 revision

Watch 操作使用 List 操作返回的 resourceVersion 作為起點,從該版本開始監聽資源的變化。這確保了從 ListWatch 之間的變(bian)更不會被遺(yi)漏。

示例流程

  1. List 操作
    • API Server 從 Etcd 獲取指定資源的當前狀態。
    • Etcd 返回包含所有資源對象的列表和一個全局 revision ,這個 revision 將作為resourceVersion`。
  2. Watch 操作
    • API Server 使用 List 操作返回的 resourceVersion(revision) 作為起點,開始監聽資源的變化。
    • Etcd 返回從指定 revision` 開始的所有變更事件。

總結

  • revision:標識數據版本,確保數據一致性。
  • ListWatchList 獲取資源和 revisionWatch 從該 revision 開始監聽變化,確保變更的連續性和一致性。
文章來自個人專欄
文章(zhang) | 訂閱(yue)
0條評論
0 / 1000
請輸入你的評論
0
0