107dc260 by 周伟奇

add f3 folder

1 parent a825541a
import os
import time
import json
import shutil
import base64
import signal
import requests
import traceback
from django.core.management import BaseCommand
from multiprocessing import Process
from openpyxl import load_workbook, Workbook
from settings import conf
from common.mixins import LoggerMixin
from common.tools.pdf_to_img import PDFHandler, PDFBuild
from apps.doc import consts
from apps.doc.exceptions import OCR1Exception, OCR2Exception
class Command(BaseCommand, LoggerMixin):
def __init__(self):
super().__init__()
self.log_base = '[folder f3 process]'
# input folder
self.input_dir = conf.F3_DIR
# 处理文件开关
self.switch = True
# 睡眠时间
self.sleep_time = float(conf.SLEEP_SECOND_FOLDER)
# 输出结果
self.wb_name = 'result.xlsx'
self.field_map = {
# sheet_name, key_field, side_field_order, src_field_order
consts.IC_CLASSIFY: (consts.IC_CN_NAME, '有效期限', consts.IC_FIELD_ORDER_3, consts.IC_FIELD_ORDER_2),
}
self.pdf_name_map = {
consts.IC_CLASSIFY: consts.IC_CN_NAME,
consts.BL_CLASSIFY: consts.BL_CN_NAME,
}
# ocr url
self.ocr_url = conf.OCR_URL_FOLDER
self.ocr_url_2 = conf.OCR2_URL_FOLDER
# 优雅退出信号:15
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, sig, frame):
self.switch = False # 停止处理文件
@staticmethod
def license1_process(ocr_data, ocr_res, classify):
# 类别:'0'身份证, '1'居住证
license_data = ocr_data.get('data')
if not license_data:
return
if isinstance(license_data, dict):
license_data.pop('base64_img', '')
if classify == consts.IC_CLASSIFY:
id_card_dict = {}
card_type = license_data.get('type', '')
is_ic = card_type.startswith('身份证')
is_info_side = card_type.endswith('信息面')
id_card_dict['类别'] = '0' if is_ic else '1'
if is_ic:
field_map = consts.IC_MAP_0 if is_info_side else consts.IC_MAP_1
else:
field_map = consts.RP_MAP_0 if is_info_side else consts.RP_MAP_1
for write_field, search_field in field_map:
id_card_dict[write_field] = license_data.get('words_result', {}).get(search_field, {}).get('words', '')
if not is_info_side:
start_time = license_data.get('words_result', {}).get('签发日期', {}).get('words', '')
end_time = license_data.get('words_result', {}).get('失效日期', {}).get('words', '')
id_card_dict['有效期限'] = '{0}-{1}'.format(start_time, end_time)
# for id_card_dict in license_data:
# try:
# id_card_dict.pop('base64_img')
# except Exception as e:
# continue
ocr_res.setdefault(classify, []).append(id_card_dict)
def license2_process(self, ocr_data, ocr_res, classify, img_path):
if classify != consts.BL_CLASSIFY:
return
pid, _, _, _, _, _ = consts.LICENSE_CLASSIFY_MAPPING.get(classify)
file_data = ocr_data.get('section_img')
if file_data is None:
with open(img_path, 'rb') as f:
base64_data = base64.b64encode(f.read())
# 获取解码后的base64值
file_data = base64_data.decode()
json_data_2 = {
"pid": str(pid),
"filedata": file_data
}
for times in range(consts.RETRY_TIMES):
try:
start_time = time.time()
ocr_2_response = requests.post(self.ocr_url_2, data=json_data_2)
if ocr_2_response.status_code != 200:
raise OCR2Exception('ocr_2 status code: {0}'.format(ocr_2_response.status_code))
except Exception as e:
self.folder_log.warn(
'{0} [ocr_2 failed] [times={1}] [img_path={2}] [error={3}]'.format(
self.log_base, times, img_path, traceback.format_exc()))
else:
ocr_res_2 = json.loads(ocr_2_response.text)
end_time = time.time()
speed_time = int(end_time - start_time)
self.folder_log.info(
'{0} [ocr_2 success] [img={1}] [speed_time={2}]'.format(
self.log_base, img_path, speed_time))
if ocr_res_2.get('ErrorCode') in consts.SUCCESS_CODE_SET:
if pid == consts.BC_PID:
ocr_res.append(ocr_res_2)
else:
# 营业执照等
for result_dict in ocr_res_2.get('ResultList', []):
res_dict = {}
for field_dict in result_dict.get('FieldList', []):
res_dict[field_dict.get('chn_key', '')] = field_dict.get('value', '')
ocr_res.append(res_dict)
break
def wb_process(self, ocr_res, output_dir):
excel_path = os.path.join(output_dir, self.wb_name)
try:
if os.path.exists(excel_path):
wb = load_workbook(excel_path)
else:
wb = Workbook()
for c, res_list in ocr_res.items():
sheet_name, key_field, side_field_order, src_field_order = self.field_map.get(c)
if sheet_name in wb.sheetnames:
ws = wb.get_sheet_by_name(sheet_name)
else:
ws = wb.create_sheet(sheet_name)
for res in res_list:
if key_field is not None and key_field in res:
field_order = side_field_order
else:
field_order = src_field_order
for search_field, write_field in field_order:
field_value = res.get(search_field, '')
if isinstance(field_value, list):
ws.append((write_field, *field_value))
else:
ws.append((write_field, field_value))
ws.append((None,))
if 'Sheet' in wb.sheetnames and len(wb.sheetnames) > 1:
wb.remove(wb.get_sheet_by_name('Sheet'))
wb.save(excel_path)
except Exception as e:
self.folder_log.error('{0} [wb build error] [path={1}] [error={2}]'.format(
self.log_base, excel_path, traceback.format_exc()))
def ocr_process(self, img_path, all_ocr_res, img_res):
if os.path.exists(img_path):
with open(img_path, 'rb') as f:
base64_data = base64.b64encode(f.read())
# 获取解码后的base64值
file_data = base64_data.decode()
json_data = {
"file": file_data,
"channel": consts.AFC_PREFIX,
}
for times in range(consts.RETRY_TIMES):
try:
start_time = time.time()
ocr_response = requests.post(self.ocr_url, json=json_data)
if ocr_response.status_code != 200:
raise OCR1Exception('{0} ocr status code: {1}'.format(self.log_base, ocr_response.status_code))
except Exception as e:
self.folder_log.warn('{0} [ocr failed] [times={1}] [img_path={2}] [error={3}]'.format(
self.log_base, times, img_path, traceback.format_exc()))
else:
ocr_res = ocr_response.json()
end_time = time.time()
speed_time = int(end_time - start_time)
self.folder_log.info('{0} [ocr success] [img={1}] [speed_time={2}]'.format(
self.log_base, img_path, speed_time))
if isinstance(ocr_res, dict):
if ocr_res.get('code') == 1:
data_list = ocr_res.get('data', [])
if isinstance(data_list, list):
for ocr_data in data_list:
classify = ocr_data.get('classify')
img_res.setdefault(classify, set()).add(img_path)
if classify in consts.LICENSE_CLASSIFY_SET_1:
self.license1_process(ocr_data, all_ocr_res, classify)
elif classify in consts.LICENSE_CLASSIFY_SET_2:
self.license2_process(ocr_data, all_ocr_res, classify, img_path)
break
else:
self.folder_log.warn('{0} [ocr failed] [img_path={1}]'.format(self.log_base, img_path))
def img_process(self, img_res, output_dir):
for classify, img_path_set in img_res.items():
pdf_path = os.path.join(output_dir, '{0}.pdf'.format(self.pdf_name_map.get(classify, '其他')))
pdf_build = PDFBuild(pdf_path)
pdf_build.insert_img(img_path_set)
def images_process(self, img_path_list, output_dir):
ocr_res = dict()
img_res = dict()
for img_path in img_path_list:
self.ocr_process(img_path, ocr_res, img_res)
self.wb_process(ocr_res, output_dir)
self.img_process(img_res, output_dir)
def pdf_process(self, name, path, img_output_dir, output_dir):
pdf_handler = PDFHandler(path, os.path.join(img_output_dir, name))
try:
self.folder_log.info('{0} [pdf to img start] [path={1}]'.format(self.log_base, path))
pdf_handler.extract_image()
self.folder_log.info('{0} [pdf to img end] [path={1}]'.format(self.log_base, path))
except Exception as e:
self.folder_log.error('{0} [pdf to img error] [path={1}] [error={2}]'.format(
self.log_base, path, traceback.format_exc()))
raise e
else:
self.images_process(pdf_handler.img_path_list, output_dir)
def folder_process(self, unique_folder_name, folder_path, main_output_dir):
output_dir = os.path.join(main_output_dir, unique_folder_name)
img_output_dir = os.path.join(output_dir, 'image')
failed_output_dir = os.path.join(output_dir, 'failed')
os.makedirs(output_dir, exist_ok=True)
os.makedirs(img_output_dir, exist_ok=True)
os.makedirs(failed_output_dir, exist_ok=True)
os_error_filename_set = set()
list_dir = os.listdir(folder_path)
if not list_dir and len(os_error_filename_set) == 0:
self.folder_log.info('{0} [folder empty, completed] [path={1}]'.format(self.log_base, folder_path))
return
all_file_set = set(list_dir)
true_file_set = all_file_set - os_error_filename_set
if len(true_file_set) == 0 and len(os_error_filename_set) > 0:
true_file_set.add(os_error_filename_set.pop())
for name in true_file_set:
time.sleep(5)
path = os.path.join(folder_path, name)
try:
if not os.path.exists(path):
self.folder_log.info('{0} [path is not exists] [path={1}]'.format(self.log_base, path))
continue
elif os.path.isfile(path):
self.folder_log.info('{0} [file start] [path={1}]'.format(self.log_base, path))
self.pdf_process(name, path, img_output_dir, output_dir)
self.folder_log.info('{0} [file end] [path={1}]'.format(self.log_base, path))
else:
self.folder_log.info('{0} [path is dir] [path={1}]'.format(self.log_base, path))
failed_path = os.path.join(failed_output_dir, name)
shutil.copyfile(path, failed_path)
except OSError:
os_error_filename_set.add(name)
self.folder_log.error('{0} [os error] [path={1}] [error={2}]'.format(
self.log_base, path, traceback.format_exc()))
except Exception as e:
try:
self.folder_log.error('{0} [file error] [path={1}] [error={2}]'.format(self.log_base, path,
traceback.format_exc()))
failed_path = os.path.join(failed_output_dir, name)
shutil.copyfile(path, failed_path)
except Exception as e:
os_error_filename_set.add(name)
self.folder_log.error('{0} [file move error] [path={1}] [error={2}]'.format(
self.log_base, path, traceback.format_exc()))
def main_folder_process(self, input_dir):
while not os.path.isdir(input_dir):
self.folder_log.info('{0} [input dir is not dir] [input_dir={1}]'.format(self.log_base, input_dir))
if self.switch:
time.sleep(self.sleep_time)
continue
else:
return
output_dir = os.path.join(os.path.dirname(input_dir), 'Output')
completed_output_dir = os.path.join(output_dir, 'Completed')
failed_output_dir = os.path.join(output_dir, 'Failed')
os.makedirs(output_dir, exist_ok=True)
os.makedirs(completed_output_dir, exist_ok=True)
os.makedirs(failed_output_dir, exist_ok=True)
os_error_filename_set = set()
while self.switch:
# 1. 从input dir获取pdf or image
list_dir = os.listdir(input_dir)
if not list_dir and len(os_error_filename_set) == 0:
self.folder_log.info('{0} [input dir empty] [input_dir={1}]'.format(self.log_base, input_dir))
time.sleep(self.sleep_time)
continue
all_file_set = set(list_dir)
true_file_set = all_file_set - os_error_filename_set
if len(true_file_set) == 0 and len(os_error_filename_set) > 0:
true_file_set.add(os_error_filename_set.pop())
for name in true_file_set:
time.sleep(10)
unique_folder_name = '{0}_{1}'.format(time.time(), name)
path = os.path.join(input_dir, name)
try:
if not os.path.exists(path):
self.folder_log.info('{0} [path is not exists] [path={1}]'.format(self.log_base, path))
continue
elif os.path.isdir(path):
self.folder_log.info('{0} [dir start] [path={1}]'.format(self.log_base, name))
self.folder_process(unique_folder_name, path, output_dir)
completed_path = os.path.join(completed_output_dir, unique_folder_name)
shutil.move(path, completed_path)
self.folder_log.info('{0} [dir end] [path={1}]'.format(self.log_base, name))
else:
self.folder_log.info('{0} [path is not dir] [path={1}]'.format(self.log_base, path))
failed_path = os.path.join(failed_output_dir, unique_folder_name)
shutil.move(path, failed_path)
except OSError:
os_error_filename_set.add(name)
self.folder_log.error('{0} [os error] [path={1}] [error={2}]'.format(
self.log_base, path, traceback.format_exc()))
except Exception as e:
try:
self.folder_log.error('{0} [file error] [path={1}] [error={2}]'.format(self.log_base, path,
traceback.format_exc()))
failed_path = os.path.join(failed_output_dir, unique_folder_name)
shutil.move(path, failed_path)
except Exception as e:
os_error_filename_set.add(name)
self.folder_log.error('{0} [file move error] [path={1}] [error={2}]'.format(
self.log_base, path, traceback.format_exc()))
def handle(self, *args, **kwargs):
process_list = []
process = Process(target=self.main_folder_process, args=(self.input_dir, ))
process_list.append(process)
for p in process_list:
p.start()
for p in process_list:
p.join()
self.folder_log.info('{0} [stop safely]'.format(self.log_base))
......@@ -24,6 +24,23 @@ WH_COUPLE_4 = (100, 300)
WH_COUPLE_5 = (100, 200)
class PDFBuild:
def __init__(self, path):
self.path = path
def insert_img(self, img_path_list):
if os.path.exists(self.path):
pdf = fitz.Document(self.path)
else:
pdf = fitz.Document()
for img_path in img_path_list:
new_page = pdf.newPage()
new_page.insertImage(new_page.rect, img_path)
pdf.save(self.path)
pdf.close()
class PDFHandler:
def __init__(self, path, img_dir_path, document_name=None):
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!