d5c6be4b by 周伟奇

doc process part 1

1 parent a4a63da9
......@@ -28,6 +28,7 @@ sftp-config.json
*.sqlite3
conf/*
data/*
# 脚本
src/*.sh
\ No newline at end of file
......
......@@ -4,12 +4,17 @@ Django==2.1
djangorestframework==3.9.0
djangorestframework-jwt==1.11.0
marshmallow==3.6.1
pdfminer3k==1.3.4
Pillow==7.1.2
ply==3.11
PyJWT==1.7.1
PyMuPDF==1.17.0
PyMySQL==0.9.3
pytz==2020.1
# simple-config @ http://gitlab.situdata.com/zhouweiqi/simple_config/repository/archive.tar.gz?ref=master
# situlogger @ http://gitlab.situdata.com/zhouweiqi/situlogger/repository/archive.tar.gz?ref=master
PyYAML==5.3.1
redis==3.4.1
simple-config @ http://gitlab.situdata.com/zhouweiqi/simple_config/repository/archive.tar.gz?ref=master
situlogger @ http://gitlab.situdata.com/zhouweiqi/situlogger/repository/archive.tar.gz?ref=master
six==1.14.0
SQLAlchemy==0.9.10
webargs==6.1.0
......
import time
import os
import signal
import fitz
from PIL import Image
from io import BytesIO
from django.core.management import BaseCommand
from common.mixins import LoggerMixin
from common.redis_cache import redis_handler as rh
from apps.doc.models import UploadDocRecords
from settings import conf
class Command(BaseCommand):
class Command(BaseCommand, LoggerMixin):
def __init__(self):
    """Initialise the command: log prefix, run flag, data dir, render matrix, SIGTERM hook."""
    super().__init__()
    self.log_base = '[doc process]'
    # Run flag for file processing; cleared by signal_handler.
    self.switch = True
    # Data directory, read from the project configuration.
    self.data_dir = conf.DATA_DIR
    # Page-to-image conversion: zoom factor 2 in each dimension, pre-rotated by 0.
    self.zoom_x = self.zoom_y = 2.0
    zoom_matrix = fitz.Matrix(self.zoom_x, self.zoom_y)
    self.trans = zoom_matrix.preRotate(0)
    # Graceful-exit signal: 15 (SIGTERM).
    signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, sig, frame):
    """Signal callback: clear the run flag so that file processing stops.

    ``sig`` and ``frame`` are the arguments passed to a signal handler;
    neither is used here.
    """
    keep_running = False
    self.switch = keep_running
def get_task_info(self):  # TODO: priority queue & status modify
    """Pop the next task id from the queue and load the matching record.

    Returns:
        The record as a dict with the ``id``, ``metadata_version_id`` and
        ``document_name`` fields, or ``None`` when the queue is empty or no
        record matches the dequeued id.
    """
    task_id = rh.dequeue()
    if task_id is None:
        self.cronjob_log.info('{0} [get_task_info] [queue empty]'.format(self.log_base))
        return None

    task_info = UploadDocRecords.objects.filter(id=task_id).values(
        'id', 'metadata_version_id', 'document_name').first()
    if task_info is None:
        # Return here so a missing record is not also reported as a success below.
        self.cronjob_log.warning('{0} [get_task_info] [task not found] [task_id={1}]'.format(
            self.log_base, task_id))
        return None

    self.cronjob_log.info('{0} [get_task_info success] [task_info={1}]'.format(self.log_base, task_info))
    return task_info
def pdf_download(self, task_info):
    """Return the local PDF path for ``task_info``, or ``None`` when there is no task.

    The EDMS download is still a TODO, so the same fixed local path is
    returned (and logged) for every non-``None`` task.
    """
    if task_info is not None:
        # TODO: download the PDF from EDMS
        pdf_path = '/Users/clay/Desktop/biz/biz_logic/data/2/横版-表格-工商银行CH-B008802400.pdf'
        message = '{0} [pdf download success] [task_info={1}] [pdf_path={2}]'.format(
            self.log_base, task_info, pdf_path)
        self.cronjob_log.info(message)
        return pdf_path
    return None
@staticmethod
def getimage(pix):
    """Return ``pix`` as-is, unless its colorspace has 4 components.

    A pixmap whose ``colorspace.n`` equals 4 is converted to a new
    ``fitz.csRGB`` pixmap, which is returned instead.
    """
    four_component = pix.colorspace.n == 4
    return fitz.Pixmap(fitz.csRGB, pix) if four_component else pix
def recoverpix(self, doc, item):
    """Build the image for one entry of a page image list, applying its /SMask if present.

    Args:
        doc: the open PDF document the image belongs to.
        item: one image-list entry; index 0 is the image xref, index 1 the
            xref of its /SMask (0 means none) and index 5 the colorspace name.

    Returns:
        The result of ``doc.extractImage`` for a DeviceRGB image without
        /SMask; otherwise a pixmap passed through ``self.getimage``.
    """
    x = item[0] # xref of PDF image
    s = item[1] # xref of its /SMask
    is_rgb = True if item[5] == 'DeviceRGB' else False
    # RGB
    if is_rgb:
        # No soft mask: hand back the extracted image directly.
        if s == 0:
            return doc.extractImage(x)
        # we need to reconstruct the alpha channel with the smask
        pix1 = fitz.Pixmap(doc, x)
        pix2 = fitz.Pixmap(doc, s) # create pixmap of the /SMask entry
        # sanity check: same rectangle, neither pixmap has alpha, mask has one component;
        # otherwise the mask is ignored and the plain image is returned
        if not (pix1.irect == pix2.irect and pix1.alpha == pix2.alpha == 0 and pix2.n == 1):
            pix2 = None
            return self.getimage(pix1)
        pix = fitz.Pixmap(pix1) # copy of pix1, alpha channel added
        pix.setAlpha(pix2.samples) # treat pix2.samples as alpha value
        pix1 = pix2 = None # free temp pixmaps
        return self.getimage(pix)
    # GRAY/CMYK
    pix1 = fitz.Pixmap(doc, x)
    pix = fitz.Pixmap(pix1) # copy of pix1, alpha channel added
    if s != 0:
        pix2 = fitz.Pixmap(doc, s) # create pixmap of the /SMask entry
        # sanity check: same conditions as in the RGB branch above
        if not (pix1.irect == pix2.irect and pix1.alpha == pix2.alpha == 0 and pix2.n == 1):
            pix2 = None
            return self.getimage(pix1)
        pix.setAlpha(pix2.samples) # treat pix2.samples as alpha value
    pix1 = pix2 = None # free temp pixmaps
    pix = fitz.Pixmap(fitz.csRGB, pix) # GRAY/CMYK to RGB
    return self.getimage(pix)
@staticmethod
def get_img_data(pix):
    """Return ``(ext, img_data)`` for an extracted image.

    Args:
        pix: either a raw-image mapping with ``"ext"`` and ``"image"`` keys,
            or a pixmap object exposing ``getPNGData()``.

    Returns:
        A tuple of the file extension and the image data. Pixmaps are
        always returned as PNG data with extension ``'png'``.
    """
    # isinstance (rather than an exact type comparison) also accepts dict subclasses.
    if isinstance(pix, dict):  # we got a raw image
        return pix["ext"], pix["image"]
    # we got a pixmap
    return 'png', pix.getPNGData()
@staticmethod
def split_il(il):
    """Split an image list into consecutive runs based on entries' index 2 and index 3.

    Args:
        il: a sequence of image entries; only ``entry[2]`` and ``entry[3]``
            are compared here (the caller reads them as width and height).

    Returns:
        A list of slices of ``il``. An empty ``il`` yields an empty list.
    """
    img_il_list = []
    start = 0  # index of the first entry of the run currently being built
    length = len(il)
    for i in range(length):
        if i == start:
            # First entry of a run: nothing to compare it with yet.
            if i == length - 1:
                # It is also the last entry, so it forms a run of its own.
                img_il_list.append(il[start: length])
            continue
        elif i == length - 1:
            # Last entry: close the open run, including this entry.
            # NOTE(review): this entry is not compared with its predecessor — confirm intended.
            img_il_list.append(il[start: length])
            continue
        if il[i][2] != il[i - 1][2]:
            # Index-2 value changed: close the run before i; i opens the next run.
            img_il_list.append(il[start: i])
            start = i
        elif il[i][3] != il[i - 1][3]:
            # Index-3 value changed: close the run including i; the next run opens at i + 1.
            img_il_list.append(il[start: i + 1])
            start = i + 1
    return img_il_list
def handle(self, *args, **kwargs):
while self.switch:
......@@ -28,8 +138,65 @@ class Command(BaseCommand):
task_info = self.get_task_info()
# 从EDMS获取PDF文件
pdf_path = self.pdf_download(task_info)
# 队列为空时的处理
if pdf_path is None:
time.sleep(10)
continue
# PDF文件提取图片
img_save_path = os.path.join(os.path.dirname(pdf_path), 'img')
os.makedirs(img_save_path, exist_ok=True)
with fitz.Document(pdf_path) as pdf:
self.cronjob_log.info('{0} [pdf_path={1}] [pdf_metadata={2}]'.format(
self.log_base, pdf_path, pdf.metadata))
# xref_list = [] # TODO 图片去重
for pno in range(pdf.pageCount):
il = pdf.getPageImageList(pno)
il.sort(key=lambda x: x[0])
img_il_list = self.split_il(il)
del il
if len(img_il_list) > 3: # 单页无规律小图过多时,使用页面转图片
page = pdf.loadPage(pno)
pm = page.getPixmap(matrix=self.trans, alpha=False)
save_path = os.path.join(img_save_path, 'page_{0}_img_0.png'.format(page.number))
# pm.writePNG(save_path)
pm.writeImage(save_path)
else: # 提取图片
for img_count, img_il in enumerate(img_il_list):
if len(img_il) == 1: # 当只有一张图片时, 简化处理
pix = self.recoverpix(pdf, img_il[0])
ext, img_data = self.get_img_data(pix)
save_path = os.path.join(img_save_path, 'page_{0}_img_{1}.{2}'.format(
pno, img_count, ext))
with open(save_path, "wb") as f:
f.write(img_data)
else: # 多张图片,竖向拼接
height_sum = 0
im_list = []
width = img_il[0][2]
for img in img_il:
# xref = img[0]
# if xref in xref_list:
# continue
height = img[3]
pix = self.recoverpix(pdf, img)
ext, img_data = self.get_img_data(pix)
# xref_list.append(xref)
im = Image.open(BytesIO(img_data))
im_list.append((height, im, ext))
height_sum += height
save_path = os.path.join(img_save_path, 'page_{0}_img_{1}.{2}'.format(
pno, img_count, im_list[0][2]))
res = Image.new(im_list[0][1].mode, (width, height_sum))
h_now = 0
for h, m, _ in im_list:
res.paste(m, box=(0, h_now))
h_now += h
res.save(save_path)
# 图片调用算法判断是否为银行流水
# 图片调用算法OCR为excel文件
# 整合excel文件上传至EDMS
pass
......
......@@ -4,7 +4,7 @@ from django.db import models
# 上传文件记录表/任务表
class UploadDocRecords(models.Model):
class UploadDocRecords(models.Model): # TODO add status
id = models.AutoField(primary_key=True, verbose_name="id")
metadata_version_id = models.CharField(max_length=64, verbose_name="元数据版本id")
application_id = models.CharField(max_length=64, verbose_name="申请id")
......
......@@ -5,6 +5,7 @@ from webargs.djangoparser import use_args, parser
from common.mixins import GenericView
from common import response
from .models import UploadDocRecords
from common.redis_cache import redis_handler as rh
# Create your views here.
......@@ -51,7 +52,7 @@ class DocView(GenericView):
applicant_data = args.get('applicantData')
document = args.get('document')
try:
UploadDocRecords.objects.create(
task = UploadDocRecords.objects.create(
metadata_version_id=document.get('metadataVersionId'),
application_id=application_data.get('applicationId'),
main_applicant=applicant_data.get('mainApplicantName'),
......@@ -68,6 +69,8 @@ class DocView(GenericView):
self.running_log.info('[doc upload fail] [args={0}] [err={1}]'.format(args, e))
self.invalid_params(msg='metadataVersionId repeat')
else:
# TODO 查询加入优先队列 or 普通队列
rh.enqueue(task.id)
self.running_log.info('[doc upload success] [args={0}]'.format(args))
return response.ok()
......
......@@ -4,5 +4,5 @@ from settings import conf
redis_url = conf.REDIS_URL
# redis = Redis(redis_url)
# redis_handler = RedisHandler(redis)
redis = Redis(redis_url)
redis_handler = RedisHandler(redis)
......
......@@ -106,7 +106,7 @@ class Redis:
def zremrangebyrank(self, name, start, end):
    """Read, then remove, the members of sorted set ``name`` ranked ``start`` to ``end``.

    Both commands are queued on one pipeline and sent by a single
    ``execute()`` call.

    Returns:
        The list returned by ``pipe.execute()``: the ``zrange`` reply
        followed by the ``zremrangebyrank`` reply.
    """
    with self.client.pipeline() as pipe:
        # TODO: possible inconsistency between the read and the removal.
        # NOTE(review): redis-py pipelines are transactional by default —
        # confirm self.client is a redis-py client using that default.
        pipe.zrange(name, start, end)  # queued exactly once so the result keeps two entries
        pipe.zremrangebyrank(name, start, end)
        item = pipe.execute()
    return item
......
......@@ -32,71 +32,12 @@ class RedisHandler:
self.redis = redis
self.time_expires = datetime.timedelta(hours=24)
self.time_format = '%a %b %d %H:%M:%S %Y'
self.prefix = 'automl'
self.training_time_key = '{0}:training_time'.format(self.prefix)
self.prefix = 'bwm_ocr'
self.queue_key = '{0}:queue'.format(self.prefix)
self.prefix_training = '{0}:training'.format(self.prefix)
self.prefix_models = '{0}:models'.format(self.prefix)
self.prefix_img_info = '{0}:img_info'.format(self.prefix)
def get_training_model_key(self, user_id, model_type):
return '{0}:{1}:{2}'.format(self.prefix_training, user_id, model_type)
def get_models_list_key(self, user_id, model_type):
return '{0}:{1}:{2}'.format(self.prefix_models, user_id, model_type)
def set_training_model(self, user_id, model_type, model_id, status):
# True
key = self.get_training_model_key(user_id, model_type)
mapping = {
'model_id': model_id,
'model_status': status
}
return self.redis.hmset(key, mapping)
def get_training_model(self, user_id, model_type):
# {}
# {'id': '1', 'status': '1'}
key = self.get_training_model_key(user_id, model_type)
res = self.redis.hgetall(key)
dict_str_to_int(res)
return res
def set_models_list(self, user_id, model_type, models_list):
key = self.get_models_list_key(user_id, model_type)
value = json.dumps(models_list, cls=DateTimeJSONEncoder)
return self.redis.set(key, value, expires=self.time_expires)
def get_models_list(self, user_id, model_type):
# list or None
key = self.get_models_list_key(user_id, model_type)
res_str = self.redis.get(key)
res = None if res_str is None else json.loads(res_str)
return res
def del_models_list(self, user_id, model_type):
# None
key = self.get_models_list_key(user_id, model_type)
return self.redis.delete(key)
def set_training_finish_time(self, finish_time):
# True
finish_time_str = datetime.datetime.strftime(finish_time, self.time_format)
return self.redis.set(self.training_time_key, finish_time_str)
def get_training_finish_time(self):
# datetime.datetime or None
res = self.redis.get(self.training_time_key)
finish_time = None if res is None else datetime.datetime.strptime(res, self.time_format)
return finish_time
def del_training_finish_time(self):
# None
return self.redis.delete(self.training_time_key)
def enqueue(self, task_id):
    """Add ``task_id`` to the queue sorted set, scored with the current time.

    Args:
        task_id: identifier stored as the sorted-set member.

    Returns:
        Whatever ``self.redis.zadd`` returns (the original comment notes ``1``).
    """
    # Score by time.time() so members are ordered by when they were added.
    mapping = {task_id: time.time()}
    return self.redis.zadd(self.queue_key, mapping)
def dequeue(self):
......@@ -106,110 +47,3 @@ class RedisHandler:
pop_item = int(pop_item_list[0]) if pop_item_list else None
return pop_item
def get_queue_end(self):
# model_id:int or None
res_list = self.redis.zrange(self.queue_key, -1, -1)
end_id = int(res_list[0]) if res_list else None
return end_id
def get_queue_rank(self, model_id):
# rank:int or None
rank = self.redis.zrank(self.queue_key, model_id)
if rank is None:
return 0
return rank + 1
def set_img_info(self, user_id, model_id, count_sum, count_marked):
# True
key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
mapping = {
'count_sum': count_sum,
'count_marked': count_marked
}
return self.redis.hmset(key, mapping)
def get_img_info(self, user_id, model_id):
# {}
# {'count_sum': '70', 'count_marked': '0'}
key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
res = self.redis.hgetall(key)
dict_str_to_int(res)
return res
def update_img_info(self, user_id, model_id, del_img=False):
# res_count:int
key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
if del_img:
return self.redis.hincrby(key, 'count_sum', amount=-1)
else:
return self.redis.hincrby(key, 'count_marked')
def del_img_info(self, user_id, model_id):
# None
key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
return self.redis.delete(key)
def pipe_trained(self, user_id, model_type, model_id, status, success=True):
# redis.set_training_model(user_id, model_type, model_id, model_status)
# redis.del_training_finish_time()
# redis.del_models_list(user_id, model_type)
# redis.set_training_model(user_id, model_type, model_id, model_status)
# redis.del_training_finish_time()
training_model_key = self.get_training_model_key(user_id, model_type)
models_list_key = self.get_models_list_key(user_id, model_type)
mapping = {
'model_id': model_id,
'model_status': status
}
with self.redis.client.pipeline() as pipe:
pipe.hmset(training_model_key, mapping)
pipe.delete(self.training_time_key)
if success is True:
pipe.delete(models_list_key)
item = pipe.execute()
return item
def pipe_training(self, user_id, model_type, model_id, status, finish_time):
# redis.dequeue()
# redis.set_training_model(user_id, model_type, model_id, model_status)
# redis.set_training_finish_time(proleptic_finish_time)
training_model_key = self.get_training_model_key(user_id, model_type)
mapping = {
'model_id': model_id,
'model_status': status
}
finish_time_str = datetime.datetime.strftime(finish_time, self.time_format)
with self.redis.client.pipeline() as pipe:
pipe.zremrangebyrank(self.queue_key, 0, 0)
pipe.hmset(training_model_key, mapping)
pipe.set(self.training_time_key, finish_time_str)
item = pipe.execute()
return item
def pipe_enqueue(self, model_id, user_id, model_type, status, section=True):
# redis.enqueue(model_id)
# redis.set_training_model(user_id, model_type,
# model_id, ModelStatus.DATA_PRETREATMENT_DONE.value)
# if model_type == ModelType.SECTION.value:
# redis.del_img_info(user_id, model_id)
queue_mapping = {model_id: time.time()}
training_model_key = self.get_training_model_key(user_id, model_type)
mapping = {
'model_id': model_id,
'model_status': status
}
img_info_key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
with self.redis.client.pipeline() as pipe:
pipe.zadd(self.queue_key, queue_mapping)
pipe.hmset(training_model_key, mapping)
if section is True:
pipe.delete(img_info_key)
item = pipe.execute()
return item
......
import fitz
import os
from PIL import Image, ImageCms
from PIL import Image
from io import BytesIO
......@@ -126,7 +126,8 @@ class PdfHandler:
fout.close()
xreflist.append(xref)
def split_il(self, il):
@staticmethod
def split_il(il):
img_il_list = []
start = 0
length = len(il)
......
......@@ -4,6 +4,7 @@ import os
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
COMMON_CONF_DIR = os.path.dirname(os.path.abspath(__file__))
SECRET_CONF_DIR = os.path.join(os.path.dirname(BASE_DIR), 'conf')
DATA_DIR = os.path.join(os.path.dirname(BASE_DIR), 'data')
SECRET_CONF_FILE = os.path.join(SECRET_CONF_DIR, 'secret.ini')
LOGGING_CONFIG_FILE = os.path.join(COMMON_CONF_DIR, 'logging.conf')
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!