Source code for icrawler.utils.proxy_pool

# -*- coding: utf-8 -*-

import json
import logging
import random
import threading
import time

import requests
from bs4 import BeautifulSoup

from six.moves import queue


class Proxy(object):
    """Proxy class

    Attributes:
        addr (str): A string with IP and port, for example
            '123.123.123.123:8080'.
        protocol (str): 'http' or 'https'.
        weight (float): A float indicating the probability of being
            selected; the weight is based on the connection time and
            stability.
        last_checked (int): A UNIX timestamp indicating when the proxy
            was last checked.
    """

    def __init__(self, addr=None, protocol='http', weight=1.0,
                 last_checked=None):
        self.addr = addr
        self.protocol = protocol
        self.weight = weight
        if last_checked is None:
            self.last_checked = int(time.time())
        else:
            self.last_checked = last_checked

    def format(self):
        """Return the proxy in a format compatible with requests.Session
        parameters.

        Returns:
            dict: A dict like {'http': '123.123.123.123:8080'}.
        """
        return {self.protocol: self.addr}

    def to_dict(self):
        """Convert detailed proxy info into a dict.

        Returns:
            dict: A dict with four keys: ``addr``, ``protocol``,
                ``weight`` and ``last_checked``.
        """
        return dict(
            addr=self.addr,
            protocol=self.protocol,
            weight=self.weight,
            last_checked=self.last_checked)

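# Usage sketch (illustrative, not part of the original module): a Proxy
# formats itself into the ``proxies`` argument accepted by requests; the
# address and url below are placeholders.
#
#     proxy = Proxy('123.123.123.123:8080', 'http')
#     r = requests.get('http://www.example.com',
#                      proxies=proxy.format(), timeout=5)
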

class ProxyPool(object):
    """Proxy pool class

    ProxyPool provides friendly APIs to manage proxies.

    Attributes:
        idx (dict): Indices for the http and https proxy lists.
        test_url (dict): A dict containing two urls; when testing whether
            a proxy is valid, test_url['http'] or test_url['https'] is
            used according to the protocol.
        proxies (dict): All the http and https proxies.
        addr_list (dict): Addresses of proxies.
        dec_ratio (float): When decreasing the weight of a proxy, its
            weight is multiplied by ``dec_ratio``.
        inc_ratio (float): Similar to ``dec_ratio`` but used for
            increasing weights, default the reciprocal of ``dec_ratio``.
        weight_thr (float): The minimum weight of a valid proxy; if the
            weight of a proxy falls below ``weight_thr``, it will be
            removed.
        logger (Logger): A logging.Logger object used for logging.
    """

    def __init__(self, filename=None):
        """Init the pool from a json file.

        Args:
            filename (str, optional): If provided, proxies will be
                loaded from this file.
        """
        self.idx = {'http': 0, 'https': 0}
        self.test_url = {
            'http': 'http://www.sina.com.cn',
            'https': 'https://www.taobao.com'
        }
        self.proxies = {'http': {}, 'https': {}}
        self.addr_list = {'http': [], 'https': []}
        self.dec_ratio = 0.9
        self.inc_ratio = 1 / self.dec_ratio
        self.weight_thr = 0.2
        self.logger = logging.getLogger(__name__)
        if filename is not None:
            self.load(filename)

    def proxy_num(self, protocol=None):
        """Get the number of proxies in the pool.

        Args:
            protocol (str, optional): 'http', 'https' or None.
                (default None)

        Returns:
            int: If protocol is None, the total number of proxies,
                otherwise the number of proxies of the corresponding
                protocol.
        """
        http_num = len(self.proxies['http'])
        https_num = len(self.proxies['https'])
        if protocol == 'http':
            return http_num
        elif protocol == 'https':
            return https_num
        else:
            return http_num + https_num

    def get_next(self, protocol='http', format=False, policy='loop'):
        """Get the next proxy.

        Args:
            protocol (str): 'http' or 'https'. (default 'http')
            format (bool): Whether to format the proxy. (default False)
            policy (str): Either 'loop' or 'random', indicating the
                policy of getting the next proxy. If set to 'loop',
                proxies are returned in turn, otherwise a proxy is
                chosen randomly.

        Returns:
            Proxy or dict: If format is True, the formatted proxy which
                is compatible with requests.Session parameters,
                otherwise a Proxy object.
        """
        if not self.proxies[protocol]:
            return None
        if policy == 'loop':
            idx = self.idx[protocol]
            self.idx[protocol] = (idx + 1) % len(self.proxies[protocol])
        elif policy == 'random':
            idx = random.randint(0, self.proxy_num(protocol) - 1)
        else:
            self.logger.error('Unsupported get_next policy: {}'
                              .format(policy))
            exit()
        proxy = self.proxies[protocol][self.addr_list[protocol][idx]]
        # A proxy with a low weight is likely to be skipped and another
        # one picked instead.
        if proxy.weight < random.random():
            return self.get_next(protocol, format, policy)
        if format:
            return proxy.format()
        else:
            return proxy

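    # Usage sketch (illustrative, not part of the original module):
    # rotating through pooled proxies for successive requests; the file
    # name and url are placeholders.
    #
    #     pool = ProxyPool('proxies.json')
    #     proxies = pool.get_next(protocol='http', format=True)
    #     if proxies is not None:
    #         r = requests.get('http://www.example.com', proxies=proxies)
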
    def save(self, filename):
        """Save proxies to file."""
        proxies = {'http': [], 'https': []}
        for protocol in ['http', 'https']:
            for proxy in self.proxies[protocol]:
                serializable_proxy = self.proxies[protocol][proxy].to_dict()
                proxies[protocol].append(serializable_proxy)
        with open(filename, 'w') as fout:
            json.dump(proxies, fout)

    def load(self, filename):
        """Load proxies from file."""
        with open(filename, 'r') as fin:
            proxies = json.load(fin)
        for protocol in proxies:
            for proxy in proxies[protocol]:
                self.proxies[protocol][proxy['addr']] = Proxy(
                    proxy['addr'], proxy['protocol'], proxy['weight'],
                    proxy['last_checked'])
                self.addr_list[protocol].append(proxy['addr'])

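    # File layout sketch (inferred from save/load above): the JSON file
    # maps each protocol to a list of serialized proxies, e.g.
    #
    #     {
    #         "http": [
    #             {"addr": "123.123.123.123:8080", "protocol": "http",
    #              "weight": 1.0, "last_checked": 1480000000}
    #         ],
    #         "https": []
    #     }
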
    def add_proxy(self, proxy):
        """Add a valid proxy into the pool.

        You must call the `add_proxy` method to add a proxy to the pool
        instead of operating on the `proxies` variable directly.
        """
        protocol = proxy.protocol
        addr = proxy.addr
        if addr in self.proxies[protocol]:
            self.proxies[protocol][addr].last_checked = proxy.last_checked
        else:
            self.proxies[protocol][addr] = proxy
            self.addr_list[protocol].append(addr)

    def remove_proxy(self, proxy):
        """Remove a proxy from the pool."""
        del self.proxies[proxy.protocol][proxy.addr]
        self.addr_list[proxy.protocol].remove(proxy.addr)

    def increase_weight(self, proxy):
        """Increase the weight of a proxy by multiplying inc_ratio."""
        new_weight = proxy.weight * self.inc_ratio
        if new_weight < 1.0:
            proxy.weight = new_weight
        else:
            proxy.weight = 1.0

    def decrease_weight(self, proxy):
        """Decrease the weight of a proxy by multiplying dec_ratio."""
        new_weight = proxy.weight * self.dec_ratio
        if new_weight < self.weight_thr:
            self.remove_proxy(proxy)
        else:
            proxy.weight = new_weight

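    # Usage sketch (illustrative, not part of the original module): a
    # feedback loop that rewards working proxies and penalizes failing
    # ones, so weights come to reflect observed reliability; the url is
    # a placeholder.
    #
    #     proxy = pool.get_next('http')
    #     try:
    #         requests.get('http://www.example.com',
    #                      proxies=proxy.format(), timeout=5)
    #     except requests.exceptions.RequestException:
    #         pool.decrease_weight(proxy)
    #     else:
    #         pool.increase_weight(proxy)
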
    def is_valid(self, addr, protocol='http', timeout=5):
        """Check if a proxy is valid.

        Args:
            addr: A string in the form of 'ip:port'.
            protocol: Either 'http' or 'https'; different test urls are
                used according to the protocol.
            timeout: An integer indicating the timeout of connecting to
                the test url.

        Returns:
            dict: If the proxy is valid, returns
                {'valid': True, 'response_time': xx},
                otherwise returns {'valid': False, 'msg': 'xxxxxx'}.
        """
        start = time.time()
        try:
            r = requests.get(
                self.test_url[protocol],
                timeout=timeout,
                proxies={protocol: 'http://' + addr})
        except KeyboardInterrupt:
            raise
        except requests.exceptions.Timeout:
            return {'valid': False, 'msg': 'timeout'}
        except Exception:
            return {'valid': False, 'msg': 'exception'}
        else:
            if r.status_code == 200:
                response_time = time.time() - start
                return {'valid': True, 'response_time': response_time}
            else:
                return {
                    'valid': False,
                    'msg': 'status code: {}'.format(r.status_code)
                }

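    # Usage sketch (illustrative): checking a single candidate address;
    # the address is a placeholder.
    #
    #     ret = pool.is_valid('123.123.123.123:8080', 'http', timeout=5)
    #     if ret['valid']:
    #         print('ok, {:.2f}s'.format(ret['response_time']))
    #     else:
    #         print('failed: {}'.format(ret['msg']))
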
    def validate(self, proxy_scanner, expected_num=20, queue_timeout=3,
                 val_timeout=5):
        """Target function of validation threads.

        Args:
            proxy_scanner: A ProxyScanner object.
            expected_num: Max number of valid proxies to be scanned.
            queue_timeout: Timeout for getting a proxy from the queue.
            val_timeout: An integer passed to `is_valid` as the
                `timeout` argument.
        """
        while self.proxy_num() < expected_num:
            try:
                candidate_proxy = proxy_scanner.proxy_queue.get(
                    timeout=queue_timeout)
            except queue.Empty:
                if proxy_scanner.is_scanning():
                    continue
                else:
                    break
            addr = candidate_proxy['addr']
            protocol = candidate_proxy['protocol']
            ret = self.is_valid(addr, protocol, val_timeout)
            if self.proxy_num() >= expected_num:
                self.logger.info('Enough valid proxies, thread {} exit.'
                                 .format(threading.current_thread().name))
                break
            if ret['valid']:
                self.add_proxy(Proxy(addr, protocol))
                self.logger.info('{} ok, {:.2f}s'.format(
                    addr, ret['response_time']))
            else:
                self.logger.info('{} invalid, {}'.format(addr, ret['msg']))

    def scan(self, proxy_scanner, expected_num=20, val_thr_num=4,
             queue_timeout=3, val_timeout=5, out_file='proxies.json'):
        """Scan and validate proxies.

        First, call the `scan` method of `proxy_scanner`, then use
        multiple threads to validate the candidates.

        Args:
            proxy_scanner: A ProxyScanner object.
            expected_num: Max number of valid proxies to be scanned.
            val_thr_num: Number of threads used for validating proxies.
            queue_timeout: Timeout for getting a proxy from the queue.
            val_timeout: An integer passed to `is_valid` as the
                `timeout` argument.
            out_file: A string or None. If not None, the proxies will be
                saved to `out_file`.
        """
        try:
            proxy_scanner.scan()
            self.logger.info('starting {} threads to validate proxies...'
                             .format(val_thr_num))
            val_threads = []
            for i in range(val_thr_num):
                t = threading.Thread(
                    name='val-{:0>2d}'.format(i + 1),
                    target=self.validate,
                    kwargs=dict(
                        proxy_scanner=proxy_scanner,
                        expected_num=expected_num,
                        queue_timeout=queue_timeout,
                        val_timeout=val_timeout))
                t.daemon = True
                val_threads.append(t)
                t.start()
            for t in val_threads:
                t.join()
            self.logger.info('Proxy scanning done!')
        finally:
            if out_file is not None:
                self.save(out_file)

    def default_scan(self, region='mainland', expected_num=20,
                     val_thr_num=4, queue_timeout=3, val_timeout=5,
                     out_file='proxies.json', src_files=None):
        """Default scan method, to simplify the usage of the `scan` method.

        It will register the following scan functions:

        1. scan_file
        2. scan_cnproxy (if region is mainland)
        3. scan_free_proxy_list (if region is overseas)
        4. scan_ip84
        5. scan_mimiip

        After scanning, all the proxy info will be saved in `out_file`.

        Args:
            region: Either 'mainland' or 'overseas'.
            expected_num: An integer indicating the expected number of
                proxies; if set too large, the scanning process may take
                a long time to finish.
            val_thr_num: Number of threads used for validating proxies.
            queue_timeout: An integer indicating the timeout for getting
                a candidate proxy from the queue.
            val_timeout: An integer indicating the timeout of connecting
                to the test url via a candidate proxy.
            out_file: The name of the output file saving all the proxy
                info.
            src_files: A list of file names to scan.
        """
        if expected_num > 30:
            self.logger.warning(
                'The more proxies you expect, the more time it will '
                'take. It is highly recommended to keep the expected '
                'num under 30.')
        proxy_scanner = ProxyScanner()
        if src_files is None:
            src_files = []
        elif isinstance(src_files, str):
            src_files = [src_files]
        for filename in src_files:
            proxy_scanner.register_func(proxy_scanner.scan_file,
                                        {'src_file': filename})
        if region == 'mainland':
            proxy_scanner.register_func(proxy_scanner.scan_cnproxy, {})
        elif region == 'overseas':
            proxy_scanner.register_func(proxy_scanner.scan_free_proxy_list,
                                        {})
        proxy_scanner.register_func(proxy_scanner.scan_ip84,
                                    {'region': region, 'page': 5})
        proxy_scanner.register_func(proxy_scanner.scan_mimiip,
                                    {'region': region, 'page': 5})
        self.scan(proxy_scanner, expected_num, val_thr_num, queue_timeout,
                  val_timeout, out_file)

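    # Usage sketch (illustrative, not part of the original module): the
    # typical end-to-end flow scans, validates and saves in one call.
    #
    #     pool = ProxyPool()
    #     pool.default_scan(region='overseas', expected_num=10,
    #                       out_file='proxies.json')
    #     proxies = pool.get_next('http', format=True)
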

class ProxyScanner(object):
    """Proxy scanner class

    ProxyScanner focuses on scanning proxy lists from different sources.

    Attributes:
        proxy_queue: The queue for storing proxies.
        scan_funcs: Scan functions to be used in the `scan` method.
        scan_kwargs: Keyword arguments of the scan functions.
        scan_threads: A list of `threading.Thread` objects.
        logger: A `logging.Logger` object used for logging.
    """

    def __init__(self):
        self.proxy_queue = queue.Queue()
        self.scan_funcs = []
        self.scan_kwargs = []
        self.scan_threads = []
        self.logger = logging.getLogger(__name__)

    def register_func(self, func_name, func_kwargs):
        """Register a scan function.

        Args:
            func_name: A scan function (the function object itself,
                despite the parameter name).
            func_kwargs: A dict containing the arguments of the scan
                function.
        """
        self.scan_funcs.append(func_name)
        self.scan_kwargs.append(func_kwargs)

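    # Usage sketch (illustrative): any callable that puts dicts of the
    # form {'addr': ..., 'protocol': ...} onto proxy_queue can be
    # registered; the file name below is a placeholder.
    #
    #     scanner = ProxyScanner()
    #     scanner.register_func(scanner.scan_file,
    #                           {'src_file': 'proxies.json'})
    #     scanner.scan()
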
    def scan_ip84(self, region='mainland', page=1):
        """Scan candidate proxies from http://ip84.com

        Args:
            region: Either 'mainland' or 'overseas'.
            page: An integer indicating how many pages to scan.
        """
        self.logger.info('start scanning http://ip84.com for proxy list...')
        for i in range(1, page + 1):
            if region == 'mainland':
                url = 'http://ip84.com/dlgn/{}'.format(i)
            elif region == 'overseas':
                url = 'http://ip84.com/gwgn/{}'.format(i)
            else:
                url = 'http://ip84.com/gn/{}'.format(i)
            response = requests.get(url)
            soup = BeautifulSoup(response.content, 'lxml')
            table = soup.find('table', class_='list')
            for tr in table.find_all('tr'):
                if tr.th is not None:  # skip the header row
                    continue
                info = tr.find_all('td')
                protocol = info[4].string.lower()
                addr = '{}:{}'.format(info[0].string, info[1].string)
                self.proxy_queue.put({'addr': addr, 'protocol': protocol})

    def scan_mimiip(self, region='mainland', page=1):
        """Scan candidate proxies from http://mimiip.com

        Args:
            region: Either 'mainland' or 'overseas'.
            page: An integer indicating how many pages to scan.
        """
        self.logger.info('start scanning http://mimiip.com for proxy list...')
        for i in range(1, page + 1):
            if region == 'mainland':
                url = 'http://www.mimiip.com/gngao/{}'.format(i)
            elif region == 'overseas':
                url = 'http://www.mimiip.com/hw/{}'.format(i)
            else:
                url = 'http://www.mimiip.com/gngao/{}'.format(i)
            response = requests.get(url)
            soup = BeautifulSoup(response.content, 'lxml')
            table = soup.find('table', class_='list')
            for tr in table.find_all('tr'):
                if tr.th is not None:  # skip the header row
                    continue
                info = tr.find_all('td')
                protocol = info[4].string.lower()
                addr = '{}:{}'.format(info[0].string, info[1].string)
                self.proxy_queue.put({'addr': addr, 'protocol': protocol})

    def scan_cnproxy(self):
        """Scan candidate (mainland) proxies from http://cn-proxy.com"""
        self.logger.info(
            'start scanning http://cn-proxy.com for proxy list...')
        response = requests.get('http://cn-proxy.com')
        soup = BeautifulSoup(response.content, 'lxml')
        tables = soup.find_all('table', class_='sortable')
        for table in tables:
            for tr in table.tbody.find_all('tr'):
                info = tr.find_all('td')
                addr = '{}:{}'.format(info[0].string, info[1].string)
                self.proxy_queue.put({'addr': addr, 'protocol': 'http'})

    def scan_free_proxy_list(self):
        """Scan candidate (overseas) proxies from http://free-proxy-list.net"""
        self.logger.info('start scanning http://free-proxy-list.net '
                         'for proxy list...')
        response = requests.get('http://free-proxy-list.net')
        soup = BeautifulSoup(response.content, 'lxml')
        table = soup.find('table', id='proxylisttable')
        for tr in table.tbody.find_all('tr'):
            info = tr.find_all('td')
            # Only keep elite proxies; column 6 marks https support.
            if info[4].string != 'elite proxy':
                continue
            if info[6].string == 'yes':
                protocol = 'https'
            else:
                protocol = 'http'
            addr = '{}:{}'.format(info[0].string, info[1].string)
            self.proxy_queue.put({'addr': addr, 'protocol': protocol})

    def scan_file(self, src_file):
        """Scan candidate proxies from an existing file."""
        self.logger.info('start scanning file {} for proxy list...'
                         .format(src_file))
        with open(src_file, 'r') as fin:
            proxies = json.load(fin)
        for protocol in proxies.keys():
            for proxy in proxies[protocol]:
                self.proxy_queue.put({
                    'addr': proxy['addr'],
                    'protocol': protocol
                })

    def is_scanning(self):
        """Return whether at least one scanning thread is alive."""
        return any(t.is_alive() for t in self.scan_threads)

    def scan(self):
        """Start a thread for each registered scan function to scan proxy
        lists."""
        self.logger.info('{0} registered scan functions, starting {0} '
                         'threads to scan candidate proxy lists...'
                         .format(len(self.scan_funcs)))
        for i in range(len(self.scan_funcs)):
            t = threading.Thread(
                name=self.scan_funcs[i].__name__,
                target=self.scan_funcs[i],
                kwargs=self.scan_kwargs[i])
            t.daemon = True
            self.scan_threads.append(t)
            t.start()
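
# Usage sketch (illustrative, not part of the original module): driving
# the scanner manually and draining the queue, instead of going through
# ProxyPool.scan.
#
#     scanner = ProxyScanner()
#     scanner.register_func(scanner.scan_free_proxy_list, {})
#     scanner.scan()
#     while scanner.is_scanning() or not scanner.proxy_queue.empty():
#         try:
#             candidate = scanner.proxy_queue.get(timeout=3)
#         except queue.Empty:
#             continue
#         print(candidate['addr'], candidate['protocol'])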