mixins.py
6.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import re
import json
import requests
from .named_enum import DocStatus
from .models import HILDoc, AFCDoc, AFCSEOCRResult, AFCOCRResult, HILSEOCRResult, HILOCRResult
from . import consts
from prese.compare import pre_compare, get_empty_result
class MPOSHandler:
@staticmethod
def ocr1_process(url, img_base64):
result_list = []
json_data = {
"file": img_base64,
}
ocr_1_response = requests.post(url, json=json_data)
if ocr_1_response.status_code != 200:
return result_list
ocr_1_res = ocr_1_response.json()
for ocr_data in ocr_1_res.get('data', []):
license_data = ocr_data.get('data')
if not license_data:
continue
if isinstance(license_data, dict):
license_data.pop('base64_img', '')
id_card_dict = {}
card_type = license_data.get('type', '')
is_ic = card_type.startswith('身份证')
is_info_side = card_type.endswith('信息面')
# id_card_dict['类别'] = '0' if is_ic else '1'
if is_ic:
field_map = consts.IC_MAP_0 if is_info_side else consts.IC_MAP_1
else:
field_map = consts.RP_MAP_0 if is_info_side else consts.RP_MAP_1
for write_field, search_field in field_map:
id_card_dict[write_field] = license_data.get('words_result', {}).get(search_field, {}).get('words', '')
if not is_info_side:
start_time = license_data.get('words_result', {}).get('签发日期', {}).get('words', '')
end_time = license_data.get('words_result', {}).get('失效日期', {}).get('words', '')
id_card_dict['有效期限'] = '{0}-{1}'.format(start_time, end_time)
result_list.append(id_card_dict)
return result_list
@staticmethod
def ocr2_process(url, classify, img_base64):
result_list = []
pid, _, _, _, _, _ = consts.LICENSE_CLASSIFY_MAPPING.get(classify)
json_data_2 = {
"pid": str(pid),
"filedata": img_base64
}
ocr_2_response = requests.post(url, data=json_data_2)
if ocr_2_response.status_code != 200:
return result_list
ocr_2_res = json.loads(ocr_2_response.text)
if ocr_2_res.get('ErrorCode') in consts.SUCCESS_CODE_SET:
if pid == consts.BC_PID:
# 银行卡
# res_dict = {}
# for en_key, chn_key in consts.BC_FIELD:
# res_dict[chn_key] = ocr_res_2.get(en_key, '')
result_list.append(ocr_2_res)
else:
# 营业执照等
for result_dict in ocr_2_res.get('ResultList', []):
res_dict = {}
for field_dict in result_dict.get('FieldList', []):
res_dict[field_dict.get('chn_key', '')] = field_dict.get('value', '')
result_list.append(res_dict)
return result_list
class DocHandler:
@staticmethod
def xss_pass(file):
for pno in range(file.pageCount):
page = file.loadPage(pno)
page_text = page.getText()
if re.search(r'/JS(.*)', page_text) and re.search(r'/S /JavaScript', page_text):
return False
return True
@staticmethod
def get_name(info, key, length):
if not isinstance(info, dict):
return ''
src_name = info.get(key, '')
if len(src_name) < length:
return src_name
else:
return consts.LONG_NAME
@staticmethod
def get_link(doc_id, business_type, file='pdf'):
if file == 'pdf':
return '/data/{1}/{2}/{0}/{0}.pdf'.format(doc_id, business_type, consts.TMP_DIR_NAME)
elif file == 'img':
return '/data/{1}/{2}/{0}/{0}_img.zip'.format(doc_id, business_type, consts.TMP_DIR_NAME)
elif file == 'src_excel':
return '/data/{1}/{2}/{0}/src.xlsx'.format(doc_id, business_type, consts.TMP_DIR_NAME)
else:
return '/data/{1}/{2}/{0}/{0}.xlsx'.format(doc_id, business_type, consts.TMP_DIR_NAME)
def get_doc_list(self, doc_queryset, business_type):
for doc_dict in doc_queryset:
if doc_dict['status'] not in [DocStatus.COMPLETE.value, DocStatus.UPLOAD_FAILED.value]:
continue
doc_id = doc_dict.get('id')
doc_dict['pdf_link'] = self.get_link(doc_id, business_type)
doc_dict['img_link'] = self.get_link(doc_id, business_type, file='img')
doc_dict['excel_link'] = self.get_link(doc_id, business_type, file='excel')
doc_dict['src_excel_link'] = self.get_link(doc_id, business_type, file='src_excel')
return list(doc_queryset)
@staticmethod
def get_doc_class(business_type):
return (HILDoc, consts.HIL_PREFIX) if business_type in consts.HIL_SET else (AFCDoc, consts.AFC_PREFIX)
@staticmethod
def fix_scheme(scheme):
if scheme in consts.DOC_SCHEME_LIST:
return scheme
elif scheme.upper() in consts.DOC_SCHEME_LIST:
return scheme.upper()
else:
return consts.DOC_SCHEME_LIST[0]
@staticmethod
def fix_data_source(data_source):
if data_source in consts.DATA_SOURCE_LIST:
return data_source
elif data_source.upper() in consts.DATA_SOURCE_LIST:
return data_source.upper()
else:
return consts.DATA_SOURCE_LIST[0]
class PreSEHandler:
# preSettlement
@staticmethod
def pre_compare_entrance(pos_content):
application_entity = pos_content.get('applicationEntity')
application_id = pos_content.get('applicationId')
# 根据application_id查找OCR累计结果指定license字段,如果没有,结束
result_class = HILSEOCRResult if application_entity in consts.HIL_SET else AFCSEOCRResult
ca_result_class = HILOCRResult if application_entity in consts.HIL_SET else AFCOCRResult
ca_ocr_res_dict = ca_result_class.objects.filter(application_id=application_id).values(
*consts.CA_ADD_COMPARE_FIELDS_PRE).first()
ocr_res_dict = result_class.objects.filter(application_id=application_id).values(
*consts.PRE_COMPARE_FIELDS).first()
if ocr_res_dict is None:
return get_empty_result()
id_res_list = []
for field_name in consts.CA_ADD_COMPARE_FIELDS_PRE:
if field_name == consts.IC_OCR_FIELD:
id_res_list.append(ca_ocr_res_dict.get(field_name) if isinstance(ca_ocr_res_dict, dict) else None)
id_res_list.append(ocr_res_dict.get(field_name))
rebuild_compare_result = pre_compare(pos_content, ocr_res_dict, id_res_list)
return rebuild_compare_result