8ae1670d by 周伟奇

comparison v1

1 parent 30a0ce9d
......@@ -1019,15 +1019,108 @@ BASE_XML_TEXT = """<?xml version="1.0" encoding="utf-8"?>
# CDATA payload template for the GCAP request. The single `{0}` placeholder
# receives the escaped comparison XML (see `str.format` at the call site).
# The backslash in the user name is written as `\\` so the literal no longer
# relies on an invalid escape sequence; the runtime value is unchanged.
CDATA_TEXT = """<![CDATA[<Exec xmlns="http://tempuri.org/"><strXMLParm>&lt;Request&gt;&lt;Framework&gt;&lt;UserName&gt;SFCHINA\\qqcout0&lt;/UserName&gt;&lt;GUID&gt;70d0efcb-3bc2-4018-ac4e-681c8f3131b6&lt;/GUID&gt;&lt;DetailedTracingEnabled&gt;False&lt;/DetailedTracingEnabled&gt;&lt;ServiceName&gt;AMSWebService&lt;/ServiceName&gt;&lt;SupportsRedirection&gt;true&lt;/SupportsRedirection&gt;&lt;ServiceType&gt;Service&lt;/ServiceType&gt;&lt;/Framework&gt;&lt;Parms&gt;&lt;InputXML type="string"&gt;&amp;lt;?xml version="1.0" encoding="utf-16"?&amp;gt;&amp;lt;InputXML&amp;gt; &amp;lt;Result&amp;gt; {0} &amp;lt;/Result&amp;gt;&amp;lt;AuthorizationData&amp;gt;&amp;lt;ServiceComponent&amp;gt;OCR&amp;lt;/ServiceComponent&amp;gt;&amp;lt;RoleId/&amp;gt;&amp;lt;CompanyId/&amp;gt;&amp;lt;/AuthorizationData&amp;gt;&amp;lt;/InputXML&amp;gt;&lt;/InputXML&gt;&lt;/Parms&gt;&lt;/Request&gt;</strXMLParm></Exec>]]>"""
# Maps a *_CLASSIFY constant (defined outside this view) to an '*_ocr' name;
# presumably the OCR result field that stores that document type — TODO confirm.
# NOTE(review): four entries (MVI, BC, UCI, VAT) appear both active and
# commented out on the following line. This looks like removed/added diff
# lines shown together; confirm which form is intended.
RESULT_MAPPING = {
    MVI_CLASSIFY: 'mvi_ocr',
    # MVI_CLASSIFY: 'mvi_ocr',
    IC_CLASSIFY: 'ic_ocr',
    RP_CLASSIFY: 'rp_ocr',
    BC_CLASSIFY: 'bc_ocr',
    # BC_CLASSIFY: 'bc_ocr',
    BL_CLASSIFY: 'bl_ocr',
    UCI_CLASSIFY: 'uci_ocr',
    # UCI_CLASSIFY: 'uci_ocr',
    EEP_CLASSIFY: 'eep_ocr',
    DL_CLASSIFY: 'dl_ocr',
    PP_CLASSIFY: 'pp_ocr',
    MVC_CLASSIFY: 'mvc_ocr',
    VAT_CLASSIFY: 'vat_ocr',
    # VAT_CLASSIFY: 'vat_ocr',
}
# OCR result fields fetched for the comparison.
COMPARE_FIELDS = (
    'ic_ocr',
    'rp_ocr',
    'bl_ocr',
    'eep_ocr',
    'dl_ocr',
    'pp_ocr',
    'mvc_ocr',
)

# Every table below maps a POS field name to a 3-tuple:
#   (label of the value in the OCR record,
#    name of the Comparison method to call,
#    options handed to that method)

# Resident identity card
ITPRC = {
    'customerChineseName': ('姓名', 'common_compare', {}),
    'idNum': ('公民身份号码', 'common_compare', {}),
    # OCR sample: 20200410-20250410. When OCR reads "长期" (long-term),
    # 2099-12-31 00:00:00.0 is sent to GCAP.
    'idExpiryDate': (
        '有效期限',
        'date_compare',
        {'long': True, 'ocr_split': True, 'input_replace': ''},
    ),
}

# Passport
ITPSP = {
    'customerChineseName': ('英文姓名', 'common_compare', {}),
    'idNum': ('护照号码', 'common_compare', {}),
    # OCR sample for both dates: 20250410
    'idExpiryDate': ('有效期至', 'date_compare', {'input_replace': ''}),
    'dateOfBirth': ('出生日期', 'date_compare', {'input_replace': ''}),
}

# Hong Kong / Macao / Taiwan travel permit
ITHKM_ITTID = {
    'customerChineseName': ('中文名', 'common_compare', {}),
    'idNum': ('证件号码', 'common_compare', {}),
    # OCR sample: 2013.10.24-2023.10.23
    'idExpiryDate': (
        '有效期限',
        'date_compare',
        {'ocr_split': True, 'input_replace': '.'},
    ),
    # OCR sample: 2023.10.23
    'dateOfBirth': ('出生日期', 'date_compare', {'input_replace': '.'}),
    # 'secondIdNum': ''
}

# Residence permit
ITRES = {
    'customerChineseName': ('姓名', 'common_compare', {}),
    'idNum': ('公民身份号码', 'common_compare', {}),
    # OCR sample: 20200410-20250410
    'idExpiryDate': (
        '有效期限',
        'date_compare',
        {'ocr_split': True, 'input_replace': ''},
    ),
    'secondIdNum': ('通行证号码', 'common_compare', {}),
}

# POS idType -> OCR result field to read and the field table to apply.
ID_TYPE_COMPARE = {
    'ITPRC': {
        'model_field': 'ic_ocr',
        'compare_field': ITPRC,
    },
    'ITPSP': {
        'model_field': 'pp_ocr',
        'compare_field': ITPSP,
    },
    'ITHKM': {
        'model_field': 'eep_ocr',
        'compare_field': ITHKM_ITTID,
    },
    'ITTID': {
        'model_field': 'eep_ocr',
        'compare_field': ITHKM_ITTID,
    },
    'ITRES': {
        'model_field': 'rp_ocr',
        'compare_field': ITRES,
    },
}
# Used-car VIN rule:
# 1. Compare the POS VIN against the vehicle registration certificate and,
#    separately, against the vehicle license.
#    a) both comparisons match: Y
#    b) one comparison differs: N, send the differing OCR result to GCAP
#    c) both comparisons differ: N, send both the registration certificate
#       and the vehicle license OCR results to GCAP

# Vehicle registration certificate
PCUSD_MVC = {
    'vinNo': (
        '9.车辆识别代号/车架号',
        'common_compare',
        {},
    ),
    'manufactureDate': (
        '32.车辆出厂日期',
        'common_compare',
        {},
    ),
    'firstRegistrationDate': (
        '3.登记日期',
        'common_compare',
        {},
    ),
}

# Vehicle license
PCUSD_DL = {
    'vinNo': (
        '车辆识别代码',
        'common_compare',
        {},
    ),
    # 'manufactureDate': '',
    # 'firstRegistrationDate': '',
}
# Business license (corporate customer)
TCCOR = {
    'customerChineseName': ('企业名称', 'common_compare', {}),
    'legalRepName': ('经营者姓名', 'common_compare', {}),
    # The next three POS fields are all checked against the same OCR label.
    'idNum': ('注册号', 'common_compare', {}),
    'businessLicenseNo': ('注册号', 'common_compare', {}),
    'taxRegistrationCode': ('注册号', 'common_compare', {}),
    # OCR sample: 2017年07月11日
    'incorporationDate': (
        '成立日期',
        'date_compare',
        {'ocr_replace': True},
    ),
    # OCR sample: 2017年07月11日至长期. When OCR reads "长期" (long-term),
    # 2099-12-31 00:00:00.0 is sent to GCAP.
    'businessLicenseDueDate': (
        '营业期限',
        'date_compare',
        {'long': True, 'ocr_replace': True},
    ),
    # OCR sample: 壹拾万元整 — the amount is written in upper-case RMB
    # numerals and has to be reconciled with the numeric POS value.
    'capitalRegAmount': ('注册资本', 'rmb_compare', {}),
}

# Business license (self-employed individual)
TCSEP = {
    'companyName': ('企业名称', 'common_compare', {}),
    # OCR sample: 壹拾万元整 (upper-case RMB numerals)
    'registeredCapital': ('注册资本', 'rmb_compare', {}),
    # OCR sample: 有限责任公司
    'selfEmployedSubType': ('企业类型', 'type_compare', {}),
}

# POS sub-type vs. OCR enterprise type:
#   a) Individual Businessman, CSIBM            => 个体工商户
#   b) Small and Micro Enterprise Owners, CSSME => 个人独资企业, 有限合伙企业,
#      股份合作制, 有限责任公司(***) — matching "有限责任公司" alone is enough
#   c) Others, CSOTH => on mismatch, send the OCR result to GCAP

# Comparison outcome codes.
RESULT_Y, RESULT_N, RESULT_NA = 'Y', 'N', 'NA'
......
......@@ -12,3 +12,7 @@ class OCR2Exception(Exception):
class OCR4Exception(Exception):
    """Error type for the OCR4 step (raise sites are outside this view — TODO confirm usage)."""
class GCAPException(Exception):
    """Error type for a failed exchange with GCAP."""
......
......@@ -888,6 +888,7 @@ class Command(BaseCommand, LoggerMixin):
os.remove(excel_path)
finally:
# TODO 识别结果存一张表,方便跑报表
if doc.document_scheme == consts.DOC_SCHEME_LIST[0]:
try:
# 更新OCR累计识别结果表
result_class = HILOCRResult if business_type == consts.HIL_PREFIX else AFCOCRResult
......
......@@ -3,6 +3,7 @@ from requests.auth import HTTPBasicAuth
from settings import conf
from common.tools.dict_to_xml import dicttoxml, escape_xml
from apps.doc import consts
from apps.doc.exceptions import GCAPException
class GCAP:
......@@ -19,11 +20,10 @@ class GCAP:
comparison_xml = dicttoxml(comparison_res, root=False, attr_type=False)
return consts.BASE_XML_TEXT.format(consts.CDATA_TEXT.format(escape_xml(comparison_xml))).encode('utf-8')
def send(self, data):
    """POST an already-serialised payload to GCAP.

    The previous text defined ``send`` twice and returned the response before
    the status check, which made the check unreachable. There is now a single
    definition that checks the status first.

    Args:
        data: Request body, passed unchanged to ``requests.post``.

    Returns:
        The ``requests`` response object when the status code is 200.

    Raises:
        GCAPException: If GCAP answers with any status code other than 200.
    """
    # NOTE(review): verify=False disables TLS certificate verification and no
    # timeout is set, so a stalled GCAP endpoint blocks the caller. Both are
    # left unchanged here; confirm whether that is intended.
    response = requests.post(self.url, headers=self.headers, data=data, verify=False, auth=self.auth)
    if response.status_code != 200:
        raise GCAPException('GCAP response with code: {0}'.format(response.status_code))
    return response
# Module-level GCAP instance, created once at import time.
gcap = GCAP()
......
import json
import logging
import traceback
from . import app
from apps.doc.models import AFCDoc
from apps.doc.models import AFCOCRResult, HILOCRResult, AFCComparisonInfo, HILComparisonInfo
from apps.doc import consts
from apps.doc.ocr.gcap import gcap
from apps.doc.exceptions import GCAPException
from common.tools.comparison import cp
# Logger named 'compare'; every message of the comparison task goes through it.
compare_log = logging.getLogger('compare')
# Prefix placed at the start of each log message written by the task.
log_base = '[CA Compare]'
def _load_ocr_records(ocr_res_dict, model_field):
    """Return the decoded OCR record list stored under *model_field*, or [] when nothing is stored."""
    raw = ocr_res_dict.get(model_field)
    if raw is None:
        return []
    return json.loads(raw) or []


def _compare_with_ocr(pos_info, ocr_records, field_specs, key_field, res_set):
    """Compare one POS record with the first OCR record that has the same key value.

    Args:
        pos_info: POS dict; a ``<field>Result`` entry is written for every
            field in *field_specs*.
        ocr_records: Iterable of OCR dicts to search.
        field_specs: Mapping of POS field -> (OCR label, Comparison method
            name, options dict).
        key_field: POS field whose value selects the OCR record.
        res_set: Set collecting every individual result.

    Returns:
        True when a matching OCR record was found, False otherwise. When no
        record matches, every field is marked ``consts.RESULT_N``.
    """
    key_label = field_specs[key_field][0]
    expected = pos_info.get(key_field)
    for record in ocr_records:
        ocr_key = record.get(key_label)
        if ocr_key is None or ocr_key != expected:
            continue
        for field, (ocr_label, method_name, options) in field_specs.items():
            # Options are expanded as keyword arguments: the comparators are
            # declared as (input_str, ocr_str, **kwargs).
            result = getattr(cp, method_name)(pos_info.get(field), record.get(ocr_label), **options)
            pos_info[field + 'Result'] = result
            res_set.add(result)
        return True

    res_set.add(consts.RESULT_N)
    for field in field_specs:
        pos_info[field + 'Result'] = consts.RESULT_N
    return False


@app.task
def compare(application_id, application_entity, uniq_seq, ocr_res_id):
    """Compare the latest POS comparison info of an application with its OCR results.

    Called in two ways:
        POS: application_id, application_entity, uniq_seq, None
        OCR: application_id, business_type (as application_entity), None, ocr_res_id

    Args:
        application_id: Application whose latest comparison info is loaded.
        application_entity: Selects the HIL models when equal to
            ``consts.HIL_PREFIX``, the AFC models otherwise.
        uniq_seq: Only written to the log here.
        ocr_res_id: Primary key of the OCR result row; when None the row is
            looked up by ``application_id`` instead.

    Returns:
        None. The task ends early when either the comparison info or the OCR
        result is missing. Sending the result to GCAP is currently disabled.
    """
    log_args = (log_base, application_entity, application_id, uniq_seq, ocr_res_id)
    compare_log.info('{0} [receive task] [entity={1}] [id={2}] [uniq_seq={3}] [ocr_res_id={4}]'.format(*log_args))

    is_hil = application_entity == consts.HIL_PREFIX

    # Latest comparison info for the application; nothing to do without it.
    comparison_class = HILComparisonInfo if is_hil else AFCComparisonInfo
    last_obj = comparison_class.objects.filter(application_id=application_id).last()
    if last_obj is None:
        compare_log.info(
            '{0} [comparison info empty] [entity={1}] [id={2}] [uniq_seq={3}] [ocr_res_id={4}]'.format(*log_args))
        return

    # Accumulated OCR results, restricted to the fields used for comparison.
    result_class = HILOCRResult if is_hil else AFCOCRResult
    if ocr_res_id is None:
        ocr_query = result_class.objects.filter(application_id=application_id)
    else:
        ocr_query = result_class.objects.filter(id=ocr_res_id)
    ocr_res_dict = ocr_query.values(*consts.COMPARE_FIELDS).first()
    if ocr_res_dict is None:
        compare_log.info('{0} [ocr info empty] [entity={1}] [id={2}] [uniq_seq={3}] [ocr_res_id={4}]'.format(*log_args))
        return

    # NOTE(review): 'wholeResult' used to be a commented-out key of OCR_Input
    # and is set at the top level below; confirm where GCAP expects it.
    comparison_res = {
        'OCR_Input': {
            'uniqSeq': last_obj.uniq_seq,
            'applicationId': application_id,
            'applicationEntity': application_entity,
            'applicationVersion': last_obj.application_version,
            'vehicleStatus': last_obj.vehicle_status,
            'wholeResultMessage': '',
            'applicationLink': '',
        }
    }
    res_set = set()

    # Individual customers. A missing value is treated as "no individuals"
    # instead of failing inside json.loads.
    is_sep = last_obj.customer_type == consts.CUSTOMER_TYPE[5]
    raw_individuals = last_obj.individual_cus_info
    individual_cus_info_list = json.loads(raw_individuals) if raw_individuals is not None else []
    for individual_cus_info in individual_cus_info_list:
        individual_cus_info['customerType'] = last_obj.customer_type

        # Business license of a self-employed customer.
        if is_sep and individual_cus_info.get('companyName') is not None:
            _compare_with_ocr(
                individual_cus_info,
                _load_ocr_records(ocr_res_dict, 'bl_ocr'),
                consts.TCSEP,
                'companyName',
                res_set,
            )

        # Personal identity document, selected by the POS idType.
        compare_target = consts.ID_TYPE_COMPARE.get(individual_cus_info.get('idType'))
        if compare_target is None:
            continue
        # TODO: special handling for name comparison.
        _compare_with_ocr(
            individual_cus_info,
            _load_ocr_records(ocr_res_dict, compare_target.get('model_field')),
            compare_target.get('compare_field'),
            'customerChineseName',
            res_set,
        )
    comparison_res['individualCusInfo'] = individual_cus_info_list

    # Corporate customer: business license.
    if last_obj.corporate_cus_info is not None:
        corporate_cus_info = json.loads(last_obj.corporate_cus_info)
        corporate_cus_info['customerType'] = last_obj.customer_type
        _compare_with_ocr(
            corporate_cus_info,
            _load_ocr_records(ocr_res_dict, 'bl_ocr'),
            consts.TCCOR,
            'customerChineseName',
            res_set,
        )
        comparison_res['corporateCusInfo'] = corporate_cus_info

    # Used car: registration certificate first, then the vehicle license VIN.
    if last_obj.vehicle_status == consts.VEHICLE_STATUS[0] and last_obj.usedcar_info is not None:
        usedcar_info = json.loads(last_obj.usedcar_info)
        mvc_found = _compare_with_ocr(
            usedcar_info,
            _load_ocr_records(ocr_res_dict, 'mvc_ocr'),
            consts.PCUSD_MVC,
            'vinNo',
            res_set,
        )
        if mvc_found:
            # The vehicle license is only checked once the certificate matched;
            # a missing license VIN overrides the certificate's vinNo result.
            dl_label = consts.PCUSD_DL.get('vinNo')[0]
            pos_vin = usedcar_info.get('vinNo')
            dl_find = False
            for dl_ocr in _load_ocr_records(ocr_res_dict, 'dl_ocr'):
                dl_vin_no = dl_ocr.get(dl_label)
                if dl_vin_no is not None and dl_vin_no == pos_vin:
                    dl_find = True
                    break
            if not dl_find:
                res_set.add(consts.RESULT_N)
                usedcar_info['vinNo' + 'Result'] = consts.RESULT_N
        comparison_res['usedCarInfo'] = usedcar_info

    comparison_res['wholeResult'] = consts.RESULT_N if consts.RESULT_N in res_set else consts.RESULT_Y

    # Replaces the former print(): the summary is logged at info level, the
    # full payload (which carries customer data) only at debug level.
    compare_log.info(
        '{0} [compare finished] [entity={1}] [id={2}] [uniq_seq={3}] [ocr_res_id={4}] [whole_result={5}]'.format(
            log_base, application_entity, application_id, uniq_seq, ocr_res_id, comparison_res['wholeResult']))
    compare_log.debug('{0} [compare result] [id={1}] [result={2}]'.format(log_base, application_id, comparison_res))

    # Sending the comparison result to GCAP is disabled for now; the intended
    # flow is kept below for reference.
    # try:
    #     data = gcap.dict_to_xml(comparison_res)
    # except Exception as e:
    #     compare_log.error('{0} [dict to xml failed] [entity={1}] [id={2}] [uniq_seq={3}] [ocr_res_id={4}] '
    #                       '[error={5}]'.format(log_base, application_entity, application_id, uniq_seq, ocr_res_id,
    #                                            traceback.format_exc()))
    # else:
    #     try:
    #         for times in range(consts.RETRY_TIMES):
    #             try:
    #                 gcap.send(data)
    #             except Exception as e:
    #                 gcap_exc = str(e)
    #             else:
    #                 break
    #         else:
    #             raise GCAPException(gcap_exc)
    #     except Exception as e:
    #         compare_log.error('{0} [gcap failed] [entity={1}] [id={2}] [uniq_seq={3}] [ocr_res_id={4}] '
    #                           '[error={5}]'.format(log_base, application_entity, application_id, uniq_seq,
    #                                                ocr_res_id, traceback.format_exc()))
    #     else:
    #         compare_log.info('{0} [task success] [entity={1}] [id={2}] [uniq_seq={3}] [ocr_res_id={4}]'.format(
    #             log_base, application_entity, application_id, uniq_seq, ocr_res_id))
......
import re
from .rmb_upper import to_rmb_upper
class Comparison:
    """Comparators that check a POS input value against an OCR value.

    Every comparator takes the POS value, the OCR value and per-field options.
    The options may be given as one positional dict (the form the comparison
    tables store) or as keyword arguments; keyword arguments win on conflict.
    """

    # Value returned by date_compare when the OCR date reads "长期" (long-term).
    LONG_TERM_DATE = '2099-12-31'

    def __init__(self):
        self.CSIBM = 'CSIBM'
        self.CSSME = 'CSSME'
        self.CSOTH = 'CSOTH'
        # (regex searched in the OCR enterprise type, POS sub-type code)
        self.TYPE_MAPPING = (
            (r'个体工商户', self.CSIBM),
            (r'有限责任公司', self.CSSME),
            (r'个人独资企业', self.CSSME),
            (r'有限合伙企业', self.CSSME),
            (r'股份合作制', self.CSSME),
        )
        self.RESULT_Y = 'Y'
        self.RESULT_N = 'N'
        self.RESULT_NA = 'NA'  # TODO: the NA case is not produced yet

    @staticmethod
    def _options(options, kwargs):
        """Merge the positional options dict with keyword options."""
        merged = dict(options) if options else {}
        merged.update(kwargs)
        return merged

    def build_res(self, result):
        """Map a truthy/falsy outcome to RESULT_Y / RESULT_N."""
        return self.RESULT_Y if result else self.RESULT_N

    def common_compare(self, input_str, ocr_str, options=None, **kwargs):
        """Return RESULT_Y when both values are equal, RESULT_N otherwise."""
        return self.build_res(input_str == ocr_str)

    def date_compare(self, input_str, ocr_str, options=None, **kwargs):
        """Compare a POS date with an OCR date after normalising both.

        Options:
            long: when true and the OCR text contains "长期", return
                ``LONG_TERM_DATE`` instead of Y/N.
            ocr_split: keep only the part of the OCR text after the last '-'
                (end of a "start-end" range).
            ocr_replace: turn "YYYY年MM月DD日" into "YYYY-MM-DD".
            input_replace: string that replaces every '-' in the POS value.

        Values that are not strings are not normalised; they are compared
        as they are, so a missing OCR value yields RESULT_N instead of an
        exception.
        """
        opts = self._options(options, kwargs)

        if isinstance(ocr_str, str):
            if opts.get('long', False) and '长期' in ocr_str:
                return self.LONG_TERM_DATE
            if opts.get('ocr_split', False):
                ocr_str = ocr_str.split('-')[-1]
            if opts.get('ocr_replace', False):
                ocr_str = ocr_str.replace('年', '-').replace('月', '-').replace('日', '')

        # The replacement text comes from the same 'input_replace' option that
        # switches the step on (the old code read a non-existent 'replace' key).
        replacement = opts.get('input_replace')
        if replacement is not None and isinstance(input_str, str):
            input_str = input_str.replace('-', replacement)

        return self.build_res(input_str == ocr_str)

    def rmb_compare(self, input_str, ocr_str, options=None, **kwargs):
        """Compare a numeric POS amount with an OCR amount in upper-case RMB numerals.

        Returns RESULT_N when the POS value cannot be converted to a number.
        """
        try:
            amount = float(input_str)
        except (TypeError, ValueError):
            return self.RESULT_N
        return self.build_res(to_rmb_upper(amount) == ocr_str)

    def type_compare(self, input_str, ocr_str, options=None, **kwargs):
        """Compare the POS sub-type code with the code derived from the OCR enterprise type.

        The first TYPE_MAPPING pattern found in the OCR text decides the code;
        text matching no pattern maps to CSOTH. A missing OCR value yields
        RESULT_N.
        """
        if not isinstance(ocr_str, str):
            return self.RESULT_N
        compare_str = self.CSOTH
        for pattern, type_code in self.TYPE_MAPPING:
            if re.search(pattern, ocr_str) is not None:
                compare_str = type_code
                break
        return self.build_res(input_str == compare_str)


# Shared comparator instance.
cp = Comparison()
from io import StringIO
import math
# Upper-case RMB numerals for the digits 0-9.
_RMB_DIGITS = ['零', '壹', '贰', '叁', '肆', '伍', '陆', '柒', '捌', '玖']
# Unit written after a digit, indexed by its position inside a 4-digit section.
_SECTION_CHARS = ['', '拾', '佰', '仟', '万']


def to_rmb_upper(price):
    """Render an amount as upper-case RMB numerals, e.g. 100000 -> '壹拾万元整'.

    The amount is rounded to two decimals and split into four-digit sections
    (万亿, 亿, 万 and the lowest one), each rendered by ``_parse_integer``.

    Two cases differ from the previous implementation:
    * an amount with a 万亿 section but an empty 亿 section still gets its
      '亿' (10**12 is '壹万亿元整', not '壹万元整');
    * an all-zero section between two written sections now counts as a zero,
      so 100005000 is '壹亿零伍仟元整'.

    Args:
        price: Non-negative number. Negative amounts are not supported and
            their output is unspecified.

    Returns:
        The amount as a string; '零元整' for zero.
    """
    price = round(price, 2)
    integer_part = int(price)
    wanyi_part = integer_part // 10 ** 12
    yi_part = integer_part % 10 ** 12 // 10 ** 8
    wan_part = integer_part % 10 ** 8 // 10 ** 4
    qian_part = integer_part % 10 ** 4
    dec_part = int(round(price * 100 % 100))

    strio = StringIO()
    zero_count = 0
    # True once any section has been written; later sections are then not
    # "first" and skipped ones contribute a pending zero.
    has_higher = False

    # Section above 万亿.
    if integer_part >= 10 ** 12 and wanyi_part > 0:
        zero_count = _parse_integer(strio, wanyi_part, zero_count, True)
        strio.write('万')
        has_higher = True

    # Section from 亿 to 千亿.
    if integer_part >= 10 ** 8 and yi_part > 0:
        zero_count = _parse_integer(strio, yi_part, zero_count, not has_higher)
        has_higher = True
    elif has_higher:
        zero_count += 1
    if has_higher:
        # Written for the 万亿 section too, even when the 亿 section is empty.
        strio.write('亿')

    # Section from 万 to 千万.
    if integer_part >= 10 ** 4 and wan_part > 0:
        zero_count = _parse_integer(strio, wan_part, zero_count, not has_higher)
        strio.write('万')
        has_higher = True
    elif has_higher:
        zero_count += 1

    # Lowest section (below 万).
    if qian_part > 0:
        zero_count = _parse_integer(strio, qian_part, zero_count, not has_higher)
    else:
        zero_count += 1

    if integer_part > 0:
        strio.write('元')

    # Decimal part.
    if dec_part > 0:
        _parse_decimal(strio, integer_part, dec_part, zero_count)
    elif dec_part == 0 and integer_part > 0:
        strio.write('整')
    else:
        strio.write('零元整')

    return strio.getvalue()


def _parse_integer(strio, value, zero_count=0, is_first_section=False):
    """Write one four-digit section (1..9999) to *strio*.

    Args:
        strio: Text buffer receiving the numerals.
        value: Integer section value, 1 <= value <= 9999.
        zero_count: Zeros pending from earlier sections; any positive value
            makes a single '零' precede the next written digit.
        is_first_section: True when nothing has been written before this
            section, which suppresses the leading '零' of a short section.

    Returns:
        The number of trailing zeros left pending after this section.
    """
    assert value > 0 and value <= 9999
    digits = str(value)
    ndigits = len(digits)
    # A section shorter than four digits that follows another section has
    # implicit leading zeros.
    if value < 1000 and not is_first_section:
        zero_count += 1

    for position, char in enumerate(digits):
        digit = int(char)
        if digit == 0:
            zero_count += 1
            continue
        if zero_count > 0:
            strio.write('零')
        strio.write(_RMB_DIGITS[digit])
        strio.write(_SECTION_CHARS[ndigits - position - 1])
        zero_count = 0

    return zero_count


def _parse_decimal(strio, integer_part, value, zero_count):
    """Write the 角/分 part (1..99) to *strio*.

    Args:
        strio: Text buffer receiving the numerals.
        integer_part: Integer part of the amount; '零' separators are only
            written when it is positive.
        value: Decimal part in fen, 1 <= value <= 99.
        zero_count: Zeros pending from the integer part.
    """
    assert value > 0 and value <= 99
    jiao, fen = divmod(value, 10)

    # Zeros pending before the decimals, e.g. 10.5 -> 壹拾元零伍角整.
    if zero_count > 0 and (jiao > 0 or fen > 0) and integer_part > 0:
        strio.write('零')

    if jiao > 0:
        strio.write(_RMB_DIGITS[jiao])
        strio.write('角')

    # Missing 角 between 元 and 分, e.g. 1.05 -> 壹元零伍分.
    if zero_count == 0 and jiao == 0 and fen > 0 and integer_part > 0:
        strio.write('零')

    if fen > 0:
        strio.write(_RMB_DIGITS[fen])
        strio.write('分')
    else:
        strio.write('整')
\ No newline at end of file
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!