a4a63da9 by 周伟奇

metaVersionId unique & redis tool

1 parent 11b44dec
......@@ -16,11 +16,19 @@ class Command(BaseCommand):
    def signal_handler(self, sig, frame):
        self.switch = False  # stop processing files

    def get_task_info(self):
        pass

    def pdf_download(self, task_info):
        pass

    def handle(self, *args, **kwargs):
        while self.switch:
            # fetch task info from the queue
            task_info = self.get_task_info()
            # download the PDF file from EDMS
            # convert each PDF page into an image
            pdf_path = self.pdf_download(task_info)
            # extract embedded images from the PDF
            # run the classification algorithm to decide whether each image is a bank statement
            # run OCR on the images and convert them into Excel files
            # merge the Excel files and upload the result to EDMS
......
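As a reference for how these stubs tie together, here is a minimal driver sketch; the Redis-backed queue, the pdf_to_images helper and the ten-second back-off are assumptions for illustration, not part of this commit:

import time

def run_bank_statement_pipeline(command, pdf_to_images, poll_interval=10):
    # command is the management command above; pdf_to_images is an assumed
    # helper that renders one image per PDF page and returns their paths.
    while command.switch:
        task_info = command.get_task_info()          # pop the next task from the queue
        if task_info is None:
            time.sleep(poll_interval)                # back off while the queue is empty
            continue
        pdf_path = command.pdf_download(task_info)   # fetch the PDF from EDMS
        images = pdf_to_images(pdf_path)
        # classification, OCR and the Excel upload to EDMS would follow here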
......@@ -3,18 +3,19 @@ from django.db import models
# Create your models here.
# upload document records table / task table
class UploadDocRecords(models.Model):
    id = models.AutoField(primary_key=True, verbose_name="id")
    metadata_version_id = models.CharField(max_length=64, unique=True, verbose_name="元数据版本id")
    application_id = models.CharField(max_length=64, verbose_name="申请id")
    main_applicant = models.CharField(max_length=16, verbose_name="主申请人")
    co_applicant = models.CharField(max_length=16, verbose_name="共同申请人")
    guarantor_1 = models.CharField(max_length=16, verbose_name="担保人1")
    guarantor_2 = models.CharField(max_length=16, verbose_name="担保人2")
    document_name = models.CharField(max_length=255, verbose_name="文件名")
    document_scheme = models.CharField(max_length=64, verbose_name="文件格式")  # TODO confirm verbose_name
    document_scheme = models.CharField(max_length=64, verbose_name="文件格式")
    business_type = models.CharField(max_length=64, verbose_name="业务类型")
    data_source = models.CharField(max_length=64, verbose_name="数据源")
    metadata_version_id = models.CharField(max_length=64, verbose_name="元数据版本id")
    upload_finish_time = models.DateTimeField(verbose_name="上传完成时间")
    update_time = models.DateTimeField(auto_now=True, verbose_name='修改时间')
    create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
......
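The reason for a unique metadataVersionId is that duplicate uploads can be rejected by the database itself rather than by a racy check-then-insert; a minimal sketch of that pattern (the helper name is illustrative, not part of this commit):

from django.db import IntegrityError, transaction

def create_upload_record(**fields):
    # Returns the new UploadDocRecords row, or None when a record with the
    # same metadata_version_id already exists (the unique index rejects it).
    try:
        with transaction.atomic():
            return UploadDocRecords.objects.create(**fields)
    except IntegrityError:
        return None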
from django.shortcuts import render
from django.db.utils import IntegrityError
from webargs import fields, validate
from webargs.djangoparser import use_args, parser
from common.mixins import GenericView
......@@ -49,21 +50,26 @@ class DocView(GenericView):
        application_data = args.get('applicationData')
        applicant_data = args.get('applicantData')
        document = args.get('document')
        UploadDocRecords.objects.create(
            application_id=application_data.get('applicationId'),
            main_applicant=applicant_data.get('mainApplicantName'),
            co_applicant=applicant_data.get('coApplicantName'),
            guarantor_1=applicant_data.get('guarantor1Name'),
            guarantor_2=applicant_data.get('guarantor2Name'),
            document_name=document.get('documentName'),
            document_scheme=document.get('documentScheme'),
            business_type=document.get('businessType'),
            data_source=document.get('dataSource'),
            metadata_version_id=document.get('metadataVersionId'),
            upload_finish_time=document.get('uploadFinishTime'),
        )
        self.running_log.info('[doc upload success] [args={0}]'.format(args))
        return response.ok()
        try:
            UploadDocRecords.objects.create(
                metadata_version_id=document.get('metadataVersionId'),
                application_id=application_data.get('applicationId'),
                main_applicant=applicant_data.get('mainApplicantName'),
                co_applicant=applicant_data.get('coApplicantName'),
                guarantor_1=applicant_data.get('guarantor1Name'),
                guarantor_2=applicant_data.get('guarantor2Name'),
                document_name=document.get('documentName'),
                document_scheme=document.get('documentScheme'),
                business_type=document.get('businessType'),
                data_source=document.get('dataSource'),
                upload_finish_time=document.get('uploadFinishTime'),
            )
        except IntegrityError as e:
            self.running_log.info('[doc upload fail] [args={0}] [err={1}]'.format(args, e))
            self.invalid_params(msg='metadataVersionId repeat')
        else:
            self.running_log.info('[doc upload success] [args={0}]'.format(args))
            return response.ok()
    post.openapi_doc = '''
tags: [doc]
......
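For reference, the request body this handler reads looks roughly like the following; the key names come from the code above, while every value is made up:

payload = {
    'applicationData': {'applicationId': 'CH-B0001'},
    'applicantData': {
        'mainApplicantName': 'ZHANG SAN',
        'coApplicantName': 'LI SI',
        'guarantor1Name': 'WANG WU',
        'guarantor2Name': 'ZHAO LIU',
    },
    'document': {
        'documentName': 'bank_statement.pdf',
        'documentScheme': 'SCHEME_A',
        'businessType': 'TYPE_A',
        'dataSource': 'SOURCE_A',
        'metadataVersionId': 'MV-0001',
        'uploadFinishTime': '2020-06-01 12:00:00',
    },
}
# Posting the same metadataVersionId twice now yields the invalid-params
# response ('metadataVersionId repeat') instead of a second database row.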
......@@ -54,8 +54,8 @@ def exception_handler(exc, context):
        return APIResponse(meta_status, msg=str(exc))
    elif isinstance(exc, Exception) and hasattr(exc, 'API_META_STATUS'):
        msg = exc.API_META_STATUS.verbose_name
        return APIResponse(exc.API_META_STATUS.value, msg=msg)
        # msg = exc.API_META_STATUS.verbose_name
        return APIResponse(exc.API_META_STATUS.value, msg=str(exc))
    error_logger.exception('[system error]')
    return APIResponse(MetaStatus.INTERNAL_ERROR.value,
......
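The effect of the handler change: for exceptions carrying an API_META_STATUS, the response message is now the exception's own text instead of the status's verbose_name. A hypothetical example (the exception class and the MetaStatus member are illustrative):

class DocUploadError(Exception):
    API_META_STATUS = MetaStatus.INTERNAL_ERROR   # illustrative member

# raise DocUploadError('EDMS download failed for MV-0001')
# exception_handler now replies with msg='EDMS download failed for MV-0001'
# rather than MetaStatus.INTERNAL_ERROR.verbose_name.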
from .base import Redis
from .handler import RedisHandler
from settings import conf
redis_url = conf.REDIS_URL
# redis = Redis(redis_url)
# redis_handler = RedisHandler(redis)
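The commented-out lines show the intended wiring; once enabled, usage would look like this (a sketch assuming conf.REDIS_URL points at a reachable instance):

redis = Redis(conf.REDIS_URL)                # e.g. 'redis://:password@127.0.0.1:6379/0'
redis_handler = RedisHandler(redis)

redis.set('healthcheck', 'ok', expires=60)   # stored via SETEX with a 60s TTL
assert redis.get('healthcheck') == 'ok'      # decode_responses=True, so str not bytes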
from typing import NamedTuple
from urllib.parse import parse_qsl, unquote, urlparse
from redis import StrictRedis, ConnectionPool
try:
    from collections.abc import Mapping
except ImportError:
    from collections import Mapping

url_parts = NamedTuple('url_parts', [
    ('scheme', str),
    ('hostname', str),
    ('port', int),
    ('username', str),
    ('password', str),
    ('path', str),
    ('query', Mapping),
])

def url_to_parts(url):
    # type: (str) -> url_parts
    """Parse a URL into a :class:`url_parts` tuple of components."""
    scheme = urlparse(url).scheme
    schemeless = url[len(scheme) + 3:]
    # parse with HTTP URL semantics
    parts = urlparse('http://' + schemeless)
    path = parts.path or ''
    path = path[1:] if path and path[0] == '/' else path
    return url_parts(
        scheme,
        unquote(parts.hostname or '') or None,
        parts.port,
        unquote(parts.username or '') or None,
        unquote(parts.password or '') or None,
        unquote(path or '') or None,
        dict(parse_qsl(parts.query)),
    )

class Redis:

    def __init__(self, url, connection_pool=None, max_connections=None, socket_timeout=120,
                 retry_on_timeout=None, socket_connect_timeout=None):
        self._ConnectionPool = connection_pool
        scheme, host, port, _, password, path, query = url_to_parts(url)
        self.conn_params = {
            'host': host,
            'port': port,
            'db': int(path),
            'password': password,
            'max_connections': max_connections,
            'socket_timeout': socket_timeout and float(socket_timeout),
            'retry_on_timeout': retry_on_timeout or False,
            'socket_connect_timeout':
                socket_connect_timeout and float(socket_connect_timeout),
            'decode_responses': True
        }
        self.client = StrictRedis(
            connection_pool=self._get_pool(**self.conn_params),
        )

    @property
    def ConnectionPool(self):
        if self._ConnectionPool is None:
            self._ConnectionPool = ConnectionPool
        return self._ConnectionPool

    def _get_pool(self, **params):
        return self.ConnectionPool(**params)

    def get(self, key):
        return self.client.get(key)

    def mget(self, keys):
        return self.client.mget(keys)

    def set(self, key, value, expires=None):
        # use SETEX when an expiry (in seconds) is given, plain SET otherwise
        if expires:
            return self.client.setex(key, expires, value)
        else:
            return self.client.set(key, value)

    def delete(self, key):
        self.client.delete(key)

    def incr(self, key):
        return self.client.incr(key)

    def expire(self, key, value):
        return self.client.expire(key, value)

    def hmset(self, name, mapping):
        return self.client.hmset(name, mapping)

    def hgetall(self, name):
        return self.client.hgetall(name)

    def hincrby(self, name, key, amount=1):
        return self.client.hincrby(name, key, amount)

    def zadd(self, name, mapping):
        return self.client.zadd(name, mapping)

    def zremrangebyrank(self, name, start, end):
        # read the range and remove it in a single pipeline;
        # returns [zrange result, number of members removed]
        with self.client.pipeline() as pipe:
            pipe.zrange(name, start, end)
            pipe.zremrangebyrank(name, start, end)
            item = pipe.execute()
        return item

    def zrank(self, name, value):
        return self.client.zrank(name, value)

    def zrange(self, name, start, end):
        return self.client.zrange(name, start, end)
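Since url_to_parts re-parses the URL with HTTP semantics after stripping the scheme, a standard Redis URL decomposes as follows (values are illustrative):

parts = url_to_parts('redis://:secret@127.0.0.1:6379/3')
# url_parts(scheme='redis', hostname='127.0.0.1', port=6379, username=None,
#           password='secret', path='3', query={})
# Redis.__init__ then uses int(parts.path) as the database number.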
import json
import time
import datetime


class DateTimeJSONEncoder(json.JSONEncoder):
    """
    JSONEncoder subclass that knows how to encode datetime objects
    (ISO 8601, microseconds truncated to milliseconds).
    """
    def default(self, o):
        # See "Date Time String Format" in the ECMA-262 specification.
        if isinstance(o, datetime.datetime):
            r = o.isoformat()
            if o.microsecond:
                r = r[:23] + r[26:]
            if r.endswith('+00:00'):
                r = r[:-6] + 'Z'
            return r
        else:
            return super().default(o)


def dict_str_to_int(res):
    # convert every value of the dict to int, in place
    for k, v in res.items():
        res[k] = int(v)
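# For reference (illustrative value): DateTimeJSONEncoder truncates microseconds
# to milliseconds, and set_models_list below relies on it when caching lists
# that contain datetime fields, e.g.
#   json.dumps({'t': datetime.datetime(2020, 6, 1, 12, 0, 0, 123456)},
#              cls=DateTimeJSONEncoder)
#   == '{"t": "2020-06-01T12:00:00.123"}'
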
class RedisHandler:

    def __init__(self, redis):
        self.redis = redis
        self.time_expires = datetime.timedelta(hours=24)
        self.time_format = '%a %b %d %H:%M:%S %Y'
        self.prefix = 'automl'
        self.training_time_key = '{0}:training_time'.format(self.prefix)
        self.queue_key = '{0}:queue'.format(self.prefix)
        self.prefix_training = '{0}:training'.format(self.prefix)
        self.prefix_models = '{0}:models'.format(self.prefix)
        self.prefix_img_info = '{0}:img_info'.format(self.prefix)

    def get_training_model_key(self, user_id, model_type):
        return '{0}:{1}:{2}'.format(self.prefix_training, user_id, model_type)

    def get_models_list_key(self, user_id, model_type):
        return '{0}:{1}:{2}'.format(self.prefix_models, user_id, model_type)

    def set_training_model(self, user_id, model_type, model_id, status):
        # True
        key = self.get_training_model_key(user_id, model_type)
        mapping = {
            'model_id': model_id,
            'model_status': status
        }
        return self.redis.hmset(key, mapping)

    def get_training_model(self, user_id, model_type):
        # {}
        # {'id': '1', 'status': '1'}
        key = self.get_training_model_key(user_id, model_type)
        res = self.redis.hgetall(key)
        dict_str_to_int(res)
        return res

    def set_models_list(self, user_id, model_type, models_list):
        key = self.get_models_list_key(user_id, model_type)
        value = json.dumps(models_list, cls=DateTimeJSONEncoder)
        return self.redis.set(key, value, expires=self.time_expires)

    def get_models_list(self, user_id, model_type):
        # list or None
        key = self.get_models_list_key(user_id, model_type)
        res_str = self.redis.get(key)
        res = None if res_str is None else json.loads(res_str)
        return res

    def del_models_list(self, user_id, model_type):
        # None
        key = self.get_models_list_key(user_id, model_type)
        return self.redis.delete(key)

    def set_training_finish_time(self, finish_time):
        # True
        finish_time_str = datetime.datetime.strftime(finish_time, self.time_format)
        return self.redis.set(self.training_time_key, finish_time_str)

    def get_training_finish_time(self):
        # datetime.datetime or None
        res = self.redis.get(self.training_time_key)
        finish_time = None if res is None else datetime.datetime.strptime(res, self.time_format)
        return finish_time

    def del_training_finish_time(self):
        # None
        return self.redis.delete(self.training_time_key)

    def enqueue(self, model_id):
        # 1
        mapping = {model_id: time.time()}
        return self.redis.zadd(self.queue_key, mapping)

    def dequeue(self):
        # model_id:int or None
        res_list = self.redis.zremrangebyrank(self.queue_key, 0, 0)
        pop_item_list = res_list[0]
        pop_item = int(pop_item_list[0]) if pop_item_list else None
        return pop_item

    def get_queue_end(self):
        # model_id:int or None
        res_list = self.redis.zrange(self.queue_key, -1, -1)
        end_id = int(res_list[0]) if res_list else None
        return end_id

    def get_queue_rank(self, model_id):
        # rank:int or None
        rank = self.redis.zrank(self.queue_key, model_id)
        if rank is None:
            return 0
        return rank + 1

    def set_img_info(self, user_id, model_id, count_sum, count_marked):
        # True
        key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
        mapping = {
            'count_sum': count_sum,
            'count_marked': count_marked
        }
        return self.redis.hmset(key, mapping)

    def get_img_info(self, user_id, model_id):
        # {}
        # {'count_sum': '70', 'count_marked': '0'}
        key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
        res = self.redis.hgetall(key)
        dict_str_to_int(res)
        return res

    def update_img_info(self, user_id, model_id, del_img=False):
        # res_count:int
        key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
        if del_img:
            return self.redis.hincrby(key, 'count_sum', amount=-1)
        else:
            return self.redis.hincrby(key, 'count_marked')

    def del_img_info(self, user_id, model_id):
        # None
        key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
        return self.redis.delete(key)

    def pipe_trained(self, user_id, model_type, model_id, status, success=True):
        # redis.set_training_model(user_id, model_type, model_id, model_status)
        # redis.del_training_finish_time()
        # redis.del_models_list(user_id, model_type)
        # redis.set_training_model(user_id, model_type, model_id, model_status)
        # redis.del_training_finish_time()
        training_model_key = self.get_training_model_key(user_id, model_type)
        models_list_key = self.get_models_list_key(user_id, model_type)
        mapping = {
            'model_id': model_id,
            'model_status': status
        }
        with self.redis.client.pipeline() as pipe:
            pipe.hmset(training_model_key, mapping)
            pipe.delete(self.training_time_key)
            if success is True:
                pipe.delete(models_list_key)
            item = pipe.execute()
        return item

    def pipe_training(self, user_id, model_type, model_id, status, finish_time):
        # redis.dequeue()
        # redis.set_training_model(user_id, model_type, model_id, model_status)
        # redis.set_training_finish_time(proleptic_finish_time)
        training_model_key = self.get_training_model_key(user_id, model_type)
        mapping = {
            'model_id': model_id,
            'model_status': status
        }
        finish_time_str = datetime.datetime.strftime(finish_time, self.time_format)
        with self.redis.client.pipeline() as pipe:
            pipe.zremrangebyrank(self.queue_key, 0, 0)
            pipe.hmset(training_model_key, mapping)
            pipe.set(self.training_time_key, finish_time_str)
            item = pipe.execute()
        return item

    def pipe_enqueue(self, model_id, user_id, model_type, status, section=True):
        # redis.enqueue(model_id)
        # redis.set_training_model(user_id, model_type,
        #                          model_id, ModelStatus.DATA_PRETREATMENT_DONE.value)
        # if model_type == ModelType.SECTION.value:
        #     redis.del_img_info(user_id, model_id)
        queue_mapping = {model_id: time.time()}
        training_model_key = self.get_training_model_key(user_id, model_type)
        mapping = {
            'model_id': model_id,
            'model_status': status
        }
        img_info_key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
        with self.redis.client.pipeline() as pipe:
            pipe.zadd(self.queue_key, queue_mapping)
            pipe.hmset(training_model_key, mapping)
            if section is True:
                pipe.delete(img_info_key)
            item = pipe.execute()
        return item
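Roughly how the queue half of RedisHandler behaves, using the classes above (ids and the URL are made up):

handler = RedisHandler(Redis('redis://127.0.0.1:6379/0'))

handler.enqueue(101)            # score is time.time(), so the set stays FIFO-ordered
handler.enqueue(102)
handler.get_queue_rank(102)     # -> 2  (1-based position; 0 means not in the queue)
handler.get_queue_end()         # -> 102, the most recently enqueued model_id
handler.dequeue()               # -> 101, read and removed atomically via a pipeline
handler.dequeue()               # -> 102
handler.dequeue()               # -> None once the queue is empty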