redis4源码阅读学习-2

redis客户端和服务器的相关代码

ae.c(Redis 的事件处理器实现(基于 Reactor 模式))

handle_events()

在Redis中,对于文件事件,相应的处理函数为Ae.c/aeProcessEvents

register_handler/remove_handler 对应Redis中,相关的处理函数也在Ae.c文件中方法

select对应不同平台的io复用的函数库

handle对应fd资源,文件描述符

event handler 事件处理器的接口

Concrete Event Handler 事件处理器的实际实现,绑定handle,比如acceptTcpHandler 这些

相关数据结构

aeFileEvent
1
2
3
4
5
6
7
//文件事件结构
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc; //读文件事件回调
aeFileProc *wfileProc; //写文件事件回调
void *clientData; //指向 redisClient 的指针
} aeFileEvent;
aeTimeEvent
1
2
3
4
5
6
7
8
9
10
11
// 时间事件结构体
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;//定时回调函数指针
aeEventFinalizerProc *finalizerProc;// 定时事件清理函数,当删除定时事件的时候会被调用
void *clientData; //指向 redisClient 的指针
struct aeTimeEvent *prev;//时间事件表指针
struct aeTimeEvent *next;
} aeTimeEvent;
aeFiredEvent
1
2
3
4
5
// 触发事件
typedef struct aeFiredEvent {
int fd; //表示事件发生在哪个文件描述符上面
int mask;//被触发事件的类型
} aeFiredEvent;
aeEventLoop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 事件循环结构体
typedef struct aeEventLoop {
//记录最大的定时事件 id
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; //用于系统时间的矫正
aeFileEvent *events; //文件表
aeFiredEvent *fired; //触发事件表
aeTimeEvent *timeEventHead;//定时事件表
int stop; //事件循环停止标志
void *apidata; //统一接口,屏蔽不同平台io服用机制,
aeBeforeSleepProc *beforesleep; //进入poll之前需要处理的事情
aeBeforeSleepProc *aftersleep; //进入poll之后需要处理的事情
} aeEventLoop;

整体流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
...
aeCreateEventLoop /* 创建总的事件管理结构 */
aeCreateTimeEvent /* 创建定时器事件的管理结构 */
aeCreateFileEvent for inet socket /* 创建网络事件的管理结构 */
aeCreateFileEvent for unix socket /* 内部通信事件的管理结构 */
aeSetBeforeSleepProc /* 设置beforeSleep处理函数 */
aeSetAfterSleepProc /* 设置afterSleep处理函数 */
...
while (!stop) //aeMain
{
//aeProcessEvents
beforeSleep /* 调用beforeSleep处理函数 */
aeApiPoll /* 进入poll函数 */
afterSleep /* 调用afterSleep处理函数 */
process file events /* 处理file events */
process time events /* 处理time events */
}
...
}
aeCreateEventLoop //initServer调用初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
aeEventLoop *aeCreateEventLoop(int setsize) { 
aeEventLoop *eventLoop;
int i;

if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;

err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
代码说明

server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

setsize = server.maxclients+CONFIG_FDSET_INCR = 10000 + CONFIG_MIN_RESERVED_FDS+96

CONFIG_MIN_RESERVED_FDS = 32 是Sentinel保留的用于额外的的操作,如listening sockets, log files 等

1
2
cat /proc/sys/fs/file-max 查看系统级别的能够打开的文件句柄的数量
ulimit -a 1024 查看用户进程级的能够打开文件句柄的数量
相关的案例事件……

9554:M 24 Mar 10:40:25.869 # Error registering fd event for the new client: Numerical result out of range (fd=10128)

系统内存不足以fork子进程时,AOF重写就无法启动,而此之前已打开的pipe也永远不会关闭,并在下一次尝试AOF重写时又创建新的pipe,从而造成fd泄漏。

具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int rewriteAppendOnlyFileBackground(void) {
//……
if (aofCreatePipes() != C_OK) return C_ERR; // 创建pipe
//……
if ((childpid = fork()) == 0) {
/* Child */
//……
} else {
/* Parent */
//子进程启动出错处理
if (childpid == -1) {
serverLog(LL_WARNING,
"Can't rewrite append only file in background: fork: %s",
strerror(errno)); // 最初内存不足正是这里打出的错误log
aofClosePipes();//fix bug issues/2857
return C_ERR;
}
//……
}
}
事件类型
1. 文件事件

在一般情况下,aeProcessEvents 都会先计算最近的时间事件发生所需要等待的时间,然后调用 aeApiPoll 方法在这段时间中等待事件的发生,在这段时间中如果发生了文件事件,就会优先处理文件事件,否则就会一直等待,直到最近的时间事件需要触发:

numevents = aeApiPoll(eventLoop, tvp);

/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
    eventLoop->aftersleep(eventLoop);

for (j = 0; j < numevents; j++) {
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int fired = 0; /* Number of events fired for current fd. */

    int invert = fe->mask & AE_BARRIER;

文件事件注册 aeCreateFileEvent

监听实现

1.initServer()中完成了对事件循环的初始化操作

listenToPort

2.完成了监听套接字的初始化,initServer()还需要为所有监听套接字注册读事件

acceptTcpHandler/acceptUnixHandler //回调监听到新连接请求时

acceptTcpHandler()->acceptCommonHandler

在与客户端成功建立连接之后,调用了acceptCommonHandler()函数,其作用为:
1、 建立并保存服务器与客户端的连接信息,将信息保存到一个struct redisClient 中;
2、为客户端的cfd(已连接的socket)注册读事件,相应的回调函数为readQueryFromClient(),其作用是从socket读取数据,执行相应操作,并回复给客户端(而acceptCommonHandler是为监听socket的读事件回调函数)

事件循环aeMain

1、根据时间事件链表计算需要等待的最短事件;
2、调用redis aeApiPoll() 进入监听轮询,如果没有事件发生就进入睡眠状态,其实就是进行I/O多路复用函数调用。
3、如果有事件发生,处理事件。

事件触发

aeApiPoll //多路复用函数阻塞监听

2.时间事件

在 Redis 中会发生两种时间事件:

  • 一种是定时事件,每隔一段时间会执行一次;
  • 另一种是非定时事件,只会在某个时间点执行一次;

aeProcessEvents ->processTimeEvents

总结

1时间事件分为定时时间和周期事件,周期事件serverCron 100毫秒

2时间事件和文件事件合并到一起处理,分开;

3.时间事件晚于预设

networking.c

readQueryFromClient processCommand

clientsCronHandleTimeout //keep_alive,ping,异常处理

三类方法

1.addReply API 往数据结构里面写东西

2.Client API 客户端创建释放

3.工具方法

1
2
3
4
5
listMatchObjects
disconnectSlaves
flushSlavesOutputBuffers
acceptTcpHandler
copyClientOutputBuffer

server.c

1.Globals

2.Utility functions

3.Hash table

4.Cron

trackInstantaneousMetric bug

writeToClient server.stat_net_output_bytes += totwritten;

5.0
//缓冲区大于32K,且远大于查询缓冲区数据峰值
//询缓冲区大于32K,且客户端当前处于非活跃状态
int clientsCronResizeQueryBuffer(client *c)

if (querybuf_size > PROTO_MBULK_BIG_ARG &&
     ((querybuf_size/(c->querybuf_peak+1)) > 2 ||
      idletime > 2))
{
    /* Only resize the query buffer if it is actually wasting
     * at least a few kbytes. */
    if (sdsavail(c->querybuf) > 1024*4) {
        c->querybuf = sdsRemoveFreeSpace(c->querybuf);
    }
}

4.0
// 查询缓冲区大于32K,且远大于查询缓冲区数据峰值
// 查询缓冲区大于1K,且客户端当前处于非活跃状态
if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
    (querybuf_size/(c->querybuf_peak+1)) > 2) ||
    (querybuf_size > 1024 && idletime > 2))
{
        /* Only resize the query buffer if it is actually wasting space. */
        if (sdsavail(c->querybuf) > 1024) {
            c->querybuf = sdsRemoveFreeSpace(c->querybuf);
        }
 }

//c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
初始化默认分配的大小就是32k
如果客户端数量比较多,且刚好比较空闲,需要一次处理很多客户端的输入缓冲区,阻塞或者崩溃

5.serverCron 相关的知识点

1).信息更新,统计

2). XXXXCron

3). 持久化

4). 集群

6.Server initialization

7.Redis OP Array API 持久化复制的时候命令数组

8.Commands lookup and execution

9.Shutdown

10.Commands

  1. linux

12.main