doc_ocr_process.py 9.17 KB
import os
import time
import fitz
import signal
import base64
import asyncio
import aiohttp
# from openpyxl import Workbook
from apps.doc.ocr.wb import BSWorkbook, Workbook
from django.core.management import BaseCommand

from settings import conf
from common.mixins import LoggerMixin
from common.tools.file_tools import write_zip_file
from common.tools.pdf_to_img import PDFHandler
from apps.doc.models import DocStatus, HILDoc, AFCDoc, Keywords
from apps.doc.named_enum import KeywordsType
from apps.doc import consts
from apps.doc.ocr.edms import EDMS, rh


class Command(BaseCommand, LoggerMixin):

    def __init__(self):
        super().__init__()
        self.log_base = '[doc ocr process]'
        # 处理文件开关
        self.switch = True
        # 数据目录
        self.data_dir = conf.DATA_DIR
        # pdf页面转图片
        self.zoom_x = 2.0
        self.zoom_y = 2.0
        self.trans = fitz.Matrix(self.zoom_x, self.zoom_y).preRotate(0)  # zoom factor 2 in each dimension
        # ocr相关
        self.ocr_url = conf.OCR_URL
        self.ocr_header = {
            'X-Auth-Token': conf.OCR_TOKEN,
            'Content-Type': 'application/json'
        }
        # EDMS web_service_api
        self.edms = EDMS(conf.EDMS_USER, conf.EDMS_PWD)
        # 优雅退出信号:15
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, sig, frame):
        self.switch = False  # 停止处理文件

    def get_doc_info(self):
        task_str, is_priority = rh.dequeue()
        if task_str is None:
            self.cronjob_log.info('{0} [get_doc_info] [queue empty]'.format(self.log_base))
            return None, None

        business_type, doc_id_str = task_str.split(consts.SPLIT_STR)
        doc_id = int(doc_id_str)
        doc_class = HILDoc if business_type == consts.HIL_PREFIX else AFCDoc
        # doc_info = doc_class.objects.filter(id=doc_id, status=DocStatus.INIT.value).values(
        #     'id', 'metadata_version_id', 'application_id', 'document_name', 'document_scheme').first()
        doc = doc_class.objects.filter(id=doc_id).first()
        if doc is None:
            self.cronjob_log.warn('{0} [get_doc_info] [doc not exist] [task_str={1}] [is_priority={2}]'.format(
                self.log_base, task_str, is_priority))
            return None, None
        elif doc.status != DocStatus.INIT.value:
            self.cronjob_log.warn('{0} [get_doc_info] [doc status error] [task_str={1}] [is_priority={2}] '
                                  '[doc_status={3}]'.format(self.log_base, task_str, is_priority, doc.status))
            return None, None
        doc.status = DocStatus.PROCESSING.value
        doc.save()
        self.cronjob_log.info('{0} [get_doc_info] [success] [task_str={1}] [is_priority={2}]'.format(
            self.log_base, task_str, is_priority))
        return doc, business_type

    def pdf_download(self, doc, business_type):
        if doc is None:
            return None, None, None, None
        # TODO EDMS下载pdf
        doc_data_path = os.path.join(self.data_dir, business_type, str(doc.id))
        os.makedirs(doc_data_path, exist_ok=True)
        pdf_path = os.path.join(doc_data_path, '{0}.pdf'.format(doc.id))
        if not doc.application_id.startswith(consts.FIXED_APPLICATION_ID_PREFIX):
            self.edms.download(pdf_path, doc.metadata_version_id)

        excel_path = os.path.join(doc_data_path, '{0}.xlsx'.format(doc.id))
        src_excel_path = os.path.join(doc_data_path, 'src.xlsx')
        self.cronjob_log.info('{0} [pdf download success] [business_type={1}] [doc_id={2}] [pdf_path={3}]'.format(
            self.log_base, business_type, doc.id, pdf_path))
        return doc_data_path, excel_path, src_excel_path, pdf_path

    @staticmethod
    def append_sheet(wb, sheets_list, img_name, role_summary):
        for i, sheet in enumerate(sheets_list):
            sheet_name = '{0}_{1}'.format(img_name, i)
            role_summary['银行-户名'].append((sheet_name, 1, None, None, None, None, None))
            ws = wb.create_sheet(sheet_name)
            cells = sheet.get('cells')
            for cell in cells:
                c1 = cell.get('start_column')
                # c2 = cell.get('end_column')
                r1 = cell.get('start_row')
                # r2 = cell.get('end_row')
                words = cell.get('words')
                ws.cell(row=r1+1, column=c1+1, value=words)

    @staticmethod
    def get_ocr_json(img_path):
        with open(img_path, "rb") as f:
            base64_data = base64.b64encode(f.read())
        return {'imgBase64': base64_data.decode('utf-8')}

    async def fetch_ocr_result(self, img_path):
        async with aiohttp.ClientSession(
                headers=self.ocr_header, connector=aiohttp.TCPConnector(ssl=False)
        ) as session:
            json_data = self.get_ocr_json(img_path)
            async with session.post(self.ocr_url, json=json_data) as response:
                return await response.json()

    async def img_ocr_excel(self, wb, img_path, role_summary):
        res = await self.fetch_ocr_result(img_path)
        self.cronjob_log.info('{0} [fetch ocr result success] [img={1}] [res={2}]'.format(self.log_base, img_path, res))
        sheets_list = res.get('result').get('res')
        img_name = os.path.basename(img_path)
        self.append_sheet(wb, sheets_list, img_name, role_summary)

    # TODO 细化文件状态,不同异常状态采取不同的处理
    # TODO 调用接口重试
    def handle(self, *args, **kwargs):
        sleep_second = int(conf.SLEEP_SECOND)
        max_sleep_second = int(conf.MAX_SLEEP_SECOND)

        while self.switch:
            # 1. 从队列获取文件信息
            doc, business_type = self.get_doc_info()
            try:
                # 2. 从EDMS获取PDF文件
                doc_data_path, excel_path, src_excel_path, pdf_path = self.pdf_download(doc, business_type)
                # 队列为空时的处理
                if pdf_path is None:
                    time.sleep(sleep_second)
                    sleep_second = min(max_sleep_second, sleep_second+5)
                    continue
                sleep_second = int(conf.SLEEP_SECOND)
                # 3.PDF文件提取图片
                start_time = time.time()
                img_save_path = os.path.join(doc_data_path, 'img')
                self.cronjob_log.info('{0} [pdf to img start] [business_type={1}] [doc_id={2}]'.format(
                    self.log_base, business_type, doc.id))
                pdf_handler = PDFHandler(pdf_path, img_save_path)
                pdf_handler.extract_image()
                self.cronjob_log.info('{0} [pdf to img end] [business_type={1}] [doc_id={2}]'.format(
                    self.log_base, business_type, doc.id))
                write_zip_file(img_save_path, os.path.join(doc_data_path, '{0}_img.zip'.format(doc.id)))

                # 4.图片调用算法判断是否为银行流水, 图片调用算法OCR为excel文件
                role_summary = {
                    '银行-户名': []
                }
                # interest_keyword = Keywords.objects.filter(
                #     type=KeywordsType.INTEREST.value).values_list('keyword', flat=True)
                # salary_keyword = Keywords.objects.filter(
                #     type=KeywordsType.SALARY.value).values_list('keyword', flat=True)
                # loan_keyword = Keywords.objects.filter(type=KeywordsType.LOAN.value).values_list('keyword', flat=True)
                # wb = BSWorkbook(interest_keyword, salary_keyword, loan_keyword)
                wb = Workbook()
                loop = asyncio.get_event_loop()
                tasks = [self.img_ocr_excel(wb, img_path, role_summary) for img_path in pdf_handler.img_path_list]
                loop.run_until_complete(asyncio.wait(tasks))
                # loop.close()

                # 整合excel文件
                # wb.save(src_excel_path)
                # wb.rebuild(role_summary)
                wb.save(excel_path)
            except Exception as e:
                doc.status = DocStatus.PROCESS_FAILED.value
                doc.save()
                self.cronjob_log.error('{0} [process failed] [business_type={1}] [doc_id={2}] [err={3}]'.format(
                    self.log_base, business_type, doc.id, e))
            else:
                try:
                    # 5.上传至EDMS
                    # self.edms.upload(excel_path, doc, business_type)
                    print('upload pass')
                except Exception as e:
                    doc.status = DocStatus.UPLOAD_FAILED.value
                    doc.save()
                    self.cronjob_log.error('{0} [upload failed] [business_type={1}] [doc_id={2}] [err={3}]'.format(
                        self.log_base, business_type, doc.id, e))
                else:
                    doc.status = DocStatus.COMPLETE.value
                    doc.save()
                    end_time = time.time()
                    speed_time = int(end_time - start_time)
                    self.cronjob_log.info('{0} [doc process complete] [business_type={1}] [doc_id={2}] '
                                          '[speed_time={3}]'.format(self.log_base, business_type, doc.id, speed_time))

        self.cronjob_log.info('{0} [stop safely]'.format(self.log_base))