98e8884c by chenyao

ocr_process添加try-except处理前半部分

1 parent 75d18a3c
......@@ -178,171 +178,180 @@ class Command(BaseCommand, LoggerMixin):
# self.online_log.info('{0} [edms download success] [pdf_path={1}]'.format(self.log_base, pdf_path))
def bs_process(self, wb, ocr_data, bs_summary, unknown_summary, classify, res_list, pno, ino, part_idx):
    """Copy OCR'd table sheets into ``wb`` and fold each sheet's summary into the accumulators.

    For every entry of ``ocr_data['data']`` that has cells, a worksheet named
    ``page_{pno}_img_{ino}_{part_idx}_{i}`` is created in ``wb`` and filled
    cell by cell.  The sheet's ``summary`` sequence is then merged into
    ``bs_summary`` (keyed by card number) or, when no card number was
    recognised, into ``unknown_summary`` (keyed by ``classify`` then role).

    Args:
        wb: Workbook-like object exposing ``create_sheet(name)``
            (presumably an openpyxl ``Workbook`` -- confirm against caller).
        ocr_data: OCR response dict; ``'data'`` is read as a list of sheets.
        bs_summary: Dict mutated in place, keyed by card number.
        unknown_summary: Dict mutated in place, keyed by ``classify`` then role.
        classify: Classification id recorded on every summary entry.
        res_list: List mutated in place; exactly one
            ``(pno, ino, part_idx, status)`` tuple is appended per call.
        pno, ino, part_idx: Page / image / part indices used for naming
            and for the result tuple.

    Returns:
        None.  The outcome is reported through ``res_list``:
        ``consts.RES_SUCCESS`` when at least one sheet had cells,
        ``consts.RES_SUCCESS_EMPTY`` when none did, ``consts.RES_FAILED``
        when any exception was raised while processing.
    """
    # The whole routine is guarded so that a malformed OCR payload is reported
    # as a failed part instead of propagating to the caller.  Mutations made
    # before the failure (worksheets, summary entries) are not rolled back.
    try:
        sheets = ocr_data.get('data', [])
        if not sheets:
            res_list.append((pno, ino, part_idx, consts.RES_SUCCESS_EMPTY))
            return
        # confidence = ocr_data.get('confidence', 1)
        img_name = 'page_{0}_img_{1}_{2}'.format(pno, ino, part_idx)
        cells_exists = False
        for i, sheet in enumerate(sheets):
            cells = sheet.get('cells')
            if not cells:
                continue
            cells_exists = True
            sheet_name = '{0}_{1}'.format(img_name, i)
            ws = wb.create_sheet(sheet_name)
            for cell in cells:
                c1 = cell.get('start_column')
                r1 = cell.get('start_row')
                words = cell.get('words')
                # Shift by one: presumably OCR indices are 0-based while the
                # worksheet API is 1-based -- TODO confirm.
                ws.cell(row=r1 + 1, column=c1 + 1, value=words)

            # Authenticity check: keep the details only when flagged as fake.
            verify_info = []
            verify_dict = sheet.get('verify', {})
            if verify_dict.get('verify_res') == 'fake':
                verify_info.extend(verify_dict.get('verify_info', []))

            # summary layout: [account name, card number, page number,
            #                  receipt verification code, print time,
            #                  start date, end date]
            summary = sheet.get('summary')
            card = summary[1]
            if card is None:
                # No card number recognised: file the sheet under classify/role.
                classify_dict = unknown_summary.setdefault(classify, {})
                role = consts.UNKNOWN_ROLE if summary[0] is None else summary[0]
                target = classify_dict.setdefault(role, {})
                target['classify'] = classify
                target['role'] = role
                target.setdefault('sheet', []).append(sheet_name)
                # target.setdefault('confidence', []).append(confidence)
            else:
                target = bs_summary.setdefault(card, {})
                target['count'] = target.get('count', 0) + 1
                target.setdefault('classify', []).append(classify)
                # target.setdefault('confidence', []).append(confidence)
                target.setdefault('sheet', []).append(sheet_name)
                role_list = target.setdefault('role', [])
                role_set = target.setdefault('role_set', set())

            # Fields collected identically for both kinds of entry.
            code_list = target.setdefault('code', [])
            pt_list = target.setdefault('print_time', [])
            sd_list = target.setdefault('start_date', [])
            ed_list = target.setdefault('end_date', [])
            verify_list = target.setdefault('verify', [])

            if card is not None and summary[0] is not None:
                role_list.append(summary[0])
                role_set.add(summary[0])
            if summary[3] is not None:
                code_list.append((summary[2], summary[3]))
            if summary[4] is not None:
                pt_list.append(summary[4])
            if summary[5] is not None:
                sd_list.append(summary[5])
            if summary[6] is not None:
                ed_list.append(summary[6])
            if verify_info:
                verify_list.append(
                    (pno, ino, '、'.join(verify_info))
                )

        if cells_exists:
            res_list.append((pno, ino, part_idx, consts.RES_SUCCESS))
        else:
            res_list.append((pno, ino, part_idx, consts.RES_SUCCESS_EMPTY))
    except Exception:
        res_list.append((pno, ino, part_idx, consts.RES_FAILED))
        self.online_log.error('{0} [bs_process error] [error={1}]'.format(self.log_base, traceback.format_exc()))
def contract_process(self, classify, ocr_data, contract_result, res_list, pno, ino, part_idx,
                     img_path, contract_result_compare):
    """Flatten one OCR'd contract page into ``contract_result`` and ``contract_result_compare``.

    Two views of ``ocr_data['data']['page_info']`` are built:

    * ``rebuild_page_info`` -- a flat list of row tuples/lists, appended to
      ``contract_result[classify][page_no]``;
    * ``page_compare_dict`` -- field -> text (or sub-field -> text) plus the
      matching positions under ``consts.ALL_POSITION_KEY`` and the image
      path under ``consts.IMG_PATH_KEY``, stored at
      ``contract_result_compare[classify][page_no]``.

    Args:
        classify: Classification id used as the first-level key of both results.
        ocr_data: OCR response dict; ``'data'`` must carry ``'page_num'``
            and ``'page_info'`` for the page to be processed.
        contract_result: Dict mutated in place.
        res_list: List mutated in place with ``(pno, ino, part_idx, status)``.
        pno, ino, part_idx: Page / image / part indices for the result tuple.
        img_path: Path of the source image, stored in the compare dict.
        contract_result_compare: Dict mutated in place.

    Returns:
        None.  ``consts.RES_SUCCESS_EMPTY`` is recorded when the payload is
        missing or incomplete, ``consts.RES_SUCCESS`` otherwise.  Exceptions
        are logged through ``self.online_log.error`` and not re-raised.
    """
    # The whole routine is guarded so that a malformed OCR payload is logged
    # instead of propagating to the caller.
    try:
        contract_dict = ocr_data.get('data')
        if not contract_dict or contract_dict.get('page_num') is None or contract_dict.get('page_info') is None:
            res_list.append((pno, ino, part_idx, consts.RES_SUCCESS_EMPTY))
            return
        # NOTE(review): success is recorded before the page is processed, so a
        # later exception is logged but the part still counts as RES_SUCCESS
        # (unlike bs_process, which records RES_FAILED) -- confirm intended.
        res_list.append((pno, ino, part_idx, consts.RES_SUCCESS))

        page_num = contract_dict.get('page_num')
        if page_num.startswith('page_'):
            page_num_only = page_num.split('_')[-1]
        else:
            page_num_only = page_num

        text_key = 'words'
        position_key = 'position'
        page_info = contract_dict.get('page_info', {})

        # Pass 1: flat row list. A field is either a leaf ({'words': ...})
        # or a group of sub-fields that are themselves leaves.
        rebuild_page_info = []
        for key, value in page_info.items():
            if value is None:
                rebuild_page_info.append((key, ))
            elif text_key in value:
                if value[text_key] is None:
                    rebuild_page_info.append((key,))
                elif isinstance(value[text_key], str):
                    rebuild_page_info.append((key, value[text_key]))
                elif isinstance(value[text_key], list):
                    rebuild_page_info.append((key,))
                    for row_list in value[text_key]:
                        rebuild_page_info.append(row_list)
            else:
                rebuild_page_info.append((key,))
                for sub_key, sub_value in value.items():
                    if sub_value is None:
                        rebuild_page_info.append((sub_key,))
                    elif text_key in sub_value:
                        if sub_value[text_key] is None:
                            rebuild_page_info.append((sub_key,))
                        elif isinstance(sub_value[text_key], str):
                            rebuild_page_info.append((sub_key, sub_value[text_key]))
                        elif isinstance(sub_value[text_key], list):
                            rebuild_page_info.append((sub_key,))
                            for row_list in sub_value[text_key]:
                                rebuild_page_info.append(row_list)
        contract_result.setdefault(classify, dict()).setdefault(page_num_only, []).append(rebuild_page_info)

        # Pass 2: field -> text mapping with positions, used for comparison.
        page_compare_dict = {
            consts.IMG_PATH_KEY: img_path,
            consts.ALL_POSITION_KEY: {},
        }
        all_positions = page_compare_dict[consts.ALL_POSITION_KEY]
        for key, value in page_info.items():
            if not isinstance(value, dict):
                continue
            elif text_key in value:
                position_list = value.get(position_key, [])
                all_positions[key] = position_list if isinstance(position_list, list) else []
                if value[text_key] is None:
                    page_compare_dict[key] = ''
                elif isinstance(value[text_key], str):
                    page_compare_dict[key] = value[text_key]
                elif isinstance(value[text_key], list):
                    page_compare_dict[key] = value[text_key]
            else:
                page_compare_dict[key] = {}
                all_positions[key] = {}
                for sub_key, sub_value in value.items():
                    # Pass 1 tolerates a sub-field that is None or has no
                    # 'words'; skip those here too rather than raising and
                    # losing the compare result for the whole page.
                    if not isinstance(sub_value, dict) or text_key not in sub_value:
                        continue
                    position_list = sub_value.get(position_key, [])
                    all_positions[key][sub_key] = position_list if isinstance(
                        position_list, list) else []
                    if sub_value[text_key] is None:
                        page_compare_dict[key][sub_key] = ''
                    elif isinstance(sub_value[text_key], str):
                        page_compare_dict[key][sub_key] = sub_value[text_key]

        classify_compare = contract_result_compare.setdefault(classify, dict())
        classify_compare[consts.ASP_KEY] = contract_dict.get(consts.ASP_KEY, False)
        # "position" = [xmin, ymin, xmax, ymax]
        classify_compare[page_num_only] = page_compare_dict
    except Exception:
        self.online_log.error('{0} [contract_process error] [error={1}]'.format(self.log_base, traceback.format_exc()))
@staticmethod
def rebuild_position(src_position):
......@@ -372,499 +381,525 @@ class Command(BaseCommand, LoggerMixin):
def license1_process(self, ocr_data, license_summary, classify, res_list, pno, ino, part_idx, img_path, do_dda,
dda_id_bc_mapping):
# 类别:'0'身份证, '1'居住证
license_data = ocr_data.get('data')
if not license_data:
res_list.append((pno, ino, part_idx, consts.RES_SUCCESS_EMPTY))
return
if isinstance(license_data, dict):
pre, suf = os.path.splitext(img_path)
base64_img = license_data.pop('base64_img', '')
is_save = True if len(base64_img) > 0 else False
section_img_path = '{0}_{1}{2}'.format(pre, part_idx, suf) if is_save else img_path
if is_save:
try:
with open(section_img_path, "wb") as fh:
fh.write(base64.b64decode(base64_img.encode()))
except Exception as e:
self.online_log.warn(
'{0} [section img save failed] [img_path={1}]'
' [part_idx={2}]'.format(self.log_base, img_path, part_idx))
else:
is_save = False
section_img_path = img_path
# 保单
if classify == consts.INSURANCE_CLASSIFY:
product_result = ['', '', '']
product_result_position = [dict(), dict(), dict()]
min_char_count_1 = 1000
min_char_count_2 = 1000
for product in license_data.get('result', {}).get('productList', []):
name = product.get('name', {}).get('words', '')
if name.find('机动车损失') != -1 or name.find('汽车损失') != -1 or name.find('车损险') != -1 or \
name.find('车损失险') != -1 or name.find('车损失保险') != -1:
if len(name) < min_char_count_1:
min_char_count_1 = len(name)
product_result[0] = product.get('coverage', {}).get('words', '')
product_result[2] = product.get('deductible_franchise', {}).get('words', '')
product_result_position[0] = self.rebuild_position(product.get('coverage', {}).get(
'position', {}))
product_result_position[2] = self.rebuild_position(product.get('deductible_franchise', {}).get(
'position', {}))
elif name.find('第三者责任') != -1:
if len(name) < min_char_count_2:
min_char_count_2 = len(name)
product_result[1] = product.get('coverage', {}).get('words', '')
product_result_position[1] = self.rebuild_position(product.get('coverage', {}).get(
'position', {}))
special_str = license_data.get('result', {}).get('1stBeneficiary', {}).get('words', '')
special = '无'
if special_str.find('宝马') != -1 or special_str.find('先锋国际融资租赁有限公司') != -1:
special = '有'
insurance_ocr_result = {
'被保险人姓名': license_data.get('result', {}).get('insured', {}).get('name', {}).get('words', ''),
'被保险人证件号码': license_data.get('result', {}).get('insured', {}).get('certiCode', {}).get('words', ''),
'车架号': license_data.get('result', {}).get('vehicle', {}).get('VIN', {}).get('words', ''),
'机动车损失保险金额': product_result[0],
'机动车第三者责任保险金额': product_result[1],
'机动车损失保险绝对免赔率/绝对免赔额': product_result[2],
'保险费合计': license_data.get('result', {}).get('premiumSum', {}).get('words', ''),
'保险起始日期': license_data.get('result', {}).get('startDate', {}).get('words', ''),
'保险截止日期': license_data.get('result', {}).get('endDate', {}).get('words', ''),
'保单章': license_data.get('result', {}).get('seal', {}).get('words', ''),
'特别约定第一受益人': special,
consts.IMG_PATH_KEY: img_path,
consts.SECTION_IMG_PATH_KEY: section_img_path,
}
position_dict = {
'被保险人姓名': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'insured', {}).get('name', {}).get('position', {}))},
'被保险人证件号码': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'insured', {}).get('certiCode', {}).get('position', {}))},
'车架号': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'vehicle', {}).get('VIN', {}).get('position', {}))},
'机动车损失保险金额': {consts.FIELD_POSITION_KEY: product_result_position[0]},
'机动车第三者责任保险金额': {consts.FIELD_POSITION_KEY: product_result_position[1]},
'机动车损失保险绝对免赔率/绝对免赔额': {consts.FIELD_POSITION_KEY: product_result_position[2]},
'保险费合计': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'premiumSum', {}).get('position', {}))},
'保险起始日期': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'startDate', {}).get('position', {}))},
'保险截止日期': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'endDate', {}).get('position', {}))},
'保单章': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'seal', {}).get('position', {}))},
'特别约定第一受益人': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'1stBeneficiary', {}).get('position', {}))},
}
insurance_ocr_result[consts.ALL_POSITION_KEY] = position_dict
license_summary.setdefault(classify, []).append(insurance_ocr_result)
# DDA
elif classify == consts.DDA_CLASSIFY:
pro = ocr_data.get('confidence', 0)
if pro < consts.DDA_PRO_MIN:
# 添加 try-except 处理
try:
# 类别:'0'身份证, '1'居住证
license_data = ocr_data.get('data')
if not license_data:
res_list.append((pno, ino, part_idx, consts.RES_SUCCESS_EMPTY))
return
dda_ocr_result = {}
position_dict = {}
for key, value in license_data.get('result', {}).items():
dda_ocr_result[key] = value.get('words', '')
position_dict[key] = {
consts.FIELD_POSITION_KEY: value.get('position', {})
if isinstance(license_data, dict):
pre, suf = os.path.splitext(img_path)
base64_img = license_data.pop('base64_img', '')
is_save = True if len(base64_img) > 0 else False
section_img_path = '{0}_{1}{2}'.format(pre, part_idx, suf) if is_save else img_path
if is_save:
try:
with open(section_img_path, "wb") as fh:
fh.write(base64.b64decode(base64_img.encode()))
except Exception as e:
self.online_log.warn(
'{0} [section img save failed] [img_path={1}]'
' [part_idx={2}]'.format(self.log_base, img_path, part_idx))
else:
is_save = False
section_img_path = img_path
# 保单
if classify == consts.INSURANCE_CLASSIFY:
product_result = ['', '', '']
product_result_position = [dict(), dict(), dict()]
min_char_count_1 = 1000
min_char_count_2 = 1000
for product in license_data.get('result', {}).get('productList', []):
name = product.get('name', {}).get('words', '')
if name.find('机动车损失') != -1 or name.find('汽车损失') != -1 or name.find('车损险') != -1 or \
name.find('车损失险') != -1 or name.find('车损失保险') != -1:
if len(name) < min_char_count_1:
min_char_count_1 = len(name)
product_result[0] = product.get('coverage', {}).get('words', '')
product_result[2] = product.get('deductible_franchise', {}).get('words', '')
product_result_position[0] = self.rebuild_position(product.get('coverage', {}).get(
'position', {}))
product_result_position[2] = self.rebuild_position(product.get('deductible_franchise', {}).get(
'position', {}))
elif name.find('第三者责任') != -1:
if len(name) < min_char_count_2:
min_char_count_2 = len(name)
product_result[1] = product.get('coverage', {}).get('words', '')
product_result_position[1] = self.rebuild_position(product.get('coverage', {}).get(
'position', {}))
special_str = license_data.get('result', {}).get('1stBeneficiary', {}).get('words', '')
special = '无'
if special_str.find('宝马') != -1 or special_str.find('先锋国际融资租赁有限公司') != -1:
special = '有'
insurance_ocr_result = {
'被保险人姓名': license_data.get('result', {}).get('insured', {}).get('name', {}).get('words', ''),
'被保险人证件号码': license_data.get('result', {}).get('insured', {}).get('certiCode', {}).get('words', ''),
'车架号': license_data.get('result', {}).get('vehicle', {}).get('VIN', {}).get('words', ''),
'机动车损失保险金额': product_result[0],
'机动车第三者责任保险金额': product_result[1],
'机动车损失保险绝对免赔率/绝对免赔额': product_result[2],
'保险费合计': license_data.get('result', {}).get('premiumSum', {}).get('words', ''),
'保险起始日期': license_data.get('result', {}).get('startDate', {}).get('words', ''),
'保险截止日期': license_data.get('result', {}).get('endDate', {}).get('words', ''),
'保单章': license_data.get('result', {}).get('seal', {}).get('words', ''),
'特别约定第一受益人': special,
consts.IMG_PATH_KEY: img_path,
consts.SECTION_IMG_PATH_KEY: section_img_path,
}
dda_ocr_result[consts.DDA_IMG_PATH] = img_path
dda_ocr_result[consts.DDA_PRO] = pro
dda_ocr_result[consts.IMG_PATH_KEY] = img_path
dda_ocr_result[consts.SECTION_IMG_PATH_KEY] = section_img_path
dda_ocr_result[consts.ALL_POSITION_KEY] = position_dict
license_summary.setdefault(classify, []).append(dda_ocr_result)
# 抵押登记豁免函
elif classify == consts.HMH_CLASSIFY:
hmh_ocr_result = {}
position_dict = {}
for key, value in license_data.get('words_result', {}).items():
hmh_ocr_result[key] = value.get('words', '')
location_list = value.get('location', [-1, -1, -1, -1])
if len(location_list) == 4:
position_dict = {
'被保险人姓名': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'insured', {}).get('name', {}).get('position', {}))},
'被保险人证件号码': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'insured', {}).get('certiCode', {}).get('position', {}))},
'车架号': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'vehicle', {}).get('VIN', {}).get('position', {}))},
'机动车损失保险金额': {consts.FIELD_POSITION_KEY: product_result_position[0]},
'机动车第三者责任保险金额': {consts.FIELD_POSITION_KEY: product_result_position[1]},
'机动车损失保险绝对免赔率/绝对免赔额': {consts.FIELD_POSITION_KEY: product_result_position[2]},
'保险费合计': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'premiumSum', {}).get('position', {}))},
'保险起始日期': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'startDate', {}).get('position', {}))},
'保险截止日期': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'endDate', {}).get('position', {}))},
'保单章': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'seal', {}).get('position', {}))},
'特别约定第一受益人': {consts.FIELD_POSITION_KEY: self.rebuild_position(license_data.get('result', {}).get(
'1stBeneficiary', {}).get('position', {}))},
}
insurance_ocr_result[consts.ALL_POSITION_KEY] = position_dict
license_summary.setdefault(classify, []).append(insurance_ocr_result)
# DDA
elif classify == consts.DDA_CLASSIFY:
pro = ocr_data.get('confidence', 0)
if pro < consts.DDA_PRO_MIN:
res_list.append((pno, ino, part_idx, consts.RES_SUCCESS_EMPTY))
return
dda_ocr_result = {}
position_dict = {}
for key, value in license_data.get('result', {}).items():
dda_ocr_result[key] = value.get('words', '')
position_dict[key] = {
consts.FIELD_POSITION_KEY: {
'top': location_list[1],
'left': location_list[0],
'height': location_list[-1] - location_list[1],
'width': location_list[2] - location_list[0]
}
consts.FIELD_POSITION_KEY: value.get('position', {})
}
hmh_ocr_result[consts.IMG_PATH_KEY] = img_path
hmh_ocr_result[consts.SECTION_IMG_PATH_KEY] = section_img_path
hmh_ocr_result[consts.ALL_POSITION_KEY] = position_dict
license_summary.setdefault(classify, []).append(hmh_ocr_result)
# 二手车交易凭证
elif classify == consts.JYPZ_CLASSIFY:
jypz_ocr_result = {}
position_dict = {}
for key, value in license_data.get('result', {}).items():
jypz_ocr_result[key] = value.get('words', '')
position_dict[key] = {
consts.FIELD_POSITION_KEY: value.get('position', {})
}
jypz_ocr_result[consts.IMG_PATH_KEY] = img_path
jypz_ocr_result[consts.SECTION_IMG_PATH_KEY] = section_img_path
jypz_ocr_result[consts.ALL_POSITION_KEY] = position_dict
license_summary.setdefault(classify, []).append(jypz_ocr_result)
# 车辆登记证 3/4页结果整合
elif classify == consts.MVC_CLASSIFY:
rebuild_data_dict = {}
position_dict = {}
mvc_page = license_data.pop('page', 'VehicleRCI')
mvc_res = license_data.pop('results', {})
if mvc_page == 'VehicleRegArea':
rebuild_data_dict['机动车登记证书编号'] = mvc_res.get('机动车登记证书编号', {}).get('words', '')
code_position_list = mvc_res.get('机动车登记证书编号', {}).get('position', [0, 0, 0, 0])
if len(code_position_list) == 4:
position_dict['机动车登记证书编号'] = {
consts.FIELD_POSITION_KEY: {
'top': code_position_list[1],
'left': code_position_list[0],
'height': code_position_list[-1],
'width': code_position_list[2],
dda_ocr_result[consts.DDA_IMG_PATH] = img_path
dda_ocr_result[consts.DDA_PRO] = pro
dda_ocr_result[consts.IMG_PATH_KEY] = img_path
dda_ocr_result[consts.SECTION_IMG_PATH_KEY] = section_img_path
dda_ocr_result[consts.ALL_POSITION_KEY] = position_dict
license_summary.setdefault(classify, []).append(dda_ocr_result)
# 抵押登记豁免函
elif classify == consts.HMH_CLASSIFY:
hmh_ocr_result = {}
position_dict = {}
for key, value in license_data.get('words_result', {}).items():
hmh_ocr_result[key] = value.get('words', '')
location_list = value.get('location', [-1, -1, -1, -1])
if len(location_list) == 4:
position_dict[key] = {
consts.FIELD_POSITION_KEY: {
'top': location_list[1],
'left': location_list[0],
'height': location_list[-1] - location_list[1],
'width': location_list[2] - location_list[0]
}
}
hmh_ocr_result[consts.IMG_PATH_KEY] = img_path
hmh_ocr_result[consts.SECTION_IMG_PATH_KEY] = section_img_path
hmh_ocr_result[consts.ALL_POSITION_KEY] = position_dict
license_summary.setdefault(classify, []).append(hmh_ocr_result)
# 二手车交易凭证
elif classify == consts.JYPZ_CLASSIFY:
jypz_ocr_result = {}
position_dict = {}
for key, value in license_data.get('result', {}).items():
jypz_ocr_result[key] = value.get('words', '')
position_dict[key] = {
consts.FIELD_POSITION_KEY: value.get('position', {})
}
for register_info in mvc_res.get('登记信息', []):
register_info.pop('register_type', None)
register_info.pop('register_type_name', None)
for cn_key, detail_dict in register_info.items():
rebuild_data_dict.setdefault(cn_key, []).append(
detail_dict.get('words', ''))
tmp_position_list = detail_dict.get('position', [0, 0, 0, 0])
if len(tmp_position_list) == 4:
position_dict.setdefault(cn_key, []).append(
{
consts.FIELD_POSITION_KEY: {
'top': tmp_position_list[1],
'left': tmp_position_list[0],
'height': tmp_position_list[-1],
'width': tmp_position_list[2],
jypz_ocr_result[consts.IMG_PATH_KEY] = img_path
jypz_ocr_result[consts.SECTION_IMG_PATH_KEY] = section_img_path
jypz_ocr_result[consts.ALL_POSITION_KEY] = position_dict
license_summary.setdefault(classify, []).append(jypz_ocr_result)
# 车辆登记证 3/4页结果整合
elif classify == consts.MVC_CLASSIFY:
rebuild_data_dict = {}
position_dict = {}
mvc_page = license_data.pop('page', 'VehicleRCI')
mvc_res = license_data.pop('results', {})
if mvc_page == 'VehicleRegArea':
rebuild_data_dict['机动车登记证书编号'] = mvc_res.get('机动车登记证书编号', {}).get('words', '')
code_position_list = mvc_res.get('机动车登记证书编号', {}).get('position', [0, 0, 0, 0])
if len(code_position_list) == 4:
position_dict['机动车登记证书编号'] = {
consts.FIELD_POSITION_KEY: {
'top': code_position_list[1],
'left': code_position_list[0],
'height': code_position_list[-1],
'width': code_position_list[2],
}
}
for register_info in mvc_res.get('登记信息', []):
register_info.pop('register_type', None)
register_info.pop('register_type_name', None)
for cn_key, detail_dict in register_info.items():
rebuild_data_dict.setdefault(cn_key, []).append(
detail_dict.get('words', ''))
tmp_position_list = detail_dict.get('position', [0, 0, 0, 0])
if len(tmp_position_list) == 4:
position_dict.setdefault(cn_key, []).append(
{
consts.FIELD_POSITION_KEY: {
'top': tmp_position_list[1],
'left': tmp_position_list[0],
'height': tmp_position_list[-1],
'width': tmp_position_list[2],
}
}
)
rebuild_data_dict[consts.ALL_POSITION_KEY_2] = position_dict
rebuild_data_dict[consts.IMG_PATH_KEY_2] = img_path
rebuild_data_dict[consts.SECTION_IMG_PATH_KEY_2] = section_img_path
else:
for cn_key, detail_dict in mvc_res.items():
rebuild_data_dict[cn_key] = detail_dict.get('words', '')
position_list = detail_dict.get('position', [0, 0, 0, 0])
if len(position_list) == 4:
position_dict[cn_key] = {
consts.FIELD_POSITION_KEY: {
'top': position_list[1],
'left': position_list[0],
'height': position_list[-1],
'width': position_list[2],
}
)
rebuild_data_dict[consts.ALL_POSITION_KEY_2] = position_dict
rebuild_data_dict[consts.IMG_PATH_KEY_2] = img_path
rebuild_data_dict[consts.SECTION_IMG_PATH_KEY_2] = section_img_path
else:
for cn_key, detail_dict in mvc_res.items():
rebuild_data_dict[cn_key] = detail_dict.get('words', '')
position_list = detail_dict.get('position', [0, 0, 0, 0])
if len(position_list) == 4:
position_dict[cn_key] = {
}
rebuild_data_dict[consts.ALL_POSITION_KEY] = position_dict
rebuild_data_dict[consts.IMG_PATH_KEY] = img_path
rebuild_data_dict[consts.SECTION_IMG_PATH_KEY] = section_img_path
del mvc_res
license_summary.setdefault(classify, []).append(rebuild_data_dict)
# for mvc_dict in license_data:
# mvc_dict[consts.IMG_PATH_KEY] = img_path
# try:
# mvc_page = mvc_dict.pop('page')
# except Exception as e:
# pass
# else:
# if mvc_page == 'VehicleRegArea':
# mvc_res = mvc_dict.pop('results', {})
# mvc_dict['机动车登记证书编号'] = mvc_res.get('register_no', {}).get('words', '')
# for register_info in mvc_res.get('register_info', []):
# for detail_dict in register_info.get('details', {}).values():
# mvc_dict.setdefault(detail_dict.get('chinese_key', '未知'), []).append(
# detail_dict.get('words', ''))
# del mvc_res
# license_summary.setdefault(classify, []).extend(license_data)
# 身份证真伪
elif classify == consts.IC_CLASSIFY:
id_card_dict = {}
position_dict = {}
card_type = license_data.get('type', '')
is_ic = card_type.startswith('身份证')
is_info_side = card_type.endswith('信息面')
id_card_dict['类别'] = '0' if is_ic else '1'
if is_ic:
field_map = consts.IC_MAP_0 if is_info_side else consts.IC_MAP_1
else:
field_map = consts.RP_MAP_0 if is_info_side else consts.RP_MAP_1
for write_field, search_field in field_map:
id_card_dict[write_field] = license_data.get('words_result', {}).get(search_field, {}).get('words', '')
location_list = license_data.get('words_result', {}).get(search_field, {}).get(
'location', [-1, -1, -1, -1])
if len(location_list) == 4:
position_dict[write_field] = {
consts.FIELD_POSITION_KEY: {
'top': position_list[1],
'left': position_list[0],
'height': position_list[-1],
'width': position_list[2],
'top': location_list[1],
'left': location_list[0],
'height': location_list[-1] - location_list[1],
'width': location_list[2] - location_list[0]
}
}
rebuild_data_dict[consts.ALL_POSITION_KEY] = position_dict
rebuild_data_dict[consts.IMG_PATH_KEY] = img_path
rebuild_data_dict[consts.SECTION_IMG_PATH_KEY] = section_img_path
del mvc_res
license_summary.setdefault(classify, []).append(rebuild_data_dict)
# for mvc_dict in license_data:
# mvc_dict[consts.IMG_PATH_KEY] = img_path
# try:
# mvc_page = mvc_dict.pop('page')
# except Exception as e:
# pass
# else:
# if mvc_page == 'VehicleRegArea':
# mvc_res = mvc_dict.pop('results', {})
# mvc_dict['机动车登记证书编号'] = mvc_res.get('register_no', {}).get('words', '')
# for register_info in mvc_res.get('register_info', []):
# for detail_dict in register_info.get('details', {}).values():
# mvc_dict.setdefault(detail_dict.get('chinese_key', '未知'), []).append(
# detail_dict.get('words', ''))
# del mvc_res
# license_summary.setdefault(classify, []).extend(license_data)
# 身份证真伪
elif classify == consts.IC_CLASSIFY:
id_card_dict = {}
position_dict = {}
card_type = license_data.get('type', '')
is_ic = card_type.startswith('身份证')
is_info_side = card_type.endswith('信息面')
id_card_dict['类别'] = '0' if is_ic else '1'
if is_ic:
field_map = consts.IC_MAP_0 if is_info_side else consts.IC_MAP_1
else:
field_map = consts.RP_MAP_0 if is_info_side else consts.RP_MAP_1
for write_field, search_field in field_map:
id_card_dict[write_field] = license_data.get('words_result', {}).get(search_field, {}).get('words', '')
location_list = license_data.get('words_result', {}).get(search_field, {}).get(
'location', [-1, -1, -1, -1])
if len(location_list) == 4:
position_dict[write_field] = {
consts.FIELD_POSITION_KEY: {
'top': location_list[1],
'left': location_list[0],
'height': location_list[-1] - location_list[1],
'width': location_list[2] - location_list[0]
}
}
if not is_info_side:
start_time = license_data.get('words_result', {}).get('签发日期', {}).get('words', '')
end_time = license_data.get('words_result', {}).get('失效日期', {}).get('words', '')
id_card_dict['有效期限'] = '{0}-{1}'.format(start_time, end_time)
end_time_location_list = license_data.get('words_result', {}).get('失效日期', {}).get(
'location', [-1, -1, -1, -1])
if len(end_time_location_list) == 4:
position_dict['有效期限'] = {
consts.FIELD_POSITION_KEY: {
'top': end_time_location_list[1],
'left': end_time_location_list[0],
'height': end_time_location_list[-1] - end_time_location_list[1],
'width': end_time_location_list[2] - end_time_location_list[0]
if not is_info_side:
start_time = license_data.get('words_result', {}).get('签发日期', {}).get('words', '')
end_time = license_data.get('words_result', {}).get('失效日期', {}).get('words', '')
id_card_dict['有效期限'] = '{0}-{1}'.format(start_time, end_time)
end_time_location_list = license_data.get('words_result', {}).get('失效日期', {}).get(
'location', [-1, -1, -1, -1])
if len(end_time_location_list) == 4:
position_dict['有效期限'] = {
consts.FIELD_POSITION_KEY: {
'top': end_time_location_list[1],
'left': end_time_location_list[0],
'height': end_time_location_list[-1] - end_time_location_list[1],
'width': end_time_location_list[2] - end_time_location_list[0]
}
}
}
if not is_info_side:
id_card_dict[consts.IMG_PATH_KEY_2] = img_path
id_card_dict[consts.ALL_POSITION_KEY_2] = position_dict
id_card_dict[consts.SECTION_IMG_PATH_KEY_2] = section_img_path
if not is_info_side:
id_card_dict[consts.IMG_PATH_KEY_2] = img_path
id_card_dict[consts.ALL_POSITION_KEY_2] = position_dict
id_card_dict[consts.SECTION_IMG_PATH_KEY_2] = section_img_path
else:
id_card_dict[consts.ALL_POSITION_KEY] = position_dict
id_card_dict[consts.IMG_PATH_KEY] = img_path
id_card_dict[consts.SECTION_IMG_PATH_KEY] = section_img_path
if is_ic and is_save:
card_type = -1
json_data_4 = {
'mode': 1,
'user_info': {
'image_content': base64_img,
},
'options': {
'distinguish_type': 1,
'auto_rotate': True,
},
}
for times in range(consts.RETRY_TIMES):
try:
start_time = time.time()
ocr_4_response = requests.post(self.ocr_url_4, json=json_data_4)
if ocr_4_response.status_code != 200:
raise OCR4Exception('ocr_4 status code: {0}'.format(ocr_4_response.status_code))
except Exception as e:
self.online_log.warn(
'{0} [ocr_4 failed] [times={1}] [img_path={2}] [error={3}]'.format(
self.log_base, times, img_path, traceback.format_exc()))
else:
ocr_4_res = ocr_4_response.json()
end_time = time.time()
speed_time = int(end_time - start_time)
if ocr_4_res.get('code') == 0 and ocr_4_res.get('result', {}).get('rtn') == 0:
card_type = ocr_4_res.get('result', {}).get(
'idcard_distinguish_result', {}).get('result', -1)
self.online_log.info(
'{0} [ocr_4 success] [img_path={1}] [speed_time={2}]'.format(
self.log_base, img_path, speed_time))
break
else:
self.online_log.warn(
'{0} [ocr_4 failed] [img_path={1}]'.format(self.log_base, img_path))
id_card_dict[consts.IC_TURE_OR_FALSE] = consts.IC_RES_MAPPING.get(card_type)
if do_dda and isinstance(id_card_dict.get(consts.IC_KEY_FIELD[0]), str) and \
isinstance(id_card_dict.get(consts.IC_KEY_FIELD[1]), str):
ic_name = id_card_dict.get(consts.IC_KEY_FIELD[0], '').strip()
ic_id = id_card_dict.get(consts.IC_KEY_FIELD[1], '').strip()
if len(ic_name) > 0 and len(ic_id) > 0:
dda_id_bc_mapping.setdefault(consts.IC_FIELD, []).append((ic_name, ic_id, img_path))
license_summary.setdefault(classify, []).append(id_card_dict)
# 购车发票 & 二手车发票
elif classify == consts.MVI_CLASSIFY or classify == consts.UCI_CLASSIFY:
rebuild_data_dict = {}
position_dict = {}
mvi_res = license_data.pop('result', {})
for en_key, detail_dict in mvi_res.items():
rebuild_data_dict[detail_dict.get('chinese_key', '')] = detail_dict.get('words', '')
position_dict[detail_dict.get('chinese_key', '')] = {
consts.FIELD_POSITION_KEY: detail_dict.get('position', {})
}
rebuild_data_dict['新旧版式'] = license_data.get('layout', '')
rebuild_data_dict[consts.IMG_PATH_KEY] = img_path
rebuild_data_dict[consts.SECTION_IMG_PATH_KEY] = section_img_path
rebuild_data_dict[consts.ALL_POSITION_KEY] = position_dict
license_summary.setdefault(classify, []).append(rebuild_data_dict)
# 其他
else:
for res_dict in license_data:
res_dict[consts.IMG_PATH_KEY] = img_path
res_dict[consts.SECTION_IMG_PATH_KEY] = section_img_path
license_summary.setdefault(classify, []).extend(license_data)
res_list.append((pno, ino, part_idx, consts.RES_SUCCESS))
def license2_process(self, ocr_res_2, license_summary, pid, classify, res_list, pno, ino, part_idx, img_path, do_dda, dda_id_bc_mapping, file_data):
if ocr_res_2.get('ErrorCode') in consts.SUCCESS_CODE_SET:
res_list.append((pno, ino, part_idx, consts.RES_SUCCESS))
if pid == consts.BC_PID:
# 银行卡
# res_dict = {}
# for en_key, chn_key in consts.BC_FIELD:
# res_dict[chn_key] = ocr_res_2.get(en_key, '')
ocr_res_2[consts.IMG_PATH_KEY] = img_path
license_summary.setdefault(classify, []).append(ocr_res_2)
if do_dda and isinstance(ocr_res_2.get(consts.BC_KEY_FIELD), str):
bc_no = ocr_res_2[consts.BC_KEY_FIELD].strip()
if len(bc_no) > 0:
dda_id_bc_mapping.setdefault(consts.BC_FIELD, []).append((bc_no, img_path))
else:
# 营业执照等
pre, suf = os.path.splitext(img_path)
src_section_img_path = img_path if file_data is None else '{0}_{1}{2}'.format(pre, part_idx, suf)
is_save = False
for res_idx, result_dict in enumerate(ocr_res_2.get('ResultList', [])):
image_data = result_dict.get('image_data', '')
if len(image_data) > 0:
position = {}
angle = 0
section_img_path = '{0}_{1}_{2}{3}'.format(pre, part_idx, res_idx, suf)
id_card_dict[consts.ALL_POSITION_KEY] = position_dict
id_card_dict[consts.IMG_PATH_KEY] = img_path
id_card_dict[consts.SECTION_IMG_PATH_KEY] = section_img_path
if is_ic and is_save:
card_type = -1
json_data_4 = {
'mode': 1,
'user_info': {
'image_content': base64_img,
},
'options': {
'distinguish_type': 1,
'auto_rotate': True,
},
}
for times in range(consts.RETRY_TIMES):
try:
with open(section_img_path, "wb") as fh:
fh.write(base64.b64decode(image_data.encode()))
start_time = time.time()
ocr_4_response = requests.post(self.ocr_url_4, json=json_data_4)
if ocr_4_response.status_code != 200:
raise OCR4Exception('ocr_4 status code: {0}'.format(ocr_4_response.status_code))
except Exception as e:
self.online_log.warn(
'{0} [section img save failed] [img_path={1}]'
' [part_idx={2}] [res_idx={3}]'.format(self.log_base, img_path, part_idx, res_idx))
'{0} [ocr_4 failed] [times={1}] [img_path={2}] [error={3}]'.format(
self.log_base, times, img_path, traceback.format_exc()))
else:
ocr_4_res = ocr_4_response.json()
end_time = time.time()
speed_time = int(end_time - start_time)
if ocr_4_res.get('code') == 0 and ocr_4_res.get('result', {}).get('rtn') == 0:
card_type = ocr_4_res.get('result', {}).get(
'idcard_distinguish_result', {}).get('result', -1)
self.online_log.info(
'{0} [ocr_4 success] [img_path={1}] [speed_time={2}]'.format(
self.log_base, img_path, speed_time))
break
else:
is_save = True
section_img_path = src_section_img_path
position = result_dict.get('position', {})
angle = result_dict.get('angle', 0)
res_dict = {}
position_dict = {}
for field_dict in result_dict.get('FieldList', []):
res_dict[field_dict.get('chn_key', '')] = field_dict.get('value', '')
position_dict[field_dict.get('chn_key', '')] = {
consts.FIELD_POSITION_KEY: field_dict.get('position', {}),
consts.FIELD_QUAD_KEY: field_dict.get('quad', []),
}
position_dict[consts.POSITION_KEY] = position
position_dict[consts.ANGLE_KEY] = angle
self.online_log.warn(
'{0} [ocr_4 failed] [img_path={1}]'.format(self.log_base, img_path))
id_card_dict[consts.IC_TURE_OR_FALSE] = consts.IC_RES_MAPPING.get(card_type)
if do_dda and isinstance(id_card_dict.get(consts.IC_KEY_FIELD[0]), str) and \
isinstance(id_card_dict.get(consts.IC_KEY_FIELD[1]), str):
ic_name = id_card_dict.get(consts.IC_KEY_FIELD[0], '').strip()
ic_id = id_card_dict.get(consts.IC_KEY_FIELD[1], '').strip()
if len(ic_name) > 0 and len(ic_id) > 0:
dda_id_bc_mapping.setdefault(consts.IC_FIELD, []).append((ic_name, ic_id, img_path))
license_summary.setdefault(classify, []).append(id_card_dict)
# 购车发票 & 二手车发票
elif classify == consts.MVI_CLASSIFY or classify == consts.UCI_CLASSIFY:
rebuild_data_dict = {}
position_dict = {}
mvi_res = license_data.pop('result', {})
for en_key, detail_dict in mvi_res.items():
rebuild_data_dict[detail_dict.get('chinese_key', '')] = detail_dict.get('words', '')
position_dict[detail_dict.get('chinese_key', '')] = {
consts.FIELD_POSITION_KEY: detail_dict.get('position', {})
}
rebuild_data_dict['新旧版式'] = license_data.get('layout', '')
rebuild_data_dict[consts.IMG_PATH_KEY] = img_path
rebuild_data_dict[consts.SECTION_IMG_PATH_KEY] = section_img_path
rebuild_data_dict[consts.ALL_POSITION_KEY] = position_dict
license_summary.setdefault(classify, []).append(rebuild_data_dict)
# 其他
else:
for res_dict in license_data:
res_dict[consts.IMG_PATH_KEY] = img_path
res_dict[consts.SECTION_IMG_PATH_KEY] = section_img_path
res_dict[consts.ALL_POSITION_KEY] = position_dict
license_summary.setdefault(classify, []).append(res_dict)
license_summary.setdefault(classify, []).extend(license_data)
res_list.append((pno, ino, part_idx, consts.RES_SUCCESS))
except Exception as e:
res_list.append((pno, ino, part_idx, consts.RES_FAILED))
self.online_log.error('{0} [license1_process error] [error={1}]'.format(self.log_base, traceback.format_exc()))
if is_save and file_data is not None:
try:
with open(src_section_img_path, "wb") as fh:
fh.write(base64.b64decode(file_data.encode()))
except Exception as e:
self.online_log.warn(
'{0} [section img save failed] [img_path={1}]'
' [part_idx={2}]'.format(self.log_base, img_path, part_idx))
else:
def license2_process(self, ocr_res_2, license_summary, pid, classify, res_list, pno, ino, part_idx, img_path, do_dda, dda_id_bc_mapping, file_data):
    """Fold one OCR-2 response into ``license_summary`` and record its status.

    Exactly one ``(pno, ino, part_idx, status)`` tuple is appended to
    ``res_list`` per call: ``consts.RES_SUCCESS`` when the response was
    processed completely, ``consts.RES_FAILED_2`` when the response carries
    a non-success ``ErrorCode`` or when processing raised.

    Args:
        ocr_res_2: Response dict; ``ErrorCode`` and (for non bank-card
            documents) ``ResultList`` are read from it.
        license_summary: Dict mapping ``classify`` to a list of result
            dicts; mutated in place.
        pid: Compared against ``consts.BC_PID`` to select the bank-card
            branch.
        classify: Key under which results are stored in ``license_summary``.
        res_list: List collecting per-part status tuples; mutated in place.
        pno, ino, part_idx: Identifiers copied into the status tuple;
            ``part_idx`` is also used to build section image file names.
        img_path: Path of the source image; stored on every result.
        do_dda: When truthy, bank-card numbers are also collected into
            ``dda_id_bc_mapping``.
        dda_id_bc_mapping: Dict collecting ``(bc_no, img_path)`` tuples
            under ``consts.BC_FIELD``.
        file_data: Base64 text of the source image part, or ``None``.
    """
    try:
        if ocr_res_2.get('ErrorCode') in consts.SUCCESS_CODE_SET:
            if pid == consts.BC_PID:
                # Bank card: the raw response is stored as the result.
                ocr_res_2[consts.IMG_PATH_KEY] = img_path
                license_summary.setdefault(classify, []).append(ocr_res_2)
                if do_dda and isinstance(ocr_res_2.get(consts.BC_KEY_FIELD), str):
                    bc_no = ocr_res_2[consts.BC_KEY_FIELD].strip()
                    if len(bc_no) > 0:
                        dda_id_bc_mapping.setdefault(consts.BC_FIELD, []).append((bc_no, img_path))
            else:
                # Business licence and similar documents.
                pre, suf = os.path.splitext(img_path)
                src_section_img_path = img_path if file_data is None else '{0}_{1}{2}'.format(pre, part_idx, suf)
                is_save = False
                for res_idx, result_dict in enumerate(ocr_res_2.get('ResultList', [])):
                    image_data = result_dict.get('image_data', '')
                    if len(image_data) > 0:
                        # The result ships its own cropped section image:
                        # persist it next to the source image.
                        position = {}
                        angle = 0
                        section_img_path = '{0}_{1}_{2}{3}'.format(pre, part_idx, res_idx, suf)
                        try:
                            with open(section_img_path, "wb") as fh:
                                fh.write(base64.b64decode(image_data.encode()))
                        except Exception:
                            # Best effort: a failed save must not drop the
                            # recognised fields.
                            self.online_log.warn(
                                '{0} [section img save failed] [img_path={1}]'
                                ' [part_idx={2}] [res_idx={3}]'.format(self.log_base, img_path, part_idx, res_idx))
                    else:
                        # No cropped image: point at the source part and
                        # remember that it has to be written out below.
                        is_save = True
                        section_img_path = src_section_img_path
                        position = result_dict.get('position', {})
                        angle = result_dict.get('angle', 0)
                    res_dict = {}
                    position_dict = {}
                    for field_dict in result_dict.get('FieldList', []):
                        res_dict[field_dict.get('chn_key', '')] = field_dict.get('value', '')
                        position_dict[field_dict.get('chn_key', '')] = {
                            consts.FIELD_POSITION_KEY: field_dict.get('position', {}),
                            consts.FIELD_QUAD_KEY: field_dict.get('quad', []),
                        }
                    position_dict[consts.POSITION_KEY] = position
                    position_dict[consts.ANGLE_KEY] = angle
                    res_dict[consts.IMG_PATH_KEY] = img_path
                    res_dict[consts.SECTION_IMG_PATH_KEY] = section_img_path
                    res_dict[consts.ALL_POSITION_KEY] = position_dict
                    license_summary.setdefault(classify, []).append(res_dict)
                if is_save and file_data is not None:
                    try:
                        with open(src_section_img_path, "wb") as fh:
                            fh.write(base64.b64decode(file_data.encode()))
                    except Exception:
                        self.online_log.warn(
                            '{0} [section img save failed] [img_path={1}]'
                            ' [part_idx={2}]'.format(self.log_base, img_path, part_idx))
            # Recorded only after processing finished, so that a failure
            # above yields a single RES_FAILED_2 entry instead of a
            # success/failure pair for the same part.
            res_list.append((pno, ino, part_idx, consts.RES_SUCCESS))
        else:
            res_list.append((pno, ino, part_idx, consts.RES_FAILED_2))
    except Exception:
        res_list.append((pno, ino, part_idx, consts.RES_FAILED_2))
        self.online_log.error('{0} [license2_process error] [error={1}]'.format(self.log_base, traceback.format_exc()))
@staticmethod
def license_rebuild(license_summary):
    """Reorder and enrich the collected licence results in place.

    For ``consts.IC_CLASSIFY``, ``consts.MVI_CLASSIFY`` and
    ``consts.MVC_CLASSIFY`` entries of ``license_summary``:

    * IC: entries are split by their ``'类别'`` marker (``'1'`` goes to
      ``consts.RP_CLASSIFY``, anything else stays under the IC key) and,
      within each group, entries lacking the order key are placed before
      entries that contain it.
    * MVI: ``'不含税价(逻辑计算)'`` is added to every entry, computed from
      ``'价税合计小写'`` and ``'增值税税率'``; it stays ``''`` when either is
      missing or cannot be parsed.
    * MVC: entries lacking the order key are placed before entries that
      contain it.

    Args:
        license_summary: Dict mapping classify to a list of result dicts;
            mutated in place.

    Returns:
        ``(ic_merge, rp_merge)``; each flag is ``True`` when the respective
        group consists of exactly one entry without and one entry with the
        order key. ``(False, False)`` is returned if anything raises.
    """
    try:
        ic_merge = False
        rp_merge = False

        for classify in (consts.IC_CLASSIFY, consts.MVI_CLASSIFY, consts.MVC_CLASSIFY):
            license_list = license_summary.get(classify)
            if not license_list:
                continue

            if classify == consts.IC_CLASSIFY:
                # Keep the two document kinds apart; entries without the
                # order key come first.
                key, _, _ = consts.FIELD_ORDER_MAP.get(classify)
                ic_side1_list = []
                ic_side2_list = []
                rp_side1_list = []
                rp_side2_list = []
                for license_dict in license_list:
                    is_rp = license_dict.pop('类别', '0')
                    if key in license_dict:
                        if is_rp == '1':
                            rp_side2_list.append(license_dict)
                        else:
                            ic_side2_list.append(license_dict)
                    elif is_rp == '1':
                        rp_side1_list.append(license_dict)
                    else:
                        ic_side1_list.append(license_dict)

                ic_merge = len(ic_side1_list) == len(ic_side2_list) == 1
                rp_merge = len(rp_side1_list) == len(rp_side2_list) == 1

                ic_side1_list.extend(ic_side2_list)
                rp_side1_list.extend(rp_side2_list)

                if ic_side1_list:
                    license_summary[classify] = ic_side1_list
                else:
                    license_summary.pop(classify, None)

                if rp_side1_list:
                    license_summary[consts.RP_CLASSIFY] = rp_side1_list

            if classify == consts.MVI_CLASSIFY:
                # Derive the tax-exclusive price for every invoice.
                for license_dict in license_list:
                    price = ''
                    rate_str = license_dict.get('增值税税率')
                    price_total_str = license_dict.get('价税合计小写')
                    if rate_str is not None and price_total_str is not None:
                        try:
                            rate = int(rate_str.rstrip('%'))
                            price_total = float(price_total_str)
                        except (AttributeError, TypeError, ValueError):
                            # Unparseable OCR text: leave the price empty.
                            pass
                        else:
                            price = round(price_total * 100 / (rate + 100), 2)
                    license_dict['不含税价(逻辑计算)'] = price

            if classify == consts.MVC_CLASSIFY:
                # Entries without the order key first, entries with it last.
                key, _, _ = consts.FIELD_ORDER_MAP.get(classify)
                page_1_2 = []
                page_3_4 = []
                for license_dict in license_list:
                    if key in license_dict:
                        page_3_4.append(license_dict)
                    else:
                        page_1_2.append(license_dict)
                page_1_2.extend(page_3_4)
                license_summary[classify] = page_1_2

        return ic_merge, rp_merge
    except Exception:
        # Static method: no instance logger is available here.
        print("license_rebuild error")
        print(traceback.format_exc())
        return False, False
def parse_img_path(self, img_path):
    """Extract the 1-based page and image numbers from an image path.

    The file name (without extension) is split on ``'_'`` and the parts at
    index 1 and 3 are read as integers, e.g. ``page_7_img_11_0`` yields
    ``(8, 12)``.

    Args:
        img_path: Path whose base name follows the pattern above.

    Returns:
        ``(page_no, img_no)``, each one greater than the number embedded in
        the name; ``(0, 0)`` if the name cannot be parsed (the error is
        logged).
    """
    try:
        img_name, _ = os.path.splitext(os.path.basename(img_path))
        part_list = img_name.split('_')
        return int(part_list[1]) + 1, int(part_list[3]) + 1
    except Exception:
        self.online_log.error('{0} [parse_img_path error] [error={1}]'.format(self.log_base, traceback.format_exc()))
        return 0, 0
def get_most(self, value_list):
    """Return the most frequent element of ``value_list``.

    Args:
        value_list: Iterable of hashable values; may be empty or ``None``.

    Returns:
        The element with the highest count, or ``None`` when ``value_list``
        is falsy or counting fails (the error is logged).
    """
    try:
        if value_list:
            most_common = Counter(value_list).most_common(1)
            return most_common[0][0] if most_common else None
    except Exception:
        self.online_log.error('{0} [get_most error] [error={1}]'.format(self.log_base, traceback.format_exc()))
    return None
def date_format(self, date_str, format_str):
try:
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!