
Controlling Proxy IP Concurrency in Web Crawlers

2025/04/26 10:46:10

In crawler development, properly controlling concurrent requests through proxy IPs is key to keeping a crawler stable and avoiding bans. Below are several methods and best practices for proxy IP concurrency control:

 

1. Basic Concurrency Control Methods

 

1.1 Fixed Delay

python
import time
import requests

def request_with_delay(url, proxy, delay=1):
    response = requests.get(url, proxies={"http": proxy, "https": proxy})
    time.sleep(delay)  # fixed delay after each request to pace the crawler
    return response

 

1.2 Random Delay

python
import random
import time
import requests

def request_with_random_delay(url, proxy, min_delay=0.5, max_delay=2):
    delay = random.uniform(min_delay, max_delay)  # jitter makes the request pattern harder to fingerprint
    time.sleep(delay)
    return requests.get(url, proxies={"http": proxy, "https": proxy})

 

2. Advanced Concurrency Control Techniques

 

2.1 Token Bucket Algorithm

python
import requests
from threading import Semaphore

class ProxyRateLimiter:
    def __init__(self, rate_limit):
        # The semaphore caps the number of in-flight requests;
        # it bounds concurrency, not requests per second.
        self.semaphore = Semaphore(rate_limit)

    def request(self, url, proxy):
        with self.semaphore:
            return requests.get(url, proxies={"http": proxy, "https": proxy})
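
Strictly speaking, the semaphore above caps how many requests are in flight at once rather than the rate at which they are issued. A true token bucket refills permits at a fixed rate and allows short bursts up to its capacity; a minimal sketch along those lines (the TokenBucket class and its parameters are illustrative, not from the original):

python
import time
import threading

class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self):
        # Block until one token is available, then consume it.
        while True:
            with self.lock:
                now = time.monotonic()
                # refill proportionally to elapsed time, capped at capacity
                self.tokens = min(self.capacity,
                                  self.tokens + (now - self.last_refill) * self.rate)
                self.last_refill = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
            time.sleep(1.0 / self.rate)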

 

2.2 Leaky Bucket Algorithm

python
import time
import requests
from collections import deque

class LeakyBucket:
    def __init__(self, capacity, leak_rate):
        self.capacity = capacity            # burst size: max requests per window
        self.leak_rate = leak_rate          # requests drained per second
        self.window = capacity / leak_rate  # seconds for a full bucket to drain
        self.tokens = deque()               # timestamps of recent requests

    def request(self, url, proxy):
        now = time.time()
        # drop timestamps that have fallen out of the current window
        while self.tokens and self.tokens[0] <= now - self.window:
            self.tokens.popleft()

        if len(self.tokens) < self.capacity:
            self.tokens.append(now)
            return requests.get(url, proxies={"http": proxy, "https": proxy})
        else:
            # wait until the oldest timestamp leaves the window, then retry
            time.sleep(self.window - (now - self.tokens[0]))
            return self.request(url, proxy)
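
Note that this implementation is really a sliding-window limiter: it allows at most capacity requests per capacity / leak_rate seconds, which approximates a bucket draining at leak_rate requests per second. In single-threaded use the recursion is bounded, because the sleep guarantees the oldest timestamp has left the window before the retry.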

 

3. Proxy IP Pool Management

 

3.1 Basic Proxy Pool

python
class ProxyPool:
    def __init__(self, proxies):
        self.proxies = list(proxies)
        self.current = 0

    def get_proxy(self):
        # round-robin: hand out proxies in order, wrapping around at the end
        proxy = self.proxies[self.current]
        self.current = (self.current + 1) % len(self.proxies)
        return proxy
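
One caveat: this round-robin pool is not thread-safe, since two threads can read and update the current index at the same time. A lock-guarded wrapper is one simple remedy (the ThreadSafeProxyPool name is illustrative, not from the original):

python
import threading

class ThreadSafeProxyPool(ProxyPool):
    def __init__(self, proxies):
        super().__init__(proxies)
        self._lock = threading.Lock()  # serializes index reads and updates

    def get_proxy(self):
        with self._lock:
            return super().get_proxy()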

 

3.2 Proxy Pool with Health Checks

python
import random
import time

class HealthyProxyPool:
    def __init__(self, proxies):
        self.available_proxies = list(proxies)
        self.blacklist = set()     # (proxy, failure_timestamp) pairs
        self.check_interval = 300  # re-test blacklisted proxies after 5 minutes

    def get_proxy(self):
        if not self.available_proxies:
            self._check_blacklist()
        if not self.available_proxies:
            raise RuntimeError("no healthy proxies available")
        return random.choice(self.available_proxies)

    def report_failure(self, proxy):
        if proxy in self.available_proxies:
            self.available_proxies.remove(proxy)
        self.blacklist.add((proxy, time.time()))

    def _check_blacklist(self):
        now = time.time()
        to_remove = []
        for proxy, timestamp in self.blacklist:
            if now - timestamp > self.check_interval:
                # cooldown elapsed: give the proxy another chance
                to_remove.append((proxy, timestamp))
                self.available_proxies.append(proxy)

        for item in to_remove:
            self.blacklist.remove(item)

 

4. Distributed Concurrency Control

 

For distributed crawlers, Redis can be used to implement rate limiting shared across all crawler nodes:

 

python
import redis

class DistributedRateLimiter:
    def __init__(self, redis_host, redis_port, rate_limit):
        self.redis = redis.StrictRedis(host=redis_host, port=redis_port)
        self.rate_limit = rate_limit  # max requests per proxy per second
        # Lua script executed atomically on the Redis server:
        # increments a per-proxy counter unless the limit is reached.
        self.script = """
        local key = KEYS[1]
        local limit = tonumber(ARGV[1])
        local current = tonumber(redis.call('get', key) or "0")
        if current + 1 > limit then
            return 0
        else
            redis.call('INCR', key)
            redis.call('EXPIRE', key, 1)
            return 1
        end
        """

    def allow_request(self, proxy):
        key = f"rate_limit:{proxy}"
        return bool(self.redis.eval(self.script, 1, key, self.rate_limit))
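
A minimal usage sketch (the host, port, and busy-wait loop below are illustrative assumptions, not part of the original):

python
import time
import requests

limiter = DistributedRateLimiter("localhost", 6379, rate_limit=5)

def rate_limited_get(url, proxy):
    # poll until the shared limiter grants this proxy a slot
    while not limiter.allow_request(proxy):
        time.sleep(0.05)
    return requests.get(url, proxies={"http": proxy, "https": proxy})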

 

5. Best Practices

 

1. Adjust rates dynamically: tune the request rate based on response times and success rates

2. Handle errors by type: apply different strategies to different errors (connection timeouts, HTTP errors, etc.)

3. Grade proxy quality: rank proxies by response speed and stability, and assign work accordingly

4. Retry requests: implement a retry mechanism with a backoff algorithm (see the sketch after this list)

5. Monitor and log: record usage and performance metrics for each proxy
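
As an illustration of item 4, a minimal exponential-backoff retry could look like the following (the request_with_backoff helper and its parameters are assumptions, not from the original):

python
import random
import time
import requests

def request_with_backoff(url, proxy, max_retries=3, base_delay=1.0):
    for attempt in range(max_retries):
        try:
            return requests.get(url, proxies={"http": proxy, "https": proxy}, timeout=10)
        except requests.RequestException:
            if attempt == max_retries - 1:
                raise  # out of retries: surface the last error
            # wait base_delay * 2^attempt plus random jitter before retrying
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))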

 

6. Complete Example

 

python
import random
import time
import requests
from concurrent.futures import ThreadPoolExecutor

class SmartProxyCrawler:
    def __init__(self, proxies, max_workers=5, max_retries=3):
        self.proxy_pool = HealthyProxyPool(proxies)  # pool from section 3.2
        self.max_workers = max_workers
        self.max_retries = max_retries
        self.min_delay = 0.5
        self.max_delay = 2.0

    def crawl(self, urls):
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(self._request_with_retry, urls))
        return results

    def _request_with_retry(self, url):
        for attempt in range(self.max_retries):
            proxy = self.proxy_pool.get_proxy()
            try:
                # random delay to avoid a detectable request pattern
                delay = random.uniform(self.min_delay, self.max_delay)
                time.sleep(delay)
                response = requests.get(
                    url,
                    proxies={"http": proxy, "https": proxy},
                    timeout=10
                )
                if response.status_code == 200:
                    return response
                else:
                    self.proxy_pool.report_failure(proxy)
            except Exception:
                self.proxy_pool.report_failure(proxy)
                if attempt == self.max_retries - 1:
                    raise
        return None  # all retries returned non-200 responses
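
A usage sketch (the proxy addresses and URLs are placeholders):

python
if __name__ == "__main__":
    proxies = ["http://127.0.0.1:8001", "http://127.0.0.1:8002"]
    crawler = SmartProxyCrawler(proxies, max_workers=3)
    pages = crawler.crawl(["https://example.com/a", "https://example.com/b"])
    print([r.status_code if r else None for r in pages])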

 

By combining these techniques sensibly, you can build a crawler that is both efficient and stable: one that makes full use of proxy IP resources while avoiding bans triggered by excessive request rates.