Merge branch 'feature/CHINARPA-5155'
Showing
2 changed files
with
112 additions
and
1 deletions
... | @@ -8,4 +8,6 @@ broker = conf.CELERY_BROKER_URL | ... | @@ -8,4 +8,6 @@ broker = conf.CELERY_BROKER_URL |
8 | 8 | ||
9 | app = Celery('celery_compare', broker=broker, include=['celery_compare.tasks']) | 9 | app = Celery('celery_compare', broker=broker, include=['celery_compare.tasks']) |
10 | 10 | ||
11 | app.conf.update(worker_max_tasks_per_child=5, timezone='Asia/Shanghai') | 11 | # worker_max_tasks_per_child ,worker执行了几次任务就会死 |
12 | #app.conf.update(worker_max_tasks_per_child=10, timezone='Asia/Shanghai') | ||
13 | app.conf.update(timezone='Asia/Shanghai') | ... | ... |
... | @@ -39,6 +39,7 @@ from apps.doc.models import ( | ... | @@ -39,6 +39,7 @@ from apps.doc.models import ( |
39 | HILCompareReportNew, | 39 | HILCompareReportNew, |
40 | AFCCompareReportNew, | 40 | AFCCompareReportNew, |
41 | AFCDoc, | 41 | AFCDoc, |
42 | HILDoc, | ||
42 | DealerMapping, | 43 | DealerMapping, |
43 | ) | 44 | ) |
44 | from apps.doc import consts | 45 | from apps.doc import consts |
... | @@ -49,6 +50,12 @@ from apps.doc.named_enum import RequestTeam, RequestTrigger, ProcessName, ErrorT | ... | @@ -49,6 +50,12 @@ from apps.doc.named_enum import RequestTeam, RequestTrigger, ProcessName, ErrorT |
49 | from common.tools.comparison import cp | 50 | from common.tools.comparison import cp |
50 | from common.tools.des import decode_des | 51 | from common.tools.des import decode_des |
51 | 52 | ||
53 | import threading | ||
54 | import concurrent.futures | ||
55 | from concurrent.futures import ThreadPoolExecutor | ||
56 | |||
57 | pool = ThreadPoolExecutor(max_workers=50, thread_name_prefix="compare_thread_") | ||
58 | |||
52 | compare_log = logging.getLogger('compare') | 59 | compare_log = logging.getLogger('compare') |
53 | log_base = '[Compare]' | 60 | log_base = '[Compare]' |
54 | # e_log_base = '[e-contract]' | 61 | # e_log_base = '[e-contract]' |
... | @@ -3663,6 +3670,24 @@ def se_compare(application_id, application_entity, ocr_res_id, last_obj, ocr_res | ... | @@ -3663,6 +3670,24 @@ def se_compare(application_id, application_entity, ocr_res_id, last_obj, ocr_res |
3663 | } | 3670 | } |
3664 | } | 3671 | } |
3665 | try: | 3672 | try: |
3673 | compare_log.info('{0} [SE] [cms sleep start] [entity={1}] [id={2}] '.format(log_base, application_entity, application_id)) | ||
3674 | # 实时查询延迟时间 | ||
3675 | try: | ||
3676 | cms_delay_time_config = Configs.objects.filter(id=5).first() | ||
3677 | if cms_delay_time_config is not None and cms_delay_time_config.value is not None and cms_delay_time_config.value.isdigit(): | ||
3678 | cms_delay_time = cms_delay_time_config.value | ||
3679 | else: | ||
3680 | cms_delay_time = 0 | ||
3681 | except Exception as e: | ||
3682 | cms_delay_time = 0 | ||
3683 | compare_log.info('[get cms_delay_time_config fail] [error={0}]'.format(traceback.format_exc())) | ||
3684 | |||
3685 | compare_log.info('cms_delay_time:{0}'.format(cms_delay_time)) | ||
3686 | try: | ||
3687 | time.sleep(int(cms_delay_time)) | ||
3688 | except Exception as e: | ||
3689 | compare_log.info('[sleep error] [error={0}]'.format(traceback.format_exc())) | ||
3690 | compare_log.info('{0} [SE] [cms sleep end] [entity={1}] [id={2}] '.format(log_base, application_entity, application_id)) | ||
3666 | response = cms.send(data) # interface_report ocr to cms | 3691 | response = cms.send(data) # interface_report ocr to cms |
3667 | except Exception as e: | 3692 | except Exception as e: |
3668 | is_success = False | 3693 | is_success = False |
... | @@ -3694,9 +3719,52 @@ def se_compare(application_id, application_entity, ocr_res_id, last_obj, ocr_res | ... | @@ -3694,9 +3719,52 @@ def se_compare(application_id, application_entity, ocr_res_id, last_obj, ocr_res |
3694 | 3719 | ||
3695 | @app.task | 3720 | @app.task |
3696 | def fsm_compare(application_id, application_entity, uniq_seq, ocr_res_id, is_ca=True, is_cms=False): | 3721 | def fsm_compare(application_id, application_entity, uniq_seq, ocr_res_id, is_ca=True, is_cms=False): |
3722 | # try: | ||
3723 | # producer_thread_fsm = threading.Thread(target=fsm_compare_thread, args=(application_id, application_entity, uniq_seq, ocr_res_id, is_ca, is_cms)) | ||
3724 | # producer_thread_fsm.start() | ||
3725 | # except Exception as e: | ||
3726 | # compare_log.info('[fsm thread error] [error={0}]'.format(traceback.format_exc())) | ||
3727 | |||
3728 | # with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor: | ||
3729 | # # 使用map函数提交多个任务 | ||
3730 | # results = list(executor.map(fsm_compare_thread, application_id, application_entity, uniq_seq, ocr_res_id, is_ca, is_cms)) | ||
3731 | |||
3732 | compare_log.info('[fsm thread]') | ||
3733 | #pool = ThreadPoolExecutor(max_workers=6, thread_name_prefix="fsm_thread_") | ||
3734 | pool.submit(fsm_compare_thread, application_id, application_entity, uniq_seq, ocr_res_id, is_ca, is_cms) | ||
3735 | #pool.shutdown(wait=True) | ||
3736 | |||
3737 | |||
3738 | def fsm_compare_thread(application_id, application_entity, uniq_seq, ocr_res_id, is_ca=True, is_cms=False): | ||
3697 | compare_log.info('{0} [receive fsm task] [entity={1}] [id={2}] [uniq_seq={3}] [ocr_res_id={4}] [is_ca={5}] ' | 3739 | compare_log.info('{0} [receive fsm task] [entity={1}] [id={2}] [uniq_seq={3}] [ocr_res_id={4}] [is_ca={5}] ' |
3698 | '[is_cms={6}]'.format(log_base, application_entity, application_id, uniq_seq, ocr_res_id, | 3740 | '[is_cms={6}]'.format(log_base, application_entity, application_id, uniq_seq, ocr_res_id, |
3699 | is_ca, is_cms)) | 3741 | is_ca, is_cms)) |
3742 | |||
3743 | # 查看此订单号下是否有未完成的文件,如果有,等1分钟 | ||
3744 | doc_wait_file_class = HILDoc if application_entity == consts.HIL_PREFIX else AFCDoc | ||
3745 | doc_wait_file_result = doc_wait_file_class.objects.filter(application_id=application_id, status=1).first() | ||
3746 | compare_log.info('doc_wait_file_result:{0}'.format(doc_wait_file_result)) | ||
3747 | compare_log.info('{0} [comparison unfinished file check] [entity={1}] [id={2}] [doc_wait_file_result={3}]'.format(log_base, application_entity, application_id, doc_wait_file_result)) | ||
3748 | if doc_wait_file_result is not None: | ||
3749 | # 实时查询延迟时间 | ||
3750 | try: | ||
3751 | delay_time_config = Configs.objects.filter(id=4).first() | ||
3752 | if delay_time_config is not None and delay_time_config.value is not None and delay_time_config.value.isdigit(): | ||
3753 | delay_time = delay_time_config.value | ||
3754 | else: | ||
3755 | delay_time = 0 | ||
3756 | except Exception as e: | ||
3757 | delay_time = 0 | ||
3758 | compare_log.info('[get delay_time_config fail] [error={0}]'.format(traceback.format_exc())) | ||
3759 | compare_log.info('delay_time:{0}'.format(delay_time)) | ||
3760 | compare_log.info('{0} [comparison unfinished file wait delay_time start] [entity={1}] [id={2}] [doc_id={3}]'.format(log_base, application_entity, application_id, doc_wait_file_result.id)) | ||
3761 | try: | ||
3762 | time.sleep(int(delay_time)) | ||
3763 | except Exception as e: | ||
3764 | compare_log.info('[sleep error] [error={0}]'.format(traceback.format_exc())) | ||
3765 | compare_log.info('{0} [comparison unfinished file wait delay_time end] [entity={1}] [id={2}] [doc_id={3}]'.format(log_base, application_entity, application_id, doc_wait_file_result.id)) | ||
3766 | |||
3767 | |||
3700 | # 调用java fsm 比对流程接口(http) | 3768 | # 调用java fsm 比对流程接口(http) |
3701 | # 调用Java fsm 比对流程接口, fsm 是se流程, ca可以暂时忽略 | 3769 | # 调用Java fsm 比对流程接口, fsm 是se流程, ca可以暂时忽略 |
3702 | auto_class = HILAutoSettlement if application_entity == consts.HIL_PREFIX else AFCAutoSettlement | 3770 | auto_class = HILAutoSettlement if application_entity == consts.HIL_PREFIX else AFCAutoSettlement |
... | @@ -3729,6 +3797,22 @@ def fsm_compare(application_id, application_entity, uniq_seq, ocr_res_id, is_ca= | ... | @@ -3729,6 +3797,22 @@ def fsm_compare(application_id, application_entity, uniq_seq, ocr_res_id, is_ca= |
3729 | 3797 | ||
3730 | @app.task | 3798 | @app.task |
3731 | def compare(application_id, application_entity, uniq_seq, ocr_res_id, is_ca=True, is_cms=False): | 3799 | def compare(application_id, application_entity, uniq_seq, ocr_res_id, is_ca=True, is_cms=False): |
3800 | # try: | ||
3801 | # producer_thread = threading.Thread(target=compare_thread, args=(application_id, application_entity, uniq_seq, ocr_res_id, is_ca, is_cms)) | ||
3802 | # producer_thread.start() | ||
3803 | # except Exception as e: | ||
3804 | # compare_log.info('[thread error] [error={0}]'.format(traceback.format_exc())) | ||
3805 | |||
3806 | # with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor: | ||
3807 | # # 使用map函数提交多个任务 | ||
3808 | # results = list(executor.map(compare_thread, application_id, application_entity, uniq_seq, ocr_res_id, is_ca, is_cms)) | ||
3809 | |||
3810 | compare_log.info('[non fsm thread]') | ||
3811 | #pool = ThreadPoolExecutor(max_workers=6, thread_name_prefix="non_fsm_thread_") | ||
3812 | pool.submit(compare_thread, application_id, application_entity, uniq_seq, ocr_res_id, is_ca, is_cms) | ||
3813 | #pool.shutdown(wait=True) | ||
3814 | |||
3815 | def compare_thread(application_id, application_entity, uniq_seq, ocr_res_id, is_ca=True, is_cms=False): | ||
3732 | # POS: application_id, application_entity, uniq_seq, None | 3816 | # POS: application_id, application_entity, uniq_seq, None |
3733 | # OCR: application_id, business_type(application_entity), None, ocr_res_id | 3817 | # OCR: application_id, business_type(application_entity), None, ocr_res_id |
3734 | 3818 | ||
... | @@ -3736,6 +3820,31 @@ def compare(application_id, application_entity, uniq_seq, ocr_res_id, is_ca=True | ... | @@ -3736,6 +3820,31 @@ def compare(application_id, application_entity, uniq_seq, ocr_res_id, is_ca=True |
3736 | '[is_cms={6}]'.format(log_base, application_entity, application_id, uniq_seq, ocr_res_id, | 3820 | '[is_cms={6}]'.format(log_base, application_entity, application_id, uniq_seq, ocr_res_id, |
3737 | is_ca, is_cms)) | 3821 | is_ca, is_cms)) |
3738 | 3822 | ||
3823 | # 查看此订单号下是否有未完成的文件,如果有,等?分钟 | ||
3824 | doc_wait_file_class = HILDoc if application_entity == consts.HIL_PREFIX else AFCDoc | ||
3825 | doc_wait_file_result = doc_wait_file_class.objects.filter(application_id=application_id, status=1).first() | ||
3826 | compare_log.info('doc_wait_file_result:{0}'.format(doc_wait_file_result)) | ||
3827 | compare_log.info('{0} [comparison unfinished file check] [entity={1}] [id={2}] [doc_wait_file_result={3}]'.format(log_base, application_entity, application_id, doc_wait_file_result)) | ||
3828 | if doc_wait_file_result is not None: | ||
3829 | # 实时查询延迟时间 | ||
3830 | try: | ||
3831 | delay_time_config = Configs.objects.filter(id=4).first() | ||
3832 | if delay_time_config is not None and delay_time_config.value is not None and delay_time_config.value.isdigit(): | ||
3833 | delay_time = delay_time_config.value | ||
3834 | else: | ||
3835 | delay_time = 0 | ||
3836 | except Exception as e: | ||
3837 | delay_time = 0 | ||
3838 | compare_log.info('[get delay_time_config fail] [error={0}]'.format(traceback.format_exc())) | ||
3839 | compare_log.info('delay_time:{0}'.format(delay_time)) | ||
3840 | compare_log.info('{0} [comparison unfinished file wait delay_time start] [entity={1}] [id={2}] [doc_id={3}]'.format(log_base, application_entity, application_id, doc_wait_file_result.id)) | ||
3841 | try: | ||
3842 | time.sleep(int(delay_time)) | ||
3843 | except Exception as e: | ||
3844 | compare_log.info('[sleep error] [error={0}]'.format(traceback.format_exc())) | ||
3845 | compare_log.info('{0} [comparison unfinished file wait delay_time end] [entity={1}] [id={2}] [doc_id={3}]'.format(log_base, application_entity, application_id, doc_wait_file_result.id)) | ||
3846 | |||
3847 | |||
3739 | # 根据application_id查找最新的比对信息,如果没有,结束 | 3848 | # 根据application_id查找最新的比对信息,如果没有,结束 |
3740 | if is_ca: | 3849 | if is_ca: |
3741 | comparison_class = HILComparisonInfo if application_entity == consts.HIL_PREFIX else AFCComparisonInfo | 3850 | comparison_class = HILComparisonInfo if application_entity == consts.HIL_PREFIX else AFCComparisonInfo | ... | ... |
-
Please register or sign in to post a comment