Twisted服务器开发技巧(3)
第三种方法是使用经典的服务器模型的select(epoll)异步I/O。使用twisted框架中的reactor(epoll/select)+reader,将磁盘I/O封装为reader,交给reactor来管理,磁盘I/O完成后调用回调函数将数据返回发送改请求的客户端。这样既不会因为I/O阻塞请求处理线程也不会如方法二一样因为I/O阻塞读取线程,详见下图:
reactor(epoll/select)+ reader的方法需要继承abstract.FileDescriptor并且实现其几个方法,而twisted框架中的网络(TCP/UDP)、标准I/O、进程都有类似的实现。使用时传入文件描述符,如下:
[python]
fileReader = FileReader(fd, loader_callback, other_data)
reactor.addReader(fileReader)
FileReader类的实现如下:
[python]
class FileReader(abstract.FileDescriptor):
def __init__(self, fd, result_callback, args):
...
self.fd = fd
self.setNonBlocking(self.fd)
self.dataRecieved=result_callback
self.args=args
self.all_data=""
def setNonBlocking(self, fd):
...
def fileno(self):
return self.fd
def connectionLost(self, reason):
sys.close(self.fd)
def doRead(self)://fdesc.readFromFD(self.fd, self.dataReceived)
data = os.read(self.fd, 10240) //每次读取1M
self.all_data += data
if not data:
self.dataRecieved(self.all_data , self.args)
return CONNECTION_LOST
自己实现的reader并没有使用类似其他标准实现中的fdesc.readFromFD(self.fd, self.dataReceived)来读取数据,因为该函数中提供的回调函数不允许传参,所以自己将fdesc实现在了FileReader内。
下面是此方法的理论依据:
[python]
def addReader(self, reader):
fd = reader.fileno()
if not reads.has_key(fd):
selectables[fd] = reader
reads[fd] = 1
self._updateRegistration(fd)
def _updateRegistration(self, fd):
...
mask = 0
if reads.has_key(fd): mask = mask | select.POLLIN
poller.register(fd, mask)
def _doReadOrWrite(self, selectable, fd, event, POLLIN, POLLOUT, log,
faildict={
error.ConnectionDone: failure.Failure(error.ConnectionDone()),
error.ConnectionLost: failure.Failure(error.ConnectionLost())
}):
...
if event & POLLIN:
why = selectable.doRead()
inRead = True
...
if why:
self._disconnectSelectable(selectable, why, inRead)
-----摘自pollreactor.py
[python]
def _disconnectSelectable(self, selectable, why, isRead, faildict={
error.ConnectionDone: failure.Failure(error.ConnectionDone()),
error.ConnectionLost: failure.Failure(error.ConnectionLost())
}):
...
selectable.connectionLost(f)
[python]
def _disconnectSelectable(self, selectable, why, isRead, faildict={
error.ConnectionDone: failure.Failure(error.ConnectionDone()),
error.ConnectionLost: failure.Failure(error.ConnectionLost())
}):
...
selectable.connectionLost(f)
...