import os import time import random import datetime from django.utils import timezone from django.db.utils import IntegrityError from django.db.models import Q from rest_framework.permissions import IsAuthenticated from webargs import fields, validate from webargs.djangoparser import use_args, parser from settings import conf from common import response from common.mixins import GenericView from common.tools.file_tools import file_write from common.redis_cache import redis_handler as rh from .models import UploadDocRecords, DocStatus, PriorityApplication, GCAPRecords from .mixins import DocHandler from . import consts from apps.account.authentication import OAuth2AuthenticationWithUser # restframework将request.body封装至request.data, webargs从request.data中获取参数 @parser.location_loader("data") def load_data(request, schema): return request.data application_data_args = {'applicationId': fields.Str(required=True, validate=validate.Length(max=64))} applicant_data_args = { 'mainApplicantName': fields.Str(required=True, validate=validate.Length(max=16)), 'coApplicantName': fields.Str(required=True, validate=validate.Length(max=16)), 'guarantor1Name': fields.Str(required=True, validate=validate.Length(max=16)), 'guarantor2Name': fields.Str(required=True, validate=validate.Length(max=16)), } document_args = { 'documentName': fields.Str(required=True, validate=validate.Length(max=255)), # Acceptance/Settlement/Contract Management 'documentScheme': fields.Str(required=True, validate=validate.Length(max=64)), 'businessType': fields.Str(required=True, validate=validate.Length(max=64)), # CO00001/CO00002 'uploadFinishTime': fields.DateTime(required=True), 'dataSource': fields.Str(required=True, validate=validate.Length(max=64)), # POS/EAPP/Econtract 'metadataVersionId': fields.Str(required=True, validate=validate.Length(max=64)), } doc_upload_args = { 'applicationData': fields.Nested(application_data_args, required=True), 'applicantData': fields.Nested(applicant_data_args, required=True), 'document': fields.Nested(document_args, required=True), } doc_list_args = { 'page': fields.Int(required=False, missing=consts.PAGE_DEFAULT, validate=lambda val: val >= 1), 'page_size': fields.Int(required=False, missing=consts.PAGE_SIZE_DEFAULT, validate=lambda val: val >= 1), 'status': fields.Int(required=False, validate=validate.OneOf(DocStatus.get_value_lst())), 'application_id': fields.Str(required=False, validate=validate.Length(max=64)), 'data_source': fields.Str(required=False, validate=validate.Length(max=64)), 'business_type': fields.Str(required=True, validate=validate.OneOf(consts.BUSINESS_TYPE_LIST)), 'upload_time_start': fields.Date(required=False), 'upload_time_end': fields.Date(required=False), 'create_time_start': fields.Date(required=False), 'create_time_end': fields.Date(required=False), } upload_pdf_args = { 'pdf_file': fields.Raw(required=True), } application_information = { "SUBMIT_DATETIME": fields.DateTime(required=True), "STATUS": fields.Int(required=True), 'ENTITY': fields.Str(required=True, validate=validate.Length(max=100)), "RATING": fields.Int(required=True), "APPLICATION_ID": fields.Str(required=True, validate=validate.Length(max=100)), "APPLICATION_VERSION": fields.Int(required=True), "INTERMEDIATE_DECISION": fields.Str(required=True, validate=validate.Length(max=100)), } priority_doc_args = { 'APPLICATION_INFORMATION': fields.Nested(application_information, required=True) } class UploadDocView(GenericView, DocHandler): permission_classes = [IsAuthenticated] authentication_classes = [OAuth2AuthenticationWithUser] # required_scopes = ['write'] # 上传(接收)文件接口 @use_args(doc_upload_args, location='data') def post(self, request, args): application_data = args.get('applicationData') applicant_data = args.get('applicantData') document = args.get('document') business_type = document.get('businessType') application_id = application_data.get('applicationId') try: # 1. 上传信息记录 record = UploadDocRecords.objects.create( metadata_version_id=document.get('metadataVersionId'), application_id=application_id, main_applicant=applicant_data.get('mainApplicantName'), co_applicant=applicant_data.get('coApplicantName'), guarantor_1=applicant_data.get('guarantor1Name'), guarantor_2=applicant_data.get('guarantor2Name'), document_name=document.get('documentName'), document_scheme=document.get('documentScheme'), business_type=business_type, data_source=document.get('dataSource'), upload_finish_time=document.get('uploadFinishTime'), ) except IntegrityError as e: self.running_log.info('[doc upload fail] [args={0}] [err={1}]'.format(args, e)) self.invalid_params(msg='metadataVersionId repeat') else: # 2. 根据业务类型分库存储 doc_class, prefix = self.get_doc_class(business_type) doc = doc_class.objects.create( record_id=record.id, metadata_version_id=document.get('metadataVersionId'), application_id=application_id, main_applicant=applicant_data.get('mainApplicantName'), co_applicant=applicant_data.get('coApplicantName'), guarantor_1=applicant_data.get('guarantor1Name'), guarantor_2=applicant_data.get('guarantor2Name'), document_name=document.get('documentName'), document_scheme=document.get('documentScheme'), data_source=document.get('dataSource'), upload_finish_time=document.get('uploadFinishTime'), ) # 3. 选择队列进入 is_priority = PriorityApplication.objects.filter(application_id=application_id, on_off=True).exists() value = ['{0}_{1}'.format(prefix, doc.id)] redis_res = rh.enqueue(value, is_priority) self.running_log.info('[doc upload success] [args={0}] [record_id={1}] [prefix={2}] [doc_id={3}] ' '[is_priority={4}] [enqueue_res={5}]'.format(args, record.id, prefix, doc.id, is_priority, redis_res)) return response.ok() post.openapi_doc = ''' tags: [doc] summary: POS系统上传文件信息 consumes: [application/json] produces: [application/json] parameters: - in: body name: body required: true schema: $ref: "#/definitions/Doc" responses: 200: description: ok schema: $ref: '#/definitions/ApiResponse' ''' class PriorityDocView(GenericView, DocHandler): permission_classes = [IsAuthenticated] authentication_classes = [OAuth2AuthenticationWithUser] # 优先级订单接口 @use_args(priority_doc_args, location='data') def post(self, request, args): application_info = args.get('APPLICATION_INFORMATION') application_id = application_info.get('APPLICATION_ID') submit_datetime = application_info.get('SUBMIT_DATETIME') entity = application_info.get('ENTITY') submit_datetime = timezone.make_naive(submit_datetime, timezone.get_current_timezone()) GCAPRecords.objects.create( entity=entity, status=application_info.get('STATUS'), rating=application_info.get('RATING'), application_id=application_id, application_version=application_info.get('APPLICATION_VERSION'), intermediate_decision=application_info.get('INTERMEDIATE_DECISION'), submit_datetime=submit_datetime, ) _, created = PriorityApplication.objects.update_or_create(application_id=application_id, defaults={'on_off': True}) if created: doc_class, prefix = self.get_doc_class(entity) doc_ids = doc_class.objects.filter(application_id=application_id, status=DocStatus.INIT.value).values_list('id', flat=True) task_str_list = ['{0}_{1}'.format(prefix, doc_id) for doc_id in doc_ids] if not task_str_list: self.running_log.info( '[priority doc success] [args={0}] [task_str_list={1}]'.format(args, task_str_list)) else: enqueue_res = rh.enqueue(task_str_list, is_priority=True) self.running_log.info('[priority doc success] [args={0}] [task_str_list={1}] [enqueue_res={2}]'.format( args, task_str_list, enqueue_res)) return response.ok() post.openapi_doc = ''' tags: [doc] summary: GCAP提高申请单对应文件优先级 consumes: [application/json] produces: [application/json] parameters: - in: body name: body required: true schema: $ref: "#/definitions/Application" responses: 200: description: ok schema: $ref: '#/definitions/ApiResponse' ''' class DocView(GenericView, DocHandler): # 文件列表页 @use_args(doc_list_args, location='querystring') def get(self, request, args): page = args.get('page', consts.PAGE_DEFAULT) page_size = args.get('page_size', consts.PAGE_SIZE_DEFAULT) status = args.get('status') application_id = args.get('application_id') data_source = args.get('data_source') business_type = args.get('business_type') upload_time_start = args.get('upload_time_start') upload_time_end = args.get('upload_time_end') create_time_start = args.get('create_time_start') create_time_end = args.get('create_time_end') status_query = Q(status=status) if status is not None else Q() application_id_query = Q(application_id__contains=application_id) if application_id is not None else Q() data_source_query = Q(data_source=data_source) if data_source is not None else Q() upload_finish_time_query = Q(upload_finish_time__gte=upload_time_start, upload_finish_time__lt=upload_time_end + datetime.timedelta(days=1))\ if upload_time_start is not None and upload_time_end is not None else Q() create_time_query = Q(create_time__gte=create_time_start, create_time__lt=create_time_end + datetime.timedelta(days=1))\ if create_time_start is not None and create_time_end is not None else Q() query = application_id_query & status_query & data_source_query & upload_finish_time_query & create_time_query val_tuple = ('id', 'application_id', 'upload_finish_time', 'create_time', 'data_source', 'status') doc_class, _ = self.get_doc_class(business_type) doc_queryset = doc_class.objects.filter(query).values(*val_tuple).order_by('-upload_finish_time') doc_list = self.get_doc_list(doc_queryset) total = len(doc_list) start_index = page_size * (page - 1) end_index = page_size * page if start_index >= total > 0: raise self.invalid_params('页数不存在') pagination = {'current': page, 'total': total, 'page_size': page_size} res = { 'pagination': pagination, 'doc_list': doc_list[start_index: end_index] } self.running_log.info('[get doc list] [args={0}] [res={1}]'.format(args, res)) return response.ok(data=res) # 上传pdf,模拟下单 @use_args(upload_pdf_args, location='files') def post(self, request, args): # 1. 上传信息记录 const_str = '手工单' metadata_version_id = str(int(time.time())) upload_finish_time = timezone.now() document_scheme = random.choice(consts.DOC_SCHEME_LIST) data_source = random.choice(consts.DATA_SOURCE_LIST) business_type = random.choice(consts.BUSINESS_TYPE_LIST) record = UploadDocRecords.objects.create( metadata_version_id=metadata_version_id, application_id=const_str, main_applicant=const_str, co_applicant=const_str, guarantor_1=const_str, guarantor_2=const_str, document_name=const_str, document_scheme=document_scheme, business_type=business_type, data_source=data_source, upload_finish_time=upload_finish_time, ) # 2. 根据业务类型分库存储 doc_class, prefix = self.get_doc_class(business_type) doc = doc_class.objects.create( record_id=record.id, metadata_version_id=metadata_version_id, application_id=const_str, main_applicant=const_str, co_applicant=const_str, guarantor_1=const_str, guarantor_2=const_str, document_name=const_str, document_scheme=document_scheme, data_source=data_source, upload_finish_time=upload_finish_time, ) # 3. 选择队列进入 is_priority = False value = ['{0}_{1}'.format(prefix, doc.id)] redis_res = rh.enqueue(value, is_priority) pdf_file = args.get('pdf_file') save_dir_path = os.path.join(conf.DATA_DIR, business_type, str(doc.id)) save_file_path = os.path.join(save_dir_path, '{0}.pdf'.format(doc.id)) os.makedirs(save_dir_path, exist_ok=True) file_write(pdf_file, save_file_path) self.running_log.info('[mock doc upload success] [args={0}] [record_id={1}] [prefix={2}] [doc_id={3}] ' '[is_priority={4}] [enqueue_res={5}]'.format(args, record.id, prefix, doc.id, is_priority, redis_res)) return response.ok()