Introduction
概述
傳統bitmap是一個整體全局變量,這就會帶來如下問題:
- 并發訪問需要加鎖保護,競爭非常激烈時鎖的開銷不可忽略
- 由于是共享變量,每一次write操作內部都會有大量MESI消息,且會造成cache顛簸
?
sbitmap(scalable bitmap), 重點在于scalable, 那么如何可擴展呢?核心思想是:將整個bitmap進行拆分,將全局競爭變為局部競爭。這是一種通用的思路,例如 Tree RCU 對 Classic RCU 的取代。
bitmap和sbitmap各有優劣。sbitmapp消耗了更多內存資源,且獲取一個bit使用的指令也更多,適用于多核競爭激烈的場景;故在競爭可控的場景下,建議還是選擇bitmap。
?
??
Sbitmap實現
概述
本章重點介紹sbitmap的實現,對應源碼:
linux-6.6/include/linux/bitmap.h
linux-6.6/lib/bitmap.c
數據結構
/*
* sbitmap: scalable bitmap, 本質是bitmap,如何scalab呢?
* 傳統bitmap是整體,并發訪問時需要一把全局鎖。當競爭變激烈時,會造成性能嚴重下滑。
* sbitmap將bitmap切割為小塊map,這樣可以大大減少競爭,特別是在per-cpu的競爭中。同時考慮cacheline(如何理解?)
* depth表示總的bit數量; 1<<shift表示每一塊bit的數量; map_nr表示小塊map的數量
*/
struct sbitmap {
unsigned int depth; //總bit的數量
unsigned int shift; //每個map管理的bit數量, 不能大于一個word(unsigned long)
unsigned int map_nr; //map的數量 = depth / (1 << shift)
struct sbitmap_word *map; //bitmap區域
/* round_robin只是影響單個map查詢是從offset=0開始(false),還是offset=hint開始(true).
* 在實際代碼中會影響hint, warp兩個變量,但本質是一件事: 若從offset=hint開始查詢,
* 沒找到需要wrap. 故代碼里面感覺兩者可以合并
*/
bool round_robin;
/* per_cpu的變量,用于記錄上一次該cpu獲取和釋放到bit的信息nrbit, 加快bit的獲取
* get:下一次的起點是nrbit+1; put:下一次的起點是上次釋放的位置
* 從get接口來看,若round_robin=false, alloc_hint好像也沒有任何作用?
*/
unsigned int __percpu *alloc_hint;
};
/*
* 當free時,歸還者僅標記對應cleard即可,不會操作word. 僅當word已經滿時會將cleard記錄的歸還.
* 這樣保證了word僅被get動作改變, 錯開的get和free操作, 金少競爭。
* 為什么 word 不標記為cacheline對齊?
*/
struct sbitmap_word {
unsigned long word;
unsigned long cleared ____cacheline_aligned_in_smp;
} ____cacheline_aligned_in_smp;
?
API
/* 分配+初始化一個sbmitmap內部資源 */
//alloc_hin=false, 不能使用sbitmap_get()
int sbitmap_init_node(struct sbitmap *sb, unsigned int depth, int shift,
gfp_t flags, int node, bool round_robin, bool alloc_hint);
void sbitmap_free(struct sbitmap *sb);
/* 獲取/釋放 對應的bit */
int sbitmap_get(struct sbitmap *sb); //僅支持hint的模式分配
void sbitmap_put(struct sbitmap *sb, unsigned int bitnr)
bool sbitmap_any_bit_set(const struct sbitmap *sb);
?
?
實現流程
這里重點分析get主要流程,其他操作可以從該流程推導出
sbitmap_get()
|-update_alloc_hint_before_get() //獲取上個cpu的hint
|-__sbitmap_get() //獲取到哪個bit是空閑的, 首先找到map,然后找到map中對應的bit. RR為true表示要從offset=hint開始查詢
|-update_alloc_hint_after_get() //更新該cpu本次訪問的hint
?
代碼流程
int sbitmap_get(struct sbitmap *sb)
{
...
depth = READ_ONCE(sb->depth);
hint = update_alloc_hint_before_get(sb, depth); //讀取sb->alloc_hint
nr = __sbitmap_get(sb, hint);
update_alloc_hint_after_get(sb, depth, hint, nr); //將nr+1寫入到sb->alloc_hint,下次獲取就可以快速的找到
...
}
/*
* 分配的核心邏輯:首先找到對應的map,然后從map中找到合適的bit
* round_robin會用到hint? 非RR從0開始
*/
static int __sbitmap_get(struct sbitmap *sb, unsigned int alloc_hint)
{
unsigned int index;
//找到對應的map: alloc_hint>>sb->shift
index = SB_NR_TO_INDEX(sb, alloc_hint);
if (sb->round_robin)
alloc_hint = SB_NR_TO_BIT(sb, alloc_hint); //找到map對應的bit
else
alloc_hint = 0; //從bit0開始分配
return sbitmap_find_bit(sb, UINT_MAX, index, alloc_hint, !sb->round_robin);
}
static int sbitmap_find_bit(struct sbitmap *sb, unsigned int depth, unsigned int index, unsigned int alloc_hint, bool wrap)
{
unsigned int i;
int nr = -1;
/* 最外層for僅控制次數,i并不控制從哪個map開始分配, 由index指定.
* hint僅適用于最開始預期map中查找,當切換到下一個map,默認從0開始,這也滿足rr策略.
* wrap由底層邏輯處理
*/
for (i = 0; i < sb->map_nr; i++) {
nr = sbitmap_find_bit_in_word(&sb->map[index], min_t(unsigned int, __map_depth(sb, index), depth), alloc_hint, wrap);
if (nr != -1) {
nr += index << sb->shift; //轉換為用戶視角需要的nr(用戶只關心整體bitmap順序)
break;
}
//++index表示跳轉到下一個map
alloc_hint = 0; //在預期外的map分配bit時,采用默認為0的地方開始搜尋
if (++index >= sb->map_nr)
index = 0;
}
return nr;
}
static int sbitmap_find_bit_in_word(struct sbitmap_word *map,
unsigned int depth,
unsigned int alloc_hint,
bool wrap)
{
int nr;
do {
nr = __sbitmap_get_word(&map->word, depth, alloc_hint, wrap);
if (nr != -1)
break;
if (!sbitmap_deferred_clear(map)) //若沒要找到,需要將map->cleared已經釋放的歸還到map->word
break;
} while (1);
return nr;
}
static int __sbitmap_get_word(unsigned long *word, unsigned long depth, unsigned int hint, bool wrap)
{
int nr;
/* don't wrap if starting from 0 */
//為了保證完整查詢了bitmap, wrap僅在在hint非零時才有意義,因為要回滾。從0開始查詢沒有必要回滾
//wrap在查詢中核心就是如果找到了從hint開始沒有找到,就從投開始在查詢一遍
wrap = wrap && hint;
while (1) {
nr = find_next_zero_bit(word, depth, hint);
if (unlikely(nr >= depth)) { //從offset開始查詢,沒有找到
/*
* We started with an offset, and we didn't reset the
* offset to 0 in a failure case, so start from 0 to
* exhaust the map.
*/
if (hint && wrap) { //從offset=0重新查詢一遍
hint = 0;
continue;
}
return -1;
}
if (!test_and_set_bit_lock(nr, word))
break;
//set失敗了,該bit被其他人搶占,此時繼續從下一個bit查詢
hint = nr + 1;
if (hint >= depth - 1)
hint = 0;
}
return nr;
}
?
案例
內核中直接使用sbitmap的模塊并不多,由 blk的hctx 和 scsi模塊
?
?
scsi sbitmap
scsi的利用方式比較簡單,由于alloc_hin=true, 可以使用sbitmap_get()
struct scsi_device {
...
struct sbitmap budget_map;
...
}
int scsi_realloc_sdev_budget_map(struct scsi_device *sdev, unsigned int depth)
{
...
//由于
ret = sbitmap_init_node(&sdev->budget_map, scsi_device_max_queue_depth(sdev),
new_shift, GFP_KERNEL, sdev->request_queue->node, false, true);
...
}
int scsi_dev_queue_ready(struct request_queue *q, struct scsi_device *sdev)
{
...
token = sbitmap_get(&sdev->budget_map);
...
}
?
?
hctx sbitmap
htctx的alloc_hin=false, 不能使用sbitmap_get(),所以從其他角度來使用
struct blk_mq_hw_ctx *blk_mq_alloc_hctx(struct request_queue *q, struct blk_mq_tag_set *set, int node)
{
...
sbitmap_init_node(&hctx->ctx_map, nr_cpu_ids, ilog2(8), gfp, node, false, false);
...
}
static void blk_mq_hctx_mark_pending(struct blk_mq_hw_ctx *hctx,
struct blk_mq_ctx *ctx)
{
const int bit = ctx->index_hw[hctx->type];
if (!sbitmap_test_bit(&hctx->ctx_map, bit))
sbitmap_set_bit(&hctx->ctx_map, bit);
}