# wb.py (10.4 KB)
import locale
import numpy as np
from pandas._libs import tslib
from pandas._libs.tslibs.nattype import NaTType
from pandas.core.indexes.datetimes import DatetimeIndex
from openpyxl import Workbook
from openpyxl.styles import Border, Side, PatternFill, numbers
from openpyxl.utils import get_column_letter
from apps.doc import consts


class BSWorkbook(Workbook):

    def __init__(self, interest_keyword, salary_keyword, loan_keyword, *args, **kwargs):
        """Initialise the workbook and the constants used while rebuilding it.

        ``interest_keyword``, ``salary_keyword`` and ``loan_keyword`` are stored
        unchanged; ``build_month_sheet`` later tests each row's summary cell
        for membership in them. Any further positional or keyword arguments
        are forwarded to ``Workbook.__init__``.
        """
        super().__init__(*args, **kwargs)

        # Keyword collections matched against the summary column.
        self.interest_keyword = interest_keyword
        self.salary_keyword = salary_keyword
        self.loan_keyword = loan_keyword

        # Title, spacer row and header rows written to the metadata sheet.
        self.meta_sheet_title = '关键信息提取和展示'
        self.blank_row = (None,)
        self.code_header = ('页数', '电子回单验证码')
        self.date_header = ('打印时间', '起始日期', '终止日期', '流水区间结果')
        self.keyword_header = ('关键词', '记账日期', '金额')

        # The two possible outcomes of the balance-check formula.
        self.proof_res = ('对', '错')

        # Cell styling: one thin black side reused for all four borders.
        thin_black = Side(style='thin', color="000000")
        self.bd = thin_black
        self.border = Border(left=thin_black, top=thin_black, right=thin_black, bottom=thin_black)
        self.loan_fill = PatternFill("solid", fgColor="00FFCC00")
        self.amount_fill = PatternFill("solid", fgColor="00FFFF00")

        # Sort weight given by ``sheet_split`` to the non-final month segments
        # of a sheet that spans several months.
        self.MAX_MEAN = 31

    def sheet_prune(self, ws):
        """Move the recognised columns of ``ws`` into the fixed column layout.

        ``consts.FIXED_COL_AMOUNT`` empty columns are inserted on the left.
        Each original column whose header text (row 1) has an entry in
        ``consts.HEADERS_MAPPING`` is then moved to the mapped column index,
        and finally everything to the right of the fixed columns is deleted.
        Columns with an unmapped header are dropped by that final delete.
        """
        fixed_cols = consts.FIXED_COL_AMOUNT
        first_source_col = fixed_cols + 1
        ws.insert_cols(1, amount=fixed_cols)
        for src_col in range(first_source_col, ws.max_column + 1):
            target_col = consts.HEADERS_MAPPING.get(ws.cell(1, src_col).value)
            # TODO: second-pass lookup for key fields
            if target_col is not None:
                col_letter = get_column_letter(src_col)
                whole_column = "{0}1:{0}{1}".format(col_letter, ws.max_row)
                ws.move_range(whole_column, cols=target_col - src_col)
        ws.delete_cols(first_source_col, amount=ws.max_column)

    @staticmethod
    def month_split(dti, date_list):
        month_list = []
        idx_list = []
        month_pre = None
        for idx, month_str in enumerate(dti.strftime('%Y-%m')):
            if isinstance(month_str, float):
                continue
            if month_str != month_pre:
                month_list.append(month_str)
                if month_pre is None:
                    date_list.append(dti[idx].date())
                    idx = 0
                idx_list.append(idx)
                month_pre = month_str
        for idx in range(len(dti)-1, -1, -1):
            if isinstance(dti[idx], NaTType):
                continue
            date_list.append(dti[idx].date())
            break
        return month_list, idx_list

    def sheet_split(self, ws, month_mapping, date_list):
        """Register the data rows of ``ws`` in ``month_mapping``, grouped by month.

        The first column (from row 2 down) is parsed as dates, with
        unparseable values coerced to NaT. Every entry stored in
        ``month_mapping[month]`` is a tuple
        ``(sheet title, first row, last row, sort weight)``:

        * no valid date at all: the whole sheet goes under ``'xxxx-xx'``
          with weight 0;
        * a single month: the weight is the mean day-of-month, and the entry
          is inserted before the first existing entry whose weight is not
          smaller (appended if there is none);
        * several months: every month but the last is appended with weight
          ``self.MAX_MEAN``; the last month is inserted at the front with
          weight 0.

        ``date_list`` is extended by ``month_split`` with the first and last
        valid date of the sheet.
        """
        for date_tuple in ws.iter_cols(min_col=1, max_col=1, min_row=2, values_only=True):
            # np.asarray copies only when it has to. The former
            # ``np.array(..., copy=False)`` raises ValueError on NumPy >= 2.0
            # because building an array from a tuple always needs a copy.
            # NOTE(review): tslib.array_to_datetime is a private pandas API and
            # its keyword set (e.g. require_iso8601) presumably varies between
            # pandas versions -- confirm against the pinned version.
            dt_array, _ = tslib.array_to_datetime(
                np.asarray(date_tuple, dtype=np.object_),
                errors="coerce",
                utc=False,
                dayfirst=False,
                yearfirst=False,
                require_iso8601=False,
            )
            dti = DatetimeIndex(dt_array, tz=None, name=None)

            month_list, idx_list = self.month_split(dti, date_list)

            if not month_list:
                month_mapping.setdefault('xxxx-xx', []).append((ws.title, 2, ws.max_row, 0))
            elif len(month_list) == 1:
                month_info = month_mapping.setdefault(month_list[0], [])
                day_mean = np.mean(dti.day.dropna())
                entry = (ws.title, 2, ws.max_row, day_mean)
                # TODO: handle statements listed in reverse order
                for pos, item in enumerate(month_info):
                    if day_mean <= item[-1]:
                        month_info.insert(pos, entry)
                        break
                else:
                    # Also covers an empty ``month_info``.
                    month_info.append(entry)
            else:
                # idx_list holds 0-based data offsets; data starts on sheet
                # row 2, hence "+ 2" for a first row and "+ 1" for the row
                # just before the next month's first row.
                for i, month in enumerate(month_list[:-1]):
                    month_mapping.setdefault(month, []).append(
                        (ws.title, idx_list[i] + 2, idx_list[i + 1] + 1, self.MAX_MEAN))
                month_mapping.setdefault(month_list[-1], []).insert(
                    0, (ws.title, idx_list[-1] + 2, ws.max_row, 0))
    def build_metadata_rows(self, confidence_max, code_list, print_time, start_date, end_date, date_interval):
        metadata_rows = [('流水识别置信度', confidence_max), self.blank_row, self.code_header]
        metadata_rows.extend(code_list)
        metadata_rows.extend(
            [self.blank_row,
             self.date_header,
             (print_time, start_date, end_date, date_interval),
             self.blank_row,
             self.keyword_header]
        )
        return metadata_rows

    def create_meta_sheet(self, role):
        """Return the metadata sheet for ``role``, titled ``<meta title>(<role>)``.

        If the workbook's first sheet is still titled ``'Sheet'`` it is
        renamed and reused; otherwise a new sheet is created.
        """
        title = '{0}({1})'.format(self.meta_sheet_title, role)
        first_sheet = self.worksheets[0]
        if first_sheet.title != 'Sheet':
            return self.create_sheet(title)
        first_sheet.title = title
        return first_sheet

    def build_meta_sheet(self, role, confidence_max, code_list, print_time, start_date, end_date, date_interval):
        """Create the metadata sheet for ``role``, fill in its rows and return it."""
        # Rows are built before the sheet is created, so a failure while
        # assembling them leaves the workbook untouched.
        rows = self.build_metadata_rows(
            confidence_max, code_list, print_time, start_date, end_date, date_interval)
        meta_sheet = self.create_meta_sheet(role)
        for values in rows:
            meta_sheet.append(values)
        return meta_sheet

    def build_month_sheet(self, role, month_mapping, ms):
        """Create one sheet per month and extract/highlight the key rows.

        For every month in ``month_mapping`` (in sorted key order) a sheet
        titled ``<month>(<role>)`` is created, starting with
        ``consts.FIXED_HEADERS`` and followed by the row ranges listed for
        that month. Each row of the new sheet is then processed:

        * summary (6th column) in ``self.interest_keyword``: the
          ``(summary, date, amount)`` triple is appended to ``ms`` immediately;
        * summary in ``self.salary_keyword``: the triple is buffered and
          appended to ``ms`` after all months, below a blank row and a
          second keyword header;
        * summary in ``self.loan_keyword``: the whole row gets ``self.loan_fill``;
        * the 3rd and 4th columns are converted with ``locale.atof`` where
          possible and given a thousands-separated number format;
        * rows on the same date whose 3rd-column values are exact opposites
          get ``self.amount_fill``;
        * the 10th column (from row 3 on) receives a formula comparing the
          row's D cell with the previous D cell plus the row's C cell.

        Compared with the previous implementation, the buffered rows are kept
        in a plain list instead of a scratch worksheet (which stayed in the
        workbook if an exception was raised), sheets are looked up with
        ``self[name]`` rather than the deprecated ``get_sheet_by_name``, and
        only the errors ``locale.atof`` raises for unconvertible values are
        swallowed.
        """
        salary_rows = []
        for month in sorted(month_mapping):
            # 3.1 Copy the data.
            parts = month_mapping[month]
            new_ws = self.create_sheet('{0}({1})'.format(month, role))
            new_ws.append(consts.FIXED_HEADERS)
            for part in parts:
                source_ws = self[part[0]]
                for row in source_ws.iter_rows(min_row=part[1], max_row=part[2], values_only=True):
                    new_ws.append(row)

            # 3.2 Extract information and highlight.
            amount_mapping = {}  # date -> {amount -> [row numbers]}
            amount_fill_row = set()
            for rows in new_ws.iter_rows():
                summary = rows[5].value
                date_value = rows[0].value
                is_fill = False
                # The amount is read before the numeric conversion below, so
                # the extracted triple carries the cell's original value.
                if summary in self.interest_keyword:
                    # Keyword group 1: written to the meta sheet right away.
                    ms.append((summary, date_value, rows[2].value))
                elif summary in self.salary_keyword:
                    # Keyword group 2: buffered until all months are done.
                    salary_rows.append((summary, date_value, rows[2].value))
                elif summary in self.loan_keyword:
                    # Loan keyword: highlight the whole row.
                    is_fill = True

                for i, cell in enumerate(rows):
                    cell.border = self.border
                    if is_fill:
                        cell.fill = self.loan_fill
                    if i in (2, 3) and cell.row > 1:
                        # 3.3 Convert amount / balance to numbers.
                        try:
                            amount = locale.atof(cell.value)
                        except (ValueError, TypeError, AttributeError):
                            # Not a parseable numeric string (None, an
                            # existing number, free text): leave it as it is.
                            continue
                        cell.value = amount
                        cell.number_format = numbers.FORMAT_NUMBER_COMMA_SEPARATED1
                        if i == 2:
                            same_day = amount_mapping.setdefault(date_value, {})
                            opposite_rows = same_day.get(-amount)
                            if opposite_rows:
                                amount_fill_row.add(cell.row)
                                amount_fill_row.update(opposite_rows)
                            same_day.setdefault(amount, []).append(cell.row)
                    elif i == 9 and cell.row > 2:
                        # 3.4 Balance check formula.
                        # TODO: debit/credit and expense-type statements need
                        #       the +/- sign added manually
                        # TODO: reverse-ordered statements need a different formula
                        cell.value = '=IF(D{0}=SUM(D{1},C{0}), "{2}", "{3}")'.format(
                            cell.row, cell.row - 1, *self.proof_res)

            # 3.5 Highlight same-day opposite amounts (overrides the loan fill).
            for row_idx in amount_fill_row:
                for cell in new_ws[row_idx]:
                    cell.fill = self.amount_fill

        # Keyword group 2 section of the meta sheet.
        ms.append(self.blank_row)
        ms.append(self.keyword_header)
        for row in salary_rows:
            ms.append(row)

    def rebuild(self, role_summary):
        """Rebuild the workbook role by role.

        ``role_summary`` maps a role to a list of 7-tuples
        ``(sheet_name, confidence, page, code, print_time, start_date, end_date)``.
        For each role every listed sheet is pruned and split by month, a
        metadata sheet and the per-month sheets are built, and the original
        sheets are removed.

        Sheets are looked up with ``self[name]`` instead of the deprecated
        ``Workbook.get_sheet_by_name``; a missing sheet raises ``KeyError``
        either way.
        """
        for role, summary_list in role_summary.items():
            # 1. Prune, arrange and split the original sheets by month.
            confidence_max = 0
            code_list = []
            month_mapping = {}
            date_list = []
            start_date = end_date = date_interval = print_time = None
            for summary in summary_list:
                # NOTE(review): the last three fields are unpacked but not
                # used yet, so ``print_time`` stays None in the meta sheet.
                sheet_name, confidence, page, code, print_time_local, start_date_local, end_date_local = summary
                ws = self[sheet_name]
                # 1.1 Drop superfluous columns and arrange the rest.
                self.sheet_prune(ws)
                # 1.2 Split by month.
                self.sheet_split(ws, month_mapping, date_list)
                # 1.3 Metadata handling. TODO: time and date handling
                confidence_max = max(confidence, confidence_max)
                if code is not None:
                    code_list.append((page, code))

            if len(date_list) > 1:
                start_date = min(date_list)
                end_date = max(date_list)
                date_interval = (end_date - start_date).days

            # 2. Metadata sheet.
            ms = self.build_meta_sheet(role, confidence_max, code_list, print_time, start_date, end_date, date_interval)

            # 3. Month sheets, with key rows extracted / highlighted.
            self.build_month_sheet(role, month_mapping, ms)

            # Remove the original sheets.
            for summary in summary_list:
                self.remove(self[summary[0]])