# mixins.py 11.1 KB
import re
import json
import requests
from .named_enum import DocStatus
from .models import HILDoc, AFCDoc, AFCSEOCRResult, AFCOCRResult, HILSEOCRResult, HILOCRResult
from . import consts
from prese.compare import pre_compare, get_empty_result
from common.mixins import LoggerMixin
from settings import conf
from pos.consts import DocumentType


class MPOSHandler:
    """Static helpers that call the OCR HTTP services and flatten their responses."""

    @staticmethod
    def _get_words(words_result, field):
        """Return the ``words`` text stored for *field* in an OCR ``words_result``.

        Returns ``''`` when ``words_result`` is not a dict, when the field is
        missing, or when the field entry is not a dict, so one malformed entry
        does not abort the whole OCR run.
        """
        if not isinstance(words_result, dict):
            return ''
        entry = words_result.get(field)
        if not isinstance(entry, dict):
            return ''
        return entry.get('words', '')

    @staticmethod
    def ocr1_process(url, img_base64):
        """POST a base64 image to the OCR service at *url* and map each recognised card.

        Args:
            url: Endpoint of the OCR service.
            img_base64: Base64-encoded image, sent as the ``file`` JSON field.

        Returns:
            A list with one dict per usable entry of the response's ``data``
            list. The keys come from the ``consts`` field map selected by the
            card type. Non-info sides additionally get ``'有效期限'`` built
            from the issue and expiry dates. The list is empty when the
            service does not answer with HTTP 200.
        """
        result_list = []
        # NOTE(review): no timeout is passed, so a stalled OCR service blocks the
        # caller indefinitely; consider adding one once a suitable value is agreed.
        ocr_1_response = requests.post(url, json={"file": img_base64})
        if ocr_1_response.status_code != 200:
            return result_list
        ocr_1_res = ocr_1_response.json()

        for ocr_data in ocr_1_res.get('data') or []:
            license_data = ocr_data.get('data')
            # Everything below reads license_data as a dict; skip empty or
            # non-dict payloads instead of raising AttributeError on them.
            if not license_data or not isinstance(license_data, dict):
                continue

            # Drop the embedded image from the payload (the source dict is mutated).
            license_data.pop('base64_img', '')

            # 'type' may be present but None; treat that like a missing type.
            card_type = license_data.get('type') or ''
            is_ic = card_type.startswith('身份证')
            is_info_side = card_type.endswith('信息面')
            if is_ic:
                field_map = consts.IC_MAP_0 if is_info_side else consts.IC_MAP_1
            else:
                field_map = consts.RP_MAP_0 if is_info_side else consts.RP_MAP_1

            words_result = license_data.get('words_result')
            id_card_dict = {
                write_field: MPOSHandler._get_words(words_result, search_field)
                for write_field, search_field in field_map
            }
            if not is_info_side:
                # The non-info side carries the validity period as two dates.
                start_time = MPOSHandler._get_words(words_result, '签发日期')
                end_time = MPOSHandler._get_words(words_result, '失效日期')
                id_card_dict['有效期限'] = '{0}-{1}'.format(start_time, end_time)
            result_list.append(id_card_dict)
        return result_list

    @staticmethod
    def ocr2_process(url, classify, img_base64):
        """POST a base64 image to the second OCR service and collect its results.

        Args:
            url: Endpoint of the OCR service.
            classify: Key into ``consts.LICENSE_CLASSIFY_MAPPING``; the first
                element of the mapped value is sent as ``pid``.
            img_base64: Base64-encoded image, sent as the ``filedata`` form field.

        Returns:
            A list of result dicts. It is empty when *classify* is not in the
            mapping, when the service does not answer with HTTP 200, or when
            the response ``ErrorCode`` is not in ``consts.SUCCESS_CODE_SET``.
        """
        result_list = []
        classify_info = consts.LICENSE_CLASSIFY_MAPPING.get(classify)
        if not classify_info:
            # Unknown classify: there is no pid to send, so report "no results"
            # like the other failure paths instead of failing on the unpack.
            return result_list
        pid = classify_info[0]

        form_data = {
            "pid": str(pid),
            "filedata": img_base64,
        }
        # NOTE(review): no timeout is passed here either; see ocr1_process.
        ocr_2_response = requests.post(url, data=form_data)
        if ocr_2_response.status_code != 200:
            return result_list
        ocr_2_res = json.loads(ocr_2_response.text)

        if ocr_2_res.get('ErrorCode') not in consts.SUCCESS_CODE_SET:
            return result_list

        if pid == consts.BC_PID:
            # Bank card: the raw response dict is returned as-is, without field mapping.
            result_list.append(ocr_2_res)
        else:
            # Business licence and other types: flatten each FieldList into
            # a {chn_key: value} dict.
            for result_dict in ocr_2_res.get('ResultList', []):
                result_list.append({
                    field_dict.get('chn_key', ''): field_dict.get('value', '')
                    for field_dict in result_dict.get('FieldList', [])
                })
        return result_list


class DocHandler:
    """Helpers for document validation, download links and request-parameter clean-up."""

    @staticmethod
    def xss_pass(file):
        """Return ``False`` if any page text looks like it embeds JavaScript.

        A page is rejected when its text matches both ``/JS`` and
        ``/S /JavaScript``. *file* must expose ``pageCount``, ``loadPage(n)``
        and pages with ``getText()``.
        """
        # NOTE(review): pageCount/loadPage/getText look like the legacy camelCase
        # PyMuPDF names; confirm they exist in the installed version.
        for pno in range(file.pageCount):
            page_text = file.loadPage(pno).getText()
            if re.search(r'/JS(.*)', page_text) and re.search(r'/S /JavaScript', page_text):
                return False
        return True

    @staticmethod
    def get_name(info, key, length):
        """Return ``info[key]`` if it is shorter than *length*, else ``consts.LONG_NAME``.

        Returns ``''`` when *info* is not a dict, or when the key is missing
        or explicitly ``None``.
        """
        if not isinstance(info, dict):
            return ''
        src_name = info.get(key, '')
        if src_name is None:
            # A key that is present with a None value would break len() below.
            src_name = ''
        if len(src_name) < length:
            return src_name
        return consts.LONG_NAME

    @staticmethod
    def get_link(doc_id, business_type, file='pdf'):
        """Build the download path for one artefact of a document.

        *file* selects the artefact: ``'pdf'``, ``'img'`` (zip of images) or
        ``'src_excel'``; any other value yields the ``<doc_id>.xlsx`` path.
        """
        if file == 'pdf':
            file_name = '{0}.pdf'.format(doc_id)
        elif file == 'img':
            file_name = '{0}_img.zip'.format(doc_id)
        elif file == 'src_excel':
            file_name = 'src.xlsx'
        else:
            file_name = '{0}.xlsx'.format(doc_id)
        return '/data/{0}/{1}/{2}/{3}'.format(business_type, consts.TMP_DIR_NAME, doc_id, file_name)

    def get_doc_list(self, doc_queryset, business_type):
        """Return the documents as a list, adding download links to finished ones.

        Only entries whose ``status`` is ``COMPLETE`` or ``UPLOAD_FAILED`` get
        the ``pdf_link``, ``img_link``, ``excel_link`` and ``src_excel_link``
        keys; the others are returned unchanged.
        """
        # Materialise once, so the dicts that are annotated are exactly the ones
        # returned, even if doc_queryset is a one-shot iterable.
        doc_list = list(doc_queryset)
        linkable_statuses = (DocStatus.COMPLETE.value, DocStatus.UPLOAD_FAILED.value)
        for doc_dict in doc_list:
            if doc_dict['status'] not in linkable_statuses:
                continue
            doc_id = doc_dict.get('id')
            doc_dict['pdf_link'] = self.get_link(doc_id, business_type)
            doc_dict['img_link'] = self.get_link(doc_id, business_type, file='img')
            doc_dict['excel_link'] = self.get_link(doc_id, business_type, file='excel')
            doc_dict['src_excel_link'] = self.get_link(doc_id, business_type, file='src_excel')
        return doc_list

    @staticmethod
    def get_doc_class(business_type):
        """Return ``(model, prefix)``: HIL when *business_type* is in ``consts.HIL_SET``, else AFC."""
        if business_type in consts.HIL_SET:
            return HILDoc, consts.HIL_PREFIX
        return AFCDoc, consts.AFC_PREFIX

    @staticmethod
    def _fix_choice(value, choices):
        """Normalise *value* against *choices*, falling back to ``choices[0]``.

        An exact match is returned unchanged and an upper-cased match is
        returned upper-cased. Anything else, including ``None`` and other
        non-string values, yields the first choice.
        """
        if value in choices:
            return value
        if isinstance(value, str):
            upper_value = value.upper()
            if upper_value in choices:
                return upper_value
        return choices[0]

    @staticmethod
    def fix_scheme(scheme):
        """Return *scheme* normalised against ``consts.DOC_SCHEME_LIST`` (first entry as default)."""
        return DocHandler._fix_choice(scheme, consts.DOC_SCHEME_LIST)

    @staticmethod
    def fix_data_source(data_source):
        """Return *data_source* normalised against ``consts.DATA_SOURCE_LIST`` (first entry as default)."""
        return DocHandler._fix_choice(data_source, consts.DATA_SOURCE_LIST)


class PreSEHandler:
    """Entry point for the pre-settlement comparison."""

    # preSettlement
    @staticmethod
    def pre_compare_entrance(pos_content):
        """Compare *pos_content* against the stored OCR results of its application.

        The HIL or AFC result models are chosen by whether ``applicationEntity``
        is in ``consts.HIL_SET``. If no settlement OCR row exists for the
        ``applicationId``, ``get_empty_result()`` is returned. Otherwise the
        result of ``pre_compare`` is returned.
        """
        entity = pos_content.get('applicationEntity')
        app_id = pos_content.get('applicationId')

        # Look up the accumulated OCR result (selected license fields only) by
        # application id; without a settlement row there is nothing to compare.
        if entity in consts.HIL_SET:
            se_model, ca_model = HILSEOCRResult, HILOCRResult
        else:
            se_model, ca_model = AFCSEOCRResult, AFCOCRResult

        ca_row = (
            ca_model.objects
            .filter(application_id=app_id)
            .values(*consts.CA_ADD_COMPARE_FIELDS_PRE)
            .first()
        )
        se_row = (
            se_model.objects
            .filter(application_id=app_id)
            .values(*consts.PRE_COMPARE_FIELDS)
            .first()
        )
        if se_row is None:
            return get_empty_result()

        # For the ID-card field, collect the CA value (None when there is no
        # CA row) followed by the settlement value.
        id_res_list = []
        for field_name in consts.CA_ADD_COMPARE_FIELDS_PRE:
            if field_name != consts.IC_OCR_FIELD:
                continue
            ca_value = ca_row.get(field_name) if isinstance(ca_row, dict) else None
            id_res_list.append(ca_value)
            id_res_list.append(se_row.get(field_name))

        return pre_compare(pos_content, se_row, id_res_list)


class PosHandler:
    """OCR helpers that pull de-mortgage details out of vehicle document images."""

    VehicleRegArea_fields = ['抵押权人姓名/名称', '解除抵押日期']
    VehicleRCI_fields = ['1.机动车所有人/身份证名称/号码']

    @staticmethod
    def de_mortgage_ocr_process(img_base64):
        """Run OCR on *img_base64* and return customer, application and de-mortgage date.

        Returns a dict with the keys ``customerName``, ``application`` and
        ``deMortgageDate``. Values stay ``''`` when the service does not answer
        with HTTP 200 or the page is not recognised. Any exception is logged
        and whatever was filled in so far is returned.
        """
        result_obj = dict.fromkeys(('customerName', 'application', 'deMortgageDate'), '')
        payload = {"file": img_base64, "classify": consts.MVC_CLASSIFY, "version": "green"}
        try:
            response = requests.post(conf.OCR_URL_FOLDER, json=payload)
            if response.status_code != 200:
                return result_obj
            response.encoding = 'unicode_escape'
            ocr_res = response.json()
            results = ocr_res.get('results')
            page = ocr_res.get('page', '')
            if page == 'VehicleRegArea':
                for register_info in results.get('register_info', []):
                    register_type = register_info.get('register_type', -1)
                    if register_type == 1:
                        release_date = register_info.get('解除抵押日期', {})
                        result_obj['deMortgageDate'] = release_date.get('words', '')
                    elif register_type == 0:
                        mortgagee = register_info.get('抵押权人姓名/名称', {})
                        result_obj['application'] = mortgagee.get('words', '')
            elif page == 'VehicleRCI':
                owner = results.get('1.机动车所有人/身份证名称/号码', {})
                # Keep only the part before the first '/'.
                result_obj['customerName'] = owner.get('words', '').split('/')[0]
        except Exception:
            LoggerMixin.running_log.error("[PosHandler de_mortgage_ocr_process] error", exc_info=1)
        return result_obj

    @staticmethod
    def greenbook_process(result_obj, img_base64):
        """Run OCR on a green-book image and write the findings into *result_obj*.

        Fills ``applicationName`` and ``deMortgageDate`` from ``page_34``
        entries and ``customerName`` from ``page_12`` entries. Returns nothing.
        A non-200 response and any exception are logged, and *result_obj*
        keeps whatever was written up to that point.
        """
        url = conf.OCR_URL_FOLDER
        payload = {"file": img_base64, "classify": consts.MVC_CLASSIFY, "version": "green"}
        try:
            response = requests.post(url, json=payload)
            # Unicode escapes -> Chinese characters (original author's note).
            response.encoding = 'unicode_escape'
            if response.status_code != 200:
                LoggerMixin.exception_log.error(
                    "[PosHandler de_mortgage_ocr_process1] request error, url: %s, response: %s",
                    url, response.text)
                return
            for item in response.json().get('data', []):
                ocr_res = item.get('data', {})
                page = ocr_res.get('page', '')
                if page == 'page_34':
                    registration_bar = ocr_res.get('words_result', {}).get('registration_bar', [])
                    for registration in registration_bar:
                        register_type = registration.get('register_type', '')
                        if register_type == '抵押登记':
                            details = registration.get('register_info', {})
                            result_obj['applicationName'] = details.get('抵押权人姓名/名称', '')
                        elif register_type == '解除抵押':
                            details = registration.get('register_info', {})
                            result_obj['deMortgageDate'] = details.get('解除抵押日期', '')
                elif page == 'page_12':
                    for word_result in ocr_res.get('words_result', {}).values():
                        if word_result.get('chinese_key', '') == '1.机动车所有人/身份证名称/号码':
                            # Keep only the part before the first '/'.
                            result_obj['customerName'] = word_result.get('words', '').split('/')[0]
        except Exception:
            LoggerMixin.exception_log.error("[PosHandler greenbook_process] error", exc_info=1)

    @staticmethod
    def de_mortgage_ocr_process1(file_obj):
        """Return de-mortgage details for *file_obj*, running OCR only for green books.

        *file_obj* is read through ``.get('documentType')`` and
        ``.get('fileBase64')``. The returned dict always has the keys
        ``customerName``, ``applicationName`` and ``deMortgageDate``.
        """
        extracted = dict.fromkeys(('customerName', 'applicationName', 'deMortgageDate'), '')

        doc_type = file_obj.get('documentType', '')
        img_base64 = file_obj.get('fileBase64')
        if doc_type == DocumentType.GREEN_BOOK.value.en_name:
            PosHandler.greenbook_process(extracted, img_base64)
        return extracted