In crawler development, properly controlling the rate of concurrent requests sent through proxy IPs is a key technique for keeping a crawler running stably and avoiding bans. The following covers several methods and best practices for proxy IP concurrency control:
1. Basic Concurrency Control Methods
1.1 Fixed Delay
```python
import time
import requests

def request_with_delay(url, proxy, delay=1):
    response = requests.get(url, proxies={"http": proxy, "https": proxy})
    time.sleep(delay)  # fixed delay after every request
    return response
```
1.2 Random Delay
```python
import random
import time
import requests

def request_with_random_delay(url, proxy, min_delay=0.5, max_delay=2):
    delay = random.uniform(min_delay, max_delay)  # random delay to avoid a fixed request pattern
    time.sleep(delay)
    return requests.get(url, proxies={"http": proxy, "https": proxy})
```
2. Advanced Concurrency Control Techniques
2.1 Semaphore-Based Concurrency Limiting
```python
import requests
from threading import Semaphore

class ProxyRateLimiter:
    def __init__(self, rate_limit):
        # The semaphore caps the number of requests in flight at any moment
        self.semaphore = Semaphore(rate_limit)

    def request(self, url, proxy):
        with self.semaphore:
            return requests.get(url, proxies={"http": proxy, "https": proxy})
```
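The semaphore only takes effect when requests are issued concurrently. A minimal usage sketch, assuming a hypothetical proxy address and placeholder URLs:

```python
from concurrent.futures import ThreadPoolExecutor

# Placeholder values for illustration only
limiter = ProxyRateLimiter(rate_limit=3)  # at most 3 requests in flight at once
proxy = "http://127.0.0.1:8080"
urls = [f"https://example.com/page/{i}" for i in range(10)]

with ThreadPoolExecutor(max_workers=10) as pool:
    responses = list(pool.map(lambda u: limiter.request(u, proxy), urls))
```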
2.2 Leaky Bucket Algorithm
```python
import time
import requests
from collections import deque

class LeakyBucket:
    """Sliding-window form of a leaky bucket: at most `capacity` requests per second."""

    def __init__(self, capacity, leak_rate):
        self.capacity = capacity
        self.leak_rate = leak_rate  # target requests per second
        self.tokens = deque()       # timestamps of requests made within the last second

    def request(self, url, proxy):
        now = time.time()
        # Remove tokens that have fallen outside the one-second window
        while self.tokens and self.tokens[0] <= now - 1:
            self.tokens.popleft()
        if len(self.tokens) < self.capacity:
            self.tokens.append(now)
            return requests.get(url, proxies={"http": proxy, "https": proxy})
        else:
            # Wait until the oldest token expires, then try again
            time.sleep(1 - (now - self.tokens[0]))
            return self.request(url, proxy)
```
3. Proxy IP Pool Management
3.1 Basic Proxy Pool
```python
class ProxyPool:
    """Round-robin proxy pool: cycles through the proxy list in order."""

    def __init__(self, proxies):
        self.proxies = proxies
        self.current = 0

    def get_proxy(self):
        proxy = self.proxies[self.current]
        self.current = (self.current + 1) % len(self.proxies)
        return proxy
```
3.2 Proxy Pool with Health Checks
```python
import random
import time

class HealthyProxyPool:
    def __init__(self, proxies):
        self.available_proxies = proxies
        self.blacklist = set()     # (proxy, time_of_failure) pairs
        self.check_interval = 300  # retry blacklisted proxies after 5 minutes

    def get_proxy(self):
        if not self.available_proxies:
            self._check_blacklist()
        return random.choice(self.available_proxies)

    def report_failure(self, proxy):
        # Move a failing proxy from the available list to the blacklist
        if proxy in self.available_proxies:
            self.available_proxies.remove(proxy)
            self.blacklist.add((proxy, time.time()))

    def _check_blacklist(self):
        # Re-enable proxies whose cool-down period has expired
        now = time.time()
        to_remove = []
        for proxy, timestamp in self.blacklist:
            if now - timestamp > self.check_interval:
                to_remove.append((proxy, timestamp))
                self.available_proxies.append(proxy)
        for item in to_remove:
            self.blacklist.remove(item)
```
4. Distributed Concurrency Control
For distributed crawlers, Redis can be used to implement distributed rate limiting:
```python
import redis

class DistributedRateLimiter:
    def __init__(self, redis_host, redis_port, rate_limit):
        self.redis = redis.StrictRedis(host=redis_host, port=redis_port)
        self.rate_limit = rate_limit
        # Lua script executed atomically in Redis: increment a per-proxy counter
        # that expires after one second, and reject once the limit is reached
        self.script = """
        local key = KEYS[1]
        local limit = tonumber(ARGV[1])
        local current = tonumber(redis.call('get', key) or "0")
        if current + 1 > limit then
            return 0
        else
            redis.call('INCR', key)
            redis.call('EXPIRE', key, 1)
            return 1
        end
        """

    def allow_request(self, proxy):
        key = f"rate_limit:{proxy}"
        return bool(self.redis.eval(self.script, 1, key, self.rate_limit))
```
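Callers typically check allow_request before sending and back off briefly when the per-proxy quota for the current second is used up. A minimal sketch, assuming a local Redis instance and placeholder proxy/URL values:

```python
import time
import requests

# Placeholder configuration for illustration only
limiter = DistributedRateLimiter("localhost", 6379, rate_limit=5)
proxy = "http://127.0.0.1:8080"
url = "https://example.com"

while not limiter.allow_request(proxy):
    time.sleep(0.1)  # quota exhausted for this second; wait and retry
response = requests.get(url, proxies={"http": proxy, "https": proxy}, timeout=10)
```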
5. Best Practices
1. Adjust the rate dynamically: tune the request rate based on response times and success rates
2. Handle errors: apply different strategies to different error types (connection timeouts, HTTP errors, etc.)
3. Grade proxy quality: rank proxies by response speed and stability, and use them accordingly
4. Retry requests: implement a retry mechanism with a backoff algorithm (see the sketch after this list)
5. Monitor and log: record usage and performance metrics for every proxy
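For practice 4, a minimal sketch of a retry helper with exponential backoff and jitter; the function name, delays, and retry count are illustrative choices, not part of the original examples:

```python
import random
import time
import requests

def request_with_backoff(url, proxy, max_retries=3, base_delay=1.0):
    """Retry a request with exponential backoff plus random jitter."""
    for attempt in range(max_retries):
        try:
            return requests.get(url, proxies={"http": proxy, "https": proxy}, timeout=10)
        except requests.RequestException:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff: 1s, 2s, 4s, ... plus jitter to avoid synchronized retries
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))
```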
6. Complete Example
```python
import random
import time
import requests
from concurrent.futures import ThreadPoolExecutor

class SmartProxyCrawler:
    def __init__(self, proxies, max_workers=5, max_retries=3):
        self.proxy_pool = HealthyProxyPool(proxies)
        self.max_workers = max_workers
        self.max_retries = max_retries
        self.min_delay = 0.5
        self.max_delay = 2.0

    def crawl(self, urls):
        # Fetch all URLs concurrently with a bounded worker pool
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(self._request_with_retry, urls))
        return results

    def _request_with_retry(self, url):
        for attempt in range(self.max_retries):
            proxy = self.proxy_pool.get_proxy()
            try:
                # Random delay to avoid a fixed request pattern
                delay = random.uniform(self.min_delay, self.max_delay)
                time.sleep(delay)
                response = requests.get(
                    url,
                    proxies={"http": proxy, "https": proxy},
                    timeout=10
                )
                if response.status_code == 200:
                    return response
                else:
                    # Non-200 response: blame the proxy and try another one
                    self.proxy_pool.report_failure(proxy)
            except Exception as e:
                self.proxy_pool.report_failure(proxy)
                if attempt == self.max_retries - 1:
                    raise e
        return None  # all retries returned non-200 responses
```
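A minimal usage sketch for the class above, assuming hypothetical proxy addresses and URLs:

```python
# Placeholder proxies and URLs for illustration only
proxies = ["http://10.0.0.1:8080", "http://10.0.0.2:8080"]
urls = ["https://example.com/page/1", "https://example.com/page/2"]

crawler = SmartProxyCrawler(proxies, max_workers=2, max_retries=3)
responses = crawler.crawl(urls)
for url, resp in zip(urls, responses):
    status = resp.status_code if resp is not None else "failed"
    print(url, status)
```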
By combining these techniques appropriately, you can build a crawler system that is both efficient and stable: it makes full use of proxy IP resources while avoiding bans caused by excessive request rates.