f77b2322 by 周伟奇

e-contract part 1

1 parent cc6c63c8
......@@ -1773,3 +1773,21 @@ APPLICANT_TYPE_MAP = {
}
APPLICANT_TYPE_ORDER = ['Borrower', 'Co-Borrower', 'Guarantor', 'Mortgager']
# E-contract files to look for, grouped by business prefix.
# Each entry is ((classify_1, classify_2), file_name_template): the template is
# formatted with the application id to build the file-name prefix searched for,
# and the classify pair is appended to the queued OCR task string.
FILE_NAME_PREFIX_MAP = {
    AFC_PREFIX: [
        ((CONTRACT_CLASSIFY, 0), '{0}_电子签署-汽车抵押贷款合同'),
        ((HMH_CLASSIFY, 0), '{0}_电子签署-抵押登记豁免函'),
    ],
    HIL_PREFIX: [
        ((HIL_CONTRACT_1_CLASSIFY, HIL_CONTRACT_3_CLASSIFY), '{0}_电子签署-售后回租合同'),
        ((HIL_CONTRACT_2_CLASSIFY, 0), '{0}_电子签署-汽车租赁抵押合同'),
        ((HMH_CLASSIFY, 0), '{0}_电子签署-抵押登记豁免函'),
    ]
}
# Maps a stringified HIL contract classify to the file-type index handed to the
# HIL contract OCR `predict` call. A classify without an entry (e.g. '0') yields
# None from `.get`, which the OCR process treats as "no contract of that kind".
HIL_CONTRACT_TYPE_MAP = {
    str(HIL_CONTRACT_1_CLASSIFY): 0,
    str(HIL_CONTRACT_2_CLASSIFY): 2,
    str(HIL_CONTRACT_3_CLASSIFY): 1,
}
\ No newline at end of file
......
......@@ -18,6 +18,8 @@ from settings import conf
from common.mixins import LoggerMixin
from common.tools.file_tools import write_zip_file
from common.tools.pdf_to_img import PDFHandler
from common.electronic_afc_contract.afc_contract_ocr import predict as afc_predict
from common.electronic_hil_contract.hil_contract_ocr import predict as hil_predict
from apps.doc import consts
# from apps.doc.ocr.edms import EDMS, rh
from apps.doc.ocr.ecm import ECM, rh
......@@ -47,6 +49,7 @@ class Command(BaseCommand, LoggerMixin):
def __init__(self):
super().__init__()
self.log_base = '[doc ocr process]'
self.e_log_base = '[e-contract ocr process]'
# 处理文件开关
self.switch = True
# 睡眠时间
......@@ -90,13 +93,20 @@ class Command(BaseCommand, LoggerMixin):
task_str, is_priority = rh.dequeue()
if task_str is None:
self.online_log.info('{0} [get_doc_info] [queue empty]'.format(self.log_base))
return None, None, None
return None, None, None, None, None
self.online_log.info('{0} [get_doc_info] [task={1}] [is_priority={2}]'.format(
self.log_base, task_str, is_priority))
try:
# doc, business_type = self.get_doc_object(task_str)
business_type, doc_id_str = task_str.split(consts.SPLIT_STR)
info_tuple = task_str.split(consts.SPLIT_STR)
if len(info_tuple) == 2:
business_type, doc_id_str = info_tuple
classify_1_str = classify_2_str = '0'
rebuild_task_str = task_str
else:
business_type, doc_id_str, classify_1_str, classify_2_str = info_tuple
rebuild_task_str = '{0}{1}{2}'.format(business_type, consts.SPLIT_STR, doc_id_str)
doc_id = int(doc_id_str)
doc_class = HILDoc if business_type == consts.HIL_PREFIX else AFCDoc
doc = doc_class.objects.filter(id=doc_id).first()
......@@ -104,11 +114,11 @@ class Command(BaseCommand, LoggerMixin):
if doc is None:
self.online_log.warn('{0} [get_doc_info] [doc not exist] [task_str={1}] [is_priority={2}]'.format(
self.log_base, task_str, is_priority))
return None, None, None
return None, None, None, None, None
elif doc.status != DocStatus.INIT.value:
self.online_log.warn('{0} [get_doc_info] [doc status error] [task_str={1}] [is_priority={2}] '
'[doc_status={3}]'.format(self.log_base, task_str, is_priority, doc.status))
return None, None, None
return None, None, None, None, None
doc.status = DocStatus.PROCESSING.value
doc.start_time = timezone.now()
doc.save()
......@@ -120,7 +130,7 @@ class Command(BaseCommand, LoggerMixin):
else:
self.online_log.info('{0} [get_doc_info] [db save end] [task_str={1}] [is_priority={2}]'.format(
self.log_base, task_str, is_priority))
return doc, business_type, task_str
return doc, business_type, rebuild_task_str, classify_1_str, classify_2_str
# def pdf_download(self, doc, pdf_path):
# if not doc.application_id.startswith(consts.FIXED_APPLICATION_ID_PREFIX):
......@@ -915,11 +925,11 @@ class Command(BaseCommand, LoggerMixin):
# summary['confidence'] = max(summary['confidence'])
return merged_bs_summary
def pdf_2_img_2_queue(self, img_queue, todo_count_dict, lock, error_list):
def pdf_2_img_2_queue(self, img_queue, todo_count_dict, lock, error_list, res_dict, finish_queue):
while self.switch:
try:
# 1. 从队列获取文件信息
doc, business_type, task_str = self.get_doc_info()
doc, business_type, task_str, classify_1_str, classify_2_str = self.get_doc_info()
# 队列为空时的处理
if doc is None:
time.sleep(self.sleep_time_doc_get)
......@@ -930,14 +940,16 @@ class Command(BaseCommand, LoggerMixin):
error_list.append(1)
return
else:
try:
# 2. 从EDMS获取PDF文件
doc_data_path = os.path.join(self.data_dir, business_type, consts.TMP_DIR_NAME, str(doc.id))
os.makedirs(doc_data_path, exist_ok=True)
img_save_path = os.path.join(doc_data_path, 'img')
pdf_path = os.path.join(doc_data_path, '{0}.pdf'.format(doc.id))
pdf_handler = PDFHandler(pdf_path, img_save_path, doc.document_name)
if classify_1_str == '0' or classify_1_str == str(consts.HMH_CLASSIFY):
try:
# 2. 从EDMS获取PDF文件
max_count_obj = Configs.objects.filter(id=2).first()
try:
max_img_count = int(max_count_obj.value)
......@@ -1057,6 +1069,107 @@ class Command(BaseCommand, LoggerMixin):
self.log_base, traceback.format_exc()))
error_list.append(1)
return
else: # e-contract
try:
# pdf下载 处理 图片存储 识别
for times in range(consts.RETRY_TIMES):
try:
self.edms.download(pdf_path, doc.metadata_version_id, doc.document_scheme, business_type)
self.online_log.info('{0} [edms download success] [task={1}] [times={2}] '
'[pdf_path={3}]'.format(self.e_log_base, task_str, times, pdf_path))
self.online_log.info('{0} [pdf to img start] [task={1}] [times={2}]'.format(
self.e_log_base, task_str, times))
pdf_handler.e_contract_process()
self.online_log.info(
'{0} [pdf to img end] [task={1}] [times={2}]'.format(self.e_log_base, task_str, times))
except Exception as e:
self.online_log.warn('{0} [download or pdf to img failed] [task={1}] [times={2}] '
'[error={3}]'.format(self.e_log_base, task_str, times,
traceback.format_exc()))
else:
break
else:
raise Exception('download or pdf to img failed')
if classify_1_str == str(consts.CONTRACT_CLASSIFY):
    # AFC contract: the predictor's 'page_info' mapping is already keyed by page.
    ocr_result = afc_predict(pdf_handler.pdf_info)
    page_res = {}
    for page_num, page_info in ocr_result.get('page_info', {}).items():
        if isinstance(page_num, str) and page_num.startswith('page_'):
            page_res[page_num] = {
                'classify': int(classify_1_str),
                'page_num': page_num,
                'page_info': page_info
            }
else:
    # HIL contract: one PDF may carry up to two contract types. Each is
    # predicted separately and its flat field dict is regrouped by page.
    file_type_1 = consts.HIL_CONTRACT_TYPE_MAP.get(classify_1_str)
    file_type_2 = consts.HIL_CONTRACT_TYPE_MAP.get(classify_2_str)
    page_res = {}
    contract_jobs = [(file_type_1, classify_1_str)]
    # A second classify without a file type (e.g. '0') means no second contract.
    if isinstance(file_type_2, int):
        contract_jobs.append((file_type_2, classify_2_str))
    for file_type, classify_str in contract_jobs:
        fields_by_page = {}
        for field_name, field_info in hil_predict(pdf_handler.pdf_info, file_type).items():
            # Fields that do not name their page are filed under page_1.
            page_num = field_info.pop('page', 'page_1')
            fields_by_page.setdefault(page_num, dict())[field_name] = field_info
        # Walk the regrouped mapping. The original walked the raw, field-keyed
        # predictor output for the second contract, so none of its keys started
        # with 'page_' and the second contract's pages were never recorded.
        for page_num, page_info in fields_by_page.items():
            if isinstance(page_num, str) and page_num.startswith('page_'):
                page_res[page_num] = {
                    'classify': int(classify_str),
                    'page_num': page_num,
                    'page_info': page_info
                }
contract_res = {}
for img_path_tmp, page_key in pdf_handler.img_path_pno_list:
if page_key in page_res:
img_contract_res = {
'code': 1,
'data': [
{
'classify': page_res[page_key].pop('classify', consts.OTHER_CLASSIFY),
'data': page_res[page_key]
}
]
}
else:
img_contract_res = {
'code': 1,
'data': [
{
'classify': int(classify_1_str),
}
]
}
contract_res[img_path_tmp] = img_contract_res
with lock:
res_dict[task_str] = contract_res
finish_queue.put(task_str)
except Exception as e:
try:
doc.status = DocStatus.PROCESS_FAILED.value
doc.save()
self.online_log.warn('{0} [process failed (e-contract)] [task={1}] '
'[error={2}]'.format(self.e_log_base, task_str, traceback.format_exc()))
except Exception as e:
self.online_log.error('{0} [process error (db save)] [error={1}]'.format(
self.e_log_base, traceback.format_exc()))
error_list.append(1)
return
def img_2_ocr_1(self, img_queue, todo_count_dict, res_dict, finish_queue, lock, url, error_list):
while len(error_list) == 0 or not img_queue.empty():
......@@ -1801,7 +1914,7 @@ class Command(BaseCommand, LoggerMixin):
finish_queue = Queue()
process_list = []
pdf_process = Process(target=self.pdf_2_img_2_queue, args=(img_queue, todo_count_dict, lock, error_list))
pdf_process = Process(target=self.pdf_2_img_2_queue, args=(img_queue, todo_count_dict, lock, error_list, res_dict, finish_queue))
process_list.append(pdf_process)
for url in self.ocr_1_urls.values():
......
......@@ -789,3 +789,24 @@ class HILCACompareResultRecord(models.Model):
db_table = 'hil_ca_compare_result_record'
class HILContract(models.Model):
    """One row per e-contract notification stored for a HIL application."""
    id = models.AutoField(primary_key=True, verbose_name="id")  # primary key
    application_id = models.CharField(max_length=64, verbose_name="申请id")  # application id; original note marks this column as indexed
    create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')  # filled in automatically when the row is created

    class Meta:
        managed = False  # Django does not create or migrate this table
        db_table = 'hil_contract'
class AFCContract(models.Model):
    """One row per e-contract notification stored for an AFC application."""
    id = models.AutoField(primary_key=True, verbose_name="id")  # primary key
    application_id = models.CharField(max_length=64, verbose_name="申请id")  # application id; original note marks this column as indexed
    create_time = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')  # filled in automatically when the row is created

    class Meta:
        managed = False  # Django does not create or migrate this table
        db_table = 'afc_contract'
        # NOTE(review): presumably a project-specific label that routes this
        # model to the AFC database — confirm against the DB router.
        situ_db_label = 'afc'
......
import os
import base64
import requests
from common.redis_cache import redis_handler as rh
......@@ -44,7 +45,6 @@ class ECM:
"b_coborrower_id", "b_coborrower_name", "b_guarantor_id", "b_guarantor_name",
"b_frontend_partner", "b_dealer_code", "b_dealer_name", "b_input_date", "b_comment",
"b_contract_no", "b_location"]
self.contract_prefix = '电子'
def update_oauth_token(self):
response = requests.post(self.oauth_url, headers=self.oauth_headers, data=self.oauth_payload, verify=False)
......@@ -69,9 +69,9 @@ class ECM:
def get_headers(self):
    """Return the HTTP headers carrying the current OAuth credentials.

    The Authorization value is the token type followed by the token
    returned by ``get_oauth_token``, separated by one space.
    """
    scheme = self.token_type
    token = self.get_oauth_token()
    authorization = '{0} {1}'.format(scheme, token)
    return {'Authorization': authorization}
def search(self, application_id, business_type):
def search(self, application_id, business_type, prefix):
sql = "select * from {0} where b_application_no='{1}' and object_name like '{2}%'".format(
self.settlement_type, application_id, self.contract_prefix)
self.settlement_type, application_id, prefix)
search_args = {
"userName": self.username,
"password": self.pwd,
......@@ -96,7 +96,6 @@ class ECM:
result.append((object_name, object_id))
return result
def download(self, save_path, object_id, document_scheme, business_type):
doc_type, _, _ = self.doc_type_map.get(document_scheme)
download_json = {
......
......@@ -36,12 +36,14 @@ from .models import (
AFCSECompareResultRecord,
HILCACompareResultRecord,
HILSECompareResultRecord,
HILContract,
AFCContract,
)
from .named_enum import ErrorType
from .mixins import DocHandler
from . import consts
from apps.account.authentication import OAuth2AuthenticationWithUser
from celery_compare.tasks import compare
from celery_compare.tasks import compare, forwarding_station
class CustomDate(fields.Date):
......@@ -1164,5 +1166,11 @@ class SEContractView(GenericView):
# Endpoint through which POS uploads e-contract information (SE)
@use_args(se_contract_args, location='data')
def post(self, request, args):
    """Record an e-contract notification and schedule its processing.

    Reads ``applicationId`` and ``applicationEntity`` from the ``content``
    part of the validated payload, stores a row in the matching contract
    table and queues ``forwarding_station`` after ``conf.DELAY_SECONDS``.

    Returns:
        The standard OK response.
    """
    self.running_log.info('e-contract in')
    contract_info = args.get('content', {})
    application_id = contract_info.get('applicationId', '')
    entity = contract_info.get('applicationEntity', '')
    # Decide HIL vs. AFC with the same membership test forwarding_station uses,
    # so the stored row and the queued documents always agree on the entity.
    table_class = HILContract if entity in consts.HIL_SET else AFCContract
    table_class.objects.create(application_id=application_id)
    forwarding_station.apply_async((application_id, entity), queue='queue_compare', countdown=conf.DELAY_SECONDS)
    self.running_log.info('[e-contract] [application_id={0}] [entity={1}]'.format(application_id, entity))
    return response.ok()
......
......@@ -27,10 +27,13 @@ from apps.doc.models import (
AFCCACompareResult,
HILSECompareResult,
HILCACompareResult,
AFCDoc,
HILDoc
)
from apps.doc import consts
from apps.doc.ocr.gcap import gcap
from apps.doc.ocr.cms import cms
from apps.doc.ocr.ecm import ECM, rh
from apps.doc.exceptions import GCAPException
from apps.doc.named_enum import RequestTeam, RequestTrigger, ProcessName, ErrorType
from common.tools.comparison import cp
......@@ -38,9 +41,11 @@ from common.tools.des import decode_des
# Logger shared by the tasks in this module.
compare_log = logging.getLogger('compare')
# Prefixes put in front of log messages (e_log_base marks the e-contract flow).
log_base = '[Compare]'
e_log_base = '[e-contract]'
empty_str = ''
empty_error_type = 1000  # NOTE(review): presumably the "no error" code — confirm against ErrorType
des_key = conf.CMS_DES_KEY
# Module-level ECM client; forwarding_station uses it to search for contract files.
ecm = ECM()
def rotate_bound(image, angle):
......@@ -1867,4 +1872,32 @@ def compare(application_id, application_entity, uniq_seq, ocr_res_id, is_ca=True
se_compare(application_id, application_entity, ocr_res_id, last_obj, ocr_res_dict, is_cms)
@app.task
def forwarding_station(application_id, entity):
    """Find the e-contract files of an application and queue them for OCR.

    For every file-name template configured for the entity's business prefix,
    the ECM is searched; each hit becomes a new doc row and an OCR task string
    ``<prefix><SPLIT><doc id><SPLIT><classify_1><SPLIT><classify_2>``.

    Args:
        application_id: Application whose e-contract files are looked up.
        entity: Application entity as received from the caller; any value in
            ``consts.HIL_SET`` is handled as HIL, everything else as AFC.
    """
    compare_log.info('{0} [forward start] [application_id={1}] [entity={2}]'.format(e_log_base, application_id, entity))
    is_hil = entity in consts.HIL_SET
    doc_class = HILDoc if is_hil else AFCDoc
    entity_prefix = consts.HIL_PREFIX if is_hil else consts.AFC_PREFIX
    # The map is keyed by the normalised business prefix, not by the raw entity
    # value; looking it up by `entity` returned None for any other spelling and
    # the loop below then failed with a TypeError.
    file_name_rules = consts.FILE_NAME_PREFIX_MAP.get(entity_prefix, [])
    for (classify_1, classify_2), prefix in file_name_rules:
        try:
            file_list = ecm.search(application_id, entity, prefix.format(application_id))  # TODO: fetch only the latest file
        except Exception:
            # A failed search for one template must not block the other templates.
            compare_log.error('{0} [search failed] [application_id={1}] [entity={2}] [error={3}]'.format(
                e_log_base, application_id, entity, traceback.format_exc()))
            continue
        compare_log.info('{0} [search end] [application_id={1}] [entity={2}] [file_list={3}]'.format(
            e_log_base, application_id, entity, file_list))
        for object_name, object_id in file_list:
            doc = doc_class.objects.create(
                metadata_version_id=object_id,
                application_id=application_id,
                document_name=object_name,
                document_scheme='SETTLEMENT',
                data_source='POS',
                upload_finish_time=datetime.now(),
            )
            task = consts.SPLIT_STR.join([entity_prefix, str(doc.id), str(classify_1), str(classify_2)])
            enqueue_res = rh.enqueue([task], False)
            compare_log.info('{0} [upload success] [res={1}] [application_id={2}] [entity={3}] [object_name={4}] '
                             '[object_id={5}] [doc_id={6}]'.format(e_log_base, enqueue_res, application_id, entity,
                                                                   object_name, object_id, doc.id))
    compare_log.info('{0} [forward end] [application_id={1}] [entity={2}]'.format(e_log_base, application_id, entity))
......
# -*- coding: utf-8 -*-
# @Author : lk
# @Email : 9428.al@gmail.com
# @Created Date : 2021-06-29 17:43:46
# @Last Modified : 2021-09-07 14:11:25
# @Description :
from .get_char import Finder
def predict(pdf_info):
    """Run the contract field finder over a parsed PDF.

    Args:
        pdf_info: Information of the whole PDF (all pages), as expected by ``Finder``.

    Returns:
        Whatever ``Finder.get_info`` produces for this PDF.
    """
    return Finder(pdf_info).get_info()
# -*- coding: utf-8 -*-
# @Author : lk
# @Email : 9428.al@gmail.com
# @Create Date : 2021-07-20 16:42:41
# @Last Modified : 2021-09-07 19:52:39
# @Description :
import re
import numpy as np
from fuzzywuzzy import fuzz
class Finder:
def __init__(self, pdf_info):
self.pdf_info = pdf_info
self.is_asp = False
self.item = {"words": None,
"position": None,
}
def gen_init_result(self, is_asp):
    """Create ``self.init_result``, the empty per-page result skeleton.

    Every leaf is a fresh copy of the ``self.item`` template, so filling one
    field in place can never leak into another field.

    Args:
        is_asp: Truthy for an ASP contract. ASP contracts get a ``page_7`` that
            only carries the contract number plus a ``page_8`` holding the
            signature slots; otherwise the signature slots sit on ``page_7``.
            (The original ignored this argument and read ``self.is_asp``.)
    """
    def blank():
        # One independent empty field.
        return self.item.copy()

    def principal():
        # Loan principal group, used on page 1 and page 2.
        return {"大写": blank(),
                "小写": blank(),
                "车辆贷款本金金额": blank(),
                "附加产品融资贷款本金总金额": blank(),
                }

    def party():
        # Name / id pair of one contract party.
        return {"name": blank(), "id": blank()}

    def signature_page():
        # Contract number followed by one signature/date pair per signer.
        page = {"合同编号": blank()}
        for signer in ("主借人签字", "共借人签字", "保证人1签字", "保证人2签字", "见证人签字"):
            page[signer] = {"签字": blank(), "日期": blank()}
        return page

    self.init_result = {
        "page_1": {"合同编号": blank(),
                   "所购车辆价格": blank(),
                   "车架号": blank(),
                   "贷款本金金额": principal(),
                   "贷款期限": blank(),
                   "附加产品融资贷款本金总金额明细": blank(),
                   "借款人签字及时间": blank(),
                   },
        "page_2": {"合同编号": blank(),
                   "借款人及抵押人": party(),
                   "共同借款人及共同抵押人": party(),
                   "保证人1": party(),
                   "保证人2": party(),
                   "所购车辆价格": blank(),
                   "车架号": blank(),
                   "经销商": blank(),
                   "贷款本金金额": principal(),
                   "贷款期限": blank(),
                   "还款账户": {"账号": blank(),
                            "户名": blank(),
                            "开户行": blank(),
                            },
                   },
        "page_3": {"合同编号": blank(),
                   "还款计划表": blank(),
                   },
        "page_4": {"合同编号": blank(),
                   "附加产品融资贷款本金总金额明细": blank(),
                   },
        "page_5": {"合同编号": blank(),
                   },
        "page_6": {"合同编号": blank(),
                   },
    }
    if not is_asp:
        self.init_result["page_7"] = signature_page()
    else:
        # ASP contracts have one extra page, pushing the signatures to page 8.
        self.init_result["page_7"] = {"合同编号": blank()}
        self.init_result["page_8"] = signature_page()
def get_contract_no(self, page_num):
    """Read the contract number printed on the given page.

    Args:
        page_num (str): Key of the page inside ``self.pdf_info``.

    Returns:
        dict: Copy of the field template; ``words`` holds the text after the
        last ``:`` of the span containing '合同编号:' and ``position`` that
        span's bbox. If several spans match, the last one wins; if none does,
        both stay ``None``.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info[page_num]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    result = dict(self.item)
    for box, content in page_spans():
        if '合同编号:' not in content:
            continue
        result['position'] = box
        result['words'] = content.split(':')[-1]
    return result
def get_vehicle_price(self, page_num='0'):
    """Read the vehicle purchase price from the given page.

    Args:
        page_num (str): Key of the page inside ``self.pdf_info``.

    Returns:
        dict: Copy of the field template; ``words`` is the text following the
        last '币' of the span containing '所购车辆价格为人民币', ``position``
        that span's bbox. The last matching span wins.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info[page_num]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    price = dict(self.item)
    for box, content in page_spans():
        if '所购车辆价格为人民币' not in content:
            continue
        price['position'] = box
        price['words'] = content.split('币')[-1]
    return price
def get_vin(self, page_num='0'):
    """Read the vehicle identification number from the given page.

    Args:
        page_num (str): Key of the page inside ``self.pdf_info``.

    Returns:
        dict: Copy of the field template; ``words`` is the text after the last
        ``:`` of the span containing '车架号:', ``position`` that span's bbox.
        The last matching span wins.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info[page_num]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    frame_no = dict(self.item)
    for box, content in page_spans():
        if '车架号:' not in content:
            continue
        frame_no['position'] = box
        frame_no['words'] = content.split(':')[-1]
    return frame_no
def get_loan_principal(self, page_num='0'):
    """Extract the loan principal amounts from the given page.

    Args:
        page_num (str): Key of the page inside ``self.pdf_info``.

    Returns:
        tuple: ``(upper, lower, asp_1, asp_2)`` field dicts.
        ``upper`` is the amount written in Chinese numerals, ``lower`` the
        amount after '小写:¥'. ``asp_1`` / ``asp_2`` are the bracketed amounts
        found above / below the '附加产品融资贷款本金总金额' span and stay empty
        when that span is absent.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info[page_num]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    chinese_keywords = ['壹', '贰', '叁', '肆', '伍', '陆', '柒', '捌', '玖', '拾',
                        '佰', '仟', '万', '亿', '元', '角', '分', '零', '整']
    # Built once instead of once per span.
    numeral_charset = ''.join(chinese_keywords)
    upper = self.item.copy()
    lower = self.item.copy()
    asp_1 = self.item.copy()
    asp_2 = self.item.copy()
    anchor_bbox = None
    for bbox, text in page_spans():
        # A span counts as the upper-case amount when its similarity score
        # against the numeral character set exceeds 15.
        if fuzz.ratio(numeral_charset, text) > 15:
            # The checks below deliberately see the trimmed text, as before.
            text = text.split(':')[-1].strip()
            upper['position'] = bbox
            upper['words'] = text
        if '小写:¥' in text:
            lower['position'] = bbox
            lower['words'] = text.split('¥')[-1].strip()
        if '附加产品融资贷款本金总金额' == text:
            anchor_bbox = bbox

    if anchor_bbox:
        anchor_y = np.mean(anchor_bbox[1::2])
        bracketed_amount = re.compile(r'人民币:小写:\[(.*)\]')
        for bbox, text in page_spans():
            if '人民币:小写:' not in text:
                continue
            found = bracketed_amount.search(text)
            if found is None:
                # Marker without a bracketed amount: skip the span rather than
                # abort the whole extraction with an IndexError.
                continue
            span_y = np.mean(bbox[1::2])
            if span_y < anchor_y:
                asp_1['position'] = bbox
                asp_1['words'] = found.group(1)
            elif span_y > anchor_y:
                asp_2['position'] = bbox
                asp_2['words'] = found.group(1)
    return upper, lower, asp_1, asp_2
def get_loan_term(self, page_num='0'):
    """Read the loan term (in months) from the given page.

    The page text is concatenated first, because the phrase may be split
    across spans; the span containing '<n>个月' then provides the position.

    Args:
        page_num (str): Key of the page inside ``self.pdf_info``.

    Returns:
        dict: Copy of the field template; ``words`` is the number of months as
        a string, ``position`` the bbox of the last span containing it.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info[page_num]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    term = dict(self.item)
    whole_page = ''
    for _, content in page_spans():
        whole_page += content
    found = re.search(r'贷款期限(\d+)个月', whole_page)
    if not found:
        return term
    months = found.group(1)
    for box, content in page_spans():
        if f'{months}个月' in content:
            term['position'] = box
            term['words'] = months
    return term
def get_asp_details(self, page_num):
    """Collect the ASP detail table from the given page.

    Spans are gathered from the one equal to '附加产品融资贷款本金总金额明细'
    (inclusive) until a span containing '第二条' or '征信管理' (exclusive).
    The first collected text becomes a one-cell header row, the remaining
    texts are grouped into rows of three; an incomplete last row is dropped.

    Args:
        page_num (str): Key of the page inside ``self.pdf_info``.

    Returns:
        dict: Copy of the field template whose ``words`` is the list of rows,
        or ``None`` when nothing was collected. ``position`` is never set.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info[page_num]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    details = dict(self.item)
    cells = []
    collecting = False
    for _, content in page_spans():
        if content == '附加产品融资贷款本金总金额明细':
            collecting = True
        if '第二条' in content or '征信管理' in content:
            collecting = False
        if collecting:
            cells.append(content)

    if cells:
        rows = [[cells[0]]]
        # Full three-cell rows start at index 1, 4, 7, ...
        rows.extend(cells[start:start + 3] for start in range(1, len(cells) - 2, 3))
        details['words'] = rows
    return details
def get_signature(self):
    """Find the signing line on the first page (page key '0').

    Returns:
        dict: Copy of the field template; ``words`` is the full text of the
        last span containing '签署日期' and ``position`` its bbox.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info['0']['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    signed = dict(self.item)
    for box, content in page_spans():
        if '签署日期' not in content:
            continue
        signed['words'] = content
        signed['position'] = box
    return signed
def get_somebody(self, top, bottom):
    """Return the name and id of the party listed between two headings.

    Works on page key '1'. The vertical band is bounded by the bottom edge
    (``bbox[3]``) of the last span containing ``top`` and of the last span
    containing ``bottom``; a missing heading leaves its bound at 0.

    Args:
        top (str): Heading text that opens the band.
        bottom (str): Heading text that closes the band.

    Returns:
        tuple: ``(name, id)`` field dicts taken from the spans inside the band
        containing '姓名/名称' and '自然人身份证件号码/法人执照号码'; the value
        is the text after the last ``:``.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info['1']['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    name_field = dict(self.item)
    id_field = dict(self.item)

    # First pass: locate the band.
    band_start = band_end = 0
    for box, content in page_spans():
        if top in content:
            band_start = box[3]
        if bottom in content:
            band_end = box[3]

    # Second pass: pick the labelled values lying strictly inside the band.
    for box, content in page_spans():
        if not band_start < box[3] < band_end:
            continue
        if '姓名/名称' in content:
            name_field['position'] = box
            name_field['words'] = content.split(':')[-1]
        if '自然人身份证件号码/法人执照号码' in content:
            id_field['position'] = box
            id_field['words'] = content.split(':')[-1]
    return name_field, id_field
def get_seller(self):
    """Find the dealer name printed next to the '经销商' label on page key '1'.

    The value is the last span whose horizontal centre lies between the
    label's right edge and the middle of the page, and whose vertical centre
    lies between the label's top and bottom edges.

    Returns:
        dict: Copy of the field template with ``words`` / ``position`` of that
        span, or untouched when the label or a value is not found.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info['1']['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    dealer = dict(self.item)

    # Locate the label first.
    label_box = None
    for box, content in page_spans():
        if content == '经销商':
            label_box = box
    if not label_box:
        return dealer

    # Then match the value that sits to the right of the label.
    page_middle = self.pdf_info['1']['width'] * 0.5
    for box, content in page_spans():
        right_of_label = label_box[2] < np.mean(box[::2]) < page_middle
        if right_of_label and label_box[1] < np.mean(box[1::2]) < label_box[3]:
            dealer['position'] = box
            dealer['words'] = content
    return dealer
def get_payback_account(self):
    """Extract the repayment account details from page key '1'.

    Only the ticked-account layout is handled: unless the page text contains
    '☑账号', all three fields are returned empty. Values are matched on the
    page text with spaces removed; positions come from the last span that
    contains the matched value.

    Returns:
        tuple: ``(account, account_name, account_bank)`` field dicts.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info['1']['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    number_field = dict(self.item)
    holder_field = dict(self.item)
    bank_field = dict(self.item)

    page_text = ''
    for _, content in page_spans():
        page_text += content

    # Determine the account layout first; the "notified separately" form is not emitted.
    if '☑账号' not in page_text:
        return number_field, holder_field, bank_field
    page_text = page_text.replace(' ', '')

    numbers = re.findall(r'账号:(.*)户名', page_text)
    if numbers:
        number = numbers[0]
        for box, content in page_spans():
            if f'{number}' in content:
                number_field['position'] = box
                number_field['words'] = number

    holders = re.findall(r'户名:(.*)开户行', page_text)
    if holders:
        holder = holders[0]
        for box, content in page_spans():
            if f'{holder}' in content:
                holder_field['position'] = box
                holder_field['words'] = holder

    banks = re.findall(r'开户行:(.*);', page_text)
    if banks:
        bank = banks[0]
        for box, content in page_spans():
            if f'开户行:{bank};' in content.replace(' ', ''):
                bank_field['position'] = box
                bank_field['words'] = bank
    return number_field, holder_field, bank_field
def get_repayment_schedule(self):
    """Collect the repayment schedule table from page key '2'.

    Spans are gathered from the one equal to '序号' (inclusive) until a span
    containing '以上表格中所列的序号并非还款期数' (exclusive) and cut into rows
    of five cells; a trailing incomplete row is ignored. Row building stops
    at the first row whose second cell equals the row's 1-based index.

    Returns:
        dict: Copy of the field template whose ``words`` is the list of rows,
        or ``None`` when no row was built. ``position`` is never set.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info['2']['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    schedule = dict(self.item)
    columns = 5  # the table has five columns
    cells = []
    inside_table = False
    for _, content in page_spans():
        if content == '序号':
            inside_table = True
        if '以上表格中所列的序号并非还款期数' in content:
            inside_table = False
        if inside_table:
            cells.append(content)

    rows = []
    for row_index in range(len(cells) // columns):
        first = row_index * columns
        row = cells[first:first + columns]
        if row[1] == str(row_index + 1):
            break
        rows.append(row)

    if rows:
        schedule['words'] = rows
    return schedule
def get_signature_role_1(self):
    """Report whether the borrower (mortgagor) signature area holds text.

    Across all pages, spans are collected from one containing
    '借款人(抵押人)' (inclusive) until one containing '日期' (exclusive).
    More than four collected spans is reported as signed.

    Returns:
        dict: Copy of the field template with ``words`` ('有' or '无'),
        ``position`` (box enclosing the collected spans, ``None`` when
        nothing was collected) and ``page_num`` (key of the last page that
        contributed a span, or ``None``).
    """
    # Every other extractor copies ``self.item``; the previously referenced
    # ``self.init_item`` is never defined and raised AttributeError.
    signature_role_1 = self.item.copy()
    # Locate the signature area first.
    texts = []
    boxes = []
    page_num = None
    region = False
    for page_key in list(self.pdf_info.keys()):
        for block in self.pdf_info[page_key]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    bbox, text = span['bbox'], span['text']
                    if '借款人(抵押人)' in text:
                        region = True
                    if '日期' in text:
                        region = False
                    if region:
                        page_num = page_key
                        texts.append(text)
                        boxes.append(bbox)
    words = '有' if len(texts) > 4 else '无'
    position = None
    if boxes:
        # Treat every bbox as two (x, y) corners and take the enclosing box.
        corners = np.array(boxes).reshape((-1, 2))
        position = [corners[:, 0].min(), corners[:, 1].min(),
                    corners[:, 0].max(), corners[:, 1].max()]
    signature_role_1['page_num'] = page_num
    signature_role_1['position'] = position
    signature_role_1['words'] = words
    return signature_role_1
def get_signature_role_2(self):
    """Report whether the co-borrower (co-mortgagor) signature area holds text.

    Across all pages, spans are collected from one containing
    '共同借款人(共同抵押人)' (inclusive) until one containing '日期'
    (exclusive). More than four collected spans is reported as signed.

    Returns:
        dict: Copy of the field template with ``words`` ('有' or '无'),
        ``position`` (box enclosing the collected spans, ``None`` when
        nothing was collected) and ``page_num`` (key of the last page that
        contributed a span, or ``None``).
    """
    # Every other extractor copies ``self.item``; the previously referenced
    # ``self.init_item`` is never defined and raised AttributeError.
    signature_role_2 = self.item.copy()
    # Locate the signature area first.
    texts = []
    boxes = []
    page_num = None
    region = False
    for page_key in list(self.pdf_info.keys()):
        for block in self.pdf_info[page_key]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    bbox, text = span['bbox'], span['text']
                    if '共同借款人(共同抵押人)' in text:
                        region = True
                    if '日期' in text:
                        region = False
                    if region:
                        page_num = page_key
                        texts.append(text)
                        boxes.append(bbox)
    words = '有' if len(texts) > 4 else '无'
    position = None
    if boxes:
        # Treat every bbox as two (x, y) corners and take the enclosing box.
        corners = np.array(boxes).reshape((-1, 2))
        position = [corners[:, 0].min(), corners[:, 1].min(),
                    corners[:, 0].max(), corners[:, 1].max()]
    signature_role_2['page_num'] = page_num
    signature_role_2['position'] = position
    signature_role_2['words'] = words
    return signature_role_2
def get_signature_role_3(self):
    """Report whether the guarantor 1 signature area holds text.

    Spans are collected from one containing '保证人1' on any page whose key
    is not 0 (inclusive) until one containing '日期' (exclusive). More than
    four collected spans is reported as signed.

    Returns:
        dict: Copy of the field template with ``words`` ('有' or '无'),
        ``position`` (box enclosing the collected spans, ``None`` when
        nothing was collected) and ``page_num`` (key of the last page that
        contributed a span, or ``None``).
    """
    # Every other extractor copies ``self.item``; the previously referenced
    # ``self.init_item`` is never defined and raised AttributeError.
    signature_role_3 = self.item.copy()
    # Locate the signature area first.
    texts = []
    boxes = []
    page_num = None
    region = False
    for page_key in list(self.pdf_info.keys()):
        for block in self.pdf_info[page_key]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    bbox, text = span['bbox'], span['text']
                    if '保证人1' in text and int(page_key) != 0:
                        region = True
                    if '日期' in text:
                        region = False
                    if region:
                        page_num = page_key
                        texts.append(text)
                        boxes.append(bbox)
    words = '有' if len(texts) > 4 else '无'
    position = None
    if boxes:
        # Treat every bbox as two (x, y) corners and take the enclosing box.
        corners = np.array(boxes).reshape((-1, 2))
        position = [corners[:, 0].min(), corners[:, 1].min(),
                    corners[:, 0].max(), corners[:, 1].max()]
    signature_role_3['page_num'] = page_num
    signature_role_3['position'] = position
    signature_role_3['words'] = words
    return signature_role_3
def get_signature_role_4(self):
    """Report whether the guarantor 2 signature area holds text.

    Spans are collected from one containing '保证人2' on any page whose key
    is not 0 (inclusive) until one containing '日期' (exclusive). More than
    four collected spans is reported as signed.

    Returns:
        dict: Copy of the field template with ``words`` ('有' or '无'),
        ``position`` (box enclosing the collected spans, ``None`` when
        nothing was collected) and ``page_num`` (key of the last page that
        contributed a span, or ``None``).
    """
    # Every other extractor copies ``self.item``; the previously referenced
    # ``self.init_item`` is never defined and raised AttributeError.
    signature_role_4 = self.item.copy()
    # Locate the signature area first.
    texts = []
    boxes = []
    page_num = None
    region = False
    for page_key in list(self.pdf_info.keys()):
        for block in self.pdf_info[page_key]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    bbox, text = span['bbox'], span['text']
                    if '保证人2' in text and int(page_key) != 0:
                        region = True
                    if '日期' in text:
                        region = False
                    if region:
                        page_num = page_key
                        texts.append(text)
                        boxes.append(bbox)
    words = '有' if len(texts) > 4 else '无'
    position = None
    if boxes:
        # Treat every bbox as two (x, y) corners and take the enclosing box.
        corners = np.array(boxes).reshape((-1, 2))
        position = [corners[:, 0].min(), corners[:, 1].min(),
                    corners[:, 0].max(), corners[:, 1].max()]
    signature_role_4['page_num'] = page_num
    signature_role_4['position'] = position
    signature_role_4['words'] = words
    return signature_role_4
def get_signature_role_5(self):
    """Report whether the witness signature area holds text.

    Spans are collected from one containing '见证人签字' on any page whose
    key is not 0 (inclusive) until one containing '年' (exclusive). More
    than four collected spans is reported as signed.

    Returns:
        dict: Copy of the field template with ``words`` ('有' or '无'),
        ``position`` (box enclosing the collected spans, ``None`` when
        nothing was collected) and ``page_num`` (key of the last page that
        contributed a span, or ``None``).
    """
    # Every other extractor copies ``self.item``; the previously referenced
    # ``self.init_item`` is never defined and raised AttributeError.
    signature_role_5 = self.item.copy()
    # Locate the signature area first.
    texts = []
    boxes = []
    page_num = None
    region = False
    for page_key in list(self.pdf_info.keys()):
        for block in self.pdf_info[page_key]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    bbox, text = span['bbox'], span['text']
                    if '见证人签字' in text and int(page_key) != 0:
                        region = True
                    if '年' in text:
                        region = False
                    if region:
                        page_num = page_key
                        texts.append(text)
                        boxes.append(bbox)
    # (The stray debugging print of the collected texts was dropped.)
    words = '有' if len(texts) > 4 else '无'
    position = None
    if boxes:
        # Treat every bbox as two (x, y) corners and take the enclosing box.
        corners = np.array(boxes).reshape((-1, 2))
        position = [corners[:, 0].min(), corners[:, 1].min(),
                    corners[:, 0].max(), corners[:, 1].max()]
    signature_role_5['page_num'] = page_num
    signature_role_5['position'] = position
    signature_role_5['words'] = words
    return signature_role_5
def get_last_page_signature(self, page_num, top, bottom):
    """Extract one signer's name and signing date from the signature page.

    The signer's line is the span containing '签署日期' whose vertical centre
    lies strictly between the top edges (``bbox[1]``, truncated to int) of
    the spans containing ``top`` and ``bottom``. If either heading is
    missing, both fields are returned empty.

    Args:
        page_num (str): Key of the page inside ``self.pdf_info``.
        top (str): Heading text that opens the signer's area.
        bottom (str): Heading text that closes the signer's area.

    Returns:
        tuple: ``(signature_name, signature_date)`` field dicts. The name is
        the text before the first space, the date the text after the last
        ``:``; both carry the bbox of the signer's line.
    """
    def page_spans():
        # Yield (bbox, text) for every span of the blocks whose type is 0.
        for block in self.pdf_info[page_num]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    yield span['bbox'], span['text']

    signature_name = self.item.copy()
    signature_date = self.item.copy()
    anchor_top = None
    anchor_bottom = None
    for bbox, text in page_spans():
        if top in text:
            anchor_top = bbox[1]
        if bottom in text:
            anchor_bottom = bbox[1]
    if anchor_top is None or anchor_bottom is None:
        return signature_name, signature_date

    for bbox, text in page_spans():
        if '签署日期' in text and int(anchor_top) < np.mean(bbox[1::2]) < int(anchor_bottom):
            signature_name['words'] = text.split(' ')[0]
            signature_name['position'] = bbox
            signature_date['words'] = text.split(':')[-1]
            # Previously the name's position was assigned a second time here,
            # leaving the date without any position.
            signature_date['position'] = bbox
    return signature_name, signature_date
def get_info(self):
    """Collect every extracted field of the contract, page by page.

    Text is read only from blocks whose ``block['type'] == 0``; all other
    blocks are skipped.

    The contract is treated as an ASP product when a span on page '0' is
    exactly '附加产品融资贷款本金总金额'. In that case the signature page is
    page index '7' (reported as 'page_8'), otherwise it is page index '6'
    (reported as 'page_7').

    Returns:
        dict: ``{"is_asp": <flag>, "page_info": self.init_result}``.
    """
    # Decide whether this is an ASP product by looking at the first page only.
    for block in self.pdf_info['0']['blocks']:
        if block['type'] != 0:
            continue
        for line in block['lines']:
            for span in line['spans']:
                if '附加产品融资贷款本金总金额' == span['text']:
                    self.is_asp = True
    self.gen_init_result(self.is_asp)
    result = self.init_result

    # ---- Page 1 ----
    page_1 = result['page_1']
    page_1['合同编号'] = self.get_contract_no(page_num='0')
    page_1['所购车辆价格'] = self.get_vehicle_price()
    # Previously the vehicle price was stored under the VIN key.
    page_1['车架号'] = self.get_vin()
    # For ASP products the principal also carries the two ASP sub-amounts.
    upper, lower, asp_1, asp_2 = self.get_loan_principal()
    page_1['贷款本金金额']['大写'] = upper
    page_1['贷款本金金额']['小写'] = lower
    page_1['贷款本金金额']['车辆贷款本金金额'] = asp_1
    page_1['贷款本金金额']['附加产品融资贷款本金总金额'] = asp_2
    page_1['贷款期限'] = self.get_loan_term()
    page_1['附加产品融资贷款本金总金额明细'] = self.get_asp_details(page_num='0')
    page_1['借款人签字及时间'] = self.get_signature()

    # ---- Page 2 ----
    page_2 = result['page_2']
    # NOTE(review): every other page reads its own index, but page 2 reads
    # the contract number from page '0'. Confirm whether '1' was intended.
    page_2['合同编号'] = self.get_contract_no(page_num='0')
    parties = (
        ('借款人及抵押人', '借款人及抵押人:', '共同借款人及共同抵押人:'),
        ('共同借款人及共同抵押人', '共同借款人及共同抵押人:', '保证人1:'),
        ('保证人1', '保证人1:', '保证人2:'),
        ('保证人2', '保证人2:', '第一章'),
    )
    for field, top, bottom in parties:
        party_name, party_id = self.get_somebody(top=top, bottom=bottom)
        page_2[field]['name'] = party_name
        page_2[field]['id'] = party_id
    page_2['所购车辆价格'] = self.get_vehicle_price(page_num='1')
    page_2['车架号'] = self.get_vin(page_num='1')
    page_2['经销商'] = self.get_seller()
    upper, lower, asp_1, asp_2 = self.get_loan_principal(page_num='1')
    page_2['贷款本金金额']['大写'] = upper
    page_2['贷款本金金额']['小写'] = lower
    page_2['贷款本金金额']['车辆贷款本金金额'] = asp_1
    page_2['贷款本金金额']['附加产品融资贷款本金总金额'] = asp_2
    page_2['贷款期限'] = self.get_loan_term(page_num='1')
    account, account_name, account_bank = self.get_payback_account()
    page_2['还款账户']['账号'] = account
    page_2['还款账户']['户名'] = account_name
    page_2['还款账户']['开户行'] = account_bank

    # ---- Page 3: repayment schedule table ----
    result['page_3']['合同编号'] = self.get_contract_no(page_num='2')
    result['page_3']['还款计划表'] = self.get_repayment_schedule()

    # ---- Page 4: ASP detail table ----
    result['page_4']['合同编号'] = self.get_contract_no(page_num='3')
    result['page_4']['附加产品融资贷款本金总金额明细'] = self.get_asp_details(page_num='3')

    # ---- Pages 5-7: contract number only ----
    result['page_5']['合同编号'] = self.get_contract_no(page_num='4')
    result['page_6']['合同编号'] = self.get_contract_no(page_num='5')
    result['page_7']['合同编号'] = self.get_contract_no(page_num='6')

    # ---- Signature page ----
    if self.is_asp == False:  # noqa: E712 - keep the original comparison
        sign_key, sign_page = 'page_7', '6'
    else:
        sign_key, sign_page = 'page_8', '7'
        result['page_8']['合同编号'] = self.get_contract_no(page_num='7')
    signers = (
        ('主借人签字', '借款人(抵押人)', '共同借款人(共同抵押人)'),
        ('共借人签字', '共同借款人(共同抵押人)', '保证人1'),
        ('保证人1签字', '保证人1', '保证人2'),
        ('保证人2签字', '保证人2', '在本人面前亲笔签署本合同'),
        ('见证人签字', '在本人面前亲笔签署本合同', '(以下无正文)'),
    )
    for field, top, bottom in signers:
        signature_name, signature_date = self.get_last_page_signature(
            page_num=sign_page, top=top, bottom=bottom)
        result[sign_key][field]['签字'] = signature_name
        result[sign_key][field]['日期'] = signature_date

    # Final output layout.
    new_results = {"is_asp": self.is_asp,
                   "page_info": self.init_result
                   }
    return new_results
# -*- coding: utf-8 -*-
# @Author : lk
# @Email : 9428.al@gmail.com
# @Create Date : 2021-07-20 16:42:41
# @Last Modified : 2021-10-28 17:41:00
# @Description :
import re
import cv2
import base64
import numpy as np
from fuzzywuzzy import fuzz
class Finder:
def __init__(self, pdf_info):
    """Store the parsed PDF and prepare the empty result templates.

    Args:
        pdf_info (dict): per-page data; the methods of this class index it
            by page key and read ``['blocks']`` from each page.
    """
    self.pdf_info = pdf_info
    # Template of a single extracted field.
    self.item = {"words": None,
                 "page": None,
                 "position": None,
                 }
    # Every field gets its OWN copy of the template. Previously all fields
    # referenced the same dict, so mutating one field in place would have
    # changed every other field as well.
    # Result layout of the main contract.
    contract_fields = (
        "合同编号",
        "承租人-姓名",
        "承租人-证件号码",
        "承租人-法定代表人或授权代表",
        "保证人1-姓名",
        "保证人1-证件号码",
        "保证人1-法定代表人或授权代表",
        "保证人2-姓名",
        "保证人2-证件号码",
        "保证人2-法定代表人或授权代表",
        "保证人3-姓名",
        "保证人3-证件号码",
        "保证人3-法定代表人或授权代表",
        "合同编号(正文)",
        "车辆识别代码",
        "车辆卖方(经销商)",
        "车辆原始销售价格(《机动车销售统一发票》所列金额)",
        "车辆附加产品明细表",
        "融资成本总额",
        "租期",
        "付款计划表",
        "银行账户-户名",
        "银行账户-银行账号",
        "银行账户-开户行",
        "签字页-承租人姓名",
        "签字页-承租人签章",
        "签字页-保证人1姓名",
        "签字页-保证人1签章",
        "签字页-保证人2姓名",
        "签字页-保证人2签章",
        "签字页-保证人3姓名",
        "签字页-保证人3签章",
    )
    self.init_result = {field: self.item.copy() for field in contract_fields}
    # Result layout of the vehicle disposal agreement (different fields).
    disposal_fields = (
        "合同编号",
        "承租人-姓名",
        "承租人-证件号码",
        "销售经销商",
        "合同编号(正文)",
        "签字页-承租人姓名",
        "签字页-承租人证件号码",
        "签字页-承租人签章",
        "签字页-销售经销商",
        "签字页-销售经销商签章",
    )
    self.init_result_1 = {field: self.item.copy() for field in disposal_fields}
    # Result layout of the vehicle lease mortgage contract.
    mortgage_fields = (
        "合同编号",
        "合同编号(正文)",
        "抵押人姓名/名称",
        "抵押人证件号码",
        "车辆识别代码",
        "租金总额",
        "融资租赁期限",
        "签字页-抵押人姓名",
        "签字页-抵押人签章",
        "签字页-抵押人配偶姓名",
        "签字页-抵押人配偶签章",
    )
    self.init_result_2 = {field: self.item.copy() for field in mortgage_fields}
def get_contract_no(self, page_num):
    """Read the contract number printed on the given page.

    The last span containing '合同编号:' supplies the value (text after the
    last ':'). When that value is an empty string, the page is scanned
    again for spans containing 'CH' whose top edge lies above the bottom
    edge of the current match.

    Args:
        page_num (str): key of the page in ``self.pdf_info``.

    Returns:
        dict: copy of ``self.item`` with 'words', 'page' and 'position'
        filled in when a match was found.
    """
    found = self.item.copy()
    page_spans = [
        span
        for block in self.pdf_info[page_num]['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    for span in page_spans:
        content = span['text']
        if '合同编号:' not in content:
            continue
        found['position'] = span['bbox']
        found['page'] = page_num
        found['words'] = content.split(':')[-1]
    if found['words'] != '':
        return found
    # The label was present but the number sits in a separate span.
    for span in page_spans:
        box, content = span['bbox'], span['text']
        if box[1] < found['position'][3] and 'CH' in content:
            found['position'] = box
            found['page'] = page_num
            found['words'] = content
    return found
def get_vehicle_price(self, page_num='0'):
    """Extract the vehicle purchase price from one page.

    Args:
        page_num (str): key of the page in ``self.pdf_info`` (default '0').

    Returns:
        dict: copy of ``self.item``. For the last span containing
        '所购车辆价格为人民币', 'words' is the text after the last '币' and
        'position' is the span's bbox. 'page' is not filled in.
    """
    price = self.item.copy()
    matching = [
        (span['bbox'], span['text'])
        for block in self.pdf_info[page_num]['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
        if '所购车辆价格为人民币' in span['text']
    ]
    if matching:
        # Later matches overwrite earlier ones, so only the last one counts.
        box, content = matching[-1]
        price['position'] = box
        price['words'] = content.split('币')[-1]
    return price
def get_contract_no_one(self):
    """Find the contract number quoted inside the body text.

    The number may be broken across spans, so the text of each page is
    concatenated (spaces removed) and searched as a whole; therefore no
    position is reported. Pages are tried in the iteration order of
    ``self.pdf_info`` and the first page with a match wins.

    Returns:
        dict: copy of ``self.item`` with 'words' and 'page' set on a match
        ('position' is always None). When no page matches, the untouched
        copy is returned.
    """
    contract_no = self.item.copy()
    # (pattern, strip the captured text?) in priority order.
    # The last pattern used to be r'编号为(.*?))的', whose unbalanced ')'
    # raised re.error on every page where the first two patterns missed.
    patterns = (
        (r'(合同编号:\[(.*?)\])', False),
        (r'编号为(.*?)的', True),
        (r'编号为(.*?)[))]的', True),
    )
    for pno in self.pdf_info:
        all_text = ''.join(
            span['text']
            for block in self.pdf_info[pno]['blocks']
            if block['type'] == 0
            for line in block['lines']
            for span in line['spans']
        )
        all_text = all_text.replace(' ', '')
        for pattern, strip in patterns:
            match_obj = re.search(pattern, all_text)
            if not match_obj:
                continue
            words = match_obj.group(1)
            contract_no['position'] = None
            contract_no['page'] = pno
            contract_no['words'] = words.strip() if strip else words
            return contract_no
    # Nothing matched: return the empty item like the sibling getters do,
    # instead of an implicit None.
    return contract_no
def get_key_value(self, key, page_num=None):
    """Return the value that follows ``key`` inside a single span.

    Args:
        key (str): label text to look for (substring match).
        page_num (str | None): restrict the search to this page key; when
            None every page of ``self.pdf_info`` is searched.

    Returns:
        dict: copy of ``self.item``. For the last span containing ``key``,
        'words' is the text after the last ':', 'page' the page key and
        'position' the span's bbox.
    """
    found = self.item.copy()
    page_keys = self.pdf_info if page_num is None else [page_num]
    for page_key in page_keys:
        for block in self.pdf_info[page_key]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    box, content = span['bbox'], span['text']
                    if key not in content:
                        continue
                    tail = content.split(':')[-1]
                    found['position'] = box
                    found['page'] = page_key
                    found['words'] = tail
    return found
def get_loan_principal(self, page_num='0'):
    """Extract the loan principal amounts from one page.

    Args:
        page_num (str): key of the page in ``self.pdf_info`` (default '0').

    Returns:
        tuple(dict, dict, dict, dict): ``(upper, lower, asp_1, asp_2)``,
        all copies of ``self.item`` with 'words'/'position' filled in:
            upper - amount written in Chinese numerals (span whose fuzzy
                    ratio against the numeral alphabet exceeds 15);
            lower - amount after '¥' in a span containing '小写:¥';
            asp_1 - bracketed amount of the '人民币:小写:' span located
                    above the '附加产品融资贷款本金总金额' span;
            asp_2 - bracketed amount of the '人民币:小写:' span located
                    below it.
        'page' is not filled in.
    """
    chinese_keywords = ['壹', '贰', '叁', '肆', '伍', '陆', '柒', '捌', '玖', '拾',
                        '佰', '仟', '万', '亿', '元', '角', '分', '零', '整']
    numeral_alphabet = ''.join(chinese_keywords)
    upper = self.item.copy()
    lower = self.item.copy()
    asp_1 = self.item.copy()
    asp_2 = self.item.copy()
    spans = [
        span
        for block in self.pdf_info[page_num]['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    anchor_bbox = None
    for span in spans:
        bbox, text = span['bbox'], span['text']
        if fuzz.ratio(numeral_alphabet, text) > 15:
            # From here on the checks below see only the text after the
            # last ':' (this mirrors the original evaluation order).
            text = text.split(':')[-1].strip()
            upper['position'] = bbox
            upper['words'] = text
        if '小写:¥' in text:
            lower['position'] = bbox
            lower['words'] = text.split('¥')[-1].strip()
        if '附加产品融资贷款本金总金额' == text:
            anchor_bbox = bbox
    if anchor_bbox:
        anchor_y = np.mean(anchor_bbox[1::2])
        for span in spans:
            bbox, text = span['bbox'], span['text']
            if '人民币:小写:' not in text:
                continue
            amount = re.search(r'人民币:小写:\[(.*)\]', text)
            if amount is None:
                # Label without a bracketed amount; indexing the findall()
                # result used to raise IndexError here.
                continue
            centre_y = np.mean(bbox[1::2])
            if centre_y < anchor_y:
                asp_1['position'] = bbox
                asp_1['words'] = amount.group(1)
            if centre_y > anchor_y:
                asp_2['position'] = bbox
                asp_2['words'] = amount.group(1)
    return upper, lower, asp_1, asp_2
def get_loan_term(self, page_num='0'):
    """Extract the loan term in months from one page.

    The page text is concatenated and searched for '贷款期限<digits>个月';
    the digits become the value and the last span containing
    '<digits>个月' supplies the position.

    Args:
        page_num (str): key of the page in ``self.pdf_info`` (default '0').

    Returns:
        dict: copy of ``self.item`` with 'words' and 'position' set when
        both the pattern and a carrying span were found.
    """
    term = self.item.copy()
    page_spans = [
        span
        for block in self.pdf_info[page_num]['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    joined = ''.join(span['text'] for span in page_spans)
    hit = re.search(r'贷款期限(\d+)个月', joined)
    if not hit:
        return term
    months = hit.group(1)
    needle = f'{months}个月'
    for span in page_spans:
        if needle in span['text']:
            term['position'] = span['bbox']
            term['words'] = months
    return term
def get_asp_details(self, page_num):
    """Collect the ASP detail table of one page.

    Spans are collected from the one whose text is exactly
    '附加产品融资贷款本金总金额明细' (inclusive) until a span containing
    '第二条' or '征信管理'. The first collected text forms a one-cell title
    row; the following texts are grouped into complete rows of three.

    Args:
        page_num (str): key of the page in ``self.pdf_info``.

    Returns:
        dict: copy of ``self.item`` whose 'words' is the list of rows, or
        left untouched when nothing was collected.
    """
    details = self.item.copy()
    cells = []
    inside_table = False
    for block in self.pdf_info[page_num]['blocks']:
        if block['type'] != 0:
            continue
        for line in block['lines']:
            for span in line['spans']:
                content = span['text']
                if '附加产品融资贷款本金总金额明细' == content:
                    inside_table = True
                if '第二条' in content or '征信管理' in content:
                    inside_table = False
                if inside_table:
                    cells.append(content)
    rows = []
    if cells:
        rows.append([cells[0]])
        # Row k (k >= 1) holds cells 3k-2 .. 3k; incomplete tails are dropped.
        row_count = (len(cells) + 2) // 3
        rows.extend(cells[3 * k - 2:3 * k + 1] for k in range(1, row_count))
    if len(rows) > 0:
        details['words'] = rows
    return details
def get_signature(self):
    """Return the signing line found on page '0'.

    Returns:
        dict: copy of ``self.item``; 'words' and 'position' come from the
        last span on page '0' whose text contains '签署日期'.
    """
    signature = self.item.copy()
    dated = [
        span
        for block in self.pdf_info['0']['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
        if '签署日期' in span['text']
    ]
    for span in dated:
        signature['words'] = span['text']
        signature['position'] = span['bbox']
    return signature
def get_somebody(self, top, bottom):
    """Return name and ID of the party listed between two headings.

    Only page '1' is inspected. The bottom edges of the last spans
    containing ``top`` and ``bottom`` delimit a vertical band (both default
    to 0 when the heading is missing); spans whose bottom edge lies
    strictly inside the band are examined.

    Args:
        top (str): heading text that opens the band.
        bottom (str): heading text that closes the band.

    Returns:
        tuple(dict, dict): ``(name, id)`` copies of ``self.item``; 'words'
        is the text after the last ':' of the '姓名/名称' span and of the
        '自然人身份证件号码/法人执照号码' span, 'position' the span bbox.
    """
    party_name = self.item.copy()
    party_id = self.item.copy()
    page_spans = [
        (span['bbox'], span['text'])
        for block in self.pdf_info['1']['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    # First pass: locate the band.
    upper_edge = 0
    lower_edge = 0
    for box, content in page_spans:
        if top in content:
            upper_edge = box[3]
        if bottom in content:
            lower_edge = box[3]
    # Second pass: pick the labelled values inside the band.
    for box, content in page_spans:
        if not upper_edge < box[3] < lower_edge:
            continue
        if '姓名/名称' in content:
            party_name['position'] = box
            party_name['words'] = content.split(':')[-1]
        if '自然人身份证件号码/法人执照号码' in content:
            party_id['position'] = box
            party_id['words'] = content.split(':')[-1]
    return party_name, party_id
def get_seller(self):
    """Return the dealer name printed next to the '经销商' label on page '1'.

    The label is the last span whose text is exactly '经销商'. The value is
    the last span whose horizontal centre lies between the label's right
    edge and the middle of the page, and whose vertical centre lies within
    the label's vertical extent.

    Returns:
        dict: copy of ``self.item`` with 'words' and 'position' set when a
        value span was found.
    """
    seller = self.item.copy()
    page = self.pdf_info['1']
    page_spans = [
        (span['bbox'], span['text'])
        for block in page['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    # Locate the label first.
    label_box = None
    for box, content in page_spans:
        if '经销商' == content:
            label_box = box
    if not label_box:
        return seller
    # Then match the value sitting on the same row, left half of the page.
    page_middle = page['width'] * 0.5
    for box, content in page_spans:
        in_columns = label_box[2] < np.mean(box[::2]) < page_middle
        in_row = label_box[1] < np.mean(box[1::2]) < label_box[3]
        if in_columns and in_row:
            seller['position'] = box
            seller['words'] = content
    return seller
def get_payback_account(self):
    """Extract the repayment account details from page '1'.

    Nothing is extracted unless the concatenated page text contains
    '☑账号' (the ticked account option). Values are captured from the
    space-free page text with greedy patterns and then located in the
    individual spans to obtain a position.

    Returns:
        tuple(dict, dict, dict): ``(account, account_name, account_bank)``
        copies of ``self.item`` with 'words'/'position' set when the value
        was captured and a carrying span was found.
    """
    account = self.item.copy()
    account_name = self.item.copy()
    account_bank = self.item.copy()
    page_spans = [
        (span['bbox'], span['text'])
        for block in self.pdf_info['1']['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    page_text = ''.join(content for _, content in page_spans)
    # Only the explicit-account variant is reported, not "to be notified".
    if '☑账号' not in page_text:
        return account, account_name, account_bank
    page_text = page_text.replace(' ', '')
    lookups = (
        (account, r'账号:(.*)户名',
         lambda value, content: f'{value}' in content),
        (account_name, r'户名:(.*)开户行',
         lambda value, content: f'{value}' in content),
        (account_bank, r'开户行:(.*);',
         lambda value, content: f'开户行:{value};' in content.replace(' ', '')),
    )
    for target, pattern, carries in lookups:
        captured = re.findall(pattern, page_text)
        if not captured:
            continue
        value = captured[0]
        for box, content in page_spans:
            if carries(value, content):
                target['position'] = box
                target['words'] = value
    return account, account_name, account_bank
def get_repayment_schedule(self):
    """Rebuild the payment schedule table from the span stream.

    Spans of all pages are read in order. Collection starts AFTER a span
    containing '61.' and stops at a span containing '以上表格中所列序号'.
    The collected texts are grouped into complete rows of four and each row
    is prefixed with its 1-based sequence number.

    Returns:
        dict: copy of ``self.item``; 'words' is the table (header row
        first) and 'page' the key of the last page holding a '61.' span,
        or None.
    """
    schedule = self.item.copy()
    cells = []
    collecting = False
    start_page = None
    for pno in self.pdf_info:
        for block in self.pdf_info[pno]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    content = span['text']
                    if '以上表格中所列序号' in content:
                        collecting = False
                    if collecting:
                        cells.append(content)
                    # Checked last so the marker itself is not collected.
                    if '61.' in content:
                        start_page = pno
                        collecting = True
    table = [['序号', '融资租赁成本', '融资租赁费用', '租金', '剩余融资租赁成本']]
    columns = 4
    for row_index in range(len(cells) // columns):
        first = row_index * columns
        table.append([f'{row_index + 1}.'] + cells[first:first + columns])
    schedule['words'] = table
    schedule['page'] = start_page
    return schedule
def get_signature_role_1(self):
    """Return the last signing line found anywhere in the document.

    Returns:
        dict: copy of ``self.item`` filled from the last span (in page
        iteration order) whose text contains '签署日期': 'words' is the
        full span text, 'page' the page key, 'position' the bbox.
    """
    signature = self.item.copy()
    dated = [
        (pno, span)
        for pno in self.pdf_info
        for block in self.pdf_info[pno]['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
        if '签署日期' in span['text']
    ]
    for pno, span in dated:
        signature['position'] = span['bbox']
        signature['page'] = pno
        signature['words'] = span['text']
    return signature
def get_signature_role_2(self):
    """Report whether the co-borrower signature area looks signed.

    All text spans of all pages are walked in order. Collection starts at a
    span containing '共同借款人(共同抵押人)' and stops at the next span
    containing '日期'. The collection flag is not reset between pages.

    Returns:
        dict: copy of the item template with
            'page_num' - key of the page of the last collected span, or None;
            'position' - [x_min, y_min, x_max, y_max] over all collected
                         boxes, or None when nothing was collected;
            'words'    - '有' when more than four spans were collected,
                         otherwise '无'.
    """
    # ``init_item`` is not created by ``__init__``; fall back to ``item`` so
    # the method no longer dies with AttributeError.
    template = getattr(self, 'init_item', None)
    if template is None:
        template = self.item
    signature_role_2 = template.copy()
    texts = []
    boxes = []
    page_num = None
    region = False
    for pno in list(self.pdf_info.keys()):
        for block in self.pdf_info[pno]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    bbox, text = span['bbox'], span['text']
                    if '共同借款人(共同抵押人)' in text:
                        region = True
                    if '日期' in text:
                        region = False
                    if region:
                        page_num = pno
                        texts.append(text)
                        boxes.append(bbox)
    words = '有' if len(texts) > 4 else '无'
    position = None
    if boxes:
        # Each bbox contributes two (x, y) corners; an empty region used to
        # crash in min()/max().
        corners = np.array(boxes).reshape((-1, 2))
        position = [min(corners[:, 0]), min(corners[:, 1]),
                    max(corners[:, 0]), max(corners[:, 1])]
    signature_role_2['page_num'] = page_num
    signature_role_2['position'] = position
    signature_role_2['words'] = words
    return signature_role_2
def get_signature_role_3(self):
    """Report whether the guarantor-1 signature area looks signed.

    All text spans of all pages are walked in order. Collection starts at a
    span containing '保证人1' on any page other than page 0 and stops at the
    next span containing '日期'. The collection flag is not reset between
    pages.

    Returns:
        dict: copy of the item template with
            'page_num' - key of the page of the last collected span, or None;
            'position' - [x_min, y_min, x_max, y_max] over all collected
                         boxes, or None when nothing was collected;
            'words'    - '有' when more than four spans were collected,
                         otherwise '无'.
    """
    # ``init_item`` is not created by ``__init__``; fall back to ``item`` so
    # the method no longer dies with AttributeError.
    template = getattr(self, 'init_item', None)
    if template is None:
        template = self.item
    signature_role_3 = template.copy()
    texts = []
    boxes = []
    page_num = None
    region = False
    for pno in list(self.pdf_info.keys()):
        for block in self.pdf_info[pno]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    bbox, text = span['bbox'], span['text']
                    if '保证人1' in text and int(pno) != 0:
                        region = True
                    if '日期' in text:
                        region = False
                    if region:
                        page_num = pno
                        texts.append(text)
                        boxes.append(bbox)
    words = '有' if len(texts) > 4 else '无'
    position = None
    if boxes:
        # Each bbox contributes two (x, y) corners; an empty region used to
        # crash in min()/max().
        corners = np.array(boxes).reshape((-1, 2))
        position = [min(corners[:, 0]), min(corners[:, 1]),
                    max(corners[:, 0]), max(corners[:, 1])]
    signature_role_3['page_num'] = page_num
    signature_role_3['position'] = position
    signature_role_3['words'] = words
    return signature_role_3
def get_signature_role_4(self):
    """Report whether the guarantor-2 signature area looks signed.

    All text spans of all pages are walked in order. Collection starts at a
    span containing '保证人2' on any page other than page 0 and stops at the
    next span containing '日期'. The collection flag is not reset between
    pages.

    Returns:
        dict: copy of the item template with
            'page_num' - key of the page of the last collected span, or None;
            'position' - [x_min, y_min, x_max, y_max] over all collected
                         boxes, or None when nothing was collected;
            'words'    - '有' when more than four spans were collected,
                         otherwise '无'.
    """
    # ``init_item`` is not created by ``__init__``; fall back to ``item`` so
    # the method no longer dies with AttributeError.
    template = getattr(self, 'init_item', None)
    if template is None:
        template = self.item
    signature_role_4 = template.copy()
    texts = []
    boxes = []
    page_num = None
    region = False
    for pno in list(self.pdf_info.keys()):
        for block in self.pdf_info[pno]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    bbox, text = span['bbox'], span['text']
                    if '保证人2' in text and int(pno) != 0:
                        region = True
                    if '日期' in text:
                        region = False
                    if region:
                        page_num = pno
                        texts.append(text)
                        boxes.append(bbox)
    words = '有' if len(texts) > 4 else '无'
    position = None
    if boxes:
        # Each bbox contributes two (x, y) corners; an empty region used to
        # crash in min()/max().
        corners = np.array(boxes).reshape((-1, 2))
        position = [min(corners[:, 0]), min(corners[:, 1]),
                    max(corners[:, 0]), max(corners[:, 1])]
    signature_role_4['page_num'] = page_num
    signature_role_4['position'] = position
    signature_role_4['words'] = words
    return signature_role_4
def get_signature_role_5(self):
    """Report whether the witness signature area looks signed.

    All text spans of all pages are walked in order. Collection starts at a
    span containing '见证人签字' on any page other than page 0 and stops at
    the next span containing '年'. The collection flag is not reset between
    pages.

    Returns:
        dict: copy of the item template with
            'page_num' - key of the page of the last collected span, or None;
            'position' - [x_min, y_min, x_max, y_max] over all collected
                         boxes, or None when nothing was collected;
            'words'    - '有' when more than four spans were collected,
                         otherwise '无'.
    """
    # ``init_item`` is not created by ``__init__``; fall back to ``item`` so
    # the method no longer dies with AttributeError.
    template = getattr(self, 'init_item', None)
    if template is None:
        template = self.item
    signature_role_5 = template.copy()
    texts = []
    boxes = []
    page_num = None
    region = False
    for pno in list(self.pdf_info.keys()):
        for block in self.pdf_info[pno]['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    bbox, text = span['bbox'], span['text']
                    if '见证人签字' in text and int(pno) != 0:
                        region = True
                    if '年' in text:
                        region = False
                    if region:
                        page_num = pno
                        texts.append(text)
                        boxes.append(bbox)
    # (A leftover debug print of the collected texts was removed here.)
    words = '有' if len(texts) > 4 else '无'
    position = None
    if boxes:
        # Each bbox contributes two (x, y) corners; an empty region used to
        # crash in min()/max().
        corners = np.array(boxes).reshape((-1, 2))
        position = [min(corners[:, 0]), min(corners[:, 1]),
                    max(corners[:, 0]), max(corners[:, 1])]
    signature_role_5['page_num'] = page_num
    signature_role_5['position'] = position
    signature_role_5['words'] = words
    return signature_role_5
def get_last_page_signature(self, page_num, top, bottom):
    """Extract signer name and signing date between two anchor texts.

    Args:
        page_num (str): key of the page in ``self.pdf_info``.
        top (str): text marking the upper edge of the search band.
        bottom (str): text marking the lower edge of the search band.

    Returns:
        tuple(dict, dict): ``(signature_name, signature_date)``, both copies
        of ``self.item``. When a span containing '签署日期' lies vertically
        between the two anchors, the name item holds the text before the
        first space and the date item the text after the last ':'; both
        carry that span's bbox as 'position'. Otherwise the items are
        returned untouched.
    """
    signature_name = self.item.copy()
    signature_date = self.item.copy()
    spans = [
        span
        for block in self.pdf_info[page_num]['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    anchor_top = None
    anchor_bottom = None
    for span in spans:
        # The last span containing each anchor text wins.
        if top in span['text']:
            anchor_top = span['bbox'][1]
        if bottom in span['text']:
            anchor_bottom = span['bbox'][1]
    if anchor_top is None or anchor_bottom is None:
        return signature_name, signature_date
    for span in spans:
        bbox, text = span['bbox'], span['text']
        if '签署日期' not in text:
            continue
        # Vertical centre of the span must fall strictly inside the band.
        if int(anchor_top) < np.mean(bbox[1::2]) < int(anchor_bottom):
            signature_name['words'] = text.split(' ')[0]
            signature_name['position'] = bbox
            signature_date['words'] = text.split(':')[-1]
            # Previously the name position was assigned twice and the date
            # item never received a position.
            signature_date['position'] = bbox
    return signature_name, signature_date
def get_electronic_signature(self, top, bottom):
    """Find the electronic signature line between two anchor texts.

    Both anchors are searched on every page (the last span containing each
    one wins, regardless of page). Afterwards every page is searched for
    spans containing '签署日期' whose vertical centre lies strictly between
    the two anchor tops.

    Args:
        top (str): text marking the upper edge of the search band.
        bottom (str): text marking the lower edge of the search band.

    Returns:
        dict: copy of ``self.item`` filled from the last qualifying span
        ('words' is the whole span text), or untouched when an anchor or
        the signature line is missing.
    """
    signature = self.item.copy()
    document_spans = [
        (pno, span['bbox'], span['text'])
        for pno in self.pdf_info
        for block in self.pdf_info[pno]['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    upper_y = None
    lower_y = None
    for _, box, content in document_spans:
        if top in content:
            upper_y = box[1]
        if bottom in content:
            lower_y = box[1]
    if upper_y is None or lower_y is None:
        return signature
    for pno, box, content in document_spans:
        if '签署日期' in content and int(upper_y) < np.mean(box[1::2]) < int(lower_y):
            signature['words'] = content
            signature['page'] = pno
            signature['position'] = box
    return signature
def get_role_info(self, role_key, page_num='0'):
    """Extract name, ID number and representative of one contract party.

    The top-left corner of the last span starting with '保证人3' is used as
    the anchor that splits the page into four quadrants:

        承租人:   left of / above the anchor
        保证人1:  left of / below the anchor
        保证人2:  right of / above the anchor
        保证人3:  right of / below the anchor

    The name comes from the last span whose text matches ``role_key`` (used
    as a regular expression anchored at the start). The ID number and the
    representative come from the last spans starting with '证件号码:' /
    '法定代表人或授权代表:' whose centre lies in the party's quadrant.
    Nothing is extracted when the anchor span is missing.

    Args:
        role_key (str): one of '承租人:', '保证人1:', '保证人2:', '保证人3:'.
        page_num (str): key of the page in ``self.pdf_info`` (default '0').

    Returns:
        tuple(dict, dict, dict): ``(name, id_num, representative)`` copies
        of ``self.item``; each 'words' is the text after the last ':'.
    """
    name = self.item.copy()
    id_num = self.item.copy()
    representative = self.item.copy()
    page_spans = [
        (span['bbox'], span['text'])
        for block in self.pdf_info[page_num]['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    # Anchor: top-left corner of the guarantor-3 label.
    anchor = None
    for box, content in page_spans:
        if re.match('保证人3', content) is not None:
            anchor = [box[0], box[1]]
    if anchor is None:
        return name, id_num, representative

    # role -> (centre is left of anchor?, centre is above anchor?)
    quadrant = {
        '承租人:': (True, True),
        '保证人1:': (True, False),
        '保证人2:': (False, True),
        '保证人3:': (False, False),
    }.get(role_key)

    def in_quadrant(box):
        wants_left, wants_above = quadrant
        centre_x = np.mean(box[::2])
        centre_y = np.mean(box[1::2])
        x_ok = centre_x < anchor[0] if wants_left else centre_x > anchor[0]
        y_ok = centre_y < anchor[1] if wants_above else centre_y > anchor[1]
        return x_ok and y_ok

    def fill(target, box, content):
        target['words'] = content.split(':')[-1]
        target['page'] = page_num
        target['position'] = box

    for box, content in page_spans:
        if re.match(role_key, content) is not None:
            fill(name, box, content)
        if quadrant is None:
            continue
        if re.match('证件号码:', content) is not None and in_quadrant(box):
            fill(id_num, box, content)
        if re.match('法定代表人或授权代表:', content) is not None and in_quadrant(box):
            fill(representative, box, content)
    return name, id_num, representative
def get_table_add_product(self):
    """Rebuild the add-on product table.

    Spans of all pages are read in order. Collection starts at a span
    containing '总计' (inclusive) and stops at a span containing
    '注:出租人向承租人购买租赁车辆的对价'. The first two collected texts are
    the total label and the total amount; the remaining texts are grouped
    into complete rows of three (item, purchase price, financed amount).

    Returns:
        dict: copy of ``self.item``; 'words' is the table (header row
        first, totals row last when at least the label and the amount were
        collected), 'page' is the key of the last page holding the closing
        note (or None) and 'position' is always None.
    """
    table_add_product = self.item.copy()
    items = []
    start = False
    page = None
    for pno in self.pdf_info:
        for block in self.pdf_info[f'{pno}']['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    text = span['text']
                    if '总计' in text:
                        start = True
                    if '注:出租人向承租人购买租赁车辆的对价' in text:
                        page = pno
                        start = False
                    if start:
                        items.append(text)
    lines = [['项目', '购买价格', '实际融资金额']]
    # Rows start after the two totals cells. Counting rows from the full
    # length used to index past the end whenever the number of collected
    # texts was not of the form 3k + 2.
    for i in range(max(len(items) - 2, 0) // 3):
        first = 2 + i * 3
        lines.append(items[first:first + 3])
    if len(items) >= 2:
        lines.append([items[0], '', items[1]])
    table_add_product['words'] = lines
    table_add_product['page'] = page
    table_add_product['position'] = None
    return table_add_product
def get_contract_no_dy(self):
    """Find the mortgage contract number.

    The label is the last span (over all pages) containing '抵押合同编号'.
    The value is the last span, on any page, that contains 'CH-' and whose
    vertical centre lies strictly within the label's vertical extent.

    Returns:
        dict: copy of ``self.item`` with 'words' (whole span text), 'page'
        and 'position' set when a value was found.
    """
    contract_no = self.item.copy()
    document_spans = [
        (pno, span['bbox'], span['text'])
        for pno in self.pdf_info
        for block in self.pdf_info[pno]['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    label_box = None
    for _, box, content in document_spans:
        if '抵押合同编号' in content:
            label_box = box
    if label_box is None:
        return contract_no
    for pno, box, content in document_spans:
        on_label_row = label_box[1] < np.mean(box[1::2]) < label_box[3]
        if on_label_row and 'CH-' in content:
            contract_no['position'] = box
            contract_no['page'] = pno
            contract_no['words'] = content
    return contract_no
def get_dyr_name_id(self):
    """Extract the mortgagor's name and ID number.

    The label is the last span (over all pages) whose text is exactly
    '抵押人'. Candidate spans, on any page, must have their vertical centre
    strictly between the label's top and three label heights below the
    label's bottom.

    Returns:
        tuple(dict, dict): ``(name, id)`` copies of ``self.item``; 'words'
        is the text after the last ':' of the last candidate containing
        '姓名' / '证件号码' respectively.
    """
    name = self.item.copy()
    _id = self.item.copy()
    document_spans = [
        (pno, span['bbox'], span['text'])
        for pno in self.pdf_info
        for block in self.pdf_info[pno]['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    label_box = None
    for _, box, content in document_spans:
        if content == '抵押人':
            label_box = box
    if label_box is None:
        return name, _id
    row_height = abs(label_box[1] - label_box[3])
    band_top = label_box[1]
    band_bottom = label_box[3] + row_height * 3
    for pno, box, content in document_spans:
        if band_top < np.mean(box[1::2]) < band_bottom and '姓名' in content:
            name['position'] = box
            name['page'] = pno
            name['words'] = content.split(':')[-1]
        if band_top < np.mean(box[1::2]) < band_bottom and '证件号码' in content:
            _id['position'] = box
            _id['page'] = pno
            _id['words'] = content.split(':')[-1]
    return name, _id
def get_key_value_position(self, key):
    """Return the value printed to the right of a label, matched by layout.

    The label is the last span (over all pages) whose text equals ``key``.
    The value is the last span, on any page, that
      * has its vertical centre strictly within the label's vertical extent,
      * starts to the right of the label's left edge, and
      * starts within ten label heights of the label's right edge.

    Args:
        key (str): exact label text.

    Returns:
        dict: copy of ``self.item`` with 'words' (whole span text), 'page'
        and 'position' set when a value was found.
    """
    value = self.item.copy()
    document_spans = [
        (pno, span['bbox'], span['text'])
        for pno in self.pdf_info
        for block in self.pdf_info[pno]['blocks']
        if block['type'] == 0
        for line in block['lines']
        for span in line['spans']
    ]
    label_box = None
    for _, box, content in document_spans:
        if content == key:
            label_box = box
    if label_box is None:
        return value
    row_height = abs(label_box[1] - label_box[3])
    for pno, box, content in document_spans:
        same_row = label_box[1] < np.mean(box[1::2]) < label_box[3]
        to_the_right = label_box[0] < box[0]
        close_enough = abs(label_box[2] - box[0]) < row_height * 10
        if same_row and to_the_right and close_enough:
            value['position'] = box
            value['page'] = pno
            value['words'] = content
    return value
def get_info(self):
    """Fill and return the result of the main contract.

    Text is read only from blocks whose ``block['type'] == 0``. When
    ``self.pdf_info`` is empty the template is returned unchanged.

    Returns:
        dict: ``self.init_result`` with every field replaced by the value
        extracted for it.
    """
    if len(self.pdf_info) == 0:
        return self.init_result
    fields = self.init_result

    # Contract number printed on the first page.
    fields['合同编号'] = self.get_contract_no(page_num='0')

    # Names and ID numbers of the four parties, all from the first page.
    for role in ('承租人', '保证人1', '保证人2', '保证人3'):
        name, id_num, representative = self.get_role_info(role_key=f'{role}:', page_num='0')
        fields[f'{role}-姓名'] = name
        fields[f'{role}-证件号码'] = id_num
        fields[f'{role}-法定代表人或授权代表'] = representative

    # Contract number quoted in the body text; it may wrap across lines,
    # so it is reported without a position.
    fields['合同编号(正文)'] = self.get_contract_no_one()
    # Vehicle identification number, dealer and original sale price.
    fields['车辆识别代码'] = self.get_key_value(key='车辆识别代码:')
    fields['车辆卖方(经销商)'] = self.get_key_value(key='车辆卖方(经销商):')
    fields['车辆原始销售价格(《机动车销售统一发票》所列金额)'] = self.get_key_value(
        key='车辆原始销售价格(《机动车销售统一发票》所列金额):')
    # Add-on product table.
    fields['车辆附加产品明细表'] = self.get_table_add_product()
    # Total financing cost and lease term.
    fields['融资成本总额'] = self.get_key_value(key='融资成本总额:')
    fields['租期'] = self.get_key_value(key='租期:')
    # Payment schedule table.
    fields['付款计划表'] = self.get_repayment_schedule()
    # Bank account: holder, number and bank.
    fields['银行账户-户名'] = self.get_key_value(key='户名:')
    fields['银行账户-银行账号'] = self.get_key_value(key='银行账号:')
    fields['银行账户-开户行'] = self.get_key_value(key='开户银行:')

    # Signature page: printed name and electronic signature of each party.
    # Each signature band ends where the next party's name line begins.
    signers = (
        ('承租人', '保证人1姓名:'),
        ('保证人1', '保证人2姓名:'),
        ('保证人2', '保证人3姓名:'),
        ('保证人3', '日期:'),
    )
    for signer, band_end in signers:
        label = f'{signer}姓名:'
        name = self.get_key_value(key=label)
        electronic_signature = self.get_electronic_signature(top=label, bottom=band_end)
        fields[f'签字页-{signer}姓名'] = name
        fields[f'签字页-{signer}签章'] = electronic_signature
    return self.init_result
def get_info_1(self):
    """Fill ``self.init_result_1`` and return it.

    When ``self.pdf_info`` is empty nothing is looked up and the result
    dict is returned as it currently is. Otherwise the fields are filled
    in a fixed order: the cover-page values (looked up with
    ``page_num='0'``), the contract number from the body text, and then
    the signature-page values (looked up without a page restriction).

    Returns:
        The ``self.init_result_1`` dict.
    """
    if len(self.pdf_info) == 0:
        return self.init_result_1

    # Cover page: contract number, lessee name, lessee ID number, dealer.
    self.init_result_1['合同编号'] = self.get_contract_no(page_num='0')
    cover_fields = (
        ('承租人-姓名', '承租人:'),
        ('承租人-证件号码', '证件号码:'),
        ('销售经销商', '销售经销商:'),
    )
    for field, label in cover_fields:
        self.init_result_1[field] = self.get_key_value(key=label, page_num='0')

    # Contract number as it appears in the body text.
    self.init_result_1['合同编号(正文)'] = self.get_contract_no_one()

    # Signature page: lessee name and lessee ID number.
    signature_fields = (
        ('签字页-承租人姓名', '姓名/名称:'),
        ('签字页-承租人证件号码', '自然人身份证件号码/法人执照号码:'),
    )
    for field, label in signature_fields:
        self.init_result_1[field] = self.get_key_value(key=label)

    # Signature page: lessee seal, then the sales dealer.
    self.init_result_1['签字页-承租人签章'] = self.get_signature_role_1()
    self.init_result_1['签字页-销售经销商'] = self.get_key_value(key='销售经销商:')

    # The dealer seal is not extracted (the original left a placeholder here).
    return self.init_result_1
def get_info_2(self):
    """Fill ``self.init_result_2`` and return it.

    When ``self.pdf_info` is empty nothing is looked up and the result
    dict is returned as it currently is. Otherwise the contract numbers,
    the mortgagor name/ID, the vehicle identification code, the two
    position-based values and the signature-page name/seal pairs are
    collected in that order.

    Returns:
        The ``self.init_result_2`` dict.
    """
    if len(self.pdf_info) == 0:
        return self.init_result_2

    # Contract number, then the contract number from the body text.
    self.init_result_2['合同编号'] = self.get_contract_no_dy()
    self.init_result_2['合同编号(正文)'] = self.get_contract_no_one()

    # Mortgagor name and ID number come back together from one helper.
    mortgagor_name, mortgagor_id = self.get_dyr_name_id()
    self.init_result_2['抵押人姓名/名称'] = mortgagor_name
    self.init_result_2['抵押人证件号码'] = mortgagor_id

    # Vehicle identification code.
    self.init_result_2['车辆识别代码'] = self.get_key_value(key='车辆识别代码:')

    # Total rent and lease term: the result key doubles as the lookup key.
    for field in ('租金总额', '融资租赁期限'):
        self.init_result_2[field] = self.get_key_value_position(key=field)

    # Signature page: (name field, name label, seal field, top, bottom)
    # for the mortgagor and then the mortgagor's spouse.
    signers = (
        ('签字页-抵押人姓名', '抵押人姓名:',
         '签字页-抵押人签章', '抵押权人盖章', '抵押人配偶姓名:'),
        ('签字页-抵押人配偶姓名', '抵押人配偶姓名:',
         '签字页-抵押人配偶签章', '抵押人配偶姓名:', '日期'),
    )
    for name_field, name_label, seal_field, top, bottom in signers:
        signer_name = self.get_key_value(key=name_label)
        seal = self.get_electronic_signature(top=top, bottom=bottom)
        self.init_result_2[name_field] = signer_name
        self.init_result_2[seal_field] = seal

    return self.init_result_2
# -*- coding: utf-8 -*-
# @Author : lk
# @Email : 9428.al@gmail.com
# @Created Date : 2021-06-29 17:43:46
# @Last Modified : 2021-11-03 16:07:36
# @Description :
from .get_char import Finder
# Supported values of ``file_cls`` (see ``predict``).
_SUPPORTED_FILE_CLS = (0, 1, 2)

# Added to every result page index when the vehicle disposal agreement was
# sliced out of a combined upload.
# NOTE(review): presumably the number of pages that precede the agreement in
# the combined document — confirm against real contracts.
_CLCZXY_PAGE_OFFSET = 6


def _pages_with_marker(pdf_info, marker):
    """Return the pages of ``pdf_info`` whose text spans contain ``marker``.

    Only blocks with ``type == 0`` are scanned. A page is appended once per
    matching span, so a page with several matching spans appears several
    times; this is the counting behaviour the caller has always relied on.
    """
    pages = []
    for page in pdf_info.values():
        for block in page['blocks']:
            if block['type'] != 0:
                continue
            for line in block['lines']:
                for span in line['spans']:
                    if marker in span['text']:
                        pages.append(page)
    return pages


def predict(pdf_info, file_cls):
    """Extract the fields of one HIL e-contract and label their pages.

    Args:
        pdf_info (dict): per-page text info keyed by the page index as a
            string; each page provides ``blocks`` -> ``lines`` -> ``spans``
            with a ``text`` entry.
        file_cls (int): 0 = sale-and-leaseback contract, 1 = vehicle
            disposal agreement, 2 = vehicle lease mortgage contract.

    Returns:
        dict: the ``Finder`` result for the requested contract type. Every
        entry whose ``'page'`` is not ``None`` has it rewritten to the
        1-based label ``'page_N'``.

    Raises:
        ValueError: if ``file_cls`` is not 0, 1 or 2. Previously this case
            surfaced later as an ``UnboundLocalError`` on ``results``.
    """
    if file_cls not in _SUPPORTED_FILE_CLS:
        raise ValueError(
            f'unsupported file_cls {file_cls!r}; expected one of {_SUPPORTED_FILE_CLS}'
        )

    # The marker scan only influences the disposal-agreement case, so it is
    # skipped for the other contract types.
    is_clczxy = False
    if file_cls == 1:
        leaseback_pages = _pages_with_marker(pdf_info, '售后回租合同_')
        disposal_pages = _pages_with_marker(pdf_info, '售后回租合同附件一')
        # Four disposal-agreement hits together with at least one
        # sale-and-leaseback hit mean the input is the combined document:
        # keep only the agreement pages, renumbered from '0'.
        if len(disposal_pages) == 4 and leaseback_pages:
            is_clczxy = True
            pdf_info = {str(pno): page for pno, page in enumerate(disposal_pages)}

    finder = Finder(pdf_info)
    if file_cls == 0:
        results = finder.get_info()
    elif file_cls == 1:
        # Vehicle disposal agreement.
        results = finder.get_info_1()
    else:
        # Vehicle lease mortgage contract.
        results = finder.get_info_2()

    # Shift back to the position in the combined document when the agreement
    # was sliced out, then convert the 0-based index to a 1-based label.
    offset = _CLCZXY_PAGE_OFFSET if is_clczxy else 0
    for item in results.values():
        if item['page'] is not None:
            item['page'] = 'page_' + str(int(item['page']) + offset + 1)
    return results
import pyodbc
# One-off script: creates the ``afc_contract`` and ``hil_contract`` tables
# (each with an index on ``application_id``) when the module is executed.

afc_sql = """
create table afc_contract
(
id bigint identity primary key,
application_id nvarchar(64) not null,
create_time datetime not null
);
create index afc_contract_application_id_index
on afc_contract (application_id);
"""
hil_sql = """
create table hil_contract
(
id bigint identity primary key,
application_id nvarchar(64) not null,
create_time datetime not null
);
create index hil_contract_application_id_index
on hil_contract (application_id);
"""


def _run_ddl(connection_string, sql):
    """Execute ``sql`` on a fresh autocommit connection.

    The cursor and the connection are closed even when ``execute`` raises,
    so a failed statement no longer leaks them.
    """
    cnxn = pyodbc.connect(connection_string, autocommit=True)
    try:
        cursor = cnxn.cursor()
        try:
            cursor.execute(sql)
        finally:
            cursor.close()
    finally:
        cnxn.close()


# NOTE(review): both connection strings are identical here and name only the
# driver — confirm each one targets the intended HIL / AFC database.
# Order is unchanged: the HIL table first, then the AFC table.
_run_ddl('DRIVER={ODBC Driver 17 for SQL Server};', hil_sql)
_run_ddl('DRIVER={ODBC Driver 17 for SQL Server};', afc_sql)
import os
import json
import cv2
import shutil
import fitz
......@@ -35,6 +36,8 @@ class PDFHandler:
self.suffix = self.get_suffix(document_name)
self.is_ebank = False
self.page_text_list = []
self.pdf_info = {}
self.img_path_pno_list = []
def get_suffix(self, file_name):
if file_name is None:
......@@ -296,6 +299,17 @@ class PDFHandler:
self.is_ebank = True
self.page_text_list = page_text_list
def e_contract_process(self):
    """Collect per-page text JSON and render every page of the PDF to PNG.

    For each page of the document at ``self.path``:
      * the parsed output of ``page.getText('json')`` is stored in
        ``self.pdf_info`` under the 0-based page index as a string;
      * ``(image path, 'page_N')`` with 1-based ``N`` is appended to
        ``self.img_path_pno_list``;
      * the page pixmap is written to that image path.
    """
    with fitz.Document(self.path) as document:
        for index in range(document.pageCount):
            current_page = document.loadPage(index)

            # Text layer of the page, parsed from its JSON form.
            page_json = current_page.getText('json')
            self.pdf_info[str(index)] = json.loads(page_json)

            # Rendered image: the path is recorded before the file is written.
            pixmap = current_page.getPixmap()
            png_path = self.get_img_save_path(current_page.number)
            page_label = 'page_{0}'.format(str(index + 1))
            self.img_path_pno_list.append((png_path, page_label))
            pixmap.writePNG(png_path)
def extract_image(self, max_img_count=None):
self.img_path_list = []
self.xref_set = set()
......
......@@ -14,3 +14,5 @@ DEALER_CODE = ocr_group
BASE_URL = https://li19dkocruat02vm.bmwgroup.net
DELAY_SECONDS = 60
......
......@@ -13,3 +13,5 @@ EDMS_UPLOAD_URL = https://edms-test.bmw.com/FH/FileHold/DocumentRepository/Uploa
DEALER_CODE = ocr_situ_group
BASE_URL = https://staging-bmw-ocr.situdata.com
DELAY_SECONDS = 60
\ No newline at end of file
......
......@@ -13,3 +13,5 @@ EDMS_UPLOAD_URL = http://sccn0637.bmwgroup.net/FH/FileHold/DocumentRepository/Up
DEALER_CODE = ocr_situ_group
BASE_URL = https://li19dkocruat01vm.bmwgroup.net
DELAY_SECONDS = 60
\ No newline at end of file
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!