430ce72e by 周伟奇

add idCard special

1 parent dee1b339
......@@ -2333,6 +2333,183 @@ def se_compare_license(license_en, ocr_res_dict, field_list):
return result_field_list, no_ocr_result, field_img_path_dict
def se_compare_license_id(license_en, id_res_list, field_list):
ocr_field, compare_logic, special_expiry_date = consts.SE_COMPARE_FIELD[license_en]
is_find = False
no_ocr_result = True
special_expiry_date_slice = False
result_field_list = []
section_img_info = dict()
field_img_path_dict = dict()
# ocr_res_str = ocr_res_dict.get(ocr_field)
for ocr_res_str in id_res_list:
if is_find:
break
if ocr_res_str is not None:
no_ocr_result = False
ocr_res_list = json.loads(ocr_res_str)
# 3/4页去除
# if ocr_field == consts.MVC_OCR_FIELD:
# tmp_list = []
# for res in ocr_res_list:
# if compare_logic['vinNo'][0] in res:
# tmp_list.append(res)
# ocr_res_list = tmp_list
length = len(ocr_res_list)
# 身份证、居住证 过期期限特殊处理
if special_expiry_date:
expiry_dates = dict()
key = compare_logic.get('idExpiryDate')[0]
for date_tmp_idx, ocr_res in enumerate(ocr_res_list):
if key in ocr_res:
expiry_dates[ocr_res[key]] = (ocr_res.get(consts.IMG_PATH_KEY_2, ''), date_tmp_idx)
else:
expiry_dates = dict()
for res_idx in range(length-1, -1, -1):
if is_find:
break
for idx, (name, value) in enumerate(field_list):
# if ocr_field == consts.MVI_OCR_FIELD and name == consts.SE_NEW_ADD_FIELD[9]:
# ocr_str = getattr(cp, consts.ZW_METHOD)(
# ocr_res_list[res_idx].get(consts.LOWER_AMOUNT_FIELD, ''),
# ocr_res_list[res_idx].get(consts.UPPER_AMOUNT_FIELD, ''),
# )
# else:
ocr_str = ocr_res_list[res_idx].get(compare_logic[name][0])
if not isinstance(ocr_str, str):
result = consts.RESULT_N
ocr_str = empty_str
no_key = True
else:
result = getattr(cp, compare_logic[name][1])(value, ocr_str, **compare_logic[name][2])
no_key = False
if idx == 0 and result == consts.RESULT_N and length > 1:
break
is_find = True
section_img_info[consts.SECTION_IMG_PATH_KEY] = ocr_res_list[res_idx].get(consts.SECTION_IMG_PATH_KEY, '')
section_img_info[consts.ALL_POSITION_KEY] = ocr_res_list[res_idx].get(consts.ALL_POSITION_KEY, {})
if special_expiry_date:
section_img_info[consts.SECTION_IMG_PATH_KEY_2] = ocr_res_list[res_idx].get(
consts.SECTION_IMG_PATH_KEY_2, '')
section_img_info[consts.ALL_POSITION_KEY_2] = ocr_res_list[res_idx].get(consts.ALL_POSITION_KEY_2, {})
# 过期期限特殊处理
if special_expiry_date and name == 'idExpiryDate' and result == consts.RESULT_N:
if no_key:
if len(expiry_dates) == 0:
ocr_str = empty_str
result = consts.RESULT_N
img_path = empty_str
else:
for expiry_date, (date_img_path, date_res_idx) in expiry_dates.items():
expiry_date_res = getattr(cp, compare_logic[name][1])(value, expiry_date, **compare_logic[name][2])
if expiry_date_res == consts.RESULT_N:
ocr_str = expiry_date
img_path = date_img_path
special_expiry_date_slice = True
section_img_info[consts.SECTION_IMG_PATH_KEY_2] = ocr_res_list[date_res_idx].get(
consts.SECTION_IMG_PATH_KEY_2, '')
section_img_info[consts.ALL_POSITION_KEY_2] = ocr_res_list[date_res_idx].get(
consts.ALL_POSITION_KEY_2, {})
break
else:
ocr_str = empty_str
result = consts.RESULT_Y
img_path = empty_str
else:
img_path = ocr_res_list[res_idx].get(consts.IMG_PATH_KEY_2, '')
special_expiry_date_slice = True
else:
img_path = ocr_res_list[res_idx].get(consts.IMG_PATH_KEY, '') if result == consts.RESULT_N else empty_str
if isinstance(value, list):
value = json.dumps(value, ensure_ascii=False)
error_type = empty_error_type if result == consts.RESULT_Y else ErrorType.OCR.value
result_field_list.append((name, value, result, ocr_str, img_path, error_type, compare_logic[name][3]))
if not is_find:
for name, value in field_list:
if isinstance(value, list):
value = json.dumps(value, ensure_ascii=False)
no_find_str = consts.DDA_NO_FIND if license_en == consts.DDA_EN else '{0}未找到'.format(license_en)
result_field_list.append((name, value, consts.RESULT_N, empty_str, empty_str, ErrorType.NF.value, no_find_str))
if is_find:
if special_expiry_date_slice:
special_section_img_path = section_img_info.get(consts.SECTION_IMG_PATH_KEY_2, '')
if os.path.exists(special_section_img_path):
field = 'idExpiryDate'
special_info = section_img_info.get(consts.ALL_POSITION_KEY_2, {})
special_section_position = special_info.get(consts.POSITION_KEY, {})
special_section_angle = special_info.get(consts.ANGLE_KEY, 0)
try:
last_img = img_process(special_section_img_path, special_section_position, special_section_angle)
except Exception as e:
field_img_path_dict[field] = special_section_img_path
else:
pre, suf = os.path.splitext(special_section_img_path)
try:
res_field = compare_logic[field][0]
is_valid, coord_tuple = field_build_coordinates(special_info.get(res_field, {}))
if is_valid:
save_path = '{0}_{1}{2}'.format(pre, field, suf)
field_img = last_img[coord_tuple[0]:coord_tuple[1], coord_tuple[2]:coord_tuple[3], :]
cv2.imwrite(save_path, field_img)
field_img_path_dict[field] = save_path
else:
field_img_path_dict[field] = special_section_img_path
except Exception as e:
field_img_path_dict[field] = special_section_img_path
section_img_path = section_img_info.get(consts.SECTION_IMG_PATH_KEY, '')
if os.path.exists(section_img_path):
failed_field = []
base_img_path = empty_str
for name, _, result, _, img_path, _, _ in result_field_list:
if result == consts.RESULT_N:
if special_expiry_date_slice and name == 'idExpiryDate':
continue
failed_field.append(name)
if base_img_path == empty_str:
base_img_path = img_path
if len(failed_field) > 0:
info = section_img_info.get(consts.ALL_POSITION_KEY, {})
section_position = info.get(consts.POSITION_KEY, {})
section_angle = info.get(consts.ANGLE_KEY, 0)
try:
last_img = img_process(section_img_path, section_position, section_angle)
except Exception as e:
for field in failed_field:
field_img_path_dict[field] = base_img_path
else:
pre, suf = os.path.splitext(section_img_path)
for field in failed_field:
try:
res_field = compare_logic[field][0]
is_valid, coord_tuple = field_build_coordinates(info.get(res_field, {}))
if is_valid:
save_path = '{0}_{1}{2}'.format(pre, field, suf)
field_img = last_img[coord_tuple[0]:coord_tuple[1], coord_tuple[2]:coord_tuple[3], :]
cv2.imwrite(save_path, field_img)
field_img_path_dict[field] = save_path
else:
field_img_path_dict[field] = base_img_path
except Exception as e:
field_img_path_dict[field] = base_img_path
return result_field_list, no_ocr_result, field_img_path_dict
def se_contract_compare(license_en, ocr_res_dict, strip_list, is_gsyh):
ocr_field, compare_logic, _ = consts.SE_COMPARE_FIELD[license_en]
ocr_res_str = ocr_res_dict.get(ocr_field)
......@@ -2557,7 +2734,7 @@ def se_mvc34_compare(license_en, ocr_res_dict, field_list):
return result_field_list, field_img_path_dict
def se_compare_process(compare_info, ocr_res_dict, is_gsyh, is_auto):
def se_compare_process(compare_info, ocr_res_dict, is_gsyh, is_auto, id_res_list):
# individualCusInfo
# corporateCusInfo
# vehicleInfo
......@@ -2590,7 +2767,13 @@ def se_compare_process(compare_info, ocr_res_dict, is_gsyh, is_auto):
else:
strip_list.append((a, b))
failure_field = []
result_field_list, no_ocr_result, field_img_path_dict = se_compare_license(license_en, ocr_res_dict, strip_list)
# 身份证先SE正反面,后CA正反面
if license_en == consts.ID_EN:
result_field_list, no_ocr_result, field_img_path_dict = se_compare_license_id(
license_en, id_res_list, strip_list)
else:
result_field_list, no_ocr_result, field_img_path_dict = se_compare_license(
license_en, ocr_res_dict, strip_list)
for name, value, result, ocr_str, img_path, error_type, cn_reason in result_field_list:
if license_en not in consts.SKIP_CARD or not no_ocr_result:
total_fields += 1
......@@ -2709,13 +2892,15 @@ def se_result_detect(ocr_res_dict):
return detect_list
def se_compare_auto(application_id, application_entity, ocr_res_id, last_obj, ocr_res_dict, auto_obj, ignore_bank):
def se_compare_auto(application_id, application_entity, ocr_res_id, last_obj, ocr_res_dict, auto_obj, ignore_bank, id_res_list):
try:
# 比对逻辑
# detect_list = se_result_detect(ocr_res_dict)
compare_info, aa_type, is_gsyh = get_se_cms_compare_info_auto(
last_obj, application_entity, ignore_bank=ignore_bank)
compare_result, total_fields, failed_count, successful_at_this_level, failure_reason_str, cn_failure_reason_str, bs_failure_reason_str, _ = se_compare_process(compare_info, ocr_res_dict, is_gsyh, True)
compare_result, total_fields, failed_count, successful_at_this_level, failure_reason_str, \
cn_failure_reason_str, bs_failure_reason_str, _ = se_compare_process(
compare_info, ocr_res_dict, is_gsyh, True, id_res_list)
compare_log.info('{0} [Auto SE] [compare success] [entity={1}] [id={2}] [ocr_res_id={3}] [result={4}]'.format(
log_base, application_entity, application_id, ocr_res_id, compare_result))
except Exception as e:
......@@ -2753,14 +2938,17 @@ def se_compare_auto(application_id, application_entity, ocr_res_id, last_obj, oc
return successful_at_this_level
def se_compare(application_id, application_entity, ocr_res_id, last_obj, ocr_res_dict, is_cms, auto_result, ignore_bank):
def se_compare(application_id, application_entity, ocr_res_id, last_obj, ocr_res_dict, is_cms,
auto_result, ignore_bank, id_res_list):
try:
# 比对逻辑
start_time = datetime.now()
detect_list = se_result_detect(ocr_res_dict)
compare_info, application_version, is_gsyh = get_se_cms_compare_info(
last_obj, application_entity, detect_list, ignore_bank=ignore_bank)
compare_result, total_fields, failed_count, successful_at_this_level, failure_reason_str, cn_failure_reason_str, bs_failure_reason_str, rpa_failure_reason = se_compare_process(compare_info, ocr_res_dict, is_gsyh, False)
compare_result, total_fields, failed_count, successful_at_this_level, failure_reason_str, \
cn_failure_reason_str, bs_failure_reason_str, rpa_failure_reason = se_compare_process(
compare_info, ocr_res_dict, is_gsyh, False, id_res_list)
compare_log.info('{0} [SE] [compare success] [entity={1}] [id={2}] [ocr_res_id={3}] [result={4}]'.format(
log_base, application_entity, application_id, ocr_res_id, compare_result))
except Exception as e:
......@@ -2902,7 +3090,12 @@ def compare(application_id, application_entity, uniq_seq, ocr_res_id, is_ca=True
if is_ca:
ca_compare(application_id, application_entity, ocr_res_id, last_obj, ocr_res_dict)
else:
id_res_list = []
for field_name in consts.CA_ADD_COMPARE_FIELDS:
if field_name == consts.IC_OCR_FIELD:
id_res_list.append(ocr_res_dict.get(field_name))
id_res_list.append(ca_ocr_res_dict.get(field_name) if isinstance(ca_ocr_res_dict, dict) else None)
if isinstance(ca_ocr_res_dict, dict) and isinstance(ca_ocr_res_dict.get(field_name), str):
tmp_ca_result = json.loads(ca_ocr_res_dict.get(field_name))
if isinstance(ocr_res_dict.get(field_name), str):
......@@ -2915,11 +3108,11 @@ def compare(application_id, application_entity, uniq_seq, ocr_res_id, is_ca=True
bank_class = HILbankVerification if application_entity == consts.HIL_PREFIX else AFCbankVerification
ignore_bank = bank_class.objects.filter(application_id=application_id, on_off=True).exists()
if auto_obj is not None:
auto_result = se_compare_auto(application_id, application_entity, ocr_res_id, last_obj, ocr_res_dict, auto_obj, ignore_bank)
auto_result = se_compare_auto(application_id, application_entity, ocr_res_id, last_obj, ocr_res_dict, auto_obj, ignore_bank, id_res_list)
else:
auto_result = None
full_result = se_compare(application_id, application_entity, ocr_res_id, last_obj, ocr_res_dict, is_cms, auto_result, ignore_bank)
full_result = se_compare(application_id, application_entity, ocr_res_id, last_obj, ocr_res_dict, is_cms, auto_result, ignore_bank, id_res_list)
if auto_obj is not None:
try:
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!