环境准备
安装Python:
确保已安装Python 3.7及以上版本。
您可以通过命令
python --version检查Python版本。
安装依赖库:
安装
playwright,ddddocr, 和opencv-python库。使用命令行运行以下命令:
1 pip install playwright ddddocr opencv-python
下载Playwright浏览器数据:
根据您的操作系统下载对应的Playwright浏览器数据。
运行命令:
1 python -m playwright install chromium
代码结构解析
1. 导入必要的模块
1 import asyncio
2 import os
3 import json
4 import cv2
5 import ddddocr
6 import requests
7 from playwright.async_api import async_playwright
8 import random2. 初始化ddddocr实例
在代码的开头,我们初始化了ddddocr的实例ocr,用于后续的OCR识别。
# 传参获得已初始化的ddddocr实例
ocr = None3. 支持的颜色和形状
我们定义了支持的颜色和形状类型,用于处理颜色和形状验证码。
# 支持的形状类型
supported_types = [
"三角形",
"正方形",
"长方形",
"五角星",
"六边形",
"圆形",
"梯形",
"圆环",
]
# 定义了支持的每种颜色的 HSV 范围
supported_colors = {
"紫色": ([125, 50, 50], [145, 255, 255]),
"灰色": ([0, 0, 50], [180, 50, 255]),
"粉色": ([160, 50, 50], [180, 255, 255]),
"蓝色": ([100, 50, 50], [130, 255, 255]),
"绿色": ([40, 50, 50], [80, 255, 255]),
"橙色": ([10, 50, 50], [25, 255, 255]),
"黄色": ([25, 50, 50], [35, 255, 255]),
"红色": ([0, 50, 50], [10, 255, 255]),
}4. 初始化浏览器
在main函数中,我们初始化了浏览器的路径,并根据操作系统选择合适的浏览器路径。
async def main(workList, uid, oocr):
global ocr
ocr = oocr
async def init_chrome():
if platform.system() == "Windows":
# Windows系统的浏览器路径初始化
chrome_dir = os.path.join(
os.environ["USERPROFILE"],
"AppData",
"Local",
"pyppeteer",
"pyppeteer",
"local-chromium",
"588429",
"chrome-win32",
)
chrome_exe = os.path.join(chrome_dir, "chrome.exe")
if os.path.exists(chrome_exe):
return chrome_exe
else:
# 下载并解压浏览器
print("貌似第一次使用,未找到chrome,正在下载chrome浏览器....")
chromeurl = "https://mirrors.huaweicloud.com/chromium-browser-snapshots/Win_x64/588429/chrome-win32.zip"
target_file = "chrome-win.zip"
await download_file(chromeurl, target_file)
with zipfile.ZipFile(target_file, "r") as zip_ref:
zip_ref.extractall(chrome_dir)
os.remove(target_file)
return chrome_exe
elif platform.system() == "Linux":
# Linux系统的浏览器路径初始化
chrome_path = os.path.expanduser(
"~/.local/share/pyppeteer/local-chromium/1181205/chrome-linux/chrome"
)
download_path = os.path.expanduser(
"~/.local/share/pyppeteer/local-chromium/1181205/"
)
if os.path.isfile(chrome_path):
return chrome_path
else:
print("貌似第一次使用,未找到chrome,正在下载chrome浏览器....")
download_url = "https://mirrors.huaweicloud.com/chromium-browser-snapshots/Linux_x64/884014/chrome-linux.zip"
target_file = os.path.join(
download_path, "chrome-linux.zip"
)
await download_file(download_url, target_file)
with zipfile.ZipFile(target_file, "r") as zip_ref:
zip_ref.extractall(download_path)
os.remove(target_file)
os.chmod(chrome_path, 0o755)
return chrome_path
elif platform.system() == "Darwin":
return "mac"
else:
return "unknown"5. 处理验证码
滑块验证码
在verification函数中,我们通过OpenCV计算滑块的滑动距离,并模拟滑动操作。
async def verification(page):
print("开始过滑块")
async def get_distance():
# 使用OpenCV计算滑动距离
img = cv2.imread("image.png", 0)
template = cv2.imread("template.png", 0)
img = cv2.GaussianBlur(img, (5, 5), 0)
template = cv2.GaussianBlur(template, (5, 5), 0)
bg_edge = cv2.Canny(img, 100, 200)
cut_edge = cv2.Canny(template, 100, 200)
img = cv2.cvtColor(bg_edge, cv2.COLOR_GRAY2RGB)
template = cv2.cvtColor(cut_edge, cv2.COLOR_GRAY2RGB)
res = cv2.matchTemplate(img, template, cv2.TM_CCOEFF_NORMED)
value = cv2.minMaxLoc(res)[3][0]
distance = value + random.uniform(5, 15)
print(f"计算出的滑动距离: {distance}")
return distance
# 下载并处理验证码图片
await page.waitForSelector("#cpc_img", {'timeout': 10000})
image_src = await page.Jeval("#cpc_img", 'el => el.getAttribute("src")')
request.urlretrieve(image_src, "image.png")
width = await page.evaluate('() => document.getElementById("cpc_img").clientWidth')
height = await page.evaluate('() => document.getElementById("cpc_img").clientHeight')
image = Image.open("image.png")
resized_image = image.resize((width, height))
resized_image.save("image.png")
template_src = await page.Jeval("#small_img", 'el => el.getAttribute("src")')
request.urlretrieve(template_src, "template.png")
width = await page.evaluate('() => document.getElementById("small_img").clientWidth')
height = await page.evaluate('() => document.getElementById("small_img").clientHeight')
image = Image.open("template.png")
resized_image = image.resize((width, height))
resized_image.save("template.png")
el = await page.querySelector("#captcha_modal > div > div.captcha_footer > div > img")
box = await el.boundingBox()
distance = await get_distance()
# 模拟滑动操作
await page.mouse.move(box["x"] + 10, box["y"] + 10)
await page.mouse.down()
steps = 30
for i in range(steps):
progress = i / steps
x = box["x"] + 10 + (distance * (progress ** 2)) + random.uniform(-2, 2)
y = box["y"] + 10 + random.uniform(-3, 3)
await page.mouse.move(x, y, {'steps': 1})
await asyncio.sleep(random.uniform(0.03, 0.07))
await page.mouse.up()
print("过滑块结束")形状验证码
在verification_shape函数中,我们通过ddddocr识别验证码中的文字,并结合OpenCV检测形状和颜色。
async def verification_shape(page):
print("开始过颜色、形状验证")
# 初始化文字检测器
det = ddddocr.DdddOcr(det=True)
def get_shape_location_by_type(img_path, type: str):
# 通过OpenCV检测形状
img = cv2.imread(img_path)
imgGray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
imgBlur = cv2.GaussianBlur(imgGray, (5, 5), 1)
imgCanny = cv2.Canny(imgBlur, 60, 60)
contours, hierarchy = cv2.findContours(
imgCanny, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE
)
for obj in contours:
perimeter = cv2.arcLength(obj, True)
approx = cv2.approxPolyDP(obj, 0.02 * perimeter, True)
CornerNum = len(approx)
x, y, w, h = cv2.boundingRect(approx)
if CornerNum == 3:
obj_type = "三角形"
elif CornerNum == 4:
if w == h:
obj_type = "正方形"
else:
obj_type = "长方形"
elif CornerNum == 6:
obj_type = "六边形"
elif CornerNum == 8:
obj_type = "圆形"
elif CornerNum == 20:
obj_type = "五角星"
else:
obj_type = "未知"
if obj_type == type:6. 获取Cookie
在登录成功后,我们通过getCookie函数获取页面的Cookie信息,特别是pt_key和pt_pin,这些是后续操作的重要凭证。
async def getCookie(page):
cookies = await page.cookies()
pt_key = ""
pt_pin = ""
for cookie in cookies:
if cookie["name"] == "pt_key":
pt_key = cookie["value"]
elif cookie["name"] == "pt_pin":
pt_pin = cookie["value"]
ck = f"pt_key={pt_key};pt_pin={pt_pin};"
print(f"登录成功 {ck}")
return ck7. 发送短信验证码
在sendSMS和sendSMSDirectly函数中,我们模拟用户点击发送短信验证码按钮,并处理可能出现的验证码弹窗。
async def sendSMS(page):
async def preSendSMS(page):
print("进行发送验证码前置操作")
await page.waitForXPath(
'//*[@id="app"]/div/div[2]/div[2]/span/a'
)
await page.waitFor(random.randint(1, 3) * 1000)
elements = await page.xpath(
'//*[@id="app"]/div/div[2]/div[2]/span/a'
)
await elements[0].click()
await page.waitForXPath(
'//*[@id="app"]/div/div[2]/div[2]/button'
)
await page.waitFor(random.randint(1, 3) * 1000)
elements = await page.xpath(
'//*[@id="app"]/div/div[2]/div[2]/button'
)
await elements[0].click()
await page.waitFor(3000)
await preSendSMS(page)
print("开始发送验证码")
try:
while True:
if await page.xpath('//*[@id="captcha_modal"]/div/div[3]/div'):
await verification(page)
elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'):
await verification_shape(page)
else:
break
await page.waitFor(3000)
except Exception as e:
raise e8. 输入短信验证码
在typeSMScode函数中,我们等待用户输入短信验证码,并将其填入表单中。
async def typeSMScode(page, workList, uid):
print("开始输入验证码")
async def get_verification_code(workList, uid):
print("开始从全局变量获取验证码")
retry = 60
while not workList[uid].SMS_CODE and not retry < 0:
await asyncio.sleep(1)
retry -= 1
if retry < 0:
workList[uid].status = "error"
workList[uid].msg = "输入短信验证码超时"
return
workList[uid].status = "pending"
return workList[uid].SMS_CODE
await page.waitForXPath('//*[@id="app"]/div/div[2]/div[2]/div/input')
code = await get_verification_code(workList, uid)
if not code:
return
workList[uid].status = "pending"
workList[uid].msg = "正在通过短信验证"
input_elements = await page.xpath('//*[@id="app"]/div/div[2]/div[2]/div/input')
try:
if input_elements:
input_value = await input_elements[0].getProperty("value")
if input_value:
print("清除验证码输入框中已有的验证码")
await page.evaluate(
'(element) => element.value = ""', input_elements[0]
)
except Exception as e:
print("typeSMScode" + str(e))
await input_elements[0].type(code)
await page.waitForXPath('//*[@id="app"]/div/div[2]/a[1]')
await page.waitFor(random.randint(1, 3) * 1000)
elements = await page.xpath('//*[@id="app"]/div/div[2]/a[1]')
await elements[0].click()
await page.waitFor(random.randint(2, 3) * 1000)9. 错误处理和状态更新
在整个登录流程中,我们通过多个函数检查登录状态,处理错误情况,并更新任务状态。
async def isWrongAccountOrPassword(page, verify=False):
try:
element = await page.xpath('//*[@id="app"]/div/div[5]')
if element:
text = await page.evaluate(
"(element) => element.textContent", element[0]
)
if text == "账号或密码不正确":
if verify == True:
return True
await asyncio.sleep(2)
return await isWrongAccountOrPassword(page, verify=True)
return False
except Exception as e:
print("isWrongAccountOrPassword " + str(e))
return False10. 任务管理和资源清理
在deleteSession函数中,我们管理任务状态,并在任务完成后清理资源。
async def deleteSession(workList, uid):
s = workList.get(uid, "")
if s:
await asyncio.sleep(60)
del workList[uid]11. 主函数
在main函数中,我们初始化浏览器,启动登录流程,并在完成后清理临时文件。
async def main(workList, uid, oocr):
global ocr
ocr = oocr
async def init_chrome():
# 浏览器初始化逻辑,如前所述
chromium_path = await init_chrome()
headless = platform.system() != "Windows"
await logon_main(chromium_path, workList, uid, headless)
os.remove("image.png") if os.path.exists("image.png") else None
os.remove("template.png") if os.path.exists("template.png") else None
os.remove("shape_image.png") if os.path.exists("shape_image.png") else None
os.remove("rgba_word_img.png") if os.path.exists("rgba_word_img.png") else None
os.remove("rgb_word_img.png") if os.path.exists("rgb_word_img.png") else None
print("登录完成")
await asyncio.sleep(10)
评论区