## Scrapy
Single-process, built on Twisted (which relies on epoll or another I/O-multiplexing mechanism under the hood).
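Twisted normally picks the platform-default reactor by itself; on Linux an epoll-based reactor can also be installed explicitly before twisted.internet.reactor is first imported. A minimal sketch (not something Scrapy requires you to do):

    # install the epoll reactor explicitly (Linux only); must happen before
    # anything imports twisted.internet.reactor
    from twisted.internet import epollreactor
    epollreactor.install()

    from twisted.internet import reactor
    print(reactor)   # e.g. <twisted.internet.epollreactor.EPollReactor ...>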
Workflow:
# scrapy/cmdline.py: execute(), abridged
settings = conf.settings
inproject = inside_project()
cmds = _get_commands_dict(settings, inproject)
cmdname = _pop_command_name(argv)
cmd = cmds[cmdname]
...
cmd.crawler_process = CrawlerProcess(settings)
cmd.run(args, opts)   # reached via _run_print_help / _run_command
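The same entry point can be driven from a script; a minimal sketch (the spider name 'music' is an assumption):

    from scrapy.cmdline import execute

    # behaves like running `scrapy crawl music` inside the project directory;
    # blocks until the command finishes
    execute(['scrapy', 'crawl', 'music'])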
1. Read the scrapy.cfg configuration file:
[settings]
default = m_p.settings
[deploy]
#url = http://localhost:6800/
project = m_p
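Roughly, locating and parsing this file means walking up from the current directory until a scrapy.cfg is found (Scrapy's real helpers live in scrapy.utils.conf; this is only a simplified sketch):

    import os
    from configparser import ConfigParser  # ConfigParser on Python 2

    def closest_cfg(path='.'):
        """Walk up from `path` until a scrapy.cfg is found (or give up at /)."""
        path = os.path.abspath(path)
        cfg = os.path.join(path, 'scrapy.cfg')
        if os.path.exists(cfg):
            return cfg
        parent = os.path.dirname(path)
        return None if parent == path else closest_cfg(parent)

    cfg_file = closest_cfg()
    if cfg_file:
        parser = ConfigParser()
        parser.read(cfg_file)
        print(parser.get('settings', 'default'))   # -> m_p.settings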
2. Read the project settings file (m_p/settings.py):
BOT_NAME = 'm_p'
SPIDER_MODULES = ['m_p.spiders']
NEWSPIDER_MODULE = 'm_p.spiders'
ITEM_PIPELINES = {'m_p.pipelines.mysql_pipeline': 2, 'm_p.pipelines.download_image_pipeline': 1}
SERVER = "localhost"
PORT = 3306
DB = "music"
COLLECTION = "music"
USER = 'root'
PASSWORD = ''
CHARSET = 'utf8'
IMAGES_STORE = '/home/admin/nginx/html/yii/demos/music_web/images/cover/'
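SERVER, PORT, DB, etc. are not Scrapy built-ins; they are custom keys the project's own pipelines read back via crawler.settings. A hedged sketch of what m_p.pipelines.mysql_pipeline might look like (the class name and body are assumptions, not taken from the project):

    class MysqlPipeline(object):
        def __init__(self, server, port, db, user, password, charset):
            self.server = server
            self.port = port
            self.db = db
            self.user = user
            self.password = password
            self.charset = charset

        @classmethod
        def from_crawler(cls, crawler):
            # Scrapy calls this when building the pipeline; the custom keys
            # defined in settings.py are available on crawler.settings
            s = crawler.settings
            return cls(s.get('SERVER'), s.getint('PORT'), s.get('DB'),
                       s.get('USER'), s.get('PASSWORD'), s.get('CHARSET'))

        def process_item(self, item, spider):
            # ... write the item to MySQL here ...
            return item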
3. Import the corresponding spider modules (SPIDER_MODULES / NEWSPIDER_MODULE)
4. Parse the command-line arguments
5. Attach the crawler entry point: cmd.crawler_process = CrawlerProcess(settings)
6. Run the command, i.e. cmd.run(); for `scrapy crawl` that is:
# scrapy/commands/crawl.py: Command.run()
def run(self, args, opts):
    if len(args) < 1:
        raise UsageError()
    elif len(args) > 1:
        raise UsageError("running 'scrapy crawl' with more than one spider is no longer supported")
    spname = args[0]

    self.crawler_process.crawl(spname, **opts.spargs)
    self.crawler_process.start()
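The crawl command is only a thin wrapper around CrawlerProcess; the equivalent from a plain script (spider name 'music' assumed):

    from scrapy.crawler import CrawlerProcess
    from scrapy.utils.project import get_project_settings

    process = CrawlerProcess(get_project_settings())
    process.crawl('music')   # schedules the spider; returns a Deferred
    process.start()          # starts the Twisted reactor; blocks until done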
scrapy/crawler.py:
class CrawlerRunner(object):
    def crawl(self, spidercls, *args, **kwargs):
        crawler = self._create_crawler(spidercls)
        self._setup_crawler_logging(crawler)
        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)

        def _done(result):
            self.crawlers.discard(crawler)
            self._active.discard(d)
            return result

        return d.addBoth(_done)
class Crawler(object):
    @defer.inlineCallbacks
    def crawl(self, *args, **kwargs):
        assert not self.crawling, "Crawling already taking place"
        self.crawling = True

        try:
            self.spider = self._create_spider(*args, **kwargs)
            self.engine = self._create_engine()
            start_requests = iter(self.spider.start_requests())
            yield self.engine.open_spider(self.spider, start_requests)  # open the engine's spider
            yield defer.maybeDeferred(self.engine.start)
        except Exception:
            self.crawling = False
            raise
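crawl() is decorated with defer.inlineCallbacks, so every yield suspends it until the yielded Deferred fires. A standalone sketch of that mechanism (not Scrapy code, just Twisted):

    from twisted.internet import defer, reactor, task

    @defer.inlineCallbacks
    def setup():
        # each yield hands control back to the reactor until the Deferred
        # fires, which is how Crawler.crawl waits on open_spider/engine.start
        yield task.deferLater(reactor, 0.1, lambda: None)   # stands in for open_spider
        result = yield defer.succeed('engine started')      # stands in for engine.start
        defer.returnValue(result)

    d = setup()
    d.addCallback(print)
    d.addBoth(lambda _: reactor.stop())
    reactor.run()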
# scrapy/core/engine.py
class ExecutionEngine(object):
    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
        assert self.has_capacity(), "No free spider slot when opening %r" % \
            spider.name
        log.msg("Spider opened", spider=spider)
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
        start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.slot = slot
        self.spider = spider
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        slot.nextcall.schedule()
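CallLaterOnce comes from scrapy.utils.reactor: it collapses any number of schedule() calls into a single reactor.callLater tick, which is what keeps the engine pumping _next_request without scheduling it more than once per iteration. A simplified sketch of the idea:

    from twisted.internet import reactor

    class CallLaterOnce(object):
        """Run `func` on the next reactor iteration, at most once per
        iteration no matter how often schedule() is called.
        (Simplified sketch of scrapy.utils.reactor.CallLaterOnce.)"""

        def __init__(self, func, *a, **kw):
            self._func, self._a, self._kw = func, a, kw
            self._call = None

        def schedule(self, delay=0):
            if self._call is None:                       # ignore repeated calls
                self._call = reactor.callLater(delay, self)

        def __call__(self):
            self._call = None
            return self._func(*self._a, **self._kw)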
class CrawlerProcess(CrawlerRunner):
    def start(self, stop_after_crawl=True):
        if stop_after_crawl:
            d = self.join()
            # Don't start the reactor if the deferreds are already fired
            if d.called:
                return
            d.addBoth(lambda _: self._stop_reactor())

        if self.settings.getbool('DNSCACHE_ENABLED'):
            reactor.installResolver(CachingThreadedResolver(reactor))

        reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
        reactor.run(installSignalHandlers=False)  # blocking call
This is where Twisted's reactor (event loop) takes over; from here the engine drives the familiar Scrapy data flow:
1. The engine gets the initial requests (URLs) to crawl from the spider.
2. The engine hands those requests to the scheduler.
3. The engine asks the scheduler for the next request to crawl: _next_request_from_scheduler -> slot.scheduler.next_request()
def _next_request_from_scheduler(self, spider):
    slot = self.slot
    request = slot.scheduler.next_request()
    if not request:
        return
    d = self._download(request, spider)
    d.addBoth(self._handle_downloader_output, request, spider)
    d.addErrback(log.msg, spider=spider)
    d.addBoth(lambda _: slot.remove_request(request))
    d.addErrback(log.msg, spider=spider)
    d.addBoth(lambda _: slot.nextcall.schedule())
    d.addErrback(log.msg, spider=spider)
    return d
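So every request becomes a Deferred whose chain handles the downloader output, removes the request from the slot and schedules the next call, with an errback after each step so a single failure cannot break the chain. The same pattern in isolation (all names here are placeholders):

    from twisted.internet import defer

    def handle_output(result):
        print('downloader returned:', result)
        return result

    def log_error(failure):
        # swallowing the failure lets the rest of the chain keep running
        print('error:', failure.getErrorMessage())

    d = defer.Deferred()
    d.addBoth(handle_output)   # runs on success and failure, like _handle_downloader_output
    d.addErrback(log_error)    # only runs if the previous step failed
    d.addBoth(lambda _: print('schedule next request'))
    d.callback('fake response')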
4. The engine sends the request through the downloader middlewares to the downloader:
# ExecutionEngine._download (abridged)
slot.add_request(request)
dwld = self.downloader.fetch(request, spider)
dwld.addCallbacks(_on_success)
dwld.addBoth(_on_complete)
5. When the download finishes, the downloader hands the response back to the engine.
6. The engine passes the response through the spider middlewares to the spider.
7. The spider processes the response and returns extracted items plus follow-up requests to the engine (see the spider sketch after this list).
8. The engine sends the items to the item pipelines and the new requests to the scheduler.
9. Repeat from step 2 until the scheduler has no more requests.
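For step 7, a spider callback simply yields items and new Requests back to the engine. A minimal sketch using current Scrapy conventions (dict items, response.urljoin); the selectors and URLs are placeholders:

    import scrapy

    class MusicSpider(scrapy.Spider):
        name = 'music'
        start_urls = ['http://example.com/albums']

        def parse(self, response):
            # items go on to the item pipelines (step 8) ...
            for title in response.xpath('//h2/text()').extract():
                yield {'title': title}
            # ... and new Requests go back to the scheduler (steps 8 -> 2)
            for href in response.xpath('//a/@href').extract():
                yield scrapy.Request(response.urljoin(href), callback=self.parse)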
For reference, the full CrawlerProcess and CrawlerRunner from scrapy/crawler.py:

class CrawlerProcess(CrawlerRunner):
    """A class to run multiple scrapy crawlers in a process simultaneously"""

    def __init__(self, settings):
        super(CrawlerProcess, self).__init__(settings)
        install_shutdown_handlers(self._signal_shutdown)
        self.stopping = False
        self.log_observer = log.start_from_settings(self.settings)
        log.scrapy_info(settings)

    def _signal_shutdown(self, signum, _):
        # first signal: shut down gracefully; a second one triggers _signal_kill
        install_shutdown_handlers(self._signal_kill)
        signame = signal_names[signum]
        log.msg(format="Received %(signame)s, shutting down gracefully. Send again to force ",
                level=log.INFO, signame=signame)
        reactor.callFromThread(self.stop)

    def _signal_kill(self, signum, _):
        install_shutdown_handlers(signal.SIG_IGN)
        signame = signal_names[signum]
        log.msg(format='Received %(signame)s twice, forcing unclean shutdown',
                level=log.INFO, signame=signame)
        self._stop_logging()
        reactor.callFromThread(self._stop_reactor)

    def start(self, stop_after_crawl=True):
        if stop_after_crawl:
            d = self.join()
            # Don't start the reactor if the deferreds are already fired
            if d.called:
                return
            d.addBoth(lambda _: self._stop_reactor())

        if self.settings.getbool('DNSCACHE_ENABLED'):
            reactor.installResolver(CachingThreadedResolver(reactor))

        reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
        reactor.run(installSignalHandlers=False)  # blocking call

    def _stop_logging(self):
        if self.log_observer:
            self.log_observer.stop()

    def _stop_reactor(self, _=None):
        try:
            reactor.stop()
        except RuntimeError:  # raised if already stopped or in shutdown stage
            pass
class CrawlerRunner(object):
    def __init__(self, settings):
        self.settings = settings
        smcls = load_object(settings['SPIDER_MANAGER_CLASS'])
        self.spiders = smcls.from_settings(settings.frozencopy())
        self.crawlers = set()
        self._active = set()

    def crawl(self, spidercls, *args, **kwargs):
        crawler = self._create_crawler(spidercls)
        self._setup_crawler_logging(crawler)
        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)

        def _done(result):
            self.crawlers.discard(crawler)
            self._active.discard(d)
            return result

        return d.addBoth(_done)

    def _create_crawler(self, spidercls):
        if isinstance(spidercls, six.string_types):
            # a spider name was passed in; resolve it to a class via the spider manager
            spidercls = self.spiders.load(spidercls)

        crawler_settings = self.settings.copy()
        spidercls.update_settings(crawler_settings)
        crawler_settings.freeze()
        return Crawler(spidercls, crawler_settings)

    def _setup_crawler_logging(self, crawler):
        log_observer = log.start_from_crawler(crawler)
        if log_observer:
            crawler.signals.connect(log_observer.stop, signals.engine_stopped)

    def stop(self):
        return defer.DeferredList([c.stop() for c in list(self.crawlers)])

    @defer.inlineCallbacks
    def join(self):
        """Wait for all managed crawlers to complete"""
        while self._active:
            yield defer.DeferredList(self._active)
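Unlike CrawlerProcess, CrawlerRunner never touches the reactor, so the caller has to run and stop it. A minimal usage sketch (spider name 'music' assumed):

    from twisted.internet import reactor
    from scrapy.crawler import CrawlerRunner
    from scrapy.utils.project import get_project_settings

    runner = CrawlerRunner(get_project_settings())
    d = runner.crawl('music')              # returns the crawl Deferred
    d.addBoth(lambda _: reactor.stop())    # stop the reactor once the crawl ends
    reactor.run()                          # the caller owns the reactor, not the runner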