# -*- coding: utf-8 -*-
import logging
import time
from threading import current_thread
from six.moves import queue
from six.moves.urllib.parse import urlsplit
from icrawler.utils import ThreadPool


class Parser(ThreadPool):
    """Base class for parsers.

    A thread pool of parser threads, in charge of downloading and parsing
    pages, extracting file urls and putting them into the input queue of
    the downloader.

    Attributes:
        signal: A Signal object shared among components for cross-module
            communication.
        session: A requests.Session object.
        logger: A logging.Logger object used for logging.
        threads: A list storing all the threading.Thread objects of the parser.
        thread_num: An integer indicating the number of threads.
        lock: A threading.Lock object.
    """

    def __init__(self, thread_num, signal, session):
        """Init Parser with some shared variables."""
        super(Parser, self).__init__(thread_num, name='parser')
        self.signal = signal
        self.session = session

    def parse(self, response, **kwargs):
        """Parse a page, extract image urls and pack them into tasks.

        This method should be overridden by users. :func:`worker_exec`
        iterates over its return value, so it should return or yield task
        dicts, which are then put into the ``task_queue`` of the downloader.
        A sketch of a typical override is given at the end of this module.

        :Example:

        >>> task = {}
        >>> yield task
        """
        raise NotImplementedError

    def worker_exec(self, queue_timeout=2, req_timeout=5, max_retry=3,
                    **kwargs):
        """Target method of workers.

        Firstly download the page and then call the :func:`parse` method.
        A parser thread will exit in either of the following cases:

        1. All feeder threads have exited and the ``url_queue`` is empty.
        2. The number of downloaded images has reached the required number.

        Args:
            queue_timeout (int): Timeout of getting urls from ``url_queue``.
            req_timeout (int): Timeout of making requests for downloading
                pages.
            max_retry (int): Max retry times if the request fails.
            **kwargs: Arguments to be passed to the :func:`parse` method.
        """
        while True:
            if self.signal.get('reach_max_num'):
                self.logger.info('downloaded image reached max num, thread %s '
                                 'is ready to exit', current_thread().name)
                break
            # get the page url
            try:
                url = self.in_queue.get(timeout=queue_timeout)
            except queue.Empty:
                if self.signal.get('feeder_exited'):
                    self.logger.info(
                        'no more page urls for thread %s to parse',
                        current_thread().name)
                    break
                else:
                    self.logger.info('%s is waiting for new page urls',
                                     current_thread().name)
                    continue
            except Exception as e:
                self.logger.error('exception caught in thread %s: %s',
                                  current_thread().name, e)
                continue
            else:
                self.logger.debug('start fetching page {}'.format(url))
            # fetch and parse the page
            retry = max_retry
            while retry > 0:
                try:
                    base_url = '{0.scheme}://{0.netloc}'.format(urlsplit(url))
                    response = self.session.get(url,
                                                timeout=req_timeout,
                                                headers={'Referer': base_url})
                except Exception as e:
                    self.logger.error(
                        'Exception caught when fetching page %s, '
                        'error: %s, remaining retry times: %d', url, e,
                        retry - 1)
                else:
                    self.logger.info('parsing result page {}'.format(url))
                    for task in self.parse(response, **kwargs):
                        # put each task into the output queue, retrying while
                        # the queue is full unless the max number is reached
                        while not self.signal.get('reach_max_num'):
                            try:
                                if isinstance(task, dict):
                                    self.output(task, timeout=1)
                                elif isinstance(task, str):
                                    # this case only works for GreedyCrawler,
                                    # which needs to feed the url back to
                                    # url_queue, dirty implementation
                                    self.input(task, timeout=1)
                            except queue.Full:
                                time.sleep(1)
                            except Exception as e:
                                self.logger.error(
                                    'Exception caught when putting task %s '
                                    'into queue, error: %s', task, e)
                            else:
                                break
                        if self.signal.get('reach_max_num'):
                            break
                    self.in_queue.task_done()
                    break
                finally:
                    retry -= 1
        self.logger.info('thread {} exit'.format(current_thread().name))

    def __exit__(self):
        logging.info('all parser threads exited')
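

# ---------------------------------------------------------------------------
# Illustrative sketch only, not part of icrawler: a minimal Parser subclass
# showing the override contract that worker_exec relies on.  The class name,
# the regex and the assumption that image urls appear in ``<img src>``
# attributes are hypothetical; the ``file_url`` key mirrors the field used in
# downloader task dicts, but adapt the parsing logic to the actual site.
# ---------------------------------------------------------------------------
import re


class ExampleImageParser(Parser):
    """Extract ``<img>`` sources from a result page, one task per url."""

    def parse(self, response, **kwargs):
        content = response.content.decode('utf-8', 'ignore')
        # worker_exec iterates over this generator and puts every yielded
        # dict into the downloader's task queue
        for match in re.finditer(r'<img[^>]+src="(https?://[^"]+)"', content):
            yield dict(file_url=match.group(1))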
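

# In practice the number of parser threads is chosen through a concrete
# crawler rather than by instantiating Parser directly.  The snippet below
# follows the usage shown in the icrawler README; argument names may differ
# between icrawler versions, so treat it as an illustration only:
#
#     from icrawler.builtin import GoogleImageCrawler
#     crawler = GoogleImageCrawler(parser_threads=2, downloader_threads=4,
#                                  storage={'root_dir': 'images'})
#     crawler.crawl(keyword='cat', max_num=100)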