redis客户端和服务器的相关代码
ae.c(Redis 的事件处理器实现(基于 Reactor 模式))
handle_events()
在Redis中,对于文件事件,相应的处理函数为Ae.c/aeProcessEvents
register_handler/remove_handler 对应Redis中,相关的处理函数也在Ae.c文件中方法
select对应不同平台的io复用的函数库
handle对应fd资源,文件描述符
event handler 事件处理器的接口
Concrete Event Handler 事件处理器的实际实现,绑定handle,比如acceptTcpHandler 这些
相关数据结构
aeFileEvent
1 | //文件事件结构 |
aeTimeEvent
1 | // 时间事件结构体 |
aeFiredEvent
1 | // 触发事件 |
aeEventLoop
1 | // 事件循环结构体 |
整体流程
1 | { |
aeCreateEventLoop //initServer调用初始化
1 | aeEventLoop *aeCreateEventLoop(int setsize) { |
代码说明
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
setsize = server.maxclients+CONFIG_FDSET_INCR = 10000 + CONFIG_MIN_RESERVED_FDS+96
CONFIG_MIN_RESERVED_FDS = 32 是Sentinel保留的用于额外的的操作,如listening sockets, log files 等
1 | cat /proc/sys/fs/file-max 查看系统级别的能够打开的文件句柄的数量 |
相关的案例事件……
9554:M 24 Mar 10:40:25.869 # Error registering fd event for the new client: Numerical result out of range (fd=10128)
系统内存不足以fork子进程时,AOF重写就无法启动,而此之前已打开的pipe也永远不会关闭,并在下一次尝试AOF重写时又创建新的pipe,从而造成fd泄漏。
具体代码
1 | int rewriteAppendOnlyFileBackground(void) { |
事件类型
1. 文件事件
在一般情况下,aeProcessEvents
都会先计算最近的时间事件发生所需要等待的时间,然后调用 aeApiPoll
方法在这段时间中等待事件的发生,在这段时间中如果发生了文件事件,就会优先处理文件事件,否则就会一直等待,直到最近的时间事件需要触发:
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
int invert = fe->mask & AE_BARRIER;
文件事件注册 aeCreateFileEvent
监听实现
1.initServer()
中完成了对事件循环的初始化操作
listenToPort
2.完成了监听套接字的初始化,initServer()
还需要为所有监听套接字注册读事件
acceptTcpHandler/acceptUnixHandler //回调监听到新连接请求时
acceptTcpHandler()->acceptCommonHandler
在与客户端成功建立连接之后,调用了acceptCommonHandler()
函数,其作用为:
1、 建立并保存服务器与客户端的连接信息,将信息保存到一个struct redisClient 中;
2、为客户端的cfd(已连接的socket)注册读事件,相应的回调函数为readQueryFromClient()
,其作用是从socket读取数据,执行相应操作,并回复给客户端(而acceptCommonHandler是为监听socket的读事件回调函数)
事件循环aeMain
1、根据时间事件链表计算需要等待的最短事件;
2、调用redis aeApiPoll()
进入监听轮询,如果没有事件发生就进入睡眠状态,其实就是进行I/O多路复用函数调用。
3、如果有事件发生,处理事件。
事件触发
aeApiPoll //多路复用函数阻塞监听
2.时间事件
在 Redis 中会发生两种时间事件:
- 一种是定时事件,每隔一段时间会执行一次;
- 另一种是非定时事件,只会在某个时间点执行一次;
aeProcessEvents ->processTimeEvents
总结
1时间事件分为定时时间和周期事件,周期事件serverCron 100毫秒
2时间事件和文件事件合并到一起处理,分开;
3.时间事件晚于预设
networking.c
readQueryFromClient processCommand
clientsCronHandleTimeout //keep_alive,ping,异常处理
三类方法
1.addReply API 往数据结构里面写东西
2.Client API 客户端创建释放
3.工具方法
1 | listMatchObjects |
server.c
1.Globals
2.Utility functions
3.Hash table
4.Cron
trackInstantaneousMetric bug
writeToClient server.stat_net_output_bytes += totwritten;
5.0
//缓冲区大于32K,且远大于查询缓冲区数据峰值
//询缓冲区大于32K,且客户端当前处于非活跃状态
int clientsCronResizeQueryBuffer(client *c)
if (querybuf_size > PROTO_MBULK_BIG_ARG &&
((querybuf_size/(c->querybuf_peak+1)) > 2 ||
idletime > 2))
{
/* Only resize the query buffer if it is actually wasting
* at least a few kbytes. */
if (sdsavail(c->querybuf) > 1024*4) {
c->querybuf = sdsRemoveFreeSpace(c->querybuf);
}
}
4.0
// 查询缓冲区大于32K,且远大于查询缓冲区数据峰值
// 查询缓冲区大于1K,且客户端当前处于非活跃状态
if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
(querybuf_size/(c->querybuf_peak+1)) > 2) ||
(querybuf_size > 1024 && idletime > 2))
{
/* Only resize the query buffer if it is actually wasting space. */
if (sdsavail(c->querybuf) > 1024) {
c->querybuf = sdsRemoveFreeSpace(c->querybuf);
}
}
//c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
初始化默认分配的大小就是32k
如果客户端数量比较多,且刚好比较空闲,需要一次处理很多客户端的输入缓冲区,阻塞或者崩溃
5.serverCron 相关的知识点
1).信息更新,统计
2). XXXXCron
3). 持久化
4). 集群
6.Server initialization
7.Redis OP Array API 持久化复制的时候命令数组
8.Commands lookup and execution
9.Shutdown
10.Commands
- linux
12.main