wb.py
12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
import locale
import numpy as np
from pandas._libs import tslib
from pandas._libs.tslibs.nattype import NaTType
from pandas.core.indexes.datetimes import DatetimeIndex
from openpyxl import Workbook
from openpyxl.styles import Border, Side, PatternFill, numbers
from openpyxl.utils import get_column_letter
from apps.doc import consts
class BSWorkbook(Workbook):
    """Workbook that regroups statement sheets into a metadata sheet and month sheets.

    For every role, :meth:`rebuild` prunes the source sheets down to the fixed
    column layout, splits their rows by calendar month, writes a metadata
    sheet, writes one sheet per month (with keyword extraction, highlighting
    and a balance-check formula), and finally removes the source sheets.

    Args:
        interest_keyword: Container of summary values whose rows are copied to
            the metadata sheet (first keyword section).
        salary_keyword: Container of summary values whose rows are copied to
            the metadata sheet (second keyword section).
        loan_keyword: Container of summary values whose summary cell is
            highlighted in the month sheets.
        *args: Forwarded to ``openpyxl.Workbook``.
        **kwargs: Forwarded to ``openpyxl.Workbook``.
    """

    def __init__(self, interest_keyword, salary_keyword, loan_keyword, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Title prefix of the metadata sheet; the role is appended in parentheses.
        self.meta_sheet_title = '关键信息提取和展示'
        # Row appended to produce an empty spreadsheet row.
        self.blank_row = (None,)
        # Header rows of the individual metadata sections.
        self.code_header = ('页数', '电子回单验证码')
        self.date_header = ('打印时间', '起始日期', '终止日期', '流水区间结果')
        self.keyword_header = ('关键词', '记账日期', '金额')
        self.interest_keyword = interest_keyword
        self.salary_keyword = salary_keyword
        self.loan_keyword = loan_keyword
        # (match text, mismatch text) used by the balance-check formula.
        self.proof_res = ('对', '错')
        self.loan_fill = PatternFill("solid", fgColor="00FFCC00")
        self.amount_fill = PatternFill("solid", fgColor="00FFFF00")
        self.bd = Side(style='thin', color="000000")
        self.border = Border(left=self.bd, top=self.bd, right=self.bd, bottom=self.bd)
        # Sort weight given to the non-final month parts of a multi-month sheet.
        self.MAX_MEAN = 31

    @staticmethod
    def sheet_prune(ws):
        """Rearrange *ws* in place so that only the fixed columns remain.

        ``consts.FIXED_COL_AMOUNT`` empty columns are inserted at the front,
        every original column whose first-row value is a key of
        ``consts.HEADERS_MAPPING`` is moved to the mapped column index, and
        all columns to the right of the fixed ones are deleted.

        Args:
            ws: The openpyxl worksheet to modify.
        """
        ws.insert_cols(1, amount=consts.FIXED_COL_AMOUNT)
        for col in range(consts.FIXED_COL_AMOUNT + 1, ws.max_column + 1):
            header_value = ws.cell(1, col).value
            header_idx = consts.HEADERS_MAPPING.get(header_value)
            # TODO: look up key fields a second time
            # TODO: Alipay / WeChat statements whose first row is not a header
            #       - how should they be handled?
            if header_idx is None:
                continue
            letter = get_column_letter(col)
            # Shift the whole column (header included) to its target position.
            ws.move_range("{0}1:{0}{1}".format(letter, ws.max_row), cols=header_idx - col)
        # An over-sized amount simply removes everything right of the fixed columns.
        ws.delete_cols(consts.FIXED_COL_AMOUNT + 1, amount=ws.max_column)

    @staticmethod
    def month_split(dti, date_list):
        """Find the month boundaries inside a sequence of parsed dates.

        Args:
            dti: ``DatetimeIndex`` of the date column; unparseable entries are NaT.
            date_list: List that receives, as side effect, the first and the
                last valid date of *dti* (nothing is added when all are NaT).

        Returns:
            tuple: ``(month_list, idx_list)`` where ``month_list`` holds the
            ``'%Y-%m'`` label of every run of equal months (in order of
            appearance) and ``idx_list`` the 0-based position in *dti* where
            each run starts.  The first run always starts at 0 so that leading
            NaT rows belong to it.
        """
        month_list = []
        idx_list = []
        month_pre = None
        for idx, month_str in enumerate(dti.strftime('%Y-%m')):
            # NaT does not format to a string (it comes back as a missing value).
            if not isinstance(month_str, str):
                continue
            if month_str == month_pre:
                continue
            month_list.append(month_str)
            if month_pre is None:
                date_list.append(dti[idx].date())
                idx_list.append(0)
            else:
                idx_list.append(idx)
            month_pre = month_str

        # Record the last valid date.
        for stamp in dti[::-1]:
            if isinstance(stamp, NaTType):
                continue
            date_list.append(stamp.date())
            break
        return month_list, idx_list

    @staticmethod
    def get_reverse_trend(day_idx, idx_list):
        """Tell whether the day-of-month values mostly decrease or increase.

        Each row is compared with the previous valid row; rows at a month
        boundary (positions in *idx_list*) only reset the reference value.

        Args:
            day_idx: Iterable of day-of-month numbers, NaN for invalid dates.
            idx_list: Positions at which a new month starts.

        Returns:
            int: ``1`` when decreases outnumber increases, ``-1`` for the
            opposite, ``0`` when balanced.
        """
        boundaries = set(idx_list)  # O(1) membership test per row
        reverse_trend = 0
        pre_day = None
        for idx, day in enumerate(day_idx):
            if np.isnan(day):
                continue
            if idx in boundaries or pre_day is None:
                pre_day = day
                continue
            if day < pre_day:
                reverse_trend += 1
                pre_day = day
            elif day > pre_day:
                reverse_trend -= 1
                pre_day = day
        if reverse_trend > 0:
            return 1
        if reverse_trend < 0:
            return -1
        return 0

    def sheet_split(self, ws, month_mapping, date_list, reverse_trend_list):
        """Register the row ranges of *ws* per month in *month_mapping*.

        Args:
            ws: Pruned worksheet whose first column holds the dates
                (row 1 is the header, data starts at row 2).
            month_mapping: Dict ``month label -> list of
                (sheet title, first row, last row, sort weight)``; updated in
                place.  Sheets without any valid date go under ``'xxxx-xx'``.
            date_list: Receives the first/last valid date (see ``month_split``).
            reverse_trend_list: Receives the sheet's reverse trend
                (see ``get_reverse_trend``) when it has at least one valid date.
        """
        for date_tuple_src in ws.iter_cols(min_col=1, max_col=1, min_row=2, values_only=True):
            # Keep only the leading 'YYYY-MM-DD'-sized part of string values.
            date_tuple = [date[:10] if isinstance(date, str) else date for date in date_tuple_src]
            # NOTE(review): private pandas API; the ``require_iso8601`` keyword
            # may not exist in every pandas release - confirm the pinned version.
            dt_array, _tz_parsed = tslib.array_to_datetime(
                # asarray == array(copy=False) on NumPy 1.x and still works on
                # NumPy 2, where copy=False raises if a copy is required.
                np.asarray(date_tuple, dtype=np.object_),
                errors="coerce",
                utc=False,
                dayfirst=False,
                yearfirst=False,
                require_iso8601=False,
            )
            dti = DatetimeIndex(dt_array, tz=None, name=None)

            month_list, idx_list = self.month_split(dti, date_list)

            if not month_list:
                # No valid date at all: park the whole sheet under a placeholder.
                month_info = month_mapping.setdefault('xxxx-xx', [])
                month_info.append((ws.title, 2, ws.max_row, 0))
                continue

            # reverse_trend_list process
            reverse_trend = self.get_reverse_trend(dti.day, idx_list)
            reverse_trend_list.append(reverse_trend)

            # month_info process
            if len(month_list) == 1:
                month_info = month_mapping.setdefault(month_list[0], [])
                day_mean = np.mean(dti.day.dropna())
                entry = (ws.title, 2, ws.max_row, day_mean)
                # Keep the parts of a month ordered by ascending mean day.
                for pos, item in enumerate(month_info):
                    if day_mean <= item[-1]:
                        month_info.insert(pos, entry)
                        break
                else:
                    month_info.append(entry)
            else:
                # Data rows start at sheet row 2, hence the +2 / +1 offsets.
                for i, item in enumerate(month_list[:-1]):
                    month_mapping.setdefault(item, []).append(
                        (ws.title, idx_list[i] + 2, idx_list[i + 1] + 1, self.MAX_MEAN))
                month_mapping.setdefault(month_list[-1], []).insert(
                    0, (ws.title, idx_list[-1] + 2, ws.max_row, 0))

    def build_metadata_rows(self, confidence_max, code_list, print_time, start_date, end_date, date_interval):
        """Assemble the rows of the metadata sheet up to the first keyword header.

        Args:
            confidence_max: Value written next to the confidence label.
            code_list: Iterable of ``(page, code)`` rows.
            print_time: Value for the print-time column.
            start_date: Value for the start-date column.
            end_date: Value for the end-date column.
            date_interval: Value for the interval column.

        Returns:
            list: Row tuples in the order they are to be appended.
        """
        metadata_rows = [('流水识别置信度', confidence_max), self.blank_row, self.code_header]
        metadata_rows.extend(code_list)
        metadata_rows.extend(
            [self.blank_row,
             self.date_header,
             (print_time, start_date, end_date, date_interval),
             self.blank_row,
             self.keyword_header]
        )
        return metadata_rows

    def create_meta_sheet(self, role):
        """Return the metadata worksheet for *role*.

        The first worksheet is renamed and reused when it still carries the
        title ``'Sheet'``; otherwise a new sheet is created.

        Args:
            role: Text placed in parentheses after the metadata sheet title.

        Returns:
            The worksheet titled ``'<meta_sheet_title>(<role>)'``.
        """
        title = '{0}({1})'.format(self.meta_sheet_title, role)
        # Guard against a workbook without any sheet (e.g. all were removed).
        if self.worksheets and self.worksheets[0].title == 'Sheet':
            ms = self.worksheets[0]
            ms.title = title
        else:
            ms = self.create_sheet(title)
        return ms

    def build_meta_sheet(self, role, confidence_max, code_list, print_time, start_date, end_date, date_interval):
        """Create the metadata sheet for *role* and fill in its rows.

        Args:
            role: Role the sheet belongs to.
            confidence_max: See ``build_metadata_rows``.
            code_list: See ``build_metadata_rows``.
            print_time: See ``build_metadata_rows``.
            start_date: See ``build_metadata_rows``.
            end_date: See ``build_metadata_rows``.
            date_interval: See ``build_metadata_rows``.

        Returns:
            The populated metadata worksheet.
        """
        metadata_rows = self.build_metadata_rows(confidence_max, code_list, print_time,
                                                 start_date, end_date, date_interval)
        ms = self.create_meta_sheet(role)
        for row in metadata_rows:
            ms.append(row)
        return ms

    @staticmethod
    def _to_number(value):
        """Return *value* as a number, parsing strings with ``locale.atof``.

        Raises:
            TypeError, ValueError, AttributeError: When *value* cannot be
                interpreted as a number.
        """
        if isinstance(value, bool):
            raise TypeError('boolean is not an amount')
        if isinstance(value, (int, float)):
            # Already numeric; locale.atof would fail on a non-string.
            return value
        return locale.atof(value)

    def build_month_sheet(self, role, month_mapping, ms, is_reverse):
        """Create one sheet per month and extract/highlight the key rows.

        Column positions used (0-based within a row): 0 = date, 2 = amount,
        3 = balance, 5 = summary, 9 = balance-check result.

        Args:
            role: Text placed in parentheses after the month in sheet titles.
            month_mapping: Mapping produced by ``sheet_split``.
            ms: Metadata worksheet that receives the keyword rows.
            is_reverse: ``True`` when rows are ordered newest first, which
                swaps the rows referenced by the balance-check formula.
        """
        # Rows matching the second keyword set; written to ``ms`` at the end.
        salary_rows = []
        check_formula = '=IF(D{0}=SUM(D{1},C{0}), "{2}", "{3}")'

        for month in sorted(month_mapping):
            # 3.1. Copy the data
            new_ws = self.create_sheet('{0}({1})'.format(month, role))
            new_ws.append(consts.FIXED_HEADERS)
            for title, first_row, last_row, _weight in month_mapping[month]:
                for row in self[title].iter_rows(min_row=first_row, max_row=last_row,
                                                 values_only=True):
                    new_ws.append(row)

            # 3.2. Extract information, highlight
            amount_mapping = {}  # date -> amount -> rows carrying that amount
            amount_fill_row = set()
            for rows in new_ws.iter_rows():
                summary_cell = rows[5]
                date_cell = rows[0]
                if summary_cell.value in self.interest_keyword:
                    # Keyword set 1: copy straight to the metadata sheet
                    ms.append((summary_cell.value, date_cell.value, rows[2].value))
                elif summary_cell.value in self.salary_keyword:
                    # Keyword set 2: collect now, write after all months
                    salary_rows.append((summary_cell.value, date_cell.value, rows[2].value))
                elif summary_cell.value in self.loan_keyword:
                    # Loan keyword: highlight
                    summary_cell.fill = self.loan_fill

                for i, cell in enumerate(rows):
                    cell.border = self.border
                    if i in (2, 3) and cell.row > 1:
                        # 3.3. Convert amount and balance to numbers
                        try:
                            number = self._to_number(cell.value)
                        except (TypeError, ValueError, AttributeError):
                            # Best effort: leave non-numeric cells untouched.
                            continue
                        cell.value = number
                        cell.number_format = numbers.FORMAT_NUMBER_COMMA_SEPARATED1
                        if i == 2:
                            # An earlier row of the same date with the opposite
                            # amount marks both rows for highlighting.
                            fill_rows = amount_mapping.get(date_cell.value, {}).get(-number)
                            if fill_rows:
                                amount_fill_row.add(cell.row)
                                amount_fill_row.update(fill_rows)
                            amount_mapping.setdefault(date_cell.value, {}).setdefault(
                                number, []).append(cell.row)
                    # 3.4. Balance-check result
                    # TODO: statements with debit/credit or expense-type columns
                    #       need the +/- sign added manually
                    if i == 9 and cell.row > 2:
                        if is_reverse:
                            later, earlier = cell.row - 1, cell.row
                        else:
                            later, earlier = cell.row, cell.row - 1
                        cell.value = check_formula.format(later, earlier, *self.proof_res)

            # 3.5. Highlight equal in/out amounts on the same day
            for row in amount_fill_row:
                new_ws.cell(row=row, column=3).fill = self.amount_fill

        # Keyword set 2 section of the metadata sheet
        ms.append(self.blank_row)
        ms.append(self.keyword_header)
        for row in salary_rows:
            ms.append(row)

    def rebuild(self, role_summary):
        """Rebuild the workbook role by role and drop the source sheets.

        Args:
            role_summary: Dict ``role -> list of summaries``; each summary is
                ``(sheet_name, confidence, page, code, print_time, start_date,
                end_date)``.

        Raises:
            KeyError: When a summary names a sheet that is not in the workbook.
        """
        # TODO: simplify sheet names, add the card number
        for role, summary_list in role_summary.items():
            # 1. Prune and arrange the source sheets, split them by month
            reverse_trend_list = []
            confidence_max = 0
            code_list = []
            month_mapping = {}
            date_list = []
            # print_time stays None: the per-sheet time/date values are not
            # used yet (see TODO in step 1.3).
            start_date = end_date = date_interval = print_time = None
            for summary in summary_list:
                (sheet_name, confidence, page, code,
                 _print_time_local, _start_date_local, _end_date_local) = summary
                ws = self[sheet_name]
                # 1.1. Remove superfluous columns, arrange
                self.sheet_prune(ws)
                # 1.2. Split by month
                self.sheet_split(ws, month_mapping, date_list, reverse_trend_list)
                # 1.3. Metadata handling  TODO: time and date handling
                confidence_max = max(confidence, confidence_max)
                if code is not None:
                    code_list.append((page, code))

            if len(date_list) > 1:
                start_date = min(date_list)
                end_date = max(date_list)
                date_interval = (end_date - start_date).days

            # 2. Metadata sheet
            ms = self.build_meta_sheet(role, confidence_max, code_list, print_time,
                                       start_date, end_date, date_interval)

            # 3. Create the month sheets, extract / highlight key rows
            is_reverse = False
            if sum(reverse_trend_list) > 0:  # reverse-ordered statements
                is_reverse = True
                for month_list in month_mapping.values():
                    month_list.sort(key=lambda x: x[-1], reverse=True)
            self.build_month_sheet(role, month_mapping, ms, is_reverse)

            # Remove the source sheets
            for summary in summary_list:
                self.remove(self[summary[0]])