Source code for icrawler.utils.thread_pool

import logging
from threading import Lock, Thread

from .cached_queue import CachedQueue

class Worker(Thread):

    def __init__(self, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.daemon = True
        self.quit = False

    def terminate(self):
        self.quit = True

[docs]class ThreadPool(object): """Simple implementation of a thread pool This is the base class of :class:`Feeder`, :class:`Parser` and :class:`Downloader`, it incorporates two FIFO queues and a number of "workers", namely threads. All threads share the two queues, after each thread starts, it will watch the ``in_queue``, once the queue is not empty, it will get a task from the queue and process as wanted, then it will put the output to ``out_queue``. Note: This class is not designed as a generic thread pool, but works specifically for crawler components. Attributes: name (str): thread pool name. thread_num (int): number of available threads. in_queue (Queue): input queue of tasks. out_queue (Queue): output queue of finished tasks. workers (list): a list of working threads. lock (Lock): thread lock. logger (Logger): standard python logger. """ def __init__(self, thread_num, in_queue=None, out_queue=None, name=None): self.thread_num = thread_num self.in_queue = (in_queue if in_queue else CachedQueue(5 * self.thread_num)) self.out_queue = (out_queue if out_queue else CachedQueue(5 * self.thread_num)) = name if name else __name__ self.workers = [] self.lock = Lock() self.logger = logging.getLogger( def init_workers(self, *args, **kwargs): self.workers = [] for i in range(self.thread_num): worker = Worker( target=self.worker_exec, name='{}-{:03d}'.format(, i + 1), args=args, kwargs=kwargs) self.workers.append(worker) def start(self, *args, **kwargs): self.init_workers(*args, **kwargs) for worker in self.workers: self.logger.debug('thread %s started', worker.start() def input(self, task, block=True, timeout=None): if self.in_queue is not None: self.in_queue.put(task, block, timeout) def output(self, task, block=True, timeout=None): if self.out_queue is not None: self.out_queue.put(task, block, timeout) def clear_buffer(self, clear_out=False): self.in_queue.queue.clear() if clear_out: self.out_queue.queue.clear() def worker_exec(self, *args, **kwargs): raise NotImplementedError
[docs] def connect(self, component): """Connect two ThreadPools. The ``in_queue`` of the second pool will be set as the ``out_queue`` of the current pool, thus all the output will be input to the second pool. Args: component (ThreadPool): the ThreadPool to be connected. Returns: ThreadPool: the modified second ThreadPool. """ if not isinstance(component, ThreadPool): raise TypeError('"component" must be a ThreadPool object') component.in_queue = self.out_queue return component
def is_alive(self): for worker in self.workers: if worker.is_alive(): return True return False def terminate(self): for worker in self.workers: worker.terminate()