6966f069 by 周伟奇

Merge branch 'feature/zip'

2 parents 7cf03ec9 6010c32f
......@@ -16,7 +16,7 @@ from multiprocessing import Process, Queue, Manager, Lock
from settings import conf
from common.mixins import LoggerMixin
from common.tools.file_tools import write_zip_file
from common.tools.file_tools import get_pwd_list_from_str, extract_zip_or_rar, get_file_paths
from common.tools.pdf_to_img import PDFHandler
from common.electronic_afc_contract.afc_contract_ocr import predict as afc_predict
from common.electronic_hil_contract.hil_contract_ocr import predict as hil_predict
......@@ -89,14 +89,39 @@ class Command(BaseCommand, LoggerMixin):
# doc = doc_class.objects.filter(id=doc_id).first()
# return doc, business_type
def get_zip_doc_info(self, task_str):
try:
info_tuple = task_str.split(consts.SPLIT_STR)
if len(info_tuple) == 2:
business_type, doc_id_str = info_tuple
else:
business_type, doc_id_str, classify_1_str = info_tuple
doc_id = int(doc_id_str)
doc_class = HILDoc if business_type == consts.HIL_PREFIX else AFCDoc
zip_doc = doc_class.objects.filter(id=doc_id).first()
if zip_doc is None:
self.online_log.warn('{0} [zip_2_pdfs] [doc not exist] [task_str={1}]'.format(
self.log_base, task_str))
return None, business_type
elif zip_doc.status != DocStatus.INIT.value:
self.online_log.warn('{0} [zip_2_pdfs] [doc status error] [task_str={1}] [doc_status={2}]'.format(
self.log_base, task_str, zip_doc.status))
return None, business_type
zip_doc.status = DocStatus.PROCESSING.value
zip_doc.start_time = timezone.now()
zip_doc.save()
except Exception as e:
self.online_log.error('{0} [process error (zip_2_pdfs)] [error={1}]'.format(
self.log_base, traceback.format_exc()))
return None, None
else:
self.online_log.info('{0} [zip_2_pdfs] [db save end] [task_str={1}]'.format(
self.log_base, task_str))
return zip_doc, business_type
def get_doc_info(self, task_str, is_priority=False):
try:
# doc, business_type = self.get_doc_object(task_str)
info_tuple = task_str.split(consts.SPLIT_STR)
......@@ -1094,11 +1119,153 @@ class Command(BaseCommand, LoggerMixin):
# summary['confidence'] = max(summary['confidence'])
return merged_bs_summary
def zip_2_pdfs(self, zip_task_queue, error_list):
while len(error_list) == 0:
# 1. Pop a task from the Redis zip queue, e.g. AFC_111_0
task_str = rh.dequeue_zip()
if task_str is None:
self.online_log.info('{0} [zip_2_pdfs] [zip queue empty]'.format(self.log_base))
time.sleep(self.sleep_time_doc_get)
continue
self.online_log.info('{0} [zip_2_pdfs] [task={1}]'.format(self.log_base, task_str))
# 2. Mark the doc as processing
zip_doc, business_type = self.get_zip_doc_info(task_str)
if zip_doc is None:
time.sleep(self.sleep_time_doc_get)
continue
# 3. Download the archive from ECM
doc_data_path = os.path.join(self.data_dir, business_type, consts.TMP_DIR_NAME, str(zip_doc.id))
os.makedirs(doc_data_path, exist_ok=True)
zip_path = os.path.join(doc_data_path, zip_doc.document_name)
for times in range(consts.RETRY_TIMES):
try:
self.edms.download(zip_path, zip_doc.metadata_version_id, zip_doc.document_scheme, business_type)
except Exception as e:
self.online_log.warn('{0} [zip_2_pdfs] [ecm download failed] [task={1}] [times={2}] '
'[error={3}]'.format(self.log_base, task_str, times,
traceback.format_exc()))
else:
self.online_log.info('{0} [zip_2_pdfs] [ecm download success] [task={1}] [times={2}] '
'[zip_path={3}]'.format(self.log_base, task_str, times, zip_path))
break
else:
try:
zip_doc.status = DocStatus.PROCESS_FAILED.value
zip_doc.save()
except Exception as e:
self.online_log.error('{0} [zip_2_pdfs] [process error (db save)] [task={1}] [error={2}]'.format(
self.log_base, task_str, traceback.format_exc()))
time.sleep(self.sleep_time_doc_get)
continue
# 4. Extract the archive
extract_path = os.path.join(doc_data_path, 'extract_content')
os.makedirs(extract_path, exist_ok=True)
try:
pwd_list = get_pwd_list_from_str(zip_doc.document_name)
is_success = extract_zip_or_rar(zip_path, extract_path, pwd_list)
except Exception as e:
is_success = False
if not is_success:
self.online_log.warn('{0} [zip_2_pdfs] [extract failed] [task={1}] [error={2}]'.format(
self.log_base, task_str, traceback.format_exc()))
try:
zip_doc.status = DocStatus.PROCESS_FAILED.value
zip_doc.save()
except Exception as e:
self.online_log.error('{0} [zip_2_pdfs] [process error (db save)] [task={1}] [error={2}]'.format(
self.log_base, task_str, traceback.format_exc()))
time.sleep(self.sleep_time_doc_get)
continue
self.online_log.info('{0} [zip_2_pdfs] [extract success] [task={1}] [extract_path={2}]'.format(
self.log_base, task_str, extract_path))
# 5. Find the PDF files, rename and move each one into its target folder, create a new doc record and enqueue a new task_str
pdf_paths = get_file_paths(extract_path, ['.pdf', '.PDF'])
count = 0
pdf_task_str_list = []
for pdf_path in pdf_paths:
if count > 50:
self.online_log.info('{0} [zip_2_pdfs] [pdf count > 50, skip] [task={1}]'.format(
self.log_base, task_str))
break
count += 1
try:
doc_class = HILDoc if business_type == consts.HIL_PREFIX else AFCDoc
pdf_doc = doc_class.objects.create(
metadata_version_id='from: {0}'.format(zip_doc.id),
application_id=zip_doc.application_id,
# main_applicant=applicant_data.get('mainApplicantName'),
# co_applicant=applicant_data.get('coApplicantName'),
# guarantor_1=applicant_data.get('guarantor1Name'),
# guarantor_2=applicant_data.get('guarantor2Name'),
document_name=os.path.basename(pdf_path),
document_scheme=zip_doc.document_scheme,
data_source=zip_doc.data_source,
upload_finish_time=zip_doc.upload_finish_time,
)
pdf_doc_data_path = os.path.join(self.data_dir, business_type, consts.TMP_DIR_NAME, str(pdf_doc.id))
os.makedirs(pdf_doc_data_path, exist_ok=True)
target_pdf_path = os.path.join(pdf_doc_data_path, '{0}.pdf'.format(pdf_doc.id))
shutil.move(pdf_path, target_pdf_path)
pdf_task_str = consts.SPLIT_STR.join([business_type, str(pdf_doc.id), '0'])
pdf_task_str_list.append(pdf_task_str)
except Exception as e:
self.online_log.warn('{0} [zip_2_pdfs] [recreate pdf task failed] [task={1}] [pdf_path={2}]'
' [error={3}]'.format(self.log_base, task_str, pdf_path,
traceback.format_exc()))
else:
self.online_log.info('{0} [zip_2_pdfs] [recreate pdf task success] [task={1}] '
'[pdf_task={2}]'.format(self.log_base, task_str, pdf_task_str))
if len(pdf_task_str_list) > 0:
for pdf_task_str in pdf_task_str_list:
try:
zip_task_queue.put(pdf_task_str)
except Exception as e:
self.online_log.warn('{0} [zip_2_pdfs] [put pdf task failed] [task={1}] [pdf_task={2}]'
' [error={3}]'.format(self.log_base, task_str, pdf_task_str,
traceback.format_exc()))
else:
self.online_log.info('{0} [zip_2_pdfs] [zip task no pdf] [task={1}]'.format(self.log_base, task_str))
# 6. Done: mark the doc as complete
try:
zip_doc.status = DocStatus.COMPLETE.value
zip_doc.end_time = timezone.now()
zip_doc.duration = min((zip_doc.end_time - zip_doc.start_time).seconds, 32760)
zip_doc.save()
except Exception as e:
self.online_log.error('{0} [zip_2_pdfs] [process error (db save)] [task={1}] [error={2}]'.format(
self.log_base, task_str, traceback.format_exc()))
def pdf_2_img_2_queue(self, img_queue, todo_count_dict, lock, error_list, res_dict, finish_queue, zip_task_queue):
while self.switch:
try:
task_str = zip_task_queue.get(block=False)
is_priority = False
except Exception as e:
task_str, is_priority = rh.dequeue()
if task_str is None:
self.online_log.info('{0} [get_doc_info] [queue empty]'.format(self.log_base))
time.sleep(self.sleep_time_doc_get)
continue
self.online_log.info('{0} [get_doc_info] [task={1}] [is_priority={2}]'.format(
self.log_base, task_str, is_priority))
try:
# 1. Fetch the document info for this task
doc, business_type, task_str, classify_1_str = self.get_doc_info(task_str, is_priority)
# Handle the empty-queue case
if doc is None:
time.sleep(self.sleep_time_doc_get)
......@@ -1119,19 +1286,29 @@ class Command(BaseCommand, LoggerMixin):
if classify_1_str == '0':
try:
# 2. Fetch the PDF file from EDMS
# max_count_obj = Configs.objects.filter(id=2).first()
# try:
# max_img_count = int(max_count_obj.value)
# except Exception as e:
max_img_count = 500
for times in range(consts.RETRY_TIMES):
try:
if doc.application_id.startswith(consts.FIXED_APPLICATION_ID_PREFIX):
self.online_log.info('{0} [mo ni xia dan] [task={1}] [times={2}] '
'[pdf_path={3}]'.format(self.log_base, task_str,
times, pdf_path))
elif os.path.exists(pdf_path):
self.online_log.info('{0} [pdf from zip file] [task={1}] [times={2}] '
'[pdf_path={3}]'.format(self.log_base, task_str,
times, pdf_path))
else:
# self.edms.download(pdf_path, doc.metadata_version_id)
self.edms.download(pdf_path, doc.metadata_version_id, doc.document_scheme,
business_type)
self.online_log.info('{0} [ecm download success] [task={1}] [times={2}] '
'[pdf_path={3}]'.format(self.log_base, task_str,
times, pdf_path))
# 3. Extract images from the PDF
self.online_log.info('{0} [pdf to img start] [task={1}] [times={2}]'.format(
......@@ -2098,9 +2275,16 @@ class Command(BaseCommand, LoggerMixin):
res_dict = manager.dict()
img_queue = Queue(self.img_queue_size)
finish_queue = Queue()
zip_task_queue = Queue()
process_list = []
zip_process = Process(target=self.zip_2_pdfs,
args=(zip_task_queue, error_list))
process_list.append(zip_process)
pdf_process = Process(target=self.pdf_2_img_2_queue,
args=(img_queue, todo_count_dict, lock, error_list, res_dict,
finish_queue, zip_task_queue))
process_list.append(pdf_process)
for url in self.ocr_1_urls.values():
......
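The handoff added above boils down to a two-level queue: pdf_2_img_2_queue first drains PDF tasks that zip_2_pdfs produced in-process, and only falls back to Redis when that local queue is empty. A minimal sketch of that pattern, assuming rh.dequeue() returns (task_str, is_priority) with task_str set to None when both Redis queues are empty, and that task strings look like 'AFC_111_0' (business type, doc id, classify flag) as the comment in zip_2_pdfs suggests:

from queue import Empty  # multiprocessing.Queue.get(block=False) raises queue.Empty

def next_task(zip_task_queue, rh):
    # Prefer PDFs extracted from uploaded archives, then fall back to Redis.
    try:
        return zip_task_queue.get(block=False), False   # e.g. ('AFC_111_0', False)
    except Empty:
        return rh.dequeue()                             # (task_str, is_priority)

The production loop catches a broad Exception instead of queue.Empty, but the control flow is the same.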
......@@ -570,12 +570,11 @@ class UploadDocView(GenericView, DocHandler):
data_source = self.fix_data_source(data_source)
document_scheme = self.fix_scheme(document_scheme)
# if document_name.endswith('.zip'):
# self.running_log.info('[doc upload success] [zip file skip] [args={0}]'.format(args))
# return response.ok()
if data_source == consts.DATA_SOURCE_LIST[1]:
if isinstance(document_name, str):
if document_name.endswith('-证书.pdf') or document_name.endswith('-证书'):
self.running_log.info('[doc upload success] [eapp license skip] [args={0}]'.format(args))
return response.ok()
......@@ -594,17 +593,24 @@ class UploadDocView(GenericView, DocHandler):
data_source=data_source,
upload_finish_time=document.get('uploadFinishTime'),
)
# 3. Choose which queue the task goes into
is_priority = PriorityApplication.objects.filter(application_id=application_id, on_off=True).exists()
is_zip = False
classify_1 = 0
# Electronic contract
if data_source == consts.DATA_SOURCE_LIST[-1] and document_scheme == consts.DOC_SCHEME_LIST[1]:
for keyword, classify_1_tmp in consts.ECONTRACT_KEYWORDS_MAP.get(prefix):
if keyword in document_name:
classify_1 = classify_1_tmp
break
elif document_name.endswith('.zip') or document_name.endswith('.rar') or document_name.endswith('.ZIP') \
or document_name.endswith('.RAR'):
is_zip = True
task = consts.SPLIT_STR.join([prefix, str(doc.id), str(classify_1)])
enqueue_res = rh.enqueue([task], is_priority, is_zip)
self.running_log.info('[doc upload success] [args={0}] [business_type={1}] [doc_id={2}] '
'[is_priority={3}] [enqueue_res={4}]'.format(args, prefix, doc.id,
is_priority, enqueue_res))
......@@ -669,7 +675,7 @@ class PriorityDocView(GenericView, DocHandler):
self.running_log.info(
'[priority doc success] [args={0}]'.format(args))
else:
enqueue_res = rh.enqueue(tasks_list, is_priority=True)  # TODO: archive files may end up in the priority queue here
self.running_log.info('[priority doc success] [args={0}] [tasks_list={1}] [enqueue_res={2}]'.format(
args, tasks_list, enqueue_res))
return response.ok()
......
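The archive detection in UploadDocView above chains four endswith calls; str.endswith also accepts a tuple, so an equivalent case-insensitive check (illustrative only, not part of this change) would be:

def is_archive(document_name: str) -> bool:
    # Covers .zip/.ZIP/.rar/.RAR as well as mixed-case variants.
    return document_name.lower().endswith(('.zip', '.rar'))

This also catches mixed-case names such as 'docs.Zip', which the four explicit checks above would route to the common queue instead.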
......@@ -35,16 +35,27 @@ class RedisHandler:
self.prefix = 'bwm_ocr'
self.common_queue_key = '{0}:common_queue'.format(self.prefix)
self.priority_queue_key = '{0}:priority_queue'.format(self.prefix)
self.zip_queue_key = '{0}:zip_queue'.format(self.prefix)
self.session_id_key = '{0}:session_id'.format(self.prefix)
self.cms_token_key = '{0}:cms_token'.format(self.prefix)
self.ecm_token_key = '{0}:ecm_token'.format(self.prefix)
self.login_limit_key = '{0}:login_limit'.format(self.prefix)
def enqueue(self, tasks, is_priority=False, is_zip=False):
# 1
if is_zip:
key = self.zip_queue_key
elif is_priority:
key = self.priority_queue_key
else:
key = self.common_queue_key
return self.redis.lpush(key, tasks)
def dequeue_zip(self):
# task or None
task = self.redis.rpop(self.zip_queue_key)
return task
def dequeue(self):
# task or None
task = self.redis.rpop(self.priority_queue_key)
......
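With the new is_zip flag, enqueue routes a task onto one of three Redis lists under the bwm_ocr prefix, and dequeue_zip drains only the zip list. The key selection, restated as a standalone sketch (key names taken from the constructor above):

def queue_key(handler, is_priority=False, is_zip=False):
    # is_zip wins over is_priority, mirroring the if/elif/else in enqueue().
    if is_zip:
        return handler.zip_queue_key        # 'bwm_ocr:zip_queue'
    if is_priority:
        return handler.priority_queue_key   # 'bwm_ocr:priority_queue'
    return handler.common_queue_key         # 'bwm_ocr:common_queue'

Because PriorityDocView re-enqueues with is_priority=True and no is_zip flag, an archive re-submitted through that view lands in the priority queue rather than the zip queue, which is exactly what the TODO above warns about.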
import os
import re
import zipfile
import rarfile
from zipfile import ZipFile
......@@ -18,3 +22,77 @@ def write_zip_file(dir_name, zipfile_path):
src_file_path = os.path.join(root, single_file)
file_target_path = os.path.join(root_target_path, single_file)
z.write(src_file_path, file_target_path)
def get_pwd_list_from_str(doc_name):
try:
pwd_list = re.findall(r'\d{6}', doc_name)
return pwd_list
except Exception as e:
return []
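# Illustration only (not part of the module): the regex above treats every
# 6-digit run in the archive's file name as a candidate password, e.g.
#   get_pwd_list_from_str('contract_123456_654321.zip') -> ['123456', '654321']
#   get_pwd_list_from_str('contract.zip')               -> []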
def extract_zip_or_rar(file_path, extract_path, pwd_list=[]):
if file_path.endswith('.zip') or file_path.endswith('.ZIP'):
if len(pwd_list) > 0:
for password in pwd_list:
try:
with zipfile.ZipFile(file_path) as zf:
zf.extractall(extract_path, pwd=bytes(password, 'utf-8'))
except Exception as e:
continue
else:
return True
else:
return False
else:
try:
with zipfile.ZipFile(file_path) as zf:
zf.extractall(extract_path)
except Exception as e:
return False
else:
return True
elif file_path.endswith('.rar') or file_path.endswith('.RAR'):
if len(pwd_list) > 0:
for password in pwd_list:
try:
with rarfile.RarFile(file_path) as rf:
rf.extractall(extract_path, pwd=password)
except Exception as e:
continue
else:
return True
else:
return False
else:
try:
with rarfile.RarFile(file_path) as rf:
rf.extractall(extract_path)
except Exception as e:
return False
else:
return True
else:
return False
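# Behavioural notes (hedged, worth verifying for this deployment): the
# standard-library zipfile only decrypts legacy ZipCrypto archives, so
# AES-encrypted zips are likely to fail even with the correct password, and
# rarfile delegates extraction to an external unrar/unar/bsdtar binary that
# must be installed on the OCR hosts. A typical call, mirroring zip_2_pdfs:
#   pwd_list = get_pwd_list_from_str(os.path.basename(zip_path))
#   ok = extract_zip_or_rar(zip_path, extract_path, pwd_list)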
def get_file_paths(input_path, suffix_list):
"""
Walk input_path and yield the path of every file whose name ends with one of the given suffixes.
Args:
input_path: str, target directory to search
suffix_list: list, file-name suffixes to match, e.g. ['.pdf', '.PDF']
Yields: str, path of each matching file found under input_path
"""
for parent, _, filenames in os.walk(input_path):
for filename in filenames:
for suffix in suffix_list:
if filename.endswith(suffix):
file_path = os.path.join(parent, filename)
break
else:
continue
yield file_path
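# Usage sketch (hypothetical directory), matching how zip_2_pdfs consumes this generator:
if __name__ == '__main__':
    for pdf_path in get_file_paths('/tmp/extract_content', ['.pdf', '.PDF']):
        print(pdf_path)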
......