k8S的工作隊列代碼解析和rust的實現(基礎工作隊列)
基礎知識
在 Kubernetes(K8S)中,workqueue 是一個用于處理異步任務的工具,它提供了一種安全、高效的方式來管理并處理工作項。下面我們詳細解讀這段代碼的工作原理。
主要結構和接口
Interface
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShutDownWithDrain()
ShuttingDown() bool
}
這個接口定義了一個工作隊列應具有的基本功能:
Add(item interface{}):添加一個新的工作項。Len() int:返回隊列中的當前工作項數。Get() (item interface{}, shutdown bool):獲取一個要處理的工作項。Done(item interface{}):標記一個工作項為已完成。ShutDown():關閉隊列,不再接受新工作項。ShutDownWithDrain():在所有工作項處理完畢后關閉隊列。ShuttingDown() bool:檢查隊列是否正在關閉。
FIFO隊列存儲過程
正常添加流程
通過 Add 方法往 FIFO 隊列中分別插入 1、2、3 三個元素,此時隊列中的 queue 和 dirty 分別存有 1、2、3 元素,processing為空;然后通過 Get 方法獲取最先進入的1元素,此時1 元素被放入 processing中,queue 和 dirty 剩余有 2、3 元素,表示1元素正在被處理;最后,當我們處理完 1 元素時通過 Done 方法標記該元素已經被處理完成,此時將1元素從 processing中移除。
高并發下如何保證一個元素哪怕其被添加了多次,但也只會被處理一次?
元素在queue和dirty中還未放入processing:
在并發場景下,假設 goroutine A 通過Add方法將1元素到queue和dirty中;同一時間,goroutine B 通過 Add 方法插入另一個 1 元素,此時在dirty中已經存在相同的元素,會直接返回。
元素在processing中正被處理:
在并發場景下,假設 goroutine A 通過 Get 方法獲取 1 元素,1 元素被添加到 processing 中并從queue和dirty中移除;同一時間,goroutine B 通過 Add 方法插入另一個 1 元素,此時在 processing 中已經存在相同的元素,所以后面的 1 元素并不會被直接添加到 queue 字段中,而是僅添加到dirty中;在 goroutine A 通過 Done 方法標記1元素被處理完成并從processing刪除后,檢測到dirty 字段中存有 1 元素,則將 1 元素追加到 queue 字段中的尾部。
get時候隊列空了:
當goroutine進行get時候,隊列空了,則會讓get的goroutine進行等待、阻塞。當隊列有值的時候才會繼續get。
隊列關閉了:
當隊列關閉了之后,get、add方法將不允許添加、獲取數據。但正在處理的數據除外,可以設置一種策略等待正在處理的數據處理完成。
多線程管理
在 Go 語言中,sync.Cond 是一個用于協調并發操作的條件變量。它允許一個或多個 goroutine 等待某個條件發生變化,同時確保這些等待和通知操作是安全的。sync.Cond 通常與互斥鎖 (sync.Mutex) 一起使用。
sync.Cond 的主要方法
Wait()Signal()Broadcast()
sync.Cond 的用法
Wait()
Wait方法會使調用它的 goroutine 進入等待狀態,直到收到Signal或Broadcast通知。調用Wait前,必須持有與條件變量關聯的互斥鎖。調用Wait后,會自動釋放該鎖,當Wait` 返回時,goroutine 會重新獲得該鎖。
Signal()
Signal 方法會喚醒一個正在等待的 goroutine。如果沒有 goroutine 在等待,Signal` 不做任何操作。
Broadcast()
Broadcast` 方法會喚醒所有正在等待的 goroutine。
sync.Cond 的應用場景
sync.Cond` 適用于需要等待某個條件的并發場景,例如生產者-消費者模型、資源池等。
Rust實現
基礎技術
堆、棧與Box
在 Rust 中,Box 是一個智能指針類型,用于在堆上分配內存并擁有該內存的所有權。Box 將值從棧上移動到堆上,并且當 Box 被丟棄時,堆上的值也會被自動清理。Box 主要用于以下情況:
- ?在堆上分配大型數據結構?:有時候我們不希望在棧上分配過大的數據結構,這時可以使用
Box將其分配到堆上。 - ?遞歸數據結構?:由于 Rust 編譯器需要在編譯時知道類型的大小,遞歸數據結構(例如樹或鏈表)無法在棧上存儲,這時可以使用
Box。 - ?動態分發(Dynamic Dispatch)?:當需要使用運行時多態(例如
trait對象)時,可以使用Box<dyn Trait>。
在rust中的工作隊列的實現中,Box用于存儲不同類型的工作項。這是通過將工作項封裝在Box<dyn Any + Send + Sync>中實現的,Any是一個允許存儲任意類型的特征,Send和Sync則確保這些類型在線程間傳遞和共享時是安全的。
高并發控制設計
Go 語言中的高并發控制
?在 Go 中,?sync.Mutex 和 sync.Cond 是高并發控制的基礎工具:
sync.Mutex:用于保護臨界區,確保同一時間只有一個 goroutine 訪問共享資源。sync.Cond:條件變量,用于 goroutines 等待或通知某個條件發生變化。
**Go 語言中 **sync.Cond 的主要方法:
Wait()Signal()Broadcast()
Rust語言中的高并發控制
?在 Rust 中,?std::sync::Mutex 和 std::sync::Condvar 提供了和golang中類似的功能:
std::sync::Mutex:用于保護臨界區,確保同一時間只有一個線程訪問共享資源。std::sync::Condvar:條件變量,用于線程等待或通知某個條件發生變化。
**Rust 中 **Condvar 的主要方法:
wait:和golang中的wait相同notify_onenotify_all
代碼設計
結構體
struct Queue {
queue: Mutex<VecDeque<WorkItem>>,
dirty: Mutex<HashSet<usize>>,
processing: Mutex<HashSet<usize>>,
cond: Condvar,
shutting_down: Mutex<bool>,
}
- queue:標準的工作隊列,是workqpueue的載體
- dirty:記錄添加進去隊列中,但未被處理的元素?
- p
- cond:控制線程同步、高并發的rust組件
- shutting_down:用于記錄是否這個隊列被關閉了
Add方法設計
代碼對比
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if _, exists := q.dirty[item]; exists {
return
}
q.dirty[item] = struct{}{}
if _, exists := q.processing[item]; !exists {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
** **這個方法的實現如下
fn add(&self, item: WorkItem) {
let key = Queue::get_item_key(&item);
?
// 如果隊列已經關閉,則退出
{
let shutting_down = self.shutting_down.lock().unwrap();
if *shutting_down {
println!("Queue shut down");
return;
}
}
?
let mut dirty = self.dirty.lock().unwrap();
if !dirty.contains(&key) {
dirty.insert(key);
drop(dirty);
?
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);
?
self.cond.notify_one();
}
}
方法解析
獲取工作項的唯一鍵
let key = Queue::get_item_key(&item);
- **使用 **
Queue::get_item_key(&item)方法獲取工作項的唯一鍵。這個鍵用于標識工作項并在dirty集合中跟蹤它。
檢查隊列是否已經關閉
{
let shutting_down = self.shutting_down.lock().unwrap();
if *shutting_down {
println!("Queue shut down");
return;
}
}
- **通過獲取 **
shutting_down鎖來檢查隊列是否已經關閉。 - **如果隊列已經關閉,打印 **
Queue shut down并返回。 - **使用一個代碼塊來限制鎖的持有范圍,確保在檢查完 **
shutting_down狀態后立即釋放鎖。
更新 dirty 集合
let mut dirty = self.dirty.lock().unwrap();
if !dirty.contains(&key) {
dirty.insert(key);
drop(dirty);
- **獲取 **
dirty集合的鎖,檢查工作項的鍵是否已經存在于dirty集合中。如果鍵不存在于dirty集合中,將鍵插入集合。 - **使用 **
drop(dirty)手動釋放鎖,確保在插入鍵后立即釋放鎖。
更新 queue 隊列并通知消費者
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);
self.cond.notify_one();
- 獲取
queue隊列的鎖,將工作項添加到隊列的末尾。 - 使用 drop(queue)手動釋放鎖,確保在添加工作項后立即釋放鎖。
- 調用
self.cond.notify_one()通知等待的消費者有新的工作項可用。
get方法設計
代碼對比
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item = q.queue[0]
// The underlying array still exists and reference this object, so the object will not be garbage collected.
q.queue[0] = nil
q.queue = q.queue[1:]
q.metrics.get(item)
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
rust中的實現代碼,這里實現了主體功能,省略了metrics的獲取。
fn get(&self) -> Option<WorkItem> {
let mut item = None;
let mut queue = self.queue.lock().unwrap();
while queue.is_empty() {
// 使用一個塊來釋放 shutting_down 的鎖
{
let shutting_down = self.shutting_down.lock().unwrap();
if *shutting_down {
return None;
}
}
queue = self.cond.wait(queue).unwrap();
}
if !queue.is_empty() {
item = queue.pop_front();
}
if let Some(ref itm) = item {
let key = Queue::get_item_key(itm);
self.processing.lock().unwrap().insert(key);
self.dirty.lock().unwrap().remove(&key);
}
item
}
方法解析
定義返回值和獲取隊列鎖
let mut item = None;
let mut queue = self.queue.lock().unwrap();
- ?
item:定義一個變量作為最終的返回值,初始化為None。 - ?獲取隊列鎖?:獲取
queue的互斥鎖,如果其他線程正在訪問queue,當前線程將會阻塞,直到鎖可用。此時queue被鎖定,確保對queue的操作是線程安全的。
等待非空隊列或檢查關閉狀態
while queue.is_empty() {
// 使用一個塊來釋放 shutting_down 的鎖
{
let shutting_down = self.shutting_down.lock().unwrap();
if *shutting_down {
return None;
}
}
queue = self.cond.wait(queue).unwrap();
}
- ?檢查隊列是否為空?:如果
queue為空,則進入while循環。 - ?檢查關閉狀態?:使用一個代碼塊來獲取
shutting_down的互斥鎖并檢查其狀態。如果shutting_down為true,說明隊列正在關閉,返回None退出方法。 - ?等待條件變量?:調用
self.cond.wait(queue).unwrap(),使當前線程進入等待狀態,直到條件變量收到通知。此時會釋放queue的鎖,允許其他線程操作queue。當被喚醒后,線程會重新獲取queue的鎖。這里他會一直等待隊列不為空,即有生產者入隊消息。
從隊列中獲取工作項
if !queue.is_empty() {
item = queue.pop_front();
}
- ?獲取工作項?:如果
queue不為空,從隊列前端彈出一個工作項并賦值給item。
更新 processing 和 dirty 集合
if let Some(ref itm) = item {
let key = Queue::get_item_key(itm);
self.processing.lock().unwrap().insert(key);
self.dirty.lock().unwrap().remove(&key);
}
- ?檢查
item是否存在?:如果item存在,獲取其引用itm。 - ?獲取工作項的鍵?:使用
Queue::get_item_key(itm)獲取工作項的唯一鍵。 - ?更新
processing集合?:獲取processing的互斥鎖,并將鍵插入processing集合中,表示該工作項正在處理。 - ?更新
dirty集合?:獲取dirty的互斥鎖,并將鍵從dirty集合中移除,表示該工作項不再需要重新處理,因為他已經get到了。
返回工作項
item
- ?返回值?:返回
item,這是一個可選類型,表示要處理的工作項。如果在整個過程中沒有獲取到工作項,則返回None。
done方法
代碼對比
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
} else if q.processing.len() == 0 {
q.cond.Signal()
}
}
rust中的實現如下:
fn done(&self, item: WorkItem) {
let key = Queue::get_item_key(&item);
self.processing.lock().unwrap().remove(&key);
let mut dirty = self.dirty.lock().unwrap();
if dirty.contains(&key) {
drop(dirty);
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);
self.cond.notify_one();
} else if self.processing.lock().unwrap().is_empty() {
self.cond.notify_all();
}
}
方法解析
獲取工作項的唯一鍵并從 processing 集合中移除
let key = Queue::get_item_key(&item);
self.processing.lock().unwrap().remove(&key);
- ?獲取工作項的唯一鍵?:調用
Queue::get_item_key(&item)方法獲取工作項的唯一鍵。這個鍵用于標識工作項。 - ?從
processing集合中移除?:獲取processing的互斥鎖,并從processing集合中移除該鍵,表示該工作項的處理已經完成。
檢查并更新 dirty 集合和 queue 隊列
let mut dirty = self.dirty.lock().unwrap();
if dirty.contains(&key) {
drop(dirty);
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);
self.cond.notify_one();
}
- ?獲取
dirty集合的鎖?:獲取dirty的互斥鎖,確保對dirty集合的操作是線程安全的。 - ?檢查
dirty集合?:如果dirty集合包含該鍵,表示該工作項在處理過程中再次被標記為需要處理。 - 更新
queue隊列:將工作項重新添加到queue隊列中,以便后續處理。具體步驟如下:- ?釋放
dirty鎖?:使用drop(dirty)釋放dirty的鎖。 - ?獲取
queue鎖?:獲取queue的互斥鎖,并將工作項添加到隊列末尾。 - ?釋放
queue鎖?:使用drop(queue)釋放queue的鎖。 - ?通知等待的線程?:調用
self.cond.notify_one(),通知等待在條件變量上的消費者線程有新的工作項可用。
- ?釋放
檢查 processing 集合并通知所有線程
else if self.processing.lock().unwrap().is_empty() {
self.cond.notify_all();
}
- ?獲取
processing鎖?:獲取processing的互斥鎖。 - ?檢查
processing集合?:如果processing集合為空,表示當前沒有正在處理的工作項。 - ?通知所有線程?:調用
self.cond.notify_all(),通知所有等待在條件變量上的線程。這通常用于在隊列即將關閉時確保所有工作項都已處理完畢。