"""Views for document upload records, priority applications and document lists."""
import os
import time
import random
import datetime
import fitz
import shutil
from django.utils import timezone
from django.db.utils import IntegrityError
from django.db.models import Q
from rest_framework.permissions import IsAuthenticated
from webargs import fields, validate
from webargs.djangoparser import use_args, parser
from settings import conf
from common import response
from common.mixins import GenericView
from common.tools.file_tools import file_write
from common.redis_cache import redis_handler as rh
from .models import UploadDocRecords, DocStatus, PriorityApplication, GCAPRecords
from .mixins import DocHandler
from . import consts
from apps.account.authentication import OAuth2AuthenticationWithUser


# REST framework wraps request.body into request.data; webargs reads its
# arguments from request.data through this custom "data" location.
@parser.location_loader("data")
def load_data(request, schema):
    """Return ``request.data`` as the payload for the webargs ``data`` location."""
    return request.data


# --- Schemas for UploadDocView.post -----------------------------------------

application_data_args = {
    'applicationId': fields.Str(required=True, validate=validate.Length(max=64)),
}

# Names are required but carry no length validator here; the view passes 16 to
# ``self.get_name`` for each of them (presumably a maximum length -- TODO
# confirm against DocHandler.get_name).
applicant_data_args = {
    'mainApplicantName': fields.Str(required=True),
    'coApplicantName': fields.Str(required=True),
    'guarantor1Name': fields.Str(required=True),
    'guarantor2Name': fields.Str(required=True),
}

document_args = {
    'documentName': fields.Str(required=True, validate=validate.Length(max=255)),
    # Acceptance/Settlement/Contract Management
    'documentScheme': fields.Str(required=True, validate=validate.Length(max=64)),
    # CO00001/CO00002
    'businessType': fields.Str(required=True, validate=validate.Length(max=64)),
    'uploadFinishTime': fields.DateTime(required=True),
    # POS/EAPP/Econtract
    'dataSource': fields.Str(required=True, validate=validate.Length(max=64)),
    'metadataVersionId': fields.Str(required=True, validate=validate.Length(max=64)),
}

doc_upload_args = {
    'applicationData': fields.Nested(application_data_args, required=True),
    'applicantData': fields.Nested(applicant_data_args, required=True),
    'document': fields.Nested(document_args, required=True),
}

# --- Schema for DocView.get (query string) -----------------------------------

doc_list_args = {
    'page': fields.Int(required=False, missing=consts.PAGE_DEFAULT, validate=lambda val: val >= 1),
    'page_size': fields.Int(required=False, missing=consts.PAGE_SIZE_DEFAULT, validate=lambda val: val >= 1),
    'status': fields.Int(required=False, validate=validate.OneOf(DocStatus.get_value_lst())),
    'application_id': fields.Str(required=False, validate=validate.Length(max=64)),
    'data_source': fields.Str(required=False, validate=validate.OneOf(consts.DATA_SOURCE_LIST)),
    'business_type': fields.Str(required=True, validate=validate.OneOf(consts.BUSINESS_TYPE_LIST)),
    'upload_time_start': fields.Date(required=False),
    'upload_time_end': fields.Date(required=False),
    'create_time_start': fields.Date(required=False),
    'create_time_end': fields.Date(required=False),
}

# --- Schema for DocView.post (multipart files) -------------------------------

upload_pdf_args = {
    'pdf_file': fields.Raw(required=True),
}

# --- Schemas for PriorityDocView.post ----------------------------------------

application_information = {
    "SUBMIT_DATETIME": fields.DateTime(required=True),
    "STATUS": fields.Int(required=True),
    'ENTITY': fields.Str(required=True, validate=validate.Length(max=100)),
    "RATING": fields.Int(required=True),
    "APPLICATION_ID": fields.Str(required=True, validate=validate.Length(max=100)),
    "APPLICATION_VERSION": fields.Int(required=True),
    "INTERMEDIATE_DECISION": fields.Int(required=True),
}

priority_doc_args = {
    'APPLICATION_INFORMATION': fields.Nested(application_information, required=True)
}


class UploadDocView(GenericView, DocHandler):
    """Authenticated endpoint that receives metadata about an uploaded document."""

    permission_classes = [IsAuthenticated]
    authentication_classes = [OAuth2AuthenticationWithUser]

    # Upload (receive) document endpoint
    @use_args(doc_upload_args, location='data')
    def post(self, request, args):
        """Record an uploaded document and enqueue it for processing.

        Args:
            request: The incoming request.
            args: Payload parsed by webargs against ``doc_upload_args``.

        Returns:
            ``response.ok()`` once the document is recorded, whether it was
            enqueued or skipped.

        Raises:
            The error built by ``self.invalid_params`` when creating the
            ``UploadDocRecords`` row raises ``IntegrityError`` (reported to
            the caller as a repeated ``metadataVersionId``).
        """
        application_data = args.get('applicationData')
        applicant_data = args.get('applicantData')
        document = args.get('document')

        application_id = application_data.get('applicationId')
        business_type = document.get('businessType')
        document_scheme = document.get('documentScheme')
        data_source = document.get('dataSource')
        document_name = document.get('documentName')
        metadata_version_id = document.get('metadataVersionId')
        upload_finish_time = document.get('uploadFinishTime')

        main_name = self.get_name(applicant_data, 'mainApplicantName', 16)
        co_name = self.get_name(applicant_data, 'coApplicantName', 16)
        g1_name = self.get_name(applicant_data, 'guarantor1Name', 16)
        g2_name = self.get_name(applicant_data, 'guarantor2Name', 16)

        # 1. Record the upload information.
        try:
            UploadDocRecords.objects.create(
                metadata_version_id=metadata_version_id,
                application_id=application_id,
                main_applicant=main_name,
                co_applicant=co_name,
                guarantor_1=g1_name,
                guarantor_2=g2_name,
                document_name=document_name,
                document_scheme=document_scheme,
                business_type=business_type,
                data_source=data_source,
                upload_finish_time=upload_finish_time,
            )
        except IntegrityError as e:
            self.running_log.info('[doc upload fail] [args={0}] [err={1}]'.format(args, e))
            # ``raise`` matches the other invalid_params call sites in this
            # file; without it the view could fall through and return None.
            raise self.invalid_params(msg='metadataVersionId repeat')

        data_source = self.fix_data_source(data_source)
        # Documents from the second configured data source whose name ends in
        # "-证书" (with or without ".pdf") are recorded above but not
        # processed further; the log line labels this "eapp license skip".
        if (data_source == consts.DATA_SOURCE_LIST[1]
                and isinstance(document_name, str)
                and document_name.endswith(('-证书.pdf', '-证书'))):
            self.running_log.info('[doc upload success] [eapp license skip] [args={0}]'.format(args))
            return response.ok()

        # 2. Store the document in the model selected by business type.
        #    Applicant/guarantor names are not stored on this model.
        doc_class, prefix = self.get_doc_class(business_type)
        doc = doc_class.objects.create(
            metadata_version_id=metadata_version_id,
            application_id=application_id,
            document_name=document_name,
            document_scheme=self.fix_scheme(document_scheme),
            # NOTE(review): data_source was already passed through
            # fix_data_source above; the second call is kept as in the
            # original -- confirm it is idempotent before removing it.
            data_source=self.fix_data_source(data_source),
            upload_finish_time=upload_finish_time,
        )

        # 3. Choose the queue to enter.
        is_priority = PriorityApplication.objects.filter(application_id=application_id, on_off=True).exists()
        tasks = ['{0}{1}{2}'.format(prefix, consts.SPLIT_STR, doc.id)]
        enqueue_res = rh.enqueue(tasks, is_priority)
        self.running_log.info('[doc upload success] [args={0}] [business_type={1}] [doc_id={2}] '
                              '[is_priority={3}] [enqueue_res={4}]'.format(args, prefix, doc.id,
                                                                           is_priority, enqueue_res))
        return response.ok()

    post.openapi_doc = '''
    tags: [doc]
    summary: POS系统上传文件信息
    consumes: [application/json]
    produces: [application/json]
    parameters:
    - in: body
      name: body
      required: true
      schema:
        $ref: "#/definitions/Doc"
    responses:
        200:
            description: ok
            schema:
                $ref: '#/definitions/ApiResponse'
    '''


class PriorityDocView(GenericView, DocHandler):
    """Authenticated endpoint that records application info and raises priority."""

    permission_classes = [IsAuthenticated]
    authentication_classes = [OAuth2AuthenticationWithUser]

    # Priority application endpoint
    @use_args(priority_doc_args, location='data')
    def post(self, request, args):
        """Store the application record and prioritise its pending documents.

        A ``GCAPRecords`` row is always created. When the stringified
        ``INTERMEDIATE_DECISION`` is in ``consts.PRIORITY_WORDS`` the
        application is flagged in ``PriorityApplication``; if that flag row
        was newly created, documents of the application still in
        ``DocStatus.INIT`` are enqueued with priority.

        Args:
            request: The incoming request.
            args: Payload parsed by webargs against ``priority_doc_args``.

        Returns:
            ``response.ok()`` in every non-error path.
        """
        application_info = args.get('APPLICATION_INFORMATION')
        application_id = application_info.get('APPLICATION_ID')
        entity = application_info.get('ENTITY')
        intermediate_decision = str(application_info.get('INTERMEDIATE_DECISION'))

        submit_datetime = application_info.get('SUBMIT_DATETIME')
        # Timezone-aware values are converted to naive datetimes in the
        # current Django timezone before being stored.
        if submit_datetime.utcoffset() is not None:
            submit_datetime = timezone.make_naive(submit_datetime, timezone.get_current_timezone())

        GCAPRecords.objects.create(
            entity=entity,
            status=application_info.get('STATUS'),
            rating=application_info.get('RATING'),
            application_id=application_id,
            application_version=application_info.get('APPLICATION_VERSION'),
            intermediate_decision=intermediate_decision,
            submit_datetime=submit_datetime,
        )

        if intermediate_decision not in consts.PRIORITY_WORDS:
            self.running_log.info('[priority doc skip] [args={0}]'.format(args))
            return response.ok()

        _, created = PriorityApplication.objects.update_or_create(
            application_id=application_id, defaults={'on_off': True})
        # Only a newly created flag triggers enqueueing of existing documents.
        if created:
            doc_class, prefix = self.get_doc_class(entity)
            doc_ids = doc_class.objects.filter(
                application_id=application_id,
                status=DocStatus.INIT.value,
            ).values_list('id', flat=True)
            tasks_list = ['{0}{1}{2}'.format(prefix, consts.SPLIT_STR, doc_id) for doc_id in doc_ids]
            if tasks_list:
                enqueue_res = rh.enqueue(tasks_list, is_priority=True)
                self.running_log.info('[priority doc success] [args={0}] [tasks_list={1}] [enqueue_res={2}]'.format(
                    args, tasks_list, enqueue_res))
            else:
                self.running_log.info(
                    '[priority doc success] [args={0}]'.format(args))
        return response.ok()

    post.openapi_doc = '''
    tags: [doc]
    summary: GCAP提高申请单对应文件优先级
    consumes: [application/json]
    produces: [application/json]
    parameters:
    - in: body
      name: body
      required: true
      schema:
        $ref: "#/definitions/Application"
    responses:
        200:
            description: ok
            schema:
                $ref: '#/definitions/ApiResponse'
    '''


class DocView(GenericView, DocHandler):
    """Document list page and mock PDF upload."""

    # Document list page
    @use_args(doc_list_args, location='querystring')
    def get(self, request, args):
        """Return one page of documents for a business type, newest first.

        Args:
            request: The incoming request.
            args: Query parameters parsed against ``doc_list_args``.

        Returns:
            ``response.ok`` whose data holds ``pagination`` (``current``,
            ``total``, ``page_size``) and ``doc_list``.

        Raises:
            The error built by ``self.invalid_params`` when the requested
            page starts beyond the last matching row.
        """
        page = args.get('page', consts.PAGE_DEFAULT)
        page_size = args.get('page_size', consts.PAGE_SIZE_DEFAULT)
        status = args.get('status')
        application_id = args.get('application_id')
        data_source = args.get('data_source')
        business_type = args.get('business_type')
        upload_time_start = args.get('upload_time_start')
        upload_time_end = args.get('upload_time_end')
        create_time_start = args.get('create_time_start')
        create_time_end = args.get('create_time_end')

        # Each optional filter contributes an empty Q() when it is absent.
        query = Q()
        if application_id is not None:
            query &= Q(application_id__contains=application_id)
        if status is not None:
            query &= Q(status=status)
        if data_source is not None:
            query &= Q(data_source=data_source)
        # A date range applies only when BOTH ends are given. One day is added
        # to the end date and compared with ``__lt`` so the end date itself is
        # included.
        one_day = datetime.timedelta(days=1)
        if upload_time_start is not None and upload_time_end is not None:
            query &= Q(upload_finish_time__gte=upload_time_start,
                       upload_finish_time__lt=upload_time_end + one_day)
        if create_time_start is not None and create_time_end is not None:
            query &= Q(create_time__gte=create_time_start,
                       create_time__lt=create_time_end + one_day)

        val_tuple = ('id', 'application_id', 'upload_finish_time', 'create_time', 'data_source', 'status')
        doc_class, prefix = self.get_doc_class(business_type)
        matching = doc_class.objects.filter(query)
        total = matching.count()

        start_index = page_size * (page - 1)
        end_index = page_size * page
        # An empty result set (total == 0) is allowed for any page; otherwise
        # the page must start inside the result set.
        if start_index >= total > 0:
            raise self.invalid_params('页数不存在')

        doc_queryset = matching.values(*val_tuple).order_by('-create_time')[start_index:end_index]
        doc_list = self.get_doc_list(doc_queryset, prefix)
        res = {
            'pagination': {'current': page, 'total': total, 'page_size': page_size},
            'doc_list': doc_list,
        }
        self.running_log.info('[get doc list] [args={0}] [res={1}]'.format(args, res))
        return response.ok(data=res)

    # Upload a PDF, simulating an order
    @use_args(upload_pdf_args, location='files')
    def post(self, request, args):
        """Accept a PDF upload and create mock records for it.

        The business type, document scheme and data source are picked at
        random from the configured lists, and the metadata version id is
        derived from the current time minus a random offset.

        Args:
            request: The incoming request.
            args: Uploaded files parsed against ``upload_pdf_args``.

        Returns:
            ``response.ok()`` after the document is stored and enqueued.

        Raises:
            The error built by ``self.invalid_params`` when the file name
            does not end in ``pdf``/``PDF`` or the content cannot be opened
            as a PDF.
        """
        random_int = random.randint(0, consts.TIME_NUM)
        metadata_version_id = str(int(time.time()) - random_int)
        pdf_file = args.get('pdf_file')

        if isinstance(pdf_file.name, str) and not pdf_file.name.endswith(('pdf', 'PDF')):
            # ``raise`` matches the other invalid_params call sites; without
            # it a wrongly named file could continue through the upload flow.
            raise self.invalid_params(msg='invalid params: not a PDF file')

        business_type = random.choice(consts.BUSINESS_TYPE_LIST)
        tmp_save_path = os.path.join(conf.DATA_DIR, business_type, '{0}.pdf'.format(metadata_version_id))
        file_write(pdf_file, tmp_save_path)

        # Validate the content by opening it; the temporary file is removed
        # on every rejection path.
        try:
            pdf_doc = fitz.Document(tmp_save_path)
        except Exception:
            os.remove(tmp_save_path)
            raise self.invalid_params(msg='invalid params: not a PDF file')
        is_pdf = pdf_doc.isPDF
        pdf_doc.close()
        if not is_pdf:
            os.remove(tmp_save_path)
            raise self.invalid_params(msg='invalid params: not a PDF file')

        # 1. Record the upload information.
        application_id = '{0}{1}'.format(consts.FIXED_APPLICATION_ID_PREFIX, metadata_version_id)
        upload_finish_time = timezone.now()
        document_scheme = random.choice(consts.DOC_SCHEME_LIST)
        data_source = random.choice(consts.DATA_SOURCE_LIST)
        UploadDocRecords.objects.create(
            metadata_version_id=metadata_version_id,
            application_id=application_id,
            main_applicant='',
            co_applicant='',
            guarantor_1='',
            guarantor_2='',
            document_name=application_id,
            document_scheme=document_scheme,
            business_type=business_type,
            data_source=data_source,
            upload_finish_time=upload_finish_time,
        )

        # 2. Store the document in the model selected by business type.
        doc_class, prefix = self.get_doc_class(business_type)
        doc = doc_class.objects.create(
            metadata_version_id=metadata_version_id,
            application_id=application_id,
            document_name=application_id,
            document_scheme=document_scheme,
            data_source=data_source,
            upload_finish_time=upload_finish_time,
        )

        # 3. Move the PDF from its temporary path into the per-document directory.
        save_dir_path = os.path.join(conf.DATA_DIR, business_type, consts.TMP_DIR_NAME, str(doc.id))
        save_file_path = os.path.join(save_dir_path, '{0}.pdf'.format(doc.id))
        os.makedirs(save_dir_path, exist_ok=True)
        shutil.move(tmp_save_path, save_file_path)

        # 4. Choose the queue to enter (mock uploads never use priority).
        is_priority = False
        tasks = ['{0}{1}{2}'.format(prefix, consts.SPLIT_STR, doc.id)]
        enqueue_res = rh.enqueue(tasks, is_priority)
        self.running_log.info('[mock doc upload success] [args={0}] [business_type={1}] [doc_id={2}] '
                              '[is_priority={3}] [enqueue_res={4}]'.format(args, prefix, doc.id,
                                                                           is_priority, enqueue_res))
        return response.ok()