b2945296 by 周伟奇

update pdf to img

1 parent b6896a10
......@@ -5,16 +5,12 @@ import signal
import base64
import asyncio
import aiohttp
import locale
from PIL import Image
from io import BytesIO
from openpyxl import Workbook
from openpyxl.styles import numbers
from openpyxl.utils import get_column_letter
from django.core.management import BaseCommand
from common.mixins import LoggerMixin
from common.tools.file_tools import write_zip_file
from common.tools.pdf_to_img import PDFHandler
from apps.doc.models import DocStatus, HILDoc, AFCDoc
from apps.doc import consts
from settings import conf
......@@ -123,126 +119,6 @@ class Command(BaseCommand, LoggerMixin):
img_name = os.path.basename(img_path)
self.append_sheet(wb, sheets_list, img_name)
def proof(self, ws):
    """Append a balance-check column to a worksheet.

    Row 1 is searched for the amount and balance columns (titles listed in
    ``consts.AMOUNT_COL_TITLE_SET`` and ``consts.OVERAGE_COL_TITLE_SET``).
    When both exist, text cells in the columns spanning them are converted to
    numbers and a new column is appended whose formula, from row 3 on, checks
    that the balance equals the current amount plus the previous balance.

    Args:
        ws: worksheet to process (presumably an openpyxl worksheet).

    Returns:
        None. The worksheet is modified in place; nothing is changed when
        either column title is missing.
    """
    # Locate the amount and balance columns in the header row.
    amount_col = overage_col = None
    for header_cell in ws[1]:
        if header_cell.value in consts.AMOUNT_COL_TITLE_SET:
            amount_col = header_cell.column
        elif header_cell.value in consts.OVERAGE_COL_TITLE_SET:
            overage_col = header_cell.column
    if amount_col is None or overage_col is None:
        return
    amount_col_letter = get_column_letter(amount_col)
    overage_col_letter = get_column_letter(overage_col)

    # Convert text to numbers. The bounds are ordered first: with
    # min_col > max_col the column range is empty and nothing would be
    # converted when the balance column precedes the amount column.
    first_col, last_col = sorted((amount_col, overage_col))
    for col_tuple in ws.iter_cols(min_row=2, min_col=first_col, max_col=last_col):
        for c in col_tuple:
            try:
                c.value = locale.atof(c.value)
                c.number_format = numbers.FORMAT_NUMBER_00
            except Exception:
                # Best effort: cells that do not hold a parsable number are left as they are.
                continue

    # Append the check-result column.
    proof_col_letter = get_column_letter(ws.max_column + 1)
    for c in ws[proof_col_letter]:
        if c.row == 1:
            c.value = consts.PROOF_COL_TITLE
        elif c.row > 2:
            # Row 2 has no previous balance to compare against, so it gets no formula.
            c.value = '=IF({3}{0}=SUM({2}{0},{3}{1}), "{4}", "{5}")'.format(
                c.row, c.row - 1, amount_col_letter, overage_col_letter, *consts.PROOF_RES)
def wb_process(self, wb, excel_path):
    """Post-process every sheet of *wb* and save the workbook to *excel_path*.

    The sheet titled 'Sheet' is renamed to ``consts.META_SHEET_TITLE``; every
    other sheet is passed to ``self.proof``.  The process-wide LC_NUMERIC
    locale is set to 'en_US.UTF-8' first.
    """
    locale.setlocale(locale.LC_NUMERIC, 'en_US.UTF-8')
    for sheet in wb.worksheets:
        if sheet.title != 'Sheet':
            self.proof(sheet)
            continue
        sheet.title = consts.META_SHEET_TITLE
    wb.save(excel_path)  # TODO no sheet (res always [])
@staticmethod
def getimage(pix):
    """Return *pix*, converted to an RGB pixmap when its colorspace has 4 components."""
    has_four_components = pix.colorspace.n == 4
    return fitz.Pixmap(fitz.csRGB, pix) if has_four_components else pix
def recoverpix(self, doc, item):
    """Build the image described by a page image-list entry.

    Args:
        doc: the open PDF document.
        item: image-list entry; ``item[0]`` is the image xref, ``item[1]`` the
            xref of its /SMask (0 when absent), ``item[5]`` the colorspace name.

    Returns:
        The result of ``doc.extractImage`` for a 'DeviceRGB' image without
        mask, otherwise a pixmap passed through ``self.getimage``.
    """
    xref, smask = item[0], item[1]

    def mask_fits(image_pix, mask_pix):
        # sanity check: same rectangle, neither has alpha, mask has one component
        return (image_pix.irect == mask_pix.irect
                and image_pix.alpha == mask_pix.alpha == 0
                and mask_pix.n == 1)

    if item[5] == 'DeviceRGB':
        if smask == 0:
            return doc.extractImage(xref)
        # the alpha channel has to be rebuilt from the /SMask
        image_pix = fitz.Pixmap(doc, xref)
        mask_pix = fitz.Pixmap(doc, smask)
        if not mask_fits(image_pix, mask_pix):
            mask_pix = None
            return self.getimage(image_pix)
        merged = fitz.Pixmap(image_pix)  # copy that receives the mask as alpha
        merged.setAlpha(mask_pix.samples)
        image_pix = mask_pix = None  # free temp pixmaps
        return self.getimage(merged)

    # any other colorspace (the original comment calls this the CMYK case)
    image_pix = fitz.Pixmap(doc, xref)
    merged = fitz.Pixmap(image_pix)
    if smask != 0:
        mask_pix = fitz.Pixmap(doc, smask)
        if not mask_fits(image_pix, mask_pix):
            mask_pix = None
            return self.getimage(image_pix)
        merged.setAlpha(mask_pix.samples)
    image_pix = mask_pix = None  # free temp pixmaps
    return self.getimage(fitz.Pixmap(fitz.csRGB, merged))  # convert to RGB
@staticmethod
def get_img_data(pix):
    """Return ``(extension, image_bytes)`` for a raw-image dict or a pixmap.

    A plain ``dict`` is treated as a raw image and supplies its own "ext" and
    "image" entries; anything else is treated as a pixmap and encoded as PNG.
    """
    if type(pix) is not dict:  # we got a pixmap
        return 'png', pix.getPNGData()
    return pix["ext"], pix["image"]  # we got a raw image
@staticmethod
def split_il(il):
    """Split an image list into runs of consecutive entries.

    A run is closed before an entry whose width (index 2) differs from the
    previous entry, or after an entry whose height (index 3) differs from the
    previous one.  The last entry always closes whatever run is still open.

    Returns:
        A list of slices of *il*.
    """
    groups = []
    group_start = 0
    last = len(il) - 1
    for pos, entry in enumerate(il):
        if pos == last:
            groups.append(il[group_start:])
            continue
        if pos == group_start:
            # first entry of a run: nothing to compare against yet
            continue
        previous = il[pos - 1]
        if entry[2] != previous[2]:
            groups.append(il[group_start:pos])
            group_start = pos
        elif entry[3] != previous[3]:
            groups.append(il[group_start:pos + 1])
            group_start = pos + 1
    return groups
# TODO 细化文件状态,不同异常状态采取不同的处理
# TODO 调用接口重试
def handle(self, *args, **kwargs):
......@@ -252,98 +128,33 @@ class Command(BaseCommand, LoggerMixin):
while self.switch:
# 1. 从队列获取文件信息
doc, business_type = self.get_doc_info()
try:
# 2. 从EDMS获取PDF文件
doc_data_path, excel_path, pdf_path = self.pdf_download(doc, business_type)
# 队列为空时的处理
if pdf_path is None:
time.sleep(sleep_second)
sleep_second = min(max_sleep_second, sleep_second+5)
continue
sleep_second = int(conf.SLEEP_SECOND)
# 3.PDF文件提取图片
img_save_path = os.path.join(doc_data_path, 'img')
os.makedirs(img_save_path, exist_ok=True)
img_path_list = []
with fitz.Document(pdf_path) as pdf:
self.cronjob_log.info('{0} [pdf_path={1}] [metadata={2}]'.format(
self.log_base, pdf_path, pdf.metadata))
# xref_list = [] # TODO 图片去重 特殊pdf:如电子发票
for pno in range(pdf.pageCount):
il = pdf.getPageImageList(pno)
il.sort(key=lambda x: x[0])
img_il_list = self.split_il(il)
del il
if len(img_il_list) > 3: # 单页无规律小图过多时,使用页面转图片
page = pdf.loadPage(pno)
pm = page.getPixmap(matrix=self.trans, alpha=False)
save_path = os.path.join(img_save_path, 'page_{0}_img_0.png'.format(page.number))
pm.writePNG(save_path)
img_path_list.append(save_path)
self.cronjob_log.info('{0} [page to img success] [pdf_path={1}] [page={2}]'.format(
self.log_base, pdf_path, page.number))
else: # 提取图片
for img_index, img_il in enumerate(img_il_list):
if len(img_il) == 1: # 当只有一张图片时, 简化处理
pix = self.recoverpix(pdf, img_il[0])
ext, img_data = self.get_img_data(pix)
save_path = os.path.join(img_save_path, 'page_{0}_img_{1}.{2}'.format(
pno, img_index, ext))
with open(save_path, "wb") as f:
f.write(img_data)
img_path_list.append(save_path)
self.cronjob_log.info(
'{0} [extract img success] [pdf_path={1}] [page={2}] [img_index={3}]'.format(
self.log_base, pdf_path, pno, img_index))
else: # 多张图片,竖向拼接
height_sum = 0
im_list = []
width = img_il[0][2]
for img in img_il:
# xref = img[0]
# if xref in xref_list:
# continue
height = img[3]
pix = self.recoverpix(pdf, img)
ext, img_data = self.get_img_data(pix)
# xref_list.append(xref)
im = Image.open(BytesIO(img_data))
im_list.append((height, im, ext))
height_sum += height
save_path = os.path.join(img_save_path, 'page_{0}_img_{1}.{2}'.format(
pno, img_index, im_list[0][2]))
res = Image.new(im_list[0][1].mode, (width, height_sum))
h_now = 0
for h, m, _ in im_list:
res.paste(m, box=(0, h_now))
h_now += h
res.save(save_path)
img_path_list.append(save_path)
self.cronjob_log.info(
'{0} [extract img success] [pdf_path={1}] [page={2}] [img_index={3}]'.format(
self.log_base, pdf_path, pno, img_index))
self.cronjob_log.info('{0} [pdf to img success] [business_type={1}] [doc_id={2}]'.format(
self.cronjob_log.info('{0} [pdf to img start] [business_type={1}] [doc_id={2}]'.format(
self.log_base, business_type, doc.id))
pdf_handler = PDFHandler(pdf_path, img_save_path)
pdf_handler.extract_image()
self.cronjob_log.info('{0} [pdf to img end] [business_type={1}] [doc_id={2}]'.format(
self.log_base, business_type, doc.id))
write_zip_file(img_save_path, os.path.join(doc_data_path, '{0}_img.zip'.format(doc.id)))
# 4.图片调用算法判断是否为银行流水, 图片调用算法OCR为excel文件
wb = Workbook()
loop = asyncio.get_event_loop()
tasks = [self.img_ocr_excel(wb, img_path) for img_path in img_path_list]
tasks = [self.img_ocr_excel(wb, img_path) for img_path in pdf_handler.img_path_list]
loop.run_until_complete(asyncio.wait(tasks))
# loop.close()
# 整合excel文件
# self.wb_process(wb, excel_path)
wb.save(excel_path)
except Exception as e:
doc.status = DocStatus.PROCESS_FAILED.value
......
import os
import fitz
import signal
from PIL import Image
from io import BytesIO
from django.core.management import BaseCommand
from common.mixins import LoggerMixin
class Command(BaseCommand, LoggerMixin):
def __init__(self):
    """Initialise the log prefix, the page-render matrix and the SIGTERM handler."""
    super().__init__()
    self.log_base = '[pdf to img]'
    # file-processing switch (cleared by signal_handler)
    self.switch = True
    # page-to-image rendering: zoom factor 2 in each dimension
    self.zoom_x = self.zoom_y = 2.0
    self.trans = fitz.Matrix(self.zoom_x, self.zoom_y).preRotate(0)
    # graceful-exit signal: 15
    signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, sig, frame):
    """Signal callback: clear ``self.switch`` so that file processing stops."""
    self.switch = False
@staticmethod
def getimage(pix):
    """Return *pix* as is, or an RGB conversion when its colorspace has 4 components."""
    if pix.colorspace.n == 4:
        return fitz.Pixmap(fitz.csRGB, pix)
    return pix
def recoverpix(self, doc, item):
    """Build the image described by a page image-list entry.

    Args:
        doc: the open PDF document.
        item: image-list entry; ``item[0]`` is the image xref, ``item[1]`` the
            xref of its /SMask (0 when absent), ``item[5]`` the colorspace name.

    Returns:
        The result of ``doc.extractImage`` for a 'DeviceRGB' image without
        mask, otherwise a pixmap passed through ``self.getimage``.
    """
    xref, smask = item[0], item[1]

    def mask_fits(image_pix, mask_pix):
        # sanity check: same rectangle, neither has alpha, mask has one component
        return (image_pix.irect == mask_pix.irect
                and image_pix.alpha == mask_pix.alpha == 0
                and mask_pix.n == 1)

    if item[5] == 'DeviceRGB':
        if smask == 0:
            return doc.extractImage(xref)
        # the alpha channel has to be rebuilt from the /SMask
        image_pix = fitz.Pixmap(doc, xref)
        mask_pix = fitz.Pixmap(doc, smask)
        if not mask_fits(image_pix, mask_pix):
            mask_pix = None
            return self.getimage(image_pix)
        merged = fitz.Pixmap(image_pix)  # copy that receives the mask as alpha
        merged.setAlpha(mask_pix.samples)
        image_pix = mask_pix = None  # free temp pixmaps
        return self.getimage(merged)

    # GRAY/CMYK
    image_pix = fitz.Pixmap(doc, xref)
    merged = fitz.Pixmap(image_pix)
    if smask != 0:
        mask_pix = fitz.Pixmap(doc, smask)
        if not mask_fits(image_pix, mask_pix):
            mask_pix = None
            return self.getimage(image_pix)
        merged.setAlpha(mask_pix.samples)
    image_pix = mask_pix = None  # free temp pixmaps
    return self.getimage(fitz.Pixmap(fitz.csRGB, merged))  # GRAY/CMYK to RGB
@staticmethod
def get_img_data(pix):
    """Return ``(extension, image_bytes)`` for a raw-image dict or a pixmap.

    A plain ``dict`` is treated as a raw image and supplies its own "ext" and
    "image" entries; anything else is treated as a pixmap and encoded as PNG.
    """
    if type(pix) is not dict:  # we got a pixmap
        return 'png', pix.getPNGData()
    return pix["ext"], pix["image"]  # we got a raw image
@staticmethod
def split_il(il):
    """Separate large images from runs of small image fragments.

    An entry whose width (index 2) is at least 700 and whose height (index 3)
    is at least 647 is a large image and is reported on its own.  The other
    entries are grouped into runs of consecutive fragments: a run is closed
    before an entry whose width differs from the previous one, or after an
    entry whose height differs from the previous one (only when the width is
    below 1200).

    Args:
        il: page image list, already ordered as the caller wants it grouped.

    Returns:
        ``(small_img_il_list, big_img_il_list)`` where the first holds
        ``(list_of_entries, index)`` tuples and the second ``(entry, index)``
        tuples; ``index`` numbers the outputs in order of appearance.
    """
    small_img_il_list = []
    big_img_il_list = []
    start = 0
    index = 0
    length = len(il)
    for i in range(length):
        if il[i][2] >= 700 and il[i][3] >= 647:
            if start < i:
                # close the run of fragments that precedes the large image
                small_img_il_list.append((il[start: i], index))
                index += 1
            # The next run always begins after the large image.  Previously
            # `start` was left untouched when a run had just been closed, so
            # later runs re-included those fragments and the large image.
            start = i + 1
            big_img_il_list.append((il[i], index))
            index += 1
            continue
        if i == start:
            # first fragment of a run: nothing to compare against yet
            if i == length - 1:
                small_img_il_list.append((il[start: length], index))
            continue
        elif i == length - 1:
            if il[i][2] == il[i - 1][2]:
                small_img_il_list.append((il[start: length], index))
            else:
                small_img_il_list.append((il[start: i], index))
                small_img_il_list.append((il[i: length], index + 1))
            continue
        if il[i][2] != il[i - 1][2]:
            small_img_il_list.append((il[start: i], index))
            index += 1
            start = i
        elif il[i][3] != il[i - 1][3] and il[i][2] < 1200:
            small_img_il_list.append((il[start: i + 1], index))
            index += 1
            start = i + 1
    return small_img_il_list, big_img_il_list
def handle(self, *args, **kwargs):
# Debug entry point: scans a hard-coded directory, processes only the PDF named
# 'CH-B008487944.pdf' and writes the extracted images into a sibling folder
# named after the file (extension stripped with d[:-4]).
# NOTE(review): indentation was lost in this copy of the file; the comments
# below describe the statements they precede, not their nesting.
pdf_dir = '/Users/clay/Desktop/问题PDF'
img_dir = '/Users/clay/Desktop/问题PDF'
for d in os.listdir(pdf_dir):
# if d in ['.DS_Store', 'CH-B008003736.pdf', 'CH-B006317088.pdf', 'CH-B008487476.pdf', 'CH-B006337608.pdf',
# 'CH-B006391612.pdf', 'CH-B006536124.pdf', 'CH-B006526652.pdf', 'CH-B009003592.pdf']:
# continue
# if d != 'CH-B006393152.PDF':
# if d != 'CH-B006526652.pdf':
# only this one file is processed; every other directory entry is skipped
if d != 'CH-B008487944.pdf':
continue
pdf_path = os.path.join(pdf_dir, d)
if os.path.isfile(pdf_path):
img_save_path = os.path.join(img_dir, d[:-4])
# if os.path.exists(img_save_path):
# continue
os.makedirs(img_save_path, exist_ok=True)
with fitz.Document(pdf_path) as pdf:
self.cronjob_log.info('{0} [pdf_path={1}] [metadata={2}]'.format(
self.log_base, pdf_path, pdf.metadata))
# xrefs of images already written, so that an image is extracted only once
xref_set = set()
for pno in range(pdf.pageCount):
print('---------------------------------------')
il = pdf.getPageImageList(pno)
# (xref, smask, width, height, bpc, colorspace, alt.colorspace, name, filter, invoker)
print(il)
# for img_index, img in enumerate(il):
# pix = self.recoverpix(pdf, img)
# ext, img_data = self.get_img_data(pix)
# save_path = os.path.join(img_save_path, 'page_{0}_img_{1}.{2}'.format(
# pno, img_index, ext))
# with open(save_path, "wb") as f:
# f.write(img_data)
# no image object on the page: render the whole page to PNG
if len(il) == 0:
page = pdf.loadPage(pno)
pm = page.getPixmap(matrix=self.trans, alpha=False)
save_path = os.path.join(img_save_path, 'page_{0}_img_0.png'.format(page.number))
pm.writePNG(save_path)
# exactly one image object on the page
elif len(il) == 1:
width = il[0][2]
height = il[0][3]
colorspace = il[0][5]
adobe_filter = il[0][-1]
# pages whose image has an empty colorspace or an empty filter are skipped
if colorspace == '' or adobe_filter in ['', '']:
continue
# small image
if width < 500 and height < 500:
page = pdf.loadPage(pno)
pm = page.getPixmap(matrix=self.trans, alpha=False)
save_path = os.path.join(img_save_path, 'page_{0}_img_0.png'.format(page.number))
pm.writePNG(save_path)
# large image
elif il[0][0] not in xref_set:
pix = self.recoverpix(pdf, il[0])
ext, img_data = self.get_img_data(pix)
save_path = os.path.join(img_save_path, 'page_{0}_img_0.{1}'.format(pno, ext))
with open(save_path, "wb") as f:
f.write(img_data)
xref_set.add(il[0][0])
# several image objects: sort by xref, then split into small-fragment groups and large images
else:
il.sort(key=lambda x: x[0])
small_img_il_list, big_img_il_list = self.split_il(il)
print(small_img_il_list)
print(big_img_il_list)
print('+++++++++++++++++++++++++++++++++++')
if len(small_img_il_list) > 2: # too many irregular small images on one page: render the page instead
page = pdf.loadPage(pno)
pm = page.getPixmap(matrix=self.trans, alpha=False)
save_path = os.path.join(img_save_path, 'page_{0}_img_0.png'.format(page.number))
pm.writePNG(save_path)
else: # extract the images
# large images are written one by one, skipping xrefs already seen
for img_il, img_index in big_img_il_list:
if img_il[0] in xref_set:
continue
pix = self.recoverpix(pdf, img_il)
ext, img_data = self.get_img_data(pix)
save_path = os.path.join(img_save_path, 'page_{0}_img_{1}.{2}'.format(
pno, img_index, ext))
with open(save_path, "wb") as f:
f.write(img_data)
xref_set.add(img_il[0])
for img_il, img_index in small_img_il_list:
# small image
if len(img_il) == 1 and img_il[0][2] < 500 and img_il[0][3] < 500:
page = pdf.loadPage(pno)
pm = page.getPixmap(matrix=self.trans, alpha=False)
save_path = os.path.join(img_save_path,
'page_{0}_img_0.png'.format(page.number))
pm.writePNG(save_path)
elif len(img_il) == 1 and img_il[0][0] not in xref_set: # a single image: handle it directly
pix = self.recoverpix(pdf, img_il[0])
ext, img_data = self.get_img_data(pix)
save_path = os.path.join(img_save_path, 'page_{0}_img_{1}.{2}'.format(
pno, img_index, ext))
with open(save_path, "wb") as f:
f.write(img_data)
xref_set.add(img_il[0][0])
else: # several images: stitch them vertically
height_sum = 0
im_list = []
# the width of the first fragment is used for the stitched image
width = img_il[0][2]
for img in img_il:
# xref = img[0]
# if xref in xref_list:
# continue
height = img[3]
pix = self.recoverpix(pdf, img)
ext, img_data = self.get_img_data(pix)
# xref_list.append(xref)
im = Image.open(BytesIO(img_data))
im_list.append((height, im, ext))
height_sum += height
# the output extension and the image mode are taken from the first fragment
save_path = os.path.join(img_save_path, 'page_{0}_img_{1}.{2}'.format(
pno, img_index, im_list[0][2]))
res = Image.new(im_list[0][1].mode, (width, height_sum))
h_now = 0
# paste each fragment below the previous one
for h, m, _ in im_list:
res.paste(m, box=(0, h_now))
h_now += h
res.save(save_path)
......@@ -28,7 +28,8 @@ class DocHandler:
def get_doc_class(business_type):
return (HILDoc, consts.HIL_PREFIX) if business_type in consts.HIL_SET else (AFCDoc, consts.AFC_PREFIX)
def fix_scheme(self, scheme):
@staticmethod
def fix_scheme(scheme):
if scheme in consts.DOC_SCHEME_LIST:
return scheme
elif scheme.upper() in consts.DOC_SCHEME_LIST:
......@@ -36,7 +37,8 @@ class DocHandler:
else:
return consts.DOC_SCHEME_LIST[0]
def fix_data_source(self, data_source):
@staticmethod
def fix_data_source(data_source):
if data_source in consts.DATA_SOURCE_LIST:
return data_source
elif data_source.upper() in consts.DATA_SOURCE_LIST:
......
import os
import fitz
from PIL import Image
from io import BytesIO
# Parameters for rendering a page to a PNG image
ZOOM_X = ZOOM_Y = 2.0
# zoom factor 2 in each dimension (the vertical factor previously reused ZOOM_X)
trans = fitz.Matrix(ZOOM_X, ZOOM_Y).preRotate(0)
# Image filters that get special handling: the whole page is rendered instead
ADOBE_FILTER_SET = {'FlateDecode', 'JPXDecode', 'JBIG2Decode'}
# (width, height) threshold pairs
WH_COUPLE_1 = (500, 500)
WH_COUPLE_2 = (700, 647)
WH_COUPLE_3 = (100, 100)
WH_COUPLE_4 = (100, 300)
WH_COUPLE_5 = (100, 200)
class PDFHandler:
def __init__(self, path, img_dir_path):
    """Remember the PDF path and the output directory; start with empty results.

    Args:
        path: path of the PDF file to process.
        img_dir_path: directory the extracted images are written to.
    """
    self.path, self.img_dir_path = path, img_dir_path
    # paths of every image written so far
    self.img_path_list = []
    # xrefs of the image objects already extracted
    self.xref_set = set()
def get_img_save_path(self, pno, img_index=0, ext='png'):
    """Return the output path ``<img_dir_path>/page_<pno>_img_<img_index>.<ext>``."""
    file_name = 'page_{0}_img_{1}.{2}'.format(pno, img_index, ext)
    return os.path.join(self.img_dir_path, file_name)
def page_to_png(self, page):
    """Render *page* with the module-level ``trans`` matrix and save it as a PNG.

    The file is named after ``page.number`` (image index 0) and its path is
    appended to ``self.img_path_list``.
    """
    rendered = page.getPixmap(matrix=trans, alpha=False)
    target = self.get_img_save_path(page.number)
    rendered.writePNG(target)
    self.img_path_list.append(target)
@staticmethod
def getimage(pix):
    """Return *pix*, converted to an RGB pixmap when its colorspace has 4 components."""
    four_components = pix.colorspace.n == 4
    if not four_components:
        return pix
    return fitz.Pixmap(fitz.csRGB, pix)
def recover_pix(self, doc, xref, smask, colorspace):
    """Build the image stored at *xref*.

    Args:
        doc: the open PDF document.
        xref: xref of the image object.
        smask: xref of its /SMask, 0 when there is none.
        colorspace: colorspace name of the image.

    Returns:
        Without mask: an RGB pixmap for 'Separation'/'DeviceCMYK' images,
        otherwise the result of ``doc.extractImage``.  With mask: a pixmap
        passed through ``self.getimage``, carrying the mask as alpha when the
        mask passes the sanity check.
    """
    if smask == 0:
        if colorspace not in {'Separation', 'DeviceCMYK'}:
            return doc.extractImage(xref)
        source = fitz.Pixmap(doc, xref)
        return fitz.Pixmap(fitz.csRGB, source)

    # the alpha channel has to be rebuilt from the /SMask
    image_pix = fitz.Pixmap(doc, xref)
    mask_pix = fitz.Pixmap(doc, smask)
    # sanity check: same rectangle, neither has alpha, mask has one component
    mask_usable = (image_pix.irect == mask_pix.irect
                   and image_pix.alpha == mask_pix.alpha == 0
                   and mask_pix.n == 1)
    if not mask_usable:
        mask_pix = None
        return self.getimage(image_pix)
    merged = fitz.Pixmap(image_pix)  # copy that receives the mask as alpha
    merged.setAlpha(mask_pix.samples)
    image_pix = mask_pix = None  # free temp pixmaps
    return self.getimage(merged)
@staticmethod
def get_img_data(pix):
    """Return ``(extension, image_bytes)`` for a raw-image dict or a pixmap.

    A plain ``dict`` is treated as a raw image and supplies its own "ext" and
    "image" entries; anything else is treated as a pixmap and encoded as PNG.
    """
    if type(pix) is not dict:  # we got a pixmap
        return 'png', pix.getPNGData()
    return pix["ext"], pix["image"]  # we got a raw image
def extract_single_image(self, pdf, xref, smask, colorspace, pno, img_index=0):
    """Write one image object to disk and record it.

    The image is rebuilt with ``recover_pix``, saved under the name produced
    by ``get_img_save_path``, its xref is added to ``self.xref_set`` and its
    path appended to ``self.img_path_list``.
    """
    recovered = self.recover_pix(pdf, xref, smask, colorspace)
    ext, payload = self.get_img_data(recovered)
    target = self.get_img_save_path(pno, img_index=img_index, ext=ext)
    with open(target, "wb") as out_file:
        out_file.write(payload)
    self.xref_set.add(xref)
    self.img_path_list.append(target)
@staticmethod
def split_il(il):
    """Decide how a page's image list should be handled.

    Returns:
        ``True`` when any entry's last field is in ``ADOBE_FILTER_SET``
        (the page should be saved as a whole PNG);
        ``None`` when an entry reaches the ``WH_COUPLE_2`` width and height
        (images are extracted individually, no fragment merging);
        otherwise the list of fragment groups, each a slice of *il*.
    """
    # an image object with a special filter: the whole page is saved as PNG
    if any(img[-1] in ADOBE_FILTER_SET for img in il):
        return True

    groups = []
    group_start = 0
    last = len(il) - 1
    for pos, img in enumerate(il):
        # a large enough image object: no fragment merging, plain extraction
        if img[2] >= WH_COUPLE_2[0] and img[3] >= WH_COUPLE_2[1]:
            return None
        if pos == group_start:
            # first fragment of a group: nothing to compare against yet
            if pos == last:
                groups.append(il[group_start:])
            continue
        previous = il[pos - 1]
        if pos == last:
            if img[2] == previous[2]:
                groups.append(il[group_start:])
            else:
                groups.append(il[group_start:pos])
                groups.append(il[pos:])
            continue
        if img[2] != previous[2]:
            groups.append(il[group_start:pos])
            group_start = pos
        elif img[3] != previous[3]:
            groups.append(il[group_start:pos + 1])
            group_start = pos + 1
    # result of the fragment grouping
    return groups
def merge_il(self, pdf, pno, il):
# Handle a page that holds more than one image object: extract the images
# individually, stitch fragment groups vertically, or fall back to saving the
# whole page as a PNG.
# NOTE(review): indentation was lost in this copy of the file; the comments
# below describe the statements they precede, not their nesting.
# group the fragments before attempting to merge them
il.sort(key=lambda x: x[0])
# split_il returns None (a large image is present), a list of fragment groups,
# or True (an image uses a special filter)
broken_il = self.split_il(il)
page_to_png = True
# 3.1 an image object is large enough: no fragment merging, just extract each image
if broken_il is None:
page_to_png = False
for img_index, img in enumerate(il):
xref, smask, width, height, _, colorspace, _, _, adobe_filter = img
if width < WH_COUPLE_3[0] or height < WH_COUPLE_3[1]: # skip small images (e.g. QR codes)
continue
elif xref not in self.xref_set:
self.extract_single_image(pdf, xref, smask, colorspace, pno, img_index)
# 3.2 merge the fragments group by group
elif isinstance(broken_il, list) and len(broken_il) <= 2:
for img_index, img_il in enumerate(broken_il):
# 3.2.1 a single fragment: skip it or extract it directly
if len(img_il) == 1:
xref, smask, width, height, _, colorspace, _, _, adobe_filter = img_il[0]
# skip small images (e.g. QR codes)
if width < WH_COUPLE_4[0] or height < WH_COUPLE_4[1] or \
(width < WH_COUPLE_1[0] and height < WH_COUPLE_1[1]):
continue
elif xref not in self.xref_set:
self.extract_single_image(pdf, xref, smask, colorspace, pno, img_index)
page_to_png = False
# 3.2.2 several fragments: stitch them vertically
else:
height_sum = sum([img[3] for img in img_il])
# the width of the first fragment is used for the stitched image
width = img_il[0][2]
# skip small images and abnormally large ones
if width < WH_COUPLE_5[0] or height_sum < WH_COUPLE_5[1] or \
(width > 1000 and height_sum > width * 3):
continue
im_list = []
for img in img_il:
xref, smask, _, height, _, colorspace, _, _, adobe_filter = img
pix = self.recover_pix(pdf, xref, smask, colorspace)
ext, img_data = self.get_img_data(pix)
im = Image.open(BytesIO(img_data))
im_list.append((height, im, ext))
# the image mode and the output extension are taken from the first fragment
new_img = Image.new(im_list[0][1].mode, (width, height_sum))
h_now = 0
# paste each fragment below the previous one
for h, m, _ in im_list:
new_img.paste(m, box=(0, h_now))
h_now += h
img_save_path = self.get_img_save_path(pno, img_index, im_list[0][2])
new_img.save(img_save_path)
page_to_png = False
self.img_path_list.append(img_save_path)
# 3.3 more than two fragment groups, everything filtered out, or a special filter present:
# save the whole page as a PNG
if page_to_png:
page = pdf.loadPage(pno)
self.page_to_png(page)
def extract_image(self):
    """Extract the images of every page of the PDF into ``self.img_dir_path``.

    Per page:
      1. no image object: the whole page is saved as a PNG;
      2. one image object: a small one (e.g. the stamp on an electronic
         statement) makes the whole page be saved as a PNG, a large one is
         extracted unless its xref was already handled;
      3. several image objects: delegated to ``merge_il``.
    """
    os.makedirs(self.img_dir_path, exist_ok=True)
    with fitz.Document(self.path) as pdf:
        for pno in range(pdf.pageCount):
            # image objects of the page:
            # (xref, smask, width, height, bpc, colorspace, alt.colorspace, name, filter, invoker)
            images = pdf.getPageImageList(pno)
            image_count = len(images)
            if image_count > 1:
                self.merge_il(pdf, pno, images)
                continue
            if image_count == 0:
                self.page_to_png(pdf.loadPage(pno))
                continue
            xref, smask, width, height, _, colorspace, _, _, _ = images[0]
            if width < WH_COUPLE_1[0] and height < WH_COUPLE_1[1]:
                # small image
                self.page_to_png(pdf.loadPage(pno))
            elif xref not in self.xref_set:
                # large image
                self.extract_single_image(pdf, xref, smask, colorspace, pno)
import fitz
import os
from PIL import Image
from io import BytesIO
class PdfHandler:
def __init__(self, pdf_path):
    """Store *pdf_path* and derive ``pdf_name``: its base name without the extension."""
    self.pdf_path = pdf_path
    base_name = os.path.basename(pdf_path)
    self.pdf_name, _ = os.path.splitext(base_name)
def page_to_pix_img(self, save_dir_path, zoom_x, zoom_y):
    """Render every page of the PDF to ``<pdf_name>_<page number>.png``.

    Args:
        save_dir_path: directory the PNG files are written to.
        zoom_x: horizontal zoom factor of the render matrix.
        zoom_y: vertical zoom factor of the render matrix.
    """
    matrix = fitz.Matrix(zoom_x, zoom_y).preRotate(0)
    with fitz.Document(self.pdf_path) as pdf:
        for page in pdf:
            rendered = page.getPixmap(matrix=matrix, alpha=False)
            file_name = '{0}_{1}.png'.format(self.pdf_name, page.number)
            rendered.writeImage(os.path.join(save_dir_path, file_name))
def page_to_svg_img(self, save_dir_path):
    """Write every page of the PDF as ``<pdf_name>_<page number>.svg``.

    Args:
        save_dir_path: directory the SVG files are written to.
    """
    with fitz.Document(self.pdf_path) as pdf:
        for page in pdf:
            svg = page.getSVGimage(matrix=fitz.Identity)  # UTF-8 string svg
            save_path = os.path.join(save_dir_path, '{0}_{1}.svg'.format(self.pdf_name, page.number))
            # Write explicitly as UTF-8: the platform default encoding may not
            # be able to represent every character of the SVG text.
            with open(save_path, 'w', encoding='utf-8') as f:
                f.write(svg)
@staticmethod
def getimage(pix):
    """Return *pix* as is, or an RGB conversion when its colorspace has 4 components."""
    if pix.colorspace.n == 4:
        return fitz.Pixmap(fitz.csRGB, pix)
    return pix
def recoverpix(self, doc, item):
    """Build the image described by a page image-list entry.

    Args:
        doc: the open PDF document.
        item: image-list entry; ``item[0]`` is the image xref, ``item[1]`` the
            xref of its /SMask (0 when absent), ``item[5]`` the colorspace name.

    Returns:
        The result of ``doc.extractImage`` for a 'DeviceRGB' image without
        mask, otherwise a pixmap passed through ``self.getimage``.
    """
    xref, smask = item[0], item[1]

    def mask_fits(image_pix, mask_pix):
        # sanity check: same rectangle, neither has alpha, mask has one component
        return (image_pix.irect == mask_pix.irect
                and image_pix.alpha == mask_pix.alpha == 0
                and mask_pix.n == 1)

    if item[5] == 'DeviceRGB':
        if smask == 0:
            return doc.extractImage(xref)
        # the alpha channel has to be rebuilt from the /SMask
        image_pix = fitz.Pixmap(doc, xref)
        mask_pix = fitz.Pixmap(doc, smask)
        if not mask_fits(image_pix, mask_pix):
            mask_pix = None
            return self.getimage(image_pix)
        merged = fitz.Pixmap(image_pix)  # copy that receives the mask as alpha
        merged.setAlpha(mask_pix.samples)
        image_pix = mask_pix = None  # free temp pixmaps
        return self.getimage(merged)

    # GRAY/CMYK
    image_pix = fitz.Pixmap(doc, xref)
    merged = fitz.Pixmap(image_pix)
    if smask != 0:
        mask_pix = fitz.Pixmap(doc, smask)
        if not mask_fits(image_pix, mask_pix):
            mask_pix = None
            return self.getimage(image_pix)
        merged.setAlpha(mask_pix.samples)
    image_pix = mask_pix = None  # free temp pixmaps
    return self.getimage(fitz.Pixmap(fitz.csRGB, merged))  # GRAY/CMYK to RGB
def extract_images(self, save_dir_path):
    """Write every distinct image object of the PDF into *save_dir_path*.

    Each image is rebuilt with ``recoverpix`` and saved once per xref as
    ``img-<xref>.<ext>`` (raw image) or ``img-<xref>.png`` (pixmap).  Each
    image-list entry and the xref/width/height of every processed image are
    printed for debugging.

    Args:
        save_dir_path: directory the image files are written to.
    """
    seen_xrefs = set()  # set membership instead of scanning a growing list
    with fitz.Document(self.pdf_path) as pdf:
        for pno in range(pdf.pageCount):
            for img in pdf.getPageImageList(pno):
                print(img)
                xref = img[0]
                if xref in seen_xrefs:
                    continue
                print(xref, img[2], img[3])
                pix = self.recoverpix(pdf, img)
                if type(pix) is dict:  # we got a raw image
                    imgdata = pix["image"]
                    imgfile = os.path.join(save_dir_path, "img-%i.%s" % (xref, pix["ext"]))
                else:  # we got a pixmap
                    imgfile = os.path.join(save_dir_path, "img-%i.png" % xref)
                    imgdata = pix.getPNGData()
                # the context manager closes the file even when the write fails
                with open(imgfile, "wb") as fout:
                    fout.write(imgdata)
                seen_xrefs.add(xref)
@staticmethod
def split_il(il):
    """Split an image list into runs of consecutive entries.

    A run is closed before an entry whose width (index 2) differs from the
    previous entry, or after an entry whose height (index 3) differs from the
    previous one.  The last entry always closes whatever run is still open.

    Returns:
        A list of slices of *il*.
    """
    runs = []
    run_start = 0
    final_pos = len(il) - 1
    for pos, entry in enumerate(il):
        if pos == final_pos:
            runs.append(il[run_start:])
            continue
        if pos == run_start:
            # first entry of a run: nothing to compare against yet
            continue
        before = il[pos - 1]
        if entry[2] != before[2]:
            runs.append(il[run_start:pos])
            run_start = pos
        elif entry[3] != before[3]:
            runs.append(il[run_start:pos + 1])
            run_start = pos + 1
    return runs
def extract_images_pro(self, save_dir_path):
# For every page: sort the image list by xref, split it into groups with
# split_il, and save each group as one file (a single image is saved as is,
# several images are stitched vertically).  Intermediate values are printed.
# NOTE(review): indentation was lost in this copy of the file; the comments
# below describe the statements they precede, not their nesting.
with fitz.Document(self.pdf_path) as pdf:
print('----------------------------')
print(self.pdf_name)
print(pdf.metadata)
# xref_list = []
for pno in range(pdf.pageCount):
print('========================')
il = pdf.getPageImageList(pno)
il.sort(key=lambda x: x[0])
# (xref, smask, width, height, bpc, colorspace, alt.colorspace, name, filter, invoker)
img_il_list = self.split_il(il)
il = None
print(img_il_list)
print(len(img_il_list))
for img_count, img_il in enumerate(img_il_list):
print(img_il)
height_sum = 0
im_list = []
for img in img_il:
# xref = img[0]
# if xref in xref_list:
# continue
width = img[2]
height = img[3]
pix = self.recoverpix(pdf, img)
if type(pix) is dict: # we got a raw image
ext = pix["ext"]
img_data = pix["image"]
else: # we got a pixmap
ext = 'png'
img_data = pix.getPNGData()
# xref_list.append(xref)
im = Image.open(BytesIO(img_data))
im_list.append((width, height, im, ext))
height_sum += height
print(im_list)
# the output extension is taken from the first image of the group
save_path = os.path.join(save_dir_path, 'page_{0}_img_{1}.{2}'.format(
pno, img_count, im_list[0][3]))
# a single image: save it directly
if len(im_list) == 1:
im_list[0][2].save(save_path)
# several images: stitch them vertically
else:
# the mode and width of the first image are used for the stitched image
res = Image.new(im_list[0][2].mode, (im_list[0][0], height_sum))
h_now = 0
# paste each image below the previous one
for _, h, m, _ in im_list:
res.paste(m, box=(0, h_now))
h_now += h
res.save(save_path)
if __name__ == '__main__':
    # Ad-hoc driver: run extract_images_pro on every entry of dir_path and
    # write the results to a per-file folder below .../pdf_test/test/.
    dir_path = '/Users/clay/Desktop/biz/pdf_test/银行流水/'
    pdf_list = os.listdir(dir_path)
    for path in pdf_list:
        if path == '.DS_Store':
            continue
        pdf_handler = PdfHandler(os.path.join(dir_path, path))
        save_path = os.path.join('/Users/clay/Desktop/biz/pdf_test/', 'test', os.path.splitext(os.path.basename(path))[0])
        # makedirs with exist_ok: os.mkdir failed on a second run (directory
        # already present) and when the parent 'test' directory was missing.
        os.makedirs(save_path, exist_ok=True)
        pdf_handler.extract_images_pro(save_path)
# pdf_handler = PdfHandler('/Users/clay/Desktop/biz/pdf_test/银行流水/竖版-特殊-邮储银行-一本通绿卡通交易明细(客户).pdf')
# pdf_handler = PdfHandler('/Users/clay/Desktop/biz/pdf_test/银行流水/横版-表格-工商银行 借记卡账户历史明细清单 .pdf')
# pdf_handler.page_to_pix_img('/Users/clay/Desktop/biz/pdf_test/', 3.0, 3.0)
# pdf_handler.page_to_svg_img('/Users/clay/Desktop/biz/pdf_test/')
# pdf_handler.extract_images_pro('/Users/clay/Desktop/biz/pdf_test/test')
# pix = fitz.Pixmap(sys.argv[1]) # read image file
# rgb = "RGB" # set PIL parameter
# if pix.alpha: # JPEG cannot have alpha!
# pix0 = fitz.Pixmap(pix, 0) # drop alpha channel
# pix = pix0 # rename pixmap
#
# img = Image.frombuffer(rgb, [pix.width, pix.height], pix.samples, "raw", rgb, 0, 1)
# img.save(outputFileName)
# 录题系统开发规范
# 宝马OCR系统开发规范
## 代码规范
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!