##Twisted
reactor+deferred
没使用reactor的异步客户端
# This is the asynchronous Get Poetry Now! client.
import datetime, errno, optparse, select, socket
def parse_args():
usage = """usage: %prog [options] [hostname]:port ...
This is the Get Poetry Now! client, asynchronous edition.
Run it like this:
python get-poetry.py port1 port2 port3 ...
If you are in the base directory of the twisted-intro package,
you could run it like this:
python async-client/get-poetry.py 10001 10002 10003
to grab poetry from servers on ports 10001, 10002, and 10003.
Of course, there need to be servers listening on those ports
for that to work.
"""
parser = optparse.OptionParser(usage)
_, addresses = parser.parse_args()
if not addresses:
print parser.format_help()
parser.exit()
def parse_address(addr):
if ':' not in addr:
host = '127.0.0.1'
port = addr
else:
host, port = addr.split(':', 1)
if not port.isdigit():
parser.error('Ports must be integers.')
return host, int(port)
return map(parse_address, addresses)
def get_poetry(sockets):
"""Download poety from all the given sockets."""
poems = dict.fromkeys(sockets, '') # socket -> accumulated poem
# socket -> task numbers
sock2task = dict([(s, i + 1) for i, s in enumerate(sockets)])
sockets = list(sockets) # make a copy
# we go around this loop until we've gotten all the poetry
# from all the sockets. This is the 'reactor loop'.
while sockets:
# this select call blocks until one or more of the
# sockets is ready for read I/O
rlist, _, _ = select.select(sockets, [], [])
# rlist is the list of sockets with data ready to read
for sock in rlist:
data = ''
while True:
try:
new_data = sock.recv(1024)
except socket.error, e:
if e.args[0] == errno.EWOULDBLOCK:
# this error code means we would have
# blocked if the socket was blocking.
# instead we skip to the next socket
break
raise
else:
if not new_data:
break
else:
data += new_data
# Each execution of this inner loop corresponds to
# working on one asynchronous task in Figure 3 here:
# http://krondo.com/?p=1209#figure3
task_num = sock2task[sock]
if not data:
sockets.remove(sock)
sock.close()
print 'Task %d finished' % task_num
else:
addr_fmt = format_address(sock.getpeername())
msg = 'Task %d: got %d bytes of poetry from %s'
print msg % (task_num, len(data), addr_fmt)
poems[sock] += data
return poems
def connect(address):
"""Connect to the given server and return a non-blocking socket."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(address)
sock.setblocking(0)
return sock
def format_address(address):
host, port = address
return '%s:%s' % (host or '127.0.0.1', port)
def main():
addresses = parse_args()
start = datetime.datetime.now()
sockets = map(connect, addresses)
poems = get_poetry(sockets)
elapsed = datetime.datetime.now() - start
for i, sock in enumerate(sockets):
print 'Task %d: %d bytes of poetry' % (i + 1, len(poems[sock]))
print 'Got %d poems in %s' % (len(addresses), elapsed)
if __name__ == '__main__':
main()
twisted异步客户端
# This is the Twisted Get Poetry Now! client, version 1.0.
# NOTE: This should not be used as the basis for production code.
# It uses low-level Twisted APIs as a learning exercise.
import datetime, errno, optparse, socket
from twisted.internet import main
def parse_args():
usage = """usage: %prog [options] [hostname]:port ...
This is the Get Poetry Now! client, Twisted version 1.0.
Run it like this:
python get-poetry.py port1 port2 port3 ...
If you are in the base directory of the twisted-intro package,
you could run it like this:
python twisted-client-1/get-poetry.py 10001 10002 10003
to grab poetry from servers on ports 10001, 10002, and 10003.
Of course, there need to be servers listening on those ports
for that to work.
"""
parser = optparse.OptionParser(usage)
_, addresses = parser.parse_args()
if not addresses:
print parser.format_help()
parser.exit()
def parse_address(addr):
if ':' not in addr:
host = '127.0.0.1'
port = addr
else:
host, port = addr.split(':', 1)
if not port.isdigit():
parser.error('Ports must be integers.')
return host, int(port)
return map(parse_address, addresses)
class PoetrySocket(object):
poem = ''
def __init__(self, task_num, address):
self.task_num = task_num
self.address = address
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(address)
self.sock.setblocking(0)
# tell the Twisted reactor to monitor this socket for reading
from twisted.internet import reactor
reactor.addReader(self)
def fileno(self):
try:
return self.sock.fileno()
except socket.error:
return -1
def connectionLost(self, reason):
self.sock.close()
# stop monitoring this socket
from twisted.internet import reactor
reactor.removeReader(self)
# see if there are any poetry sockets left
for reader in reactor.getReaders():
if isinstance(reader, PoetrySocket):
return
reactor.stop() # no more poetry
def doRead(self):
bytes = ''
while True:
try:
bytesread = self.sock.recv(1024)
if not bytesread:
break
else:
bytes += bytesread
except socket.error, e:
if e.args[0] == errno.EWOULDBLOCK:
break
return main.CONNECTION_LOST
if not bytes:
print 'Task %d finished' % self.task_num
return main.CONNECTION_DONE
else:
msg = 'Task %d: got %d bytes of poetry from %s'
print msg % (self.task_num, len(bytes), self.format_addr())
self.poem += bytes
def logPrefix(self):
return 'poetry'
def format_addr(self):
host, port = self.address
return '%s:%s' % (host or '127.0.0.1', port)
def poetry_main():
addresses = parse_args()
start = datetime.datetime.now()
sockets = [PoetrySocket(i + 1, addr) for i, addr in enumerate(addresses)]
from twisted.internet import reactor
reactor.run()
elapsed = datetime.datetime.now() - start
for i, sock in enumerate(sockets):
print 'Task %d: %d bytes of poetry' % (i + 1, len(sock.poem))
print 'Got %d poems in %s' % (len(addresses), elapsed)
if __name__ == '__main__':
poetry_main()
IReactorFDSet是一个Twisted的reactor实现的接口
epollreactor
def doPoll(self, timeout):
"""
Poll the poller for new events.
"""
if timeout is None:
timeout = -1 # Wait indefinitely.
try:
# Limit the number of events to the number of io objects we're
# currently tracking (because that's maybe a good heuristic) and
# the amount of time we block to the value specified by our
# caller.
l = self._poller.poll(timeout, len(self._selectables))
except IOError as err:
if err.errno == errno.EINTR:
return
# See epoll_wait(2) for documentation on the other conditions
# under which this can fail. They can only be due to a serious
# programming error on our part, so let's just announce them
# loudly.
raise
_drdw = self._doReadOrWrite
for fd, event in l:
try:
selectable = self._selectables[fd]
except KeyError:
pass
else:
log.callWithLogger(selectable, _drdw, selectable, fd, event)
run
回调函数的实现,通过传入类,重写基类的函数
有很多抽象类
有关socket的操作由Protocol类完成,抽象了底层,提供操作数据流的接口,即实现数据协议
# This is the Twisted Get Poetry Now! client, version 2.0.
# NOTE: This should not be used as the basis for production code.
import datetime, optparse
from twisted.internet.protocol import Protocol, ClientFactory
def parse_args():
usage = """usage: %prog [options] [hostname]:port ...
This is the Get Poetry Now! client, Twisted version 2.0.
Run it like this:
python get-poetry.py port1 port2 port3 ...
If you are in the base directory of the twisted-intro package,
you could run it like this:
python twisted-client-2/get-poetry.py 10001 10002 10003
to grab poetry from servers on ports 10001, 10002, and 10003.
Of course, there need to be servers listening on those ports
for that to work.
"""
parser = optparse.OptionParser(usage)
_, addresses = parser.parse_args()
if not addresses:
print parser.format_help()
parser.exit()
def parse_address(addr):
if ':' not in addr:
host = '127.0.0.1'
port = addr
else:
host, port = addr.split(':', 1)
if not port.isdigit():
parser.error('Ports must be integers.')
return host, int(port)
return map(parse_address, addresses)
class PoetryProtocol(Protocol):
poem = ''
task_num = 0
def dataReceived(self, data):
self.poem += data
msg = 'Task %d: got %d bytes of poetry from %s'
print msg % (self.task_num, len(data), self.transport.getPeer())
def connectionLost(self, reason):
self.poemReceived(self.poem)
def poemReceived(self, poem):
self.factory.poem_finished(self.task_num, poem)
class PoetryClientFactory(ClientFactory):
task_num = 1
protocol = PoetryProtocol # tell base class what proto to build
def __init__(self, poetry_count):
self.poetry_count = poetry_count
self.poems = {} # task num -> poem
def buildProtocol(self, address):
proto = ClientFactory.buildProtocol(self, address)
proto.task_num = self.task_num
self.task_num += 1
return proto
def poem_finished(self, task_num=None, poem=None):
if task_num is not None:
self.poems[task_num] = poem
self.poetry_count -= 1
if self.poetry_count == 0:
self.report()
from twisted.internet import reactor
reactor.stop()
def report(self):
for i in self.poems:
print 'Task %d: %d bytes of poetry' % (i, len(self.poems[i]))
def clientConnectionFailed(self, connector, reason):
print 'Failed to connect to:', connector.getDestination()
self.poem_finished()
def poetry_main():
addresses = parse_args()
start = datetime.datetime.now()
factory = PoetryClientFactory(len(addresses))
from twisted.internet import reactor
for address in addresses:
host, port = address
reactor.connectTCP(host, port, factory)
reactor.run()
elapsed = datetime.datetime.now() - start
print 'Got %d poems in %s' % (len(addresses), elapsed)
if __name__ == '__main__':
poetry_main()
加入Transports,Protocols
# This is the Twisted Get Poetry Now! client, version 2.0.
# NOTE: This should not be used as the basis for production code.
import datetime, optparse
from twisted.internet.protocol import Protocol, ClientFactory
def parse_args():
usage = """usage: %prog [options] [hostname]:port ...
This is the Get Poetry Now! client, Twisted version 2.0.
Run it like this:
python get-poetry.py port1 port2 port3 ...
If you are in the base directory of the twisted-intro package,
you could run it like this:
python twisted-client-2/get-poetry.py 10001 10002 10003
to grab poetry from servers on ports 10001, 10002, and 10003.
Of course, there need to be servers listening on those ports
for that to work.
"""
parser = optparse.OptionParser(usage)
_, addresses = parser.parse_args()
if not addresses:
print parser.format_help()
parser.exit()
def parse_address(addr):
if ':' not in addr:
host = '127.0.0.1'
port = addr
else:
host, port = addr.split(':', 1)
if not port.isdigit():
parser.error('Ports must be integers.')
return host, int(port)
return map(parse_address, addresses)
class PoetryProtocol(Protocol):处理接收的数据
poem = ''
task_num = 0
def dataReceived(self, data):
self.poem += data
msg = 'Task %d: got %d bytes of poetry from %s'
print msg % (self.task_num, len(data), self.transport.getPeer())
def connectionLost(self, reason):
self.poemReceived(self.poem)
def poemReceived(self, poem):
self.factory.poem_finished(self.task_num, poem)
class PoetryClientFactory(ClientFactory):
task_num = 1
protocol = PoetryProtocol # tell base class what proto to build
def __init__(self, poetry_count):
self.poetry_count = poetry_count
self.poems = {} # task num -> poem
def buildProtocol(self, address):
proto = ClientFactory.buildProtocol(self, address)
proto.task_num = self.task_num
self.task_num += 1
return proto
def poem_finished(self, task_num=None, poem=None):
if task_num is not None:
self.poems[task_num] = poem
self.poetry_count -= 1
if self.poetry_count == 0:
self.report()
from twisted.internet import reactor
reactor.stop()
def report(self):
for i in self.poems:
print 'Task %d: %d bytes of poetry' % (i, len(self.poems[i]))
def clientConnectionFailed(self, connector, reason):
print 'Failed to connect to:', connector.getDestination()
self.poem_finished()
def poetry_main():
addresses = parse_args()
start = datetime.datetime.now()
factory = PoetryClientFactory(len(addresses)) 创建factory
from twisted.internet import reactor
for address in addresses:
host, port = address
reactor.connectTCP(host, port, factory) 创建协议
reactor.run()
elapsed = datetime.datetime.now() - start
print 'Got %d poems in %s' % (len(addresses), elapsed)
if __name__ == '__main__':
poetry_main()
# This is the Twisted Get Poetry Now! client, version 3.0.
# NOTE: This should not be used as the basis for production code.
import optparse
from twisted.internet.protocol import Protocol, ClientFactory
def parse_args():
usage = """usage: %prog [options] [hostname]:port ...
This is the Get Poetry Now! client, Twisted version 3.0
Run it like this:
python get-poetry-1.py port1 port2 port3 ...
If you are in the base directory of the twisted-intro package,
you could run it like this:
python twisted-client-3/get-poetry-1.py 10001 10002 10003
to grab poetry from servers on ports 10001, 10002, and 10003.
Of course, there need to be servers listening on those ports
for that to work.
"""
parser = optparse.OptionParser(usage)
_, addresses = parser.parse_args()
if not addresses:
print parser.format_help()
parser.exit()
def parse_address(addr):
if ':' not in addr:
host = '127.0.0.1'
port = addr
else:
host, port = addr.split(':', 1)
if not port.isdigit():
parser.error('Ports must be integers.')
return host, int(port)
return map(parse_address, addresses)
class PoetryProtocol(Protocol):
poem = ''
def dataReceived(self, data):
self.poem += data
def connectionLost(self, reason):
self.poemReceived(self.poem)
def poemReceived(self, poem):
self.factory.poem_finished(poem)
class PoetryClientFactory(ClientFactory):
protocol = PoetryProtocol
def __init__(self, callback):
self.callback = callback
def poem_finished(self, poem):
self.callback(poem)
def get_poetry(host, port, callback):
"""
Download a poem from the given host and port and invoke
callback(poem)
when the poem is complete.
"""
from twisted.internet import reactor
factory = PoetryClientFactory(callback)
reactor.connectTCP(host, port, factory)
def poetry_main():
addresses = parse_args()
from twisted.internet import reactor
poems = []
def got_poem(poem):
poems.append(poem)
if len(poems) == len(addresses):
reactor.stop()
for address in addresses:
host, port = address
get_poetry(host, port, got_poem)
reactor.run()
for poem in poems:
print poem
if __name__ == '__main__':
poetry_main()
Deferred 管理回调函数
deferred.callback errback 不能被调用两次 from twisted.internet.defer import Deferred
def got_poem(res):
print 'Your poem is served:'
print res
def poem_failed(err):
print 'No poetry for you.'
d = Deferred()
# add a callback/errback pair to the chain
d.addCallbacks(got_poem, poem_failed)
# fire the chain with a normal result
d.callback('This poem is short.')
print "Finished"
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
def got_poem(res):
print 'Your poem is served:'
print res
def poem_failed(err):
print 'No poetry for you.'
d = Deferred()
# add a callback/errback pair to the chain
d.addCallbacks(got_poem, poem_failed)
# fire the chain with an error result
d.errback(Failure(Exception('I have failed.')))
print "Finished"
# This is the Twisted Get Poetry Now! client, version 4.0
import optparse, sys
from twisted.internet import defer
from twisted.internet.protocol import Protocol, ClientFactory
def parse_args():
usage = """usage: %prog [options] [hostname]:port ...
This is the Get Poetry Now! client, Twisted version 4.0
Run it like this:
python get-poetry.py port1 port2 port3 ...
If you are in the base directory of the twisted-intro package,
you could run it like this:
python twisted-client-4/get-poetry.py 10001 10002 10003
to grab poetry from servers on ports 10001, 10002, and 10003.
Of course, there need to be servers listening on those ports
for that to work.
"""
parser = optparse.OptionParser(usage)
_, addresses = parser.parse_args()
if not addresses:
print parser.format_help()
parser.exit()
def parse_address(addr):
if ':' not in addr:
host = '127.0.0.1'
port = addr
else:
host, port = addr.split(':', 1)
if not port.isdigit():
parser.error('Ports must be integers.')
return host, int(port)
return map(parse_address, addresses)
class PoetryProtocol(Protocol):
poem = ''
def dataReceived(self, data):
self.poem += data
def connectionLost(self, reason):
self.poemReceived(self.poem)
def poemReceived(self, poem):
self.factory.poem_finished(poem)
class PoetryClientFactory(ClientFactory):
protocol = PoetryProtocol
def __init__(self, deferred):
self.deferred = deferred
def poem_finished(self, poem):
if self.deferred is not None:
d, self.deferred = self.deferred, None
d.callback(poem)
def clientConnectionFailed(self, connector, reason):
if self.deferred is not None:
d, self.deferred = self.deferred, None
d.errback(reason)
def get_poetry(host, port):
"""
Download a poem from the given host and port. This function
returns a Deferred which will be fired with the complete text of
the poem or a Failure if the poem could not be downloaded.
"""
d = defer.Deferred()
from twisted.internet import reactor
factory = PoetryClientFactory(d)
reactor.connectTCP(host, port, factory)
return d
def poetry_main():
addresses = parse_args()
from twisted.internet import reactor
poems = []
errors = []
def got_poem(poem):
poems.append(poem)
def poem_failed(err):
print >>sys.stderr, 'Poem failed:', err
errors.append(err)
def poem_done(_):
if len(poems) + len(errors) == len(addresses):
reactor.stop()
for address in addresses:
host, port = address
d = get_poetry(host, port)
d.addCallbacks(got_poem, poem_failed)
d.addBoth(poem_done)
reactor.run()
for poem in poems:
print poem
if __name__ == '__main__':
poetry_main()
socket.setblocking(flag)
Set blocking or non-blocking mode of the socket: if flag is 0, the socket is set to non-blocking, else to blocking mode. Initially all sockets are in blocking mode. In non-blocking mode, if a recv() call doesn’t find any data, or if a send() call can’t immediately dispose of the data, a error exception is raised; in blocking mode, the calls block until they can proceed. s.setblocking(0) is equivalent to s.settimeout(0.0); s.setblocking(1) is equivalent to s.settimeout(None).
class ClientFactory(Factory):
"""A Protocol factory for clients.
This can be used together with the various connectXXX methods in
reactors.
"""
def startedConnecting(self, connector):
"""Called when a connection has been started.
You can call connector.stopConnecting() to stop the connection attempt.
@param connector: a Connector object.
"""
def clientConnectionFailed(self, connector, reason):
"""Called when a connection has failed to connect.
It may be useful to call connector.connect() - this will reconnect.
@type reason: L{twisted.python.failure.Failure}
"""
def clientConnectionLost(self, connector, reason):
"""Called when an established connection is lost.
It may be useful to call connector.connect() - this will reconnect.
@type reason: L{twisted.python.failure.Failure}
"""
@implementer(interfaces.IProtocolFactory, interfaces.ILoggingContext)
class Factory:
"""
This is a factory which produces protocols.
By default, buildProtocol will create a protocol of the class given in
self.protocol.
"""
# put a subclass of Protocol here:
protocol = None
numPorts = 0
noisy = True
@classmethod
def forProtocol(cls, protocol, *args, **kwargs):
"""
Create a factory for the given protocol.
It sets the C{protocol} attribute and returns the constructed factory
instance.
@param protocol: A L{Protocol} subclass
@param args: Positional arguments for the factory.
@param kwargs: Keyword arguments for the factory.
@return: A L{Factory} instance wired up to C{protocol}.
"""
factory = cls(*args, **kwargs)
factory.protocol = protocol
return factory
def logPrefix(self):
"""
Describe this factory for log messages.
"""
return self.__class__.__name__
def doStart(self):
"""Make sure startFactory is called.
Users should not call this function themselves!
"""
if not self.numPorts:
if self.noisy:
log.msg("Starting factory %r" % self)
self.startFactory()
self.numPorts = self.numPorts + 1
def doStop(self):
"""Make sure stopFactory is called.
Users should not call this function themselves!
"""
if self.numPorts == 0:
# this shouldn't happen, but does sometimes and this is better
# than blowing up in assert as we did previously.
return
self.numPorts = self.numPorts - 1
if not self.numPorts:
if self.noisy:
log.msg("Stopping factory %r" % self)
self.stopFactory()
def startFactory(self):
"""This will be called before I begin listening on a Port or Connector.
It will only be called once, even if the factory is connected
to multiple ports.
This can be used to perform 'unserialization' tasks that
are best put off until things are actually running, such
as connecting to a database, opening files, etcetera.
"""
def stopFactory(self):
"""This will be called before I stop listening on all Ports/Connectors.
This can be overridden to perform 'shutdown' tasks such as disconnecting
database connections, closing files, etc.
It will be called, for example, before an application shuts down,
if it was connected to a port. User code should not call this function
directly.
"""
def buildProtocol(self, addr):
"""
Create an instance of a subclass of Protocol.
The returned instance will handle input on an incoming server
connection, and an attribute "factory" pointing to the creating
factory.
Alternatively, C{None} may be returned to immediately close the
new connection.
Override this method to alter how Protocol instances get created.
@param addr: an object implementing L{twisted.internet.interfaces.IAddress}
"""
p = self.protocol()
p.factory = self
return p
###安装
sudo pip install lxml==3.1.2 --trusted-host pypi.python.org
sudo pip install Scrapy --trusted-host pypi.python.org