场景:
进入搜狗,输入搜索关键字进行搜索
利用多进程分布式实现
成都创新互联公司企业建站,十多年网站建设经验,专注于网站建设技术,精于网页设计,有多年建站和网站代运营经验,设计师为客户打造网络企业风格,提供周到的建站售前咨询和贴心的售后服务。对于成都网站设计、成都做网站中不同领域进行深入了解和探索,创新互联在网站建设中充分了解客户行业的需求,以灵动的思维在网页中充分展现,通过对客户行业精准市场调研,为客户提供的解决方案。
from multiprocessing import Pool
import os, time
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from multiprocessing import Manager, current_process
import traceback
#定义测试行为函数:此处为打开搜狗搜索内容
def node_task(name, lock, arg, successTestCases, failTestCases):
"""
:param name: 执行进程名
:param lock:进程间的共享资源锁
:param arg:node节点计算机、浏览器字典 ,如:{"node": "http://127.0.0.1:6666/wd/hub", "browserName": "chrome"}
:param successTestCases:成功执行用例列表
:param failTestCases:失败用例列表
:return:返回成功执行、失败执行的用例列表
"""
procName = current_process().name
print("当前进程名:",procName)
time.sleep(1.2)
#获取节点计算机地址
node_host = arg["node"]
#获取浏览器
browser = arg["browserName"]
print(arg["node"])
print(arg["browserName"])
print('Run task %s (%s)...\n' % (name, os.getpid()))
#获取用例执行的开始时间
start = time.time()
#获取driver对象
driver = webdriver.Remote(
#节点计算机
command_executor="%s" %arg["node"],
desired_capabilities={
#浏览器
"browserName": "%s" %arg["browserName"],
"video": "True",
"platform": "WINDOWS"})
try:
driver.maximize_window()
driver.get("http://www.sogou.com")
assert "搜狗" in driver.title
element = driver.find_element_by_id("query")
element.send_keys("测试开发")
element.send_keys(Keys.RETURN)
time.sleep(3)
assert "测试开发" in driver.page_source
#获取共享锁
lock.acquire()
#用例执行成功,用例加入成功列表
successTestCases.append("TestCase: " + str(name))
#释放共享锁
lock.release()
print("TestCase: " + str(name) + " done!")
except AssertionError as e:
#断言失败,打印异常信息
print("AssertionError occur!" " testCase " + str(name))
print(traceback.print_exc())
#保存异常现场图片
driver.save_screenshot("e:\\screenshot" + str(name) + ".png")
#获取共享锁
lock.acquire()
#存储失败用例信息
failTestCases.append("TestCase: " + str(name))
#释放共享锁
lock.release()
print("测试用例执行失败")
except Exception as e:
print("Exception occur!")
print(traceback.print_exc())
driver.save_screenshot("e:\\screenshot" + str(name) + ".png")
#获取共享锁,存储失败用例信息,并释放锁
with lock:
failTestCases.append("TestCase: " + str(name))
print("测试 用例执行失败")
finally:
#退出驱动,并关闭所有窗口
driver.quit()
end = time.time()
#打印用例执行时间
print("Tast %s run %.2f seconds." % (name, (end - start)))
#封装多进程执行函数
def run(nodeSeq):
"""
:param nodeSeq: 节点机器和浏览器字典,列表
:return: 返回成功、失败用例列表successTestCases, failTestCases
"""
#定义共享manager对象
manager = Manager()
#创建进程间共享的用例执行成功列表
successTestCases = manager.list([])
# 创建进程间共享的用例执行失败列表
failTestCases = manager.list([])
#创建一个共享资源锁,各进程共享
lock = manager.Lock()
#打印父进程ID
print("Parent process %s." % os.getpid())
#创建包含3个进程的进程池
p = Pool(processes=3)
#获取用例数量
testCaseNumber = len(nodeSeq)
#循环索引遍历用例列表,多进程执行node_task函数(搜狗搜索)
for i in range(testCaseNumber):
p.apply_async(node_task, args=(i + 1, lock, nodeSeq[i], successTestCases, failTestCases))
print("Waiting for all subprocesses done...")
#关闭p
p.close()
#等待各个子进程执行结束
p.join()
return successTestCases, failTestCases
#封装写测试结果函数
def resultReport(testCaseNumber, successTestCases, failTestCases):
"""
:param testCaseNumber: 用例个数
:param successTestCases: 成功执行列表
:param failTestCases: 失败执行列表
:return: 无
"""
print("测试报告: \n")
print("共执行测试用例:" + str(testCaseNumber) + "个\n")
print("执行成功的测试用例: " + str(len(successTestCases)) + "个")
if len(successTestCases) > 0:
for t in successTestCases:
print(t)
else:
print("没有执行成功的测试用例")
print("执行失败的测试用例: " + str(len(failTestCases)) + "个")
if len(failTestCases) > 0:
for t in failTestCases:
print(t)
else:
print("没有执行失败的测试用例")
if __name__ == "__main__":
nodeList = [
{"node": "http://127.0.0.1:6666/wd/hub", "browserName": "internet explorer"},
{"node": "http://127.0.0.1:6666/wd/hub", "browserName": "chrome"},
{"node": "http://127.0.0.1:6666/wd/hub", "browserName": "firefox"}]
testCaseNumber = len(nodeList)
#执行多进程函数
successTestCases, failTestCases = run(nodeList)
print("All processes done")
#写测试结果
resultReport(testCaseNumber, successTestCases, failTestCases)