Twisted服务器开发技巧(2)

来源:岁月联盟 编辑:exp 时间:2012-08-27

第二种优化的方法,可以只用下图来解释。即,使用轻量级的线程池(PreProcess)对所有请求进行预处理,所有不需要I/O执行时间很短的请求直接执行,如果是需要磁盘I/O的则放入下一级阻塞队列,有单独的线程池来处理这些请求。详见下图:

 


第一级请求使用自己已有的线程池,不再多说。I/O请求+二级线程池可以使用twisted提供的ThreadPool机制来实现。而我所说的优化正是使用此方法,代码很简单,如下:

    deferred = threads.deferToThread(data_loader.get, sn)

deferred.addCallback(self.loader_callback, (req, other_data))

解释一下:

threads.deferToThread将会将data_loader.get放入reactor线程池的队列,并返回一个defer对象。data_loader.get由reactor的线程池进行执行,执行完成后放入reactor的队列,然后由reactor主线程来调用deferred.addCallback中注册的回调函数。所以回调函数是不会跨线程调用的,如果在回调函数中调用一些不可跨线程的应用(如,memcached客户端)也可放心使用,这也正是选择reactor的线程池作为二级线程池的原因之一。

选择reactor的线程池作为二级线程池的原因二:回调函数。因为Read Thread将自己负责恢复请求,所以回调函数必不可少。

接下来深入twisted源码探究此方法的原理,以下代码均是节选自twisted2.0.0源码,其他版本大致相同:

[python] 
def deferToThread(f, *args, **kwargs): 
     d = defer.Deferred() 
     from twisted.internet import reactor 
     reactor.callInThread(_putResultInDeferred, d, f, args, kwargs) 
     return d 
def _putResultInDeferred(deferred, f, args, kwargs): 
     from twisted.internet import reactor 
     try: 
         result = f(*args, **kwargs) 
     except: 
         f = failure.Failure() 
         reactor.callFromThread(deferred.errback, f) 
     else: 
         reactor.callFromThread(deferred.callback, result)      
-----摘自threads.py

[python] view plaincopy
def callInThread(self, _callable, *args, **kwargs): 
    if not self.threadpool: 
        self._initThreadPool() 
    self.threadpool.callInThread(_callable, *args, **kwargs)  //由线程池执行具体的读取操作 
def callFromThread(self, f, *args, **kw): 
    ... 
    self.threadCallQueue.append((f, args, kw))  //放入主线程队列,由主线程执行回调函数 
self.wakeUp() 
    ... 
-----摘自base.py

注:callInThread/allFromThread,前者是放入线程池执行,后者是reactor的队列里,由reactor的主线程来执行。

至于threadpool的代码在twisted/python/threadpool是一个线程池

图片内容