Redis源码分析--事件处理器
2022/2/6 19:12:36
本文主要是介绍Redis源码分析--事件处理器,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
事件处理器:
Redis采用Reactor模式作为自己的网络事件处理器,可以看作是单线程单Reactor模型。
一、主要结构体:
1、事件:
/* File event structure */ typedef struct aeFileEvent { /* 事件类型:可读or可写 */ int mask; /* one of AE_(READABLE|WRITABLE) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent; /* Time event structure */ typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *next; } aeTimeEvent; /* A fired event */ typedef struct aeFiredEvent { int fd; int mask; } aeFiredEvent;
-
Redis的事件分为文件事件与时间事件;
-
L5:文件事件处理函数,一个函数指针:
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
-
L15:时间事件处理函数,函数指针:
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
2、 事件循环:
typedef struct aeEventLoop { int maxfd; /* highest file descriptor currently registered */ int setsize; /* max number of file descriptors tracked */ long long timeEventNextId; time_t lastTime; /* Used to detect system clock skew */ /* 用户层保存的events,不是poller系统调用的events,注意区别 */ aeFileEvent *events; /* Registered events */ aeFiredEvent *fired; /* Fired events */ /* 时间事件列表 */ aeTimeEvent *timeEventHead; int stop; void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; } aeEventLoop;
- L4:Redis自己实现定时器;
- L8:注意区别;
- L11:时间事件被放置在一个无序列表中,由于当前版本(Redis3.0)下Redis服务器只有serverCron一个时间事件,所以这个列表实际上退化为指针,无序也不影响性能;
二、创建及初始化:
1、创建事件循环:
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; /* aeApiCreate:系统适配的poller api的初始化 * Linux下使用epoll,创建epoll api所需要的epollfd和epoll_event数组,并放入apidata中*/ if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }
- L12:Redis没有使用timefd的api;
- L18:注意Redis会选择对于该系统适配最佳的poller;
2、创建文件事件:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { /* fd不能超过setsize */ if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } /* 取出空的aeFileEvent */ aeFileEvent *fe = &eventLoop->events[fd]; /* Linux下调用(之后默认Linux)epoll_ctl(2)添加事件 */ if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
- L12:注意对系统调用的封装;
3、创建时间事件:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { /* 分配id */ long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; /* 设置定时时间 */ aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); /* 处理函数 */ te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; /* 新事件插到链表头 */ te->next = eventLoop->timeEventHead; eventLoop->timeEventHead = te; return id; }
三、开始事件循环:
- 在Redis main函数中调用aeMain:
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
1、事件处理函数:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; /* 根据最近的时间事件设置poller调用的阻塞时间 */ if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; tvp->tv_sec --; } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } /* epoll_wait调用 */ numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; /* note the fe->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ /* 事件处理 */ if (fe->mask & mask & AE_READABLE) { /* rfired 确保读/写事件只能执行其中一个 */ rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } /* Check time events */ /* 先处理文件事件,再处理时间事件 */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
- L19、L75:以最近时间事件的到达时间作为poller调用阻塞时间,以及先处理文件事件,再处理时间事件的好处是:
- 避免长时间阻塞在文件事件的监听上;
- 处理完文件事件后最近时间事件刚好或即将触发,这时再处理效率更高;
- L76:时间事件分为周期性事件和定时事件,周期性事件在执行完后会被重新监听,目前Redis用的就是周期性事件;
参考:
本文基于redis 3.0
这篇关于Redis源码分析--事件处理器的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-08阿里云Redis项目实战入门教程
- 2024-11-08阿里云Redis资料:新手入门与初级使用指南
- 2024-11-08阿里云Redis教程:新手入门及实用指南
- 2024-11-07阿里云Redis学习入门:新手必读指南
- 2024-11-07阿里云Redis学习入门:从零开始的操作指南
- 2024-11-07阿里云Redis学习:初学者指南
- 2024-11-06阿里云Redis入门教程:轻松搭建与使用指南
- 2024-11-02Redis项目实战:新手入门教程
- 2024-10-22Redis入门教程:轻松掌握数据存储与操作
- 2024-10-22Redis缓存入门教程:快速掌握Redis缓存基础知识