"""Handlers that call OCR HTTP services and reshape their JSON responses.

Also contains small document helpers (links, name/scheme normalisation) and
the pre-settlement comparison entry point.
"""
import json
import re

import requests

from . import consts
from .models import HILDoc, AFCDoc, AFCSEOCRResult, AFCOCRResult, HILSEOCRResult, HILOCRResult
from .named_enum import DocStatus
from common.mixins import LoggerMixin
from pos.consts import DocumentType
from prese.compare import pre_compare, get_empty_result
from settings import conf


def _pick_choice(value, choices):
    """Return ``value`` (or its upper-cased form) if listed in ``choices``.

    Falls back to ``choices[0]`` when neither form is listed, including when
    ``value`` is not a string and therefore cannot be upper-cased.
    """
    if value in choices:
        return value
    if isinstance(value, str):
        upper_value = value.upper()
        if upper_value in choices:
            return upper_value
    return choices[0]


class MPOSHandler:
    """Wrappers around the two OCR endpoints used for license recognition."""

    @staticmethod
    def ocr1_process(url, img_base64):
        """POST a base64 image to ``url`` and return one dict per recognised card.

        Returns an empty list when the service does not answer with HTTP 200.
        Entries whose ``data`` payload is empty or is not a dict are skipped.
        """
        result_list = []
        ocr_1_response = requests.post(url, json={"file": img_base64})
        if ocr_1_response.status_code != 200:
            return result_list

        ocr_1_res = ocr_1_response.json()
        for ocr_data in ocr_1_res.get('data', []):
            license_data = ocr_data.get('data')
            # A non-dict payload cannot be queried with .get(); skip it instead
            # of failing the whole batch with AttributeError.
            if not license_data or not isinstance(license_data, dict):
                continue
            license_data.pop('base64_img', '')

            card_type = license_data.get('type', '')
            is_ic = card_type.startswith('身份证')
            is_info_side = card_type.endswith('信息面')
            # NOTE: a legacy category flag ('0' for ID cards, '1' otherwise)
            # used to be written into the result here; it is disabled.
            if is_ic:
                field_map = consts.IC_MAP_0 if is_info_side else consts.IC_MAP_1
            else:
                field_map = consts.RP_MAP_0 if is_info_side else consts.RP_MAP_1

            words_result = license_data.get('words_result', {})
            id_card_dict = {
                write_field: words_result.get(search_field, {}).get('words', '')
                for write_field, search_field in field_map
            }
            if not is_info_side:
                # The non-info side carries the issue/expiry dates; join them
                # into a single validity-period field.
                start_time = words_result.get('签发日期', {}).get('words', '')
                end_time = words_result.get('失效日期', {}).get('words', '')
                id_card_dict['有效期限'] = '{0}-{1}'.format(start_time, end_time)
            result_list.append(id_card_dict)
        return result_list

    @staticmethod
    def ocr2_process(url, classify, img_base64):
        """POST a base64 image to the second OCR service and return its results.

        ``classify`` is looked up in ``consts.LICENSE_CLASSIFY_MAPPING`` to get
        the ``pid`` sent to the service. Returns an empty list when ``classify``
        is unknown, when the HTTP status is not 200, or when the response's
        ``ErrorCode`` is not in ``consts.SUCCESS_CODE_SET``.
        """
        result_list = []
        classify_info = consts.LICENSE_CLASSIFY_MAPPING.get(classify)
        if classify_info is None:
            # Unknown classify: previously this crashed with TypeError while
            # unpacking None.
            return result_list
        pid, *_ = classify_info

        form_data = {
            "pid": str(pid),
            "filedata": img_base64
        }
        ocr_2_response = requests.post(url, data=form_data)
        if ocr_2_response.status_code != 200:
            return result_list

        ocr_2_res = json.loads(ocr_2_response.text)
        if ocr_2_res.get('ErrorCode') not in consts.SUCCESS_CODE_SET:
            return result_list

        if pid == consts.BC_PID:
            # Bank card: the raw response is returned as-is (a per-field
            # mapping through consts.BC_FIELD existed once and is disabled).
            result_list.append(ocr_2_res)
        else:
            # Business license and other document types.
            for result_dict in ocr_2_res.get('ResultList', []):
                result_list.append({
                    field_dict.get('chn_key', ''): field_dict.get('value', '')
                    for field_dict in result_dict.get('FieldList', [])
                })
        return result_list


class DocHandler:
    """Utility helpers for uploaded documents."""

    @staticmethod
    def xss_pass(file):
        """Return False if any page text contains both JavaScript markers.

        ``file`` must expose ``pageCount`` and ``loadPage(n).getText()``
        (presumably a PyMuPDF document - confirm against the caller).
        """
        for pno in range(file.pageCount):
            page_text = file.loadPage(pno).getText()
            if re.search(r'/JS(.*)', page_text) and re.search(r'/S /JavaScript', page_text):
                return False
        return True

    @staticmethod
    def get_name(info, key, length):
        """Return ``info[key]`` if shorter than ``length``, else ``consts.LONG_NAME``.

        Returns ``''`` when ``info`` is not a dict, the key is missing, or the
        stored value is ``None``.
        """
        if not isinstance(info, dict):
            return ''
        src_name = info.get(key, '')
        if src_name is None:
            # A present-but-null value previously raised TypeError in len().
            return ''
        if len(src_name) < length:
            return src_name
        return consts.LONG_NAME

    @staticmethod
    def get_link(doc_id, business_type, file='pdf'):
        """Build the download path for one of a document's artefacts.

        ``file`` selects 'pdf', 'img' or 'src_excel'; any other value yields
        the result-Excel path.
        """
        if file == 'pdf':
            return '/data/{1}/{2}/{0}/{0}.pdf'.format(doc_id, business_type, consts.TMP_DIR_NAME)
        if file == 'img':
            return '/data/{1}/{2}/{0}/{0}_img.zip'.format(doc_id, business_type, consts.TMP_DIR_NAME)
        if file == 'src_excel':
            return '/data/{1}/{2}/{0}/src.xlsx'.format(doc_id, business_type, consts.TMP_DIR_NAME)
        return '/data/{1}/{2}/{0}/{0}.xlsx'.format(doc_id, business_type, consts.TMP_DIR_NAME)

    def get_doc_list(self, doc_queryset, business_type):
        """Attach download links to eligible documents and return them as a list.

        Only entries whose ``status`` is COMPLETE or UPLOAD_FAILED get links;
        the dicts are modified in place.
        """
        linkable_statuses = (DocStatus.COMPLETE.value, DocStatus.UPLOAD_FAILED.value)
        for doc_dict in doc_queryset:
            if doc_dict['status'] not in linkable_statuses:
                continue
            doc_id = doc_dict.get('id')
            doc_dict['pdf_link'] = self.get_link(doc_id, business_type)
            doc_dict['img_link'] = self.get_link(doc_id, business_type, file='img')
            doc_dict['excel_link'] = self.get_link(doc_id, business_type, file='excel')
            doc_dict['src_excel_link'] = self.get_link(doc_id, business_type, file='src_excel')
        return list(doc_queryset)

    @staticmethod
    def get_doc_class(business_type):
        """Return the ``(model class, prefix)`` pair for ``business_type``."""
        if business_type in consts.HIL_SET:
            return HILDoc, consts.HIL_PREFIX
        return AFCDoc, consts.AFC_PREFIX

    @staticmethod
    def fix_scheme(scheme):
        """Normalise ``scheme`` to an entry of ``consts.DOC_SCHEME_LIST``.

        Unknown or non-string values fall back to the first list entry.
        """
        return _pick_choice(scheme, consts.DOC_SCHEME_LIST)

    @staticmethod
    def fix_data_source(data_source):
        """Normalise ``data_source`` to an entry of ``consts.DATA_SOURCE_LIST``.

        Unknown or non-string values fall back to the first list entry.
        """
        return _pick_choice(data_source, consts.DATA_SOURCE_LIST)


class PreSEHandler:
    # preSettlement

    @staticmethod
    def pre_compare_entrance(pos_content):
        """Run the pre-settlement comparison for one POS payload.

        Reads ``applicationEntity`` and ``applicationId`` from ``pos_content``,
        loads the stored OCR results for that application and delegates to
        ``pre_compare``. Returns ``get_empty_result()`` when no settlement OCR
        result exists.
        """
        application_entity = pos_content.get('applicationEntity')
        application_id = pos_content.get('applicationId')

        # Look up the accumulated OCR result's license fields by
        # application_id; stop if there is none.
        is_hil = application_entity in consts.HIL_SET
        result_class = HILSEOCRResult if is_hil else AFCSEOCRResult
        ca_result_class = HILOCRResult if is_hil else AFCOCRResult

        data_source = ''
        if application_entity == consts.AFC_PREFIX:
            doc_obj = AFCDoc.objects.filter(
                application_id=application_id,
                document_name__icontains='电子签署-车辆抵押贷款合同').last()
            if doc_obj is not None:
                data_source = doc_obj.data_source
        # NOTE(review): the nesting level of this log call was reconstructed
        # from whitespace-collapsed source - confirm it belongs at this level.
        LoggerMixin.running_log.info('[pre get data_source] [id={0}] [data_source={1}]]'.format(
            application_id, data_source))

        ca_ocr_res_dict = ca_result_class.objects.filter(application_id=application_id).values(
            *consts.CA_ADD_COMPARE_FIELDS_PRE).first()
        ocr_res_dict = result_class.objects.filter(application_id=application_id).values(
            *consts.PRE_COMPARE_FIELDS).first()
        if ocr_res_dict is None:
            return get_empty_result()

        id_res_list = []
        for field_name in consts.CA_ADD_COMPARE_FIELDS_PRE:
            if field_name == consts.IC_OCR_FIELD:
                # The CA-side result may be missing entirely; record None then.
                id_res_list.append(
                    ca_ocr_res_dict.get(field_name) if isinstance(ca_ocr_res_dict, dict) else None)
                id_res_list.append(ocr_res_dict.get(field_name))

        return pre_compare(pos_content, ocr_res_dict, id_res_list, data_source)


class PosHandler:
    """OCR helpers for de-mortgage (vehicle registration) documents."""

    # Field names read from the OCR response for each page type.
    VehicleRegArea_fields = ['抵押权人姓名/名称', '解除抵押日期']
    VehicleRCI_fields = ['1.机动车所有人/身份证名称/号码']

    @staticmethod
    def de_mortgage_ocr_process(img_base64):
        """OCR one image and return customer name, mortgagee and release date.

        Always returns a dict with keys ``customerName``, ``application`` and
        ``deMortgageDate``; values stay ``''`` when the request fails, the
        page type is not recognised, or any error occurs (errors are logged,
        not raised).
        """
        result_obj = {
            'customerName': '',
            'application': '',
            'deMortgageDate': ''
        }
        json_data = {"file": img_base64, "classify": consts.MVC_CLASSIFY, "version": "green"}
        try:
            url = conf.OCR_URL_FOLDER
            response = requests.post(url, json=json_data)
            if response.status_code != 200:
                return result_obj
            response.encoding = 'unicode_escape'
            ocr_res = response.json()
            results = ocr_res.get('results')
            page = ocr_res.get('page', '')
            if page == 'VehicleRegArea':
                for register_info in results.get('register_info', []):
                    register_type = register_info.get('register_type', -1)
                    if register_type == 1:
                        info = register_info.get('解除抵押日期', {})
                        result_obj['deMortgageDate'] = info.get('words', '')
                    elif register_type == 0:
                        info = register_info.get('抵押权人姓名/名称', {})
                        result_obj['application'] = info.get('words', '')
            elif page == 'VehicleRCI':
                info = results.get('1.机动车所有人/身份证名称/号码', {})
                # Keep only the part before the first '/'.
                result_obj['customerName'] = info.get('words', '').split('/')[0]
        except Exception:
            # Best effort: log with traceback and return whatever was filled.
            LoggerMixin.running_log.error("[PosHandler de_mortgage_ocr_process] error", exc_info=1)
        return result_obj

    @staticmethod
    def greenbook_process(result_obj, img_base64):
        """OCR a green-book image and fill ``result_obj`` in place.

        May set ``applicationName``, ``deMortgageDate`` and ``customerName``
        on ``result_obj``. Returns ``None``; request failures and unexpected
        errors are logged, not raised.
        """
        url = conf.OCR_URL_FOLDER
        json_data = {"file": img_base64, "classify": consts.MVC_CLASSIFY, "vrc_format": "v2"}
        try:
            response = requests.post(url, json=json_data)
            # Decode unicode escapes so the response text is readable Chinese.
            response.encoding = 'unicode_escape'
            if response.status_code != 200:
                LoggerMixin.exception_log.error(
                    "[PosHandler greenbook_process] request error, url: %s, response: %s", url, response.text)
                return
            LoggerMixin.running_log.info(
                "[PosHandler greenbook_process] request success, url: %s, response: %s", url, response.text)

            result = response.json()
            for item in result.get('data', []):
                ocr_res = item.get('data', {})
                page = ocr_res.get('page', '')
                if page == 'page_34':
                    words_result = ocr_res.get('words_result', {})
                    for registration in words_result.get('registration_bar', []):
                        register_type = registration.get('register_type', '')
                        if register_type == '抵押登记':
                            register_info = registration.get('register_info', {})
                            result_obj['applicationName'] = register_info.get('抵押权人姓名/名称', '')
                        elif register_type == '解除抵押':
                            register_info = registration.get('register_info', {})
                            result_obj['deMortgageDate'] = register_info.get('解除抵押日期', '')
                elif page == 'page_12':
                    words_result = ocr_res.get('words_result', {})
                    for word_result in words_result.values():
                        if word_result.get('chinese_key', '') == '1.机动车所有人/身份证名称/号码':
                            # Keep only the part before the first '/'.
                            result_obj['customerName'] = word_result.get('words', '').split('/')[0]
        except Exception:
            # Best effort: log with traceback and leave result_obj as filled so far.
            LoggerMixin.exception_log.error("[PosHandler greenbook_process] error", exc_info=1)

    @staticmethod
    def de_mortgage_ocr_process1(file_obj):
        """Return de-mortgage fields for ``file_obj``.

        ``file_obj`` is read for ``documentType`` and ``fileBase64``. OCR is
        only run when the document type equals the green-book type's
        ``en_name``; otherwise all fields stay ``''``.
        """
        result_obj = {
            'customerName': '',
            'applicationName': '',
            'deMortgageDate': ''
        }
        doc_type = file_obj.get('documentType', '')
        img_base64 = file_obj.get('fileBase64')
        if doc_type == DocumentType.GREEN_BOOK.value.en_name:
            PosHandler.greenbook_process(result_obj, img_base64)
        return result_obj