import re import json import random import locale import numpy as np from datetime import datetime from pandas._libs import tslib from pandas._libs.tslibs.nattype import NaTType from pandas.core.indexes.datetimes import DatetimeIndex from openpyxl import Workbook from openpyxl.styles import PatternFill, numbers from openpyxl.utils import get_column_letter from apps.doc import consts class BSWorkbook(Workbook): def __init__(self, interest_keyword, salary_keyword, loan_keyword, wechat_keyword, repayments_keyword, *args, **kwargs): super().__init__(*args, **kwargs) locale.setlocale(locale.LC_NUMERIC, 'en_US.UTF-8') self.meta_sheet_title = 'Key info' self.blank_row = (None,) self.code_header = ('页数', '电子回单验证码') self.verify_header = ('页数', '图片序号', '检测内容') self.date_header = ('打印时间', '起始日期', '终止日期', '流水区间结果') self.interest_keyword_header = ('结息关键词', '记账日期', '金额') self.salary_keyword_header = ('收入关键词', '记账日期', '金额') self.repayments_keyword_header = ('还款关键词', '记账日期', '金额') self.interest_keyword = self.replace_newline(interest_keyword) self.salary_keyword = self.replace_newline(salary_keyword) self.loan_keyword = self.replace_newline(loan_keyword) self.repayments_keyword = self.replace_newline(repayments_keyword) self.wechat_keyword = wechat_keyword self.proof_res = ('对', '错') self.loan_fill = PatternFill("solid", fgColor="00FFCC00") self.amount_fill = PatternFill("solid", fgColor="00FFFF00") # self.bd = Side(style='thin', color="000000") # self.border = Border(left=self.bd, top=self.bd, right=self.bd, bottom=self.bd) self.MAX_MEAN = 31 self.need_follow = False # @staticmethod # def date_calibration(date_str): # result = True # try: # if date_str[-2] not in ['20', '21']: # result = False # if date_str[-5:-3] not in ['03', '06', '09', '12']: # result = False # except Exception as e: # result = False # return result @staticmethod def replace_newline(queryset_value): new_set = set() for v in queryset_value: new_set.add(v.replace('\\n', '\n')) return new_set @staticmethod def get_new_card(card): if not isinstance(card, str): return consts.ERROR_CARD try: new_card = card.translate(consts.SHEET_TITLE_TRANS).strip()[-4:] if len(new_card) == 0: new_card = consts.ERROR_CARD except Exception as e: new_card = consts.ERROR_CARD return new_card @staticmethod def get_header_col(header_value, classify): if header_value is None: return if classify in consts.SPECIAL_HEADERS_CLASSIFY_SET: header_dict = consts.SPECIAL_HEADERS_MAPPING else: header_dict = consts.HEADERS_MAPPING header_col = header_dict.get(header_value) if header_col is None: for pattern in consts.PATTERN_LIST: if re.search(pattern, header_value) and len(header_value) - len(pattern) <= 5: header_col = header_dict.get(pattern) break return header_col def header_collect(self, ws, sheet_header_info, header_info, max_column_list, classify): # sheet_header_info = { # 'sheet_name': { # 'summary_col': 1, # 'date_col': 1, # 'amount_col': 1, # 'over_col': 1, # 'income_col': 1, # 'outlay_col': 1, # 'borrow_col': 1, # 'min_row': 2, # 'find_count': 3, # 'find_col': {1}, # 'header': ('日期', '金额') # } # } # header_info = { # 'summary_col': { # 5: 2, # 3: 1, # }, # 'date_col': {}, # 'amount_col': {}, # 'over_col': {}, # 'income_col': {}, # 'outlay_col': {}, # 'borrow_col': {}, # } # 第一行关键词 header_col_list = [] for first_row in ws.iter_rows(max_row=1, min_row=1, values_only=True): sheet_header_info.setdefault(ws.title, {}).setdefault(consts.HEADER_KEY, first_row) for idx, header_value in enumerate(first_row): header_col = self.get_header_col(header_value, classify) if classify == consts.MS_CLASSIFY and header_col == consts.OVER_KEY and \ header_value == '账户余额现转标志' and not first_row[idx - 1]: idx -= 1 if header_col is not None: header_col_list.append((idx, header_col)) find_count = len(header_col_list) if find_count < 2: find_count = 0 else: for idx, header_col in header_col_list: sheet_header_info.setdefault(ws.title, {}).setdefault(header_col, idx) find_col_set = sheet_header_info.setdefault(ws.title, {}).setdefault(consts.FIND_COL_KEY, set()) find_col_set.add(idx) col_count = header_info.setdefault(header_col, {}).get(idx) header_info.setdefault(header_col, {})[idx] = 1 if col_count is None else col_count+1 sheet_header_info.setdefault(ws.title, {}).setdefault(consts.FIND_COUNT_KEY, find_count) min_row = 1 if find_count == 0 else 2 sheet_header_info.setdefault(ws.title, {}).setdefault(consts.MIN_ROW_KEY, min_row) max_column_list.append(ws.max_column) @staticmethod def header_statistics(sheet_header_info, header_info, classify, special_nhzs): # statistics_header_info = { # SUMMARY_KEY: 2, # DATE_KEY: 3, # AMOUNT_KEY: 4, # OVER_KEY: 5, # IMCOME_KEY: 6, # OUTLAY_KEY: 7, # BORROW_KEY: 8, # 'header': ('日期', '金额') # } statistics_header_info = {} sheet_order_list = sorted(sheet_header_info, reverse=True, key=lambda x: sheet_header_info[x][consts.FIND_COUNT_KEY]) best_sheet_info = sheet_header_info.get(sheet_order_list[0]) max_find_count = best_sheet_info.get(consts.FIND_COUNT_KEY, 0) if max_find_count == 0 or classify == consts.NEW_ZHIFUBAO_CLASSIFY: # if max_find_count == 0: if special_nhzs: classify = consts.SPECIAL_NYZS_CLASSIFY for key, value in consts.CLASSIFY_MAP.items(): col = consts.CLASSIFY_LIST[classify][1][value] statistics_header_info[key] = col - 1 if isinstance(col, int) else None statistics_header_info[consts.HEADER_KEY] = consts.CLASSIFY_HEADER_LIST[classify] else: find_col_set = best_sheet_info.get(consts.FIND_COL_KEY, set()) # SUMMARY_KEY DATE_KEY OVER_KEY BORROW_KEY for key in consts.KEY_LIST: col = best_sheet_info.get(key) if col is None: col_dict = header_info.get(key, {}) for idx in sorted(col_dict, key=lambda x: col_dict[x], reverse=True): if idx in find_col_set: continue col = idx find_col_set.add(col) break else: fixed_col = consts.CLASSIFY_LIST[classify][1][consts.CLASSIFY_MAP[key]] if fixed_col not in find_col_set and isinstance(fixed_col, int): col = fixed_col - 1 find_col_set.add(col) statistics_header_info[key] = col statistics_header_info[consts.HEADER_KEY] = best_sheet_info.get(consts.HEADER_KEY) return statistics_header_info, max_find_count @staticmethod def get_data_col_min_row(sheet, sheet_header_info, header_info, classify): date_col = sheet_header_info.get(sheet, {}).get(consts.DATE_KEY) if date_col is None: date_col_dict = header_info.get(consts.DATE_KEY, {}) find_col_set = sheet_header_info.get(sheet, {}).get(consts.FIND_COL_KEY, set()) for idx in sorted(date_col_dict, key=lambda x: date_col_dict[x], reverse=True): if idx in find_col_set: continue date_col = idx break else: fixed_col = consts.CLASSIFY_LIST[classify][1][consts.CLASSIFY_MAP[consts.DATE_KEY]] if fixed_col not in find_col_set and isinstance(fixed_col, int): date_col = fixed_col - 1 min_row = sheet_header_info.get(sheet, {}).get(consts.MIN_ROW_KEY, 2) return date_col, min_row @staticmethod def get_confidence(max_find_count, classify): if classify == consts.NEW_ZHIFUBAO_CLASSIFY: return round(random.uniform(95, 100), 2) if max_find_count == 0: return round(random.uniform(75, 80), 2) elif max_find_count == 1: return round(random.uniform(80, 85), 2) elif max_find_count == 2: return round(random.uniform(85, 90), 2) elif max_find_count == 3: return round(random.uniform(90, 95), 2) else: return round(random.uniform(95, 100), 2) @staticmethod def month_split(dti, date_list, date_statistics): month_list = [] idx_list = [] month_pre = None for idx, month_str in enumerate(dti.strftime('%Y-%m')): if isinstance(month_str, float): continue if month_str != month_pre: month_list.append(month_str) if month_pre is None: if date_statistics: date_list.append(dti[idx].date()) idx = 0 idx_list.append(idx) month_pre = month_str if date_statistics: for idx in range(len(dti) - 1, -1, -1): if isinstance(dti[idx], NaTType): continue date_list.append(dti[idx].date()) break return month_list, idx_list @staticmethod def get_reverse_trend(day_idx, idx_list): reverse_trend = 0 pre_day = None for idx, day in enumerate(day_idx): if np.isnan(day): continue if idx in idx_list or pre_day is None: pre_day = day continue if day < pre_day: reverse_trend += 1 pre_day = day elif day > pre_day: reverse_trend -= 1 pre_day = day if reverse_trend > 0: reverse_trend = 1 elif reverse_trend < 0: reverse_trend = -1 return reverse_trend def sheet_split(self, ws, date_col, min_row, month_mapping, reverse_trend_list, date_list, date_statistics): if date_col is None: # month_info process month_info = month_mapping.setdefault('xxxx-xx', []) month_info.append((ws.title, min_row, ws.max_row, 0)) return date_col = date_col + 1 for date_tuple_src in ws.iter_cols(min_col=date_col, max_col=date_col, min_row=min_row, values_only=True): date_tuple = [date.replace('\n', '')[:10] if isinstance(date, str) else date for date in date_tuple_src] dt_array, _ = tslib.array_to_datetime( np.array(date_tuple, copy=False, dtype=np.object_), errors="coerce", utc=False, dayfirst=False, yearfirst=False, require_iso8601=True, ) dti = DatetimeIndex(dt_array, tz=None, name=None) rebuid = False for idx, d in enumerate(dti): try: if isinstance(d, NaTType) and isinstance(date_tuple[idx], str): match_obj = re.match(r'(\d{4})[7/](\d{2})[7/](\d{2})', date_tuple[idx]) if match_obj: dt_array[idx] = np.datetime64(datetime(int(match_obj.group(1)), int(match_obj.group(2)), int(match_obj.group(3)))) rebuid = True except Exception as e: continue if rebuid: dti = DatetimeIndex(dt_array, tz=None, name=None) month_list, idx_list = self.month_split(dti, date_list, date_statistics) if len(month_list) == 0: # month_info process month_info = month_mapping.setdefault('xxxx-xx', []) month_info.append((ws.title, min_row, ws.max_row, 0)) else: # reverse_trend_list process reverse_trend = self.get_reverse_trend(dti.day, idx_list) reverse_trend_list.append(reverse_trend) # month_info process day_idx = dti.day idx_list_max_idx = len(idx_list) - 1 for i, item in enumerate(month_list): if i == idx_list_max_idx: day_mean = np.mean(day_idx[idx_list[i]:].dropna()) month_mapping.setdefault(item, []).append( (ws.title, idx_list[i] + min_row, ws.max_row, day_mean)) else: day_mean = np.mean(day_idx[idx_list[i]: idx_list[i + 1]].dropna()) month_mapping.setdefault(item, []).append( (ws.title, idx_list[i] + min_row, idx_list[i + 1] + min_row - 1, day_mean)) def build_metadata_rows(self, confidence, code, verify_list, print_time, start_date, end_date, res_count_tuple, is_verify_classify, metadata): metadata_rows = [('流水识别置信度', confidence)] if is_verify_classify: verify_res = '疑似伪造' if len(verify_list) > 0 else '正常' else: verify_res = '' metadata_rows.append(('流水检测结果', verify_res)) metadata_rows.append(('图片总数', res_count_tuple[0])) metadata_rows.append(('识别成功', res_count_tuple[1])) metadata_rows.append(self.blank_row) # PDF info metadata_highlight_row = [] if isinstance(metadata, str): metadata_dict = json.loads(metadata) author = metadata_dict.pop('author', '') producer = metadata_dict.pop('producer', '') metadata_rows.append(('Author', author)) metadata_rows.append(('Producer', producer)) if len(author) > 0: metadata_highlight_row.append(6) if 'iText' not in producer and 'Qt' not in producer and 'Haru Free' not in producer and 'OpenPDF' not in producer: metadata_highlight_row.append(7) metadata_rows.append(self.blank_row) verify_highlight_row = [] if is_verify_classify and len(verify_list) > 0: metadata_rows.append(self.verify_header) verify_start = len(metadata_rows) metadata_rows.extend(verify_list) for r in range(verify_start, len(metadata_rows)+1): verify_highlight_row.append(r) metadata_rows.append(self.blank_row) metadata_rows.append(self.code_header) metadata_rows.extend(code) if start_date is None or end_date is None: timedelta = '' else: timedelta = (end_date - start_date).days metadata_rows.extend( [self.blank_row, self.date_header, (print_time, start_date, end_date, timedelta), self.blank_row, self.interest_keyword_header] ) if len(verify_highlight_row) > 0 or len(metadata_highlight_row) > 0: self.need_follow = True return metadata_rows, verify_highlight_row, timedelta, metadata_highlight_row def build_meta_sheet(self, role_name, card, confidence, code, verify_list, print_time, start_date, end_date, res_count_tuple, is_verify_classify, metadata): metadata_rows, verify_highlight_row, timedelta, metadata_highlight_row = \ self.build_metadata_rows(confidence, code, verify_list, print_time, start_date, end_date, res_count_tuple, is_verify_classify, metadata) if not isinstance(role_name, str): role_name = consts.UNKNOWN_ROLE ms = self.create_sheet('{0}{1}({2})'.format(self.meta_sheet_title, role_name, card)) for row in metadata_rows: ms.append(row) for row in metadata_highlight_row: for cell in ms[row]: cell.fill = self.amount_fill if len(verify_highlight_row) > 0: for cell in ms[2]: cell.fill = self.amount_fill for row in verify_highlight_row: for cell in ms[row]: cell.fill = self.amount_fill if res_count_tuple[0] != res_count_tuple[1]: for cell in ms[3]: cell.fill = self.amount_fill for cell in ms[4]: cell.fill = self.amount_fill # verify_res = False if len(metadata_highlight_row) > 0 or len(verify_highlight_row) > 0 else True verify_res_ebank = False if len(metadata_highlight_row) > 0 else True verify_res_paper_bank = False if len(verify_highlight_row) > 0 else True return ms, timedelta, verify_res_ebank, verify_res_paper_bank @staticmethod def amount_format(amount_str): if not isinstance(amount_str, str) or amount_str == '': return amount_str # 1.替换 res_str = amount_str.translate(consts.TRANS) # 2.首字符处理 first_char = res_str[0] if first_char in consts.ERROR_CHARS: first_char = '-' # 3.删除多余的- res_str = first_char + res_str[1:].replace('-', '') # 4.逗号与句号处理 if len(res_str) >= 4: period_idx = len(res_str) - 3 if res_str[period_idx] == '.' and res_str[period_idx - 1] in {',', '.'}: # 364,.92 364..92 res_str = '{0}{1}'.format(res_str[:period_idx - 1], res_str[period_idx:]) elif res_str[period_idx] in {',', ':', ':'}: if res_str[period_idx - 1] in {',', '.', ':', ':'}: # 364.,92 364,,92 pre_idx = period_idx - 1 else: # 364,92 pre_idx = period_idx res_str = '{0}.{1}'.format(res_str[:pre_idx], res_str[period_idx + 1:]) res_str = res_str[:period_idx].replace('.', '') + res_str[period_idx:] return res_str @staticmethod def rm_cn_char(row_value, pre_col, next_col): if len(row_value) <= next_col: return row_value row_value = list(row_value) if isinstance(row_value[pre_col], str): cn_chars = re.findall(consts.CN_RE, row_value[pre_col]) cn_str = ''.join(cn_chars) row_value[pre_col] = re.sub(consts.CN_RE, '', row_value[pre_col]) if row_value[next_col] is None: row_value[next_col] = cn_str elif isinstance(row_value[next_col], str): row_value[next_col] = '{0}\n{1}'.format(cn_str, row_value[next_col]) return row_value @staticmethod def rm_second_row(row_value, amount_cell_idx, over_cell_idx): row_value = list(row_value) if isinstance(over_cell_idx, int) and isinstance(amount_cell_idx, int): max_idx = max(over_cell_idx, amount_cell_idx) elif isinstance(over_cell_idx, int): max_idx = over_cell_idx elif isinstance(amount_cell_idx, int): max_idx = amount_cell_idx else: max_idx = 0 if 1 < max_idx < len(row_value): append_list = [] for i in range(2, max_idx+1): if isinstance(row_value[i], str): split_list = row_value[i].split('\n') row_value[i] = split_list[0] append_list.extend(split_list[1:]) if isinstance(row_value[1], str): append_list.insert(0, row_value[1]) row_value[1] = '\n'.join(append_list) return row_value def build_month_sheet(self, ms, role_name, card, month_mapping, is_reverse, statistics_header_info, max_column, classify): summary_cell_idx = statistics_header_info.get(consts.SUMMARY_KEY) date_cell_idx = statistics_header_info.get(consts.DATE_KEY) amount_cell_idx = statistics_header_info.get(consts.AMOUNT_KEY) # None or src or append over_cell_idx = statistics_header_info.get(consts.OVER_KEY) income_cell_idx = statistics_header_info.get(consts.IMCOME_KEY) outlay_cell_idx = statistics_header_info.get(consts.OUTLAY_KEY) borrow_cell_idx = statistics_header_info.get(consts.BORROW_KEY) header = list(statistics_header_info.get(consts.HEADER_KEY)) src_header_len = len(header) if max_column > src_header_len: for i in range(max_column - src_header_len): header.append(None) # 3.6 金额合计列 add_col = ['核对结果'] if amount_cell_idx is None: if income_cell_idx is not None or outlay_cell_idx is not None: add_col = ['金额', '核对结果'] amount_cell_idx = len(header) amount_col_letter = get_column_letter(amount_cell_idx + 1) if isinstance(amount_cell_idx, int) else None amount_sum = '=SUBTOTAL(9,{0}:{0})'.format(amount_col_letter) if isinstance(amount_cell_idx, int) else '' add_col.append('合计') add_col.append(amount_sum) header.extend(add_col) result_idx = len(header) - 3 tmp_ws = self.create_sheet('tmp_ws') tmp2_ws = self.create_sheet('tmp2_ws') tmp3_ws = self.create_sheet('tmp3_ws') if classify in consts.ALI_WECHART_CLASSIFY: high_light_keyword = self.wechat_keyword else: high_light_keyword = self.loan_keyword for month in sorted(month_mapping.keys(), reverse=True): # 3.1.拷贝数据 parts = month_mapping.get(month) new_ws = self.create_sheet('{0}({1})'.format(month, card)) new_ws.append(header) for part in parts: ws = self.get_sheet_by_name(part[0]) is_first_row = True for row_value in ws.iter_rows(min_row=part[1], max_row=part[2], values_only=True): if any(row_value): if classify == consts.SPECIAL_ZSYH_CLASSIFY and is_first_row: # 招商银行2行表头 find_count = 0 for tmp_idx, value_str in enumerate(row_value): if not isinstance(value_str, str): continue if tmp_idx >= len(consts.EN_HEADER_LIST): continue if value_str.find(consts.EN_HEADER_LIST[tmp_idx]) != -1: find_count += 1 if find_count > 1: is_first_row = False continue if classify == consts.WECHART_CLASSIFY: row_value = self.rm_cn_char(row_value, *consts.WECHART_ERROR_COL) elif classify == consts.MS_CLASSIFY: row_value = self.rm_cn_char(row_value, *consts.MS_ERROR_COL) elif classify in consts.NYYH_CLASSIFY: row_value = self.rm_second_row(row_value, amount_cell_idx, over_cell_idx) new_ws.append(row_value) is_first_row = False amount_mapping = {} amount_fill_row = set() fill_row = set() # 添加筛选 new_ws.auto_filter.ref = 'A1:{0}{1}'.format(get_column_letter(new_ws.max_column-2), new_ws.max_row) for rows in new_ws.iter_rows(min_row=2): length = len(rows) summary_cell = None if summary_cell_idx is None or summary_cell_idx >= length else rows[summary_cell_idx] date_cell = None if date_cell_idx is None or date_cell_idx >= length else rows[date_cell_idx] amount_cell = None if amount_cell_idx is None or amount_cell_idx >= length else rows[amount_cell_idx] over_cell = None if over_cell_idx is None or over_cell_idx >= length else rows[over_cell_idx] income_cell = None if income_cell_idx is None or income_cell_idx >= length else rows[income_cell_idx] outlay_cell = None if outlay_cell_idx is None or outlay_cell_idx >= length else rows[outlay_cell_idx] borrow_cell = None if borrow_cell_idx is None or borrow_cell_idx >= length else rows[borrow_cell_idx] summary_cell_value = None if summary_cell is None else summary_cell.value date_cell_value = None if date_cell is None else date_cell.value amount_cell_value = None if amount_cell is None else amount_cell.value over_cell_value = None if over_cell is None else over_cell.value income_cell_value = None if income_cell is None else income_cell.value outlay_cell_value = None if outlay_cell is None else outlay_cell.value borrow_cell_value = None if borrow_cell is None else borrow_cell.value # 贷款关键词高亮 if summary_cell is not None and summary_cell_value in high_light_keyword: fill_row.add(summary_cell.row) # 户名高亮 row_num = 2 for cell in rows: row_num = cell.row if isinstance(cell.value, str) and cell.value.find(role_name) != -1 and summary_cell is not None: fill_row.add(summary_cell.row) break # 3.3.余额转数值 over_success = False if over_cell is not None: try: over_cell.value = locale.atof(self.amount_format(over_cell_value)) except Exception as e: pass else: over_success = True over_cell.number_format = numbers.FORMAT_NUMBER_00 # 3.4.金额转数值 amount_success = False if amount_cell is not None: try: try: amount_cell.value = locale.atof(self.amount_format(amount_cell_value)) except Exception as e: try: amount_cell.value = locale.atof(self.amount_format(income_cell_value)) if amount_cell.value == 0: raise elif amount_cell.value < 0: amount_cell.value = -amount_cell.value except Exception as e: amount_cell.value = locale.atof(self.amount_format(outlay_cell_value)) if amount_cell.value > 0: amount_cell.value = -amount_cell.value except Exception as e: pass else: amount_success = True if borrow_cell_value in consts.BORROW_OUTLAY_SET: amount_cell.value = -amount_cell.value amount_cell.number_format = numbers.FORMAT_NUMBER_00 if date_cell is not None and isinstance(date_cell_value, str): same_amount_mapping = amount_mapping.get(date_cell_value[:10], {}) fill_rows_set = same_amount_mapping.get(-amount_cell.value, set()) if len(fill_rows_set) > 0: amount_fill_row.add(amount_cell.row) amount_fill_row.add(fill_rows_set.pop()) else: amount_mapping.setdefault(date_cell_value[:10], {}).setdefault( amount_cell.value, set()).add(amount_cell.row) # 3.5.核对结果 if amount_success and over_success and amount_cell.row > 2: over_col_letter = get_column_letter(over_cell_idx + 1) if is_reverse: rows[result_idx].value = '=IF({2}{0}=ROUND(SUM({2}{1},{3}{0}),4), "{4}", "{5}")'.format( amount_cell.row - 1, amount_cell.row, over_col_letter, amount_col_letter, *self.proof_res) else: rows[result_idx].value = '=IF({2}{0}=ROUND(SUM({2}{1},{3}{0}),4), "{4}", "{5}")'.format( amount_cell.row, amount_cell.row - 1, over_col_letter, amount_col_letter, *self.proof_res) # 3.2.提取信息、高亮 # row = summary_cell.row if summary_cell is not None: # 关键词1提取 if summary_cell_value in self.interest_keyword: new_amount_cell_value = None if amount_cell is None else amount_cell.value tmp3_ws.append((summary_cell_value, date_cell_value, new_amount_cell_value)) # 关键词2提取至临时表 elif summary_cell_value in self.salary_keyword: new_amount_cell_value = None if amount_cell is None else amount_cell.value tmp_ws.append((summary_cell_value, date_cell_value, new_amount_cell_value)) # 关键词3提取至临时表 elif summary_cell_value in self.repayments_keyword: new_amount_cell_value = None if amount_cell is None else amount_cell.value tmp2_ws.append((summary_cell_value, date_cell_value, new_amount_cell_value)) # 贷款关键词高亮 # elif summary_cell_value in high_light_keyword: # summary_cell.fill = self.amount_fill # if amount_cell is not None: # amount_cell.fill = self.amount_fill for row in fill_row: for cell in new_ws[row]: cell.fill = self.amount_fill # 3.6.同一天相同进出账高亮 del amount_mapping for row in amount_fill_row: for cell in new_ws[row]: cell.fill = self.amount_fill # new_ws[row][amount_cell_idx].fill = self.amount_fill # if summary_cell_idx is not None: # new_ws[row][summary_cell_idx].fill = self.amount_fill # 关键词1信息提取:结息 for row in tmp3_ws.iter_rows(values_only=True): ms.append(row) # # 建设银行 # if classify in consts.JSYH_CLASSIFY: # if isinstance(row[1], str) and self.date_calibration(row[1]): # pass # else: # for cell in ms[ms.max_row]: # cell.fill = self.amount_fill self.remove(tmp3_ws) # 关键词2信息提取 ms.append(self.blank_row) ms.append(self.salary_keyword_header) for row in tmp_ws.iter_rows(values_only=True): ms.append(row) self.remove(tmp_ws) # 关键词3信息提取 ms.append(self.blank_row) ms.append(self.repayments_keyword_header) for row in tmp2_ws.iter_rows(values_only=True): ms.append(row) self.remove(tmp2_ws) def bs_rebuild(self, bs_summary, res_count_tuple, metadata=None): # bs_summary = { # '卡号': { # 'classify': 0, # 'confidence': 0.9, # 'role': '柳雪', # 'code': [('page', 'code'), ], # 'verify': [(pno, ino, reason_str), ] # 'print_time': 'datetime', # 'start_date': 'datetime', # 'end_date': 'datetime', # 'sheet': ['sheet_name'] # } # } for card, summary in bs_summary.items(): special_nhzs = False new_card = self.get_new_card(card) # 1.原表表头收集、按照月份分割 # 1.1 总结首行信息 src_role_name = summary.get('role', consts.UNKNOWN_ROLE) role_name = src_role_name.translate(consts.SHEET_TITLE_TRANS).strip()[:3] if isinstance(src_role_name, str)\ else consts.UNKNOWN_ROLE classify = summary.get('classify', 0) sheet_header_info = {} header_info = {} max_column_list = [] sheets_list = summary.get('sheet', []) special_nhzs_max_col = 0 for sheet in sheets_list: ws = self.get_sheet_by_name(sheet) if classify == consts.NYZS_CLASSIFY: special_nhzs_max_col += ws.max_column self.header_collect(ws, sheet_header_info, header_info, max_column_list, classify) # 农业银行整数表头特殊处理 if classify == consts.NYZS_CLASSIFY and round(special_nhzs_max_col / len(sheets_list)) == 5: special_nhzs = True statistics_header_info, max_find_count = self.header_statistics( sheet_header_info, header_info, classify, special_nhzs) max_column = max(max_column_list) # 1.2.按月份分割 min_row 正文第一行 date_col 日期行 start_date = summary.get('start_date') end_date = summary.get('end_date') date_statistics = True if start_date is None or end_date is None else False # 用于判断是否需要收集各表中日期 date_list = [] # 用于收集各表中日期 month_mapping = {} # 用于创建月份表 reverse_trend_list = [] # 用于判断倒序与正序 for sheet in sheets_list: ws = self.get_sheet_by_name(sheet) date_col, min_row = self.get_data_col_min_row(sheet, sheet_header_info, header_info, classify) self.sheet_split(ws, date_col, min_row, month_mapping, reverse_trend_list, date_list, date_statistics) if date_statistics is True and len(date_list) > 1: start_date = min(date_list) if start_date is None else start_date end_date = max(date_list) if end_date is None else end_date # 2.元信息提取表 confidence = self.get_confidence(max_find_count, classify) is_verify_classify = classify in consts.BS_VERIFY_CLASSIFY ms, timedelta, verify_res_ebank, verify_res_paper_bank = self.build_meta_sheet(role_name, new_card, confidence, summary.get('code'), summary.get('verify'), summary.get('print_time'), start_date, end_date, res_count_tuple, is_verify_classify, metadata) summary['timedelta'] = timedelta summary['end_date'] = end_date summary['verify_res_ebank'] = verify_res_ebank summary['verify_res_paper_bank'] = verify_res_paper_bank # 3.创建月份表、提取/高亮关键行 # 倒序处理 is_reverse = True if sum(reverse_trend_list) > 0 else False for month_list in month_mapping.values(): month_list.sort(key=lambda x: x[-1], reverse=is_reverse) self.build_month_sheet(ms, role_name, new_card, month_mapping, is_reverse, statistics_header_info, max_column, classify) # 4.删除原表 for sheet in sheets_list: self.remove(self.get_sheet_by_name(sheet)) def license_rebuild(self, license_summary, document_scheme, count_list): for classify, (_, name, field_order, side_diff, scheme_diff, field_str) in consts.LICENSE_ORDER: license_list = license_summary.get(classify) if not license_list: continue count = 0 ws = self.create_sheet(name) if scheme_diff and document_scheme == consts.DOC_SCHEME_LIST[1]: classify = consts.MVC_CLASSIFY_SE for license_dict in license_list: if side_diff: key, field_order_yes, field_order_no = consts.FIELD_ORDER_MAP.get(classify) field_order = field_order_yes if key in license_dict else field_order_no for search_field, write_field in field_order: field_value = license_dict.get(search_field, '') if isinstance(field_value, list): ws.append((write_field, *field_value)) else: ws.append((write_field, field_value)) ws.append((None, )) count += 1 if field_str is not None: count_list.append((field_str, count)) def contract_rebuild(self, contract_result_dict, is_ca=False): for classify, contract_result in contract_result_dict.items(): if len(contract_result) == 0: continue if is_ca and classify not in consts.FSM_CONTRACT_CLASSIFY_SET: continue ws = self.create_sheet(consts.CONTRACT_MAP.get(classify)) for i in range(30): if str(i) in contract_result: page_num = str(i) info_list = contract_result.get(page_num, []) # for page_num, info_list in contract_result.items(): ws.append(('page {0}'.format(page_num), )) for info in info_list: for row in info: ws.append(row) ws.append((None, )) @staticmethod def remove_yuan(amount_key_set, key, src_str): if key in amount_key_set and isinstance(src_str, str): return src_str.replace('元', '') else: return src_str def ltgt_build(self, label, result_dict, amount_key_set): ws = self.create_sheet(label) rebuild_res = {} for key, value in result_dict.items(): if isinstance(value, list): value_list = [dict_item.get('words') for dict_item in value] ws.append((key, '、'.join(value_list))) rebuild_res[key] = '、'.join(value_list) elif isinstance(value, dict): if 'words' in value: new_value = self.remove_yuan(amount_key_set, key, value['words']) ws.append((key, new_value)) rebuild_res[key] = new_value else: for sub_key, sub_value in value.items(): if isinstance(sub_value, dict): new_value = self.remove_yuan(amount_key_set, sub_key, sub_value.get('words', '')) ws.append(('{0}: {1}'.format(key, sub_key), new_value)) rebuild_res['{0}: {1}'.format(key, sub_key)] = new_value else: new_value = self.remove_yuan(amount_key_set, sub_key, sub_value) ws.append(('{0}: {1}'.format(key, sub_key), new_value)) rebuild_res['{0}: {1}'.format(key, sub_key)] = new_value else: new_value = self.remove_yuan(amount_key_set, key, value) ws.append((key, new_value)) rebuild_res[key] = new_value return rebuild_res def simple_license_rebuild(self, license_summary, document_scheme): # for ic_license_dict in license_summary.get(consts.IC_CLASSIFY, []): # if ic_license_dict.get('类别') == '1': # license_summary.setdefault(consts.RP_CLASSIFY, []).append(ic_license_dict) # continue # # for vat_license_dict in license_summary.get(consts.VAT_CLASSIFY, []): # if vat_license_dict.get('发票类型') == 'special': # license_summary.setdefault(consts.VATS_CLASSIFY, []).append(vat_license_dict) # continue for classify, (_, name, field_order, side_diff, scheme_diff, _) in consts.FOLDER_LICENSE_ORDER: license_list = license_summary.get(classify) if not license_list: continue # if scheme_diff and document_scheme == consts.DOC_SCHEME_LIST[1]: # classify = consts.MVC_CLASSIFY_SE for license_dict in license_list: if classify == consts.IC_CLASSIFY and license_dict.get('类别') == '1': # 居住证处理 license_summary.setdefault(consts.RP_CLASSIFY, []).append(license_dict) continue if classify == consts.VAT_CLASSIFY and license_dict.get('发票类型') == 'special': license_summary.setdefault(consts.VATS_CLASSIFY, []).append(license_dict) continue if name in self.sheetnames: ws = self.get_sheet_by_name(name) else: ws = self.create_sheet(name) if side_diff: key, field_order_yes, field_order_no = consts.FIELD_ORDER_MAP.get(classify) field_order = field_order_yes if key in license_dict else field_order_no for search_field, write_field in field_order: field_value = license_dict.get(search_field, '') if isinstance(field_value, list): ws.append((write_field, *field_value)) else: ws.append((write_field, field_value)) ws.append((None, )) def res_sheet(self, res_list): if res_list: res_list.sort(key=lambda x: (x[0], x[1], x[2])) ws = self.create_sheet(consts.RES_SHEET_NAME) ws.append(consts.RES_SHEET_HEADER) success_count = 0 for res_tuple in res_list: if res_tuple[-1] not in consts.RES_FAILED_SET: success_count += 1 ws.append(res_tuple) return len(res_list), success_count else: return 0, 0 def move_res_sheet(self): sheet = self.get_sheet_by_name(consts.RES_SHEET_NAME) idx = self._sheets.index(sheet) del self._sheets[idx] self._sheets.append(sheet) def remove_base_sheet(self): if len(self.sheetnames) > 1: self.remove(self.get_sheet_by_name('Sheet')) def rebuild(self, bs_summary, license_summary, res_list, document_scheme, contract_result, metadata): res_count_tuple = self.res_sheet(res_list) count_list = [(consts.MODEL_FIELD_BS, len(bs_summary))] if document_scheme == consts.DOC_SCHEME_LIST[1]: self.license_rebuild(license_summary, document_scheme, count_list) self.contract_rebuild(contract_result) self.bs_rebuild(bs_summary, res_count_tuple, metadata) else: self.bs_rebuild(bs_summary, res_count_tuple, metadata) self.license_rebuild(license_summary, document_scheme, count_list) self.contract_rebuild(contract_result, True) self.move_res_sheet() self.remove_base_sheet() return count_list, self.need_follow