# wb.py 8.76 KB
import numpy as np
import locale
from pandas._libs import tslib
from pandas.core.indexes.datetimes import DatetimeIndex
from openpyxl import Workbook
from openpyxl.styles import Border, Side, PatternFill, numbers
from openpyxl.utils import get_column_letter


class BSWorkbook(Workbook):
    """Workbook that rearranges statement sheets and adds summary sheets.

    On top of ``openpyxl.Workbook`` it can:

    * move recognised columns of a sheet into a fixed layout (``sheet_prune``),
    * build a per-role metadata sheet (``build_meta_sheet``),
    * build per-month sheets with keyword extraction, highlighting and a
      balance check formula (``build_month_sheet``),
    * drive all of the above for a set of roles (``rebuild``).
    """

    def __init__(self, interest_keyword, salary_keyword, loan_keyword, *args, **kwargs):
        """Create the workbook and store layout constants and keyword sets.

        Args:
            interest_keyword: Tested with ``in`` against the sixth cell of
                every month-sheet row; matching rows are appended to the
                metadata sheet right away.
            salary_keyword: Tested the same way; matching rows are listed in
                a second keyword table at the end of the metadata sheet.
            loan_keyword: Tested the same way; matching rows are filled with
                ``loan_fill``.
            *args: Forwarded to ``Workbook.__init__``.
            **kwargs: Forwarded to ``Workbook.__init__``.
        """
        super().__init__(*args, **kwargs)
        # Header row written to every month sheet; its length is the width of
        # the fixed layout.
        self.fixed_headers = ('记账日期', '记账时间', '金额', '余额', '交易名称', '附言', '对方账户名',
                              '对方卡号/账号', '对方开户行', '核对结果')
        self.fixed_col_amount = len(self.fixed_headers)
        # Source header text -> 1-based target column in the fixed layout.
        # Several source spellings may map to the same target column.
        self.headers_mapping = {
            '记账日期': 1,
            '交易日期': 1,
            '记账时间': 2,
            '金额': 3,
            '交易金额': 3,
            '余额': 4,
            '账户余额': 4,
            '交易名称': 5,
            '附言': 6,
            '摘要': 6,
            '对方账户名': 7,
            '对方卡号/账号': 8,
            '对方账号与户名': 8,
            '对方开户行': 9,
        }
        self.meta_sheet_title = '关键信息提取和展示'
        # Rows used as separators / table headers in the metadata sheet.
        self.blank_row = (None,)
        self.code_header = ('页数', '电子回单验证码')
        self.date_header = ('打印时间', '起始日期', '终止日期', '流水区间结果')
        self.keyword_header = ('关键词', '记账日期', '金额')
        self.interest_keyword = interest_keyword
        self.salary_keyword = salary_keyword
        self.loan_keyword = loan_keyword
        # Texts produced by the check formula: (match, mismatch).
        self.proof_res = ('对', '错')
        # Fill for loan-keyword rows and for same-day offsetting amounts.
        self.loan_fill = PatternFill("solid", fgColor="00FFCC00")
        self.amount_fill = PatternFill("solid", fgColor="00FFFF00")
        self.bd = Side(style='thin', color="000000")
        self.border = Border(left=self.bd, top=self.bd, right=self.bd, bottom=self.bd)

    def sheet_prune(self, ws):
        """Rearrange ``ws`` in place into the fixed column layout.

        Empty columns for the fixed layout are inserted at the left, every
        original column whose row-1 header is found in ``headers_mapping`` is
        moved onto its target column, and everything to the right of the
        fixed layout is deleted afterwards.

        Args:
            ws: Worksheet to modify.
        """
        ws.insert_cols(1, amount=self.fixed_col_amount)
        first_source_col = self.fixed_col_amount + 1
        for col in range(first_source_col, ws.max_column + 1):
            target_col = self.headers_mapping.get(ws.cell(1, col).value)
            # TODO: second-pass lookup for key fields
            if target_col is None:
                continue
            # The range to move is the *source* column; the (negative) offset
            # then lands it on the target column. Building the range from the
            # target column would move the freshly inserted empty column.
            source_letter = get_column_letter(col)
            ws.move_range("{0}1:{0}{1}".format(source_letter, ws.max_row), cols=target_col - col)
        ws.delete_cols(first_source_col, amount=ws.max_column)

    def sheet_split(self, ws, month_mapping):
        """Parse the first column of ``ws`` (from row 2) as datetimes.

        NOTE: the month split is not implemented yet. The parsed index is
        discarded and ``month_mapping`` is left untouched.

        Args:
            ws: Worksheet whose first column is parsed.
            month_mapping: Currently unused.
        """
        for date_tuple in ws.iter_cols(min_col=1, max_col=1, min_row=2, values_only=True):
            # NOTE(review): ``tslib.array_to_datetime`` is a private pandas
            # API; the ``require_iso8601`` keyword may not be accepted by
            # newer pandas releases - confirm against the pinned version.
            dt_array, tz_parsed = tslib.array_to_datetime(
                # asarray copies only when needed on every NumPy version,
                # whereas ``np.array(..., copy=False)`` raises on NumPy 2 when
                # a copy is unavoidable (as it is for a tuple).
                np.asarray(date_tuple, dtype=np.object_),
                errors="coerce",
                utc=False,
                dayfirst=False,
                yearfirst=False,
                require_iso8601=False,
            )
            # TODO: group rows by month using this index.
            dti = DatetimeIndex(dt_array, tz=None, name=None)

    def build_metadata_rows(self, confidence_max, code_list, print_time, start_date, end_date, date_interval):
        """Return the list of rows that make up the top of the metadata sheet.

        The layout is: confidence row, blank row, code header, the rows of
        ``code_list``, blank row, date header, one row with the four date
        values, blank row, keyword header.

        Args:
            confidence_max: Value shown next to the confidence label.
            code_list: Iterable of rows placed under ``code_header``.
            print_time: First value of the date row.
            start_date: Second value of the date row.
            end_date: Third value of the date row.
            date_interval: Fourth value of the date row.

        Returns:
            list: Rows ready to be appended to a worksheet.
        """
        metadata_rows = [('流水识别置信度', confidence_max), self.blank_row, self.code_header]
        metadata_rows.extend(code_list)
        metadata_rows.extend(
            [self.blank_row,
             self.date_header,
             (print_time, start_date, end_date, date_interval),
             self.blank_row,
             self.keyword_header]
        )
        return metadata_rows

    def create_meta_sheet(self, role):
        """Return the worksheet that will hold the metadata for ``role``.

        If the first worksheet still carries the title ``'Sheet'`` it is
        renamed and reused; otherwise a new sheet is created.

        Args:
            role: Inserted in parentheses after ``meta_sheet_title``.

        Returns:
            The worksheet titled ``'<meta_sheet_title>(<role>)'``.
        """
        title = '{0}({1})'.format(self.meta_sheet_title, role)
        sheets = self.worksheets
        # Guard against a workbook without any worksheet, where indexing the
        # first sheet would raise IndexError.
        if sheets and sheets[0].title == 'Sheet':
            ms = sheets[0]
            ms.title = title
        else:
            ms = self.create_sheet(title)
        return ms

    def build_meta_sheet(self, role, confidence_max, code_list, print_time, start_date, end_date, date_interval):
        """Create the metadata sheet for ``role`` and fill in its header rows.

        Args:
            role: Passed to ``create_meta_sheet``.
            confidence_max: Passed to ``build_metadata_rows``.
            code_list: Passed to ``build_metadata_rows``.
            print_time: Passed to ``build_metadata_rows``.
            start_date: Passed to ``build_metadata_rows``.
            end_date: Passed to ``build_metadata_rows``.
            date_interval: Passed to ``build_metadata_rows``.

        Returns:
            The populated metadata worksheet.
        """
        metadata_rows = self.build_metadata_rows(confidence_max, code_list, print_time,
                                                 start_date, end_date, date_interval)
        ms = self.create_meta_sheet(role)
        for row in metadata_rows:
            ms.append(row)
        return ms

    def build_month_sheet(self, role, month_mapping, ms):
        """Create one sheet per month and extract / highlight key rows.

        For every month a sheet ``'<month>(<role>)'`` is created, given the
        fixed header row and filled with the referenced row ranges. Each row
        is then bordered, checked against the three keyword sets, has its
        amount and balance cells converted to numbers, and gets a balance
        check formula in the tenth column. Rows whose amount is the exact
        negative of an earlier amount on the same date are highlighted.

        Args:
            role: Inserted in parentheses into each month sheet title.
            month_mapping: Mapping ``month -> parts``; each part is indexed
                as ``(sheet name, first row, last row)``.
            ms: Metadata worksheet that receives the extracted keyword rows.

        Raises:
            KeyError: If a part names a sheet that is not in the workbook.
        """
        # Second-keyword hits are buffered and written after all months so
        # that they form their own table below the first-keyword rows.
        salary_rows = []
        for month, parts in month_mapping.items():
            # 3.1. Copy the data.
            new_ws = self.create_sheet('{0}({1})'.format(month, role))
            new_ws.append(self.fixed_headers)
            for part in parts:
                source_ws = self[part[0]]
                for values in source_ws.iter_rows(min_row=part[1], max_row=part[2], values_only=True):
                    new_ws.append(values)

            # 3.2. Extract information and highlight.
            amounts_by_date = {}  # date -> {amount -> [row numbers]}
            offsetting_rows = set()
            for row_cells in new_ws.iter_rows():
                date_cell = row_cells[0]
                amount_cell = row_cells[2]
                summary_cell = row_cells[5]
                highlight_loan = False
                if summary_cell.value in self.interest_keyword:
                    # Keyword 1: written to the metadata sheet immediately.
                    ms.append((summary_cell.value, date_cell.value, amount_cell.value))
                elif summary_cell.value in self.salary_keyword:
                    # Keyword 2: buffered for the second table.
                    salary_rows.append((summary_cell.value, date_cell.value, amount_cell.value))
                elif summary_cell.value in self.loan_keyword:
                    # Loan keyword: highlight the whole row.
                    highlight_loan = True

                for i, cell in enumerate(row_cells):
                    cell.border = self.border
                    if highlight_loan:
                        cell.fill = self.loan_fill
                    if i in (2, 3) and cell.row > 1:
                        # 3.3. Convert amount / balance to numbers. Values
                        # that cannot be parsed (non-strings, malformed text)
                        # are left as they are.
                        try:
                            number = locale.atof(cell.value)
                        except (AttributeError, TypeError, ValueError):
                            continue
                        cell.value = number
                        cell.number_format = numbers.FORMAT_NUMBER_COMMA_SEPARATED1
                        if i == 2:
                            same_day = amounts_by_date.setdefault(date_cell.value, {})
                            opposite_rows = same_day.get(-number)
                            if opposite_rows:
                                offsetting_rows.add(cell.row)
                                offsetting_rows.update(opposite_rows)
                            same_day.setdefault(number, []).append(cell.row)
                    # 3.4. Check result.
                    # TODO: debit/credit and expense-type statements need the
                    #       +/- sign added manually.
                    # TODO: statements in reverse order need a different formula.
                    if i == 9 and cell.row > 2:
                        cell.value = '=IF(D{0}=SUM(D{1},C{0}), "{2}", "{3}")'.format(cell.row, cell.row - 1,
                                                                                     *self.proof_res)

            # 3.5. Highlight same-day offsetting in/out amounts.
            for row_number in offsetting_rows:
                for cell in new_ws[row_number]:
                    cell.fill = self.amount_fill

        # Keyword 2 table.
        ms.append(self.blank_row)
        ms.append(self.keyword_header)
        for salary_row in salary_rows:
            ms.append(salary_row)

    def rebuild(self, role_summary):
        """Rebuild the workbook role by role.

        For each role the referenced sheets are pruned and passed to
        ``sheet_split``, a metadata sheet and the month sheets are built, and
        the original sheets are removed.

        NOTE: ``sheet_split`` does not populate ``month_mapping`` yet, so no
        month sheets are produced at the moment.

        Args:
            role_summary: Mapping ``role -> list of summaries``; each summary
                unpacks as ``(sheet_name, confidence, page, code, print_time,
                start_date, end_date)``.

        Raises:
            KeyError: If a summary names a sheet that is not in the workbook.
        """
        for role, summary_list in role_summary.items():
            # 1. Prune and arrange the original sheets, split them by month.
            confidence_max = 0
            code_list = []
            month_mapping = {}
            # The date values of the last summary win.
            print_time = start_date = end_date = date_interval = None
            for summary in summary_list:
                sheet_name, confidence, page, code, print_time, start_date, end_date = summary
                ws = self[sheet_name]
                # 1.1. Drop superfluous columns and arrange the rest.
                self.sheet_prune(ws)
                # 1.2. TODO: split by month.
                self.sheet_split(ws, month_mapping)
                # 1.3. Metadata handling. TODO: time and date handling.
                # confidence_max = max(confidence, confidence_max)
                # if code is not None:
                #     code_list.append((page, code))

            # 2. Metadata sheet.
            ms = self.build_meta_sheet(role, confidence_max, code_list, print_time, start_date, end_date, date_interval)

            # 3. Month sheets, key row extraction / highlighting.
            self.build_month_sheet(role, month_mapping, ms)

            # Remove the original sheets.
            for summary in summary_list:
                self.remove(self[summary[0]])