import re import json import requests from .named_enum import DocStatus from .models import HILDoc, AFCDoc, AFCSEOCRResult, AFCOCRResult, HILSEOCRResult, HILOCRResult from . import consts from prese.compare import pre_compare, get_empty_result class MPOSHandler: @staticmethod def ocr1_process(url, img_base64): result_list = [] json_data = { "file": img_base64, } ocr_1_response = requests.post(url, json=json_data) if ocr_1_response.status_code != 200: return result_list ocr_1_res = ocr_1_response.json() for ocr_data in ocr_1_res.get('data', []): license_data = ocr_data.get('data') if not license_data: continue if isinstance(license_data, dict): license_data.pop('base64_img', '') id_card_dict = {} card_type = license_data.get('type', '') is_ic = card_type.startswith('身份证') is_info_side = card_type.endswith('信息面') # id_card_dict['类别'] = '0' if is_ic else '1' if is_ic: field_map = consts.IC_MAP_0 if is_info_side else consts.IC_MAP_1 else: field_map = consts.RP_MAP_0 if is_info_side else consts.RP_MAP_1 for write_field, search_field in field_map: id_card_dict[write_field] = license_data.get('words_result', {}).get(search_field, {}).get('words', '') if not is_info_side: start_time = license_data.get('words_result', {}).get('签发日期', {}).get('words', '') end_time = license_data.get('words_result', {}).get('失效日期', {}).get('words', '') id_card_dict['有效期限'] = '{0}-{1}'.format(start_time, end_time) result_list.append(id_card_dict) return result_list @staticmethod def ocr2_process(url, classify, img_base64): result_list = [] pid, _, _, _, _, _ = consts.LICENSE_CLASSIFY_MAPPING.get(classify) json_data_2 = { "pid": str(pid), "filedata": img_base64 } ocr_2_response = requests.post(url, data=json_data_2) if ocr_2_response.status_code != 200: return result_list ocr_2_res = json.loads(ocr_2_response.text) if ocr_2_res.get('ErrorCode') in consts.SUCCESS_CODE_SET: if pid == consts.BC_PID: # 银行卡 # res_dict = {} # for en_key, chn_key in consts.BC_FIELD: # res_dict[chn_key] = ocr_res_2.get(en_key, '') result_list.append(ocr_2_res) else: # 营业执照等 for result_dict in ocr_2_res.get('ResultList', []): res_dict = {} for field_dict in result_dict.get('FieldList', []): res_dict[field_dict.get('chn_key', '')] = field_dict.get('value', '') result_list.append(res_dict) return result_list class DocHandler: @staticmethod def xss_pass(file): for pno in range(file.pageCount): page = file.loadPage(pno) page_text = page.getText() if re.search(r'/JS(.*)', page_text) and re.search(r'/S /JavaScript', page_text): return False return True @staticmethod def get_name(info, key, length): if not isinstance(info, dict): return '' src_name = info.get(key, '') if len(src_name) < length: return src_name else: return consts.LONG_NAME @staticmethod def get_link(doc_id, business_type, file='pdf'): if file == 'pdf': return '/data/{1}/{2}/{0}/{0}.pdf'.format(doc_id, business_type, consts.TMP_DIR_NAME) elif file == 'img': return '/data/{1}/{2}/{0}/{0}_img.zip'.format(doc_id, business_type, consts.TMP_DIR_NAME) elif file == 'src_excel': return '/data/{1}/{2}/{0}/src.xlsx'.format(doc_id, business_type, consts.TMP_DIR_NAME) else: return '/data/{1}/{2}/{0}/{0}.xlsx'.format(doc_id, business_type, consts.TMP_DIR_NAME) def get_doc_list(self, doc_queryset, business_type): for doc_dict in doc_queryset: if doc_dict['status'] not in [DocStatus.COMPLETE.value, DocStatus.UPLOAD_FAILED.value]: continue doc_id = doc_dict.get('id') doc_dict['pdf_link'] = self.get_link(doc_id, business_type) doc_dict['img_link'] = self.get_link(doc_id, business_type, file='img') doc_dict['excel_link'] = self.get_link(doc_id, business_type, file='excel') doc_dict['src_excel_link'] = self.get_link(doc_id, business_type, file='src_excel') return list(doc_queryset) @staticmethod def get_doc_class(business_type): return (HILDoc, consts.HIL_PREFIX) if business_type in consts.HIL_SET else (AFCDoc, consts.AFC_PREFIX) @staticmethod def fix_scheme(scheme): if scheme in consts.DOC_SCHEME_LIST: return scheme elif scheme.upper() in consts.DOC_SCHEME_LIST: return scheme.upper() else: return consts.DOC_SCHEME_LIST[0] @staticmethod def fix_data_source(data_source): if data_source in consts.DATA_SOURCE_LIST: return data_source elif data_source.upper() in consts.DATA_SOURCE_LIST: return data_source.upper() else: return consts.DATA_SOURCE_LIST[0] class PreSEHandler: # preSettlement @staticmethod def pre_compare_entrance(pos_content): application_entity = pos_content.get('applicationEntity') application_id = pos_content.get('applicationId') # 根据application_id查找OCR累计结果指定license字段,如果没有,结束 result_class = HILSEOCRResult if application_entity in consts.HIL_SET else AFCSEOCRResult ca_result_class = HILOCRResult if application_entity in consts.HIL_SET else AFCOCRResult ca_ocr_res_dict = ca_result_class.objects.filter(application_id=application_id).values( *consts.CA_ADD_COMPARE_FIELDS_PRE).first() ocr_res_dict = result_class.objects.filter(application_id=application_id).values( *consts.PRE_COMPARE_FIELDS).first() if ocr_res_dict is None: return get_empty_result() id_res_list = [] for field_name in consts.CA_ADD_COMPARE_FIELDS_PRE: if field_name == consts.IC_OCR_FIELD: id_res_list.append(ca_ocr_res_dict.get(field_name) if isinstance(ca_ocr_res_dict, dict) else None) id_res_list.append(ocr_res_dict.get(field_name)) rebuild_compare_result = pre_compare(pos_content, ocr_res_dict, id_res_list) return rebuild_compare_result