import logging
import math
import os
import shutil
from io import BytesIO

import cv2
import fitz
from PIL import Image

logger = logging.getLogger(__name__)

# Parameters used when rendering a whole page to a PNG image.
ZOOM_X_1 = ZOOM_Y_1 = 1.0
ZOOM_X_2 = ZOOM_Y_2 = 2.0
trans_1 = fitz.Matrix(ZOOM_X_1, ZOOM_Y_1).preRotate(0)  # zoom factor 1 in each dimension
trans_2 = fitz.Matrix(ZOOM_X_2, ZOOM_Y_2).preRotate(0)  # zoom factor 2 in each dimension

# Image filters that trigger special handling (the whole page is rendered).
ADOBE_FILTER_SET = {'FlateDecode', 'JPXDecode', 'JBIG2Decode'}

# (width, height) threshold pairs.
WH_COUPLE_1 = (500, 500)  # below both: a lone image counts as "small"
WH_COUPLE_2 = (700, 647)  # at or above both: image is "large", no fragment merging
WH_COUPLE_3 = (100, 100)  # minimum size when extracting images one by one
WH_COUPLE_4 = (100, 300)  # minimum size for a group holding a single fragment
WH_COUPLE_5 = (100, 200)  # minimum (width, total height) for a stitched group


class PDFHandler:
    """Extract images from a PDF (or copy a plain image file) into a directory.

    Depending on the image objects found on each page, the handler either
    extracts the embedded image, stitches image fragments vertically, or
    renders the whole page to a PNG. Paths of all produced files are
    collected in ``img_path_list`` and counted in ``img_count``.
    """

    def __init__(self, path, img_dir_path, document_name=None):
        """Store the source path, the output directory and reset all state.

        Args:
            path: Path of the source document.
            img_dir_path: Directory that receives the produced images.
            document_name: Optional original file name; only its extension is
                used, to detect that the source is an image rather than a PDF.
        """
        self.path = path
        self.img_dir_path = img_dir_path
        self.img_path_list = []
        self.img_count = 0
        self.xref_set = set()  # xrefs of images that were already extracted
        self.img_suffixs = {'.jpeg', '.jpg', '.png', '.webp', '.bmp'}
        self.suffix = self.get_suffix(document_name)
        self.is_ebank = False
        self.page_text_list = []

    def get_suffix(self, file_name):
        """Return the lower-cased extension of *file_name* if it is a known image suffix.

        Returns ``None`` when *file_name* is ``None``, cannot be split, or has
        an extension that is not listed in ``self.img_suffixs``.
        """
        if file_name is None:
            return None
        try:
            _, src_suffix = os.path.splitext(file_name)
            lower_suffix = src_suffix.lower()
        except (TypeError, AttributeError):
            # Not a str/bytes/path-like name: treat as "no usable suffix".
            return None
        if lower_suffix in self.img_suffixs:
            return lower_suffix
        return None

    def get_img_save_path(self, pno, img_index=0, ext='png'):
        """Build the output path ``<img_dir_path>/page_<pno>_img_<img_index>.<ext>``."""
        return os.path.join(
            self.img_dir_path,
            'page_{0}_img_{1}.{2}'.format(pno, img_index, ext))

    def rebuild_bbox(self, src_width, src_height, pno):
        """Map the text boxes of page *pno* onto the rendered page image.

        Reads (and removes) the ``width``, ``height``, ``text`` and
        ``rotation`` entries of ``self.page_text_list[pno]``, rotates every
        box by ``rotation`` quarter turns, scales it to the
        ``src_width`` x ``src_height`` pixel size and stores the result under
        ``'rebuild_text'`` as ``((x0, y0, x1, y0, x1, y1, x0, y1), text)``
        tuples.

        This is best effort: any failure is logged and leaves the page
        without a ``'rebuild_text'`` entry instead of raising.
        """
        try:
            page_info = self.page_text_list[pno]
            width = page_info.pop('width')
            height = page_info.pop('height')
            src_text_list = page_info.pop('text')
            rotation = page_info.pop('rotation')

            # ``rotation`` counts quarter turns, hence the factor pi / 2.
            sin = math.sin(math.pi * rotation / 2)
            cos = math.cos(math.pi * rotation / 2)

            # Rotate the page corners to find the offset that moves the
            # rotated page back into the positive quadrant.
            min_x = min_y = 0
            for x, y in ((0, height), (width, 0), (width, height)):
                new_x = x * cos - y * sin
                new_y = x * sin + y * cos
                min_x = min(min_x, new_x)
                min_y = min(min_y, new_y)

            new_width = int((height * abs(sin)) + (width * abs(cos)))
            new_height = int((height * abs(cos)) + (width * abs(sin)))
            width_scale = src_width / new_width
            height_scale = src_height / new_height

            rebuild_text_list = []
            for bbox, text in src_text_list:
                x0, y0, x1, y1 = bbox
                x0, y0, x1, y1 = (x0 * cos - y0 * sin, x0 * sin + y0 * cos,
                                  x1 * cos - y1 * sin, x1 * sin + y1 * cos)
                # After rotation the corners may have swapped order.
                x_list = sorted([x0 - min_x, x1 - min_x])
                y_list = sorted([y0 - min_y, y1 - min_y])
                x0, y0, x1, y1 = (x_list[0], y_list[0], x_list[1], y_list[1])
                x0 = x0 * width_scale
                y0 = y0 * height_scale
                x1 = x1 * width_scale
                y1 = y1 * height_scale
                rebuild_text_list.append(
                    ((x0, y0, x1, y0, x1, y1, x0, y1), text)
                )
            self.page_text_list[pno]['rebuild_text'] = rebuild_text_list
        except Exception:
            # Deliberately non-fatal, but no longer silent.
            logger.warning('Failed to rebuild text boxes for page %s', pno,
                           exc_info=True)

    def page_to_png(self, page):
        """Render *page* to a PNG file and record its path.

        Pages whose media box exceeds 1500 in either dimension are rendered
        with zoom factor 1, all others with zoom factor 2. When
        ``self.is_ebank`` is set, the page's text boxes are rebuilt for the
        rendered size.
        """
        if page.MediaBoxSize.x > 1500 or page.MediaBoxSize.y > 1500:
            pm = page.getPixmap(matrix=trans_1, alpha=False)
        else:
            pm = page.getPixmap(matrix=trans_2, alpha=False)
        img_save_path = self.get_img_save_path(page.number)
        pm.writePNG(img_save_path)
        self.img_path_list.append(img_save_path)
        if self.is_ebank:
            self.rebuild_bbox(pm.width, pm.height, page.number)

    @staticmethod
    def getimage(pix):
        """Return *pix* unchanged unless its colorspace has 4 components.

        A 4-component pixmap (CMYK, per the original author's note) is
        converted to a new RGB pixmap.
        """
        if pix.colorspace.n != 4:
            return pix
        tpix = fitz.Pixmap(fitz.csRGB, pix)
        return tpix

    def recover_pix(self, doc, xref, smask, colorspace):
        """Load image *xref* from *doc* in a form that can be written to disk.

        Returns:
            A ``fitz.Pixmap`` when the image has a soft mask or uses the
            ``Separation`` / ``DeviceCMYK`` colorspace, otherwise the dict
            returned by ``doc.extractImage(xref)``.
        """
        if smask != 0:
            # We need to reconstruct the alpha channel with the smask.
            pix1 = fitz.Pixmap(doc, xref)
            pix2 = fitz.Pixmap(doc, smask)  # pixmap of the /SMask entry

            # Sanity check: same area, no alpha yet, mask has one component.
            if not (pix1.irect == pix2.irect
                    and pix1.alpha == pix2.alpha == 0
                    and pix2.n == 1):
                pix2 = None
                return self.getimage(pix1)

            pix = fitz.Pixmap(pix1)  # copy of pix1, alpha channel added
            pix.setAlpha(pix2.samples)  # treat pix2.samples as alpha values
            pix1 = pix2 = None  # free temp pixmaps
            return self.getimage(pix)
        elif colorspace in {'Separation', 'DeviceCMYK'}:
            pix = fitz.Pixmap(doc, xref)
            tpix = fitz.Pixmap(fitz.csRGB, pix)
            return tpix
        else:
            return doc.extractImage(xref)

    @staticmethod
    def get_img_data(pix):
        """Return ``(ext, img_data)`` for a raw-image dict or a pixmap.

        A dict is read through its ``"ext"`` and ``"image"`` keys; anything
        else is treated as a pixmap and encoded as PNG.
        """
        if isinstance(pix, dict):  # we got a raw image
            ext = pix["ext"]
            img_data = pix["image"]
        else:  # we got a pixmap
            ext = 'png'
            img_data = pix.getPNGData()
        return ext, img_data

    def extract_single_image(self, pdf, xref, smask, colorspace, pno, img_index=0):
        """Write image *xref* of page *pno* to disk and record it.

        Images reported with extension ``jpx`` are re-encoded through a
        pixmap and saved under a ``.jpeg`` file name; everything else is
        written as-is. The xref is added to ``self.xref_set`` and the path to
        ``self.img_path_list``.
        """
        pix = self.recover_pix(pdf, xref, smask, colorspace)
        ext, img_data = self.get_img_data(pix)
        if ext == 'jpx':
            img_save_path = self.get_img_save_path(pno, img_index=img_index, ext='jpeg')
            jpx_pix = fitz.Pixmap(img_data)
            jpx_pix.writeImage(img_save_path)
            jpx_pix = None
        else:
            img_save_path = self.get_img_save_path(pno, img_index=img_index, ext=ext)
            with open(img_save_path, "wb") as f:
                f.write(img_data)
        self.xref_set.add(xref)
        self.img_path_list.append(img_save_path)

    @staticmethod
    def split_il(il):
        """Group a page's image list into runs of fragments.

        Each entry of *il* is an image tuple whose index 2 is the width,
        index 3 the height and last item the filter name.

        Returns:
            ``True`` if any image uses a filter from ``ADOBE_FILTER_SET``
            (the caller renders the whole page), ``None`` if an image at or
            above ``WH_COUPLE_2`` is encountered (the caller extracts images
            one by one), otherwise a list of sub-lists of *il*.
        """
        # Any special filter: the whole page is saved as a PNG.
        if any(img[-1] in ADOBE_FILTER_SET for img in il):
            return True

        broken_il = []
        start = 0
        length = len(il)
        for i in range(length):
            # A sufficiently large image: no fragment merging, plain extraction.
            if il[i][2] >= WH_COUPLE_2[0] and il[i][3] >= WH_COUPLE_2[1]:
                return None

            if i == start:
                # First image of a group; close the group if it is also the last image.
                if i == length - 1:
                    broken_il.append(il[start: length])
                continue
            elif i == length - 1:
                # Last image: it joins the open group only if the width matches.
                if il[i][2] == il[i - 1][2]:
                    broken_il.append(il[start: length])
                else:
                    broken_il.append(il[start: i])
                    broken_il.append(il[i: length])
                continue

            if il[i][2] != il[i - 1][2]:
                # Width changed: the previous image ends the group.
                broken_il.append(il[start: i])
                start = i
            elif il[i][3] != il[i - 1][3]:
                # Height changed: this image ends the group.
                broken_il.append(il[start: i + 1])
                start = i + 1

        # Fragment grouping result.
        return broken_il

    def merge_il(self, pdf, pno, il):
        """Handle a page that carries more than one image object.

        *il* is sorted in place by xref and grouped with ``split_il``. Based
        on the grouping, images are extracted individually, fragment groups
        are stitched vertically, or the whole page is rendered to PNG.
        """
        # Group the fragments before trying to merge them.
        il.sort(key=lambda x: x[0])
        broken_il = self.split_il(il)
        page_to_png = True

        # 3.1 An image is large enough: no merging, plain extraction.
        if broken_il is None:
            page_to_png = False
            for img_index, img in enumerate(il):
                xref, smask, width, height, _, colorspace, _, _, _ = img
                if width < WH_COUPLE_3[0] or height < WH_COUPLE_3[1]:
                    # Skip small images (e.g. QR codes).
                    continue
                elif xref not in self.xref_set:
                    self.extract_single_image(pdf, xref, smask, colorspace, pno, img_index)
        # 3.2 Merge the fragments group by group.
        elif isinstance(broken_il, list) and len(broken_il) <= 2:
            for img_index, img_il in enumerate(broken_il):
                # 3.2.1 A single fragment: filter it out or extract it directly.
                if len(img_il) == 1:
                    xref, smask, width, height, _, colorspace, _, _, _ = img_il[0]
                    # Skip small images (e.g. QR codes).
                    if width < WH_COUPLE_4[0] or height < WH_COUPLE_4[1] or \
                            (width < WH_COUPLE_1[0] and height < WH_COUPLE_1[1]):
                        continue
                    elif xref not in self.xref_set:
                        self.extract_single_image(pdf, xref, smask, colorspace, pno, img_index)
                        page_to_png = False
                # 3.2.2 Several fragments: stitch them vertically.
                else:
                    height_sum = sum(img[3] for img in img_il)
                    width = img_il[0][2]
                    # Skip small results and unusually tall ones.
                    if width < WH_COUPLE_5[0] or height_sum < WH_COUPLE_5[1] or \
                            (width > 1000 and height_sum > width * 3):
                        continue
                    im_list = []
                    for img in img_il:
                        xref, smask, _, height, _, colorspace, _, _, _ = img
                        pix = self.recover_pix(pdf, xref, smask, colorspace)
                        ext, img_data = self.get_img_data(pix)
                        im = Image.open(BytesIO(img_data))
                        im_list.append((height, im, ext))
                    # The stitched image takes mode and extension from the first fragment.
                    new_img = Image.new(im_list[0][1].mode, (width, height_sum))
                    h_now = 0
                    for h, m, _ in im_list:
                        new_img.paste(m, box=(0, h_now))
                        h_now += h
                    img_save_path = self.get_img_save_path(pno, img_index, im_list[0][2])
                    new_img.save(img_save_path)
                    # The fragments are no longer needed once the result is saved.
                    for _, m, _ in im_list:
                        m.close()
                    page_to_png = False
                    self.img_path_list.append(img_save_path)

        # 3.3 More than two groups, everything filtered out, or a special
        # filter was found: save the whole page as a PNG.
        if page_to_png:
            page = pdf.loadPage(pno)
            self.page_to_png(page)

    def check_ebank(self, pdf):
        """Decide whether *pdf* is text based and collect its text spans.

        Gives up (returns without changing state) as soon as a page has a
        rotation that is not an int multiple of 90, or when the number of
        non-blank spans seen so far drops below 5 per page. Otherwise sets
        ``self.is_ebank`` and stores per-page ``width``, ``height``,
        ``rotation`` (in quarter turns) and ``text`` in
        ``self.page_text_list``.
        """
        page_text_list = []
        text_item_sum = 0
        for pno in range(pdf.pageCount):
            page = pdf.loadPage(pno)
            if page.rotation is None:
                rotation = 0
            elif isinstance(page.rotation, int):
                divisor, remainder = divmod(page.rotation, 90)
                if remainder != 0:
                    return
                rotation = divmod(divisor, 4)[1]
            else:
                return
            textpage = page.getTextPage()
            text = textpage.extractDICT()
            text_list = []
            # Blocks without 'lines' (e.g. image blocks) contribute no text;
            # guard against the missing key instead of iterating over None.
            for block in text.get('blocks') or ():
                for line in block.get('lines') or ():
                    for span in line.get('spans') or ():
                        char = span.get('text') or ''
                        bbox = span.get('bbox')
                        if not char.strip():
                            continue
                        text_list.append((bbox, char))
            text_item_sum += len(text_list)
            if text_item_sum < (pno + 1) * 5:
                return
            page_text_list.append(
                {
                    'width': text.get('width'),
                    'height': text.get('height'),
                    'rotation': rotation,
                    'text': text_list
                }
            )
        self.is_ebank = True
        self.page_text_list = page_text_list

    def extract_image(self, max_img_count=None):
        """Produce the images for the document and update ``img_count``.

        An image source (recognised by ``self.suffix``) is simply copied.
        For a PDF, each page is handled according to the number of image
        objects it carries.

        Args:
            max_img_count: If an int and the PDF has at least that many
                pages, nothing is extracted and ``img_count`` is set to the
                page count.
        """
        self.img_path_list = []
        self.xref_set = set()
        os.makedirs(self.img_dir_path, exist_ok=True)

        if self.suffix in self.img_suffixs:
            img_save_path = self.get_img_save_path(0, ext=self.suffix[1:])
            shutil.copy(self.path, img_save_path)
            self.img_path_list.append(img_save_path)
        else:
            with fitz.Document(self.path) as pdf:
                if isinstance(max_img_count, int) and pdf.pageCount >= max_img_count:
                    self.img_count = pdf.pageCount
                    return
                # self.check_ebank(pdf)
                for pno in range(pdf.pageCount):
                    # Image objects of the page:
                    # (xref, smask, width, height, bpc, colorspace,
                    #  alt.colorspace, name, filter)
                    il = pdf.getPageImageList(pno)

                    # 1. No image objects (or text-based document): save the
                    #    whole page as a PNG.
                    if self.is_ebank or len(il) == 0:
                        page = pdf.loadPage(pno)
                        self.page_to_png(page)
                    # 2. Exactly one image object:
                    #    small (e.g. a stamp on an e-statement): save the whole page;
                    #    large: extract the image object.
                    elif len(il) == 1:
                        xref, smask, width, height, _, colorspace, _, _, _ = il[0]
                        # Small image.
                        if width < WH_COUPLE_1[0] and height < WH_COUPLE_1[1]:
                            page = pdf.loadPage(pno)
                            self.page_to_png(page)
                        # Large image.
                        elif xref not in self.xref_set:
                            self.extract_single_image(pdf, xref, smask, colorspace, pno)
                    # 3. More than one image object: special handling.
                    else:
                        self.merge_il(pdf, pno, il)
        self.img_count = len(self.img_path_list)

    def extract_page_image(self):
        """Render every page of the PDF to a PNG and update ``img_count``."""
        self.img_path_list = []
        self.xref_set = set()
        os.makedirs(self.img_dir_path, exist_ok=True)
        with fitz.Document(self.path) as pdf:
            for pno in range(pdf.pageCount):
                page = pdf.loadPage(pno)
                self.page_to_png(page)
        self.img_count = len(self.img_path_list)

    def ebank_draw(self):
        """Write a ``<name>_res<ext>`` copy of each image with its text boxes drawn.

        The boxes come from the ``'rebuild_text'`` entry of the matching
        ``self.page_text_list`` item, which is removed. A page without that
        entry is written without rectangles; an image that cannot be read is
        logged and skipped.
        """
        for img_idx, img_path in enumerate(self.img_path_list):
            img = cv2.imread(img_path)
            if img is None:
                # cv2.imread signals an unreadable file by returning None.
                logger.warning('Could not read image %s, skipping', img_path)
                continue
            pre, suf = os.path.splitext(img_path)
            output_path = '{0}_res{1}'.format(pre, suf)
            boxes = self.page_text_list[img_idx].pop('rebuild_text', ())
            # Each box is (x0, y0, x1, y0, x1, y1, x0, y1); only the two
            # opposite corners are needed.
            for (x0, y0, x1, _, _, y1, _, _), _text in boxes:
                cv2.rectangle(img, (int(x0), int(y0)), (int(x1), int(y1)), (0, 255, 0), 2)
            cv2.imwrite(output_path, img)