亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

k8S的工作隊列代碼解析和rust的實現(基礎工作隊列)

2024-09-04 09:42:29
13
0

k8S的工作隊列代碼解析和rust的實現(基礎工作隊列)

基礎知識

在 Kubernetes(K8S)中,workqueue 是一個用于處理異步任務的工具,它提供了一種安全、高效的方式來管理并處理工作項。下面我們詳細解讀這段代碼的工作原理。

主要結構和接口

Interface

type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShutDownWithDrain()
    ShuttingDown() bool
}

這個接口定義了一個工作隊列應具有的基本功能:

  • Add(item interface{}):添加一個新的工作項。
  • Len() int:返回隊列中的當前工作項數。
  • Get() (item interface{}, shutdown bool):獲取一個要處理的工作項。
  • Done(item interface{}):標記一個工作項為已完成。
  • ShutDown():關閉隊列,不再接受新工作項。
  • ShutDownWithDrain():在所有工作項處理完畢后關閉隊列。
  • ShuttingDown() bool:檢查隊列是否正在關閉。

FIFO隊列存儲過程

正常添加流程

通過 Add 方法往 FIFO 隊列中分別插入 1、2、3 三個元素,此時隊列中的 queue 和 dirty 分別存有 1、2、3 元素,processing為空;然后通過 Get 方法獲取最先進入的1元素,此時1 元素被放入 processing中,queue 和 dirty 剩余有 2、3 元素,表示1元素正在被處理;最后,當我們處理完 1 元素時通過 Done 方法標記該元素已經被處理完成,此時將1元素從 processing中移除。

高并發下如何保證一個元素哪怕其被添加了多次,但也只會被處理一次?

元素在queue和dirty中還未放入processing:

在并發場景下,假設 goroutine A 通過Add方法將1元素到queue和dirty中;同一時間,goroutine B 通過 Add 方法插入另一個 1 元素,此時在dirty中已經存在相同的元素,會直接返回。

元素在processing中正被處理:

在并發場景下,假設 goroutine A 通過 Get 方法獲取 1 元素,1 元素被添加到 processing 中并從queue和dirty中移除;同一時間,goroutine B 通過 Add 方法插入另一個 1 元素,此時在 processing 中已經存在相同的元素,所以后面的 1 元素并不會被直接添加到 queue 字段中,而是僅添加到dirty中;在 goroutine A 通過 Done 方法標記1元素被處理完成并從processing刪除后,檢測到dirty 字段中存有 1 元素,則將 1 元素追加到 queue 字段中的尾部。

get時候隊列空了:

當goroutine進行get時候,隊列空了,則會讓get的goroutine進行等待、阻塞。當隊列有值的時候才會繼續get。

隊列關閉了:

當隊列關閉了之后,get、add方法將不允許添加、獲取數據。但正在處理的數據除外,可以設置一種策略等待正在處理的數據處理完成。

多線程管理

在 Go 語言中,sync.Cond 是一個用于協調并發操作的條件變量。它允許一個或多個 goroutine 等待某個條件發生變化,同時確保這些等待和通知操作是安全的。sync.Cond 通常與互斥鎖 (sync.Mutex) 一起使用。

sync.Cond 的主要方法

  • Wait()
  • Signal()
  • Broadcast()

sync.Cond 的用法

Wait()

Wait方法會使調用它的 goroutine 進入等待狀態,直到收到SignalBroadcast通知。調用Wait前,必須持有與條件變量關聯的互斥鎖。調用Wait后,會自動釋放該鎖,當Wait` 返回時,goroutine 會重新獲得該鎖。

Signal()

Signal 方法會喚醒一個正在等待的 goroutine。如果沒有 goroutine 在等待,Signal` 不做任何操作。

Broadcast()

Broadcast` 方法會喚醒所有正在等待的 goroutine。

sync.Cond 的應用場景

sync.Cond` 適用于需要等待某個條件的并發場景,例如生產者-消費者模型、資源池等。

Rust實現

基礎技術

堆、棧與Box

在 Rust 中,Box 是一個智能指針類型,用于在堆上分配內存并擁有該內存的所有權。Box 將值從棧上移動到堆上,并且當 Box 被丟棄時,堆上的值也會被自動清理。Box 主要用于以下情況:

  1. ?在堆上分配大型數據結構?:有時候我們不希望在棧上分配過大的數據結構,這時可以使用 Box 將其分配到堆上。
  2. ?遞歸數據結構?:由于 Rust 編譯器需要在編譯時知道類型的大小,遞歸數據結構(例如樹或鏈表)無法在棧上存儲,這時可以使用 Box
  3. ?動態分發(Dynamic Dispatch)?:當需要使用運行時多態(例如 trait 對象)時,可以使用 Box<dyn Trait>
    在rust中的工作隊列的實現中,Box 用于存儲不同類型的工作項。這是通過將工作項封裝在 Box<dyn Any + Send + Sync> 中實現的,Any 是一個允許存儲任意類型的特征,SendSync 則確保這些類型在線程間傳遞和共享時是安全的。

高并發控制設計

Go 語言中的高并發控制

?在 Go 中,?sync.Mutexsync.Cond 是高并發控制的基礎工具:

  • sync.Mutex:用于保護臨界區,確保同一時間只有一個 goroutine 訪問共享資源。
  • sync.Cond:條件變量,用于 goroutines 等待或通知某個條件發生變化。

**Go 語言中 **sync.Cond 的主要方法:

  • Wait()
  • Signal()
  • Broadcast()
Rust語言中的高并發控制

?在 Rust 中,?std::sync::Mutexstd::sync::Condvar 提供了和golang中類似的功能:

  • std::sync::Mutex:用于保護臨界區,確保同一時間只有一個線程訪問共享資源。
  • std::sync::Condvar:條件變量,用于線程等待或通知某個條件發生變化。

**Rust 中 **Condvar 的主要方法:

  • wait:和golang中的wait相同
  • notify_one
  • notify_all

代碼設計

結構體

struct Queue {
    queue: Mutex<VecDeque<WorkItem>>,
    dirty: Mutex<HashSet<usize>>,
    processing: Mutex<HashSet<usize>>,
    cond: Condvar,
    shutting_down: Mutex<bool>,
}
  • queue:標準的工作隊列,是workqpueue的載體
  • dirty:記錄添加進去隊列中,但未被處理的元素?
  • p
  • cond:控制線程同步、高并發的rust組件
  • shutting_down:用于記錄是否這個隊列被關閉了

Add方法設計

代碼對比
func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    if q.shuttingDown {
        return
    }
    if _, exists := q.dirty[item]; exists {
        return
    }
    q.dirty[item] = struct{}{}
    if _, exists := q.processing[item]; !exists {
        q.queue = append(q.queue, item)
        q.cond.Signal()
    }
}

** **這個方法的實現如下

fn add(&self, item: WorkItem) {
        let key = Queue::get_item_key(&item);
?
        // 如果隊列已經關閉,則退出
        {
            let shutting_down = self.shutting_down.lock().unwrap();
            if *shutting_down {
                println!("Queue shut down");
                return;
            }
        }
?
        let mut dirty = self.dirty.lock().unwrap();
        if !dirty.contains(&key) {
            dirty.insert(key);
            drop(dirty);
?
            let mut queue = self.queue.lock().unwrap();
            queue.push_back(item);
            drop(queue);
?
            self.cond.notify_one();
        }
    }
方法解析
獲取工作項的唯一鍵
let key = Queue::get_item_key(&item);
  • **使用 **Queue::get_item_key(&item) 方法獲取工作項的唯一鍵。這個鍵用于標識工作項并在 dirty 集合中跟蹤它。
檢查隊列是否已經關閉
{
    let shutting_down = self.shutting_down.lock().unwrap();
    if *shutting_down {
        println!("Queue shut down");
        return;
    }
}
  • **通過獲取 **shutting_down 鎖來檢查隊列是否已經關閉。
  • **如果隊列已經關閉,打印 **Queue shut down 并返回。
  • **使用一個代碼塊來限制鎖的持有范圍,確保在檢查完 **shutting_down 狀態后立即釋放鎖。
更新 dirty 集合
let mut dirty = self.dirty.lock().unwrap();
if !dirty.contains(&key) {
    dirty.insert(key);
    drop(dirty);
  • **獲取 **dirty 集合的鎖,檢查工作項的鍵是否已經存在于 dirty 集合中。如果鍵不存在于 dirty 集合中,將鍵插入集合。
  • **使用 **drop(dirty) 手動釋放鎖,確保在插入鍵后立即釋放鎖。
更新 queue 隊列并通知消費者
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);

self.cond.notify_one();
  • 獲取 queue 隊列的鎖,將工作項添加到隊列的末尾。
  • 使用 drop(queue)手動釋放鎖,確保在添加工作項后立即釋放鎖。
  • 調用 self.cond.notify_one() 通知等待的消費者有新的工作項可用。

get方法設計

代碼對比
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}

	item = q.queue[0]
	// The underlying array still exists and reference this object, so the object will not be garbage collected.
	q.queue[0] = nil
	q.queue = q.queue[1:]

	q.metrics.get(item)

	q.processing.insert(item)
	q.dirty.delete(item)

	return item, false
}

rust中的實現代碼,這里實現了主體功能,省略了metrics的獲取。

fn get(&self) -> Option<WorkItem> {
    let mut item = None;
    let mut queue = self.queue.lock().unwrap();
    while queue.is_empty() {
        // 使用一個塊來釋放 shutting_down 的鎖
        {
            let shutting_down = self.shutting_down.lock().unwrap();
            if *shutting_down {
                return None;
            }
        }

        queue = self.cond.wait(queue).unwrap();
    }
    if !queue.is_empty() {
        item = queue.pop_front();
    }

    if let Some(ref itm) = item {
        let key = Queue::get_item_key(itm);
        self.processing.lock().unwrap().insert(key);
        self.dirty.lock().unwrap().remove(&key);
    }

    item
}
方法解析
定義返回值和獲取隊列鎖
let mut item = None;
let mut queue = self.queue.lock().unwrap();
  • ?item:定義一個變量作為最終的返回值,初始化為 None
  • ?獲取隊列鎖?:獲取 queue 的互斥鎖,如果其他線程正在訪問 queue,當前線程將會阻塞,直到鎖可用。此時 queue 被鎖定,確保對 queue 的操作是線程安全的。
等待非空隊列或檢查關閉狀態
while queue.is_empty() {
    // 使用一個塊來釋放 shutting_down 的鎖
    {
        let shutting_down = self.shutting_down.lock().unwrap();
        if *shutting_down {
            return None;
        }
    }

    queue = self.cond.wait(queue).unwrap();
}
  • ?檢查隊列是否為空?:如果 queue 為空,則進入 while 循環。
  • ?檢查關閉狀態?:使用一個代碼塊來獲取 shutting_down 的互斥鎖并檢查其狀態。如果 shutting_downtrue,說明隊列正在關閉,返回 None 退出方法。
  • ?等待條件變量?:調用 self.cond.wait(queue).unwrap(),使當前線程進入等待狀態,直到條件變量收到通知。此時會釋放 queue 的鎖,允許其他線程操作 queue。當被喚醒后,線程會重新獲取 queue 的鎖。這里他會一直等待隊列不為空,即有生產者入隊消息。
從隊列中獲取工作項
if !queue.is_empty() {
    item = queue.pop_front();
}
  • ?獲取工作項?:如果 queue 不為空,從隊列前端彈出一個工作項并賦值給 item
更新 processingdirty 集合
if let Some(ref itm) = item {
    let key = Queue::get_item_key(itm);
    self.processing.lock().unwrap().insert(key);
    self.dirty.lock().unwrap().remove(&key);
}
  • ?檢查 item 是否存在?:如果 item 存在,獲取其引用 itm
  • ?獲取工作項的鍵?:使用 Queue::get_item_key(itm) 獲取工作項的唯一鍵。
  • ?更新 processing 集合?:獲取 processing 的互斥鎖,并將鍵插入 processing 集合中,表示該工作項正在處理。
  • ?更新 dirty 集合?:獲取 dirty 的互斥鎖,并將鍵從 dirty 集合中移除,表示該工作項不再需要重新處理,因為他已經get到了。
返回工作項
item
  • ?返回值?:返回 item,這是一個可選類型,表示要處理的工作項。如果在整個過程中沒有獲取到工作項,則返回 None

done方法

代碼對比
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)

	q.processing.delete(item)
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	} else if q.processing.len() == 0 {
		q.cond.Signal()
	}
}

rust中的實現如下:

fn done(&self, item: WorkItem) {
    let key = Queue::get_item_key(&item);
    self.processing.lock().unwrap().remove(&key);

    let mut dirty = self.dirty.lock().unwrap();
    if dirty.contains(&key) {
        drop(dirty);
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(item);
        drop(queue);

        self.cond.notify_one();
    } else if self.processing.lock().unwrap().is_empty() {
        self.cond.notify_all();
    }
}
方法解析
獲取工作項的唯一鍵并從 processing 集合中移除
let key = Queue::get_item_key(&item);
self.processing.lock().unwrap().remove(&key);
  • ?獲取工作項的唯一鍵?:調用 Queue::get_item_key(&item) 方法獲取工作項的唯一鍵。這個鍵用于標識工作項。
  • ?processing 集合中移除?:獲取 processing 的互斥鎖,并從 processing 集合中移除該鍵,表示該工作項的處理已經完成。
檢查并更新 dirty 集合和 queue 隊列
let mut dirty = self.dirty.lock().unwrap();
if dirty.contains(&key) {
    drop(dirty);
    let mut queue = self.queue.lock().unwrap();
    queue.push_back(item);
    drop(queue);

    self.cond.notify_one();
}
  • ?獲取 dirty 集合的鎖?:獲取 dirty 的互斥鎖,確保對 dirty 集合的操作是線程安全的。
  • ?檢查 dirty 集合?:如果 dirty 集合包含該鍵,表示該工作項在處理過程中再次被標記為需要處理。
  • 更新 queue 隊列:將工作項重新添加到 queue 隊列中,以便后續處理。具體步驟如下:
    • ?釋放 dirty?:使用 drop(dirty) 釋放 dirty 的鎖。
    • ?獲取 queue?:獲取 queue 的互斥鎖,并將工作項添加到隊列末尾。
    • ?釋放 queue?:使用 drop(queue) 釋放 queue 的鎖。
    • ?通知等待的線程?:調用 self.cond.notify_one(),通知等待在條件變量上的消費者線程有新的工作項可用。
檢查 processing 集合并通知所有線程
else if self.processing.lock().unwrap().is_empty() {
    self.cond.notify_all();
}
  • ?獲取 processing?:獲取 processing 的互斥鎖。
  • ?檢查 processing 集合?:如果 processing 集合為空,表示當前沒有正在處理的工作項。
  • ?通知所有線程?:調用 self.cond.notify_all(),通知所有等待在條件變量上的線程。這通常用于在隊列即將關閉時確保所有工作項都已處理完畢。
0條評論
0 / 1000
l****n
17文章數
0粉絲數
l****n
17 文章 | 0 粉絲
原創

k8S的工作隊列代碼解析和rust的實現(基礎工作隊列)

2024-09-04 09:42:29
13
0

k8S的工作隊列代碼解析和rust的實現(基礎工作隊列)

基礎知識

在 Kubernetes(K8S)中,workqueue 是一個用于處理異步任務的工具,它提供了一種安全、高效的方式來管理并處理工作項。下面我們詳細解讀這段代碼的工作原理。

主要結構和接口

Interface

type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShutDownWithDrain()
    ShuttingDown() bool
}

這個接口定義了一個工作隊列應具有的基本功能:

  • Add(item interface{}):添加一個新的工作項。
  • Len() int:返回隊列中的當前工作項數。
  • Get() (item interface{}, shutdown bool):獲取一個要處理的工作項。
  • Done(item interface{}):標記一個工作項為已完成。
  • ShutDown():關閉隊列,不再接受新工作項。
  • ShutDownWithDrain():在所有工作項處理完畢后關閉隊列。
  • ShuttingDown() bool:檢查隊列是否正在關閉。

FIFO隊列存儲過程

正常添加流程

通過 Add 方法往 FIFO 隊列中分別插入 1、2、3 三個元素,此時隊列中的 queue 和 dirty 分別存有 1、2、3 元素,processing為空;然后通過 Get 方法獲取最先進入的1元素,此時1 元素被放入 processing中,queue 和 dirty 剩余有 2、3 元素,表示1元素正在被處理;最后,當我們處理完 1 元素時通過 Done 方法標記該元素已經被處理完成,此時將1元素從 processing中移除。

高并發下如何保證一個元素哪怕其被添加了多次,但也只會被處理一次?

元素在queue和dirty中還未放入processing:

在并發場景下,假設 goroutine A 通過Add方法將1元素到queue和dirty中;同一時間,goroutine B 通過 Add 方法插入另一個 1 元素,此時在dirty中已經存在相同的元素,會直接返回。

元素在processing中正被處理:

在并發場景下,假設 goroutine A 通過 Get 方法獲取 1 元素,1 元素被添加到 processing 中并從queue和dirty中移除;同一時間,goroutine B 通過 Add 方法插入另一個 1 元素,此時在 processing 中已經存在相同的元素,所以后面的 1 元素并不會被直接添加到 queue 字段中,而是僅添加到dirty中;在 goroutine A 通過 Done 方法標記1元素被處理完成并從processing刪除后,檢測到dirty 字段中存有 1 元素,則將 1 元素追加到 queue 字段中的尾部。

get時候隊列空了:

當goroutine進行get時候,隊列空了,則會讓get的goroutine進行等待、阻塞。當隊列有值的時候才會繼續get。

隊列關閉了:

當隊列關閉了之后,get、add方法將不允許添加、獲取數據。但正在處理的數據除外,可以設置一種策略等待正在處理的數據處理完成。

多線程管理

在 Go 語言中,sync.Cond 是一個用于協調并發操作的條件變量。它允許一個或多個 goroutine 等待某個條件發生變化,同時確保這些等待和通知操作是安全的。sync.Cond 通常與互斥鎖 (sync.Mutex) 一起使用。

sync.Cond 的主要方法

  • Wait()
  • Signal()
  • Broadcast()

sync.Cond 的用法

Wait()

Wait方法會使調用它的 goroutine 進入等待狀態,直到收到SignalBroadcast通知。調用Wait前,必須持有與條件變量關聯的互斥鎖。調用Wait后,會自動釋放該鎖,當Wait` 返回時,goroutine 會重新獲得該鎖。

Signal()

Signal 方法會喚醒一個正在等待的 goroutine。如果沒有 goroutine 在等待,Signal` 不做任何操作。

Broadcast()

Broadcast` 方法會喚醒所有正在等待的 goroutine。

sync.Cond 的應用場景

sync.Cond` 適用于需要等待某個條件的并發場景,例如生產者-消費者模型、資源池等。

Rust實現

基礎技術

堆、棧與Box

在 Rust 中,Box 是一個智能指針類型,用于在堆上分配內存并擁有該內存的所有權。Box 將值從棧上移動到堆上,并且當 Box 被丟棄時,堆上的值也會被自動清理。Box 主要用于以下情況:

  1. ?在堆上分配大型數據結構?:有時候我們不希望在棧上分配過大的數據結構,這時可以使用 Box 將其分配到堆上。
  2. ?遞歸數據結構?:由于 Rust 編譯器需要在編譯時知道類型的大小,遞歸數據結構(例如樹或鏈表)無法在棧上存儲,這時可以使用 Box
  3. ?動態分發(Dynamic Dispatch)?:當需要使用運行時多態(例如 trait 對象)時,可以使用 Box<dyn Trait>
    在rust中的工作隊列的實現中,Box 用于存儲不同類型的工作項。這是通過將工作項封裝在 Box<dyn Any + Send + Sync> 中實現的,Any 是一個允許存儲任意類型的特征,SendSync 則確保這些類型在線程間傳遞和共享時是安全的。

高并發控制設計

Go 語言中的高并發控制

?在 Go 中,?sync.Mutexsync.Cond 是高并發控制的基礎工具:

  • sync.Mutex:用于保護臨界區,確保同一時間只有一個 goroutine 訪問共享資源。
  • sync.Cond:條件變量,用于 goroutines 等待或通知某個條件發生變化。

**Go 語言中 **sync.Cond 的主要方法:

  • Wait()
  • Signal()
  • Broadcast()
Rust語言中的高并發控制

?在 Rust 中,?std::sync::Mutexstd::sync::Condvar 提供了和golang中類似的功能:

  • std::sync::Mutex:用于保護臨界區,確保同一時間只有一個線程訪問共享資源。
  • std::sync::Condvar:條件變量,用于線程等待或通知某個條件發生變化。

**Rust 中 **Condvar 的主要方法:

  • wait:和golang中的wait相同
  • notify_one
  • notify_all

代碼設計

結構體

struct Queue {
    queue: Mutex<VecDeque<WorkItem>>,
    dirty: Mutex<HashSet<usize>>,
    processing: Mutex<HashSet<usize>>,
    cond: Condvar,
    shutting_down: Mutex<bool>,
}
  • queue:標準的工作隊列,是workqpueue的載體
  • dirty:記錄添加進去隊列中,但未被處理的元素?
  • p
  • cond:控制線程同步、高并發的rust組件
  • shutting_down:用于記錄是否這個隊列被關閉了

Add方法設計

代碼對比
func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    if q.shuttingDown {
        return
    }
    if _, exists := q.dirty[item]; exists {
        return
    }
    q.dirty[item] = struct{}{}
    if _, exists := q.processing[item]; !exists {
        q.queue = append(q.queue, item)
        q.cond.Signal()
    }
}

** **這個方法的實現如下

fn add(&self, item: WorkItem) {
        let key = Queue::get_item_key(&item);
?
        // 如果隊列已經關閉,則退出
        {
            let shutting_down = self.shutting_down.lock().unwrap();
            if *shutting_down {
                println!("Queue shut down");
                return;
            }
        }
?
        let mut dirty = self.dirty.lock().unwrap();
        if !dirty.contains(&key) {
            dirty.insert(key);
            drop(dirty);
?
            let mut queue = self.queue.lock().unwrap();
            queue.push_back(item);
            drop(queue);
?
            self.cond.notify_one();
        }
    }
方法解析
獲取工作項的唯一鍵
let key = Queue::get_item_key(&item);
  • **使用 **Queue::get_item_key(&item) 方法獲取工作項的唯一鍵。這個鍵用于標識工作項并在 dirty 集合中跟蹤它。
檢查隊列是否已經關閉
{
    let shutting_down = self.shutting_down.lock().unwrap();
    if *shutting_down {
        println!("Queue shut down");
        return;
    }
}
  • **通過獲取 **shutting_down 鎖來檢查隊列是否已經關閉。
  • **如果隊列已經關閉,打印 **Queue shut down 并返回。
  • **使用一個代碼塊來限制鎖的持有范圍,確保在檢查完 **shutting_down 狀態后立即釋放鎖。
更新 dirty 集合
let mut dirty = self.dirty.lock().unwrap();
if !dirty.contains(&key) {
    dirty.insert(key);
    drop(dirty);
  • **獲取 **dirty 集合的鎖,檢查工作項的鍵是否已經存在于 dirty 集合中。如果鍵不存在于 dirty 集合中,將鍵插入集合。
  • **使用 **drop(dirty) 手動釋放鎖,確保在插入鍵后立即釋放鎖。
更新 queue 隊列并通知消費者
let mut queue = self.queue.lock().unwrap();
queue.push_back(item);
drop(queue);

self.cond.notify_one();
  • 獲取 queue 隊列的鎖,將工作項添加到隊列的末尾。
  • 使用 drop(queue)手動釋放鎖,確保在添加工作項后立即釋放鎖。
  • 調用 self.cond.notify_one() 通知等待的消費者有新的工作項可用。

get方法設計

代碼對比
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}

	item = q.queue[0]
	// The underlying array still exists and reference this object, so the object will not be garbage collected.
	q.queue[0] = nil
	q.queue = q.queue[1:]

	q.metrics.get(item)

	q.processing.insert(item)
	q.dirty.delete(item)

	return item, false
}

rust中的實現代碼,這里實現了主體功能,省略了metrics的獲取。

fn get(&self) -> Option<WorkItem> {
    let mut item = None;
    let mut queue = self.queue.lock().unwrap();
    while queue.is_empty() {
        // 使用一個塊來釋放 shutting_down 的鎖
        {
            let shutting_down = self.shutting_down.lock().unwrap();
            if *shutting_down {
                return None;
            }
        }

        queue = self.cond.wait(queue).unwrap();
    }
    if !queue.is_empty() {
        item = queue.pop_front();
    }

    if let Some(ref itm) = item {
        let key = Queue::get_item_key(itm);
        self.processing.lock().unwrap().insert(key);
        self.dirty.lock().unwrap().remove(&key);
    }

    item
}
方法解析
定義返回值和獲取隊列鎖
let mut item = None;
let mut queue = self.queue.lock().unwrap();
  • ?item:定義一個變量作為最終的返回值,初始化為 None
  • ?獲取隊列鎖?:獲取 queue 的互斥鎖,如果其他線程正在訪問 queue,當前線程將會阻塞,直到鎖可用。此時 queue 被鎖定,確保對 queue 的操作是線程安全的。
等待非空隊列或檢查關閉狀態
while queue.is_empty() {
    // 使用一個塊來釋放 shutting_down 的鎖
    {
        let shutting_down = self.shutting_down.lock().unwrap();
        if *shutting_down {
            return None;
        }
    }

    queue = self.cond.wait(queue).unwrap();
}
  • ?檢查隊列是否為空?:如果 queue 為空,則進入 while 循環。
  • ?檢查關閉狀態?:使用一個代碼塊來獲取 shutting_down 的互斥鎖并檢查其狀態。如果 shutting_downtrue,說明隊列正在關閉,返回 None 退出方法。
  • ?等待條件變量?:調用 self.cond.wait(queue).unwrap(),使當前線程進入等待狀態,直到條件變量收到通知。此時會釋放 queue 的鎖,允許其他線程操作 queue。當被喚醒后,線程會重新獲取 queue 的鎖。這里他會一直等待隊列不為空,即有生產者入隊消息。
從隊列中獲取工作項
if !queue.is_empty() {
    item = queue.pop_front();
}
  • ?獲取工作項?:如果 queue 不為空,從隊列前端彈出一個工作項并賦值給 item
更新 processingdirty 集合
if let Some(ref itm) = item {
    let key = Queue::get_item_key(itm);
    self.processing.lock().unwrap().insert(key);
    self.dirty.lock().unwrap().remove(&key);
}
  • ?檢查 item 是否存在?:如果 item 存在,獲取其引用 itm
  • ?獲取工作項的鍵?:使用 Queue::get_item_key(itm) 獲取工作項的唯一鍵。
  • ?更新 processing 集合?:獲取 processing 的互斥鎖,并將鍵插入 processing 集合中,表示該工作項正在處理。
  • ?更新 dirty 集合?:獲取 dirty 的互斥鎖,并將鍵從 dirty 集合中移除,表示該工作項不再需要重新處理,因為他已經get到了。
返回工作項
item
  • ?返回值?:返回 item,這是一個可選類型,表示要處理的工作項。如果在整個過程中沒有獲取到工作項,則返回 None

done方法

代碼對比
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)

	q.processing.delete(item)
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	} else if q.processing.len() == 0 {
		q.cond.Signal()
	}
}

rust中的實現如下:

fn done(&self, item: WorkItem) {
    let key = Queue::get_item_key(&item);
    self.processing.lock().unwrap().remove(&key);

    let mut dirty = self.dirty.lock().unwrap();
    if dirty.contains(&key) {
        drop(dirty);
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(item);
        drop(queue);

        self.cond.notify_one();
    } else if self.processing.lock().unwrap().is_empty() {
        self.cond.notify_all();
    }
}
方法解析
獲取工作項的唯一鍵并從 processing 集合中移除
let key = Queue::get_item_key(&item);
self.processing.lock().unwrap().remove(&key);
  • ?獲取工作項的唯一鍵?:調用 Queue::get_item_key(&item) 方法獲取工作項的唯一鍵。這個鍵用于標識工作項。
  • ?processing 集合中移除?:獲取 processing 的互斥鎖,并從 processing 集合中移除該鍵,表示該工作項的處理已經完成。
檢查并更新 dirty 集合和 queue 隊列
let mut dirty = self.dirty.lock().unwrap();
if dirty.contains(&key) {
    drop(dirty);
    let mut queue = self.queue.lock().unwrap();
    queue.push_back(item);
    drop(queue);

    self.cond.notify_one();
}
  • ?獲取 dirty 集合的鎖?:獲取 dirty 的互斥鎖,確保對 dirty 集合的操作是線程安全的。
  • ?檢查 dirty 集合?:如果 dirty 集合包含該鍵,表示該工作項在處理過程中再次被標記為需要處理。
  • 更新 queue 隊列:將工作項重新添加到 queue 隊列中,以便后續處理。具體步驟如下:
    • ?釋放 dirty?:使用 drop(dirty) 釋放 dirty 的鎖。
    • ?獲取 queue?:獲取 queue 的互斥鎖,并將工作項添加到隊列末尾。
    • ?釋放 queue?:使用 drop(queue) 釋放 queue 的鎖。
    • ?通知等待的線程?:調用 self.cond.notify_one(),通知等待在條件變量上的消費者線程有新的工作項可用。
檢查 processing 集合并通知所有線程
else if self.processing.lock().unwrap().is_empty() {
    self.cond.notify_all();
}
  • ?獲取 processing?:獲取 processing 的互斥鎖。
  • ?檢查 processing 集合?:如果 processing 集合為空,表示當前沒有正在處理的工作項。
  • ?通知所有線程?:調用 self.cond.notify_all(),通知所有等待在條件變量上的線程。這通常用于在隊列即將關閉時確保所有工作項都已處理完畢。
文章來自個人專欄
文章 | 訂閱
0條評論
0 / 1000
請輸入你的評論
0
0