# mixins.py 6.77 KB
import re
import json
import requests
from .named_enum import DocStatus
from .models import HILDoc, AFCDoc, AFCSEOCRResult, AFCOCRResult, HILSEOCRResult, HILOCRResult
from . import consts
from prese.compare import pre_compare, get_empty_result


class MPOSHandler:
    """Clients for two OCR HTTP services that flatten each reply into a list of dicts."""

    @staticmethod
    def _words(words_result, field):
        """Return ``words_result[field]['words']`` or ``''`` when any level is missing or None."""
        return (words_result.get(field) or {}).get('words', '')

    @staticmethod
    def ocr1_process(url, img_base64):
        """POST ``{"file": img_base64}`` as JSON to ``url`` and extract card fields.

        For every entry of the response's ``data`` list, the nested ``data`` dict is
        inspected: its ``type`` decides which field map from ``consts`` is used
        (``IC_MAP_*`` when the type starts with '身份证', ``RP_MAP_*`` otherwise; the
        ``_0`` variant when the type ends with '信息面', the ``_1`` variant otherwise).
        Entries that are not on the '信息面' side additionally get a '有效期限' value
        built as ``"<签发日期>-<失效日期>"``.

        :param url: endpoint of the OCR service.
        :param img_base64: base64-encoded image payload.
        :return: list of dicts, one per recognised card side; empty when the service
            answers with a non-200 status or a body that is not valid JSON.
        """
        result_list = []
        # NOTE(review): no timeout is passed, so a stalled service blocks the caller.
        ocr_1_response = requests.post(url, json={"file": img_base64})
        if ocr_1_response.status_code != 200:
            return result_list
        try:
            ocr_1_res = ocr_1_response.json()
        except ValueError:
            # A 200 reply with a non-JSON body is treated like any other failed call.
            return result_list

        for ocr_data in ocr_1_res.get('data') or []:
            license_data = ocr_data.get('data')
            # Everything below relies on dict access, so skip empty or non-dict payloads
            # instead of failing on the first ``.get``.
            if not license_data or not isinstance(license_data, dict):
                continue

            # Drop the echoed image before reading the recognised fields.
            license_data.pop('base64_img', '')

            card_type = license_data.get('type') or ''
            is_ic = card_type.startswith('身份证')
            is_info_side = card_type.endswith('信息面')
            if is_ic:
                field_map = consts.IC_MAP_0 if is_info_side else consts.IC_MAP_1
            else:
                field_map = consts.RP_MAP_0 if is_info_side else consts.RP_MAP_1

            words_result = license_data.get('words_result') or {}
            id_card_dict = {}
            for write_field, search_field in field_map:
                id_card_dict[write_field] = MPOSHandler._words(words_result, search_field)
            if not is_info_side:
                start_time = MPOSHandler._words(words_result, '签发日期')
                end_time = MPOSHandler._words(words_result, '失效日期')
                id_card_dict['有效期限'] = '{0}-{1}'.format(start_time, end_time)
            result_list.append(id_card_dict)
        return result_list

    @staticmethod
    def ocr2_process(url, classify, img_base64):
        """POST a form with ``pid`` and ``filedata`` to ``url`` and collect the recognised fields.

        ``pid`` is the first element of ``consts.LICENSE_CLASSIFY_MAPPING[classify]``.
        When the response's ``ErrorCode`` is in ``consts.SUCCESS_CODE_SET``:

        * for ``consts.BC_PID`` the whole response dict is returned as the only item;
        * otherwise each item of ``ResultList`` becomes one dict mapping every
          ``FieldList`` entry's ``chn_key`` to its ``value``.

        :param url: endpoint of the OCR service.
        :param classify: key into ``consts.LICENSE_CLASSIFY_MAPPING``.
        :param img_base64: base64-encoded image payload.
        :return: list of dicts; empty for an unknown ``classify``, a non-200 status,
            a non-JSON body or an unsuccessful ``ErrorCode``.
        """
        result_list = []
        classify_info = consts.LICENSE_CLASSIFY_MAPPING.get(classify)
        if classify_info is None:
            # Unknown classify: there is no pid to send, so nothing can be recognised.
            return result_list
        pid = classify_info[0]

        form_data = {
            "pid": str(pid),
            "filedata": img_base64,
        }
        # NOTE(review): no timeout is passed, so a stalled service blocks the caller.
        ocr_2_response = requests.post(url, data=form_data)
        if ocr_2_response.status_code != 200:
            return result_list
        try:
            ocr_2_res = json.loads(ocr_2_response.text)
        except ValueError:
            # A 200 reply with a non-JSON body is treated like any other failed call.
            return result_list

        if ocr_2_res.get('ErrorCode') not in consts.SUCCESS_CODE_SET:
            return result_list

        if pid == consts.BC_PID:
            # Bank card: the raw response dict is passed through without field mapping.
            result_list.append(ocr_2_res)
        else:
            # Business licence and the other document types.
            for result_dict in ocr_2_res.get('ResultList') or []:
                result_list.append({
                    field_dict.get('chn_key', ''): field_dict.get('value', '')
                    for field_dict in result_dict.get('FieldList') or []
                })
        return result_list


class DocHandler:
    """Helpers for document checks, display names, download links and option normalisation."""

    @staticmethod
    def xss_pass(file):
        """Return ``False`` if any page's text contains both ``/JS`` and ``/S /JavaScript``.

        :param file: object exposing ``pageCount``, ``loadPage(pno)`` and pages with
            ``getText()``  (presumably a PyMuPDF document using the legacy camelCase
            API -- TODO confirm against the caller).
        :return: ``True`` when no page matches both patterns, ``False`` otherwise.
        """
        for pno in range(file.pageCount):
            page_text = file.loadPage(pno).getText()
            if re.search(r'/JS(.*)', page_text) and re.search(r'/S /JavaScript', page_text):
                return False
        return True

    @staticmethod
    def get_name(info, key, length):
        """Return ``info[key]`` when it is shorter than ``length``, else ``consts.LONG_NAME``.

        A non-dict ``info``, a missing key or a ``None`` value all yield ``''``.
        """
        if not isinstance(info, dict):
            return ''
        # ``or ''`` also covers a key that is present but holds None, which would
        # otherwise make ``len`` raise.
        src_name = info.get(key) or ''
        if len(src_name) < length:
            return src_name
        return consts.LONG_NAME

    @staticmethod
    def get_link(doc_id, business_type, file='pdf'):
        """Build the download path for one artefact of a document.

        :param doc_id: document identifier, used as directory name and file stem.
        :param business_type: first path component below ``/data``.
        :param file: ``'pdf'``, ``'img'`` or ``'src_excel'``; any other value selects
            the ``<doc_id>.xlsx`` result file.
        :return: path string under ``/data/<business_type>/<consts.TMP_DIR_NAME>/<doc_id>/``.
        """
        if file == 'pdf':
            return '/data/{1}/{2}/{0}/{0}.pdf'.format(doc_id, business_type, consts.TMP_DIR_NAME)
        elif file == 'img':
            return '/data/{1}/{2}/{0}/{0}_img.zip'.format(doc_id, business_type, consts.TMP_DIR_NAME)
        elif file == 'src_excel':
            return '/data/{1}/{2}/{0}/src.xlsx'.format(doc_id, business_type, consts.TMP_DIR_NAME)
        else:
            return '/data/{1}/{2}/{0}/{0}.xlsx'.format(doc_id, business_type, consts.TMP_DIR_NAME)

    def get_doc_list(self, doc_queryset, business_type):
        """Return the documents as a list, adding download links to finished ones.

        Documents whose ``status`` is ``DocStatus.COMPLETE`` or
        ``DocStatus.UPLOAD_FAILED`` receive ``pdf_link``, ``img_link``, ``excel_link``
        and ``src_excel_link``; all other documents are returned unchanged.

        :param doc_queryset: iterable of dicts carrying at least ``status`` and ``id``.
        :param business_type: forwarded to :meth:`get_link`.
        :return: list with every input dict, in input order.
        """
        # Materialise once so the dicts that get mutated are the dicts that are
        # returned, even when the input can only be iterated a single time.
        doc_list = list(doc_queryset)
        linkable_statuses = (DocStatus.COMPLETE.value, DocStatus.UPLOAD_FAILED.value)
        for doc_dict in doc_list:
            if doc_dict['status'] not in linkable_statuses:
                continue
            doc_id = doc_dict.get('id')
            doc_dict['pdf_link'] = self.get_link(doc_id, business_type)
            doc_dict['img_link'] = self.get_link(doc_id, business_type, file='img')
            doc_dict['excel_link'] = self.get_link(doc_id, business_type, file='excel')
            doc_dict['src_excel_link'] = self.get_link(doc_id, business_type, file='src_excel')
        return doc_list

    @staticmethod
    def get_doc_class(business_type):
        """Return ``(model, prefix)``: HIL for members of ``consts.HIL_SET``, AFC otherwise."""
        return (HILDoc, consts.HIL_PREFIX) if business_type in consts.HIL_SET else (AFCDoc, consts.AFC_PREFIX)

    @staticmethod
    def _normalize_choice(value, choices):
        """Return ``value`` (or its upper-case form) if listed in ``choices``, else ``choices[0]``."""
        if value in choices:
            return value
        # Only strings can be upper-cased; anything else falls back to the default
        # instead of raising AttributeError.
        if isinstance(value, str):
            upper_value = value.upper()
            if upper_value in choices:
                return upper_value
        return choices[0]

    @staticmethod
    def fix_scheme(scheme):
        """Map ``scheme`` onto an entry of ``consts.DOC_SCHEME_LIST``, defaulting to the first."""
        return DocHandler._normalize_choice(scheme, consts.DOC_SCHEME_LIST)

    @staticmethod
    def fix_data_source(data_source):
        """Map ``data_source`` onto an entry of ``consts.DATA_SOURCE_LIST``, defaulting to the first."""
        return DocHandler._normalize_choice(data_source, consts.DATA_SOURCE_LIST)


class PreSEHandler:
    """Entry point for the pre-settlement comparison."""

    # preSettlement
    @staticmethod
    def pre_compare_entrance(pos_content):
        """Compare ``pos_content`` against the stored OCR results of its application.

        The HIL or AFC result models are chosen by whether ``applicationEntity`` is in
        ``consts.HIL_SET``. The first matching row of each model is loaded for
        ``applicationId``; when the settlement OCR row is missing, the result of
        ``get_empty_result()`` is returned. Otherwise the values stored under
        ``consts.IC_OCR_FIELD`` in both rows are collected and handed to
        ``pre_compare`` together with ``pos_content`` and the settlement row.

        :param pos_content: dict providing ``applicationEntity`` and ``applicationId``.
        :return: whatever ``pre_compare`` or ``get_empty_result`` returns.
        """
        entity = pos_content.get('applicationEntity')
        app_id = pos_content.get('applicationId')

        # Look up the selected licence fields of the accumulated OCR results by
        # application id; stop early when there are none.
        if entity in consts.HIL_SET:
            se_model, ca_model = HILSEOCRResult, HILOCRResult
        else:
            se_model, ca_model = AFCSEOCRResult, AFCOCRResult

        ca_row = (
            ca_model.objects
            .filter(application_id=app_id)
            .values(*consts.CA_ADD_COMPARE_FIELDS_PRE)
            .first()
        )
        se_row = (
            se_model.objects
            .filter(application_id=app_id)
            .values(*consts.PRE_COMPARE_FIELDS)
            .first()
        )
        if se_row is None:
            return get_empty_result()

        ca_row_usable = isinstance(ca_row, dict)
        id_results = []
        for name in consts.CA_ADD_COMPARE_FIELDS_PRE:
            if name != consts.IC_OCR_FIELD:
                continue
            # The CA value comes first, then the settlement value; a missing CA row
            # contributes None.
            id_results.append(ca_row.get(name) if ca_row_usable else None)
            id_results.append(se_row.get(name))

        return pre_compare(pos_content, se_row, id_results)