add new ocr flow
Showing 4 changed files with 651 additions and 4 deletions
@@ -36,7 +36,7 @@ APPLICATION_ID_META_FIELD_id = 1
 DEALER_CODE_META_FIELD_id = 13
 BUSINESS_TYPE_META_FIELD_id = 93
 
-RETRY_TIMES = 3
+RETRY_TIMES = 2
 
 # ---------银行流水模板相关 (bank statement template constants) ---------------------------------------------------
 
@@ -77,7 +77,8 @@ RES_SHEET_HEADER = ('页码', '序号', '结果')
 RES_SUCCESS = '识别成功'
 RES_SUCCESS_OTHER = '识别成功(其他类)'
 RES_SUCCESS_EMPTY = '识别成功(空数据)'
-RES_FAILED = '识别失败'
+RES_FAILED_1 = '识别失败(阶段1)'
+RES_FAILED_2 = '识别失败(阶段2)'
 
 CARD_RATIO = 0.9
 UNKNOWN_CARD = '未知卡号'

import os
import time
import json
import base64
import signal
import asyncio
import aiohttp
import difflib
import requests
from collections import Counter
from datetime import datetime, date
from django.core.management import BaseCommand
from multiprocessing import Process, Queue, Manager, Lock

from settings import conf
from common.mixins import LoggerMixin
from common.tools.file_tools import write_zip_file
from common.tools.pdf_to_img import PDFHandler
from apps.doc import consts
from apps.doc.ocr.edms import EDMS, rh
from apps.doc.named_enum import KeywordsType
from apps.doc.exceptions import EDMSException, OCR1Exception, OCR2Exception
from apps.doc.ocr.wb import BSWorkbook, Workbook
from apps.doc.models import DocStatus, HILDoc, AFCDoc, Keywords

class Command(BaseCommand, LoggerMixin):

    def __init__(self):
        super().__init__()
        self.log_base = '[doc ocr process]'
        # switch controlling the processing loop
        self.switch = True
        # sleep interval when the task queue is empty
        self.sleep_time = conf.SLEEP_SECOND
        # data directory
        self.data_dir = conf.DATA_DIR
        # OCR endpoints
        self.ocr_1_urls = conf.get_namespace('OCR_URL_1_')
        self.ocr_url_2 = conf.OCR_URL_2
        self.ocr_url_3 = conf.BC_URL
        # EDMS web_service_api
        self.edms = EDMS()
        # graceful shutdown on signal 15 (SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, sig, frame):
        self.switch = False  # stop picking up new documents

    @staticmethod
    def get_doc_object(task_str):
        business_type, doc_id_str = task_str.split(consts.SPLIT_STR)
        doc_id = int(doc_id_str)
        doc_class = HILDoc if business_type == consts.HIL_PREFIX else AFCDoc
        # doc_info = doc_class.objects.filter(id=doc_id, status=DocStatus.INIT.value).values(
        #     'id', 'metadata_version_id', 'application_id', 'document_name', 'document_scheme').first()
        doc = doc_class.objects.filter(id=doc_id).first()
        return doc, business_type

    def get_doc_info(self):
        task_str, is_priority = rh.dequeue()
        if task_str is None:
            self.cronjob_log.info('{0} [get_doc_info] [queue empty]'.format(self.log_base))
            return None, None, None

        doc, business_type = self.get_doc_object(task_str)

        if doc is None:
            self.cronjob_log.warn('{0} [get_doc_info] [doc not exist] [task_str={1}] [is_priority={2}]'.format(
                self.log_base, task_str, is_priority))
            return None, None, None
        elif doc.status != DocStatus.INIT.value:
            self.cronjob_log.warn('{0} [get_doc_info] [doc status error] [task_str={1}] [is_priority={2}] '
                                  '[doc_status={3}]'.format(self.log_base, task_str, is_priority, doc.status))
            return None, None, None
        doc.status = DocStatus.PROCESSING.value  # TODO update_time --> start_time
        doc.save()
        self.cronjob_log.info('{0} [get_doc_info] [success] [task_str={1}] [is_priority={2}]'.format(
            self.log_base, task_str, is_priority))
        return doc, business_type, task_str

    def pdf_download(self, doc, pdf_path):
        if not doc.application_id.startswith(consts.FIXED_APPLICATION_ID_PREFIX):
            for times in range(consts.RETRY_TIMES):
                try:
                    self.edms.download(pdf_path, doc.metadata_version_id)
                except Exception as e:
                    self.cronjob_log.warn('{0} [edms download failed] [times={1}] [pdf_path={2}] '
                                          '[error={3}]'.format(self.log_base, times, pdf_path, e))
                    edms_exc = str(e)
                else:
                    break
            else:
                raise EDMSException(edms_exc)
        self.cronjob_log.info('{0} [edms download success] [pdf_path={1}]'.format(self.log_base, pdf_path))

    def bs_process(self, wb, ocr_data, bs_summary, unknown_summary, classify, res_list, pno, ino):
        sheets = ocr_data.get('data', [])
        if not sheets:
            res_list.append((pno, ino, consts.RES_SUCCESS_EMPTY))
            return
        confidence = ocr_data.get('confidence', 1)
        img_name = 'page_{0}_img_{1}'.format(pno, ino)
        cells_exists = False
        for i, sheet in enumerate(sheets):
            cells = sheet.get('cells')
            if not cells:
                continue
            cells_exists = True
            sheet_name = '{0}_{1}'.format(img_name, i)
            ws = wb.create_sheet(sheet_name)
            for cell in cells:
                c1 = cell.get('start_column')
                r1 = cell.get('start_row')
                words = cell.get('words')
                ws.cell(row=r1 + 1, column=c1 + 1, value=words)

            # summary fields: [户名 account name, 卡号 card number, 页码 page,
            #                  回单验证码 receipt verification code, 打印时间 print time, 起始时间 start time, 终止时间 end time]
            summary = sheet.get('summary')
            card = summary[1]
            if card is None:
                classify_dict = unknown_summary.setdefault(classify, {})
                role = consts.UNKNOWN_ROLE if summary[0] is None else summary[0]
                role_dict = classify_dict.setdefault(role, {})
                role_dict['classify'] = classify
                role_dict['role'] = role
                role_dict.setdefault('sheet', []).append(sheet_name)
                role_dict.setdefault('confidence', []).append(confidence)
                code_list = role_dict.setdefault('code', [])
                pt_list = role_dict.setdefault('print_time', [])
                sd_list = role_dict.setdefault('start_date', [])
                ed_list = role_dict.setdefault('end_date', [])
                if summary[3] is not None:
                    code_list.append((summary[2], summary[3]))
                if summary[4] is not None:
                    pt_list.append(summary[4])
                if summary[5] is not None:
                    sd_list.append(summary[5])
                if summary[6] is not None:
                    ed_list.append(summary[6])
            else:
                card_dict = bs_summary.setdefault(card, {})
                card_dict['count'] = card_dict.get('count', 0) + 1
                card_dict.setdefault('classify', []).append(classify)
                card_dict.setdefault('confidence', []).append(confidence)
                card_dict.setdefault('sheet', []).append(sheet_name)
                role_list = card_dict.setdefault('role', [])
                role_set = card_dict.setdefault('role_set', set())
                code_list = card_dict.setdefault('code', [])
                pt_list = card_dict.setdefault('print_time', [])
                sd_list = card_dict.setdefault('start_date', [])
                ed_list = card_dict.setdefault('end_date', [])
                if summary[0] is not None:
                    role_list.append(summary[0])
                    role_set.add(summary[0])
                if summary[3] is not None:
                    code_list.append((summary[2], summary[3]))
                if summary[4] is not None:
                    pt_list.append(summary[4])
                if summary[5] is not None:
                    sd_list.append(summary[5])
                if summary[6] is not None:
                    ed_list.append(summary[6])

        if cells_exists:
            res_list.append((pno, ino, consts.RES_SUCCESS))
        else:
            res_list.append((pno, ino, consts.RES_SUCCESS_EMPTY))

    def license1_process(self, ocr_data, license_summary, classify, res_list, pno, ino):
        # classify values: '0' ID card (身份证), '1' residence permit (居住证)
        license_data = ocr_data.get('data', [])
        if not license_data:
            res_list.append((pno, ino, consts.RES_SUCCESS_EMPTY))
            return
        res_list.append((pno, ino, consts.RES_SUCCESS))
        license_summary.setdefault(classify, []).extend(license_data)

    def license2_process(self, ocr_res_2, license_summary, pid, classify, res_list, pno, ino):
        if ocr_res_2.get('ErrorCode') in consts.SUCCESS_CODE_SET:
            res_list.append((pno, ino, consts.RES_SUCCESS))
            if pid == consts.BC_PID:
                # bank card
                # res_dict = {}
                # for en_key, chn_key in consts.BC_FIELD:
                #     res_dict[chn_key] = ocr_res_2.get(en_key, '')
                license_summary.setdefault(classify, []).append(ocr_res_2)
            else:
                # business licence and other documents
                for result_dict in ocr_res_2.get('ResultList', []):
                    res_dict = {}
                    for field_dict in result_dict.get('FieldList', []):
                        res_dict[field_dict.get('chn_key', '')] = field_dict.get('value', '')
                    license_summary.setdefault(classify, []).append(res_dict)
        else:
            res_list.append((pno, ino, consts.RES_FAILED_2))

    @staticmethod
    def parse_img_path(img_path):
        img_name, _ = os.path.splitext(os.path.basename(img_path))
        part_list = img_name.split('_')
        # page_7_img_11_0
        return int(part_list[1]) + 1, int(part_list[3]) + 1

    @staticmethod
    def get_most(value_list):
        if value_list:
            most_common = Counter(value_list).most_common(1)
            return most_common[0][0] if most_common else None

    @staticmethod
    def date_format(date_str, format_str):
        try:
            date_res = datetime.strptime(date_str, format_str).date()
        except Exception as e:
            return
        else:
            return date_res

    def get_validate_date(self, date_list):
        for date_str in date_list:
            for format_str in consts.DATE_FORMAT:
                date_res = self.date_format(date_str, format_str)
                if isinstance(date_res, date):
                    return date_res

    def merge_card(self, bs_summary):
        merged_bs_summary = {}
        sorted_card = sorted(bs_summary.keys(), key=lambda x: bs_summary[x]['count'], reverse=True)
        for main_card in sorted_card:
            if bs_summary.get(main_card) is None:
                continue
            merged_bs_summary[main_card] = bs_summary.pop(main_card)
            del merged_bs_summary[main_card]['count']
            merge_cards = []
            for card in bs_summary.keys():
                if difflib.SequenceMatcher(None, main_card, card).quick_ratio() > consts.CARD_RATIO:
                    merged_bs_summary[main_card]['classify'].extend(bs_summary[card]['classify'])
                    merged_bs_summary[main_card]['confidence'].extend(bs_summary[card]['confidence'])
                    merged_bs_summary[main_card]['sheet'].extend(bs_summary[card]['sheet'])
                    merged_bs_summary[main_card]['role'].extend(bs_summary[card]['role'])
                    merged_bs_summary[main_card]['role_set'].update(bs_summary[card]['role_set'])
                    merged_bs_summary[main_card]['code'].extend(bs_summary[card]['code'])
                    merged_bs_summary[main_card]['print_time'].extend(bs_summary[card]['print_time'])
                    merged_bs_summary[main_card]['start_date'].extend(bs_summary[card]['start_date'])
                    merged_bs_summary[main_card]['end_date'].extend(bs_summary[card]['end_date'])
                    merge_cards.append(card)
            for card in merge_cards:
                del bs_summary[card]
            merged_bs_summary[main_card]['classify'] = self.get_most(merged_bs_summary[main_card]['classify'])
            merged_bs_summary[main_card]['role'] = self.get_most(merged_bs_summary[main_card]['role'])
        del bs_summary
        return merged_bs_summary

    def prune_bs_summary(self, bs_summary):
        for summary in bs_summary.values():
            del summary['count']
            summary['classify'] = self.get_most(summary['classify'])
            summary['role'] = self.get_most(summary['role'])
        return bs_summary

    def rebuild_bs_summary(self, bs_summary, unknown_summary):
        # bs_summary = {
        #     '卡号' (card number): {
        #         'count': 100,
        #         'classify': [],
        #         'confidence': [],
        #         'role': [],
        #         'code': [('page', 'code')],
        #         'print_time': [],
        #         'start_date': [],
        #         'end_date': [],
        #         'sheet': ['sheet_name']
        #     }
        # }
        #
        # unknown_summary = {
        #     0: {
        #         '户名' (account name): {
        #             'classify': 0,
        #             'confidence': [],
        #             'role': '户名',
        #             'code': [('page', 'code')],
        #             'print_time': [],
        #             'start_date': [],
        #             'end_date': [],
        #             'sheet': ['sheet_name']
        #         }
        #     }
        # }
        # no card number recognised at all
        if len(bs_summary) == 0:
            del bs_summary
            merged_bs_summary = {}
            card_num = 1
            for role_dict in unknown_summary.values():
                if len(role_dict) == 2 and consts.UNKNOWN_ROLE in role_dict:
                    summary_dict = role_dict.pop(consts.UNKNOWN_ROLE, {})
                    for summary in role_dict.values():
                        summary_dict['confidence'].extend(summary['confidence'])
                        summary_dict['role'] = summary['role']
                        summary_dict['code'].extend(summary['code'])
                        summary_dict['print_time'].extend(summary['print_time'])
                        summary_dict['start_date'].extend(summary['start_date'])
                        summary_dict['end_date'].extend(summary['end_date'])
                        summary_dict['sheet'].extend(summary['sheet'])
                    card = '{0}_{1}'.format(consts.UNKNOWN_CARD, card_num)
                    merged_bs_summary[card] = summary_dict
                else:
                    for summary in role_dict.values():
                        card = '{0}_{1}'.format(consts.UNKNOWN_CARD, card_num)
                        card_num += 1
                        merged_bs_summary[card] = summary
        else:
            # exactly one card number
            one_card = False
            if len(bs_summary) == 1:
                merged_bs_summary = self.prune_bs_summary(bs_summary)
                one_card = True
            # multiple card numbers
            else:
                merged_bs_summary = self.merge_card(bs_summary)

            for card_summary in merged_bs_summary.values():
                merge_role = []
                classify_summary = unknown_summary.get(card_summary['classify'], {})
                for role, summary in classify_summary.items():
                    if one_card or role in card_summary['role_set']:
                        merge_role.append(role)
                        card_summary['confidence'].extend(summary['confidence'])
                        card_summary['sheet'].extend(summary['sheet'])
                        card_summary['code'].extend(summary['code'])
                        card_summary['print_time'].extend(summary['print_time'])
                        card_summary['start_date'].extend(summary['start_date'])
                        card_summary['end_date'].extend(summary['end_date'])

                for role in merge_role:
                    del classify_summary[role]

            card_num = 1
            for role_dict in unknown_summary.values():
                for summary in role_dict.values():
                    card = '{0}_{1}'.format(consts.UNKNOWN_CARD, card_num)
                    card_num += 1
                    merged_bs_summary[card] = summary

        del unknown_summary
        for summary in merged_bs_summary.values():
            if summary.get('role_set') is not None:
                del summary['role_set']
            summary['print_time'] = self.get_validate_date(summary['print_time'])
            summary['start_date'] = self.get_validate_date(summary['start_date'])
            summary['end_date'] = self.get_validate_date(summary['end_date'])
            summary['confidence'] = max(summary['confidence'])
        return merged_bs_summary

    def pdf_2_img_2_queue(self, img_queue, todo_count_dict, lock):
        while self.switch:
            # 1. fetch document info from the task queue
            doc, business_type, task_str = self.get_doc_info()
            # queue empty: sleep and try again
            if doc is None:
                time.sleep(self.sleep_time)
                continue

            try:
                # 2. download the PDF from EDMS
                doc_data_path = os.path.join(self.data_dir, business_type, str(doc.id))
                os.makedirs(doc_data_path, exist_ok=True)
                pdf_path = os.path.join(doc_data_path, '{0}.pdf'.format(doc.id))
                img_save_path = os.path.join(doc_data_path, 'img')
                self.pdf_download(doc, pdf_path)

                # 3. extract images from the PDF
                self.cronjob_log.info('{0} [pdf to img start] [task={1}]'.format(self.log_base, task_str))
                pdf_handler = PDFHandler(pdf_path, img_save_path)
                pdf_handler.extract_image()
                self.cronjob_log.info('{0} [pdf to img end] [task={1}]'.format(self.log_base, task_str))

                with lock:
                    todo_count_dict[task_str] = len(pdf_handler.img_path_list)
                    for img_path in pdf_handler.img_path_list:
                        img_queue.put(img_path)  # TODO queue size control
            except EDMSException as e:
                doc.status = DocStatus.PROCESS_FAILED.value
                doc.save()
                self.cronjob_log.error('{0} [process failed (edms download)] [task={1}] [err={2}]'.format(
                    self.log_base, task_str, e))
            except Exception as e:
                doc.status = DocStatus.PROCESS_FAILED.value
                doc.save()
                self.cronjob_log.error('{0} [process failed (pdf to img)] [task={1}] [err={2}]'.format(
                    self.log_base, task_str, e))

    def img_2_ocr_1(self, img_queue, todo_count_dict, res_dict, finish_queue, lock, url):
        while True:
            try:
                img_path = img_queue.get(block=False)
            except Exception as e:
                self.cronjob_log.info('{0} [img_2_ocr_1] [queue empty]'.format(self.log_base))
                time.sleep(0.5)
                continue
            else:
                self.cronjob_log.info('{0} [img_2_ocr_1] [get img] [img_path={1}]'.format(self.log_base, img_path))
                with open(img_path, 'rb') as f:
                    base64_data = base64.b64encode(f.read())
                    # decode bytes to get the base64 string
                    file_data = base64_data.decode()
                json_data_1 = {
                    "file": file_data
                }

                for times in range(consts.RETRY_TIMES):
                    try:
                        start_time = time.time()
                        ocr_1_response = requests.post(url, json=json_data_1)
                        if ocr_1_response.status_code != 200:
                            raise OCR1Exception('ocr_1 status code: {0}'.format(ocr_1_response.status_code))
                    except Exception as e:
                        self.cronjob_log.warn('{0} [ocr_1 failed] [times={1}] [img_path={2}] [error={3}]'.format(
                            self.log_base, times, img_path, e))
                    else:
                        ocr_1_res = ocr_1_response.json()
                        end_time = time.time()
                        speed_time = int(end_time - start_time)
                        self.cronjob_log.info('{0} [ocr_1 success] [img={1}] [res={2}] [speed_time={3}]'.format(
                            self.log_base, img_path, ocr_1_res, speed_time))
                        break
                else:
                    ocr_1_res = {}
                    self.cronjob_log.warn('{0} [ocr_1 failed] [img_path={1}]'.format(self.log_base, img_path))
                    # continue

                del json_data_1
                # /data/bmw-ocr-data/AFC/6/img/page_0_img_0.jpeg
                # AFC_2
                path_split = img_path.split('/')
                task_str = consts.SPLIT_STR.join((path_split[-4], path_split[-3]))

                with lock:
                    doc_res_dict = res_dict.setdefault(task_str, {})
                    doc_res_dict[os.path.basename(img_path)] = ocr_1_res
                    res_dict[task_str] = doc_res_dict
                    todo_count = todo_count_dict.get(task_str)
                    if todo_count == 1:
                        finish_queue.put(task_str)
                        del todo_count_dict[task_str]
                    else:
                        todo_count_dict[task_str] = todo_count - 1

    def res_2_wb(self, res_dict, finish_queue, lock):
        while True:
            try:
                task_str = finish_queue.get(block=False)
            except Exception as e:
                self.cronjob_log.info('{0} [res_2_wb] [queue empty]'.format(self.log_base))
                time.sleep(0.5)
                continue
            else:
                self.cronjob_log.info('{0} [res_2_wb] [get task] [task={1}]'.format(self.log_base, task_str))
                ocr_1_res = res_dict.get(task_str, {})
                self.cronjob_log.info('{0} [res_2_wb] [get task res] [task={1}] [res={2}]'.format(
                    self.log_base, task_str, ocr_1_res))

                try:
                    # 4. parse OCR results and build the Excel workbook
                    bs_summary = {}
                    license_summary = {}
                    unknown_summary = {}
                    res_list = []
                    interest_keyword = Keywords.objects.filter(
                        type=KeywordsType.INTEREST.value, on_off=True).values_list('keyword', flat=True)
                    salary_keyword = Keywords.objects.filter(
                        type=KeywordsType.SALARY.value, on_off=True).values_list('keyword', flat=True)
                    loan_keyword = Keywords.objects.filter(
                        type__in=[KeywordsType.LOAN.value, KeywordsType.ALI_WECHART.value], on_off=True).values_list(
                        'keyword', flat=True)
                    wb = BSWorkbook(interest_keyword, salary_keyword, loan_keyword)
                    for img_path, res in ocr_1_res.items():
                        pno, ino = self.parse_img_path(img_path)
                        if res.get('code') == 1:
                            ocr_data = res.get('data', {})
                            classify = ocr_data.get('classify')
                            if classify is None:
                                res_list.append((pno, ino, consts.RES_FAILED_1))
                                self.cronjob_log.info('{0} [ocr_1 res error] [img={1}] [res={2}]'.format(
                                    self.log_base, img_path, res))
                                continue
                            elif classify in consts.OTHER_CLASSIFY_SET:  # other/unclassified documents
                                res_list.append((pno, ino, consts.RES_SUCCESS_OTHER))
                                continue
                            elif classify in consts.LICENSE_CLASSIFY_SET_1:  # licence type 1
                                self.license1_process(ocr_data, license_summary, classify, res_list, pno, ino)
                            elif classify in consts.LICENSE_CLASSIFY_SET_2:  # licence type 2
                                pid, _, _, _, _ = consts.LICENSE_CLASSIFY_MAPPING.get(classify)
                                with open(img_path, 'rb') as f:
                                    base64_data = base64.b64encode(f.read())
                                    # decode bytes to get the base64 string
                                    file_data = base64_data.decode()
                                json_data_2 = {
                                    "pid": str(pid),
                                    "filedata": file_data
                                }

                                for times in range(consts.RETRY_TIMES):
                                    try:
                                        start_time = time.time()
                                        ocr_2_response = requests.post(self.ocr_url_2, data=json_data_2)
                                        if ocr_2_response.status_code != 200:
                                            raise OCR2Exception('ocr_2 status code: {0}'.format(ocr_2_response.status_code))
                                    except Exception as e:
                                        self.cronjob_log.warn(
                                            '{0} [ocr_2 failed] [times={1}] [img_path={2}] [error={3}]'.format(
                                                self.log_base, times, img_path, e))
                                    else:
                                        ocr_2_res = json.loads(ocr_2_response.text)
                                        end_time = time.time()
                                        speed_time = int(end_time - start_time)
                                        self.cronjob_log.info(
                                            '{0} [ocr_2 success] [img={1}] [res={2}] [speed_time={3}]'.format(
                                                self.log_base, img_path, ocr_2_res, speed_time))

                                        if classify == consts.BC_CLASSIFY:
                                            name = '有'
                                            json_data_3 = {
                                                "file": file_data,
                                                'card_res': ocr_2_res
                                            }
                                            # send as JSON (assumed; form-encoding would mangle the nested card_res dict)
                                            card_name_response = requests.post(self.ocr_url_3, json=json_data_3)
                                            if card_name_response.status_code == 200:
                                                card_name_res = card_name_response.json()
                                                if isinstance(card_name_res, dict) and \
                                                        card_name_res.get('data', {}).get('is_exists_name') == 0:
                                                    name = '无'
                                            ocr_2_res['Name'] = name
                                        self.license2_process(ocr_2_res, license_summary, pid, classify, res_list, pno, ino)
                                        break
                                else:
                                    res_list.append((pno, ino, consts.RES_FAILED_2))
                                    self.cronjob_log.warn(
                                        '{0} [ocr_2 failed] [img_path={1}]'.format(self.log_base, img_path))
                            else:  # bank statement sheets
                                self.bs_process(wb, ocr_data, bs_summary, unknown_summary, classify, res_list, pno, ino)
                        else:
                            res_list.append((pno, ino, consts.RES_FAILED_1))
                            self.cronjob_log.info('{0} [ocr_1 res error] [img={1}] [res={2}]'.format(
                                self.log_base, img_path, res))

                    with lock:
                        del res_dict[task_str]
                        self.cronjob_log.info('{0} [res_dict record] [res_dict={1}]'.format(
                            self.log_base, res_dict))

                    self.cronjob_log.info('{0} [task={1}] [bs_summary={2}] [unknown_summary={3}] '
                                          '[license_summary={4}]'.format(self.log_base, task_str, bs_summary,
                                                                         unknown_summary, license_summary))

                    merged_bs_summary = self.rebuild_bs_summary(bs_summary, unknown_summary)

                    self.cronjob_log.info('{0} [task={1}] [merged_bs_summary={2}] [unknown_summary={3}] '
                                          '[res_list={4}]'.format(self.log_base, task_str, merged_bs_summary,
                                                                  unknown_summary, res_list))
                    del unknown_summary

                    # 4.2 rebuild the Excel workbook
                    doc, business_type = self.get_doc_object(task_str)
                    doc_data_path = os.path.join(self.data_dir, business_type, str(doc.id))
                    excel_path = os.path.join(doc_data_path, '{0}.xlsx'.format(doc.id))
                    img_save_path = os.path.join(doc_data_path, 'img')
                    # wb.save(src_excel_path)
                    wb.rebuild(merged_bs_summary, license_summary, res_list, doc.document_scheme)
                    wb.save(excel_path)
                except Exception as e:
                    with lock:
                        if task_str in res_dict:
                            del res_dict[task_str]
                    doc, _ = self.get_doc_object(task_str)
                    doc.status = DocStatus.PROCESS_FAILED.value
                    doc.save()  # TODO end_time
                    self.cronjob_log.error('{0} [process failed (res to wb)] [task={1}] [err={2}]'.format(
                        self.log_base, task_str, e))
                else:
                    try:
                        # 5. upload the Excel file to EDMS
                        for times in range(consts.RETRY_TIMES):
                            try:
                                self.edms.upload(excel_path, doc, business_type)
                            except Exception as e:
                                self.cronjob_log.warn(
                                    '{0} [edms upload failed] [times={1}] [task={2}] [error={3}]'.format(
                                        self.log_base, times, task_str, e))
                                edms_exc = str(e)
                            else:
                                break
                        else:
                            raise EDMSException(edms_exc)
                    except Exception as e:
                        doc.status = DocStatus.UPLOAD_FAILED.value  # TODO end_time
                        doc.save()
                        self.cronjob_log.error('{0} [process failed (edms upload)] [task={1}] [err={2}]'.format(
                            self.log_base, task_str, e))
                        write_zip_file(img_save_path, os.path.join(doc_data_path, '{0}_img.zip'.format(doc.id)))

                    else:
                        doc.status = DocStatus.COMPLETE.value
                        doc.save()  # TODO end_time
                        self.cronjob_log.info('{0} [process complete] [task={1}]'.format(self.log_base, task_str))
                        write_zip_file(img_save_path, os.path.join(doc_data_path, '{0}_img.zip'.format(doc.id)))

    # TODO refine document statuses: distinguish failure states, return tasks to the queue,
    #      and handle retries differently per state
    # TODO email notification on exceptions
    # recognition failure: ordinary exceptions such as PDF errors or workbook-build errors
    # EDMS exceptions: download failure --> back to queue --> email; upload failure --> re-upload queue --> email
    # algorithm exceptions: stage-1 failure --> recognition failed --> email; stage-2 failure --> recognition failed --> email
    # TODO retry OCR API calls
    def handle(self, *args, **kwargs):
        lock = Lock()
        with Manager() as manager:
            todo_count_dict = manager.dict()
            res_dict = manager.dict()
            img_queue = Queue()
            finish_queue = Queue()

            process_list = []
            pdf_process = Process(target=self.pdf_2_img_2_queue, args=(img_queue, todo_count_dict, lock))
            process_list.append(pdf_process)

            for url in self.ocr_1_urls.values():
                ocr_1_process = Process(target=self.img_2_ocr_1, args=(
                    img_queue, todo_count_dict, res_dict, finish_queue, lock, url))
                process_list.append(ocr_1_process)

            wb_process = Process(target=self.res_2_wb, args=(res_dict, finish_queue, lock))
            process_list.append(wb_process)

            # start every worker first, then wait for them all; joining inside the start loop
            # would block on the first long-running process and never launch the rest
            for p in process_list:
                p.start()
            for p in process_list:
                p.join()

            self.cronjob_log.info('{0} [stop safely]'.format(self.log_base))
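
A note on the RETRY_TIMES change above: every external call in the new command (EDMS download and upload, the ocr_1 and ocr_2 requests) uses the same for/else retry idiom, so RETRY_TIMES = 2 now means at most two attempts before the loop's else branch gives up. A minimal standalone sketch of that idiom, with a hypothetical call_service stand-in that is not part of this change:

import time

RETRY_TIMES = 2  # mirrors the new value of consts.RETRY_TIMES


def call_service():
    """Hypothetical stand-in for an EDMS/OCR request; always fails here."""
    raise ConnectionError('service unavailable')


def fetch_with_retry():
    last_error = None
    for times in range(RETRY_TIMES):
        try:
            result = call_service()
        except Exception as e:
            last_error = str(e)
            time.sleep(1)  # the command itself retries immediately, without backoff
        else:
            break  # success: leave the loop so the for/else clause is skipped
    else:
        # reached only when no attempt called break, i.e. every attempt failed
        raise RuntimeError('gave up after {0} attempts: {1}'.format(RETRY_TIMES, last_error))
    return result


if __name__ == '__main__':
    try:
        fetch_with_retry()
    except RuntimeError as e:
        print(e)

In the command itself the else branch either raises EDMSException (download/upload) or records a failure result for the page (ocr_1/ocr_2), but the control flow is the same.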