import re import random import locale import numpy as np from datetime import datetime from pandas._libs import tslib from pandas._libs.tslibs.nattype import NaTType from pandas.core.indexes.datetimes import DatetimeIndex from openpyxl import Workbook from openpyxl.styles import Border, Side, PatternFill, numbers from openpyxl.utils import get_column_letter from apps.doc import consts class BSWorkbook(Workbook): def __init__(self, interest_keyword, salary_keyword, loan_keyword, wechat_keyword, *args, **kwargs): super().__init__(*args, **kwargs) locale.setlocale(locale.LC_NUMERIC, 'en_US.UTF-8') self.meta_sheet_title = '关键信息提取和展示' self.blank_row = (None,) self.code_header = ('页数', '电子回单验证码') self.date_header = ('打印时间', '起始日期', '终止日期', '流水区间结果') self.keyword_header = ('关键词', '记账日期', '金额') self.interest_keyword = self.replace_newline(interest_keyword) self.salary_keyword = self.replace_newline(salary_keyword) self.loan_keyword = self.replace_newline(loan_keyword) self.wechat_keyword = wechat_keyword self.proof_res = ('对', '错') self.loan_fill = PatternFill("solid", fgColor="00FFCC00") self.amount_fill = PatternFill("solid", fgColor="00FFFF00") # self.bd = Side(style='thin', color="000000") # self.border = Border(left=self.bd, top=self.bd, right=self.bd, bottom=self.bd) self.MAX_MEAN = 31 @staticmethod def replace_newline(queryset_value): new_set = set() for v in queryset_value: new_set.add(v.replace('\\n', '\n')) return new_set @staticmethod def get_new_card(card): if not isinstance(card, str): return consts.ERROR_CARD try: new_card = card.translate(consts.SHEET_TITLE_TRANS).strip()[-6:] if len(new_card) == 0: new_card = consts.ERROR_CARD except Exception as e: new_card = consts.ERROR_CARD return new_card @staticmethod def get_header_col(header_value, classify): if header_value is None: return if classify == consts.WECHART_CLASSIFY: header_dict = consts.WECHART_HEADERS_MAPPING else: header_dict = consts.HEADERS_MAPPING header_col = header_dict.get(header_value) if header_col is None: for pattern in consts.PATTERN_LIST: if re.search(pattern, header_value): header_col = header_dict.get(pattern) break return header_col def header_collect(self, ws, sheet_header_info, header_info, max_column_list, classify): # sheet_header_info = { # 'sheet_name': { # 'summary_col': 1, # 'date_col': 1, # 'amount_col': 1, # 'over_col': 1, # 'income_col': 1, # 'outlay_col': 1, # 'borrow_col': 1, # 'min_row': 2, # 'find_count': 3, # 'find_col': {1}, # 'header': ('日期', '金额') # } # } # header_info = { # 'summary_col': { # 5: 2, # 3: 1, # }, # 'date_col': {}, # 'amount_col': {}, # 'over_col': {}, # 'income_col': {}, # 'outlay_col': {}, # 'borrow_col': {}, # } # 第一行关键词 header_col_list = [] for first_row in ws.iter_rows(max_row=1, min_row=1, values_only=True): sheet_header_info.setdefault(ws.title, {}).setdefault(consts.HEADER_KEY, first_row) for idx, header_value in enumerate(first_row): header_col = self.get_header_col(header_value, classify) if classify == consts.MS_CLASSIFY and header_col == consts.OVER_KEY and \ header_value == '账户余额现转标志' and not first_row[idx - 1]: idx -= 1 if header_col is not None: header_col_list.append((idx, header_col)) find_count = len(header_col_list) if find_count < 2: find_count = 0 else: for idx, header_col in header_col_list: sheet_header_info.setdefault(ws.title, {}).setdefault(header_col, idx) find_col_set = sheet_header_info.setdefault(ws.title, {}).setdefault(consts.FIND_COL_KEY, set()) find_col_set.add(idx) col_count = header_info.setdefault(header_col, {}).get(idx) header_info.setdefault(header_col, {})[idx] = 1 if col_count is None else col_count+1 sheet_header_info.setdefault(ws.title, {}).setdefault(consts.FIND_COUNT_KEY, find_count) min_row = 1 if find_count == 0 else 2 sheet_header_info.setdefault(ws.title, {}).setdefault(consts.MIN_ROW_KEY, min_row) max_column_list.append(ws.max_column) @staticmethod def header_statistics(sheet_header_info, header_info, classify, special_nhzs): # statistics_header_info = { # SUMMARY_KEY: 2, # DATE_KEY: 3, # AMOUNT_KEY: 4, # OVER_KEY: 5, # IMCOME_KEY: 6, # OUTLAY_KEY: 7, # BORROW_KEY: 8, # 'header': ('日期', '金额') # } statistics_header_info = {} sheet_order_list = sorted(sheet_header_info, reverse=True, key=lambda x: sheet_header_info[x][consts.FIND_COUNT_KEY]) best_sheet_info = sheet_header_info.get(sheet_order_list[0]) max_find_count = best_sheet_info.get(consts.FIND_COUNT_KEY, 0) if max_find_count == 0: if special_nhzs: classify = consts.SPECIAL_NYZS_CLASSIFY for key, value in consts.CLASSIFY_MAP.items(): col = consts.CLASSIFY_LIST[classify][1][value] statistics_header_info[key] = col - 1 if isinstance(col, int) else None statistics_header_info[consts.HEADER_KEY] = consts.CLASSIFY_HEADER_LIST[classify] else: find_col_set = best_sheet_info.get(consts.FIND_COL_KEY, set()) # SUMMARY_KEY DATE_KEY OVER_KEY BORROW_KEY for key in consts.KEY_LIST: col = best_sheet_info.get(key) if col is None: col_dict = header_info.get(key, {}) for idx in sorted(col_dict, key=lambda x: col_dict[x], reverse=True): if idx in find_col_set: continue col = idx find_col_set.add(col) break else: fixed_col = consts.CLASSIFY_LIST[classify][1][consts.CLASSIFY_MAP[key]] if fixed_col not in find_col_set and isinstance(fixed_col, int): col = fixed_col - 1 find_col_set.add(col) statistics_header_info[key] = col statistics_header_info[consts.HEADER_KEY] = best_sheet_info.get(consts.HEADER_KEY) return statistics_header_info, max_find_count @staticmethod def get_data_col_min_row(sheet, sheet_header_info, header_info, classify): date_col = sheet_header_info.get(sheet, {}).get(consts.DATE_KEY) if date_col is None: date_col_dict = header_info.get(consts.DATE_KEY, {}) find_col_set = sheet_header_info.get(sheet, {}).get(consts.FIND_COL_KEY, set()) for idx in sorted(date_col_dict, key=lambda x: date_col_dict[x], reverse=True): if idx in find_col_set: continue date_col = idx break else: fixed_col = consts.CLASSIFY_LIST[classify][1][consts.CLASSIFY_MAP[consts.DATE_KEY]] if fixed_col not in find_col_set and isinstance(fixed_col, int): date_col = fixed_col - 1 min_row = sheet_header_info.get(sheet, {}).get(consts.MIN_ROW_KEY, 2) return date_col, min_row @staticmethod def get_confidence(max_find_count): if max_find_count == 0: return round(random.uniform(75, 80), 2) elif max_find_count == 1: return round(random.uniform(80, 85), 2) elif max_find_count == 2: return round(random.uniform(85, 90), 2) elif max_find_count == 3: return round(random.uniform(90, 95), 2) else: return round(random.uniform(95, 100), 2) @staticmethod def month_split(dti, date_list, date_statistics): month_list = [] idx_list = [] month_pre = None for idx, month_str in enumerate(dti.strftime('%Y-%m')): if isinstance(month_str, float): continue if month_str != month_pre: month_list.append(month_str) if month_pre is None: if date_statistics: date_list.append(dti[idx].date()) idx = 0 idx_list.append(idx) month_pre = month_str if date_statistics: for idx in range(len(dti) - 1, -1, -1): if isinstance(dti[idx], NaTType): continue date_list.append(dti[idx].date()) break return month_list, idx_list @staticmethod def get_reverse_trend(day_idx, idx_list): reverse_trend = 0 pre_day = None for idx, day in enumerate(day_idx): if np.isnan(day): continue if idx in idx_list or pre_day is None: pre_day = day continue if day < pre_day: reverse_trend += 1 pre_day = day elif day > pre_day: reverse_trend -= 1 pre_day = day if reverse_trend > 0: reverse_trend = 1 elif reverse_trend < 0: reverse_trend = -1 return reverse_trend def sheet_split(self, ws, date_col, min_row, month_mapping, reverse_trend_list, date_list, date_statistics): if date_col is None: # month_info process month_info = month_mapping.setdefault('xxxx-xx', []) month_info.append((ws.title, min_row, ws.max_row, 0)) return date_col = date_col + 1 for date_tuple_src in ws.iter_cols(min_col=date_col, max_col=date_col, min_row=min_row, values_only=True): date_tuple = [date[:10] if isinstance(date, str) else date for date in date_tuple_src] dt_array, _ = tslib.array_to_datetime( np.array(date_tuple, copy=False, dtype=np.object_), errors="coerce", utc=False, dayfirst=False, yearfirst=False, require_iso8601=True, ) dti = DatetimeIndex(dt_array, tz=None, name=None) rebuid = False for idx, d in enumerate(dti): try: if isinstance(d, NaTType) and isinstance(date_tuple[idx], str): match_obj = re.match(r'(\d{4})[7/](\d{2})[7/](\d{2})', date_tuple[idx]) if match_obj: dt_array[idx] = np.datetime64(datetime(int(match_obj.group(1)), int(match_obj.group(2)), int(match_obj.group(3)))) rebuid = True except Exception as e: continue if rebuid: dti = DatetimeIndex(dt_array, tz=None, name=None) month_list, idx_list = self.month_split(dti, date_list, date_statistics) if len(month_list) == 0: # month_info process month_info = month_mapping.setdefault('xxxx-xx', []) month_info.append((ws.title, min_row, ws.max_row, 0)) else: # reverse_trend_list process reverse_trend = self.get_reverse_trend(dti.day, idx_list) reverse_trend_list.append(reverse_trend) # month_info process day_idx = dti.day idx_list_max_idx = len(idx_list) - 1 for i, item in enumerate(month_list): if i == idx_list_max_idx: day_mean = np.mean(day_idx[idx_list[i]:].dropna()) month_mapping.setdefault(item, []).append( (ws.title, idx_list[i] + min_row, ws.max_row, day_mean)) else: day_mean = np.mean(day_idx[idx_list[i]: idx_list[i + 1]].dropna()) month_mapping.setdefault(item, []).append( (ws.title, idx_list[i] + min_row, idx_list[i + 1] + min_row - 1, day_mean)) def build_metadata_rows(self, confidence, code, print_time, start_date, end_date): if start_date is None or end_date is None: timedelta = None else: timedelta = (end_date - start_date).days metadata_rows = [ ('流水识别置信度', confidence), self.blank_row, self.code_header, ] metadata_rows.extend(code) metadata_rows.extend( [self.blank_row, self.date_header, (print_time, start_date, end_date, timedelta), self.blank_row, self.keyword_header] ) return metadata_rows def build_meta_sheet(self, card, confidence, code, print_time, start_date, end_date): metadata_rows = self.build_metadata_rows(confidence, code, print_time, start_date, end_date) ms = self.create_sheet('{0}({1})'.format(self.meta_sheet_title, card)) for row in metadata_rows: ms.append(row) return ms @staticmethod def amount_format(amount_str): if not isinstance(amount_str, str) or amount_str == '': return amount_str # 1.替换 res_str = amount_str.translate(consts.TRANS) # 2.首字符处理 first_char = res_str[0] if first_char in consts.ERROR_CHARS: first_char = '-' # 3.删除多余的- res_str = first_char + res_str[1:].replace('-', '') # 4.逗号与句号处理 if len(res_str) >= 4: period_idx = len(res_str) - 3 if res_str[period_idx] == '.' and res_str[period_idx - 1] in {',', '.'}: # 364,.92 364..92 res_str = '{0}{1}'.format(res_str[:period_idx - 1], res_str[period_idx:]) elif res_str[period_idx] in {',', ':', ':'}: if res_str[period_idx - 1] in {',', '.', ':', ':'}: # 364.,92 364,,92 pre_idx = period_idx - 1 else: # 364,92 pre_idx = period_idx res_str = '{0}.{1}'.format(res_str[:pre_idx], res_str[period_idx + 1:]) res_str = res_str[:period_idx].replace('.', '') + res_str[period_idx:] return res_str @staticmethod def rm_cn_char(row_value, pre_col, next_col): if len(row_value) <= next_col: return row_value row_value = list(row_value) if isinstance(row_value[pre_col], str): cn_chars = re.findall(consts.CN_RE, row_value[pre_col]) cn_str = ''.join(cn_chars) row_value[pre_col] = re.sub(consts.CN_RE, '', row_value[pre_col]) if row_value[next_col] is None: row_value[next_col] = cn_str elif isinstance(row_value[next_col], str): row_value[next_col] = '{0}\n{1}'.format(cn_str, row_value[next_col]) return row_value @staticmethod def rm_second_row(row_value, amount_cell_idx, over_cell_idx): row_value = list(row_value) if isinstance(over_cell_idx, int) and isinstance(amount_cell_idx, int): max_idx = max(over_cell_idx, amount_cell_idx) elif isinstance(over_cell_idx, int): max_idx = over_cell_idx elif isinstance(amount_cell_idx, int): max_idx = amount_cell_idx else: max_idx = 0 if 1 < max_idx < len(row_value): append_list = [] for i in range(2, max_idx+1): if isinstance(row_value[i], str): split_list = row_value[i].split('\n') row_value[i] = split_list[0] append_list.extend(split_list[1:]) if isinstance(row_value[1], str): append_list.insert(0, row_value[1]) row_value[1] = '\n'.join(append_list) return row_value def build_month_sheet(self, ms, card, month_mapping, is_reverse, statistics_header_info, max_column, classify): summary_cell_idx = statistics_header_info.get(consts.SUMMARY_KEY) date_cell_idx = statistics_header_info.get(consts.DATE_KEY) amount_cell_idx = statistics_header_info.get(consts.AMOUNT_KEY) # None or src or append over_cell_idx = statistics_header_info.get(consts.OVER_KEY) income_cell_idx = statistics_header_info.get(consts.IMCOME_KEY) outlay_cell_idx = statistics_header_info.get(consts.OUTLAY_KEY) borrow_cell_idx = statistics_header_info.get(consts.BORROW_KEY) header = list(statistics_header_info.get(consts.HEADER_KEY)) src_header_len = len(header) if max_column > src_header_len: for i in range(max_column - src_header_len): header.append(None) add_col = ['核对结果'] if amount_cell_idx is None: if income_cell_idx is not None or outlay_cell_idx is not None: add_col = ['金额', '核对结果'] amount_cell_idx = len(header) header.extend(add_col) result_idx = len(header) - 1 tmp_ws = self.create_sheet('tmp_ws') if classify in consts.ALI_WECHART_CLASSIFY: high_light_keyword = self.wechat_keyword else: high_light_keyword = self.loan_keyword for month in sorted(month_mapping.keys()): # 3.1.拷贝数据 parts = month_mapping.get(month) new_ws = self.create_sheet('{0}({1})'.format(month, card)) new_ws.append(header) for part in parts: ws = self.get_sheet_by_name(part[0]) for row_value in ws.iter_rows(min_row=part[1], max_row=part[2], values_only=True): if any(row_value): if classify == consts.WECHART_CLASSIFY: row_value = self.rm_cn_char(row_value, *consts.WECHART_ERROR_COL) elif classify == consts.MS_CLASSIFY: row_value = self.rm_cn_char(row_value, *consts.MS_ERROR_COL) elif classify in consts.NYYH_CLASSIFY: row_value = self.rm_second_row(row_value, amount_cell_idx, over_cell_idx) new_ws.append(row_value) amount_mapping = {} amount_fill_row = set() loan_fill_row = set() for rows in new_ws.iter_rows(min_row=2): length = len(rows) summary_cell = None if summary_cell_idx is None or summary_cell_idx >= length else rows[summary_cell_idx] date_cell = None if date_cell_idx is None or date_cell_idx >= length else rows[date_cell_idx] amount_cell = None if amount_cell_idx is None or amount_cell_idx >= length else rows[amount_cell_idx] over_cell = None if over_cell_idx is None or over_cell_idx >= length else rows[over_cell_idx] income_cell = None if income_cell_idx is None or income_cell_idx >= length else rows[income_cell_idx] outlay_cell = None if outlay_cell_idx is None or outlay_cell_idx >= length else rows[outlay_cell_idx] borrow_cell = None if borrow_cell_idx is None or borrow_cell_idx >= length else rows[borrow_cell_idx] summary_cell_value = None if summary_cell is None else summary_cell.value date_cell_value = None if date_cell is None else date_cell.value amount_cell_value = None if amount_cell is None else amount_cell.value over_cell_value = None if over_cell is None else over_cell.value income_cell_value = None if income_cell is None else income_cell.value outlay_cell_value = None if outlay_cell is None else outlay_cell.value borrow_cell_value = None if borrow_cell is None else borrow_cell.value # 贷款关键词高亮 if summary_cell is not None and summary_cell_value in high_light_keyword: loan_fill_row.add(summary_cell.row) # 3.3.余额转数值 over_success = False if over_cell is not None: try: over_cell.value = locale.atof(self.amount_format(over_cell_value)) except Exception as e: pass else: over_success = True over_cell.number_format = numbers.FORMAT_NUMBER_00 # 3.4.金额转数值 amount_success = False if amount_cell is not None: try: try: amount_cell.value = locale.atof(self.amount_format(amount_cell_value)) except Exception as e: try: amount_cell.value = locale.atof(self.amount_format(income_cell_value)) if amount_cell.value == 0: raise elif amount_cell.value < 0: amount_cell.value = -amount_cell.value except Exception as e: amount_cell.value = locale.atof(self.amount_format(outlay_cell_value)) if amount_cell.value > 0: amount_cell.value = -amount_cell.value except Exception as e: pass else: amount_success = True if borrow_cell_value in consts.BORROW_OUTLAY_SET: amount_cell.value = -amount_cell.value amount_cell.number_format = numbers.FORMAT_NUMBER_00 if date_cell is not None and isinstance(date_cell_value, str): same_amount_mapping = amount_mapping.get(date_cell_value[:10], {}) fill_rows = same_amount_mapping.get(-amount_cell.value) if fill_rows: amount_fill_row.add(amount_cell.row) amount_fill_row.update(fill_rows) amount_mapping.setdefault(date_cell_value[:10], {}).setdefault( amount_cell.value, []).append(amount_cell.row) # 3.5.核对结果 if amount_success and over_success and amount_cell.row > 2: amount_col_letter = get_column_letter(amount_cell_idx + 1) over_col_letter = get_column_letter(over_cell_idx + 1) if is_reverse: rows[result_idx].value = '=IF({2}{0}=ROUND(SUM({2}{1},{3}{0}),4), "{4}", "{5}")'.format( amount_cell.row - 1, amount_cell.row, over_col_letter, amount_col_letter, *self.proof_res) else: rows[result_idx].value = '=IF({2}{0}=ROUND(SUM({2}{1},{3}{0}),4), "{4}", "{5}")'.format( amount_cell.row, amount_cell.row - 1, over_col_letter, amount_col_letter, *self.proof_res) # 3.2.提取信息、高亮 # row = summary_cell.row if summary_cell is not None: # 关键词1提取 if summary_cell_value in self.interest_keyword: new_amount_cell_value = None if amount_cell is None else amount_cell.value ms.append((summary_cell_value, date_cell_value, new_amount_cell_value)) # 关键词2提取至临时表 elif summary_cell_value in self.salary_keyword: new_amount_cell_value = None if amount_cell is None else amount_cell.value tmp_ws.append((summary_cell_value, date_cell_value, new_amount_cell_value)) # 贷款关键词高亮 # elif summary_cell_value in high_light_keyword: # summary_cell.fill = self.amount_fill # if amount_cell is not None: # amount_cell.fill = self.amount_fill for row in loan_fill_row: for cell in new_ws[row]: cell.fill = self.amount_fill # 3.6.同一天相同进出账高亮 del amount_mapping for row in amount_fill_row: for cell in new_ws[row]: cell.fill = self.amount_fill # new_ws[row][amount_cell_idx].fill = self.amount_fill # if summary_cell_idx is not None: # new_ws[row][summary_cell_idx].fill = self.amount_fill # 关键词2信息提取 ms.append(self.blank_row) ms.append(self.keyword_header) for row in tmp_ws.iter_rows(values_only=True): ms.append(row) self.remove(tmp_ws) def bs_rebuild(self, bs_summary): # bs_summary = { # '卡号': { # 'classify': 0, # 'confidence': 0.9, # 'role': '柳雪', # 'code': [('page', 'code')], # 'print_time': 'datetime', # 'start_date': 'datetime', # 'end_date': 'datetime', # 'sheet': ['sheet_name'] # } # } for card, summary in bs_summary.items(): special_nhzs = False new_card = self.get_new_card(card) # 1.原表表头收集、按照月份分割 # 1.1 总结首行信息 classify = summary.get('classify', 0) sheet_header_info = {} header_info = {} max_column_list = [] sheets_list = summary.get('sheet', []) special_nhzs_max_col = 0 for sheet in sheets_list: ws = self.get_sheet_by_name(sheet) if classify == consts.NYZS_CLASSIFY: special_nhzs_max_col += ws.max_column self.header_collect(ws, sheet_header_info, header_info, max_column_list, classify) # 农业银行整数表头特殊处理 if classify == consts.NYZS_CLASSIFY and round(special_nhzs_max_col / len(sheets_list)) == 5: special_nhzs = True statistics_header_info, max_find_count = self.header_statistics( sheet_header_info, header_info, classify, special_nhzs) max_column = max(max_column_list) # 1.2.按月份分割 min_row 正文第一行 date_col 日期行 start_date = summary.get('start_date') end_date = summary.get('end_date') date_statistics = True if start_date is None or end_date is None else False # 用于判断是否需要收集各表中日期 date_list = [] # 用于收集各表中日期 month_mapping = {} # 用于创建月份表 reverse_trend_list = [] # 用于判断倒序与正序 for sheet in sheets_list: ws = self.get_sheet_by_name(sheet) date_col, min_row = self.get_data_col_min_row(sheet, sheet_header_info, header_info, classify) self.sheet_split(ws, date_col, min_row, month_mapping, reverse_trend_list, date_list, date_statistics) if date_statistics is True and len(date_list) > 1: start_date = min(date_list) if start_date is None else start_date end_date = max(date_list) if end_date is None else end_date # 2.元信息提取表 confidence = self.get_confidence(max_find_count) ms = self.build_meta_sheet(new_card, confidence, summary.get('code'), summary.get('print_time'), start_date, end_date) # 3.创建月份表、提取/高亮关键行 # 倒序处理 is_reverse = True if sum(reverse_trend_list) > 0 else False for month_list in month_mapping.values(): month_list.sort(key=lambda x: x[-1], reverse=is_reverse) self.build_month_sheet(ms, new_card, month_mapping, is_reverse, statistics_header_info, max_column, classify) # 4.删除原表 for sheet in sheets_list: self.remove(self.get_sheet_by_name(sheet)) def license_rebuild(self, license_summary, document_scheme, count_list): for classify, (_, name, field_order, side_diff, scheme_diff, field_str) in consts.LICENSE_ORDER: license_list = license_summary.get(classify) if not license_list: continue count = 0 ws = self.create_sheet(name) if scheme_diff and document_scheme == consts.DOC_SCHEME_LIST[1]: classify = consts.MVC_CLASSIFY_SE for license_dict in license_list: if side_diff: key, field_order_yes, field_order_no = consts.FIELD_ORDER_MAP.get(classify) field_order = field_order_yes if key in license_dict else field_order_no for search_field, write_field in field_order: field_value = license_dict.get(search_field, '') if isinstance(field_value, list): ws.append((write_field, *field_value)) else: ws.append((write_field, field_value)) ws.append((None, )) count += 1 if field_str is not None: count_list.append((field_str, count)) def simple_license_rebuild(self, license_summary, document_scheme): # for ic_license_dict in license_summary.get(consts.IC_CLASSIFY, []): # if ic_license_dict.get('类别') == '1': # license_summary.setdefault(consts.RP_CLASSIFY, []).append(ic_license_dict) # continue # # for vat_license_dict in license_summary.get(consts.VAT_CLASSIFY, []): # if vat_license_dict.get('发票类型') == 'special': # license_summary.setdefault(consts.VATS_CLASSIFY, []).append(vat_license_dict) # continue for classify, (_, name, field_order, side_diff, scheme_diff, _) in consts.FOLDER_LICENSE_ORDER: license_list = license_summary.get(classify) if not license_list: continue # if scheme_diff and document_scheme == consts.DOC_SCHEME_LIST[1]: # classify = consts.MVC_CLASSIFY_SE for license_dict in license_list: if classify == consts.IC_CLASSIFY and license_dict.get('类别') == '1': # 居住证处理 license_summary.setdefault(consts.RP_CLASSIFY, []).append(license_dict) continue if classify == consts.VAT_CLASSIFY and license_dict.get('发票类型') == 'special': license_summary.setdefault(consts.VATS_CLASSIFY, []).append(license_dict) continue if name in self.sheetnames: ws = self.get_sheet_by_name(name) else: ws = self.create_sheet(name) if side_diff: key, field_order_yes, field_order_no = consts.FIELD_ORDER_MAP.get(classify) field_order = field_order_yes if key in license_dict else field_order_no for search_field, write_field in field_order: field_value = license_dict.get(search_field, '') if isinstance(field_value, list): ws.append((write_field, *field_value)) else: ws.append((write_field, field_value)) ws.append((None, )) def res_sheet(self, res_list): if res_list: res_list.sort(key=lambda x: (x[0], x[1], x[2])) ws = self.create_sheet(consts.RES_SHEET_NAME) ws.append(consts.RES_SHEET_HEADER) for res_tuple in res_list: ws.append(res_tuple) def remove_base_sheet(self): if len(self.sheetnames) > 1: self.remove(self.get_sheet_by_name('Sheet')) def rebuild(self, bs_summary, license_summary, res_list, document_scheme): count_list = [(consts.MODEL_FIELD_BS, len(bs_summary))] if document_scheme == consts.DOC_SCHEME_LIST[1]: self.license_rebuild(license_summary, document_scheme, count_list) self.bs_rebuild(bs_summary) else: self.bs_rebuild(bs_summary) self.license_rebuild(license_summary, document_scheme, count_list) self.res_sheet(res_list) self.remove_base_sheet() return count_list