# folder_f3_process.py
import os
import time
import json
import shutil
import base64
import signal
import requests
import traceback
from django.core.management import BaseCommand
from multiprocessing import Process
from openpyxl import load_workbook, Workbook

from settings import conf
from common.mixins import LoggerMixin
from common.tools.pdf_to_img import PDFHandler, PDFBuild
from apps.doc import consts
from apps.doc.exceptions import OCR1Exception, OCR2Exception


class Command(BaseCommand, LoggerMixin):

    def __init__(self):
        """Initialise paths, polling settings, output mappings and OCR endpoints.

        Also installs ``signal_handler`` as the SIGTERM handler.
        """
        super().__init__()
        # Prefix used at the start of every log message of this command.
        self.log_base = '[folder f3 process]'
        # Input folder.
        self.input_dir = conf.F3_DIR
        # File-processing switch; the polling loop runs while it is True.
        self.switch = True
        # Sleep time (seconds) between polls.
        self.sleep_time = float(conf.SLEEP_SECOND_FOLDER)
        # File name of the result workbook.
        self.wb_name = 'result.xlsx'

        # classify -> (sheet_name, key_field, side_field_order, src_field_order)
        ic_sheet_spec = (
            consts.IC_CN_NAME,
            '有效期限',
            consts.IC_FIELD_ORDER_3,
            consts.IC_FIELD_ORDER_2,
        )
        self.field_map = {consts.IC_CLASSIFY: ic_sheet_spec}

        # classify -> base name of the PDF generated for that classify.
        self.pdf_name_map = {
            consts.IC_CLASSIFY: consts.IC_CN_NAME,
            consts.BL_CLASSIFY: consts.BL_CN_NAME,
        }

        # OCR service URLs (first and second stage).
        self.ocr_url = conf.OCR_URL_FOLDER
        self.ocr_url_2 = conf.OCR2_URL_FOLDER

        # Graceful-exit signal: 15 (SIGTERM).
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, sig, frame):
        self.switch = False  # 停止处理文件

    @staticmethod
    def license1_process(ocr_data, ocr_res, classify):
        # 类别:'0'身份证, '1'居住证
        license_data = ocr_data.get('data')
        if not license_data:
            return
        if isinstance(license_data, dict):
            license_data.pop('base64_img', '')
        if classify == consts.IC_CLASSIFY:
            id_card_dict = {}
            card_type = license_data.get('type', '')
            is_ic = card_type.startswith('身份证')
            is_info_side = card_type.endswith('信息面')
            id_card_dict['类别'] = '0' if is_ic else '1'
            if is_ic:
                field_map = consts.IC_MAP_0 if is_info_side else consts.IC_MAP_1
            else:
                field_map = consts.RP_MAP_0 if is_info_side else consts.RP_MAP_1
            for write_field, search_field in field_map:
                id_card_dict[write_field] = license_data.get('words_result', {}).get(search_field, {}).get('words', '')
            if not is_info_side:
                start_time = license_data.get('words_result', {}).get('签发日期', {}).get('words', '')
                end_time = license_data.get('words_result', {}).get('失效日期', {}).get('words', '')
                id_card_dict['有效期限'] = '{0}-{1}'.format(start_time, end_time)
            # for id_card_dict in license_data:
            #     try:
            #         id_card_dict.pop('base64_img')
            #     except Exception as e:
            #         continue
            ocr_res.setdefault(classify, []).append(id_card_dict)

    def license2_process(self, ocr_data, ocr_res, classify, img_path):
        """Send one image to the second OCR service and collect its records.

        Only ``consts.BL_CLASSIFY`` items are handled; anything else returns
        immediately. The request is attempted up to ``consts.RETRY_TIMES``
        times; a non-200 status or an undecodable body counts as a failed
        attempt.

        Args:
            ocr_data: One item of the first OCR response. Its ``section_img``
                entry, when present, is sent as the file data; otherwise the
                file at ``img_path`` is read and base64-encoded.
            ocr_res: Result container. When it is a dict, records are appended
                to ``ocr_res[classify]`` (the layout ``license1_process``
                uses); when it is a list, records are appended directly.
            classify: Classify code of ``ocr_data``.
            img_path: Path of the source image, used as fallback payload and in
                log messages.

        Returns:
            None.
        """
        if classify != consts.BL_CLASSIFY:
            return

        pid, _, _, _, _, _ = consts.LICENSE_CLASSIFY_MAPPING.get(classify)
        file_data = ocr_data.get('section_img')
        if file_data is None:
            with open(img_path, 'rb') as f:
                # base64 text of the raw image bytes
                file_data = base64.b64encode(f.read()).decode()
        json_data_2 = {
            "pid": str(pid),
            "filedata": file_data
        }

        def collect(record):
            """Store ``record`` in ``ocr_res`` (dict keyed by classify, or list)."""
            # The caller in this file passes a dict, which has no ``append``.
            if isinstance(ocr_res, dict):
                ocr_res.setdefault(classify, []).append(record)
            else:
                ocr_res.append(record)

        # NOTE(review): the request has no timeout, so a stalled server blocks
        # this call indefinitely — consider passing ``timeout=``.
        for times in range(consts.RETRY_TIMES):
            start_time = time.time()
            try:
                ocr_2_response = requests.post(self.ocr_url_2, data=json_data_2)
                if ocr_2_response.status_code != 200:
                    raise OCR2Exception('ocr_2 status code: {0}'.format(ocr_2_response.status_code))
                # Decode inside the try so a malformed body is retried like any
                # other failed attempt instead of aborting the whole file.
                ocr_res_2 = json.loads(ocr_2_response.text)
            except Exception:
                self.folder_log.warn(
                    '{0} [ocr_2 failed] [times={1}] [img_path={2}] [error={3}]'.format(
                        self.log_base, times, img_path, traceback.format_exc()))
                continue

            speed_time = int(time.time() - start_time)
            self.folder_log.info(
                '{0} [ocr_2 success] [img={1}] [speed_time={2}]'.format(
                    self.log_base, img_path, speed_time))

            if isinstance(ocr_res_2, dict) and ocr_res_2.get('ErrorCode') in consts.SUCCESS_CODE_SET:
                if pid == consts.BC_PID:
                    # For this pid the whole response is kept as the record.
                    collect(ocr_res_2)
                else:
                    # Business license etc.: flatten each FieldList into
                    # {chn_key: value}.
                    for result_dict in ocr_res_2.get('ResultList', []):
                        collect({
                            field_dict.get('chn_key', ''): field_dict.get('value', '')
                            for field_dict in result_dict.get('FieldList', [])
                        })
            break
        else:
            # Every attempt failed; mirror the final warning of ocr_process.
            self.folder_log.warn('{0} [ocr_2 failed] [img_path={1}]'.format(self.log_base, img_path))

    def wb_process(self, ocr_res, output_dir):
        """Write the collected OCR records into the result workbook.

        The workbook ``<output_dir>/<self.wb_name>`` is loaded when it already
        exists and created otherwise. Each classify gets its own sheet. For
        every record the field order is chosen by whether the configured key
        field is present in the record, and one row is written per field.
        Records are separated by a blank row.

        Args:
            ocr_res: Dict of classify -> list of record dicts.
            output_dir: Folder in which the workbook is stored.

        Returns:
            None. Any error while building or saving is logged, not raised.
        """
        excel_path = os.path.join(output_dir, self.wb_name)

        try:
            wb = load_workbook(excel_path) if os.path.exists(excel_path) else Workbook()

            for c, res_list in ocr_res.items():
                sheet_spec = self.field_map.get(c)
                if sheet_spec is None:
                    # No sheet layout configured for this classify: skip it
                    # rather than fail the unpack and lose every other sheet.
                    self.folder_log.warn('{0} [wb skip classify] [classify={1}] [path={2}]'.format(
                        self.log_base, c, excel_path))
                    continue
                sheet_name, key_field, side_field_order, src_field_order = sheet_spec

                if sheet_name in wb.sheetnames:
                    ws = wb[sheet_name]
                    # Blank row between the existing content and the new records.
                    ws.append((None,))
                else:
                    ws = wb.create_sheet(sheet_name)

                for res in res_list:
                    if key_field is not None and key_field in res:
                        field_order = side_field_order
                    else:
                        field_order = src_field_order
                    for search_field, write_field in field_order:
                        field_value = res.get(search_field, '')
                        if isinstance(field_value, list):
                            # A list value is spread over the following columns.
                            ws.append((write_field, *field_value))
                        else:
                            ws.append((write_field, field_value))
                    ws.append((None,))

            # Drop the default sheet of a fresh Workbook once a real sheet exists.
            if 'Sheet' in wb.sheetnames and len(wb.sheetnames) > 1:
                wb.remove(wb['Sheet'])
            wb.save(excel_path)
        except Exception:
            self.folder_log.error('{0} [wb build error] [path={1}] [error={2}]'.format(
                self.log_base, excel_path, traceback.format_exc()))

    def ocr_process(self, img_path, all_ocr_res, img_res):
        if os.path.exists(img_path):
            with open(img_path, 'rb') as f:
                base64_data = base64.b64encode(f.read())
                # 获取解码后的base64值
                file_data = base64_data.decode()
            json_data = {
                "file": file_data,
                "channel": consts.AFC_PREFIX,
            }

            for times in range(consts.RETRY_TIMES):
                try:
                    start_time = time.time()
                    ocr_response = requests.post(self.ocr_url, json=json_data)
                    if ocr_response.status_code != 200:
                        raise OCR1Exception('{0} ocr status code: {1}'.format(self.log_base, ocr_response.status_code))
                except Exception as e:
                    self.folder_log.warn('{0} [ocr failed] [times={1}] [img_path={2}] [error={3}]'.format(
                        self.log_base, times, img_path, traceback.format_exc()))
                else:
                    ocr_res = ocr_response.json()
                    end_time = time.time()
                    speed_time = int(end_time - start_time)
                    self.folder_log.info('{0} [ocr success] [img={1}] [speed_time={2}]'.format(
                        self.log_base, img_path, speed_time))

                    if isinstance(ocr_res, dict):
                        if ocr_res.get('code') == 1:
                            data_list = ocr_res.get('data', [])
                            if isinstance(data_list, list):
                                for ocr_data in data_list:
                                    classify = ocr_data.get('classify')
                                    img_res.setdefault(classify, set()).add(img_path)
                                    if classify in consts.LICENSE_CLASSIFY_SET_1:
                                        self.license1_process(ocr_data, all_ocr_res, classify)
                                    elif classify in consts.LICENSE_CLASSIFY_SET_2:
                                        self.license2_process(ocr_data, all_ocr_res, classify, img_path)
                    break
            else:
                self.folder_log.warn('{0} [ocr failed] [img_path={1}]'.format(self.log_base, img_path))

    def img_process(self, img_res, output_dir):
        for classify, img_path_set in img_res.items():
            pdf_path = os.path.join(output_dir, '{0}.pdf'.format(self.pdf_name_map.get(classify, '其他')))
            pdf_build = PDFBuild(pdf_path)
            pdf_build.insert_img(img_path_set)

    def images_process(self, img_path_list, output_dir):
        """OCR every image, then write the workbook and the per-classify PDFs.

        Args:
            img_path_list: Iterable of image paths to process, in order.
            output_dir: Folder that receives the workbook and the PDFs.
        """
        records_by_classify, images_by_classify = {}, {}
        for single_img in img_path_list:
            self.ocr_process(single_img, records_by_classify, images_by_classify)
        self.wb_process(records_by_classify, output_dir)
        self.img_process(images_by_classify, output_dir)

    def pdf_process(self, name, path, img_output_dir, output_dir):
        """Extract the images of one file and run them through ``images_process``.

        Args:
            name: File name; used as the sub-folder name under ``img_output_dir``.
            path: Full path of the file handed to ``PDFHandler``.
            img_output_dir: Base folder for extracted images.
            output_dir: Folder that receives the workbook and PDFs.

        Raises:
            Exception: Whatever ``PDFHandler.extract_image`` raises; it is
                logged and then re-raised.
        """
        extract_dir = os.path.join(img_output_dir, name)
        pdf_handler = PDFHandler(path, extract_dir)

        try:
            self.folder_log.info('{0} [pdf to img start] [path={1}]'.format(self.log_base, path))
            pdf_handler.extract_image()
            self.folder_log.info('{0} [pdf to img end] [path={1}]'.format(self.log_base, path))
        except Exception as exc:
            self.folder_log.error('{0} [pdf to img error] [path={1}] [error={2}]'.format(
                self.log_base, path, traceback.format_exc()))
            raise exc

        # Only reached when extraction succeeded (the handler above re-raises).
        self.images_process(pdf_handler.img_path_list, output_dir)

    def folder_process(self, unique_folder_name, folder_path, main_output_dir):
        """Process every entry of one input folder.

        Creates ``<main_output_dir>/<unique_folder_name>`` with ``image`` and
        ``failed`` sub-folders. Regular files are handed to ``pdf_process``.
        Sub-directories are not processed and are copied to ``failed``. A file
        whose processing raises a non-OSError exception is copied to ``failed``
        as well. OSErrors are logged only.

        Args:
            unique_folder_name: Name of the output sub-folder for this run.
            folder_path: Input folder whose entries are processed.
            main_output_dir: Parent folder of all output sub-folders.

        Returns:
            None.
        """
        output_dir = os.path.join(main_output_dir, unique_folder_name)
        img_output_dir = os.path.join(output_dir, 'image')
        failed_output_dir = os.path.join(output_dir, 'failed')
        for directory in (output_dir, img_output_dir, failed_output_dir):
            os.makedirs(directory, exist_ok=True)

        list_dir = os.listdir(folder_path)
        if not list_dir:
            self.folder_log.info('{0} [folder empty, completed] [path={1}]'.format(self.log_base, folder_path))
            return

        # Sorted so that entries are processed in a reproducible order.
        for name in sorted(list_dir):
            # NOTE(review): fixed 5 s pause before each entry; the reason is not
            # visible in this file.
            time.sleep(5)
            path = os.path.join(folder_path, name)
            failed_path = os.path.join(failed_output_dir, name)

            try:
                if not os.path.exists(path):
                    self.folder_log.info('{0} [path is not exists] [path={1}]'.format(self.log_base, path))
                    continue
                if os.path.isfile(path):
                    self.folder_log.info('{0} [file start] [path={1}]'.format(self.log_base, path))
                    self.pdf_process(name, path, img_output_dir, output_dir)
                    self.folder_log.info('{0} [file end] [path={1}]'.format(self.log_base, path))
                else:
                    self.folder_log.info('{0} [path is dir] [path={1}]'.format(self.log_base, path))
                    # copyfile cannot copy a directory; copytree copies it
                    # recursively into the failed folder.
                    shutil.copytree(path, failed_path)
            except OSError:
                self.folder_log.error('{0} [os error] [path={1}] [error={2}]'.format(
                    self.log_base, path, traceback.format_exc()))
            except Exception:
                try:
                    self.folder_log.error('{0} [file error] [path={1}] [error={2}]'.format(
                        self.log_base, path, traceback.format_exc()))
                    shutil.copyfile(path, failed_path)
                except Exception:
                    self.folder_log.error('{0} [file move error] [path={1}] [error={2}]'.format(
                        self.log_base, path, traceback.format_exc()))

    def main_folder_process(self, input_dir):
        """Poll ``input_dir`` and process each sub-folder until switched off.

        Waits until ``input_dir`` exists as a directory, then loops while
        ``self.switch`` is True. Every sub-folder is handed to
        ``folder_process`` and afterwards moved to ``Output/Completed``.
        Entries that are not directories, and entries whose processing raises a
        non-OSError exception, are moved to ``Output/Failed``. Names that hit an
        OSError are remembered and left out of later passes, except that one
        remembered name is retried whenever a pass has nothing else to do. The
        ``Output`` folder is created next to ``input_dir``.

        Args:
            input_dir: Folder to poll.

        Returns:
            None, once ``self.switch`` has been turned off.
        """
        while not os.path.isdir(input_dir):
            self.folder_log.info('{0} [input dir is not dir] [input_dir={1}]'.format(self.log_base, input_dir))
            if not self.switch:
                return
            time.sleep(self.sleep_time)

        output_dir = os.path.join(os.path.dirname(input_dir), 'Output')
        completed_output_dir = os.path.join(output_dir, 'Completed')
        failed_output_dir = os.path.join(output_dir, 'Failed')
        for directory in (output_dir, completed_output_dir, failed_output_dir):
            os.makedirs(directory, exist_ok=True)

        os_error_filename_set = set()
        while self.switch:
            # 1. Fetch the entries (pdf or image folders) from the input dir.
            list_dir = os.listdir(input_dir)
            if not list_dir and not os_error_filename_set:
                self.folder_log.info('{0} [input dir empty] [input_dir={1}]'.format(self.log_base, input_dir))
                time.sleep(self.sleep_time)
                continue

            true_file_set = set(list_dir) - os_error_filename_set
            if not true_file_set and os_error_filename_set:
                # Nothing new to do: retry one of the names that failed earlier.
                true_file_set.add(os_error_filename_set.pop())

            for name in true_file_set:
                if not self.switch:
                    # Stop requested: finish promptly and leave the remaining
                    # entries in the input dir untouched.
                    break
                # NOTE(review): fixed 10 s pause before each entry; the reason
                # is not visible in this file.
                time.sleep(10)
                unique_folder_name = '{0}_{1}'.format(time.time(), name)
                path = os.path.join(input_dir, name)

                try:
                    if not os.path.exists(path):
                        self.folder_log.info('{0} [path is not exists] [path={1}]'.format(self.log_base, path))
                        continue
                    if os.path.isdir(path):
                        self.folder_log.info('{0} [dir start] [path={1}]'.format(self.log_base, name))
                        self.folder_process(unique_folder_name, path, output_dir)
                        completed_path = os.path.join(completed_output_dir, unique_folder_name)
                        shutil.move(path, completed_path)
                        self.folder_log.info('{0} [dir end] [path={1}]'.format(self.log_base, name))
                    else:
                        self.folder_log.info('{0} [path is not dir] [path={1}]'.format(self.log_base, path))
                        failed_path = os.path.join(failed_output_dir, unique_folder_name)
                        shutil.move(path, failed_path)
                except OSError:
                    os_error_filename_set.add(name)
                    self.folder_log.error('{0} [os error] [path={1}] [error={2}]'.format(
                        self.log_base, path, traceback.format_exc()))
                except Exception:
                    try:
                        self.folder_log.error('{0} [file error] [path={1}] [error={2}]'.format(
                            self.log_base, path, traceback.format_exc()))
                        failed_path = os.path.join(failed_output_dir, unique_folder_name)
                        shutil.move(path, failed_path)
                    except Exception:
                        os_error_filename_set.add(name)
                        self.folder_log.error('{0} [file move error] [path={1}] [error={2}]'.format(
                            self.log_base, path, traceback.format_exc()))

    def handle(self, *args, **kwargs):
        """Command entry point: run the polling loop in a child process.

        Starts one ``Process`` that executes ``main_folder_process`` on
        ``self.input_dir``, waits for it to finish and logs the shutdown.
        """
        workers = [
            Process(target=self.main_folder_process, args=(self.input_dir, )),
        ]

        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        self.folder_log.info('{0} [stop safely]'.format(self.log_base))