实现了一个N-M的协程调度器,N个线程运行M个协程,协程可以在线程之间进行切换,也可以绑定到指定线程运行。本章对应源码:https://github.com/zhongluqiang/sylar-from-scratch/releases/tag/v1.5.0。
学习协程调度之前必须完全掌握sylar的协程模块,参考协程模块。
实现协程调度之后,可以解决前一章协程模块中子协程不能运行另一个子协程的缺陷,子协程可以通过向调度器添加调度任务的方式来运行另一个子协程。
协程调度最难理解的地方是user caller线程时调度协程和主线程切换的情况,注意对照源码进行理解。
协程调度概述
当你有很多协程时,如何把这些协程都消耗掉,这就是协程调度。
在前面的协程模块中,对于每个协程,都需要用户手动调用协程的resume方法将协程运行起来,然后等协程运行结束并返回,再运行下一个协程。这种运行协程的方式其实是用户自己在挑选协程执行,相当于用户在充当调度器,显然不够灵活.
引入协程调度后,则可以先创建一个协程调度器,然后把这些要调度的协程传递给调度器,由调度器负责把这些协程一个一个消耗掉。
从某种程度来看,协程调度其实非常简单,简单到用下面的代码就可以实现一个调度器,这个调度器可以添加调度任务,运行调度任务,并且还是完全公平调度的,先添加的任务先执行,后添加的任务后执行。
/** * @file simple_fiber_scheduler.cc * @brief 一个简单的协程调度器实现 * @version 0.1 * @date 2021-07-10 */ #include "sylar/sylar.h" /** * @brief 简单协程调度类,支持添加调度任务以及运行调度任务 */ class Scheduler { public: /** * @brief 添加协程调度任务 */ void schedule(sylar::Fiber::ptr task) { m_tasks.push_back(task); } /** * @brief 执行调度任务 */ void run() { sylar::Fiber::ptr task; auto it = m_tasks.begin(); while(it != m_tasks.end()) { task = *it; m_tasks.erase(it++); task->resume(); } } private: /// 任务队列 std::list<sylar::Fiber::ptr> m_tasks; }; void test_fiber(int i) { std::cout << "hello world " << i << std::endl; } int main() { /// 初始化当前线程的主协程 sylar::Fiber::GetThis(); /// 创建调度器 Scheduler sc; /// 添加调度任务 for(auto i = 0; i < 10; i++) { sylar::Fiber::ptr fiber(new sylar::Fiber( std::bind(test_fiber, i) )); sc.schedule(fiber); } /// 执行调度任务 sc.run(); return 0; }
不要觉得上面这个调度器扯淡,除了不支持多线程,sylar的协程调度器和它的设计思路完全相同,甚至,上面的实现可以看成是sylar的协程调度器的一个特例,当sylar的协程调度器只使用main函数所在线程进行调度时,它的工作原理和上面的完全一样,只不过代码看起来更花里胡哨一些。
接下来将从上面这个调度器开始,来分析一些和协程调度器相关的概念。
首先是关于调度任务的定义,对于协程调度器来说,协程当然可以作为调度任务,但实际上,函数也应可以,因为函数也是可执行的对象,调度器应当支持直接调度一个函数。这在代码实现上也很简单,只需要将函数包装成协程即可,协程调度器的实现重点还是以协程为基础。
接下来是多线程,通过前面协程模块的知识我们可以知道,一个线程同一时刻只能运行一个协程,所以,作为协程调度器,势必要用到多线程来提高调度的效率,因为有多个线程就意味着有多个协程可以同时执行,这显然是要好过单线程的。
既然多线程可以提高协程调度的效率,那么,能不能把调度器所在的线程(称为caller线程)也加入进来作为调度线程呢?比如典型地,在main函数中定义的调度器,能不能把main函数所在的线程也用来执行调度任务呢?答案是肯定的,在实现相同调度能力的情况下(指能够同时调度的协程数量),线程数越小,线程切换的开销也就越小,效率就更高一些,所以,调度器所在的线程,也应该支持用来执行调度任务。甚至,调度器完全可以不创建新的线程,而只使用caller线程来进行协程调度,比如只使用main函数所在的线程来进行协程调度。
接下来是调度器如何运行,这里可以简单地认为,调度器创建后,内部首先会创建一个调度线程池,调度开始后,所有调度线程按顺序从任务队列里取任务执行,调度线程数越多,能够同时调度的任务也就越多,当所有任务都调度完后,调度线程就停下来等新的任务进来。
接下来是添加调度任务,添加调度任务的本质就是往调度器的任务队列里塞任务,但是,只添加调度任务是不够的,还应该有一种方式用于通知调度线程有新的任务加进来了,因为调度线程并不一定知道有新任务进来了。
接下来是调度器的停止。调度器应该支持停止调度的功能,以便回收调度线程的资源。
通过上面的描述,一个协程调度器的大概设计也就出炉了:
调度器内部维护一个任务队列和一个调度线程池。开始调度后,线程池从任务队列里按顺序取任务执行。调度线程可以包含caller线程。当全部任务都执行完了,线程池停止调度,等新的任务进来。添加新任务后,通知线程池有新的任务进来了,线程池重新开始运行调度。停止调度时,各调度线程退出,调度器停止工作。
sylar协程调度模块设计
sylar的协程调度模块支持多线程,支持使用caller线程进行调度,支持添加函数或协程作为调度对象,并且支持将函数或协程绑定到一个具体的线程上执行。
首先是协程调度器的初始化。sylar的协程调度器在初始化时支持传入线程数和一个布尔型的use_caller参数,表示是否使用caller线程。在使用caller线程的情况下,线程数自动减一,并且调度器内部会初始化caller线程的调度协程并保存起来(比如,在main函数中创建的调度器,如果use_caller为true,那调度器会记录main函数所在线程的调度协程)。
调度器创建好后,即可调用调度器的schedule方法向调度器添加调度任务,但此时调度器并不会立刻执行这些任务,而是将它们保存到内部的一个任务队列中。
接下来是调用start方法启动调度。start方法调用后会创建调度线程池,线程数量由初始化时的线程数和use_caller确定。调度线程一旦创建,就会立刻从任务队列里取任务执行。比较特殊的一点是,如果初始化时指定线程数为1且use_caller为true,那么start方法什么也不做,因为不需要创建新线程用于调度。并且,由于没有创建新的调度线程,那只能由caller线程的调度协程来负责调度协程,而caller线程的调度协程的执行时机与start方法并不在同一个地方。
接下来是调度协程,对应run方法。调度协程负责从调度器的任务队列中取任务执行。取出的任务即子协程,这里调度协程和子协程的切换模型即为前一章介绍的非对称模型,每个子协程执行完后都必须返回调度协程,由调度协程重新从任务队列中取新的协程并执行。如果任务队列空了,那么调度协程会切换到一个idle协程,这个idle协程什么也不做,等有新任务进来时,idle协程才会退出并回到调度协程,重新开始下一轮调度。
在非caller线程里,调度协程的身份只有一个,那就是各个调度线程的主线程,但在caller线程里,调度协程并不是caller线程的主协程,而是相当于caller线程的子协程,这在协程切换时会有大麻烦(这点是sylar协程调度模块最难理解的地方),如何处理这个问题将在下面的章节专门进行讨论。
接下来是添加调度任务,对应schedule方法,这个方法支持传入协程或函数,并且支持一个线程号参数,表示是否将这个协程或函数绑定到一个具体的线程上执行。如果任务队列为空,那么在添加任务之后,要调用一次tickle方法以通知各调度线程的调度协程有新任务来了。
在执行调度任务时,还可以通过调度器的GetThis()方法获取到当前调度器,再通过schedule方法继续添加新的任务,这就变相实现了在子协程中创建并运行新的子协程的功能。
接下来是调度器的停止。调度器的停止行为要分两种情况讨论,首先是use_caller为false的情况,这种情况下,没有使用caller线程进行调度,那么只需要简单地等各个调度线程的调度协程退出就行了。如果use_caller为true,表示caller线程也要参考调度,这时,调度器初始化时记录的caller线程的调度协程就要起作用了,在调度器停止前,应该让这个caller线程的调度协程也运行一次,让caller线程完成调度工作后再退出。如果调度器只使用了caller线程进行调度,那么所有的调度任务要在调度器停止时才会被调度。
调度协程切换问题
这里分两种情况讨论一下调度协程的切换情况,其他情况可以看成以下两种情况的组合,原理是一样的。
1. 线程数为1,且use_caller为true,对应只使用main函数的线程进行协程调度。
2. 线程数为1,且use_caller为false,对应只创建一个线程进行协程调度,main函数线程不参与调度。
情况2比较好理解,因为有单独的线程用于协程调度,那只需要让新线程运行调度协程就可以了,main函数与协程调度完全不相关,main函数只需要向调度器添加任务,然后在适当的时机停止调度器,当调度器停止时,main函数要等待调度线程结束后再退出,参考下面的图示:
sylar协程模块运行图示
然后描述一下协程调度的本质。
有一堆的协程或函数,把这些协程或函数跑完即可。如果协程半路yield了,那是协程有自己的想法,调度器不用管,也将其当成调度完成了。
当然上面这个调度器有些粗糙,一是不够优雅,二是没法利用多线程来调度协程。
调度器本质上就是内部维护一个协程任务队列,然后N个线程组成的线程池各自从这个任务队列中取协程任务并运行。N最少为1,也就是至少要有一个调度线程。比较特殊的一点是这里调度器的调度线程不一定要新创建,因为调度器所在的线程也可以加入调度,用于运行协程任务。比如常见的,main函数所在的线程就可以进行协程调度。
讨论几种情况:
- 调度器的退出问题,调度器内部有一个协程任务队列,调度器调度的实质就是内部的线程池从这个任务队列拿任务并执行,那么,停止调度该怎么实现?这里可以简化处理,强制规定只有所有的任务都完成调度时,调度器才可以退出,如果有一个任务没有执行完,那调度器就不能退出。
- 协程执行过程中主动调用yield让出了执行权,调度器要怎么处理?半路yield的协程显然并没有执行完,一种处理方法是调度器来帮协程擦屁股,在检测到协程yield之后主动将其再次加入任务队列的队尾,这样这个执行到半路的协程最终还是会被调度到,但这种策略是画蛇添足的,从生活经验的角度来看,一个成熟的协程肯定要学会自我管理,既然你自己yield了,那么你就应该自己管理好自己,而不是让别人来帮你,这样才算是一个成熟的协程。对于主动yield的协程,我们的策略是,调度器直接认为这个任务已经调度完了,如果协程还想再次运行,那么应该由协程自己主动将自己加入调度,而不是由调度器来主动调度。这里规定了一点,协程在执行yield前,必须以某种形式先将自己重新添加到调度任务中。如果协程不顾后果地执行yield,最后的后果就是协程将永远无法再被执行,也就是所说的逃逸状态。(sylar的处理方法比较折衷一些,sylar定义了两种yield操作,一种是yield to ready,这种yield调度器会再次将协程加入任务队列并等待调度,另一种是yield to hold,这种yield调度器不会再将协程加入任务队列,协程在yield之前必须自己先将自己加入到协程的调度队列中,否则协程就处于逃逸状态。再说一点,sylar定义的yield to ready,在框架的其他模块上一次都没用到,看来sylar也同意,一个成熟的协程要学会自我管理。)
- 只使用调度器所在的线程进行调度,典型的就是main函数中定义调度器并且只使用main函数线程执行调度任务。这种场景下,可以认为是main函数先攒下一波协程,然后用协程的手段进入一个while(1)循环,再依次把这些协程消耗完。每个协程在运行时也可以继续创建新的协程并加入调度。如果所有协程都调度完了,并且没有创建新的调度任务,那么下一步就是讨论idle该如何处理。
- idle如何处理,也就是当调度器没有协程可调度时,调度线程该怎么办。直觉上来看这里应该有一些同步手段,比如,没有调度任务时,调度线程阻塞住,比如阻塞在一个idle协程上,等待新任务加入后退出idle协程,恢复调度。然而这种方案是无法实现的,因为单线程下,同一时间只能有一个协程在执行,如果调度器阻塞在idle协程上,那么除非idle协程自行让出执行权,否则其他的协程都得不到执行,这里就造成了一个先有鸡还是先有蛋的问题:只有创建新任务idle协程才会退出,只有idle协程退出才能创建新任务。所以,对于调度器来说,处理idle的方法简单而粗暴,如果任务队列空了,那就不停地看有没有新任务,俗称忙等待,CPU爆表。这点可以从sylar的源码上发现,一是Scheduler的tickle函数什么也不做,因为tickle根本没用,二是idle协程在协程调度器未停止的情况下只会yield to hold,而调度协程又会将idle协程重新swapIn,相当于idle啥也不做直接返回。这个问题在sylar框架内无解,只有一种方法可以规避掉,那就是设置autostop标志,这个标志会使得调度器在调度完所有任务后自动退出。在IOManager中,上面的问题仍然存在,但是tickle和idle可以实现得更加巧妙一些,以应对IO事件。
- 主线程参与调度时的调度执行时机。前面说过,当主线程也参与调度时,可以认为是主线程先攒下一波协程,然后执行开始调度,这时主线程就进入一个while(1)循环,开始调度这些协程。也就是说,主线程一旦开始了调度,那就无法回头了,位于开始调度点之后的代码都执行不到。对于这个问题,sylar把调度器的开始点放在了stop方法中,也就是,调度开始即结束,干完活就下班,IOManager也是类似,除了可以调用stop方法外,IOManager类的析构函数也有一个stop方法,可以保证所有的任务都会被调度到。
- 是否额外创建线程与协程的执行时机。如果不额外创建线程,也就是线程数为1并且use caller,那所有的调度任务都是在stop()时进行调度。如果额外创建了线程,那么,在添加完调度任务之后任务马上就可以在另一个线程中执行。归纳起来,如果没有额外线程,所有的协程都在主协程之后排队调度,如果有额外线程,那协程在创建时就可以得到调度。
- 协程中的异常要怎么处理,子协程抛出了异常该怎么办?这点其实非常好办,类比一下线程即可,你会在线程外面处理线程抛出的异常吗?答案是不会,所以协程抛出的异常我们也不处理,直接让程序按默认的处理方式来处理即可。一个成熟的协程应该自己处理掉自己的异常,而不是让调度器来帮忙。顺便说一下,sylar的协程调度器处理了协程抛出的异常,并且给异常结束的协程设置了一个EXCEPT状态,这看似贴心,但从长远的角度来看,其实是非常不利于协程的健康成长的。
- 关于协程调度器的优雅停止。sylar停止调度器的策略如下:
- 设置m_autoStop标志,该标志表示是否自动停止
- 设置m_stopping标志,该标志表示是否正在停止
- 如果use caller为true,那只能在调度器所在的线程执行stop()方法
- 通知其他线程退出调度
- 通知当前线程退出调度
- 如果use_caller为true,那当前线程的调度协程要执行一次(当前线程的调度协程只在stop时执行这一次)
- 等所有线程结束