wb.py
11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import locale
import numpy as np
from pandas._libs import tslib
from pandas._libs.tslibs.nattype import NaTType
from pandas.core.indexes.datetimes import DatetimeIndex
from openpyxl import Workbook
from openpyxl.styles import Border, Side, PatternFill, numbers
from openpyxl.utils import get_column_letter
class BSWorkbook(Workbook):
    """Workbook that regroups raw transaction sheets into per-month sheets.

    ``rebuild`` is the entry point.  For every role it normalises the column
    layout of the source sheets, splits their rows by calendar month, writes a
    metadata sheet, creates one sheet per month with highlighting and a
    balance-check formula, and finally removes the source sheets.
    """

    def __init__(self, interest_keyword, salary_keyword, loan_keyword, *args, **kwargs):
        """Store keyword collections and the fixed layout/styling constants.

        Args:
            interest_keyword: Container tested with ``in`` against the value of
                the sixth column; matching rows are copied to the metadata
                sheet.
            salary_keyword: Container tested the same way; matching rows are
                collected and appended to the metadata sheet as a second group.
            loan_keyword: Container tested the same way; matching rows are
                highlighted with ``loan_fill``.
            *args: Forwarded to ``Workbook.__init__``.
            **kwargs: Forwarded to ``Workbook.__init__``.
        """
        super().__init__(*args, **kwargs)
        # Column layout of every generated month sheet (10 columns).
        self.fixed_headers = ('记账日期', '记账时间', '金额', '余额', '交易名称', '附言', '对方账户名',
                              '对方卡号/账号', '对方开户行', '核对结果')
        self.fixed_col_amount = len(self.fixed_headers)
        # Source header text -> 1-based target column in the fixed layout.
        # Several source spellings may map to the same target column.
        self.headers_mapping = {
            '记账日期': 1,
            '交易日期': 1,
            '记账时间': 2,
            '金额': 3,
            '交易金额': 3,
            '余额': 4,
            '账户余额': 4,
            '交易名称': 5,
            '附言': 6,
            '摘要': 6,
            '对方账户名': 7,
            '对方卡号/账号': 8,
            '对方账号与户名': 8,
            '对方开户行': 9,
        }
        self.meta_sheet_title = '关键信息提取和展示'
        self.blank_row = (None,)
        self.code_header = ('页数', '电子回单验证码')
        self.date_header = ('打印时间', '起始日期', '终止日期', '流水区间结果')
        self.keyword_header = ('关键词', '记账日期', '金额')
        self.interest_keyword = interest_keyword
        self.salary_keyword = salary_keyword
        self.loan_keyword = loan_keyword
        # Texts written by the balance-check formula: (match, mismatch).
        self.proof_res = ('对', '错')
        self.loan_fill = PatternFill("solid", fgColor="00FFCC00")
        self.amount_fill = PatternFill("solid", fgColor="00FFFF00")
        self.bd = Side(style='thin', color="000000")
        self.border = Border(left=self.bd, top=self.bd, right=self.bd, bottom=self.bd)
        # Sort key given to the leading months of a multi-month sheet so they
        # are ordered after every single-month part (see ``sheet_split``).
        self.MAX_MEAN = 31

    def sheet_prune(self, ws):
        """Rearrange ``ws`` in place into the fixed column layout.

        ``fixed_col_amount`` empty columns are inserted on the left, every
        original column whose row-1 header is found in ``headers_mapping`` is
        moved into its mapped position, and everything to the right of the
        fixed columns is deleted.  Columns with unknown headers are dropped.
        """
        ws.insert_cols(1, amount=self.fixed_col_amount)
        last_row = ws.max_row
        for col in range(self.fixed_col_amount + 1, ws.max_column + 1):
            header_value = ws.cell(1, col).value
            header_idx = self.headers_mapping.get(header_value)
            # TODO: second lookup pass for key fields that were not matched
            if header_idx is None:
                continue
            # The range to move is the *source* column; the (negative) offset
            # header_idx - col then lands it on the target column.
            letter = get_column_letter(col)
            ws.move_range("{0}1:{0}{1}".format(letter, last_row), cols=header_idx - col)
        ws.delete_cols(self.fixed_col_amount + 1, amount=ws.max_column)

    @staticmethod
    def month_split(dti, date_list):
        """Find the positions at which the month changes in ``dti``.

        Args:
            dti: ``DatetimeIndex`` that may contain ``NaT`` entries.
            date_list: List extended in place with the date of the first and
                of the last non-``NaT`` entry (nothing is added when every
                entry is ``NaT``).

        Returns:
            ``(month_list, idx_list)`` where ``month_list`` holds the
            ``'%Y-%m'`` label of each run of equal months and ``idx_list`` the
            0-based position where the run starts.  The first run always
            starts at 0 so leading unparseable entries stay with it.
        """
        month_list = []
        idx_list = []
        month_pre = None
        for idx, month_str in enumerate(dti.strftime('%Y-%m')):
            # NaT entries do not format to a string; skip whatever placeholder
            # is produced for them.
            if not isinstance(month_str, str):
                continue
            if month_str != month_pre:
                month_list.append(month_str)
                if month_pre is None:
                    date_list.append(dti[idx].date())
                    idx = 0
                idx_list.append(idx)
                month_pre = month_str
        for idx in range(len(dti) - 1, -1, -1):
            if isinstance(dti[idx], NaTType):
                continue
            date_list.append(dti[idx].date())
            break
        return month_list, idx_list

    def sheet_split(self, ws, month_mapping, date_list):
        """Register the row ranges of ``ws`` under the months they belong to.

        The first column of ``ws`` (from row 2 down) is parsed as dates.  Each
        entry added to ``month_mapping[month]`` is a tuple
        ``(sheet title, first row, last row, sort key)``.

        * No parseable date: the whole sheet goes under ``'xxxx-xx'``.
        * One month: the sort key is the mean day-of-month, and the entry is
          inserted so that the list stays ordered by that key.
        * Several months: every month but the last is appended with
          ``MAX_MEAN``; the last month is inserted at the front with key 0.

        ``date_list`` is extended in place by ``month_split``.
        """
        for date_tuple in ws.iter_cols(min_col=1, max_col=1, min_row=2, values_only=True):
            # NOTE(review): private pandas API; the accepted keywords may differ
            # between pandas versions -- confirm against the pinned version.
            dt_array, tz_parsed = tslib.array_to_datetime(
                # asarray copies only when needed, like ``copy=False`` used to,
                # without raising on NumPy 2 when a copy is unavoidable.
                np.asarray(date_tuple, dtype=np.object_),
                errors="coerce",
                utc=False,
                dayfirst=False,
                yearfirst=False,
                require_iso8601=False,
            )
            dti = DatetimeIndex(dt_array, tz=None, name=None)
            month_list, idx_list = self.month_split(dti, date_list)

            if len(month_list) == 0:
                month_info = month_mapping.setdefault('xxxx-xx', [])
                month_info.append((ws.title, 2, ws.max_row, 0))
            elif len(month_list) == 1:
                month_info = month_mapping.setdefault(month_list[0], [])
                day_mean = np.mean(dti.day.dropna())
                entry = (ws.title, 2, ws.max_row, day_mean)
                # TODO: handle statements listed in reverse order
                for i, item in enumerate(month_info):
                    if day_mean <= item[-1]:
                        month_info.insert(i, entry)
                        break
                else:
                    # Empty list, or larger than every existing key.
                    month_info.append(entry)
            else:
                # Index positions are 0-based from row 2, hence the +2 / +1.
                for i, item in enumerate(month_list[:-1]):
                    month_mapping.setdefault(item, []).append(
                        (ws.title, idx_list[i] + 2, idx_list[i + 1] + 1, self.MAX_MEAN))
                month_mapping.setdefault(month_list[-1], []).insert(
                    0, (ws.title, idx_list[-1] + 2, ws.max_row, 0))

    def build_metadata_rows(self, confidence_max, code_list, print_time, start_date, end_date, date_interval):
        """Return the list of rows that make up the top of the metadata sheet.

        The rows are, in order: the confidence row, a blank row, the code
        header followed by ``code_list``, a blank row, the date header and its
        values, a blank row, and the keyword header.
        """
        metadata_rows = [('流水识别置信度', confidence_max), self.blank_row, self.code_header]
        metadata_rows.extend(code_list)
        metadata_rows.extend(
            [self.blank_row,
             self.date_header,
             (print_time, start_date, end_date, date_interval),
             self.blank_row,
             self.keyword_header]
        )
        return metadata_rows

    def create_meta_sheet(self, role):
        """Return the metadata sheet for ``role``.

        If the first worksheet still carries the title ``'Sheet'`` it is
        renamed and reused; otherwise a new sheet is created.
        """
        title = '{0}({1})'.format(self.meta_sheet_title, role)
        if self.worksheets[0].title == 'Sheet':
            ms = self.worksheets[0]
            ms.title = title
        else:
            ms = self.create_sheet(title)
        return ms

    def build_meta_sheet(self, role, confidence_max, code_list, print_time, start_date, end_date, date_interval):
        """Create the metadata sheet for ``role``, fill it, and return it."""
        metadata_rows = self.build_metadata_rows(confidence_max, code_list, print_time,
                                                 start_date, end_date, date_interval)
        ms = self.create_meta_sheet(role)
        for row in metadata_rows:
            ms.append(row)
        return ms

    @staticmethod
    def _to_number(value):
        """Return ``value`` as a number, or ``None`` if it cannot be converted.

        Values that are already ``int``/``float`` are returned unchanged;
        anything else is handed to ``locale.atof``.
        """
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return value
        try:
            return locale.atof(value)
        except (AttributeError, TypeError, ValueError):
            # Non-string cell content or text that is not a number.
            return None

    def build_month_sheet(self, role, month_mapping, ms):
        """Create one sheet per month and extract/highlight key rows.

        Args:
            role: Text placed in parentheses after the month in sheet titles.
            month_mapping: Mapping produced by ``sheet_split``.
            ms: Metadata sheet; rows matching ``interest_keyword`` are appended
                immediately, rows matching ``salary_keyword`` are appended
                after all months, below a blank row and a second keyword
                header.
        """
        tmp_ws = self.create_sheet('tmp_ws')
        for month, parts in month_mapping.items():
            # 3.1 Copy the data
            new_ws = self.create_sheet('{0}({1})'.format(month, role))
            new_ws.append(self.fixed_headers)
            for part in parts:
                ws = self[part[0]]
                for row in ws.iter_rows(min_row=part[1], max_row=part[2], values_only=True):
                    new_ws.append(row)

            # 3.2 Extract information and highlight
            amount_mapping = {}  # date value -> {amount -> [row numbers]}
            amount_fill_row = set()
            for rows in new_ws.iter_rows():
                is_fill = False
                summary_cell = rows[5]
                date_cell = rows[0]
                if summary_cell.value in self.interest_keyword:
                    # Keyword group 1: copy straight to the metadata sheet
                    ms.append((summary_cell.value, date_cell.value, rows[2].value))
                elif summary_cell.value in self.salary_keyword:
                    # Keyword group 2: park in the temporary sheet
                    tmp_ws.append((summary_cell.value, date_cell.value, rows[2].value))
                elif summary_cell.value in self.loan_keyword:
                    # Loan keyword: highlight the whole row
                    is_fill = True

                for i, cell in enumerate(rows):
                    cell.border = self.border
                    if is_fill:
                        cell.fill = self.loan_fill
                    if i in (2, 3) and cell.row > 1:
                        # 3.3 Convert the third and fourth columns to numbers
                        number = self._to_number(cell.value)
                        if number is None:
                            continue
                        cell.value = number
                        cell.number_format = numbers.FORMAT_NUMBER_COMMA_SEPARATED1
                        if i == 2:
                            same_day = amount_mapping.setdefault(date_cell.value, {})
                            opposite_rows = same_day.get(-number)
                            if opposite_rows:
                                amount_fill_row.add(cell.row)
                                amount_fill_row.update(opposite_rows)
                            same_day.setdefault(number, []).append(cell.row)
                    elif i == 9 and cell.row > 2:
                        # 3.4 Check result: D(row) must equal D(row-1) + C(row)
                        # TODO: statements with separate debit/credit or expense
                        #       columns need the +/- sign added manually
                        # TODO: statements in reverse order need another formula
                        cell.value = '=IF(D{0}=SUM(D{1},C{0}), "{2}", "{3}")'.format(
                            cell.row, cell.row - 1, *self.proof_res)

            # 3.5 Highlight equal amounts with opposite sign on the same date
            for row in amount_fill_row:
                for cell in new_ws[row]:
                    cell.fill = self.amount_fill

        # Keyword group 2: append the parked rows to the metadata sheet
        ms.append(self.blank_row)
        ms.append(self.keyword_header)
        for row in tmp_ws.iter_rows(values_only=True):
            ms.append(row)
        self.remove(tmp_ws)

    def rebuild(self, role_summary):
        """Rebuild the workbook role by role.

        Args:
            role_summary: Mapping ``role -> list of summaries``; each summary is
                a 7-tuple ``(sheet_name, confidence, page, code, print_time,
                start_date, end_date)``.  The last three items are unpacked but
                not used yet.
        """
        for role, summary_list in role_summary.items():
            # 1. Prune and rearrange the source sheets, split them by month
            confidence_max = 0
            code_list = []
            month_mapping = {}
            date_list = []
            start_date = end_date = date_interval = print_time = None
            for summary in summary_list:
                sheet_name, confidence, page, code, print_time_local, start_date_local, end_date_local = summary
                ws = self[sheet_name]
                # 1.1 Drop surplus columns and rearrange
                self.sheet_prune(ws)
                # 1.2 Split by month
                self.sheet_split(ws, month_mapping, date_list)
                # 1.3 Metadata  TODO: handle the print time and the dates
                confidence_max = max(confidence, confidence_max)
                if code is not None:
                    code_list.append((page, code))

            if len(date_list) > 1:
                start_date = min(date_list)
                end_date = max(date_list)
                date_interval = (end_date - start_date).days

            # 2. Metadata sheet
            ms = self.build_meta_sheet(role, confidence_max, code_list, print_time,
                                       start_date, end_date, date_interval)
            # 3. Month sheets, keyword extraction and highlighting
            self.build_month_sheet(role, month_mapping, ms)
            # Remove the source sheets
            for summary in summary_list:
                self.remove(self[summary[0]])