ad161125 by 周伟奇

priority queue

1 parent e325cfc3
# Pagination defaults for the doc-list API.
PAGE_DEFAULT = 1
PAGE_SIZE_DEFAULT = 10
# Allowed values for the businessType request field.
BUSINESS_TYPE = ['HIL', 'AFC']
# Incoming business-type spellings that map to the HIL business line.
# NOTE(review): contains both 'CO00002' (letter O) and 'C000002' (zero) —
# looks like it deliberately covers both spellings, but confirm upstream.
HIL_SET = {'HIL', 'hil', 'CO00002', 'C000002'}
# Prefixes used to build queue task strings ('<PREFIX>_<doc_id>') and
# per-business data directories.
HIL_PREFIX = 'HIL'
AFC_PREFIX = 'AFC'
......
......@@ -3,5 +3,5 @@ from . import views
# Upload-callback endpoint, versioned under 'v1'.
# (Pre-diff route path(r'', ...) removed: the hunk replaces it with 'v1'.)
urlpatterns = [
    path(r'v1', views.UploadDocView.as_view()),
]
......
......@@ -13,7 +13,8 @@ from django.core.management import BaseCommand
from common.mixins import LoggerMixin
from common.redis_cache import redis_handler as rh
from common.tools.file_tools import write_zip_file
# Post-image import only: the pre-diff line importing UploadDocRecords from
# apps.doc.models duplicated this statement and is dropped.
from apps.doc.models import DocStatus, HILDoc, AFCDoc
from apps.doc import consts
from settings import conf
......@@ -42,33 +43,39 @@ class Command(BaseCommand, LoggerMixin):
def signal_handler(self, sig, frame):
    # Clear the run switch so the worker loop exits after the current
    # iteration (graceful shutdown on signal).
    self.switch = False  # stop processing files
def get_doc_info(self):
    """Pop one OCR task from Redis and claim the matching doc row.

    Returns a 4-tuple ``(doc_info, doc_class, doc_id, business_type)``.
    All four are ``None`` when both queues are empty or the doc was
    already claimed (its status is no longer INIT).
    """
    task_str, is_priority = rh.dequeue()
    if task_str is None:  # both priority and common queues are empty
        self.cronjob_log.info('{0} [get_doc_info] [queue empty]'.format(self.log_base))
        return None, None, None, None
    # Task format: '<business_prefix>_<doc_id>', e.g. 'AFC_42'.
    business_type, doc_id_str = task_str.split('_')
    doc_id = int(doc_id_str)
    doc_class = HILDoc if business_type == consts.HIL_PREFIX else AFCDoc
    # Only claim docs still in INIT, so a doc enqueued twice (e.g. again via
    # the priority endpoint) is processed at most once.
    doc_info = doc_class.objects.filter(id=doc_id, status=DocStatus.INIT.value).values(
        'id', 'metadata_version_id', 'document_name').first()
    if doc_info is None:
        self.cronjob_log.warn('{0} [get_doc_info] [doc completed] [task_str={1}] [is_priority={2}]'.format(
            self.log_base, task_str, is_priority))
        return None, None, None, None
    doc_class.objects.filter(id=doc_id).update(status=DocStatus.PROCESSING.value)
    self.cronjob_log.info('{0} [get_doc_info] [task_str={1}] [is_priority={2}] [doc_info={3}]'.format(
        self.log_base, task_str, is_priority, doc_info))
    return doc_info, doc_class, doc_id, business_type
def pdf_download(self, doc_id, doc_info, business_type):
    """Derive the per-doc working paths (and, eventually, fetch the PDF).

    Returns ``(doc_data_path, excel_path, pdf_path)``; all ``None`` when
    ``doc_info`` is ``None`` (empty queue upstream).
    """
    if doc_info is None:
        return None, None, None
    # TODO: actually download the pdf from EDMS; for now only the target
    # paths under <data_dir>/<business_type>/<doc_id>/ are prepared.
    doc_data_path = os.path.join(self.data_dir, business_type, str(doc_id))
    pdf_path = os.path.join(doc_data_path, '{0}.pdf'.format(doc_id))
    excel_path = os.path.join(doc_data_path, '{0}.xls'.format(doc_id))
    self.cronjob_log.info('{0} [pdf download success] [business_type={1}] [doc_info={2}] [pdf_path={3}]'.format(
        self.log_base, business_type, doc_info, pdf_path))
    return doc_data_path, excel_path, pdf_path
@staticmethod
def append_sheet(wb, sheets_list, img_name):
......@@ -189,9 +196,9 @@ class Command(BaseCommand, LoggerMixin):
max_sleep_second = 60
while self.switch:
# 从队列获取文件信息
doc_info = self.get_doc_info()
doc_info, doc_class, doc_id, business_type = self.get_doc_info()
# 从EDMS获取PDF文件
doc_data_path, excel_path, pdf_path, doc_id = self.pdf_download(doc_info)
doc_data_path, excel_path, pdf_path = self.pdf_download(doc_id, doc_info, business_type)
# 队列为空时的处理
if pdf_path is None:
time.sleep(sleep_second)
......@@ -276,10 +283,10 @@ class Command(BaseCommand, LoggerMixin):
wb.save(excel_path) # TODO no sheet (res always [])
# 整合excel文件上传至EDMS
except Exception as e:
UploadDocRecords.objects.filter(id=doc_id).update(status=DocStatus.PROCESS_FAILED.value)
doc_class.objects.filter(id=doc_id).update(status=DocStatus.PROCESS_FAILED.value)
self.cronjob_log.error('{0} [process failed] [doc_id={1}] [err={2}]'.format(self.log_base, doc_id, e))
else:
UploadDocRecords.objects.filter(id=doc_id).update(status=DocStatus.COMPLETE.value)
doc_class.objects.filter(id=doc_id).update(status=DocStatus.COMPLETE.value)
self.cronjob_log.info('{0} [doc process complete] [doc_id={1}]'.format(self.log_base, doc_id))
self.cronjob_log.info('{0} [stop safely]')
self.cronjob_log.info('{0} [stop safely]'.format(self.log_base))
......
......@@ -5,7 +5,7 @@ from .named_enum import DocStatus
# 上传文件记录表/任务表
class UploadDocRecords(models.Model): # TODO records一张表、文件(任务)根据business_type分库存储
class UploadDocRecords(models.Model):
id = models.AutoField(primary_key=True, verbose_name="id")
metadata_version_id = models.CharField(max_length=64, verbose_name="元数据版本id")
application_id = models.CharField(max_length=64, verbose_name="申请id")
......@@ -13,7 +13,6 @@ class UploadDocRecords(models.Model): # TODO records一张表、文件(任务
co_applicant = models.CharField(max_length=16, verbose_name="共同申请人")
guarantor_1 = models.CharField(max_length=16, verbose_name="担保人1")
guarantor_2 = models.CharField(max_length=16, verbose_name="担保人2")
status = models.SmallIntegerField(default=DocStatus.INIT.value, verbose_name="文件状态")
document_name = models.CharField(max_length=255, verbose_name="文件名")
document_scheme = models.CharField(max_length=64, verbose_name="文件方案")
business_type = models.CharField(max_length=64, verbose_name="业务类型")
......@@ -26,3 +25,62 @@ class UploadDocRecords(models.Model): # TODO records一张表、文件(任务
managed = False
db_table = 'upload_doc_records'
# Per-business task/doc table for the HIL business line.
# Lives in the default database (no Meta.situ_db_label, unlike AFCDoc).
class HILDoc(models.Model):
    id = models.AutoField(primary_key=True, verbose_name="id")
    # Back-reference to the UploadDocRecords audit row this doc came from.
    record_id = models.IntegerField(verbose_name='记录id')
    metadata_version_id = models.CharField(max_length=64, verbose_name="元数据版本id")
    application_id = models.CharField(max_length=64, verbose_name="申请id")  # composite index
    # Processing state, see DocStatus (INIT / PROCESSING / COMPLETE / PROCESS_FAILED).
    status = models.SmallIntegerField(default=DocStatus.INIT.value, verbose_name="文件状态")  # composite index
    main_applicant = models.CharField(max_length=16, verbose_name="主申请人")
    co_applicant = models.CharField(max_length=16, verbose_name="共同申请人")
    guarantor_1 = models.CharField(max_length=16, verbose_name="担保人1")
    guarantor_2 = models.CharField(max_length=16, verbose_name="担保人2")
    document_name = models.CharField(max_length=255, verbose_name="文件名")
    document_scheme = models.CharField(max_length=64, verbose_name="文件方案")
    data_source = models.CharField(max_length=64, verbose_name="数据源")
    upload_finish_time = models.DateTimeField(verbose_name="上传完成时间")  # indexed
    update_time = models.DateTimeField(auto_now=True, verbose_name='修改时间')
    create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')  # indexed

    class Meta:
        managed = False  # table is created/migrated outside Django
        db_table = 'hil_doc'
# Per-business task/doc table for the AFC business line.
# Routed to the 'afc' database via Meta.situ_db_label (see DBRouter).
class AFCDoc(models.Model):
    id = models.AutoField(primary_key=True, verbose_name="id")
    # Back-reference to the UploadDocRecords audit row this doc came from.
    record_id = models.IntegerField(verbose_name='记录id')
    metadata_version_id = models.CharField(max_length=64, verbose_name="元数据版本id")
    application_id = models.CharField(max_length=64, verbose_name="申请id")
    # Processing state, see DocStatus (INIT / PROCESSING / COMPLETE / PROCESS_FAILED).
    status = models.SmallIntegerField(default=DocStatus.INIT.value, verbose_name="文件状态")
    main_applicant = models.CharField(max_length=16, verbose_name="主申请人")
    co_applicant = models.CharField(max_length=16, verbose_name="共同申请人")
    guarantor_1 = models.CharField(max_length=16, verbose_name="担保人1")
    guarantor_2 = models.CharField(max_length=16, verbose_name="担保人2")
    document_name = models.CharField(max_length=255, verbose_name="文件名")
    document_scheme = models.CharField(max_length=64, verbose_name="文件方案")
    data_source = models.CharField(max_length=64, verbose_name="数据源")
    upload_finish_time = models.DateTimeField(verbose_name="上传完成时间")
    update_time = models.DateTimeField(auto_now=True, verbose_name='修改时间')
    create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')

    class Meta:
        managed = False  # table is created/migrated outside Django
        situ_db_label = 'afc'  # custom Meta option consumed by DBRouter
        db_table = 'afc_doc'
# Applications flagged as high priority: their docs are pushed onto the
# priority Redis queue instead of the common one.
class PriorityApplication(models.Model):
    id = models.AutoField(primary_key=True, verbose_name="id")
    application_id = models.CharField(max_length=64, verbose_name="申请id")  # composite index
    business_type = models.CharField(max_length=64, verbose_name="业务类型")  # composite index
    # Soft on/off flag: only rows with on_off=True count as priority.
    on_off = models.BooleanField(default=True, verbose_name="是否有效")  # composite index
    update_time = models.DateTimeField(auto_now=True, verbose_name='修改时间')
    create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')

    class Meta:
        managed = False  # table is created/migrated outside Django
        # NOTE(review): stored in the 'afc' database even though rows may
        # carry business_type='HIL' — confirm that is intended.
        situ_db_label = 'afc'
        db_table = 'priority_application'
......
from django.urls import path

from . import views

# Priority-application endpoint (mounted under api/priority/ in the root urlconf).
urlpatterns = [
    path(r'v1', views.PriorityDocView.as_view()),
]
......@@ -12,7 +12,7 @@ from common import response
from common.mixins import GenericView
from common.tools.file_tools import file_write
from common.redis_cache import redis_handler as rh
# Post-image import only: the pre-diff '.models' import line duplicated this
# statement (without HILDoc/AFCDoc/PriorityApplication) and is dropped.
from .models import UploadDocRecords, DocStatus, HILDoc, AFCDoc, PriorityApplication
from .mixins import DocHandler
from . import consts
......@@ -61,7 +61,7 @@ doc_list_args = {
validate=validate.OneOf(DocStatus.get_value_lst())),
'application_id': fields.Str(required=False, validate=validate.Length(max=64)),
'data_source': fields.Str(required=False, validate=validate.Length(max=64)),
'business_type': fields.Str(required=False, validate=validate.Length(max=64)),
'business_type': fields.Str(required=True, validate=validate.OneOf(consts.BUSINESS_TYPE)),
'upload_time_start': fields.Date(required=False),
'upload_time_end': fields.Date(required=False),
'create_time_start': fields.Date(required=False),
......@@ -73,6 +73,12 @@ upload_pdf_args = {
}
# Request schema for PriorityDocView.post (camelCase keys come from the caller).
priority_doc_args = {
    'applicationId': fields.Str(required=True, validate=validate.Length(max=64)),
    'businessType': fields.Str(required=True, validate=validate.OneOf(consts.BUSINESS_TYPE)),
}
class UploadDocView(GenericView):
permission_classes = []
......@@ -82,17 +88,21 @@ class UploadDocView(GenericView):
application_data = args.get('applicationData')
applicant_data = args.get('applicantData')
document = args.get('document')
business_type = document.get('businessType')
application_id = application_data.get('applicationId')
is_hil = business_type in consts.HIL_SET
try:
doc = UploadDocRecords.objects.create(
# 1. 上传信息记录
record = UploadDocRecords.objects.create(
metadata_version_id=document.get('metadataVersionId'),
application_id=application_data.get('applicationId'),
application_id=application_id,
main_applicant=applicant_data.get('mainApplicantName'),
co_applicant=applicant_data.get('coApplicantName'),
guarantor_1=applicant_data.get('guarantor1Name'),
guarantor_2=applicant_data.get('guarantor2Name'),
document_name=document.get('documentName'),
document_scheme=document.get('documentScheme'),
business_type=document.get('businessType'),
business_type=business_type,
data_source=document.get('dataSource'),
upload_finish_time=document.get('uploadFinishTime'),
)
......@@ -100,9 +110,28 @@ class UploadDocView(GenericView):
self.running_log.info('[doc upload fail] [args={0}] [err={1}]'.format(args, e))
self.invalid_params(msg='metadataVersionId repeat')
else:
# TODO 查询加入优先队列 or 普通队列
rh.enqueue(doc.id)
self.running_log.info('[doc upload success] [args={0}]'.format(args))
# 2. 根据业务类型分库存储
doc_class, prefix = (HILDoc, consts.HIL_PREFIX) if is_hil else (AFCDoc, consts.AFC_PREFIX)
doc = doc_class.objects.create(
record_id=record.id,
metadata_version_id=document.get('metadataVersionId'),
application_id=application_id,
main_applicant=applicant_data.get('mainApplicantName'),
co_applicant=applicant_data.get('coApplicantName'),
guarantor_1=applicant_data.get('guarantor1Name'),
guarantor_2=applicant_data.get('guarantor2Name'),
document_name=document.get('documentName'),
document_scheme=document.get('documentScheme'),
data_source=document.get('dataSource'),
upload_finish_time=document.get('uploadFinishTime'),
)
# 3. 选择队列进入
is_priority = PriorityApplication.objects.filter(application_id=application_id, on_off=True).exists()
value = ['{0}_{1}'.format(prefix, doc.id)]
redis_res = rh.enqueue(value, is_priority)
self.running_log.info('[doc upload success] [args={0}] [record_id={1}] [is_hil={2}] [doc_id={3}] '
'[is_priority={4}] [enqueue_res={5}]'.format(args, record.id, is_hil, doc.id,
is_priority, redis_res))
return response.ok()
post.openapi_doc = '''
......@@ -125,6 +154,29 @@ class UploadDocView(GenericView):
'''
class PriorityDocView(GenericView):
    permission_classes = []

    # Priority-order endpoint: flag an application as high priority and
    # re-enqueue its still-unprocessed docs onto the priority queue.
    @use_args(priority_doc_args, location='data')
    def post(self, request, args):
        application_id = args.get('applicationId')
        business_type = args.get('businessType')
        # Upsert the priority flag; 'created' is True only for a brand-new row.
        _, created = PriorityApplication.objects.update_or_create(application_id=application_id,
                                                                  business_type=business_type,
                                                                  defaults={'on_off': True})
        if created:
            doc_class, prefix = (HILDoc, consts.HIL_PREFIX) if business_type == consts.HIL_PREFIX \
                else (AFCDoc, consts.AFC_PREFIX)
            # Only docs not yet picked up by the worker (status INIT) are
            # moved to the priority queue; in-flight docs keep their place.
            doc_ids = doc_class.objects.filter(application_id=application_id,
                                               status=DocStatus.INIT.value).values_list('id', flat=True)
            task_str_list = ['{0}_{1}'.format(prefix, doc_id) for doc_id in doc_ids]
            enqueue_res = rh.enqueue(task_str_list, is_priority=True)
            self.running_log.info('[priority doc success] [args={0}] [task_str_list={1}] [enqueue_res={2}]'.format(
                args, task_str_list, enqueue_res))
        # NOTE(review): if the row already existed (e.g. with on_off=False),
        # 'created' is False and nothing is re-enqueued — confirm intended.
        return response.ok()
class DocView(GenericView, DocHandler):
# 文件列表页
......@@ -140,21 +192,20 @@ class DocView(GenericView, DocHandler):
upload_time_end = args.get('upload_time_end')
create_time_start = args.get('create_time_start')
create_time_end = args.get('create_time_end')
status_query = Q(status=status) if status is not None else Q()
application_id_query = Q(application_id=application_id) if application_id is not None else Q()
data_source_query = Q(data_source=data_source) if data_source is not None else Q()
business_type_query = Q(business_type=business_type) if business_type is not None else Q()
upload_finish_time_query = Q(upload_finish_time__gte=upload_time_start,
upload_finish_time__lt=upload_time_end + datetime.timedelta(days=1))\
if upload_time_start is not None and upload_time_end is not None else Q()
create_time_query = Q(create_time__gte=create_time_start,
create_time__lt=create_time_end + datetime.timedelta(days=1))\
if create_time_start is not None and create_time_end is not None else Q()
query = status_query & application_id_query & data_source_query & business_type_query\
& upload_finish_time_query & create_time_query
val_tuple = ('id', 'application_id', 'upload_finish_time', 'create_time',
'business_type', 'data_source', 'status')
doc_queryset = UploadDocRecords.objects.filter(query).values(*val_tuple).order_by('-upload_finish_time')
query = application_id_query & status_query & data_source_query & upload_finish_time_query & create_time_query
val_tuple = ('id', 'application_id', 'upload_finish_time', 'create_time', 'data_source', 'status')
doc_class = HILDoc if business_type == consts.HIL_PREFIX else AFCDoc
doc_queryset = doc_class.objects.filter(query).values(*val_tuple).order_by('-upload_finish_time')
doc_list = self.get_doc_list(doc_queryset)
total = len(doc_list)
......@@ -167,31 +218,61 @@ class DocView(GenericView, DocHandler):
'pagination': pagination,
'doc_list': doc_list[start_index: end_index]
}
self.running_log.info('[get doc list] [args={0}] [res={1}]'.format(args, res))
return response.ok(data=res)
# Upload a PDF by hand to simulate placing an order (mock/test endpoint).
@use_args(upload_pdf_args, location='files')
def post(self, request, args):
    """Create mock upload rows, enqueue the doc and persist the PDF.

    Mirrors the real upload callback: writes an UploadDocRecords audit row,
    a per-business HILDoc/AFCDoc task row, pushes the task string onto the
    common Redis queue, then stores the uploaded file under
    DATA_DIR/<business_type>/<doc_id>/.
    """
    # 1. audit record (kept for every upload, regardless of business type)
    const_str = '手工单'
    metadata_version_id = str(int(time.time()))
    upload_finish_time = timezone.now()
    document_scheme = random.choice(['Acceptance', 'Settlement', 'Contract Management'])
    data_source = random.choice(['POS', 'EAPP', 'Econtract'])
    business_type = random.choice(['AFC', 'HIL'])
    is_hil = business_type in consts.HIL_SET
    record = UploadDocRecords.objects.create(
        metadata_version_id=metadata_version_id,
        application_id=const_str,
        main_applicant=const_str,
        co_applicant=const_str,
        guarantor_1=const_str,
        guarantor_2=const_str,
        document_name=const_str,
        document_scheme=document_scheme,
        business_type=business_type,
        data_source=data_source,
        upload_finish_time=upload_finish_time,
    )
    # 2. per-business task row (stored in the business-specific database)
    doc_class, prefix = (HILDoc, consts.HIL_PREFIX) if is_hil else (AFCDoc, consts.AFC_PREFIX)
    doc = doc_class.objects.create(
        record_id=record.id,
        metadata_version_id=metadata_version_id,
        application_id=const_str,
        main_applicant=const_str,
        co_applicant=const_str,
        guarantor_1=const_str,
        guarantor_2=const_str,
        document_name=const_str,
        document_scheme=document_scheme,
        data_source=data_source,
        upload_finish_time=upload_finish_time,
    )
    # 3. mock uploads always go to the common (non-priority) queue
    is_priority = False
    value = ['{0}_{1}'.format(prefix, doc.id)]
    redis_res = rh.enqueue(value, is_priority)
    pdf_file = args.get('pdf_file')
    save_dir_path = os.path.join(conf.DATA_DIR, business_type, str(doc.id))
    save_file_path = os.path.join(save_dir_path, '{0}.pdf'.format(doc.id))
    os.makedirs(save_dir_path, exist_ok=True)
    file_write(pdf_file, save_file_path)
    self.running_log.info('[mock doc upload success] [args={0}] [record_id={1}] [is_hil={2}] [doc_id={3}] '
                          '[is_priority={4}] [enqueue_res={5}]'.format(args, record.id, is_hil, doc.id,
                                                                       is_priority, redis_res))
    return response.ok()
......
......@@ -19,6 +19,7 @@ from django.urls import path, include
# Project-level URL routing: each app mounts under its own API prefix.
# (Pre-diff route 'api/create/v1' -> apps.doc.urls removed: the hunk replaces
# it with the 'api/create/' and 'api/priority/' mounts below.)
urlpatterns = [
    path('admin/', admin.site.urls),
    path(r'api/user/', include('apps.account.urls')),
    path(r'api/create/', include('apps.doc.create_urls')),
    path(r'api/priority/', include('apps.doc.priority_urls')),
    path(r'api/doc/', include('apps.doc.internal_urls')),
]
......
......@@ -92,27 +92,13 @@ class Redis:
# Thin pass-throughs to the underlying redis-py client; return values are
# whatever redis-py returns (noted inline where callers rely on them).
def expire(self, key, value):
    # Set a TTL (in seconds) on ``key``.
    return self.client.expire(key, value)

def hmset(self, name, mapping):
    return self.client.hmset(name, mapping)

def lpush(self, key, values):
    # Note: unpacks ``values`` — callers pass a list of items.
    return self.client.lpush(key, *values)  # int

def hgetall(self, name):
    return self.client.hgetall(name)

def lrange(self, key, start, end):
    return self.client.lrange(key, start, end)  # list

def hincrby(self, name, key, amount=1):
    return self.client.hincrby(name, key, amount)

def rpop(self, key):
    return self.client.rpop(key)  # str or None

def zadd(self, name, mapping):
    return self.client.zadd(name, mapping)

def zremrangebyrank(self, name, start, end):
    # Read the members about to be removed, then remove them, in one
    # pipeline round-trip.
    with self.client.pipeline() as pipe:
        pipe.zrange(name, start, end)  # TODO possible inconsistency between the read and the removal
        pipe.zremrangebyrank(name, start, end)
        item = pipe.execute()
    return item

def zrank(self, name, value):
    return self.client.zrank(name, value)

def zrange(self, name, start, end):
    return self.client.zrange(name, start, end)
......
......@@ -33,17 +33,20 @@ class RedisHandler:
self.time_expires = datetime.timedelta(hours=24)
self.time_format = '%a %b %d %H:%M:%S %Y'
self.prefix = 'bwm_ocr'
self.queue_key = '{0}:queue'.format(self.prefix)
self.common_queue_key = '{0}:common_queue'.format(self.prefix)
self.priority_queue_key = '{0}:priority_queue'.format(self.prefix)
def enqueue(self, tasks, is_priority=False):
    """Push ``tasks`` (a list of task strings) onto a Redis list queue.

    ``is_priority`` selects the priority queue over the common one.
    Returns the LPUSH result (length of the list after the push).
    """
    key = self.priority_queue_key if is_priority else self.common_queue_key
    return self.redis.lpush(key, tasks)
def dequeue(self):
    """Pop one task string, draining the priority queue first.

    Returns ``(task, is_priority)``; ``task`` is ``None`` when both
    queues are empty (``is_priority`` is then ``False``).
    """
    task = self.redis.rpop(self.priority_queue_key)
    is_priority = True
    if task is None:
        task = self.redis.rpop(self.common_queue_key)
        is_priority = False
    return task, is_priority
......
......@@ -152,7 +152,7 @@ class PdfHandler:
print('----------------------------')
print(self.pdf_name)
print(pdf.metadata)
# xref_list = [] # TODO 图片去重
# xref_list = []
for pno in range(pdf.pageCount):
print('========================')
il = pdf.getPageImageList(pno)
......@@ -162,7 +162,7 @@ class PdfHandler:
img_il_list = self.split_il(il)
il = None
print(img_il_list)
print(len(img_il_list)) # TODO 判断单页图片过多时,使用页面转图片
print(len(img_il_list))
for img_count, img_il in enumerate(img_il_list):
print(img_il)
......
......@@ -91,7 +91,8 @@ WSGI_APPLICATION = 'wsgi.application'
# }
# One connection alias per physical database: the default schema plus a
# dedicated 'afc' schema, selected per-model by settings.database.DBRouter.
# (Pre-diff entry 'default': conf.get_namespace('MYSQL_') removed — it
# duplicated the 'default' key and lacked a trailing comma.)
DATABASES = {
    'default': conf.get_namespace('MYSQL_DEFAULT_'),
    'afc': conf.get_namespace('MYSQL_AFC_'),
}
DATABASE_ROUTERS = ['settings.database.DBRouter']
MYSQLPOOL_ARGUMENTS = database.MYSQLPOOL_ARGUMENTS
......
......@@ -15,7 +15,7 @@ options.DEFAULT_NAMES = tuple(list(options.DEFAULT_NAMES) + ['situ_db_label'])
# 数据库连接池配置
MYSQLPOOL_ARGUMENTS = {
'recycle': 30,
'pool_size': 128,
'pool_size': 64,
'max_overflow': 10,
'timeout': 5,
'use_threadlocal': True,
......@@ -26,12 +26,12 @@ class DBRouter(object):
def db_for_read(self, model, **hints):
    """Route reads to the database named by the model's Meta.situ_db_label.

    Returns ``None`` (let Django fall back to 'default') for models
    without the custom option.  The pre-diff body returned the
    nonexistent attribute ``aft_db_label`` — fixed to ``situ_db_label``.
    """
    if hasattr(model._meta, 'situ_db_label'):
        return model._meta.situ_db_label
    return None
def db_for_write(self, model, **hints):
    """Route writes the same way as reads: by Meta.situ_db_label.

    Returns ``None`` (Django falls back to 'default') for models without
    the custom option.  The pre-diff body returned the nonexistent
    attribute ``aft_db_label`` — fixed to ``situ_db_label``.
    """
    if hasattr(model._meta, 'situ_db_label'):
        return model._meta.situ_db_label
    return None
def allow_relation(self, obj1, obj2, **hints):
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!