在网站后台备案号怎么改,百度推广官网全国开户:sk67666,做图用哪个素材网站,网页翻译怎么设置Scrapy是一个比较好用的Python爬虫框架#xff0c;你只需要编写几个组件就可以实现网页数据的爬取。但是当我们要爬取的页面非常多的时候#xff0c;单个主机的处理能力就不能满足我们的需求了#xff08;无论是处理速度还是网络请求的并发数#xff09;#xff0c;这时候… Scrapy是一个比较好用的Python爬虫框架你只需要编写几个组件就可以实现网页数据的爬取。但是当我们要爬取的页面非常多的时候单个主机的处理能力就不能满足我们的需求了无论是处理速度还是网络请求的并发数这时候分布式爬虫的优势就显现出来。 而Scrapy-Redis则是一个基于Redis的Scrapy分布式组件。它利用Redis对用于爬取的请求(Requests)进行存储和调度(Schedule)并对爬取产生的项目(items)存储以供后续处理使用。scrapy-redi重写了scrapy一些比较关键的代码将scrapy变成一个可以在多个主机上同时运行的分布式爬虫。 原生的Scrapy的架构是这样子的 加上了Scrapy-Redis之后的架构变成了 scrapy-redis的官方文档写的比较简洁没有提及其运行原理所以如果想全面的理解分布式爬虫的运行原理还是得看scrapy-redis的源代码才行不过scrapy-redis的源代码很少也比较好懂很快就能看完。 scrapy-redis工程的主体还是是redis和scrapy两个库工程本身实现的东西不是很多这个工程就像胶水一样把这两个插件粘结了起来。 scrapy-redis提供了哪些组件 scrapy-redis所实现的两种分布式爬虫分布式以及item处理分布式。分别是由模块scheduler和模块pipelines实现。 connection.py 负责根据setting中配置实例化redis连接。被dupefilter和scheduler调用总之涉及到redis存取的都要使用到这个模块。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 import redis import six from scrapy.utils.misc import load_object DEFAULT_REDIS_CLS redis.StrictRedis # Sane connection defaults. DEFAULT_PARAMS { socket_timeout: 30, socket_connect_timeout: 30, retry_on_timeout: True, } # Shortcut maps setting name - parmater name. SETTINGS_PARAMS_MAP { REDIS_URL: url, REDIS_HOST: host, REDIS_PORT: port, } def get_redis_from_settings(settings): Returns a redis client instance from given Scrapy settings object. This function uses get_client to instantiate the client and uses DEFAULT_PARAMS global as defaults values for the parameters. You can override them using the REDIS_PARAMS setting. Parameters ---------- settings : Settings A scrapy settings object. See the supported settings below. Returns ------- server Redis client instance. Other Parameters ---------------- REDIS_URL : str, optional Server connection URL. REDIS_HOST : str, optional Server host. REDIS_PORT : str, optional Server port. REDIS_PARAMS : dict, optional Additional client parameters. params DEFAULT_PARAMS.copy() params.update(settings.getdict(REDIS_PARAMS)) # XXX: Deprecate REDIS_* settings. for source, dest in SETTINGS_PARAMS_MAP.items(): val settings.get(source) if val: params[dest] val # Allow redis_cls to be a path to a class. if isinstance(params.get(redis_cls), six.string_types): params[redis_cls] load_object(params[redis_cls]) return get_redis(**params) # Backwards compatible alias. from_settings get_redis_from_settings def get_redis(**kwargs): Returns a redis client instance. Parameters ---------- redis_cls : class, optional Defaults to redis.StrictRedis. url : str, optional If given, redis_cls.from_url is used to instantiate the class. **kwargs Extra parameters to be passed to the redis_cls class. Returns ------- server Redis client instance. redis_cls kwargs.pop(redis_cls, DEFAULT_REDIS_CLS) url kwargs.pop(url, None) if url: return redis_cls.from_url(url, **kwargs) else: return redis_cls(**kwargs) connect文件引入了redis模块这个是redis-python库的接口用于通过python访问redis数据库可见这个文件主要是实现连接redis数据库的功能返回的是redis库的Redis对象或者StrictRedis对象这俩都是可以直接用来进行数据操作的对象。这些连接接口在其他文件中经常被用到。其中我们可以看到要想连接到redis数据库和其他数据库差不多需要一个ip地址、端口号、用户名密码可选和一个整形的数据库编号同时我们还可以在scrapy工程的setting文件中配置套接字的超时时间、等待时间等。 dupefilter.py 负责执行requst的去重实现的很有技巧性使用redis的set数据结构。但是注意scheduler并不使用其中用于在这个模块中实现的dupefilter键做request的调度而是使用queue.py模块中实现的queue。当request不重复时将其存入到queue中调度时将其弹出。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 import logging import time from scrapy.dupefilters import BaseDupeFilter from scrapy.utils.request import request_fingerprint from .connection import get_redis_from_settings DEFAULT_DUPEFILTER_KEY dupefilter:%(timestamp)s logger logging.getLogger(__name__) # TODO: Rename class to RedisDupeFilter. class RFPDupeFilter(BaseDupeFilter): Redis-based request duplicates filter. This class can also be used with default Scrapys scheduler. logger logger def __init__(self, server, key, debugFalse): Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. self.server server self.key key self.debug debug self.logdupes True classmethod def from_settings(cls, settings): Returns an instance from given settings. This uses by default the key dupefilter:timestamp. When using the scrapy_redis.scheduler.Scheduler class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : scrapy.settings.Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance. server get_redis_from_settings(settings) # XXX: This creates one-time key. needed to support to use this # class as standalone dupefilter with scrapys default scheduler # if scrapy passes spider on open() method this wouldnt be needed # TODO: Use SCRAPY_JOB env as default and fallback to timestamp. key DEFAULT_DUPEFILTER_KEY % {timestamp: int(time.time())} debug settings.getbool(DUPEFILTER_DEBUG) return cls(server, keykey, debugdebug) classmethod def from_crawler(cls, crawler): Returns instance from crawler. Parameters ---------- crawler : scrapy.crawler.Crawler Returns ------- RFPDupeFilter Instance of RFPDupeFilter. return cls.from_settings(crawler.settings) def request_seen(self, request): Returns True if request was already seen. Parameters ---------- request : scrapy.http.Request Returns ------- bool fp self.request_fingerprint(request) # This returns the number of values added, zero if already exists. added self.server.sadd(self.key, fp) return added 0 def request_fingerprint(self, request): Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request Returns ------- str return request_fingerprint(request) def close(self, reason): Delete data on close. Called by Scrapys scheduler. Parameters ---------- reason : str, optional self.clear() def clear(self): Clears fingerprints data. self.server.delete(self.key) def log(self, request, spider): Logs given request. Parameters ---------- request : scrapy.http.Request spider : scrapy.spiders.Spider if self.debug: msg Filtered duplicate request: %(request)s self.logger.debug(msg, {request: request}, extra{spider: spider}) elif self.logdupes: msg (Filtered duplicate request %(request)s - no more duplicates will be shown (see DUPEFILTER_DEBUG to show all duplicates)) msg Filtered duplicate request: %(request)s self.logger.debug(msg, {request: request}, extra{spider: spider}) self.logdupes False 这个文件看起来比较复杂重写了scrapy本身已经实现的request判重功能。因为本身scrapy单机跑的话只需要读取内存中的request队列或者持久化的request队列scrapy默认的持久化似乎是json格式的文件不是数据库就能判断这次要发出的request url是否已经请求过或者正在调度本地读就行了。而分布式跑的话就需要各个主机上的scheduler都连接同一个数据库的同一个request池来判断这次的请求是否是重复的了。 在这个文件中通过继承BaseDupeFilter重写他的方法实现了基于redis的判重。根据源代码来看scrapy-redis使用了scrapy本身的一个fingerprint接request_fingerprint这个接口很有趣根据scrapy文档所说他通过hash来判断两个url是否相同相同的url会生成相同的hash结果但是当两个url的地址相同get型参数相同但是顺序不同时也会生成相同的hash结果这个真的比较神奇。。。所以scrapy-redis依旧使用url的fingerprint来判断request请求是否已经出现过。这个类通过连接redis使用一个key来向redis的一个set中插入fingerprint这个key对于同一种spider是相同的redis是一个key-value的数据库如果key是相同的访问到的值就是相同的这里使用spider名字DupeFilter的key就是为了在不同主机上的不同爬虫实例只要属于同一种spider就会访问到同一个set而这个set就是他们的url判重池如果返回值为0说明该set中该fingerprint已经存在因为集合是没有重复值的则返回False如果返回值为1说明添加了一个fingerprint到set中则说明这个request没有重复于是返回True还顺便把新fingerprint加入到数据库中了。 DupeFilter判重会在scheduler类中用到每一个request在进入调度之前都要进行判重如果重复就不需要参加调度直接舍弃就好了不然就是白白浪费资源。 queue.py 其作用如dupefilter.py所述但是这里实现了三种方式的queueFIFO的SpiderQueueSpiderPriorityQueue以及LIFI的SpiderStack。默认使用的是第二种这也就是出现之前文章中所分析情况的原因链接。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 from scrapy.utils.reqser import request_to_dict, request_from_dict from . import picklecompat class Base(object): Per-spider queue/stack base class def __init__(self, server, spider, key, serializerNone): Initialize per-spider redis queue. Parameters: server -- redis connection spider -- spider instance key -- key for this queue (e.g. %(spider)s:queue) if serializer is None: # Backward compatibility. # TODO: deprecate pickle. serializer picklecompat if not hasattr(serializer, loads): raise TypeError(serializer does not implement loads function: %r % serializer) if not hasattr(serializer, dumps): raise TypeError(serializer %s does not implement dumps function: %r % serializer) self.server server self.spider spider self.key key % {spider: spider.name} self.serializer serializer def _encode_request(self, request): Encode a request object obj request_to_dict(request, self.spider) return self.serializer.dumps(obj) def _decode_request(self, encoded_request): Decode an request previously encoded obj self.serializer.loads(encoded_request) return request_from_dict(obj, self.spider) def __len__(self): Return the length of the queue raise NotImplementedError def push(self, request): Push a request raise NotImplementedError def pop(self, timeout0): Pop a request raise NotImplementedError def clear(self): Clear queue/stack self.server.delete(self.key) class SpiderQueue(Base): Per-spider FIFO queue def __len__(self): Return the length of the queue return self.server.llen(self.key) def push(self, request): Push a request self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout0): Pop a request if timeout 0: data self.server.brpop(self.key, timeout) if isinstance(data, tuple): data data[1] else: data self.server.rpop(self.key) if data: return self._decode_request(data) class SpiderPriorityQueue(Base): Per-spider priority queue abstraction using redis sorted set def __len__(self): Return the length of the queue return self.server.zcard(self.key) def push(self, request): Push a request data self._encode_request(request) score -request.priority # We dont use zadd method as the order of arguments change depending on # whether the class is Redis or StrictRedis, and the option of using # kwargs only accepts strings, not bytes. self.server.execute_command(ZADD, self.key, score, data) def pop(self, timeout0): Pop a request timeout not support in this queue class # use atomic range/remove using multi/exec pipe self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count pipe.execute() if results: return self._decode_request(results[0]) class SpiderStack(Base): Per-spider stack def __len__(self): Return the length of the stack return self.server.llen(self.key) def push(self, request): Push a request self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout0): Pop a request if timeout 0: data self.server.blpop(self.key, timeout) if isinstance(data, tuple): data data[1] else: data self.server.lpop(self.key) if data: return self._decode_request(data) __all__ [SpiderQueue, SpiderPriorityQueue, SpiderStack] 该文件实现了几个容器类可以看这些容器和redis交互频繁同时使用了我们上边picklecompat中定义的serializer。这个文件实现的几个容器大体相同只不过一个是队列一个是栈一个是优先级队列这三个容器到时候会被scheduler对象实例化来实现request的调度。比如我们使用SpiderQueue最为调度队列的类型到时候request的调度方法就是先进先出而实用SpiderStack就是先进后出了。 我们可以仔细看看SpiderQueue的实现他的push函数就和其他容器的一样只不过push进去的request请求先被scrapy的接口request_to_dict变成了一个dict对象因为request对象实在是比较复杂有方法有属性不好串行化之后使用picklecompat中的serializer串行化为字符串然后使用一个特定的key存入redis中该key在同一种spider中是相同的。而调用pop时其实就是从redis用那个特定的key去读其值一个list从list中读取最早进去的那个于是就先进先出了。 这些容器类都会作为scheduler调度request的容器scheduler在每个主机上都会实例化一个并且和spider一一对应所以分布式运行时会有一个spider的多个实例和一个scheduler的多个实例存在于不同的主机上但是因为scheduler都是用相同的容器而这些容器都连接同一个redis服务器又都使用spider名加queue来作为key读写数据所以不同主机上的不同爬虫实例公用一个request调度池实现了分布式爬虫之间的统一调度。 picklecompat.py 1 2 3 4 5 6 7 8 9 A pickle wrapper module with protocol-1 by default. try: import cPickle as pickle # PY2 except ImportError: import pickle def loads(s): return pickle.loads(s) def dumps(obj): return pickle.dumps(obj, protocol-1) 这里实现了loads和dumps两个函数其实就是实现了一个serializer因为redis数据库不能存储复杂对象value部分只能是字符串字符串列表字符串集合和hashkey部分只能是字符串所以我们存啥都要先串行化成文本才行。这里使用的就是python的pickle模块一个兼容py2和py3的串行化工具。这个serializer主要用于一会的scheduler存reuqest对象至于为什么不实用json格式我也不是很懂item pipeline的串行化默认用的就是json。 pipelines.py 这是是用来实现分布式处理的作用。它将Item存储在redis中以实现分布式处理。另外可以发现同样是编写pipelines在这里的编码实现不同于文章中所分析的情况由于在这里需要读取配置所以就用到了from_crawler()函数。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from scrapy.utils.misc import load_object from scrapy.utils.serialize import ScrapyJSONEncoder from twisted.internet.threads import deferToThread from . import connection default_serialize ScrapyJSONEncoder().encode class RedisPipeline(object): Pushes serialized item into a redis list/queue def __init__(self, server, key%(spider)s:items, serialize_funcdefault_serialize): self.server server self.key key self.serialize serialize_func classmethod def from_settings(cls, settings): params { server: connection.from_settings(settings), } if settings.get(REDIS_ITEMS_KEY): params[key] settings[REDIS_ITEMS_KEY] if settings.get(REDIS_ITEMS_SERIALIZER): params[serialize_func] load_object( settings[REDIS_ITEMS_SERIALIZER] ) return cls(**params) classmethod def from_crawler(cls, crawler): return cls.from_settings(crawler.settings) def process_item(self, item, spider): return deferToThread(self._process_item, item, spider) def _process_item(self, item, spider): key self.item_key(item, spider) data self.serialize(item) self.server.rpush(key, data) return item def item_key(self, item, spider): Returns redis key based on given spider. Override this function to use a different key depending on the item and/or spider. return self.key % {spider: spider.name} pipeline文件实现了一个item pipieline类和scrapy的item pipeline是同一个对象通过从settings中拿到我们配置的REDIS_ITEMS_KEY作为key把item串行化之后存入redis数据库对应的value中这个value可以看出出是个list我们的每个item是这个list中的一个结点这个pipeline把提取出的item存起来主要是为了方便我们延后处理数据。 scheduler.py 此扩展是对scrapy中自带的scheduler的替代在settings的SCHEDULER变量中指出正是利用此扩展实现crawler的分布式调度。其利用的数据结构来自于queue中实现的数据结构。 scrapy-redis所实现的两种分布式爬虫分布式以及item处理分布式就是由模块scheduler和模块pipelines实现。上述其它模块作为为二者辅助的功能模块。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 import importlib import six from scrapy.utils.misc import load_object from . import connection # TODO: add SCRAPY_JOB support. class Scheduler(object): Redis-based scheduler def __init__(self, server, persistFalse, flush_on_startFalse, queue_key%(spider)s:requests, queue_clsscrapy_redis.queue.SpiderPriorityQueue, dupefilter_key%(spider)s:dupefilter, dupefilter_clsscrapy_redis.dupefilter.RFPDupeFilter, idle_before_close0, serializerNone): Initialize scheduler. Parameters ---------- server : Redis The redis server instance. persist : bool Whether to flush requests when closing. Default is False. flush_on_start : bool Whether to flush requests on start. Default is False. queue_key : str Requests queue key. queue_cls : str Importable path to the queue class. dupefilter_key : str Duplicates filter key. dupefilter_cls : str Importable path to the dupefilter class. idle_before_close : int Timeout before giving up. if idle_before_close 0: raise TypeError(idle_before_close cannot be negative) self.server server self.persist persist self.flush_on_start flush_on_start self.queue_key queue_key self.queue_cls queue_cls self.dupefilter_cls dupefilter_cls self.dupefilter_key dupefilter_key self.idle_before_close idle_before_close self.serializer serializer self.stats None def __len__(self): return len(self.queue) classmethod def from_settings(cls, settings): kwargs { persist: settings.getbool(SCHEDULER_PERSIST), flush_on_start: settings.getbool(SCHEDULER_FLUSH_ON_START), idle_before_close: settings.getint(SCHEDULER_IDLE_BEFORE_CLOSE), } # If these values are missing, it means we want to use the defaults. optional { # TODO: Use custom prefixes for this settings to note that are # specific to scrapy-redis. queue_key: SCHEDULER_QUEUE_KEY, queue_cls: SCHEDULER_QUEUE_CLASS, dupefilter_key: SCHEDULER_DUPEFILTER_KEY, # We use the default setting name to keep compatibility. dupefilter_cls: DUPEFILTER_CLASS, serializer: SCHEDULER_SERIALIZER, } for name, setting_name in optional.items(): val settings.get(setting_name) if val: kwargs[name] val # Support serializer as a path to a module. if isinstance(kwargs.get(serializer), six.string_types): kwargs[serializer] importlib.import_module(kwargs[serializer]) server connection.from_settings(settings) # Ensure the connection is working. server.ping() return cls(serverserver, **kwargs) classmethod def from_crawler(cls, crawler): instance cls.from_settings(crawler.settings) # FIXME: for now, stats are only supported from this constructor instance.stats crawler.stats return instance def open(self, spider): self.spider spider try: self.queue load_object(self.queue_cls)( serverself.server, spiderspider, keyself.queue_key % {spider: spider.name}, serializerself.serializer, ) except TypeError as e: raise ValueError(Failed to instantiate queue class %s: %s, self.queue_cls, e) try: self.df load_object(self.dupefilter_cls)( serverself.server, keyself.dupefilter_key % {spider: spider.name}, debugspider.settings.getbool(DUPEFILTER_DEBUG), ) except TypeError as e: raise ValueError(Failed to instantiate dupefilter class %s: %s, self.dupefilter_cls, e) if self.flush_on_start: self.flush() # notice if there are requests already in the queue to resume the crawl if len(self.queue): spider.log(Resuming crawl (%d requests scheduled) % len(self.queue)) def close(self, reason): if not self.persist: self.flush() def flush(self): self.df.clear() self.queue.clear() def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False if self.stats: self.stats.inc_value(scheduler/enqueued/redis, spiderself.spider) self.queue.push(request) return True def next_request(self): block_pop_timeout self.idle_before_close request self.queue.pop(block_pop_timeout) if request and self.stats: self.stats.inc_value(scheduler/dequeued/redis, spiderself.spider) return request def has_pending_requests(self): return len(self) 0 这个文件重写了scheduler类用来代替scrapy.core.scheduler的原有调度器。其实对原有调度器的逻辑没有很大的改变主要是使用了redis作为数据存储的媒介以达到各个爬虫之间的统一调度。 scheduler负责调度各个spider的request请求scheduler初始化时通过settings文件读取queue和dupefilters的类型一般就用上边默认的配置queue和dupefilters使用的key一般就是spider name加上queue或者dupefilters这样对于同一种spider的不同实例就会使用相同的数据块了。每当一个request要被调度时enqueue_request被调用scheduler使用dupefilters来判断这个url是否重复如果不重复就添加到queue的容器中先进先出先进后出和优先级都可以可以在settings中配置。当调度完成时next_request被调用scheduler就通过queue容器的接口取出一个request把他发送给相应的spider让spider进行爬取工作。 spider.py 设计的这个spider从redis中读取要爬的url然后执行爬取若爬取过程中返回更多的url那么继续进行直至所有的request完成。之后继续从redis中读取url循环这个过程。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 from scrapy import signals from scrapy.exceptions import DontCloseSpider from scrapy.spiders import Spider, CrawlSpider from . import connection class RedisMixin(object): Mixin class to implement reading urls from a redis queue. redis_key None # If empty, uses default spider:start_urls. # Fetch this amount of start urls when idle. redis_batch_size 100 # Redis client instance. server None def start_requests(self): Returns a batch of start requests from redis. return self.next_requests() def setup_redis(self, crawlerNone): Setup redis connection and idle signal. This should be called after the spider has set its crawler object. if self.server is not None: return if crawler is None: # We allow optional crawler argument to keep backwrads # compatibility. # XXX: Raise a deprecation warning. assert self.crawler, crawler not set crawler self.crawler if not self.redis_key: self.redis_key %s:start_urls % self.name self.log(Reading URLs from redis key %s % self.redis_key) self.redis_batch_size self.settings.getint( REDIS_START_URLS_BATCH_SIZE, self.redis_batch_size, ) self.server connection.from_settings(crawler.settings) # The idle signal is called when the spider has no requests left, # thats when we will schedule new requests from redis queue crawler.signals.connect(self.spider_idle, signalsignals.spider_idle) def next_requests(self): Returns a request to be scheduled or none. use_set self.settings.getbool(REDIS_START_URLS_AS_SET) fetch_one self.server.spop if use_set else self.server.lpop # XXX: Do we need to use a timeout here? found 0 while found self.redis_batch_size: data fetch_one(self.redis_key) if not data: # Queue empty. break yield self.make_request_from_data(data) found 1 if found: self.logger.debug(Read %s requests from %s, found, self.redis_key) def make_request_from_data(self, data): # By default, data is an URL. if :// in data: return self.make_requests_from_url(data) else: self.logger.error(Unexpected URL from %s: %r, self.redis_key, data) def schedule_next_requests(self): Schedules a request if available for req in self.next_requests(): self.crawler.engine.crawl(req, spiderself) def spider_idle(self): Schedules a request if available, otherwise waits. # XXX: Handle a sentinel to close the spider. self.schedule_next_requests() raise DontCloseSpider class RedisSpider(RedisMixin, Spider): Spider that reads urls from redis queue when idle. classmethod def from_crawler(self, crawler): obj super(RedisSpider, self).from_crawler(crawler) obj.setup_redis(crawler) return obj class RedisCrawlSpider(RedisMixin, CrawlSpider): Spider that reads urls from redis queue when idle. classmethod def from_crawler(self, crawler): obj super(RedisCrawlSpider, self).from_crawler(crawler) obj.setup_redis(crawler) return obj spider的改动也不是很大主要是通过connect接口给spider绑定了spider_idle信号spider初始化时通过setup_redis函数初始化好和redis的连接之后通过next_requests函数从redis中取出strat url使用的key是settings中REDIS_START_URLS_AS_SET定义的注意了这里的初始化url池和我们上边的queue的url池不是一个东西queue的池是用于调度的初始化url池是存放入口url的他们都存在redis中但是使用不同的key来区分就当成是不同的表吧spider使用少量的start url可以发展出很多新的url这些url会进入scheduler进行判重和调度。直到spider跑到调度池内没有url的时候会触发spider_idle信号从而触发spider的next_requests函数再次从redis的start url池中读取一些url。 组件之间的关系 最后总结一下scrapy-redis的总体思路这个工程通过重写scheduler和spider类实现了调度、spider启动和redis的交互。实现新的dupefilter和queue类达到了判重和调度容器和redis的交互因为每个主机上的爬虫进程都访问同一个redis数据库所以调度和判重都统一进行统一管理达到了分布式爬虫的目的。 当spider被初始化时同时会初始化一个对应的scheduler对象这个调度器对象通过读取settings配置好自己的调度容器queue和判重工具dupefilter。每当一个spider产出一个request的时候scrapy内核会把这个reuqest递交给这个spider对应的scheduler对象进行调度scheduler对象通过访问redis对request进行判重如果不重复就把他添加进redis中的调度池。当调度条件满足时scheduler对象就从redis的调度池中取出一个request发送给spider让他爬取。当spider爬取的所有暂时可用url之后scheduler发现这个spider对应的redis的调度池空了于是触发信号spider_idlespider收到这个信号之后直接连接redis读取strart url池拿去新的一批url入口然后再次重复上边的工作。 为什么要提供这些组件 我们先从scrapy的“待爬队列”和“Scheduler”入手玩过爬虫的同学都多多少少有些了解在爬虫爬取过程当中有一个主要的数据结构是“待爬队列”以及能够操作这个队列的调度器也就是Scheduler。scrapy官方文档对这二者的描述不多基本上没提。 scrapy使用什么样的数据结构来存放待爬取的request呢其实没用高大上的数据结构就是python自带的collection.deque改造过后的问题来了该怎么让两个以上的Spider共用这个deque呢 scrapy-redis提供了一个解决方法把deque换成redis数据库我们从同一个redis服务器存放要爬取的request这样就能让多个spider去同一个数据库里读取这样分布式的主要问题就解决了嘛。 那么问题又来了我们换了redis来存放队列哪scrapy就能直接分布式了么。scrapy中跟“待爬队列”直接相关的就是调度器“Scheduler”它负责对新的request进行入列操作加入deque取出下一个要爬取的request从deque中取出等操作。在scrapy中Scheduler并不是直接就把deque拿来就粗暴的使用了而且提供了一个比较高级的组织方法它把待爬队列按照优先级建立了一个字典结构比如 1 2 3 4 5 { priority0:队列0 priority1:队列2 priority2:队列2 } 然后根据request中的priority属性来决定该入哪个队列。而出列时则按priority较小的优先出列。为了管理这个比较高级的队列字典Scheduler需要提供一系列的方法。你要是换了redis做队列这个scrapy下的Scheduler就用不了所以自己写一个吧。于是就出现了scrapy-redis的专用scheduler。 那么既然使用了redis做主要数据结构能不能把其他使用自带数据结构关键功能模块也换掉呢 在我们爬取过程当中还有一个重要的功能模块就是request去重。scrapy中是如何实现这个去重功能的呢用集合~scrapy中把已经发送的request指纹放入到一个集合中把下一个request的指纹拿到集合中比对如果该指纹存在于集合中说明这个request发送过了如果没有则继续操作。 为了分布式把这个集合也换掉吧换了redis照样也得把去重类给换了。于是就有了scrapy-redis的dupefilter。那么依次类推接下来的其他组件Pipeline和Spider我们也可以轻松的猜到他们是为什么要被修改呢。 参考链接 https://github.com/younghz/scrapy-redishttps://github.com/younghz/sr-chnhttps://github.com/KDF5000/RSpider 转载于:https://www.cnblogs.com/zxtceq/p/8985622.html