# Class Encapsulation and Engineering Practices for a 1688 Crawler Framework

In crawler scenarios such as 1688 data collection, class encapsulation enables code reuse and decoupling, while sound engineering keeps the crawler stable, maintainable, and extensible. Based on the practical requirements of a 1688 crawler, this article walks through the design and implementation of a crawler framework, covering design principles, core class encapsulation, engineering support modules, and hands-on usage.

## I. Framework Design Principles and Overall Architecture

### 1. Core Design Principles

The framework follows the open-closed principle (open for extension, closed for modification), single responsibility (each module does exactly one thing), and dependency injection (modules are decoupled through configuration). It also has to cope with 1688's anti-crawling characteristics, such as dynamic rendering and IP bans.

### 2. Overall Architecture (Layered)

The crawler is split into 5 core layers that interact only through interfaces, which keeps coupling low:

| Layer | Responsibility | Core implementation |
| --- | --- | --- |
| Config layer | Manage crawler parameters (proxies, UA, search keyword, storage settings, etc.) | YAML/JSON config file + config class |
| Request layer | Encapsulate HTTP requests and handle anti-crawling (proxies, UA, delays, retries) | Base request class + anti-crawl middleware |
| Parser layer | Parse page/API data and extract target fields (e.g., 1688 product title, price) | Parser base class + business-specific subclasses |
| Storage layer | Persist data (CSV/MySQL/MongoDB) with deduplication support | Storage base class + concrete storage subclasses |
| Scheduler layer | Manage crawl tasks (pagination, multi-threading/async) and monitor task status | Scheduler class + task queue |

## II. Core Class Implementation

Following the layered architecture, generic logic is encapsulated through class inheritance and polymorphism, and the concrete business logic for the 1688 scenario is implemented in subclasses.

### 1. Environment Setup

Install the required dependencies:

```bash
pip install requests beautifulsoup4 lxml pyyaml fake-useragent playwright pymongo mysql-connector-python
playwright install chromium  # needed for dynamic pages
```

### 2. Config Layer: the Config Class

Parameters live in a YAML configuration file to avoid hard-coding and make later changes easier.

Configuration file (`config.yaml`):

```yaml
# Basic spider settings
spider:
  keyword: 手机壳        # 1688 search keyword ("phone case")
  max_page: 5            # maximum number of pages to crawl
  delay: 3               # request delay in seconds
  retry_times: 3         # retries on failure

# Anti-crawl settings
anti_crawl:
  user_agent_pool:
    - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36..."
    - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ..."
  proxy_pool:            # proxy pool
    - "http://127.0.0.1:7890"
    - "http://username:password@proxy.example.com:8080"

# Storage settings
storage:
  type: csv              # csv / mongo / mysql
  csv_path: ./1688_products.csv
  mongo:
    uri: mongodb://localhost:27017/
    db: 1688_spider
    collection: products
  mysql:
    host: localhost
    port: 3306
    user: root
    password: 123456
    db: 1688_spider
```

Config class (`config.py`):

```python
import yaml
from typing import Dict, Any


class Config:
    """Configuration manager: loads and parses the YAML config file."""

    def __init__(self, config_path: str = "./config.yaml"):
        self.config_path = config_path
        self.config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Load the YAML configuration."""
        try:
            with open(self.config_path, "r", encoding="utf-8") as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            raise Exception(f"Config file {self.config_path} does not exist")
        except yaml.YAMLError as e:
            raise Exception(f"Failed to parse config file: {e}")

    def get(self, key: str, default: Any = None) -> Any:
        """Get a config value by dotted path, e.g. 'spider.keyword'."""
        keys = key.split(".")
        value = self.config
        for k in keys:
            if k not in value:
                return default
            value = value[k]
        return value


# Quick test of the Config class
if __name__ == "__main__":
    config = Config()
    print(config.get("spider.keyword"))         # -> 手机壳
    print(config.get("anti_crawl.proxy_pool"))  # -> the proxy list
```
### 3. Request Layer: the BaseRequest Class

Encapsulates the common request logic (anti-crawling, retries, delays) and supports both plain synchronous requests and dynamic pages rendered with Playwright.

`request.py`:

```python
import random
import time
from typing import Any, Dict, Optional

import requests
from fake_useragent import UserAgent
from playwright.sync_api import sync_playwright

from config import Config


class BaseRequest:
    """Request base class: encapsulates common request logic."""

    def __init__(self, config: Config):
        self.config = config
        self.ua = UserAgent()
        self.retry_times = self.config.get("spider.retry_times", 3)
        self.delay = self.config.get("spider.delay", 2)
        self.proxy_pool = self.config.get("anti_crawl.proxy_pool", [])
        self.ua_pool = self.config.get("anti_crawl.user_agent_pool", [])

    def _get_random_proxy(self) -> Optional[str]:
        """Pick a random proxy from the pool."""
        return random.choice(self.proxy_pool) if self.proxy_pool else None

    def _get_random_ua(self) -> str:
        """Pick a random User-Agent."""
        return random.choice(self.ua_pool) if self.ua_pool else self.ua.random

    def _add_delay(self) -> None:
        """Sleep between requests to stay under anti-crawl thresholds."""
        time.sleep(random.uniform(self.delay, self.delay + 2))

    def get(self, url: str, params: Optional[Dict] = None,
            headers: Optional[Dict] = None) -> Optional[str]:
        """Synchronous GET request with retries and anti-crawl handling."""
        headers = headers or {}
        headers["User-Agent"] = self._get_random_ua()
        proxy = self._get_random_proxy()
        proxies = {"http": proxy, "https": proxy} if proxy else None
        for retry in range(self.retry_times):
            try:
                self._add_delay()
                resp = requests.get(url, params=params, headers=headers,
                                    proxies=proxies, timeout=10)
                resp.raise_for_status()  # raise on HTTP errors
                return resp.text
            except Exception as e:
                print(f"Request failed (attempt {retry + 1}): {e}")
                time.sleep(2 ** retry)  # exponential backoff
        return None

    def get_dynamic(self, url: str) -> Optional[Dict[str, Any]]:
        """Render a dynamic page with Playwright and return key fields."""
        result = {"title": None, "price": None, "sales": None}
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page(user_agent=self._get_random_ua())
            try:
                page.goto(url, timeout=30000)
                # Extract key fields from a 1688 product detail page
                # (selectors must be kept in sync with the page structure)
                result["title"] = page.locator(".detail-title").inner_text() \
                    if page.locator(".detail-title").count() > 0 else None
                result["price"] = page.locator(".price").inner_text() \
                    if page.locator(".price").count() > 0 else None
                result["sales"] = page.locator(".sales-volume").inner_text() \
                    if page.locator(".sales-volume").count() > 0 else None
                self._add_delay()
            except Exception as e:
                print(f"Dynamic page request failed: {e}")
            finally:
                browser.close()
        return result


# Quick test of the request class
if __name__ == "__main__":
    config = Config()
    request = BaseRequest(config)
    html = request.get("https://s.1688.com/selloffer/offer_search.htm?keywords=手机壳")
    if html:
        print(html[:500])  # first 500 characters of the page
```
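One gap worth noting: `get_dynamic` launches Chromium without a proxy, so dynamic-page traffic bypasses the proxy pool even though IP bans were listed among 1688's anti-crawl measures. Below is a minimal sketch of an extra BaseRequest method that passes a pooled proxy to Playwright; it assumes plain `http://host:port` entries (authenticated proxies would need the `username`/`password` fields of the launch `proxy` option), and the `.detail-title` selector is simply the one already used above.

```python
# Sketch: an additional method on BaseRequest that routes the Playwright
# browser through the same proxy pool used for synchronous requests.
def get_dynamic_with_proxy(self, url: str) -> Optional[Dict[str, Any]]:
    proxy = self._get_random_proxy()
    launch_kwargs = {"headless": True}
    if proxy:
        # Assumes a plain "http://host:port" proxy entry
        launch_kwargs["proxy"] = {"server": proxy}
    with sync_playwright() as p:
        browser = p.chromium.launch(**launch_kwargs)
        page = browser.new_page(user_agent=self._get_random_ua())
        try:
            page.goto(url, timeout=30000)
            title = page.locator(".detail-title")
            return {"title": title.inner_text() if title.count() > 0 else None}
        except Exception as e:
            print(f"Dynamic page request via proxy failed: {e}")
            return None
        finally:
            browser.close()
```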
### 4. Parser Layer: BaseParser and Subclasses

The BaseParser class defines a common parsing interface; subclasses implement the concrete parsing logic for 1688 pages.

`parser.py`:

```python
from typing import Any, Dict, List

from bs4 import BeautifulSoup

from config import Config


class BaseParser:
    """Parser base class: defines the parsing interface."""

    def __init__(self, config: Config):
        self.config = config

    def parse(self, html: str) -> List[Dict[str, Any]]:
        """Parsing interface; subclasses must implement it."""
        raise NotImplementedError("Subclasses must implement parse()")


class Ali1688ListParser(BaseParser):
    """Parser for 1688 product list pages."""

    def parse(self, html: str) -> List[Dict[str, Any]]:
        """Parse a product list page and extract title, price, and link."""
        soup = BeautifulSoup(html, "lxml")
        products = soup.select(".sm-offer-item")
        result = []
        for item in products:
            # Field selectors must be kept in sync with the 1688 page structure
            title_elem = item.select_one(".offer-title a")
            price_elem = item.select_one(".price")
            link_elem = item.select_one(".offer-title a")
            if not (title_elem and price_elem and link_elem):
                continue
            result.append({
                "title": title_elem.get("title", "").strip(),
                "price": price_elem.text.strip(),
                "link": link_elem.get("href", "").strip(),
                "source": "1688",
            })
        return result


# Quick test of the parser class
if __name__ == "__main__":
    from request import BaseRequest

    config = Config()
    request = BaseRequest(config)
    parser = Ali1688ListParser(config)
    html = request.get("https://s.1688.com/selloffer/offer_search.htm?keywords=手机壳")
    if html:
        products = parser.parse(html)
        print(f"Parsed {len(products)} products")
        print(products[:2])
```

### 5. Storage Layer: BaseStorage and Subclasses

The BaseStorage class defines the storage interface and supports multiple backends (CSV/MySQL/MongoDB) through subclasses; CSV and MongoDB are implemented below, and a MySQL sketch follows this section.

`storage.py`:

```python
import csv
from typing import Any, Dict, List

import pymongo

from config import Config


class BaseStorage:
    """Storage base class: defines the storage interface."""

    def __init__(self, config: Config):
        self.config = config

    def save(self, data: List[Dict[str, Any]]) -> None:
        """Storage interface; subclasses must implement it."""
        raise NotImplementedError("Subclasses must implement save()")


class CsvStorage(BaseStorage):
    """CSV storage."""

    def __init__(self, config: Config):
        super().__init__(config)
        self.csv_path = self.config.get("storage.csv_path", "./products.csv")
        # Initialize the CSV file and write the header row
        with open(self.csv_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=["title", "price", "link", "source"])
            writer.writeheader()

    def save(self, data: List[Dict[str, Any]]) -> None:
        """Append rows to the CSV file."""
        with open(self.csv_path, "a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=["title", "price", "link", "source"])
            writer.writerows(data)
        print(f"Wrote {len(data)} rows to CSV: {self.csv_path}")


class MongoStorage(BaseStorage):
    """MongoDB storage."""

    def __init__(self, config: Config):
        super().__init__(config)
        self.client = pymongo.MongoClient(self.config.get("storage.mongo.uri"))
        self.db = self.client[self.config.get("storage.mongo.db")]
        self.collection = self.db[self.config.get("storage.mongo.collection")]
        # Unique index on the product link to avoid storing duplicates
        self.collection.create_index("link", unique=True)

    def save(self, data: List[Dict[str, Any]]) -> None:
        """Insert documents into MongoDB with automatic deduplication."""
        if not data:
            return
        try:
            self.collection.insert_many(data, ordered=False)
            print(f"Wrote {len(data)} documents to MongoDB")
        except pymongo.errors.BulkWriteError as e:
            # Ignore duplicate-key errors
            written = len(data) - len(e.details["writeErrors"])
            print(f"Some documents were duplicates; actually wrote {written}")


# Quick test of the storage classes
if __name__ == "__main__":
    config = Config()
    storage = CsvStorage(config)
    # Sample data
    test_data = [
        {"title": "苹果15手机壳", "price": "10.00", "link": "https://example.com/1", "source": "1688"},
        {"title": "华为Mate60手机壳", "price": "8.50", "link": "https://example.com/2", "source": "1688"},
    ]
    storage.save(test_data)
```
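The config file and the scheduler's storage selection allow `storage.type: mysql`, but only CSV and MongoDB backends are implemented above. A minimal MysqlStorage sketch against the same BaseStorage interface is shown here, using the mysql-connector-python package from the environment setup; the `products` table layout and the unique key on `link` are assumptions made for illustration, not part of the original design.

```python
from typing import Any, Dict, List

import mysql.connector

from config import Config
from storage import BaseStorage


class MysqlStorage(BaseStorage):
    """MySQL storage sketch: same interface as CsvStorage/MongoStorage."""

    def __init__(self, config: Config):
        super().__init__(config)
        self.conn = mysql.connector.connect(
            host=self.config.get("storage.mysql.host", "localhost"),
            port=self.config.get("storage.mysql.port", 3306),
            user=self.config.get("storage.mysql.user"),
            password=self.config.get("storage.mysql.password"),
            database=self.config.get("storage.mysql.db"),
        )
        cursor = self.conn.cursor()
        # Assumed table layout; the unique key on `link` enables deduplication
        cursor.execute(
            """CREATE TABLE IF NOT EXISTS products (
                   id INT AUTO_INCREMENT PRIMARY KEY,
                   title VARCHAR(255),
                   price VARCHAR(64),
                   link VARCHAR(512),
                   source VARCHAR(32),
                   UNIQUE KEY uq_link (link(191))
               ) DEFAULT CHARSET = utf8mb4"""
        )
        self.conn.commit()
        cursor.close()

    def save(self, data: List[Dict[str, Any]]) -> None:
        """INSERT IGNORE skips rows whose link already exists."""
        if not data:
            return
        cursor = self.conn.cursor()
        sql = ("INSERT IGNORE INTO products (title, price, link, source) "
               "VALUES (%s, %s, %s, %s)")
        cursor.executemany(
            sql, [(d["title"], d["price"], d["link"], d["source"]) for d in data]
        )
        self.conn.commit()
        print(f"Wrote {cursor.rowcount} new rows to MySQL")
        cursor.close()
```

To wire it in, the scheduler's `_init_storage` below would need an extra `elif storage_type == "mysql": return MysqlStorage(self.config)` branch.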
### 6. Scheduler Layer: the SpiderScheduler Class

Manages the lifecycle of crawl tasks (pagination, task dispatch) and wires the request, parser, and storage modules together.

```python
from urllib.parse import quote

from config import Config
from request import BaseRequest
from parser import Ali1688ListParser
from storage import BaseStorage, CsvStorage, MongoStorage


class SpiderScheduler:
    """Crawler scheduler: integrates all modules and manages crawl tasks."""

    def __init__(self, config: Config):
        self.config = config
        self.request = BaseRequest(config)
        self.parser = Ali1688ListParser(config)
        self.storage = self._init_storage()
        self.keyword = self.config.get("spider.keyword")
        self.max_page = self.config.get("spider.max_page")

    def _init_storage(self) -> BaseStorage:
        """Instantiate the storage backend selected in the config."""
        storage_type = self.config.get("storage.type", "csv")
        if storage_type == "csv":
            return CsvStorage(self.config)
        elif storage_type == "mongo":
            return MongoStorage(self.config)
        else:
            raise ValueError(f"Unsupported storage type: {storage_type}")

    def build_url(self, page: int) -> str:
        """Build the URL of a 1688 search result page."""
        return (f"https://s.1688.com/selloffer/offer_search.htm"
                f"?keywords={quote(self.keyword)}&page={page}")

    def run(self) -> None:
        """Start the crawl."""
        print(f"Crawling 1688 keyword [{self.keyword}], {self.max_page} pages")
        all_data = []
        for page in range(1, self.max_page + 1):
            print(f"Crawling page {page}...")
            url = self.build_url(page)
            html = self.request.get(url)
            if not html:
                print(f"Page {page} failed; skipping")
                continue
            # Parse the page
            page_data = self.parser.parse(html)
            if page_data:
                all_data.extend(page_data)
                # Persist each page as soon as it is parsed
                self.storage.save(page_data)
        print(f"Crawl finished; collected {len(all_data)} products in total")


# Quick test of the scheduler
if __name__ == "__main__":
    config = Config()
    scheduler = SpiderScheduler(config)
    scheduler.run()
```

## III. Engineering Support Modules

### 1. Logging

Replace print statements with Python's standard logging module to get leveled logs (INFO/ERROR) written to both file and console, which makes troubleshooting easier.

```python
import logging
import os

from config import Config
from scheduler import SpiderScheduler  # assuming the scheduler lives in scheduler.py


def init_logger() -> None:
    """Initialize the logging system."""
    # Create the log directory
    if not os.path.exists("logs"):
        os.makedirs("logs")
    # Log format; write to file and echo to the console
    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    logging.basicConfig(
        level=logging.INFO,
        format=log_format,
        handlers=[
            logging.FileHandler("logs/1688_spider.log", encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )


# Using the logger in the scheduler entry point
if __name__ == "__main__":
    init_logger()
    logger = logging.getLogger(__name__)
    logger.info("Spider started")
    try:
        config = Config()
        scheduler = SpiderScheduler(config)
        scheduler.run()
        logger.info("Spider finished")
    except Exception as e:
        logger.error(f"Spider error: {e}", exc_info=True)
```

### 2. Exception Handling

Define custom exceptions in the core modules so that different error types can be caught and handled precisely:

```python
# exceptions.py
class SpiderRequestError(Exception):
    """Request error."""
    pass


class SpiderParseError(Exception):
    """Parse error."""
    pass


class SpiderStorageError(Exception):
    """Storage error."""
    pass


# Raising the custom exception in the request layer (BaseRequest.get)
def get(self, url: str) -> Optional[str]:
    for retry in range(self.retry_times):
        try:
            # ... original request logic ...
            return resp.text
        except Exception as e:
            if retry == self.retry_times - 1:
                raise SpiderRequestError(f"Request to {url} failed: {e}")
            time.sleep(2 ** retry)
    return None
```

### 3. Proxy Pool Integration

For large-scale crawling, integrate a third-party proxy pool (such as Abuyun or Kuaidaili) or a self-hosted one, and fetch usable proxies dynamically through its API:

```python
def _get_proxy_from_pool(self) -> Optional[str]:
    """Fetch a usable proxy from a proxy-pool API."""
    proxy_api = "http://proxy.example.com/get_proxy"
    try:
        resp = requests.get(proxy_api, timeout=5)
        return resp.json().get("proxy")
    except Exception as e:
        print(f"Failed to fetch a proxy: {e}")
        return None
```

## IV. Engineering Extensions and Best Practices

### 1. Multi-threaded / Asynchronous Crawling

To improve on single-threaded throughput, use concurrent.futures.ThreadPoolExecutor for multi-threading or aiohttp for asynchronous crawling (a minimal aiohttp sketch follows this section); mind 1688's anti-crawl limits and keep concurrency low.

```python
from concurrent.futures import ThreadPoolExecutor


# Methods to add to SpiderScheduler
def run_multi_thread(self) -> None:
    """Multi-threaded crawl."""
    with ThreadPoolExecutor(max_workers=3) as executor:  # cap the concurrency
        executor.map(self.crawl_page, range(1, self.max_page + 1))


def crawl_page(self, page: int) -> None:
    """Crawl a single page; called by worker threads."""
    url = self.build_url(page)
    html = self.request.get(url)
    if html:
        page_data = self.parser.parse(html)
        self.storage.save(page_data)
```
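The aiohttp alternative mentioned above has no code in the original walkthrough, so here is a minimal async sketch under the same assumptions: it reuses the config values and Ali1688ListParser, caps concurrency with a semaphore for anti-crawl reasons, and requires `pip install aiohttp` as an extra dependency. It bypasses BaseRequest, so proxy and User-Agent rotation would still need to be layered in.

```python
import asyncio
import random
from typing import List, Optional
from urllib.parse import quote

import aiohttp

from config import Config
from parser import Ali1688ListParser


async def fetch_page(session: aiohttp.ClientSession, url: str,
                     sem: asyncio.Semaphore, delay: float) -> Optional[str]:
    """Fetch one page, limited by the semaphore and a random delay."""
    async with sem:
        await asyncio.sleep(random.uniform(delay, delay + 2))
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                resp.raise_for_status()
                return await resp.text()
        except Exception as e:
            print(f"Async request failed: {e}")
            return None


async def crawl_async(config: Config) -> List[dict]:
    """Fetch all result pages concurrently and parse them."""
    keyword = config.get("spider.keyword")
    max_page = config.get("spider.max_page")
    delay = config.get("spider.delay", 2)
    parser = Ali1688ListParser(config)
    sem = asyncio.Semaphore(3)  # keep concurrency low for anti-crawl reasons
    headers = {"User-Agent": "Mozilla/5.0 ..."}  # placeholder UA
    urls = [
        f"https://s.1688.com/selloffer/offer_search.htm?keywords={quote(keyword)}&page={p}"
        for p in range(1, max_page + 1)
    ]
    async with aiohttp.ClientSession(headers=headers) as session:
        pages = await asyncio.gather(*(fetch_page(session, u, sem, delay) for u in urls))
    data = []
    for html in pages:
        if html:
            data.extend(parser.parse(html))
    return data


if __name__ == "__main__":
    products = asyncio.run(crawl_async(Config()))
    print(f"Collected {len(products)} products asynchronously")
```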
### 2. Crawler Monitoring and Alerting

Monitor crawl volume and failure rates with Prometheus + Grafana, or send alerts via email / DingTalk bots when the crawler misbehaves:

```python
import smtplib
from email.mime.text import MIMEText


def send_alert_email(message: str) -> None:
    """Send an alert email."""
    msg = MIMEText(message, "plain", "utf-8")
    msg["Subject"] = "1688 spider alert"
    msg["From"] = "sender@example.com"
    msg["To"] = "receiver@example.com"

    smtp = smtplib.SMTP_SSL("smtp.example.com", 465)
    smtp.login("sender@example.com", "password")
    smtp.sendmail("sender@example.com", ["receiver@example.com"], msg.as_string())
    smtp.quit()
```

### 3. Compliance and Maintenance

- Respect the robots protocol: 1688's robots.txt (https://www.1688.com/robots.txt) explicitly disallows certain paths, and those must be strictly avoided.
- Keep parsing rules up to date: the 1688 page structure changes frequently, so the CSS selectors / XPath expressions need regular review and adjustment.
- Deduplicate and clean the data: deduplicate on unique keys such as the product link or ID, and normalize fields such as price and sales volume (e.g., strip non-numeric characters); a minimal cleaning sketch follows this list.
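Below is a minimal sketch of the deduplication and field-cleaning step just described; the in-memory `seen` set and the price regular expression are illustrative assumptions, and in this framework the logic would naturally sit between the parser and the storage layer (e.g., applied to `page_data` before `storage.save`).

```python
import re
from typing import Any, Dict, List


def clean_products(products: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Deduplicate on the product link and normalize the price field."""
    seen = set()
    cleaned = []
    for item in products:
        link = item.get("link", "").strip()
        if not link or link in seen:
            continue  # drop items without a link or already seen in this batch
        seen.add(link)
        # Keep only the numeric part of the price, e.g. "¥10.00起" -> 10.0
        match = re.search(r"\d+(?:\.\d+)?", item.get("price", ""))
        item["price"] = float(match.group()) if match else None
        cleaned.append(item)
    return cleaned


if __name__ == "__main__":
    sample = [
        {"title": "A", "price": "¥10.00起", "link": "https://example.com/1", "source": "1688"},
        {"title": "A (duplicate)", "price": "10.00", "link": "https://example.com/1", "source": "1688"},
    ]
    print(clean_products(sample))  # one item remains, price becomes 10.0
```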