Gevent深度分析

原创 ryan007  2017-05-09 19:28  阅读 1,087 次

gevent是一个基于greenletlibev两个核心组件来完成的轻量级、高性能异步库,它为各种网络异步请求提供完整的API。

libev是一个高效的事件处理库,为各种事件提供完整的API服务;greenlet是一个基于cpython的协程库,能够高效的处理“用户线程”的切换。

 

本文重点:

1,介绍greenlet和libev如何一起工作;

2,深入理解sleep、switch、join方法的使用;

0,gevent认知:

由一个例子开始:

当我们在受限于网络或IO的函数中使用gevent,这些函数会被协作式的调度, gevent的真正能力会得到发挥。Gevent处理了所有的细节, 来保证你的网络库会在可能的时候,隐式交出greenlet上下文的执行权。 这样的一种用法是如何强大,怎么强调都不为过。或者我们举些例子来详述。

一,greenlet和libev协同工作:

(1)gevent的整个事件循环是在hub.run中启动的,我们看一下run函数,这里很明显的调用了loop的run函数,loop是对libev的封装:

def run(self):  
    assert self is getcurrent(), 'Do not call Hub.run() directly'  
    while True:  
        loop = self.loop  
        loop.error_handler = self  
        try:  
            loop.run()  
        finally:  
            loop.error_handler = None  # break the refcount cycle  
        self.parent.throw(LoopExit('This operation would block forever'))  

(2)hub+waiter:每个greenlet的parent都是hub,waiter

class Hub(greenlet):  
    def wait(self, watcher):  
        waiter = Waiter()  
        unique = object()  
        watcher.start(waiter.switch, unique)  
        try:  
            result = waiter.get()  
            assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)  
            #这里为什么要assert?正常肯定是loop调用waiter.switch(unique),那么waiter.get()获取的肯定是unique,  
            #如果不是unique,肯定是有其它地方调用waiter.switch,这就出现了异常  
        finally:  
            watcher.stop()  

greenlet如何封装到gevent:

class Greenlet(greenlet):  
    """A light-weight cooperatively-scheduled execution unit."""  
  
    def __init__(self, run=None, *args, **kwargs):  
        hub = get_hub()  
        greenlet.__init__(self, parent=hub)  
        if run is not None:  
            self._run = run  

如何启动一个greenlet?

def start(self):  
    """Schedule the greenlet to run in this loop iteration"""  
    if self._start_event is None:  
        self._start_event = self.parent.loop.run_callback(self.switch)  
       #此处很有意思,把greenlet的switch注入到事件循环里面

当loop回调self.switch时将运行run方法(这是底层greenlet提供的),继承时我们可以提供_run方法。

def run(self):  
    try:  
        if self._start_event is None:  
            self._start_event = _dummy_event  
        else:  
            self._start_event.stop() #取消之前添加的回调函数,loop将会从回调链中剔除该函数。  
            #libev提供了一系列的对象封装,如io,timer,都有start,stop方法  
            #而回调是通过loop.run_callback开启的,和其它有所不同  
        try:  
            result = self._run(*self.args, **self.kwargs) #运行自定义_run方法  
        except:  
            self._report_error(sys.exc_info())  
            return  
        self._report_result(result) 
    finally:  
        pass  

来看一下如何获取结果的?

def _report_result(self, result):  
    self._exception = None  
    self.value = result #设置返回结果,可通过get()获取,注意要获取value时  
    #不要直接通过.value,一定要用get方法,因为get()会获取到真正的运行后结果,  
    #而.value那是该Greenlet可能还没结束  
    if self._links and not self._notifier: #这个是干什么的?  
        self._notifier = self.parent.loop.run_callback(self._notify_links)  

使用get()获取异步结果

def get(self, block=True, timeout=None):  
    """Return the result the greenlet has returned or re-raise the exception it has raised. 
 
    If block is ``False``, raise :class:`gevent.Timeout` if the greenlet is still alive. 
    If block is ``True``, unschedule the current greenlet until the result is available 
    or the timeout expires. In the latter case, :class:`gevent.Timeout` is raised. 
    """  
    if self.ready(): #该Greenlet已经运行结束,直接返回结果  
        if self.successful():  
            return self.value  
        else:  
            raise self._exception  
    if block: #到这里说明该Greenlet并没有结束  
        switch = getcurrent().switch  
        self.rawlink(switch) #将当前Greenlet.switch加到自己的回调链中  
        """ 
        self._links.append(callback) 
        """  
        try:  
            t = Timeout.start_new(timeout)  
            try:  
                result = self.parent.switch() #切换到hub,可以理解为当前get()阻塞了,当再次回调刚刚注册的switch将回到这里  
                #可问题是好像我们没有将switch注册到hub中,那是谁去回调的呢?  
                #幕后黑手其实就是上面的_report_result,当Greenlet结束最后会调用_report_result,  
                #而_report_result把将_notify_links注册到loop的回调中,最后由_notify_links回调我们刚注册的switch  
                # def _notify_links(self):  
                #     while self._links:  
                #     link = self._links.popleft()  
                #     try:  
                #         link(self) #就是这里了,我们看到还把self传给了switch,所以result结果就是self(greenlet通过switch传递结果)  
                #     except:  
                #         self.parent.handle_error((link, self), *sys.exc_info())  
                assert result is self, 'Invalid switch into Greenlet.get(): %r' % (result, )   
                #知道为什么result是self的原因了吧  
            finally:  
                t.cancel()  
        except:  
            self.unlink(switch)  
            raise  
        #运行到这里,其实Greenlet已经结束了,换句话说self.ready()肯定为True  
        if self.ready():  
            if self.successful():  
                return self.value  
            else:  
                raise self._exception  
    else: #还没结束,你又不等待,没有值返回啊,只能抛出异常了  
        raise Timeout  

至此已经完整介绍了gevent如何把libev和greenlet融合在一起

二,理解sleep、switch、join方法的使用:

(1)switch

switch函数分为三类:

  1. waiter;
    这里的switch只能由hub调用

    def switch(self, value=None):
        """Switch to the greenlet if one's available. Otherwise store the value."""
        greenlet = self.greenlet
        if greenlet is None:
            self.value = value
            self._exception = None
        else:
            assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
            switch = greenlet.switch
            try:
                switch(value)
            except:
                self.hub.handle_error(switch, *sys.exc_info())
  2. hub;
    控制不同子greenlet进行切换

    def switch(self):
        switch_out = getattr(getcurrent(), 'switch_out', None)
        if switch_out is not None:
            switch_out()
        return greenlet.switch(self)

    return greenlet.switch(self)这句说明了这个switch函数将返回greenlet类的switch方法(也就是第一类switch,分两种情况: 执行过要切换的协程的run函数,和未执行run),从而实现切换到子协程并执行其任务的作用(即在hub实现协程的切换)

  3. greenlet;
    这里面的greenlet是原生greenlet库中的greenlet类,在这个基类里面,它的switch函数的作用是:==切换到这个greenlet协程中,并执行该协程的任务==

    1)如果这个协程的任务并没有被激活过,则执行self.run函数来执行这个协程的任务
    2)如果已经被激活,而且正在运行这个协程的任务(即在执行run函数的时候切到了另一个协程中),调用switch函数将回到这个协程的状态(即上次运行的地点)

(2)sleep

import gevent

def test_sleep(id):
    print('Test %s is running...' % id)
    gevent.sleep(0)
    print('Test %s is done!' % id)

gevent.joinall([gevent.spawn(test_sleep, i) for i in range(2)])

执行结果:

Test 0 is running...

Test 1 is running...

Test 0 is done!

Test 1 is done!

分析一下执行过程:

当gevent运行的时候,gevent需要先创建一个主协程hub,并运行hub的run函数(具体源码在hub.py/run),比较简单,核心代码是loop.run(),这个run函数是Greenlet类中的run函数,用来切入loop中的子协程,源码在greenlet.py/run中。核心就是result = self._run(*self.args, **self.kwargs), _run函数用来执行这个子协程的任务

再分析一下源码:

def sleep(seconds=0, ref=True):
    hub = get_hub() #获得主协程hub对象
    loop = hub.loop     #获得主循环
    if seconds <= 0:
        waiter = Waiter()
        loop.run_callback(waiter.switch)    #设置回调函数(即下次本协程执行的地点)
        waiter.get()
    else:
        hub.wait(loop.timer(seconds, ref=ref))

 

(3)join

 

def test1(id):
    print(id)
    gevent.sleep(0)
    print(id, 'is done!')

t = gevent.spawn(test1, 't')
gevent.sleep(0)

为什么没有继续输出t is done!?:

t

为了解决这个问题,我们还是先把源码分析一下:

join函数源码在greenlet.py中的Greenlet类的join():

def join(self, timeout=None):
    if self.ready():        #检测是否执行完成
        return
    else:
        switch = getcurrent().switch    #获得当前greenlet的switch回调函数
        self.rawlink(switch)    
        try:
            t = Timeout.start_new(timeout)
            try:
                result = self.parent.switch()
                assert result is self, 'Invalid switch into Greenlet.join(): %r' % (result, )
            finally:
                t.cancel()
        except Timeout as ex:
            self.unlink(switch)
            if ex is not t:
                raise
        except:
            self.unlink(switch)
            raise

从join的源码第六行,跟踪到rawlink函数:

def rawlink(self, callback):
    if not callable(callback):
        raise TypeError('Expected callable: %r' % (callback, ))
    self._links.append(callback)
    if self.ready() and self._links and not self._notifier:
        self._notifier = self.parent.loop.run_callback(self._notify_links)

可以看出,这个rawlink函数的目的只有一个:注册当前greenlet的回调函数(第四行), 当主协程hub还没有run的时候,这个时候的greenlet可以理解为一个上下文(这块涉及到greenlet的底层,还不是很清楚)。
回到join函数。在注册了当前greenlet的回调函数后,主要干的事是第10行:切换到主协程hub
主协程的switch函数的功能我在前面的文章中写过了,不再赘述。它会执行greenlet.switch(self),由于当前协程为hub,并且没有运行过run,所以会执行hub.run函数,源码在hub.py下的Hub类里面(这个函数也在前面的文章中讲过,所以不再详细说明)。在这个函数里面就会执行gevent的一般流程(前面的文章讲过)

 

现在可以明白答案了吧

  1. 创建子协程t
  2. 执行到sleep函数,由于此时主协程hub还没有运行hub.run(),sleep函数中,语句loop.run_callback(waiter.switch)保存的是当前协程(可以理解为上下文)的回调函数
  3. 调用waiter.get()函数
  4. waiter.get()函数调用hub.switch(),切换到主协程hub
  5. 由于主协程没有run,所以执行hub.run()函数
  6. 执行loop.run(),切换到子协程t中
  7. 执行_run()函数,即子协程的任务:我们定义的test1函数
  8. 当执行完test1中的sleep(0)的时候,会回到主协程hub,hub会执行之前保存的回调函数,即回到了上下文,不会再回到主协程hub,所以不会输出t is done!

 

总结:第一次完整的接触一个优秀的io库,研究过程中接受了很多之前从未接触过的模型和方法,思路也变得开阔,总之受益匪浅。

 

Refererence:

http://xlambda.com/gevent-tutorial/

http://blog.csdn.net/yueguanghaidao/article/details/39122867

本文地址:http://it.zhongduwang.com/articles/gevent
版权声明:本文为原创文章,版权归 ryan007 所有,欢迎分享本文,转载请保留出处!

发表评论


表情