Skip to content
Toggle navigation
Toggle navigation
This project
Loading...
Sign in
周伟奇
/
bmw-ocr
Go to a project
Toggle navigation
Toggle navigation pinning
Projects
Groups
Snippets
Help
Project
Activity
Repository
Graphs
Network
Create a new issue
Commits
Issue Boards
Files
Commits
Network
Compare
Branches
Tags
6010c32f
authored
2022-09-01 16:08:01 +0800
by
周伟奇
Browse Files
Options
Browse Files
Tag
Download
Email Patches
Plain Diff
add zip & rar file
1 parent
f80b8302
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
311 additions
and
32 deletions
src/apps/doc/management/commands/ocr_process.py
src/apps/doc/views.py
src/common/redis_cache/handler.py
src/common/tools/file_tools.py
src/apps/doc/management/commands/ocr_process.py
View file @
6010c32
...
...
@@ -16,7 +16,7 @@ from multiprocessing import Process, Queue, Manager, Lock
from
settings
import
conf
from
common.mixins
import
LoggerMixin
from
common.tools.file_tools
import
write_zip_file
from
common.tools.file_tools
import
get_pwd_list_from_str
,
extract_zip_or_rar
,
get_file_paths
from
common.tools.pdf_to_img
import
PDFHandler
from
common.electronic_afc_contract.afc_contract_ocr
import
predict
as
afc_predict
from
common.electronic_hil_contract.hil_contract_ocr
import
predict
as
hil_predict
...
...
@@ -89,14 +89,39 @@ class Command(BaseCommand, LoggerMixin):
# doc = doc_class.objects.filter(id=doc_id).first()
# return doc, business_type
def
get_doc_info
(
self
):
task_str
,
is_priority
=
rh
.
dequeue
()
if
task_str
is
None
:
self
.
online_log
.
info
(
'{0} [get_doc_info] [queue empty]'
.
format
(
self
.
log_base
))
return
None
,
None
,
None
,
None
def get_zip_doc_info(self, task_str):
    """Load the doc row behind a zip task and flag it as being processed.

    The task string is split on ``consts.SPLIT_STR`` into either two fields
    (business type, doc id) or three (business type, doc id, classify flag);
    the third field is not used here.

    Args:
        task_str: Task string taken from the zip queue.

    Returns:
        A ``(zip_doc, business_type)`` tuple:
        * ``(zip_doc, business_type)`` once the row has been saved with
          status ``DocStatus.PROCESSING`` and a fresh ``start_time``;
        * ``(None, business_type)`` when no row has that id, or when the row
          is not in ``DocStatus.INIT`` status;
        * ``(None, None)`` when anything raised (the traceback is logged).
    """
    try:
        fields = task_str.split(consts.SPLIT_STR)
        if len(fields) != 2:
            # Anything other than exactly three fields raises here and is
            # reported by the except branch below.
            business_type, doc_id_str, _ = fields
        else:
            business_type, doc_id_str = fields

        doc_id = int(doc_id_str)
        if business_type == consts.HIL_PREFIX:
            model = HILDoc
        else:
            model = AFCDoc
        zip_doc = model.objects.filter(id=doc_id).first()

        if zip_doc is None:
            self.online_log.warn(
                '{0} [zip_2_pdfs] [doc not exist] [task_str={1}]'.format(
                    self.log_base, task_str))
            return None, business_type

        if zip_doc.status != DocStatus.INIT.value:
            self.online_log.warn(
                '{0} [zip_2_pdfs] [doc status error] [task_str={1}] [doc_status={2}]'.format(
                    self.log_base, task_str, zip_doc.status))
            return None, business_type

        zip_doc.status = DocStatus.PROCESSING.value
        zip_doc.start_time = timezone.now()
        zip_doc.save()
    except Exception:
        self.online_log.error(
            '{0} [process error (zip_2_pdfs)] [error={1}]'.format(
                self.log_base, traceback.format_exc()))
        return None, None
    else:
        self.online_log.info(
            '{0} [zip_2_pdfs] [db save end] [task_str={1}]'.format(
                self.log_base, task_str))
        return zip_doc, business_type
self
.
online_log
.
info
(
'{0} [get_doc_info] [task={1}] [is_priority={2}]'
.
format
(
self
.
log_base
,
task_str
,
is_priority
))
def
get_doc_info
(
self
,
task_str
,
is_priority
=
False
):
try
:
# doc, business_type = self.get_doc_object(task_str)
info_tuple
=
task_str
.
split
(
consts
.
SPLIT_STR
)
...
...
@@ -1094,11 +1119,153 @@ class Command(BaseCommand, LoggerMixin):
# summary['confidence'] = max(summary['confidence'])
return
merged_bs_summary
def
pdf_2_img_2_queue
(
self
,
img_queue
,
todo_count_dict
,
lock
,
error_list
,
res_dict
,
finish_queue
):
def zip_2_pdfs(self, zip_task_queue, error_list):
    """Worker loop: turn queued zip/rar documents into per-PDF OCR tasks.

    For every task popped from the redis zip queue the archive is downloaded,
    extracted, and each PDF found inside becomes a new doc row plus a task
    string pushed onto ``zip_task_queue``.

    Args:
        zip_task_queue: Queue that receives the task strings of the PDFs
            found in the archive (only ``put`` is called on it here).
        error_list: The loop keeps running while this sequence is empty.

    Returns:
        None. The loop ends once ``error_list`` is non-empty.
    """
    while len(error_list) == 0:
        # 1. Read a task from the redis queue, e.g. AFC_111_0
        task_str = rh.dequeue_zip()
        if task_str is None:
            self.online_log.info('{0} [zip_2_pdfs] [zip queue empty]'.format(self.log_base))
            time.sleep(self.sleep_time_doc_get)
            continue

        self.online_log.info('{0} [zip_2_pdfs] [task={1}]'.format(self.log_base, task_str))

        # 2. Update doc status: processing
        zip_doc, business_type = self.get_zip_doc_info(task_str)
        if zip_doc is None:
            time.sleep(self.sleep_time_doc_get)
            continue

        # 3. Download the archive from ECM
        doc_data_path = os.path.join(
            self.data_dir, business_type, consts.TMP_DIR_NAME, str(zip_doc.id))
        os.makedirs(doc_data_path, exist_ok=True)
        zip_path = os.path.join(doc_data_path, zip_doc.document_name)
        for times in range(consts.RETRY_TIMES):
            try:
                self.edms.download(zip_path, zip_doc.metadata_version_id,
                                   zip_doc.document_scheme, business_type)
            except Exception:
                self.online_log.warn('{0} [zip_2_pdfs] [ecm download failed] [task={1}] [times={2}] '
                                     '[error={3}]'.format(self.log_base, task_str, times,
                                                          traceback.format_exc()))
            else:
                self.online_log.info('{0} [zip_2_pdfs] [ecm download success] [task={1}] [times={2}] '
                                     '[zip_path={3}]'.format(self.log_base, task_str, times, zip_path))
                break
        else:
            # Every retry failed: give up on this archive.
            self._mark_zip_doc_failed(zip_doc, task_str)
            time.sleep(self.sleep_time_doc_get)
            continue

        # 4. Extract
        extract_path = os.path.join(doc_data_path, 'extract_content')
        os.makedirs(extract_path, exist_ok=True)
        # Captured inside the except block: traceback.format_exc() no longer
        # sees the exception once that block has been left.
        extract_error = None
        try:
            pwd_list = get_pwd_list_from_str(zip_doc.document_name)
            is_success = extract_zip_or_rar(zip_path, extract_path, pwd_list)
        except Exception:
            is_success = False
            extract_error = traceback.format_exc()

        if not is_success:
            self.online_log.warn('{0} [zip_2_pdfs] [extract failed] [task={1}] [error={2}]'.format(
                self.log_base, task_str, extract_error))
            self._mark_zip_doc_failed(zip_doc, task_str)
            time.sleep(self.sleep_time_doc_get)
            continue

        self.online_log.info('{0} [zip_2_pdfs] [extract success] [task={1}] [extract_path={2}]'.format(
            self.log_base, task_str, extract_path))

        # 5. Find the PDF files, rename and move them into the target folder.
        #    Create a new doc record and a new task_str for the queue.
        pdf_paths = get_file_paths(extract_path, ['.pdf', '.PDF'])
        count = 0
        pdf_task_str_list = []
        for pdf_path in pdf_paths:
            # NOTE(review): the check runs before the increment, so up to 51
            # PDFs are handled before the loop stops - confirm the intended cap.
            if count > 50:
                self.online_log.info('{0} [zip_2_pdfs] [pdf count > 50, skip] [task={1}]'.format(
                    self.log_base, task_str))
                break
            count += 1
            try:
                doc_class = HILDoc if business_type == consts.HIL_PREFIX else AFCDoc
                pdf_doc = doc_class.objects.create(
                    metadata_version_id='from: {0}'.format(zip_doc.id),
                    application_id=zip_doc.application_id,
                    # main_applicant=applicant_data.get('mainApplicantName'),
                    # co_applicant=applicant_data.get('coApplicantName'),
                    # guarantor_1=applicant_data.get('guarantor1Name'),
                    # guarantor_2=applicant_data.get('guarantor2Name'),
                    document_name=os.path.basename(pdf_path),
                    document_scheme=zip_doc.document_scheme,
                    data_source=zip_doc.data_source,
                    upload_finish_time=zip_doc.upload_finish_time,
                )
                pdf_doc_data_path = os.path.join(
                    self.data_dir, business_type, consts.TMP_DIR_NAME, str(pdf_doc.id))
                os.makedirs(pdf_doc_data_path, exist_ok=True)
                target_pdf_path = os.path.join(pdf_doc_data_path, '{0}.pdf'.format(pdf_doc.id))
                shutil.move(pdf_path, target_pdf_path)
                pdf_task_str = consts.SPLIT_STR.join([business_type, str(pdf_doc.id), '0'])
                pdf_task_str_list.append(pdf_task_str)
            except Exception:
                self.online_log.warn('{0} [zip_2_pdfs] [recreate pdf task failed] [task={1}] [pdf_path={2}]'
                                     ' [error={3}]'.format(self.log_base, task_str, pdf_path,
                                                           traceback.format_exc()))
            else:
                # Log the task string that was actually created for this PDF.
                self.online_log.info('{0} [zip_2_pdfs] [recreate pdf task success] [task={1}] '
                                     '[pdf_task={2}]'.format(self.log_base, task_str, pdf_task_str))

        if len(pdf_task_str_list) > 0:
            for pdf_task_str in pdf_task_str_list:
                try:
                    zip_task_queue.put(pdf_task_str)
                except Exception:
                    self.online_log.warn('{0} [zip_2_pdfs] [put pdf task failed] [task={1}] [pdf_task={2}]'
                                         ' [error={3}]'.format(self.log_base, task_str, pdf_task_str,
                                                               traceback.format_exc()))
        else:
            self.online_log.info('{0} [zip_2_pdfs] [zip task no pdf] [task={1}]'.format(
                self.log_base, task_str))

        # 6. Done; update doc status: complete
        try:
            zip_doc.status = DocStatus.COMPLETE.value
            zip_doc.end_time = timezone.now()
            # total_seconds() keeps whole days; timedelta.seconds alone would
            # silently drop them. The value is still capped at 32760.
            elapsed = int((zip_doc.end_time - zip_doc.start_time).total_seconds())
            zip_doc.duration = min(elapsed, 32760)
            zip_doc.save()
        except Exception:
            self.online_log.error('{0} [zip_2_pdfs] [process error (db save)] [task={1}] [error={2}]'.format(
                self.log_base, task_str, traceback.format_exc()))

def _mark_zip_doc_failed(self, zip_doc, task_str):
    """Set ``zip_doc`` to PROCESS_FAILED and save it; log if the save fails."""
    try:
        zip_doc.status = DocStatus.PROCESS_FAILED.value
        zip_doc.save()
    except Exception:
        self.online_log.error('{0} [zip_2_pdfs] [process error (db save)] [task={1}] [error={2}]'.format(
            self.log_base, task_str, traceback.format_exc()))
def
pdf_2_img_2_queue
(
self
,
img_queue
,
todo_count_dict
,
lock
,
error_list
,
res_dict
,
finish_queue
,
zip_task_queue
):
while
self
.
switch
:
try
:
task_str
=
zip_task_queue
.
get
(
block
=
False
)
is_priority
=
False
except
Exception
as
e
:
task_str
,
is_priority
=
rh
.
dequeue
()
if
task_str
is
None
:
self
.
online_log
.
info
(
'{0} [get_doc_info] [queue empty]'
.
format
(
self
.
log_base
))
time
.
sleep
(
self
.
sleep_time_doc_get
)
continue
self
.
online_log
.
info
(
'{0} [get_doc_info] [task={1}] [is_priority={2}]'
.
format
(
self
.
log_base
,
task_str
,
is_priority
))
try
:
# 1. 从队列获取文件信息
doc
,
business_type
,
task_str
,
classify_1_str
=
self
.
get_doc_info
()
doc
,
business_type
,
task_str
,
classify_1_str
=
self
.
get_doc_info
(
task_str
,
is_priority
)
# 队列为空时的处理
if
doc
is
None
:
time
.
sleep
(
self
.
sleep_time_doc_get
)
...
...
@@ -1119,19 +1286,29 @@ class Command(BaseCommand, LoggerMixin):
if
classify_1_str
==
'0'
:
try
:
# 2. 从EDMS获取PDF文件
max_count_obj
=
Configs
.
objects
.
filter
(
id
=
2
)
.
first
()
try
:
max_img_count
=
int
(
max_count_obj
.
value
)
except
Exception
as
e
:
max_img_count
=
500
#
max_count_obj = Configs.objects.filter(id=2).first()
#
try:
#
max_img_count = int(max_count_obj.value)
#
except Exception as e:
max_img_count
=
500
for
times
in
range
(
consts
.
RETRY_TIMES
):
try
:
if
not
doc
.
application_id
.
startswith
(
consts
.
FIXED_APPLICATION_ID_PREFIX
):
if
doc
.
application_id
.
startswith
(
consts
.
FIXED_APPLICATION_ID_PREFIX
):
self
.
online_log
.
info
(
'{0} [mo ni xia dan] [task={1}] [times={2}] '
'[pdf_path={3}]'
.
format
(
self
.
log_base
,
task_str
,
times
,
pdf_path
))
elif
os
.
path
.
exists
(
pdf_path
):
self
.
online_log
.
info
(
'{0} [pdf from zip file] [task={1}] [times={2}] '
'[pdf_path={3}]'
.
format
(
self
.
log_base
,
task_str
,
times
,
pdf_path
))
else
:
# self.edms.download(pdf_path, doc.metadata_version_id)
self
.
edms
.
download
(
pdf_path
,
doc
.
metadata_version_id
,
doc
.
document_scheme
,
business_type
)
self
.
online_log
.
info
(
'{0} [edms download success] [task={1}] [times={2}] '
'[pdf_path={3}]'
.
format
(
self
.
log_base
,
task_str
,
times
,
pdf_path
))
self
.
edms
.
download
(
pdf_path
,
doc
.
metadata_version_id
,
doc
.
document_scheme
,
business_type
)
self
.
online_log
.
info
(
'{0} [ecm download success] [task={1}] [times={2}] '
'[pdf_path={3}]'
.
format
(
self
.
log_base
,
task_str
,
times
,
pdf_path
))
# 3.PDF文件提取图片
self
.
online_log
.
info
(
'{0} [pdf to img start] [task={1}] [times={2}]'
.
format
(
...
...
@@ -2094,9 +2271,16 @@ class Command(BaseCommand, LoggerMixin):
res_dict
=
manager
.
dict
()
img_queue
=
Queue
(
self
.
img_queue_size
)
finish_queue
=
Queue
()
zip_task_queue
=
Queue
()
process_list
=
[]
pdf_process
=
Process
(
target
=
self
.
pdf_2_img_2_queue
,
args
=
(
img_queue
,
todo_count_dict
,
lock
,
error_list
,
res_dict
,
finish_queue
))
zip_process
=
Process
(
target
=
self
.
zip_2_pdfs
,
args
=
(
zip_task_queue
,
error_list
))
process_list
.
append
(
zip_process
)
pdf_process
=
Process
(
target
=
self
.
pdf_2_img_2_queue
,
args
=
(
img_queue
,
todo_count_dict
,
lock
,
error_list
,
res_dict
,
finish_queue
,
zip_task_queue
))
process_list
.
append
(
pdf_process
)
for
url
in
self
.
ocr_1_urls
.
values
():
...
...
src/apps/doc/views.py
View file @
6010c32
...
...
@@ -566,15 +566,14 @@ class UploadDocView(GenericView, DocHandler):
data_source
=
self
.
fix_data_source
(
data_source
)
document_scheme
=
self
.
fix_scheme
(
document_scheme
)
if
document_name
.
endswith
(
'.zip'
):
self
.
running_log
.
info
(
'[doc upload success] [zip file skip] [args={0}]'
.
format
(
args
))
return
response
.
ok
()
#
if document_name.endswith('.zip'):
#
self.running_log.info('[doc upload success] [zip file skip] [args={0}]'.format(args))
#
return response.ok()
if
data_source
==
consts
.
DATA_SOURCE_LIST
[
1
]:
if
isinstance
(
document_name
,
str
):
if
document_name
.
endswith
(
'-证书.pdf'
)
or
document_name
.
endswith
(
'-证书'
):
self
.
running_log
.
info
(
'[doc upload success] [eapp license skip] [args={0}]'
.
format
(
args
))
return
response
.
ok
()
if
document_name
.
endswith
(
'-证书.pdf'
)
or
document_name
.
endswith
(
'-证书'
):
self
.
running_log
.
info
(
'[doc upload success] [eapp license skip] [args={0}]'
.
format
(
args
))
return
response
.
ok
()
# 2. 根据业务类型分库存储
doc_class
,
prefix
=
self
.
get_doc_class
(
business_type
)
...
...
@@ -590,17 +589,24 @@ class UploadDocView(GenericView, DocHandler):
data_source
=
data_source
,
upload_finish_time
=
document
.
get
(
'uploadFinishTime'
),
)
# 3. 选择队列进入
is_priority
=
PriorityApplication
.
objects
.
filter
(
application_id
=
application_id
,
on_off
=
True
)
.
exists
()
is_zip
=
False
classify_1
=
0
# 电子合同
if
data_source
==
consts
.
DATA_SOURCE_LIST
[
-
1
]
and
document_scheme
==
consts
.
DOC_SCHEME_LIST
[
1
]:
for
keyword
,
classify_1_tmp
in
consts
.
ECONTRACT_KEYWORDS_MAP
.
get
(
prefix
):
if
keyword
in
document_name
:
classify_1
=
classify_1_tmp
break
elif
document_name
.
endswith
(
'.zip'
)
or
document_name
.
endswith
(
'.rar'
)
or
document_name
.
endswith
(
'.ZIP'
)
\
or
document_name
.
endswith
(
'.RAR'
):
is_zip
=
True
task
=
consts
.
SPLIT_STR
.
join
([
prefix
,
str
(
doc
.
id
),
str
(
classify_1
)])
enqueue_res
=
rh
.
enqueue
([
task
],
is_priority
)
enqueue_res
=
rh
.
enqueue
([
task
],
is_priority
,
is_zip
)
self
.
running_log
.
info
(
'[doc upload success] [args={0}] [business_type={1}] [doc_id={2}] '
'[is_priority={3}] [enqueue_res={4}]'
.
format
(
args
,
prefix
,
doc
.
id
,
is_priority
,
enqueue_res
))
...
...
@@ -665,7 +671,7 @@ class PriorityDocView(GenericView, DocHandler):
self
.
running_log
.
info
(
'[priority doc success] [args={0}]'
.
format
(
args
))
else
:
enqueue_res
=
rh
.
enqueue
(
tasks_list
,
is_priority
=
True
)
enqueue_res
=
rh
.
enqueue
(
tasks_list
,
is_priority
=
True
)
# TODO 可能把压缩文件放入优先队列
self
.
running_log
.
info
(
'[priority doc success] [args={0}] [tasks_list={1}] [enqueue_res={2}]'
.
format
(
args
,
tasks_list
,
enqueue_res
))
return
response
.
ok
()
...
...
src/common/redis_cache/handler.py
View file @
6010c32
...
...
@@ -35,16 +35,27 @@ class RedisHandler:
self
.
prefix
=
'bwm_ocr'
self
.
common_queue_key
=
'{0}:common_queue'
.
format
(
self
.
prefix
)
self
.
priority_queue_key
=
'{0}:priority_queue'
.
format
(
self
.
prefix
)
self
.
zip_queue_key
=
'{0}:zip_queue'
.
format
(
self
.
prefix
)
self
.
session_id_key
=
'{0}:session_id'
.
format
(
self
.
prefix
)
self
.
cms_token_key
=
'{0}:cms_token'
.
format
(
self
.
prefix
)
self
.
ecm_token_key
=
'{0}:ecm_token'
.
format
(
self
.
prefix
)
self
.
login_limit_key
=
'{0}:login_limit'
.
format
(
self
.
prefix
)
def enqueue(self, tasks, is_priority=False, is_zip=False):
    """Push ``tasks`` onto one of the redis queues.

    Queue selection: ``is_zip`` wins over ``is_priority``; when neither flag
    is set the common queue is used.

    Args:
        tasks: Value handed unchanged to ``self.redis.lpush``.
        is_priority: Use the priority queue (ignored when ``is_zip`` is set).
        is_zip: Use the zip queue.

    Returns:
        Whatever ``self.redis.lpush`` returns.
    """
    if is_zip:
        key = self.zip_queue_key
    elif is_priority:
        key = self.priority_queue_key
    else:
        key = self.common_queue_key
    return self.redis.lpush(key, tasks)
def dequeue_zip(self):
    """Pop one entry from the zip queue.

    Returns:
        Whatever ``self.redis.rpop`` returns for ``self.zip_queue_key``
        (a task, or None).
    """
    return self.redis.rpop(self.zip_queue_key)
def
dequeue
(
self
):
# task or None
task
=
self
.
redis
.
rpop
(
self
.
priority_queue_key
)
...
...
src/common/tools/file_tools.py
View file @
6010c32
import
os
import
re
import
zipfile
import
rarfile
from
zipfile
import
ZipFile
...
...
@@ -18,3 +22,77 @@ def write_zip_file(dir_name, zipfile_path):
src_file_path
=
os
.
path
.
join
(
root
,
single_file
)
file_target_path
=
os
.
path
.
join
(
root_target_path
,
single_file
)
z
.
write
(
src_file_path
,
file_target_path
)
def get_pwd_list_from_str(doc_name):
    """Collect candidate passwords: every run of six digits in ``doc_name``.

    Matches do not overlap and are returned in order of appearance.

    Args:
        doc_name: Text to scan (a document name).

    Returns:
        list of six-digit strings; an empty list when nothing matches or
        when scanning raises (for example ``doc_name`` is not a string).
    """
    try:
        candidates = re.findall(r'\d{6}', doc_name)
    except Exception:
        return []
    return candidates
def extract_zip_or_rar(file_path, extract_path, pwd_list=None):
    """Extract a zip or rar archive into ``extract_path``.

    The archive type is chosen from the file extension, compared
    case-insensitively (``.zip``, ``.ZIP``, ``.Zip`` ...). When ``pwd_list``
    is non-empty each password is tried in turn and the first one that
    extracts without raising wins; otherwise a single extraction without a
    password is attempted.

    Args:
        file_path: Path of the archive.
        extract_path: Directory to extract into.
        pwd_list: Optional candidate passwords (strings). ``None`` or an
            empty list means "no password".

    Returns:
        True when an extraction attempt completed without raising, False when
        every attempt failed or the extension is neither zip nor rar.
    """
    lowered = file_path.lower()
    if lowered.endswith('.zip'):
        is_zip = True
    elif lowered.endswith('.rar'):
        is_zip = False
    else:
        return False

    # A single ``None`` entry stands for "extract without a password".
    candidates = list(pwd_list) if pwd_list else [None]
    for password in candidates:
        try:
            if is_zip:
                # zipfile wants the password as bytes.
                pwd = None if password is None else bytes(password, 'utf-8')
                with zipfile.ZipFile(file_path) as archive:
                    archive.extractall(extract_path, pwd=pwd)
            else:
                with rarfile.RarFile(file_path) as archive:
                    archive.extractall(extract_path, pwd=password)
        except Exception:
            # Best effort: a wrong password or a damaged archive just means
            # moving on to the next candidate.
            continue
        return True
    return False
def get_file_paths(input_path, suffix_list):
    """Walk ``input_path`` and yield files whose name ends with a given suffix.

    This is a generator: paths are produced lazily while the tree is walked.
    Each path is the walked directory joined with the file name, and a file
    is yielded at most once even if several suffixes match.

    Args:
        input_path: str, directory to search (sub-directories included).
        suffix_list: list of suffixes to look for, e.g. ``['.pdf', '.PDF']``.

    Yields:
        str, path of each matching file.
    """
    for parent, _, filenames in os.walk(input_path):
        for filename in filenames:
            if any(filename.endswith(suffix) for suffix in suffix_list):
                yield os.path.join(parent, filename)
...
...
Write
Preview
Styling with
Markdown
is supported
Attach a file
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to post a comment