redis4源码阅读学习-1

zmalloc.c

tcmalloc

jemalloc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* 申请新的_n大小的内存,用新的gcc原子方法(__atomic_add_fetch)代替之前的 代替自定义的线程安全方法和不安全方法*/

/* Allocate 'size' bytes plus PREFIX_SIZE extra header bytes. The header is
 * used to remember the requested size when the underlying allocator cannot
 * report the usable size of an allocation by itself. */
void *zmalloc(size_t size) {
void *ptr = malloc(size+PREFIX_SIZE);

if (!ptr) zmalloc_oom_handler(size); /* out-of-memory hook */
#ifdef HAVE_MALLOC_SIZE
/* The allocator (tcmalloc/jemalloc/macOS) can report the real usable size,
 * so no prefix header is needed. */
update_zmalloc_stat_alloc(zmalloc_size(ptr));
return ptr;
#else
*((size_t*)ptr) = size; /* store the requested size in the prefix header */
update_zmalloc_stat_alloc(size+PREFIX_SIZE);
return (char*)ptr+PREFIX_SIZE; /* hand the caller the byte after the header */
#endif
}

/* Account a fresh allocation of __n bytes into the global used_memory
 * counter. _n is rounded up to a multiple of sizeof(long) (word alignment);
 * note that the increment below still uses the raw __n — this is faithful
 * to the Redis 4.x source, where the rounded _n is vestigial. */
#define update_zmalloc_stat_alloc(__n) do { \
size_t _n = (__n); \
if (_n&(sizeof(long)-1)) _n += sizeof(long)-(_n&(sizeof(long)-1)); \
atomicIncr(used_memory,__n); \
} while(0)

/* Thread-safe increment built on the GCC __atomic builtins with relaxed
 * memory order, replacing the older custom safe/unsafe variants. */
#define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED)

详情可以参考GCC官方文档链接

  1. 分配内存大小为size+PREFIX_SIZE的大小,zmalloc实际分配比需要多一些的内存,这一部分用于存储size信息。
  2. zmalloc_oom_handler用来处理内存申请异常
  3. if (_n&(sizeof(long)-1)) _n += sizeof(long)-(_n&(sizeof(long)-1)); 在64位下等价于 if(_n&7) _n += 8 - (_n&7); 即 32位按4字节对齐,64位按8字节对齐

数据结构:objcet

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct redisObject {
unsigned type:4; /* logical type: OBJ_STRING, OBJ_LIST, ... */
unsigned encoding:4;/* internal encoding: OBJ_ENCODING_* */
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time).
16 bits 8 bits
+------------------+--------+
+ Last access time | LOG_C |
+------------------+--------+*/
int refcount; /* reference-counted memory management (cf. C++11 shared_ptr, ObjC) */
void *ptr; /* points at the concrete underlying data structure */
} robj;
1
2
3
4
5
6
7
8
9
10
11
type字段表示数据类型,有以下几种定义:
OBJ_STRING // 字符串
OBJ_LIST // 链表
OBJ_SET // 集合
OBJ_ZSET // 有序集合
OBJ_HASH // HASH结构(注意,此处不同于传统意义上的哈希表(如stl::hash_map),这里的hash仅有字段散列的语义)
REDIS_VMPOINTER // VM指针,已经废弃,翻看了一下VM相关实现和历史
/*
Redis采用VM机制是希望把存储做成如同Oracle一样的方式,具备自动淘汰冷热数据功能,但是,它采用了RDB文件和VM机制来分别实现二进制存储、冷热淘汰的功能,期望是既节约内存又达到完美性能的地步
redis之所以高性能最本质的原因是数据都cache到内存里。
*/
1
2
3
4
5
6
7
8
9
10
11
//encoding则对应了 Redis 中的10种编码方式
#define OBJ_ENCODING_RAW 0 /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
数据类型 一般情况 少量情况 特殊情况
STRING RAW EMBSTR INT
LIST QUICKLIST(3.2 之前为 LINKEDLIST/ZIPLIST,现已不再使用)
HASH HT ZIPLIST

refcount 引用计数的方式管理内存,自动化的管理除了引用技术,就是垃圾回收,相关可以参考云风写的引用计数与垃圾收集之比较

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/* object.c mostly provides object-manipulation / initialization APIs. */
robj *createObject(int type, void *ptr){ /* basic robj constructor; the other create* helpers follow the same pattern */
robj *o = zmalloc(sizeof(*o));
o->type = type;
o->encoding = OBJ_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;

/* Set the LRU to the current lruclock (minutes resolution), or
* alternatively the LFU counter. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
/* LFU layout: high 16 bits = access time in minutes, low 8 bits = counter */
} else {
o->lru = LRU_CLOCK(); /* record the LRU clock at creation time */
}
return o;
}
robj *createStringObject(char *ptr, size_t len)
robj *createStringObjectFromLongLong(long long value)
robj *createStringObjectFromLongDouble(long double value)
robj *dupStringObject(robj *o)
robj *createListObject(void)
robj *createZiplistObject(void)
robj *createSetObject(void)
robj *createIntsetObject(void)
robj *createHashObject(void)
robj *createZsetObject(void)
robj *createZsetZiplistObject(void)
/* Drop one reference to 'o'; when the last reference goes away, free the
 * type-specific representation and then the robj itself. Shared objects
 * (refcount == OBJ_SHARED_REFCOUNT) are never decremented. */
void decrRefCount(robj *o) {
if (o->refcount == 1) {
/* Last reference: release normally. */
switch(o->type) {
case OBJ_STRING: freeStringObject(o); break;
case OBJ_LIST: freeListObject(o); break;
case OBJ_SET: freeSetObject(o); break;
case OBJ_ZSET: freeZsetObject(o); break;
case OBJ_HASH: freeHashObject(o); break;
case OBJ_MODULE: freeModuleObject(o); break;
default: serverPanic("Unknown object type"); break;
}
zfree(o);
} else {
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");/* corrupted refcount */
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
}
}

Redis中采用两种算法进行内存达到上限时内存回收,引用计数算法以及LRU/LFU算法

  1. 引用计数 refcount
  2. lru (Least Recently Used)//最长时间未被使用
    Image text
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru,即 2^24 - 1,单位由 LRU_CLOCK_RESOLUTION 决定(默认 1 秒) */
#define LRU_CLOCK_RESOLUTION 1000 //代表LRU算法的精度,即一个LRU的单位是多长时间,1秒
/* Return the LRU clock. If the LRU resolution is coarser than the serverCron
 * refresh period, the cached server.lruclock is fresh enough; otherwise
 * compute the current value directly. */
unsigned int LRU_CLOCK(void) {
unsigned int lruclock;
if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
atomicGet(server.lruclock,lruclock);
} else {
lruclock = getLRUClock();
}
return lruclock;
/* When serverCron fires at least as often as the LRU precision,
 * server.lruclock can be assigned directly at object creation time,
 * avoiding the function-call overhead of recomputing it. */


}
/* Current LRU clock: unix time in LRU_CLOCK_RESOLUTION units, truncated to
 * LRU_BITS bits via the LRU_CLOCK_MAX mask. */
unsigned int getLRUClock(void)
{
return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}

//定时循环执行
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
{
.........
unsigned long lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
.........
}

//计算数据的空闲时间
evictionPoolPopulate->estimateObjectIdleTime->LRU_CLOCK
objectCommand(idletime) -> estimateObjectIdleTime

3.LFU(Least Frequently Used) 最不常用页面置换算法,基于LFU的热点key发现机制 4.0
counter:基于概率的对数计数器

1
2
3
4
5
6
7
8
9
10
11
  /* Probabilistic logarithmic counter increment: the larger the current
   * counter, the lower the probability that it is bumped again. */
  uint8_t LFULogIncr(uint8_t counter) {
if (counter == 255) return 255; /* saturated */
double r = (double)rand()/RAND_MAX; /* uniform in [0,1]; RAND_MAX is the C library upper bound of rand() */
double baseval = counter - LFU_INIT_VAL; /* LFU_INIT_VAL = 5 */
if (baseval < 0) baseval = 0;
double p = 1.0/(baseval*server.lfu_log_factor+1);/* server.lfu_log_factor defaults to 10 (probability factor) */
if (r < p) counter++; /* as baseval grows, r < p becomes non-linearly rarer */
return counter;
}

robj *lookupKey(redisDb *db, robj *key, int flags)//从db->dict字典中获取key的val,并更新val的lru/lfu

Image text

counter增长函数LFULogIncr中我们可以看到,随着key的访问量增长,counter最终都会收敛为255;在默认 lfu-log-factor=10 下,大约需要一百万次访问才能使 counter 饱和

动态配置

1
2
lfu-log-factor 10 概率因子
lfu-decay-time 1 每分钟衰减1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* Decay the object's LFU counter if at least lfu_decay_time minutes elapsed
 * since the recorded access time; returns the (possibly decayed) counter.
 * In this version the decayed time/counter are also written back to o->lru. */
unsigned long LFUDecrAndReturn(robj *o) {
unsigned long ldt = o->lru >> 8; /* access time, in minutes (high 16 bits) */
unsigned long counter = o->lru & 255; /* access frequency (low 8 bits) */
/* If more than lfu_decay_time has elapsed, reduce the frequency. */
if (LFUTimeElapsed(ldt) >= server.lfu_decay_time && counter) {
if (counter > LFU_INIT_VAL*2) {
counter /= 2;
if (counter < LFU_INIT_VAL*2) counter = LFU_INIT_VAL*2;
} else {
counter--;
}
/* Refresh both the time and the decayed counter. */
o->lru = (LFUGetTimeInMinutes()<<8) | counter;
}
return counter;
}
/* How many minutes passed since this object was last accessed; handles the
 * 16-bit minute clock wrapping around at 65535. */
unsigned long LFUTimeElapsed(unsigned long ldt) {
unsigned long now = LFUGetTimeInMinutes();
if (now >= ldt) return now-ldt;
return 65535-ldt+now; /* clock wrapped */
}
/* Current time in minutes, truncated to 16 bits. */
unsigned long LFUGetTimeInMinutes(void) {
return (server.unixtime/60) & 65535;
}

redis内存达到上限后的释放内存原则

  1. 释放内存的标准是释放之后使用内存的大小小于maxmemory的大小
  2. 每次释放的数量,redis.conf配置
  3. 超过限制后redis策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
redis.conf配置

volatile-lru -> 从设置了过期时间的键中选择空转时间最长的键值对清除掉
volatile-lfu -> 从设置了过期时间的键中选择某段时间之内使用频次最小的键值对清除掉;
volatile-random -> 从设置了过期时间的键中,随机选择键进行清除.

allkeys-lru -> 从所有的键中选择空转时间最长的键值对清除
allkeys-lfu -> 从所有的键中选择某段时间之内使用频次最少的键值对清除.
allkeys-random -> 从所有的key中随机删除

volatile-ttl -> 从已设置过期时间的数据集合中挑选即将过期的数据淘汰
noeviction ->不做任何的清理工作,禁止写入,报错


# The default of 5 produces good enough results. 10 Approximates very closely
# true LRU but costs more CPU. 3 is faster but not very accurate.
#
# maxmemory-samples 5 lru,lfu每次采样数,5最佳

如何筛选释放的key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#define EVPOOL_SIZE 16 /* number of eviction candidates the pool can hold */
#define EVPOOL_CACHED_SDS_SIZE 255 /* max key length reusable via the 'cached' sds */
struct evictionPoolEntry {
unsigned long long idle; /* Object idle time (inverse frequency for LFU) */
sds key; /* Key name. */
sds cached; /* Cached SDS object for key name. */
int dbid; /* Key DB number. */
};
static struct evictionPoolEntry *EvictionPoolLRU; /* global array of eviction candidates */

/* Allocate and zero-initialize the global EvictionPoolLRU array; each entry
 * pre-allocates a reusable sds buffer of EVPOOL_CACHED_SDS_SIZE bytes so key
 * names can usually be copied without extra allocations. */
void evictionPoolAlloc(void) {
struct evictionPoolEntry *ep;
int j;

ep = zmalloc(sizeof(*ep)*EVPOOL_SIZE);
for (j = 0; j < EVPOOL_SIZE; j++) {
ep[j].idle = 0;
ep[j].key = NULL;
ep[j].cached = sdsnewlen(NULL,EVPOOL_CACHED_SDS_SIZE);
ep[j].dbid = 0;
}
EvictionPoolLRU = ep;
}

根据配置策略的LRU、LFU以及volatile-ttl策略转换成的idle值将待删除的键相关的信息按照idle值从小到大的顺序从前向后存储在evctionPoolLRU中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/* Sample keys from 'sampledict' and insert eviction candidates into 'pool',
 * kept sorted by ascending 'idle' score (LRU idle time, inverted LFU
 * frequency, or inverted TTL, depending on the configured policy). */
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
int j, k, count;
dictEntry *samples[server.maxmemory_samples];

/* Grab up to server.maxmemory_samples (configured, default 5) random
 * entries from sampledict into samples[]. */
count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
for (j = 0; j < count; j++) {
unsigned long long idle;
sds key;
robj *o;
dictEntry *de;

de = samples[j];
key = dictGetKey(de);

/* If the dictionary we are sampling from is not the main
* dictionary (but the expires one) we need to lookup the key
* again in the key dictionary to obtain the value object. */

/* For MAXMEMORY_VOLATILE_TTL we only need the expire time stored in
 * db->expires, so 'de' can be used as-is. For every other policy, if
 * we sampled from db->expires we must re-look the key up in db->dict
 * to reach the value object and its lru field. */
if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
if (sampledict != keydict) de = dictFind(keydict, key);
o = dictGetVal(de);
}

/* Normalize LRU, LFU and TTL into a single 'idle' score. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
idle = estimateObjectIdleTime(o);
} else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
/* When we use an LRU policy, we sort the keys by idle time
* so that we expire keys starting from greater idle time.
* However when the policy is an LFU one, we have a frequency
* estimation, and we want to evict keys with lower frequency
* first. So inside the pool we put objects using the inverted
* frequency subtracting the actual frequency to the maximum
* frequency of 255. */
idle = 255-LFUDecrAndReturn(o); /* LFUDecrAndReturn returns the counter */
} else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
/* In this case the sooner the expire the better. */
idle = ULLONG_MAX - (long)dictGetVal(de);
} else {
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
}

/* Insert the element inside the pool.
* First, find the first empty bucket or the first populated
* bucket that has an idle time smaller than our idle time. */

/* Pool entries are ordered by ascending idle: the larger the idle,
 * the closer to the tail — and the sooner the key is evicted. */
k = 0;
/* Find the insertion point: first empty slot, or first entry whose
 * idle is >= ours (EVPOOL_SIZE is 16). */
while (k < EVPOOL_SIZE &&
pool[k].key &&
pool[k].idle < idle) k++;
/* Pool full and our idle smaller than every entry: skip this sample. */
if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
/* Can't insert if the element is < the worst element we have
* and there are no empty buckets. */
continue;
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
/* Inserting into empty position. No setup needed before insert. */
} else {
/* Inserting in the middle. Now k points to the first element
* greater than the element to insert. */
if (pool[EVPOOL_SIZE-1].key == NULL) {
/* Free space on the right? Insert at k shifting
* all the elements from k to end to the right. */

/* Save SDS before overwriting. */
sds cached = pool[EVPOOL_SIZE-1].cached;
memmove(pool+k+1,pool+k,
sizeof(pool[0])*(EVPOOL_SIZE-k-1));
pool[k].cached = cached;
} else {
/* No free space on right? Insert at k-1 */
k--;
/* Shift all elements on the left of k (included) to the
* left, so we discard the element with smaller idle time. */
sds cached = pool[0].cached; /* Save SDS before overwriting. */
if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
memmove(pool,pool+1,sizeof(pool[0])*k);
pool[k].cached = cached;
}
}

/* Try to reuse the cached SDS string allocated in the pool entry,
* because allocating and deallocating this object is costly
* (according to the profiler, not my fantasy. Remember:
* premature optimizbla bla bla bla. */
int klen = sdslen(key);
if (klen > EVPOOL_CACHED_SDS_SIZE) {
pool[k].key = sdsdup(key);
} else {
memcpy(pool[k].cached,key,klen+1);
sdssetlen(pool[k].cached,klen);
pool[k].key = pool[k].cached;
}
pool[k].idle = idle;
pool[k].dbid = dbid;
}
}

统计内存是否超过server.maxmemory,去掉AOF,slaves缓冲区内存占用

1
2
3
4
5
6
7
8
/* Remove the size of slaves output buffers and AOF buffer from the
* count of used memory. */
mem_used = mem_reported;
size_t overhead = freeMemoryGetNotCountedMemory();
mem_used = (mem_used > overhead) ? mem_used-overhead : 0;

/* Check if we are still over the memory limit. */
if (mem_used <= server.maxmemory) return C_OK;

内存管理策略的入口函数—freeMemoryIfNeeded()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
/* Entry point of the maxmemory eviction policy: if used memory (minus AOF
 * and slave output buffers) exceeds server.maxmemory, evict keys according
 * to the configured policy until enough memory was freed. Returns C_OK when
 * under the limit (or brought under it), C_ERR when nothing could be freed. */
int freeMemoryIfNeeded(void) {
size_t mem_reported, mem_used, mem_tofree, mem_freed;
mstime_t latency, eviction_latency;
long long delta;
int slaves = listLength(server.slaves);

/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed. */
/* Clients paused: the dataset must stay static, so do nothing. */
if (clientsArePaused()) return C_OK;

/* Check if we are over the memory usage limit. If we are not, no need
* to subtract the slaves output buffers. We can just return ASAP. */
/* Under the limit: no cleanup necessary. */
mem_reported = zmalloc_used_memory();
if (mem_reported <= server.maxmemory) return C_OK;

/* Remove the size of slaves output buffers and AOF buffer from the
* count of used memory. */
/* Exclude the AOF buffer and slave output buffers from the count. */
mem_used = mem_reported;
size_t overhead = freeMemoryGetNotCountedMemory();
mem_used = (mem_used > overhead) ? mem_used-overhead : 0;

/* Check if we are still over the memory limit. */
/* Compare again after removing the overhead. */
if (mem_used <= server.maxmemory) return C_OK;

/* Compute how much memory we need to free. */
mem_tofree = mem_used - server.maxmemory;
mem_freed = 0;
/* noeviction policy: we are not allowed to free anything. */
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
goto cant_free; /* We need to free memory, but policy forbids. */

latencyStartMonitor(latency);
while (mem_freed < mem_tofree) {
int j, k, i, keys_freed = 0;
static int next_db = 0;
sds bestkey = NULL;
int bestdbid;
redisDb *db;
dict *dict;
dictEntry *de;
/* LRU, LFU or volatile-ttl: select victims through the eviction pool. */
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
struct evictionPoolEntry *pool = EvictionPoolLRU;

while(bestkey == NULL) {
unsigned long total_keys = 0, keys;

/* We don't want to make local-db choices when expiring keys,
* so to start populate the eviction pool sampling keys from
* every DB. */

/* Sample keys from every DB to pick the globally best victim. */
for (i = 0; i < server.dbnum; i++) {
db = server.db+i;
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
if ((keys = dictSize(dict)) != 0) {
evictionPoolPopulate(i, dict, db->dict, pool);
total_keys += keys;
}
}
if (!total_keys) break; /* No keys to evict. */

/* Go backward from best to worst element to evict. */
/* Start from the tail of the pool (largest idle value). */
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
bestdbid = pool[k].dbid;
/* allkeys-* policies look the key up in the main dict;
 * volatile-* policies in the expires dict. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
de = dictFind(server.db[pool[k].dbid].dict,
pool[k].key);
} else {
de = dictFind(server.db[pool[k].dbid].expires,
pool[k].key);
}

/* Remove the entry from the pool. */
/* We have bestdbid and de; release the pool's copy of the key. */
if (pool[k].key != pool[k].cached)
sdsfree(pool[k].key);
pool[k].key = NULL;
pool[k].idle = 0;

/* If the key exists, is our pick. Otherwise it is
* a ghost and we need to try the next element. */
if (de) {
bestkey = dictGetKey(de);
break;
} else {
/* Ghost... Iterate again. */
}
}
}
}

/* volatile-random and allkeys-random policy */
/* Random policies: just pick a random key from dict or expires. */
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
{
/* When evicting a random key, we try to evict a key for
* each DB, so we use the static 'next_db' variable to
* incrementally visit all DBs. */
/* Round-robin over the DBs, one candidate per iteration. */
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db+j;
dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->dict : db->expires;
if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict);
bestkey = dictGetKey(de);
bestdbid = j;
break;
}
}
}

/* Finally remove the selected key. */
/* Delete the chosen bestkey, synchronously or lazily. */
if (bestkey) {
db = server.db+bestdbid;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
latencyRemoveNestedEvent(latency,eviction_latency);
delta -= (long long) zmalloc_used_memory();
mem_freed += delta;
server.stat_evictedkeys++;
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
decrRefCount(keyobj);
keys_freed++;

/* When the memory to free starts to be big enough, we may
* start spending so much time here that is impossible to
* deliver data to the slaves fast enough, so we force the
* transmission here inside the loop. */
if (slaves) flushSlavesOutputBuffers();

/* Normally our stop condition is the ability to release
* a fixed, pre-computed amount of memory. However when we
* are deleting objects in another thread, it's better to
* check, from time to time, if we already reached our target
* memory, since the "mem_freed" amount is computed only
* across the dbAsyncDelete() call, while the thread can
* release the memory all the time. */
if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
overhead = freeMemoryGetNotCountedMemory();
mem_used = zmalloc_used_memory();
mem_used = (mem_used > overhead) ? mem_used-overhead : 0;
if (mem_used <= server.maxmemory) {
mem_freed = mem_tofree;
}
}
}

if (!keys_freed) {
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
goto cant_free; /* nothing to free... */
}
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
return C_OK;

cant_free:
/* We are here if we are not able to reclaim memory. There is only one
* last thing we can try: check if the lazyfree thread has jobs in queue
* and wait... */
while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
if (((mem_reported - zmalloc_used_memory()) + mem_freed) >= mem_tofree)
break;
usleep(1000);
}
return C_ERR;
}

后续优化方向

  1. 淘汰算法的优化,更有效的需要淘汰的key
  2. 算法对性能,内存的影响

过期key删除策略

惰性删除:放任键过期不管,但是每次从键空间中获取键时,都检查取得的键是否过期,如果过期的话,就删除该键;如果没有过期,那就返回该键;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/* Lazy expiration: called when a key is accessed; deletes the key if its
 * TTL has passed. Returns non-zero if the key is (considered) expired. */
int expireIfNeeded(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key); /* fetch the expire time, < 0 if none */
mstime_t now;

if (when < 0) return 0; /* No expire for this key */

/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;

/* If we are in the context of a Lua script, we claim that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
now = server.lua_caller ? server.lua_time_start : mstime();

/* If we are running in the context of a slave, return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
/* Replication: only the master deletes; slaves wait for the synthesized DEL. */
if (server.masterhost != NULL) return now > when;

/* Return when this key has not expired */
if (now <= when) return 0;

/* Delete the key */
server.stat_expiredkeys++; /* expired-keys statistics */
/* Propagate the expiration to AOF and replicas (lazyfree-aware). */
propagateExpire(db,key,server.lazyfree_lazy_expire);
/* Fire the keyspace "expired" notification. */
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);

/* Choose synchronous or asynchronous (lazy) deletion. */
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}


#define LAZYFREE_THRESHOLD 64 /* below this effort, lazy freeing is slower than inline freeing */
/* Asynchronous delete: unlink the entry from the keyspace and, when freeing
 * the value is expensive enough, hand it to the BIO lazy-free thread.
 * Returns 1 if the key was found and deleted, 0 otherwise. */
int dbAsyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
/* dictUnlink returns the entry and detaches it without freeing it. */
dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);/* estimate the cost of freeing the value */

/* If releasing the object is too much work, let's put it into the
* lazy free list. */
/* Values costlier than LAZYFREE_THRESHOLD go to the BIO background thread. */
if (free_effort > LAZYFREE_THRESHOLD) {

atomicIncr(lazyfree_objects,1);/* one more object pending lazy free */

bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);/* queue for the BIO thread */
dictSetVal(db->dict,de,NULL); /* detach the value so the entry free below skips it */
}
}

/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
/* Cluster mode: keep the slot->key map in sync. */
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
}

redis.conf
lazyfree-lazy-eviction no //内存满 freeMemoryIfNeeded
lazyfree-lazy-expire no //过期 expireIfNeeded
lazyfree-lazy-server-del no //del命令 dbDelete
slave-lazy-flush no //slave进行全量数据同步,slave在加载master的RDB文件前,会运行flushall来清理自己的数据场景 readSyncBulkPayload

lazyfree机制 删除大数据的键值,导致redis阻塞,4.0引入的,将删除键或数据库的操作放在后台线程里执行, 从而尽可能地避免服务器阻塞。

redis 思考

  1. redis4.0引入的命令 unlink(unlinkCommand)
  2. FLUSHALL/FLUSHDB ASYNC 线上禁用
  3. Lazy 应该全开

定期删除:每隔一段时间,程序就对数据库进行一次检查,删除里面的过期键。至于删除多少过期键,以及要检查多少个数据库,则由算法决定。 如何确定频率时长

  1. databasesCron -> activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);分多次
    遍历各个数据库,从expires字典中随机检查一部分过期键的过期时间,删除其中的过期键
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
//server.hz 10 默认值,conf,serverCron任务的执行周期执行周期,空闲每秒执行10次,清理时间最大为25ms

#define ACTIVE_EXPIRE_CYCLE_FAST 1 //1000微秒

if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */

if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;

while (num--) {
dictEntry *de;
long long ttl;
//随机取,过期删除
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
ttl = dictGetSignedIntegerVal(de)-now;
if (activeExpireCycleTryExpire(db,de,now)) expired++;

while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4//删除5个就遍历下一个db
  1. 事件处理函数aeMain beforeSleep activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
  2. 访问key 判断lookupKeyRead,过期删除

持久化rdb/aof

rdbrdb程序就是将内存中的数据库的数据结构以rdb文件形式保存到磁盘;在redis重启时,rdb程序就通过载入rdb文件来还原重启前的数据库状态

  1. RDB文件创建两个命令可以生成RDB文件:SAVE和BGSAVE。

Image text
SAVE命令会阻塞redis的服务器进程,直到RDB文件创建完毕为止。在阻塞过程中,server不能处理任何请求 saveCommand->rdbSave->rdbSaveRio->rdbSaveKeyValuePair
REDIS // RDB协议约束的固定字符串
0006 // redis的版本
FE 00 // 表示当前接下来的key都是db=0中的key;
FC 1506327609 // 表示key失效时间点为1506327609
0 // 表示key的属性是string类型
username // key
afei // value
FF // 表示遍历完成
y73e9iq1 // checksum , CRC64

Image text
BGSAVE则会fork出一个子进程,然后子进程负责RDB文件的创建,父进程继续处理请求。
fork()系统调用我们可以创建一个和当前进程一样的新进程,继承了父进程的整个地址空间,其中包括了进程上下文,堆栈地址,内存信息进程控制块
bgsaveCommand->rdbSaveBackground-> rdbSave
fork危害,多进程的多线程程序
自动执行bgsave,配置redis.conf

save “”

save 900 1
save 300 10
save 60 10000
只要这三个条件中的任意一条符合,那么服务器就会执行BGSAVE。配置保存在 redisServer saveparam中,serverCron中判断执行
rdb加载,loadDataFromDisk -> rdbLoad(),阻塞

  1. aof记录服务器所处理的所有的除查询意外的操作,以文本的方式记录
    原理是:处理文件事件执行写命令,使得命令被追加到aof_buf中,,然后在处理时间事件执行serverCron函数会调用flushAppendOnlyFile函数进行文件的写入和同步。
    appendonly yes
    appendfilename “appendonly.aof”
    appendfsync everysec
    always:将aof_buf中的所有内容写入并同步到aof文件。
    everysec:将aof_buf中的所有内容写入到aof文件,如果上次同步的时间距离现在超过1s,那么对aof文件进行同步,同步操作由一个线程专门负责执行
    no:将aof_buf中的所有内容写入到aof文件,但不对aof文件同步,同步有操作系统执行。

    aof流程主要函数:
    //函数1:将command写入aof_buff propagate-> feedAppendOnlyFile,数据修改更新到AOF缓存中
    //函数2:启动子进程,后台用于整理aof的数据 serverCron-> rewriteAppendOnlyFileBackground
    //函数3:刷一遍server.db[16],依次将对象写入磁盘临时文件tmpfile,rewriteAppendOnlyFile ->rewriteAppendOnlyFileRio
    //函数4:将aof_buff内容持久化flushAppendOnlyFile
    //函数5:将backgroundRewriteDoneHandler 更新子进程同步期间产生的修改

    Image text

    aof_rewrite 解决AOF文件体积膨胀的问题
    触发条件

    1. 被动: 当AOF文件尺寸超过conf:auto-aof-rewrite-min-size 64mb & 达到一定增长比,
      指当前aof文件比上次重写的增长比例大小 auto-aof-rewrite-percentage 100;
    2. 主动: 调用BGREWRITEAOF命令;
      Image text
      对应bgrewriteaofCommand逻辑
  2. 混合持久化 conf aof-use-rdb-preamble no, aof rewrite 的时候就直接把 rdb 的内容写到 aof 文件开头

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
            +------------------------+
| |
| |
| RDB |
| FORMAT |
| |
| |
| |
+------------------------+
| |
| AOF |
| FORMAT |
+------------------------+


if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}

rdb优缺点:二进制文件,速度快,存在数据丢失,性能消耗低
aof优缺点:数据丢失小,性能消耗大,4.0管道优化相对好些
混合持久化:加载速度快,避免丢失数据

事故案例:陌陌争霸

32 个数据仓库部署到 4 台物理机上即可,每台机器上启动 8 个 Redis 进程。使用 64G 内存的机器,后来增加到了 96G 内存。实测每个 Redis 服务会占到 4~5 G 内存,四台配置相同的从主机进行主从备份。

事故1:有一台数据服务主机无法被游戏服务器访问到,影响了部分用户登陆,运维维护时发现,一台从机的内存耗尽,导致了从机的数据库服务重启。在从机重新对主机连接,8 个 Redis 同时发送 SYNC 的冲击下,把主机击毁了。
问题1:从主机为什么出现内存不足
原因:redis 服务同时做 BGSAVE,概率性 ,而导致 fork 多个进程需要消耗太多内存
问题2:重新进行 SYNC 操作会导致主机过载
原因:重启后,8 个 slave redis 同时开启同步,等于瞬间在主机上 fork 出 8 个 redis 进程
解决方案:取消主从,脚本控制bgsave

事故2:内存不足
原因:定期备份redis数据库文件,拷贝数据库文件时,系统使用大量的内存做为cache,释放不及时;即使脚本控制bgsave,内存仍不足而使用交换分区,因此用脚本保证30分钟内必须执行一次bgsave

思考:1.备份方案的选择。2.使用leveldb做redis的持久化存储 3.redis内存上限设置一半

hyperloglog.c

在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。HyperLogLog只会根据输入元素来计算基数,而不会储存输入元素本身

##看到第一句话就知道这个已经完全没有了解的必要了,逆天的算法,用于比如ip访问数的基数统计。

+ 动态字符串 sds.h和sds.c

+ 双端链表 adlist.c和adlist.h

+ 字典 dict.h和dict.c

+ 跳跃表 server.h文件里面关于zskiplist结构和zskiplistNode结构,以及t_zset.c中所有zsl开头的函数,比如 zslCreate、zslInsert、zslDeleteNode等等。

+intset.c和intset.h

  1. intset 是一种有序的整型集合,共有 INTSET_ENC_INT16INTSET_ENC_INT32INTSET_ENC_INT64 编码类型。
  2. intset length 属性记了录集合的大小。
  3. intset 内部采用二分查找算法 定位元素。
  4. intset 只会升级,不会降级。当将一个高位元素添加到低编码集合时,此时,需要对集合进行升级。

这几个百度一下基本上能理解实现原理,唯一的用途估计也就是面试的时候拿来测试为难或者测试一下面试者的底线。

ziplist.c和ziplist.h

  1. 实在是不想看实现了,估计也看不懂,看名字就知道是一种极节内存内存的链表,redis是内存性数据库,肯定还是要能省则省,代码肯定是有的,肯定是性能稍微差一点,但是应该差的不多,大神撸出来的东西是不用有任何怀疑的。