import os import time import json import shutil import base64 import signal import requests import traceback from django.core.management import BaseCommand from multiprocessing import Process from openpyxl import load_workbook, Workbook from settings import conf from common.mixins import LoggerMixin from common.tools.pdf_to_img import PDFHandler, PDFBuild from apps.doc import consts from apps.doc.exceptions import OCR1Exception, OCR2Exception class Command(BaseCommand, LoggerMixin): def __init__(self): super().__init__() self.log_base = '[folder f3 process]' # input folder self.input_dir = conf.F3_DIR # 处理文件开关 self.switch = True # 睡眠时间 self.sleep_time = float(conf.SLEEP_SECOND_FOLDER) # 输出结果 self.wb_name = 'result.xlsx' self.field_map = { # sheet_name, key_field, side_field_order, src_field_order consts.IC_CLASSIFY: (consts.IC_CN_NAME, '有效期限', consts.IC_FIELD_ORDER_3, consts.IC_FIELD_ORDER_2), } self.pdf_name_map = { consts.IC_CLASSIFY: consts.IC_CN_NAME, consts.BL_CLASSIFY: consts.BL_CN_NAME, } # ocr url self.ocr_url = conf.OCR_URL_FOLDER self.ocr_url_2 = conf.OCR2_URL_FOLDER # 优雅退出信号:15 signal.signal(signal.SIGTERM, self.signal_handler) def signal_handler(self, sig, frame): self.switch = False # 停止处理文件 @staticmethod def license1_process(ocr_data, ocr_res, classify): # 类别:'0'身份证, '1'居住证 license_data = ocr_data.get('data') if not license_data: return if isinstance(license_data, dict): license_data.pop('base64_img', '') if classify == consts.IC_CLASSIFY: id_card_dict = {} card_type = license_data.get('type', '') is_ic = card_type.startswith('身份证') is_info_side = card_type.endswith('信息面') id_card_dict['类别'] = '0' if is_ic else '1' if is_ic: field_map = consts.IC_MAP_0 if is_info_side else consts.IC_MAP_1 else: field_map = consts.RP_MAP_0 if is_info_side else consts.RP_MAP_1 for write_field, search_field in field_map: id_card_dict[write_field] = license_data.get('words_result', {}).get(search_field, {}).get('words', '') if not is_info_side: start_time = license_data.get('words_result', {}).get('签发日期', {}).get('words', '') end_time = license_data.get('words_result', {}).get('失效日期', {}).get('words', '') id_card_dict['有效期限'] = '{0}-{1}'.format(start_time, end_time) # for id_card_dict in license_data: # try: # id_card_dict.pop('base64_img') # except Exception as e: # continue ocr_res.setdefault(classify, []).append(id_card_dict) def license2_process(self, ocr_data, ocr_res, classify, img_path): if classify != consts.BL_CLASSIFY: return pid, _, _, _, _, _ = consts.LICENSE_CLASSIFY_MAPPING.get(classify) file_data = ocr_data.get('section_img') if file_data is None: with open(img_path, 'rb') as f: base64_data = base64.b64encode(f.read()) # 获取解码后的base64值 file_data = base64_data.decode() json_data_2 = { "pid": str(pid), "filedata": file_data } for times in range(consts.RETRY_TIMES): try: start_time = time.time() ocr_2_response = requests.post(self.ocr_url_2, data=json_data_2) if ocr_2_response.status_code != 200: raise OCR2Exception('ocr_2 status code: {0}'.format(ocr_2_response.status_code)) except Exception as e: self.folder_log.warn( '{0} [ocr_2 failed] [times={1}] [img_path={2}] [error={3}]'.format( self.log_base, times, img_path, traceback.format_exc())) else: ocr_res_2 = json.loads(ocr_2_response.text) end_time = time.time() speed_time = int(end_time - start_time) self.folder_log.info( '{0} [ocr_2 success] [img={1}] [speed_time={2}]'.format( self.log_base, img_path, speed_time)) if ocr_res_2.get('ErrorCode') in consts.SUCCESS_CODE_SET: if pid == consts.BC_PID: ocr_res.append(ocr_res_2) else: # 营业执照等 for result_dict in ocr_res_2.get('ResultList', []): res_dict = {} for field_dict in result_dict.get('FieldList', []): res_dict[field_dict.get('chn_key', '')] = field_dict.get('value', '') ocr_res.append(res_dict) break def wb_process(self, ocr_res, output_dir): excel_path = os.path.join(output_dir, self.wb_name) try: if os.path.exists(excel_path): wb = load_workbook(excel_path) else: wb = Workbook() for c, res_list in ocr_res.items(): sheet_name, key_field, side_field_order, src_field_order = self.field_map.get(c) if sheet_name in wb.sheetnames: ws = wb.get_sheet_by_name(sheet_name) else: ws = wb.create_sheet(sheet_name) for res in res_list: if key_field is not None and key_field in res: field_order = side_field_order else: field_order = src_field_order for search_field, write_field in field_order: field_value = res.get(search_field, '') if isinstance(field_value, list): ws.append((write_field, *field_value)) else: ws.append((write_field, field_value)) ws.append((None,)) if 'Sheet' in wb.sheetnames and len(wb.sheetnames) > 1: wb.remove(wb.get_sheet_by_name('Sheet')) wb.save(excel_path) except Exception as e: self.folder_log.error('{0} [wb build error] [path={1}] [error={2}]'.format( self.log_base, excel_path, traceback.format_exc())) def ocr_process(self, img_path, all_ocr_res, img_res): if os.path.exists(img_path): with open(img_path, 'rb') as f: base64_data = base64.b64encode(f.read()) # 获取解码后的base64值 file_data = base64_data.decode() json_data = { "file": file_data, "channel": consts.AFC_PREFIX, } for times in range(consts.RETRY_TIMES): try: start_time = time.time() ocr_response = requests.post(self.ocr_url, json=json_data) if ocr_response.status_code != 200: raise OCR1Exception('{0} ocr status code: {1}'.format(self.log_base, ocr_response.status_code)) except Exception as e: self.folder_log.warn('{0} [ocr failed] [times={1}] [img_path={2}] [error={3}]'.format( self.log_base, times, img_path, traceback.format_exc())) else: ocr_res = ocr_response.json() end_time = time.time() speed_time = int(end_time - start_time) self.folder_log.info('{0} [ocr success] [img={1}] [speed_time={2}]'.format( self.log_base, img_path, speed_time)) if isinstance(ocr_res, dict): if ocr_res.get('code') == 1: data_list = ocr_res.get('data', []) if isinstance(data_list, list): for ocr_data in data_list: classify = ocr_data.get('classify') img_res.setdefault(classify, set()).add(img_path) if classify in consts.LICENSE_CLASSIFY_SET_1: self.license1_process(ocr_data, all_ocr_res, classify) elif classify in consts.LICENSE_CLASSIFY_SET_2: self.license2_process(ocr_data, all_ocr_res, classify, img_path) break else: self.folder_log.warn('{0} [ocr failed] [img_path={1}]'.format(self.log_base, img_path)) def img_process(self, img_res, output_dir): for classify, img_path_set in img_res.items(): pdf_path = os.path.join(output_dir, '{0}.pdf'.format(self.pdf_name_map.get(classify, '其他'))) pdf_build = PDFBuild(pdf_path) pdf_build.insert_img(img_path_set) def images_process(self, img_path_list, output_dir): ocr_res = dict() img_res = dict() for img_path in img_path_list: self.ocr_process(img_path, ocr_res, img_res) self.wb_process(ocr_res, output_dir) self.img_process(img_res, output_dir) def pdf_process(self, name, path, img_output_dir, output_dir): pdf_handler = PDFHandler(path, os.path.join(img_output_dir, name)) try: self.folder_log.info('{0} [pdf to img start] [path={1}]'.format(self.log_base, path)) pdf_handler.extract_image() self.folder_log.info('{0} [pdf to img end] [path={1}]'.format(self.log_base, path)) except Exception as e: self.folder_log.error('{0} [pdf to img error] [path={1}] [error={2}]'.format( self.log_base, path, traceback.format_exc())) raise e else: self.images_process(pdf_handler.img_path_list, output_dir) def folder_process(self, unique_folder_name, folder_path, main_output_dir): output_dir = os.path.join(main_output_dir, unique_folder_name) img_output_dir = os.path.join(output_dir, 'image') failed_output_dir = os.path.join(output_dir, 'failed') os.makedirs(output_dir, exist_ok=True) os.makedirs(img_output_dir, exist_ok=True) os.makedirs(failed_output_dir, exist_ok=True) os_error_filename_set = set() list_dir = os.listdir(folder_path) if not list_dir and len(os_error_filename_set) == 0: self.folder_log.info('{0} [folder empty, completed] [path={1}]'.format(self.log_base, folder_path)) return all_file_set = set(list_dir) true_file_set = all_file_set - os_error_filename_set if len(true_file_set) == 0 and len(os_error_filename_set) > 0: true_file_set.add(os_error_filename_set.pop()) for name in true_file_set: time.sleep(5) path = os.path.join(folder_path, name) try: if not os.path.exists(path): self.folder_log.info('{0} [path is not exists] [path={1}]'.format(self.log_base, path)) continue elif os.path.isfile(path): self.folder_log.info('{0} [file start] [path={1}]'.format(self.log_base, path)) self.pdf_process(name, path, img_output_dir, output_dir) self.folder_log.info('{0} [file end] [path={1}]'.format(self.log_base, path)) else: self.folder_log.info('{0} [path is dir] [path={1}]'.format(self.log_base, path)) failed_path = os.path.join(failed_output_dir, name) shutil.copyfile(path, failed_path) except OSError: os_error_filename_set.add(name) self.folder_log.error('{0} [os error] [path={1}] [error={2}]'.format( self.log_base, path, traceback.format_exc())) except Exception as e: try: self.folder_log.error('{0} [file error] [path={1}] [error={2}]'.format(self.log_base, path, traceback.format_exc())) failed_path = os.path.join(failed_output_dir, name) shutil.copyfile(path, failed_path) except Exception as e: os_error_filename_set.add(name) self.folder_log.error('{0} [file move error] [path={1}] [error={2}]'.format( self.log_base, path, traceback.format_exc())) def main_folder_process(self, input_dir): while not os.path.isdir(input_dir): self.folder_log.info('{0} [input dir is not dir] [input_dir={1}]'.format(self.log_base, input_dir)) if self.switch: time.sleep(self.sleep_time) continue else: return output_dir = os.path.join(os.path.dirname(input_dir), 'Output') completed_output_dir = os.path.join(output_dir, 'Completed') failed_output_dir = os.path.join(output_dir, 'Failed') os.makedirs(output_dir, exist_ok=True) os.makedirs(completed_output_dir, exist_ok=True) os.makedirs(failed_output_dir, exist_ok=True) os_error_filename_set = set() while self.switch: # 1. 从input dir获取pdf or image list_dir = os.listdir(input_dir) if not list_dir and len(os_error_filename_set) == 0: self.folder_log.info('{0} [input dir empty] [input_dir={1}]'.format(self.log_base, input_dir)) time.sleep(self.sleep_time) continue all_file_set = set(list_dir) true_file_set = all_file_set - os_error_filename_set if len(true_file_set) == 0 and len(os_error_filename_set) > 0: true_file_set.add(os_error_filename_set.pop()) for name in true_file_set: time.sleep(10) unique_folder_name = '{0}_{1}'.format(time.time(), name) path = os.path.join(input_dir, name) try: if not os.path.exists(path): self.folder_log.info('{0} [path is not exists] [path={1}]'.format(self.log_base, path)) continue elif os.path.isdir(path): self.folder_log.info('{0} [dir start] [path={1}]'.format(self.log_base, name)) self.folder_process(unique_folder_name, path, output_dir) completed_path = os.path.join(completed_output_dir, unique_folder_name) shutil.move(path, completed_path) self.folder_log.info('{0} [dir end] [path={1}]'.format(self.log_base, name)) else: self.folder_log.info('{0} [path is not dir] [path={1}]'.format(self.log_base, path)) failed_path = os.path.join(failed_output_dir, unique_folder_name) shutil.move(path, failed_path) except OSError: os_error_filename_set.add(name) self.folder_log.error('{0} [os error] [path={1}] [error={2}]'.format( self.log_base, path, traceback.format_exc())) except Exception as e: try: self.folder_log.error('{0} [file error] [path={1}] [error={2}]'.format(self.log_base, path, traceback.format_exc())) failed_path = os.path.join(failed_output_dir, unique_folder_name) shutil.move(path, failed_path) except Exception as e: os_error_filename_set.add(name) self.folder_log.error('{0} [file move error] [path={1}] [error={2}]'.format( self.log_base, path, traceback.format_exc())) def handle(self, *args, **kwargs): process_list = [] process = Process(target=self.main_folder_process, args=(self.input_dir, )) process_list.append(process) for p in process_list: p.start() for p in process_list: p.join() self.folder_log.info('{0} [stop safely]'.format(self.log_base))