RunTest.py 3.2 KB
# @Time    : 2022/8/23 11:15
# @Author  : 付孟奇
from ddt import ddt, data
from config.VendorPath import excel_path, global_path
from util.HandleJenkins import handle_jenkins
from util.HandleData import handle_data
from util.HandleSpeech import handle_speech
from util.HandleRequest import DealRequest
from util.HandleLog import logger
from util.HandleFile import file_read_save
import unittest
import json
import re
import operator

# 获取配置信息
vendor_name = handle_jenkins()['pro']
# 获取项目所有测试用例
data_list = handle_data.load_excel(excel_path, '话术变量验证')


@ddt
class DDTTest(unittest.TestCase):

    @data(*data_list)
    def test_case(self, data_list):
        logger.info('***************【' + str(data_list['desc']) + '】接口测试开始***************')
        logger.info('接口测试用例为:' + str(data_list))

        # 1、设置报告的接口信息
        self._testMethodName = data_list['uri']
        if data_list['positive_case'] == '正向':
            self._testMethodDoc = '正向case--' + data_list['desc']
        elif data_list['positive_case'] == '反向':
            self._testMethodDoc = '正向case--' + data_list['desc']
        else:
            self._testMethodDoc = data_list['desc']

        # 2、参数整理
        re_headers = handle_data.handle_params(data_list['headers'])
        logger.info('请求头为:' + str(re_headers))

        re_method = data_list['method']
        re_url = handle_jenkins()['host'] + data_list['uri']
        re_params = handle_data.handle_params(data_list['params'])
        logger.info('请求参数为:' + str(re_params))

        # 3、进行接口请求
        res = DealRequest(re_method, re_url, re_headers, re_params).return_response()
        logger.info('接口测试结果为:' + str(res))

        # 4、响应断言
        assert_num = str(data_list['response_assert']).count('话术变量验证')  # 判断是否是话术场景
        if assert_num == 0:
            assert_data = json.loads(data_list['response_assert'])
            for k in assert_data:
                self.assertEqual(handle_data.get_value(res, k)[0], assert_data[k])
        else:  # 话术变量验证
            orderRecordId = res.get('result').get('orderRecordId')
            if orderRecordId is not None:
                assert_msg = handle_speech.getAssert(orderRecordId)
                if assert_msg['code'] ==0:
                    logger.info(assert_msg['msg'])
                else:
                    raise Exception(assert_msg['msg'])

        # 5、判断是否需要保存响应信息:正向case且有相应需要保存的
        if res['code'] == 0:
            if data_list['save_args'] != '' and data_list['positive_case'] == '正向':
                list_args = re.split(r'[,,;;、.。]', data_list['save_args'])
                dic_args = {}
                for i in list_args:
                    res_args = handle_data.get_value(res, i)[0]
                    dic_args[i] = res_args
                file_read_save(global_path, str(int(data_list['id'])), dic_args)

        logger.info('***************【' + str(data_list['desc']) + '】接口测试结束***************')


if __name__ == '__main__':
    unittest.main()