scrapy

## scrapy

Single process, built on Twisted (which multiplexes I/O with epoll or another mechanism under the hood).
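A minimal Twisted sketch (not Scrapy code) of that single-process model: all work runs as callbacks on the reactor's event loop, which is exactly what CrawlerProcess.start() ends up driving.

    from twisted.internet import reactor, defer

    def fake_download(url):
        # Pretend this is an asynchronous download: return a Deferred that
        # fires later, from inside the event loop.
        d = defer.Deferred()
        reactor.callLater(1, d.callback, "response for %s" % url)
        return d

    def handle(response):
        print(response)
        reactor.stop()

    fake_download("http://example.com").addCallback(handle)
    reactor.run()   # blocking call, just like CrawlerProcess.start()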

Startup flow (excerpted from scrapy/cmdline.py):

settings = conf.settings                        # project settings
inproject = inside_project()                    # are we inside a Scrapy project?
cmds = _get_commands_dict(settings, inproject)  # available commands (crawl, shell, ...)
cmdname = _pop_command_name(argv)               # e.g. "crawl"
cmd = cmds[cmdname]
...
cmd.crawler_process = CrawlerProcess(settings)  # attach the crawler process to the command
cmd.run()

1. Read the scrapy.cfg configuration file:

[settings]
default = m_p.settings

[deploy]
#url = http://localhost:6800/
project = m_p

2. Read the project's settings.py:

BOT_NAME = 'm_p'

SPIDER_MODULES = ['m_p.spiders']
NEWSPIDER_MODULE = 'm_p.spiders'

ITEM_PIPELINES = {'m_p.pipelines.mysql_pipeline': 2, 'm_p.pipelines.download_image_pipeline': 1}

SERVER = "localhost"
PORT = 3306
DB = "music"
COLLECTION = "music"
USER = 'root'
PASSWORD = ''
CHARSET = 'utf8'
IMAGES_STORE = '/home/admin/nginx/html/yii/demos/music_web/images/cover/'
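SERVER/PORT/DB/COLLECTION/USER/PASSWORD/CHARSET are not Scrapy settings; they exist only for the project's own pipelines. ITEM_PIPELINES maps pipeline paths to priorities, and lower values run first, so download_image_pipeline (1) sees each item before mysql_pipeline (2). A hypothetical sketch of what mysql_pipeline might look like (assuming MySQLdb; the item fields and table name are made up, the real project code is not shown in these notes):

    import MySQLdb

    class mysql_pipeline(object):

        @classmethod
        def from_crawler(cls, crawler):
            return cls(crawler.settings)

        def __init__(self, settings):
            # Reads the custom SERVER/PORT/... settings defined above.
            self.conn = MySQLdb.connect(host=settings['SERVER'],
                                        port=settings['PORT'],
                                        user=settings['USER'],
                                        passwd=settings['PASSWORD'],
                                        db=settings['DB'],
                                        charset=settings['CHARSET'])

        def process_item(self, item, spider):
            # Hypothetical fields/table; called once per item (step 8 below).
            cur = self.conn.cursor()
            cur.execute("INSERT INTO music (name, cover) VALUES (%s, %s)",
                        (item.get('name'), item.get('cover')))
            self.conn.commit()
            return item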

3. Import the spider modules listed in SPIDER_MODULES

4. Parse the command-line arguments

5. Set the crawler entry point: attach a CrawlerProcess to the command (cmd.crawler_process)

6. Run the command; for the crawl command, run() looks like this:

    def run(self, args, opts):
        if len(args) < 1:
            raise UsageError()
        elif len(args) > 1:
            raise UsageError("running 'scrapy crawl' with more than one spider is no longer supported")
        spname = args[0]

        self.crawler_process.crawl(spname, **opts.spargs)
        self.crawler_process.start()
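Those same two calls, crawl() followed by start(), can also be made directly from a script, which is essentially what the crawl command does. A minimal sketch ('m_p_spider' is a hypothetical spider name):

    from scrapy.utils.project import get_project_settings
    from scrapy.crawler import CrawlerProcess

    process = CrawlerProcess(get_project_settings())  # loads settings.py via scrapy.cfg
    process.crawl('m_p_spider')   # same as self.crawler_process.crawl(spname)
    process.start()               # starts the Twisted reactor (blocking)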




crawler.py

class CrawlerRunner(object):

    def crawl(self, spidercls, *args, **kwargs):
        crawler = self._create_crawler(spidercls)
        self._setup_crawler_logging(crawler)
        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)

        def _done(result):
            self.crawlers.discard(crawler)
            self._active.discard(d)
            return result

        return d.addBoth(_done)
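crawl() itself only does bookkeeping: it tracks each crawler and its Deferred in two sets, and _done, attached with addBoth so it runs on success and on failure alike, removes them again and passes the result through. A tiny standalone illustration of that pattern (plain Twisted, not Scrapy code):

    from twisted.internet import defer

    active = set()

    def track(d):
        active.add(d)
        def _done(result):
            active.discard(d)   # runs on success *and* on failure
            return result       # pass the result (or Failure) through unchanged
        return d.addBoth(_done)

    d = defer.Deferred()
    track(d)
    print(len(active))   # 1
    d.callback("finished")
    print(len(active))   # 0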


class Crawler(object):

    @defer.inlineCallbacks
    def crawl(self, *args, **kwargs):
        assert not self.crawling, "Crawling already taking place"
        self.crawling = True

        try:
            self.spider = self._create_spider(*args, **kwargs)
            self.engine = self._create_engine()
            start_requests = iter(self.spider.start_requests())
            yield self.engine.open_spider(self.spider, start_requests)  # open the engine's spider
            yield defer.maybeDeferred(self.engine.start)
        except Exception:
            self.crawling = False
            raise
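Because crawl() is a defer.inlineCallbacks generator, each yield suspends it until the yielded Deferred fires, and maybeDeferred lets engine.start be treated the same way whether it returns a plain value or a Deferred. A tiny illustration of that mechanism (plain Twisted):

    from twisted.internet import defer, reactor

    def maybe_async(x):
        # Might return a plain value or a Deferred; maybeDeferred accepts both.
        return x * 2

    @defer.inlineCallbacks
    def main():
        value = yield defer.maybeDeferred(maybe_async, 21)  # resumes when it fires
        print(value)    # 42
        reactor.stop()

    reactor.callWhenRunning(main)
    reactor.run()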


class ExecutionEngine(object):

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
        assert self.has_capacity(), "No free spider slot when opening %r" % \
            spider.name
        log.msg("Spider opened", spider=spider)
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
        start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.slot = slot
        self.spider = spider
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        slot.nextcall.schedule()
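nextcall wraps engine._next_request in a CallLaterOnce: schedule() arms at most one pending reactor.callLater(0, ...) no matter how often it is called before the call fires, which keeps the crawl loop ticking without piling up duplicate calls. Roughly (a simplified sketch of the class in scrapy.utils.reactor):

    from twisted.internet import reactor

    class CallLaterOnce(object):
        """Schedule a function call on the reactor, but never more than one
        pending call at a time (simplified sketch)."""

        def __init__(self, func, *a, **kw):
            self._func, self._a, self._kw = func, a, kw
            self._call = None

        def schedule(self, delay=0):
            if self._call is None:                       # ignore repeated schedules
                self._call = reactor.callLater(delay, self)

        def cancel(self):
            if self._call:
                self._call.cancel()

        def __call__(self):
            self._call = None                            # allow re-scheduling
            return self._func(*self._a, **self._kw)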


class CrawlerProcess(CrawlerRunner):

    def start(self, stop_after_crawl=True):
        if stop_after_crawl:
            d = self.join()
            # Don't start the reactor if the deferreds are already fired
            if d.called:
                return
            d.addBoth(lambda _: self._stop_reactor())

        if self.settings.getbool('DNSCACHE_ENABLED'):
            reactor.installResolver(CachingThreadedResolver(reactor))

        reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
        reactor.run(installSignalHandlers=False)  # blocking call

Everything above is driven by the Twisted reactor. The data flow then loops through the following steps:

1. The engine obtains the initial requests (start_requests) from the spider.

2. The engine hands those requests to the scheduler.

3. The engine asks the scheduler for the next request to crawl:

_next_request_from_scheduler -> slot.scheduler.next_request()

    def _next_request_from_scheduler(self, spider):
        slot = self.slot
        request = slot.scheduler.next_request()
        if not request:
            return
        d = self._download(request, spider)
        d.addBoth(self._handle_downloader_output, request, spider)
        d.addErrback(log.msg, spider=spider)
        d.addBoth(lambda _: slot.remove_request(request))
        d.addErrback(log.msg, spider=spider)
        d.addBoth(lambda _: slot.nextcall.schedule())
        d.addErrback(log.msg, spider=spider)
        return d
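The scheduler behind slot.scheduler is essentially a deduplicating request queue: enqueue_request() on one side, next_request() on the other. A stripped-down, in-memory sketch of that interface (the real Scrapy scheduler adds priority/disk queues and a proper dupefilter):

    import collections

    class SimpleScheduler(object):
        """In-memory sketch of the scheduler interface the engine relies on."""

        def __init__(self):
            self.queue = collections.deque()
            self.seen = set()

        def open(self, spider):
            self.spider = spider

        def enqueue_request(self, request):
            if request.url in self.seen:        # crude duplicate filtering
                return False
            self.seen.add(request.url)
            self.queue.append(request)
            return True

        def next_request(self):
            return self.queue.popleft() if self.queue else None

        def has_pending_requests(self):
            return len(self.queue) > 0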

4. The engine passes the request through the downloader middlewares to the downloader:

    # inside ExecutionEngine._download:
    slot.add_request(request)
    dwld = self.downloader.fetch(request, spider)
    dwld.addCallbacks(_on_success)
    dwld.addBoth(_on_complete)
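"Through the downloader middlewares" means every enabled middleware gets process_request() on the way to the downloader and process_response() on the way back. A minimal example (the class name, the User-Agent string and the DOWNLOADER_MIDDLEWARES path below are made up):

    class CustomUserAgentMiddleware(object):

        def process_request(self, request, spider):
            # Runs before the download; returning None continues the chain.
            request.headers.setdefault('User-Agent', 'm_p-bot/0.1')
            return None

        def process_response(self, request, response, spider):
            # Runs on the way back to the engine; must return a Response
            # (or a Request to reschedule).
            return response

It would be enabled with something like DOWNLOADER_MIDDLEWARES = {'m_p.middlewares.CustomUserAgentMiddleware': 543} in settings.py (hypothetical module path).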

5. When the download finishes, the downloader hands the response back to the engine.

6. The engine passes the response through the spider middlewares to the spider.

7. The spider processes the response and returns items and follow-up requests to the engine (a minimal spider sketch follows this list).

8. The engine sends the items to the item pipelines and the new requests to the scheduler.

9. Repeat from step 2 until there are no more requests.
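A hypothetical minimal spider for step 7 (the name, URLs and fields are made up): everything it yields is routed by the engine, Requests back to the scheduler and items to the pipelines.

    import scrapy

    class MusicItem(scrapy.Item):
        name = scrapy.Field()
        cover = scrapy.Field()

    class MusicSpider(scrapy.Spider):
        name = 'm_p_spider'                      # hypothetical name and URLs
        start_urls = ['http://example.com/music']

        def parse(self, response):
            # Follow-up requests: step 8 hands them to the scheduler (back to step 2).
            for href in response.xpath('//a[@class="song"]/@href').extract():
                yield scrapy.Request(href, callback=self.parse_song)

        def parse_song(self, response):
            # Items: step 8 hands them to the item pipelines.
            item = MusicItem()
            item['name'] = response.xpath('//h1/text()').extract()
            item['cover'] = response.xpath('//img/@src').extract()
            yield item

For reference, here is the fuller source of CrawlerProcess and CrawlerRunner: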

class CrawlerProcess(CrawlerRunner):
    """A class to run multiple scrapy crawlers in a process simultaneously"""

    def __init__(self, settings):
        super(CrawlerProcess, self).__init__(settings)
        install_shutdown_handlers(self._signal_shutdown)
        self.stopping = False
        self.log_observer = log.start_from_settings(self.settings)
        log.scrapy_info(settings)

    def _signal_shutdown(self, signum, _):
        install_shutdown_handlers(self._signal_kill)
        signame = signal_names[signum]
        log.msg(format="Received %(signame)s, shutting down gracefully. Send again to force ",
                level=log.INFO, signame=signame)
        reactor.callFromThread(self.stop)

    def _signal_kill(self, signum, _):
        install_shutdown_handlers(signal.SIG_IGN)
        signame = signal_names[signum]
        log.msg(format='Received %(signame)s twice, forcing unclean shutdown',
                level=log.INFO, signame=signame)
        self._stop_logging()
        reactor.callFromThread(self._stop_reactor)

    def start(self, stop_after_crawl=True):
        if stop_after_crawl:
            d = self.join()
            # Don't start the reactor if the deferreds are already fired
            if d.called:
                return
            d.addBoth(lambda _: self._stop_reactor())

        if self.settings.getbool('DNSCACHE_ENABLED'):
            reactor.installResolver(CachingThreadedResolver(reactor))

        reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
        reactor.run(installSignalHandlers=False)  # blocking call

    def _stop_logging(self):
        if self.log_observer:
            self.log_observer.stop()

    def _stop_reactor(self, _=None):
        try:
            reactor.stop()
        except RuntimeError:  # raised if already stopped or in shutdown stage
            pass



class CrawlerRunner(object):

    def __init__(self, settings):
        self.settings = settings
        smcls = load_object(settings['SPIDER_MANAGER_CLASS'])
        self.spiders = smcls.from_settings(settings.frozencopy())
        self.crawlers = set()
        self._active = set()

    def crawl(self, spidercls, *args, **kwargs):
        crawler = self._create_crawler(spidercls)
        self._setup_crawler_logging(crawler)
        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)

        def _done(result):
            self.crawlers.discard(crawler)
            self._active.discard(d)
            return result

        return d.addBoth(_done)

    def _create_crawler(self, spidercls):
        if isinstance(spidercls, six.string_types):
            spidercls = self.spiders.load(spidercls)

        crawler_settings = self.settings.copy()
        spidercls.update_settings(crawler_settings)
        crawler_settings.freeze()
        return Crawler(spidercls, crawler_settings)

    def _setup_crawler_logging(self, crawler):
        log_observer = log.start_from_crawler(crawler)
        if log_observer:
            crawler.signals.connect(log_observer.stop, signals.engine_stopped)

    def stop(self):
        return defer.DeferredList([c.stop() for c in list(self.crawlers)])

    @defer.inlineCallbacks
    def join(self):
        """Wait for all managed crawlers to complete"""
        while self._active:
            yield defer.DeferredList(self._active)
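Unlike CrawlerProcess, CrawlerRunner does not touch the reactor or install signal handlers, so the caller owns the event loop; that is what makes it possible to run several crawlers in one process or embed Scrapy in an existing Twisted application. A rough usage sketch (spider names are hypothetical):

    from twisted.internet import reactor
    from scrapy.utils.project import get_project_settings
    from scrapy.crawler import CrawlerRunner

    runner = CrawlerRunner(get_project_settings())
    runner.crawl('spider_one')
    runner.crawl('spider_two')

    d = runner.join()                        # fires once both crawls have finished
    d.addBoth(lambda _: reactor.stop())
    reactor.run()                            # CrawlerProcess.start() would do this for us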