代碼:dict.h/dict.c
對于 Redis 鍵值數據庫來說,Hash 表既是鍵值對中的一種值類型,同時,Redis 也使用一個全局 Hash 表來保存所有的鍵值對,從而既滿足應用存取 Hash 結構數據需求,又能提供快速查詢功能
Redis 采用了鏈式哈希處理沖突
- 對于 rehash 開銷,Redis 實現了漸進式 rehash 設計
- 好的設計
節省內存的小技巧:下面使用union關鍵字,如果數據為uint64_t/int64_t/double,直接保存,無需用val指向了,節省一個指針的開銷,代碼如下:
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
Redis 如何實現 rehash
/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask; //用于將hash值映射到table位置的索引,大小為(size-1)
unsigned long used;
} dictht;
#dict字典的數據結構
typedef struct dict{
dictType *type; //直線dictType結構,dictType結構中包含自定義的函數,這些函數使得key和value能夠存儲任何類型的數據
void *privdata; //私有數據,保存著dictType結構中函數的 參數
dictht ht[2]; //兩張哈希表
long rehashidx; //rehash的標記,rehashidx=-1表示沒有進行rehash,rehash時每遷移一個桶就對rehashidx加一
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
}
什么時候觸發 rehash?
判斷條件
- 哈希表size為0,進行擴容,即進行初始化
- 使用的大小超過size并且(能進行resize或者負載因子超過dict_force_resize_ration)并且能進行該操作,就進行擴容,這里才是rehash的判斷
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
if (dictIsRehashing(d)) return DICT_OK;
/* If the hash table is empty expand it to the initial size. */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) &&
dictTypeExpandAllowed(d))
{
return dictExpand(d, d->ht[0].used + 1);
}
return DICT_OK;
}
更新 dict_can_resize 變量(能否進行resize)
該變量會在沒有子進程時設置為1,即當前沒有 RDB 子進程,并且也沒有 AOF 子進程時就可以進行resize
/* This function is called once a background process of some kind terminates, * as we want to avoid resizing the hash tables when there is a child in order to play well with copy-on-write (otherwise when a resize happens lots of memory pages are copied). The goal of this function is to update the ability for dict.c to resize the hash tables accordingly to the fact we have a active fork child running. */
void updateDictResizePolicy(void) {
if (!hasActiveChildProcess())
dictEnableResize();
else
dictDisableResize();
}
void dictEnableResize(void) {
dict_can_resize = 1;
}
void dictDisableResize(void) {
dict_can_resize = 0;
}
為什么存在子進程時,不允許resize
大多數操作系統都采用寫時復制(copy-on-write)技術來優化子進程的使用效率,所以在子進程存在期間,服務器會提高執行擴展操作所需的負載因子,從而盡可能地避免在子進程存在期間進行哈希表擴展操作,這可以避免不必要的內存寫入操作, 最大限度地節約內存,簡單來說就是子進程存在時,使用了寫時復制,父進程如果寫了,需要拷貝一份,浪費了內存空間
三種情況調用判斷是否需要進行rehash
需要進行rehash時會分配ht[1]空間,以及設置rehashidx=0,表示正處于rehash,下次操作時會進行漸進式rehash
rehash 擴容擴多大?
對 Hash 表擴容的思路也很簡單,就是如果當前表的已用空間大小為 size,那么就將表擴容到 "下一個2的倍數大小",該值大于等于(已用空間大小 + 1)
_dictNextPower(d->ht[0].used + 1) 返回擴容后的大小
static unsigned long _dictNextPower(unsigned long size)
{
//哈希表的初始大小
unsigned long i = DICT_HT_INITIAL_SIZE;
//如果要擴容的大小已經超過最大值,則返回最大值加1
if (size >= LONG_MAX) return LONG_MAX + 1LU;
//擴容大小沒有超過最大值
while(1) {
//如果擴容大小大于等于最大值,就返回截至當前擴到的大小
if (i >= size)
return i;
//每一步擴容都在現有大小基礎上乘以2
i *= 2;
}
}
漸進式 rehash 如何實現?
核心函數:int dictRehash(dict *d, int n)
漸進式 rehash 的意思就是 Redis 并不會一次性把當前 Hash 表中的所有鍵,都拷貝到新位置,而是會分批拷貝,每次的鍵拷貝只拷貝 Hash 表中一個 bucket 中的哈希項。這樣一來,每次鍵拷貝的時長有限,對主線程的影響也就有限了
分批拷貝,每次拷貝多少
每次拷貝一個bucket,將ht[0]上某一個bucket(即一個桶上dictEntry鏈表)上的每一個鏈表移動到擴容后的ht[1]上(每次只移動一個鏈表,即漸進式rehash,使用 rehashidx 表示下一次要遷移鏈表所在桶的位置
/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
} dict;
遷移期間增,刪,改查怎么操作的
新增數據
在漸進式 rehash 執行期間,新添加到字典的鍵值對一律會被保存到ht[1]里面,而ht[0]則不再進行任何添加操作。這一措施保證了ht[0]包含的鍵值對數量會只減不增,并隨著rehash操作的執行而最終變成空表
插入到新表,_dictKeyIndex代碼中可以看出
/* Returns the index of a free slot that can be populated with
* a hash entry for the given 'key'.
* If the key already exists, -1 is returned
* and the optional output parameter may be filled.
*
* Note that if we are in the process of rehashing the hash table, the
* index is always returned in the context of the second (new) hash table. */
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
/* Expand the hash table if needed */
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain the given key */
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
if (!dictIsRehashing(d)) break; // 如果不處于rehash,則直接break,此時為ht[0],即舊表,否則為ht[1],即新表
}
return idx;
}
刪除操作
刪除時會訪問兩個表進行刪除操作,見函數 dictGenericDelete
查詢操作
程序會先在ht[0]里面進行查找,如果沒找到的話,就會繼續到ht[1]里面進行查找
dictEntry *dictFind(dict *d, const void *key)
{
dictEntry *he;
uint64_t h, idx, table;
if (dictSize(d) == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key))
return he;
he = he->next;
}
if (!dictIsRehashing(d)) return NULL;
}
return NULL;
}
更新操作
會訪問兩個表,如果處于rehash,就會返回新表的索引
/* Returns the index of a free slot that can be populated with
* a hash entry for the given 'key'.
* If the key already exists, -1 is returned
* and the optional output parameter may be filled.
*
* Note that if we are in the process of rehashing the hash table, the
* index is always returned in the context of the second (new) hash table. */
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
/* Expand the hash table if needed */
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain the given key */
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
操作時rehash
在rehash進行期間,每次對字典執行添加、刪除、查找或者更新操作時,程序除了執行指定的操作以外,還會順帶將ht[0]哈希表在 rehashidx索引(table[rehashidx]桶上的鏈表)上的所有鍵值對rehash到ht[1]上,當rehash工作完成之后,將rehashidx屬性的值增一,表示下一次要遷移鏈表所在桶的位置
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
if (dictIsRehashing(d)) _dictRehashStep(d);
/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}
遷移時,如果遇到empty bucket怎么辦
漸進式 rehash 在執行時設置了一個變量 empty_visits,用來表示已經檢查過的空 bucket,當檢查了一定數量的空 bucket 后,這一輪的 rehash 就停止執行,轉而繼續處理外來請求,避免了對 Redis 性能的影響