版本比较
标识
- 该行被添加。
- 该行被删除。
- 格式已经改变。
继承自协程调度器,封装了epoll,支持为socket fd注册读写事件回调函数。本章对应源码:https://github.com/zhongluqiang/sylar-from-scratch/releases/tag/v1.6.0 。。
IO协程调度还解决了调度器在idle状态下忙等待导致CPU占用率高的问题。IO协程调度器使用一对管道fd来tickle调度协程,当调度器空闲时,idle协程通过epoll_wait阻塞在管道的读描述符上,等管道的可读事件。添加新任务时,tickle方法写管道,idle协程检测到管道可读后退出,调度器执行调度。
学习本章内容之前必须对sylar的协程模块和协程调度模块非常熟悉,参考链接 协程模块 协程调度模块,此外,对epoll接口也要非常熟悉,参考 man 7 epoll
。
IO协程调度概述
IO协程调度可以看成是增强版的协程调度。
在前面的协程调度模块中,调度器对协程的调度是无条件执行的,在调度器已经启动调度的情况下,任务一旦添加成功,就会排队等待调度器执行。调度器不支持删除调度任务,并且调度器在正常退出之前一定会执行完全部的调度任务,所以在某种程度上可以认为,把一个协程添加到调度器的任务队列,就相当于调用了协程的resume方法。
IO协程调度支持协程调度的全部功能,因为IO协程调度器是直接继承协程调度器实现的。除了协程调度,IO协程调度还增加了IO事件调度的功能,这个功能是针对描述符(一般是套接字描述符)的。IO协程调度支持为描述符注册可读和可写事件的回调函数,当描述符可读或可写时,执行对应的回调函数。(这里可以直接把回调函数等效成协程,所以这个功能被称为IO协程调度)
IO事件调度功能对服务器开发至关重要,因为服务器通常需要处理大量来自客户端的socket fd,使用IO事件调度可以将开发者从判断socket fd是否可读或可写的工作中解放出来,使得程序员只需要关心socket fd的IO操作。后续的socket api hook模块也依赖IO协程调度。
很多的库都可以实现类似的工作,比如libevent,libuv,libev等,这些库被称为异步事件库或异步IO库,从网上可以很容易地找到大把的资料介绍这类库。有的库不仅可以处理socket fd事件,还可以处理定时器事件和信号事件。
这些事件库的实现原理基本类似,都是先将套接字设置成非阻塞状态,然后将套接字与回调函数绑定,接下来进入一个基于IO多路复用的事件循环,等待事件发生,然后调用对应的回调函数。这里可以参考一个基于epoll实现的简单事件库:3.2 epoll的反应堆模式实现 · libevent深入浅出 · 看云,sylar的IO调度和这种写法类似。
sylar IO协程调度模块设计
sylar的IO协程调度模块基于epoll实现,只支持Linux平台。对每个fd,sylar支持两类事件,一类是可读事件,对应EPOLLIN
,一类是可写事件,对应EPOLLOUT
,sylar的事件枚举值直接继承自epoll。
当然epoll本身除了支持了EPOLLIN和EPOLLOUT两类事件外,还支持其他事件,比如EPOLLRDHUP, EPOLLERR, EPOLLHUP等,对于这些事件,sylar的做法是将其进行归类,分别对应到EPOLLIN和EPOLLOUT中,也就是所有的事件都可以表示为可读或可写事件,甚至有的事件还可以同时表示可读及可写事件,比如EPOLLERR事件发生时,fd将同时触发可读和可写事件。
对于IO协程调度来说,每次调度都包含一个三元组信息,分别是描述符-事件类型(可读或可写)-回调函数,调度器记录全部需要调度的三元组信息,其中描述符和事件类型用于epoll_wait,回调函数用于协程调度。这个三元组信息在源码上通过FdContext
结构体来存储,在执行epoll_wait时通过epoll_event的私有数据指针data.ptr来保存FdContext结构体信息。
IO协程调度器在idle时会epoll_wait所有注册的fd,如果有fd满足条件,epoll_wait返回,从私有数据中拿到fd的上下文信息,并且执行其中的回调函数。(实际是idle协程只负责收集所有已触发的fd的回调函数并将其加入调度器的任务队列,真正的执行时机是idle协程退出后,调度器在下一轮调度时执行)
与协程调度器不一样的是,IO协程调度器支持取消事件。取消事件表示不关心某个fd的某个事件了,如果某个fd的可读或可写事件都被取消了,那这个fd会从调度器的epoll_wait中删除。
sylar IO协程调度器实现
sylar的IO协程调度器对应IOManager,这个类直接继承自Scheduler:
代码块 |
---|
class IOManager : public Scheduler { public: typedef std::shared_ptr<IOManager> ptr; typedef RWMutex RWMutexType; ... } |
首先是读写事件的定义,这里直接继承epoll的枚举值,如下:
代码块 |
---|
/** * @brief IO事件,继承自epoll对事件的定义 * @details 这里只关心socket fd的读和写事件,其他epoll事件会归类到这两类事件中 */ enum Event { /// 无事件 NONE = 0x0, /// 读事件(EPOLLIN) READ = 0x1, /// 写事件(EPOLLOUT) WRITE = 0x4, }; |
接下来是对描述符-事件类型-回调函数三元组的定义,这个三元组也称为fd上下文,使用结构体FdContext来表示。由于fd有可读和可写两种事件,每种事件的回调函数也可以不一样,所以每个fd都需要保存两个事件类型-回调函数组合。FdContext结构体定义如下:
代码块 |
---|
/** * @brief socket fd上下文类 * @details 每个socket fd都对应一个FdContext,包括fd的值,fd上的事件,以及fd的读写事件上下文 */ struct FdContext { typedef Mutex MutexType; /** * @brief 事件上下文类 * @details fd的每个事件都有一个事件上下文,保存这个事件的回调函数以及执行回调函数的调度器 * sylar对fd事件做了简化,只预留了读事件和写事件,所有的事件都被归类到这两类事件中 */ struct EventContext { /// 执行事件回调的调度器 Scheduler *scheduler = nullptr; /// 事件回调协程 Fiber::ptr fiber; /// 事件回调函数 std::function<void()> cb; }; /** * @brief 获取事件上下文类 * @param[in] event 事件类型 * @return 返回对应事件的上下文 */ EventContext &getEventContext(Event event); /** * @brief 重置事件上下文 * @param[in, out] ctx 待重置的事件上下文对象 */ void resetEventContext(EventContext &ctx); /** * @brief 触发事件 * @details 根据事件类型调用对应上下文结构中的调度器去调度回调协程或回调函数 * @param[in] event 事件类型 */ void triggerEvent(Event event); /// 读事件上下文 EventContext read; /// 写事件上下文 EventContext write; /// 事件关联的句柄 int fd = 0; /// 该fd添加了哪些事件的回调函数,或者说该fd关心哪些事件 Event events = NONE; /// 事件的Mutex MutexType mutex; }; |
接下来是IOManager的成员变量。IOManager包含一个epoll实例的句柄m_epfd以及用于tickle的一对pipe fd,还有全部的fd上下文m_fdContexts,如下:
代码块 |
---|
/// epoll 文件句柄 int m_epfd = 0; /// pipe 文件句柄,fd[0]读端,fd[1]写端 int m_tickleFds[2]; /// 当前等待执行的IO事件数量 std::atomic<size_t> m_pendingEventCount = {0}; /// IOManager的Mutex RWMutexType m_mutex; /// socket事件上下文的容器 std::vector<FdContext *> m_fdContexts; |
接下来是在继承类IOManager中改造协程调度器,使其支持epoll,并重载tickle和idle,实现通知调度协程和IO协程调度功能:
代码块 |
---|
/** * @brief 构造函数 * @param[in] threads 线程数量 * @param[in] use_caller 是否将调用线程包含进去 * @param[in] name 调度器的名称 */ IOManager::IOManager(size_t threads, bool use_caller, const std::string &name) : Scheduler(threads, use_caller, name) { // 创建epoll实例 m_epfd = epoll_create(5000); SYLAR_ASSERT(m_epfd > 0); // 创建pipe,获取m_tickleFds[2],其中m_tickleFds[0]是管道的读端,m_tickleFds[1]是管道的写端 int rt = pipe(m_tickleFds); SYLAR_ASSERT(!rt); // 注册pipe读句柄的可读事件,用于tickle调度协程,通过epoll_event.data.fd保存描述符 epoll_event event; memset(&event, 0, sizeof(epoll_event)); event.events = EPOLLIN | EPOLLET; event.data.fd = m_tickleFds[0]; // 非阻塞方式,配合边缘触发 rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK); SYLAR_ASSERT(!rt); // 将管道的读描述符加入epoll多路复用,如果管道可读,idle中的epoll_wait会返回 rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event); SYLAR_ASSERT(!rt); contextResize(32); // 这里直接开启了Schedluer,也就是说IOManager创建即可调度协程 start(); } /** * @brief 通知调度器有任务要调度 * @details 写pipe让idle协程从epoll_wait退出,待idle协程yield之后Scheduler::run就可以调度其他任务 * 如果当前没有空闲调度线程,那就没必要发通知 */ void IOManager::tickle() { SYLAR_LOG_DEBUG(g_logger) << "tickle"; if(!hasIdleThreads()) { return; } int rt = write(m_tickleFds[1], "T", 1); SYLAR_ASSERT(rt == 1); } /** * @brief idle协程 * @details 对于IO协程调度来说,应阻塞在等待IO事件上,idle退出的时机是epoll_wait返回,对应的操作是tickle或注册的IO事件就绪 * 调度器无调度任务时会阻塞idle协程上,对IO调度器而言,idle状态应该关注两件事,一是有没有新的调度任务,对应Schduler::schedule(), * 如果有新的调度任务,那应该立即退出idle状态,并执行对应的任务;二是关注当前注册的所有IO事件有没有触发,如果有触发,那么应该执行 * IO事件对应的回调函数 */ void IOManager::idle() { SYLAR_LOG_DEBUG(g_logger) << "idle"; // 一次epoll_wait最多检测256个就绪事件,如果就绪事件超过了这个数,那么会在下轮epoll_wati继续处理 const uint64_t MAX_EVNETS = 256; epoll_event *events = new epoll_event[MAX_EVNETS](); std::shared_ptr<epoll_event> shared_events(events, [](epoll_event *ptr) { delete[] ptr; }); while (true) { if(stopping()) { SYLAR_LOG_DEBUG(g_logger) << "name=" << getName() << "idle stopping exit"; break; } // 阻塞在epoll_wait上,等待事件发生 static const int MAX_TIMEOUT = 5000; int rt = epoll_wait(m_epfd, events, MAX_EVNETS, MAX_TIMEOUT); if(rt < 0) { if(errno == EINTR) { continue; } SYLAR_LOG_ERROR(g_logger) << "epoll_wait(" << m_epfd << ") (rt=" << rt << ") (errno=" << errno << ") (errstr:" << strerror(errno) << ")"; break; } // 遍历所有发生的事件,根据epoll_event的私有指针找到对应的FdContext,进行事件处理 for (int i = 0; i < rt; ++i) { epoll_event &event = events[i]; if (event.data.fd == m_tickleFds[0]) { // ticklefd[0]用于通知协程调度,这时只需要把管道里的内容读完即可,本轮idle结束Scheduler::run会重新执行协程调度 uint8_t dummy[256]; while (read(m_tickleFds[0], dummy, sizeof(dummy)) > 0) ; continue; } // 通过epoll_event的私有指针获取FdContext FdContext *fd_ctx = (FdContext *)event.data.ptr; FdContext::MutexType::Lock lock(fd_ctx->mutex); /** * EPOLLERR: 出错,比如写读端已经关闭的pipe * EPOLLHUP: 套接字对端关闭 * 出现这两种事件,应该同时触发fd的读和写事件,否则有可能出现注册的事件永远执行不到的情况 */ if (event.events & (EPOLLERR | EPOLLHUP)) { event.events |= (EPOLLIN | EPOLLOUT) & fd_ctx->events; } int real_events = NONE; if (event.events & EPOLLIN) { real_events |= READ; } if (event.events & EPOLLOUT) { real_events |= WRITE; } if ((fd_ctx->events & real_events) == NONE) { continue; } // 剔除已经发生的事件,将剩下的事件重新加入epoll_wait, // 如果剩下的事件为0,表示这个fd已经不需要关注了,直接从epoll中删除 int left_events = (fd_ctx->events & ~real_events); int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; event.events = EPOLLET | left_events; int rt2 = epoll_ctl(m_epfd, op, fd_ctx->fd, &event); if (rt2) { SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd_ctx->fd << ", " << (EPOLL_EVENTS)event.events << "):" << rt2 << " (" << errno << ") (" << strerror(errno) << ")"; continue; } // 处理已经发生的事件,也就是让调度器调度指定的函数或协程 if (real_events & READ) { fd_ctx->triggerEvent(READ); --m_pendingEventCount; } if (real_events & WRITE) { fd_ctx->triggerEvent(WRITE); --m_pendingEventCount; } } // end for /** * 一旦处理完所有的事件,idle协程yield,这样可以让调度协程(Scheduler::run)重新检查是否有新任务要调度 * 上面triggerEvent实际也只是把对应的fiber重新加入调度,要执行的话还要等idle协程退出 */ Fiber::ptr cur = Fiber::GetThis(); auto raw_ptr = cur.get(); cur.reset(); raw_ptr->yield(); } // end while(true) } |
接下来是注册事件回调addEvent,删除事件回调delEvent,取消事件回调cancelEvent,以及取消全部事件cancelAll:
代码块 |
---|
/** * @brief 添加事件 * @details fd描述符发生了event事件时执行cb函数 * @param[in] fd socket句柄 * @param[in] event 事件类型 * @param[in] cb 事件回调函数,如果为空,则默认把当前协程作为回调执行体 * @return 添加成功返回0,失败返回-1 */ int IOManager::addEvent(int fd, Event event, std::function<void()> cb) { // 找到fd对应的FdContext,如果不存在,那就分配一个 FdContext *fd_ctx = nullptr; RWMutexType::ReadLock lock(m_mutex); if ((int)m_fdContexts.size() > fd) { fd_ctx = m_fdContexts[fd]; lock.unlock(); } else { lock.unlock(); RWMutexType::WriteLock lock2(m_mutex); contextResize(fd * 1.5); fd_ctx = m_fdContexts[fd]; } // 同一个fd不允许重复添加相同的事件 FdContext::MutexType::Lock lock2(fd_ctx->mutex); if (SYLAR_UNLIKELY(fd_ctx->events & event)) { SYLAR_LOG_ERROR(g_logger) << "addEvent assert fd=" << fd << " event=" << (EPOLL_EVENTS)event << " fd_ctx.event=" << (EPOLL_EVENTS)fd_ctx->events; SYLAR_ASSERT(!(fd_ctx->events & event)); } // 将新的事件加入epoll_wait,使用epoll_event的私有指针存储FdContext的位置 int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; epoll_event epevent; epevent.events = EPOLLET | fd_ctx->events | event; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if (rt) { SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ") fd_ctx->events=" << (EPOLL_EVENTS)fd_ctx->events; return -1; } // 待执行IO事件数加1 ++m_pendingEventCount; // 找到这个fd的event事件对应的EventContext,对其中的scheduler, cb, fiber进行赋值 fd_ctx->events = (Event)(fd_ctx->events | event); FdContext::EventContext &event_ctx = fd_ctx->getEventContext(event); SYLAR_ASSERT(!event_ctx.scheduler && !event_ctx.fiber && !event_ctx.cb); // 赋值scheduler和回调函数,如果回调函数为空,则把当前协程当成回调执行体 event_ctx.scheduler = Scheduler::GetThis(); if (cb) { event_ctx.cb.swap(cb); } else { event_ctx.fiber = Fiber::GetThis(); SYLAR_ASSERT2(event_ctx.fiber->getState() == Fiber::RUNNING, "state=" << event_ctx.fiber->getState()); } return 0; } /** * @brief 删除事件 * @param[in] fd socket句柄 * @param[in] event 事件类型 * @attention 不会触发事件 * @return 是否删除成功 */ bool IOManager::delEvent(int fd, Event event) { // 找到fd对应的FdContext RWMutexType::ReadLock lock(m_mutex); if ((int)m_fdContexts.size() <= fd) { return false; } FdContext *fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if (SYLAR_UNLIKELY(!(fd_ctx->events & event))) { return false; } // 清除指定的事件,表示不关心这个事件了,如果清除之后结果为0,则从epoll_wait中删除该文件描述符 Event new_events = (Event)(fd_ctx->events & ~event); int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; epoll_event epevent; epevent.events = EPOLLET | new_events; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if (rt) { SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } // 待执行事件数减1 --m_pendingEventCount; // 重置该fd对应的event事件上下文 fd_ctx->events = new_events; FdContext::EventContext &event_ctx = fd_ctx->getEventContext(event); fd_ctx->resetEventContext(event_ctx); return true; } /** * @brief 取消事件 * @param[in] fd socket句柄 * @param[in] event 事件类型 * @attention 如果该事件被注册过回调,那就触发一次回调事件 * @return 是否删除成功 */ bool IOManager::cancelEvent(int fd, Event event) { // 找到fd对应的FdContext RWMutexType::ReadLock lock(m_mutex); if ((int)m_fdContexts.size() <= fd) { return false; } FdContext *fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if (SYLAR_UNLIKELY(!(fd_ctx->events & event))) { return false; } // 删除事件 Event new_events = (Event)(fd_ctx->events & ~event); int op = new_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; epoll_event epevent; epevent.events = EPOLLET | new_events; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if (rt) { SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } // 删除之前触发一次事件 fd_ctx->triggerEvent(event); // 活跃事件数减1 --m_pendingEventCount; return true; } /** * @brief 取消所有事件 * @details 所有被注册的回调事件在cancel之前都会被执行一次 * @param[in] fd socket句柄 * @return 是否删除成功 */ bool IOManager::cancelAll(int fd) { // 找到fd对应的FdContext RWMutexType::ReadLock lock(m_mutex); if ((int)m_fdContexts.size() <= fd) { return false; } FdContext *fd_ctx = m_fdContexts[fd]; lock.unlock(); FdContext::MutexType::Lock lock2(fd_ctx->mutex); if (!fd_ctx->events) { return false; } // 删除全部事件 int op = EPOLL_CTL_DEL; epoll_event epevent; epevent.events = 0; epevent.data.ptr = fd_ctx; int rt = epoll_ctl(m_epfd, op, fd, &epevent); if (rt) { SYLAR_LOG_ERROR(g_logger) << "epoll_ctl(" << m_epfd << ", " << (EpollCtlOp)op << ", " << fd << ", " << (EPOLL_EVENTS)epevent.events << "):" << rt << " (" << errno << ") (" << strerror(errno) << ")"; return false; } // 触发全部已注册的事件 if (fd_ctx->events & READ) { fd_ctx->triggerEvent(READ); --m_pendingEventCount; } if (fd_ctx->events & WRITE) { fd_ctx->triggerEvent(WRITE); --m_pendingEventCount; } SYLAR_ASSERT(fd_ctx->events == 0); return true; } |
接下来是IOManager的析构函数实现和stopping重载。对于IOManager的析构,首先要等Scheduler调度完所有的任务,然后再关闭epoll句柄和pipe句柄,然后释放所有的FdContext;对于stopping,IOManager在判断是否可退出时,还要加上所有IO事件都完成调度的条件:
代码块 |
---|
IOManager::~IOManager() { stop(); close(m_epfd); close(m_tickleFds[0]); close(m_tickleFds[1]); for (size_t i = 0; i < m_fdContexts.size(); ++i) { if (m_fdContexts[i]) { delete m_fdContexts[i]; } } } bool IOManager::stopping() { // 对于IOManager而言,必须等所有待调度的IO事件都执行完了才可以退出 return m_pendingEventCount == 0 && Scheduler::stopping(); } |
sylar IO协程调度的几点总结
1. 总得来说,sylar的IO协程调度模块可分为两部分,第一部分是对协程调度器的改造,将epoll与协程调度融合,重新实现tickle和idle,并保证原有的功能不变。第二部分是基于epoll实现IO事件的添加、删除、调度、取消等功能。
2. IO协程调度关注的是FdContext信息,也就是描述符-事件-回调函数三元组,IOManager需要保存所有关注的三元组,并且在epoll_wait检测到描述符事件就绪时执行对应的回调函数。
3. epoll是线程安全的,即使调度器有多个调度线程,它们也可以共用同一个epoll实例,而不用担心互斥。由于空闲时所有线程都阻塞的epoll_wait上,所以也不用担心CPU占用问题。
4. addEvent是一次性的,比如说,注册了一个读事件,当fd可读时会触发该事件,但触发完之后,这次注册的事件就失效了,后面fd再次可读时,并不会继续执行该事件回调,如果要持续触发事件的回调,那每次事件处理完都要手动再addEvent。这样在应对fd的WRITE事件时会比较好处理,因为fd可写是常态,如果注册一次就一直有效,那么可写事件就必须在执行完之后就删除掉。
5. cancelEvent和cancelAll都会触发一次事件,但delEvent不会。
6. FdContext的寻址问题,sylar直接使用fd的值作为FdContext数组的下标,这样可以快速找到一个fd对应的FdContext。由于关闭的fd会被重复利用,所以这里也不用担心FdContext数组膨胀太快,或是利用率低的问题。
7. IO协程调度器的退出,不但所有协程要完成调度,所有IO事件也要完成调度。
8. sylar的IO协程调度器应该配合非阻塞IO来使用,如果使用阻塞模式,可能会阻塞进程,参考为什么 IO 多路复用要搭配非阻塞 IO? - 知乎。
目录 |
---|