Twisted服务器开发技巧(3)

来源:岁月联盟 编辑:exp 时间:2012-08-27

第三种方法是使用经典的服务器模型的select(epoll)异步I/O。使用twisted框架中的reactor(epoll/select)+reader,将磁盘I/O封装为reader,交给reactor来管理,磁盘I/O完成后调用回调函数将数据返回发送改请求的客户端。这样既不会因为I/O阻塞请求处理线程也不会如方法二一样因为I/O阻塞读取线程,详见下图:

 


reactor(epoll/select)+ reader的方法需要继承abstract.FileDescriptor并且实现其几个方法,而twisted框架中的网络(TCP/UDP)、标准I/O、进程都有类似的实现。使用时传入文件描述符,如下:

[python]
fileReader = FileReader(fd, loader_callback, other_data) 
reactor.addReader(fileReader) 
FileReader类的实现如下:

[python] 
class FileReader(abstract.FileDescriptor): 
   
  def __init__(self, fd, result_callback, args): 
    ... 
    self.fd = fd 
self.setNonBlocking(self.fd) 
self.dataRecieved=result_callback 
self.args=args 
self.all_data="" 
  def setNonBlocking(self, fd): 
   ... 
  def fileno(self): 
return self.fd 
  def connectionLost(self, reason): 
sys.close(self.fd) 
  def doRead(self)://fdesc.readFromFD(self.fd, self.dataReceived) 
    data = os.read(self.fd, 10240)     //每次读取1M 
    self.all_data += data 
    if not data: 
       self.dataRecieved(self.all_data , self.args) 
      return CONNECTION_LOST 
自己实现的reader并没有使用类似其他标准实现中的fdesc.readFromFD(self.fd, self.dataReceived)来读取数据,因为该函数中提供的回调函数不允许传参,所以自己将fdesc实现在了FileReader内。

下面是此方法的理论依据:

[python] 
def addReader(self, reader): 
    fd = reader.fileno() 
    if not reads.has_key(fd): 
        selectables[fd] = reader 
        reads[fd] =  1 
        self._updateRegistration(fd) 
def _updateRegistration(self, fd): 
    ... 
    mask = 0 
    if reads.has_key(fd): mask = mask | select.POLLIN 
    poller.register(fd, mask) 
def _doReadOrWrite(self, selectable, fd, event, POLLIN, POLLOUT, log, 
    faildict={ 
        error.ConnectionDone: failure.Failure(error.ConnectionDone()), 
        error.ConnectionLost: failure.Failure(error.ConnectionLost()) 
    }): 
    ... 
    if event & POLLIN: 
      why = selectable.doRead() 
      inRead = True 
       ... 
    if why: 
        self._disconnectSelectable(selectable, why, inRead) 
-----摘自pollreactor.py

[python] 
def _disconnectSelectable(self, selectable, why, isRead, faildict={ 
    error.ConnectionDone: failure.Failure(error.ConnectionDone()), 
    error.ConnectionLost: failure.Failure(error.ConnectionLost()) 
}): 
             ... 
             selectable.connectionLost(f) 

[python]
def _disconnectSelectable(self, selectable, why, isRead, faildict={ 
    error.ConnectionDone: failure.Failure(error.ConnectionDone()), 
    error.ConnectionLost: failure.Failure(error.ConnectionLost()) 
}): 
            ... 
            selectable.connectionLost(f) 
            ... 

图片内容