Appearance
6.3 发票识别与归档系统
在企业财务管理中,发票处理是一项繁琐但必不可少的工作。传统的发票处理流程通常涉及手动录入、分类和归档,这不仅耗时耗力,还容易出错。本节将介绍如何使用Python构建一个自动化的发票识别与归档系统,实现从OCR识别发票图片、提取关键字段、存入数据库到生成PDF汇总表的全流程自动化。
需求分析
一个完整的发票识别与归档系统通常需要实现以下功能:
- 发票图像采集:支持扫描仪输入、手机拍照上传或批量导入图片文件
- OCR识别处理:识别发票上的文字信息,包括发票代码、号码、日期、金额等关键字段
- 数据提取与验证:从OCR结果中提取结构化数据,并进行有效性验证
- 数据存储管理:将提取的发票信息存入数据库,便于查询和管理
- 报表生成:根据存储的发票数据,生成PDF格式的汇总报表
- 归档与检索:对发票图像和数据进行归档,支持多维度检索
技术选型
我们将使用以下Python库来实现各个环节:
- Pillow/OpenCV:用于图像预处理,提高OCR识别率
- Tesseract-OCR/PaddleOCR:用于文字识别
- pytesseract/paddleocr:Python调用OCR引擎的接口
- re/pandas:用于数据提取和处理
- SQLite/SQLAlchemy:用于数据存储
- ReportLab/FPDF:用于生成PDF报表
- Flask/FastAPI(可选):用于构建Web界面
代码实现
第一步:发票图像预处理
在进行OCR识别前,对图像进行预处理可以显著提高识别准确率。
python
import cv2
import numpy as np
from PIL import Image
import os
class InvoiceImageProcessor:
"""发票图像预处理类"""
def __init__(self, output_dir='./processed_images'):
"""初始化图像处理器"""
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
def load_image(self, image_path):
"""加载图像"""
try:
image = cv2.imread(image_path)
if image is None:
raise ValueError(f"无法加载图像: {image_path}")
return image
except Exception as e:
print(f"加载图像时出错: {e}")
return None
def resize_image(self, image, max_size=1600):
"""调整图像大小,保持宽高比"""
height, width = image.shape[:2]
if max(height, width) > max_size:
scale = max_size / max(height, width)
new_width = int(width * scale)
new_height = int(height * scale)
image = cv2.resize(image, (new_width, new_height))
return image
def denoise(self, image):
"""去噪处理"""
return cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21)
def enhance_contrast(self, image):
"""增强对比度"""
lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
cl = clahe.apply(l)
enhanced_lab = cv2.merge((cl, a, b))
return cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2BGR)
def convert_to_grayscale(self, image):
"""转换为灰度图"""
return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
def threshold(self, gray_image):
"""二值化处理"""
return cv2.adaptiveThreshold(
gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2
)
def deskew(self, image):
"""倾斜校正"""
gray = self.convert_to_grayscale(image) if len(image.shape) == 3 else image
# 检测边缘
edges = cv2.Canny(gray, 50, 150, apertureSize=3)
# 霍夫变换检测直线
lines = cv2.HoughLines(edges, 1, np.pi/180, 200)
if lines is None:
return image
# 计算倾斜角度
angles = []
for line in lines:
rho, theta = line[0]
if theta < np.pi/4 or theta > 3*np.pi/4: # 垂直线
angles.append(theta)
if not angles:
return image
# 计算平均角度
median_angle = np.median(angles) - np.pi/2
# 旋转图像
(h, w) = image.shape[:2]
center = (w // 2, h // 2)
M = cv2.getRotationMatrix2D(center, np.degrees(median_angle), 1.0)
rotated = cv2.warpAffine(image, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
return rotated
def process_image(self, image_path, save_intermediate=False):
"""处理图像的完整流程"""
# 加载图像
image = self.load_image(image_path)
if image is None:
return None
# 调整大小
image = self.resize_image(image)
if save_intermediate:
cv2.imwrite(os.path.join(self.output_dir, "1_resized.jpg"), image)
# 去噪
denoised = self.denoise(image)
if save_intermediate:
cv2.imwrite(os.path.join(self.output_dir, "2_denoised.jpg"), denoised)
# 增强对比度
enhanced = self.enhance_contrast(denoised)
if save_intermediate:
cv2.imwrite(os.path.join(self.output_dir, "3_enhanced.jpg"), enhanced)
# 倾斜校正
deskewed = self.deskew(enhanced)
if save_intermediate:
cv2.imwrite(os.path.join(self.output_dir, "4_deskewed.jpg"), deskewed)
# 转换为灰度图
gray = self.convert_to_grayscale(deskewed)
if save_intermediate:
cv2.imwrite(os.path.join(self.output_dir, "5_gray.jpg"), gray)
# 二值化
binary = self.threshold(gray)
# 保存最终处理结果
output_path = os.path.join(self.output_dir, os.path.basename(image_path))
cv2.imwrite(output_path, binary)
return output_path
def batch_process(self, image_dir, extensions=['.jpg', '.jpeg', '.png']):
"""批量处理图像"""
processed_paths = []
for filename in os.listdir(image_dir):
if any(filename.lower().endswith(ext) for ext in extensions):
image_path = os.path.join(image_dir, filename)
processed_path = self.process_image(image_path)
if processed_path:
processed_paths.append(processed_path)
print(f"已处理: {filename}")
return processed_paths
第二步:OCR识别与数据提取
使用OCR引擎识别发票上的文字,并提取关键字段。
python
import pytesseract
from PIL import Image
import re
import json
import os
from datetime import datetime
# 如果使用PaddleOCR
# from paddleocr import PaddleOCR
# ocr = PaddleOCR(use_angle_cls=True, lang="ch")
class InvoiceOCRProcessor:
"""发票OCR处理类"""
def __init__(self, tesseract_cmd=None):
"""初始化OCR处理器"""
if tesseract_cmd:
pytesseract.pytesseract.tesseract_cmd = tesseract_cmd
def perform_ocr(self, image_path):
"""执行OCR识别"""
try:
# 使用Tesseract OCR
image = Image.open(image_path)
text = pytesseract.image_to_string(image, lang='chi_sim+eng')
# 如果使用PaddleOCR
# result = ocr.ocr(image_path, cls=True)
# text = '\n'.join([line[1][0] for line in result[0]])
return text
except Exception as e:
print(f"OCR识别时出错: {e}")
return ""
def extract_invoice_code(self, text):
"""提取发票代码"""
patterns = [
r'发票代码[::]?\s*(\d{10,12})',
r'代码[::]?\s*(\d{10,12})'
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return match.group(1)
return ""
def extract_invoice_number(self, text):
"""提取发票号码"""
patterns = [
r'发票号码[::]?\s*(\d{8,10})',
r'号码[::]?\s*(\d{8,10})'
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return match.group(1)
return ""
def extract_date(self, text):
"""提取开票日期"""
patterns = [
r'开票日期[::]?\s*(\d{4}[年/-]\d{1,2}[月/-]\d{1,2})',
r'日期[::]?\s*(\d{4}[年/-]\d{1,2}[月/-]\d{1,2})'
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
date_str = match.group(1)
# 标准化日期格式
date_str = re.sub(r'[年月]', '-', date_str)
date_str = date_str.replace('日', '')
return date_str
return ""
def extract_amount(self, text):
"""提取金额"""
patterns = [
r'金额[::]?\s*¥?\s*(\d+\.\d{2})',
r'小写[::]?\s*¥?\s*(\d+\.\d{2})',
r'(\d+元\d+角\d+分)'
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
amount_str = match.group(1)
# 处理中文金额格式
if '元' in amount_str:
amount_str = amount_str.replace('元', '.').replace('角', '').replace('分', '')
return amount_str
return ""
def extract_tax(self, text):
"""提取税额"""
patterns = [
r'税额[::]?\s*¥?\s*(\d+\.\d{2})',
r'税额[::]?\s*¥?\s*(\d+)'
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return match.group(1)
return ""
def extract_seller(self, text):
"""提取销售方名称"""
patterns = [
r'销售方[::]?\s*(.+?)\n',
r'名称[::]?\s*(.+?)\n'
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return match.group(1).strip()
return ""
def extract_buyer(self, text):
"""提取购买方名称"""
patterns = [
r'购买方[::]?\s*(.+?)\n',
r'购买方名称[::]?\s*(.+?)\n'
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return match.group(1).strip()
return ""
def extract_invoice_type(self, text):
"""提取发票类型"""
if re.search(r'增值税专用发票', text):
return "增值税专用发票"
elif re.search(r'增值税普通发票', text):
return "增值税普通发票"
elif re.search(r'电子发票', text):
return "电子发票"
else:
return "其他发票"
def process_invoice(self, image_path):
"""处理单张发票"""
# 执行OCR识别
text = self.perform_ocr(image_path)
if not text:
return None
# 提取发票信息
invoice_data = {
"invoice_code": self.extract_invoice_code(text),
"invoice_number": self.extract_invoice_number(text),
"date": self.extract_date(text),
"amount": self.extract_amount(text),
"tax": self.extract_tax(text),
"seller": self.extract_seller(text),
"buyer": self.extract_buyer(text),
"invoice_type": self.extract_invoice_type(text),
"image_path": image_path,
"ocr_text": text,
"processed_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
return invoice_data
def batch_process(self, image_paths):
"""批量处理发票"""
results = []
for image_path in image_paths:
print(f"正在处理: {os.path.basename(image_path)}")
invoice_data = self.process_invoice(image_path)
if invoice_data:
results.append(invoice_data)
return results
def save_results(self, results, output_file):
"""保存处理结果到JSON文件"""
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
print(f"结果已保存至: {output_file}")
第三步:数据存储与管理
将提取的发票信息存入数据库,便于后续查询和管理。
python
import sqlite3
import os
import json
from datetime import datetime
class InvoiceDatabase:
"""发票数据库管理类"""
def __init__(self, db_path):
"""初始化数据库连接"""
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
self.create_tables()
def create_tables(self):
"""创建数据库表结构"""
cursor = self.conn.cursor()
# 创建发票信息表
cursor.execute('''
CREATE TABLE IF NOT EXISTS invoices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
invoice_code TEXT,
invoice_number TEXT,
invoice_date TEXT,
amount REAL,
tax REAL,
seller TEXT,
buyer TEXT,
invoice_type TEXT,
image_path TEXT,
status TEXT DEFAULT 'pending',
notes TEXT,
created_at TEXT,
updated_at TEXT,
UNIQUE(invoice_code, invoice_number)
)
''')
# 创建发票分类表
cursor.execute('''
CREATE TABLE IF NOT EXISTS invoice_categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
description TEXT
)
''')
# 创建发票与分类的关联表
cursor.execute('''
CREATE TABLE IF NOT EXISTS invoice_category_mapping (
invoice_id INTEGER,
category_id INTEGER,
PRIMARY KEY (invoice_id, category_id),
FOREIGN KEY (invoice_id) REFERENCES invoices (id),
FOREIGN KEY (category_id) REFERENCES invoice_categories (id)
)
''')
# 插入默认分类
default_categories = [
('办公用品', '办公室日常用品采购'),
('差旅费', '出差相关费用'),
('餐饮费', '工作餐、招待餐费用'),
('交通费', '公共交通、打车费用'),
('通讯费', '电话、网络费用'),
('会议费', '会议场地、设备租赁费用'),
('其他', '未分类发票')
]
for category in default_categories:
try:
cursor.execute('INSERT INTO invoice_categories (name, description) VALUES (?, ?)', category)
except sqlite3.IntegrityError:
# 分类已存在,跳过
pass
self.conn.commit()
def insert_invoice(self, invoice_data):
"""插入发票数据"""
cursor = self.conn.cursor()
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
cursor.execute('''
INSERT INTO invoices (
invoice_code, invoice_number, invoice_date, amount, tax,
seller, buyer, invoice_type, image_path, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
invoice_data.get('invoice_code', ''),
invoice_data.get('invoice_number', ''),
invoice_data.get('date', ''),
float(invoice_data.get('amount', 0)),
float(invoice_data.get('tax', 0)),
invoice_data.get('seller', ''),
invoice_data.get('buyer', ''),
invoice_data.get('invoice_type', ''),
invoice_data.get('image_path', ''),
now,
now
))
invoice_id = cursor.lastrowid
self.conn.commit()
return invoice_id
except sqlite3.IntegrityError:
# 发票已存在,更新信息
cursor.execute('''
UPDATE invoices SET
invoice_date = ?, amount = ?, tax = ?, seller = ?, buyer = ?,
invoice_type = ?, image_path = ?, updated_at = ?
WHERE invoice_code = ? AND invoice_number = ?
''', (
invoice_data.get('date', ''),
float(invoice_data.get('amount', 0)),
float(invoice_data.get('tax', 0)),
invoice_data.get('seller', ''),
invoice_data.get('buyer', ''),
invoice_data.get('invoice_type', ''),
invoice_data.get('image_path', ''),
now,
invoice_data.get('invoice_code', ''),
invoice_data.get('invoice_number', '')
))
# 获取更新的发票ID
cursor.execute('''
SELECT id FROM invoices WHERE invoice_code = ? AND invoice_number = ?
''', (invoice_data.get('invoice_code', ''), invoice_data.get('invoice_number', '')))
result = cursor.fetchone()
self.conn.commit()
return result[0] if result else None
def batch_insert(self, invoice_data_list):
"""批量插入发票数据"""
inserted_ids = []
for invoice_data in invoice_data_list:
invoice_id = self.insert_invoice(invoice_data)
if invoice_id:
inserted_ids.append(invoice_id)
return inserted_ids
def categorize_invoice(self, invoice_id, category_name):
"""为发票分配分类"""
cursor = self.conn.cursor()
# 获取分类ID
cursor.execute('SELECT id FROM invoice_categories WHERE name = ?', (category_name,))
result = cursor.fetchone()
if not result:
# 分类不存在,创建新分类
cursor.execute('INSERT INTO invoice_categories (name) VALUES (?)', (category_name,))
category_id = cursor.lastrowid
else:
category_id = result[0]
# 添加发票与分类的关联
try:
cursor.execute('INSERT INTO invoice_category_mapping (invoice_id, category_id) VALUES (?, ?)',
(invoice_id, category_id))
self.conn.commit()
return True
except sqlite3.IntegrityError:
# 关联已存在
return False
def get_invoice_by_id(self, invoice_id):
"""根据ID获取发票信息"""
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM invoices WHERE id = ?', (invoice_id,))
columns = [column[0] for column in cursor.description]
result = cursor.fetchone()
if result:
return dict(zip(columns, result))
return None
def search_invoices(self, criteria):
"""搜索发票"""
cursor = self.conn.cursor()
query = 'SELECT * FROM invoices WHERE 1=1'
params = []
if 'invoice_code' in criteria and criteria['invoice_code']:
query += ' AND invoice_code LIKE ?'
params.append(f"%{criteria['invoice_code']}%")
if 'invoice_number' in criteria and criteria['invoice_number']:
query += ' AND invoice_number LIKE ?'
params.append(f"%{criteria['invoice_number']}%")
if 'date_from' in criteria and criteria['date_from']:
query += ' AND invoice_date >= ?'
params.append(criteria['date_from'])
if 'date_to' in criteria and criteria['date_to']:
query += ' AND invoice_date <= ?'
params.append(criteria['date_to'])
if 'seller' in criteria and criteria['seller']:
query += ' AND seller LIKE ?'
params.append(f"%{criteria['seller']}%")
if 'buyer' in criteria and criteria['buyer']:
query += ' AND buyer LIKE ?'
params.append(f"%{criteria['buyer']}%")
if 'amount_min' in criteria and criteria['amount_min']:
query += ' AND amount >= ?'
params.append(float(criteria['amount_min']))
if 'amount_max' in criteria and criteria['amount_max']:
query += ' AND amount <= ?'
params.append(float(criteria['amount_max']))
if 'invoice_type' in criteria and criteria['invoice_type']:
query += ' AND invoice_type = ?'
params.append(criteria['invoice_type'])
if 'status' in criteria and criteria['status']:
query += ' AND status = ?'
params.append(criteria['status'])
cursor.execute(query, params)
columns = [column[0] for column in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return results
def get_statistics(self):
"""获取发票统计信息"""
cursor = self.conn.cursor()
# 总发票数量和金额
cursor.execute('SELECT COUNT(*), SUM(amount), SUM(tax) FROM invoices')
total_count, total_amount, total_tax = cursor.fetchone()
# 按发票类型统计
cursor.execute('SELECT invoice_type, COUNT(*), SUM(amount) FROM invoices GROUP BY invoice_type')
type_stats = [{'type': row[0], 'count': row[1], 'amount': row[2]} for row in cursor.fetchall()]
# 按月份统计
cursor.execute('''
SELECT strftime('%Y-%m', invoice_date) as month, COUNT(*), SUM(amount)
FROM invoices
GROUP BY month
ORDER BY month
''')
monthly_stats = [{'month': row[0], 'count': row[1], 'amount': row[2]} for row in cursor.fetchall()]
# 按分类统计
cursor.execute('''
SELECT c.name, COUNT(m.invoice_id), SUM(i.amount)
FROM invoice_categories c
LEFT JOIN invoice_category_mapping m ON c.id = m.category_id
LEFT JOIN invoices i ON m.invoice_id = i.id
GROUP BY c.name
''')
category_stats = [{'category': row[0], 'count': row[1], 'amount': row[2] if row[2] else 0}
for row in cursor.fetchall()]
return {
'total': {
'count': total_count,
'amount': total_amount,
'tax': total_tax
},
'by_type': type_stats,
'by_month': monthly_stats,
'by_category': category_stats
}
def close(self):
"""关闭数据库连接"""
if self.conn:
self.conn.close()
第四步:生成PDF汇总报表
根据数据库中的发票信息,生成PDF格式的汇总报表。
python
from reportlab.lib.pagesizes import letter, A4
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image
from reportlab.graphics.charts.piecharts import Pie
from reportlab.graphics.charts.barcharts import VerticalBarChart
from reportlab.graphics.shapes import Drawing
from reportlab.graphics.charts.legends import Legend
from reportlab.graphics.charts.textlabels import Label
import os
from datetime import datetime
class InvoiceReportGenerator:
"""发票报表生成类"""
def __init__(self, db_manager, output_dir='./reports'):
"""初始化报表生成器"""
self.db_manager = db_manager
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
# 设置样式
self.styles = getSampleStyleSheet()
self.title_style = self.styles['Heading1']
self.subtitle_style = self.styles['Heading2']
self.normal_style = self.styles['Normal']
# 添加中文支持(需要先安装中文字体)
self.cn_style = ParagraphStyle(
'ChineseStyle',
parent=self.normal_style,
fontName='SimSun',
fontSize=10
)
def create_title(self, title):
"""创建标题"""
return Paragraph(title, self.title_style)
def create_subtitle(self, subtitle):
"""创建副标题"""
return Paragraph(subtitle, self.subtitle_style)
def create_text(self, text):
"""创建正文文本"""
return Paragraph(text, self.cn_style)
def create_invoice_table(self, invoices):
"""创建发票数据表格"""
# 表头
headers = ['发票代码', '发票号码', '日期', '金额(元)', '税额(元)', '销售方', '购买方', '发票类型']
# 表格数据
data = [headers]
for invoice in invoices:
row = [
invoice.get('invoice_code', ''),
invoice.get('invoice_number', ''),
invoice.get('invoice_date', ''),
f"{float(invoice.get('amount', 0)):.2f}",
f"{float(invoice.get('tax', 0)):.2f}",
invoice.get('seller', ''),
invoice.get('buyer', ''),
invoice.get('invoice_type', '')
]
data.append(row)
# 创建表格
table = Table(data, repeatRows=1)
# 设置表格样式
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'SimHei'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black),
('FONTNAME', (0, 1), (-1, -1), 'SimSun'),
('FONTSIZE', (0, 1), (-1, -1), 10),
]))
return table
def create_summary_table(self, statistics):
"""创建汇总统计表格"""
# 表格数据
data = [
['统计项', '数值'],
['发票总数', statistics['total']['count']],
['金额合计', f"{statistics['total']['amount']:.2f}元"],
['税额合计', f"{statistics['total']['tax']:.2f}元"]
]
# 创建表格
table = Table(data, colWidths=[200, 100])
# 设置表格样式
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'SimHei'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black),
('FONTNAME', (0, 1), (-1, -1), 'SimSun'),
('FONTSIZE', (0, 1), (-1, -1), 10),
]))
return table
def create_pie_chart(self, statistics):
"""创建饼图"""
drawing = Drawing(400, 200)
# 创建饼图
pie = Pie()
pie.x = 100
pie.y = 0
pie.width = 150
pie.height = 150
# 设置数据
type_stats = statistics['by_type']
pie.data = [stat['amount'] for stat in type_stats]
pie.labels = [stat['type'] for stat in type_stats]
# 设置颜色
pie.slices.strokeWidth = 0.5
pie.slices[0].fillColor = colors.red
pie.slices[1].fillColor = colors.green
pie.slices[2].fillColor = colors.blue
# 添加图例
legend = Legend()
legend.x = 280
legend.y = 50
legend.alignment = 'right'
legend.columnMaximum = 10
legend.colorNamePairs = [(pie.slices[i].fillColor, pie.labels[i]) for i in range(len(pie.labels))]
# 添加标题
title = Label()
title.setText('发票类型金额分布')
title.x = 200
title.y = 180
title.textAnchor = 'middle'
drawing.add(pie)
drawing.add(legend)
drawing.add(title)
return drawing
def create_bar_chart(self, statistics):
"""创建柱状图"""
drawing = Drawing(400, 200)
# 创建柱状图
chart = VerticalBarChart()
chart.x = 50
chart.y = 50
chart.width = 300
chart.height = 125
# 设置数据
monthly_stats = statistics['by_month']
chart.data = [[stat['amount'] for stat in monthly_stats]]
chart.categoryAxis.categoryNames = [stat['month'] for stat in monthly_stats]
chart.valueAxis.valueMin = 0
# 设置样式
chart.bars[0].fillColor = colors.lightblue
chart.bars[0].strokeColor = colors.blue
chart.bars[0].strokeWidth = 0.5
# 添加标题
title = Label()
title.setText('月度发票金额统计')
title.x = 200
title.y = 180
title.textAnchor = 'middle'
drawing.add(chart)
drawing.add(title)
return drawing
def generate_monthly_report(self, year, month):
"""生成月度报表"""
# 构建日期范围
start_date = f"{year}-{month:02d}-01"
if month == 12:
end_date = f"{year+1}-01-01"
else:
end_date = f"{year}-{month+1:02d}-01"
# 查询指定月份的发票
criteria = {
'date_from': start_date,
'date_to': end_date
}
invoices = self.db_manager.search_invoices(criteria)
if not invoices:
print(f"未找到{year}年{month}月的发票记录")
return None
# 获取统计信息
statistics = self.db_manager.get_statistics()
# 创建PDF文档
report_filename = f"发票月度报表_{year}年{month:02d}月_{datetime.now().strftime('%Y%m%d%H%M%S')}.pdf"
report_path = os.path.join(self.output_dir, report_filename)
doc = SimpleDocTemplate(report_path, pagesize=A4)
# 构建报表内容
elements = []
# 添加标题
elements.append(self.create_title(f"{year}年{month}月发票汇总报表"))
elements.append(Spacer(1, 20))
# 添加生成日期
elements.append(self.create_text(f"生成日期:{datetime.now().strftime('%Y年%m月%d日')}\n\n"))
elements.append(Spacer(1, 10))
# 添加统计摘要
elements.append(self.create_subtitle("统计摘要"))
elements.append(self.create_summary_table(statistics))
elements.append(Spacer(1, 20))
# 添加图表
elements.append(self.create_subtitle("统计图表"))
elements.append(self.create_pie_chart(statistics))
elements.append(Spacer(1, 10))
elements.append(self.create_bar_chart(statistics))
elements.append(Spacer(1, 20))
# 添加发票明细
elements.append(self.create_subtitle("发票明细"))
elements.append(self.create_invoice_table(invoices))
# 生成PDF
doc.build(elements)
print(f"月度报表已生成: {report_path}")
return report_path
def generate_annual_report(self, year):
"""生成年度报表"""
# 构建日期范围
start_date = f"{year}-01-01"
end_date = f"{year+1}-01-01"
# 查询指定年份的发票
criteria = {
'date_from': start_date,
'date_to': end_date
}
invoices = self.db_manager.search_invoices(criteria)
if not invoices:
print(f"未找到{year}年的发票记录")
return None
# 获取统计信息
statistics = self.db_manager.get_statistics()
# 创建PDF文档
report_filename = f"发票年度报表_{year}年_{datetime.now().strftime('%Y%m%d%H%M%S')}.pdf"
report_path = os.path.join(self.output_dir, report_filename)
doc = SimpleDocTemplate(report_path, pagesize=A4)
# 构建报表内容
elements = []
# 添加标题
elements.append(self.create_title(f"{year}年度发票汇总报表"))
elements.append(Spacer(1, 20))
# 添加生成日期
elements.append(self.create_text(f"生成日期:{datetime.now().strftime('%Y年%m月%d日')}\n\n"))
elements.append(Spacer(1, 10))
# 添加统计摘要
elements.append(self.create_subtitle("年度统计摘要"))
elements.append(self.create_summary_table(statistics))
elements.append(Spacer(1, 20))
# 添加图表
elements.append(self.create_subtitle("统计图表"))
elements.append(self.create_pie_chart(statistics))
elements.append(Spacer(1, 10))
elements.append(self.create_bar_chart(statistics))
elements.append(Spacer(1, 20))
# 添加发票明细
elements.append(self.create_subtitle("发票明细"))
elements.append(self.create_invoice_table(invoices))
# 生成PDF
doc.build(elements)
print(f"年度报表已生成: {report_path}")
return report_path
def generate_custom_report(self, criteria, title="自定义发票报表"):
"""生成自定义报表"""
# 查询符合条件的发票
invoices = self.db_manager.search_invoices(criteria)
if not invoices:
print("未找到符合条件的发票记录")
return None
# 创建PDF文档
report_filename = f"发票自定义报表_{datetime.now().strftime('%Y%m%d%H%M%S')}.pdf"
report_path = os.path.join(self.output_dir, report_filename)
doc = SimpleDocTemplate(report_path, pagesize=A4)
# 构建报表内容
elements = []
# 添加标题
elements.append(self.create_title(title))
elements.append(Spacer(1, 20))
# 添加生成日期和查询条件
elements.append(self.create_text(f"生成日期:{datetime.now().strftime('%Y年%m月%d日')}\n\n"))
# 添加查询条件描述
condition_text = "查询条件:"
for key, value in criteria.items():
if value:
condition_text += f"{key}={value}, "
condition_text = condition_text.rstrip(", ")
elements.append(self.create_text(condition_text))
elements.append(Spacer(1, 20))
# 添加统计摘要
total_amount = sum(float(invoice.get('amount', 0)) for invoice in invoices)
total_tax = sum(float(invoice.get('tax', 0)) for invoice in invoices)
summary_data = [
['统计项', '数值'],
['发票总数', len(invoices)],
['金额合计', f"{total_amount:.2f}元"],
['税额合计', f"{total_tax:.2f}元"]
]
summary_table = Table(summary_data, colWidths=[200, 100])
summary_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'SimHei'),
('GRID', (0, 0), (-1, -1), 1, colors.black),
('FONTNAME', (0, 1), (-1, -1), 'SimSun'),
]))
elements.append(self.create_subtitle("统计摘要"))
elements.append(summary_table)
elements.append(Spacer(1, 20))
# 添加发票明细
elements.append(self.create_subtitle("发票明细"))
elements.append(self.create_invoice_table(invoices))
# 生成PDF
doc.build(elements)
print(f"自定义报表已生成: {report_path}")
return report_path
第五步:整合所有功能
将前面实现的各个模块整合起来,构建完整的发票识别与归档系统。
python
import os
import argparse
from datetime import datetime
import schedule
import time
class InvoiceSystem:
"""发票识别与归档系统主类"""
def __init__(self, config):
"""初始化系统"""
self.config = config
# 创建必要的目录
os.makedirs(config['processed_images_dir'], exist_ok=True)
os.makedirs(config['reports_dir'], exist_ok=True)
os.makedirs(config['archive_dir'], exist_ok=True)
# 初始化各个组件
self.image_processor = InvoiceImageProcessor(config['processed_images_dir'])
self.ocr_processor = InvoiceOCRProcessor(config.get('tesseract_cmd'))
self.db_manager = InvoiceDatabase(config['db_path'])
self.report_generator = InvoiceReportGenerator(self.db_manager, config['reports_dir'])
def process_invoice_batch(self, image_dir):
"""处理一批发票图像"""
print(f"\n===== 开始处理发票批次: {datetime.now()} =====")
# 步骤1:图像预处理
print("\n1. 图像预处理...")
processed_images = self.image_processor.batch_process(image_dir)
print(f"预处理完成,共处理{len(processed_images)}张图像")
# 步骤2:OCR识别与数据提取
print("\n2. OCR识别与数据提取...")
ocr_results = self.ocr_processor.batch_process(processed_images)
print(f"OCR识别完成,成功提取{len(ocr_results)}张发票信息")
# 保存OCR结果到JSON文件(可选)
json_path = os.path.join(self.config['archive_dir'], f"ocr_results_{datetime.now().strftime('%Y%m%d%H%M%S')}.json")
self.ocr_processor.save_results(ocr_results, json_path)
# 步骤3:存入数据库
print("\n3. 存储发票数据...")
inserted_ids = self.db_manager.batch_insert(ocr_results)
print(f"数据存储完成,成功存入{len(inserted_ids)}条记录")
# 步骤4:自动分类(简单规则)
print("\n4. 自动分类发票...")
for invoice_data in ocr_results:
invoice_id = None
for id in inserted_ids:
invoice = self.db_manager.get_invoice_by_id(id)
if (invoice['invoice_code'] == invoice_data.get('invoice_code') and
invoice['invoice_number'] == invoice_data.get('invoice_number')):
invoice_id = id
break
if invoice_id:
# 简单分类规则
seller = invoice_data.get('seller', '').lower()
if '餐' in seller or '食' in seller or '饭' in seller or '酒店' in seller:
self.db_manager.categorize_invoice(invoice_id, '餐饮费')
elif '交通' in seller or '出租' in seller or '打车' in seller or '高铁' in seller or '航空' in seller:
self.db_manager.categorize_invoice(invoice_id, '交通费')
elif '办公' in seller or '文具' in seller or '耗材' in seller:
self.db_manager.categorize_invoice(invoice_id, '办公用品')
elif '通信' in seller or '电信' in seller or '移动' in seller or '联通' in seller:
self.db_manager.categorize_invoice(invoice_id, '通讯费')
else:
self.db_manager.categorize_invoice(invoice_id, '其他')
print("发票分类完成")
return len(inserted_ids)
def generate_reports(self):
"""生成报表"""
print("\n5. 生成报表...")
# 获取当前年月
now = datetime.now()
year = now.year
month = now.month
# 生成月度报表
monthly_report = self.report_generator.generate_monthly_report(year, month)
# 如果是年底,生成年度报表
if month == 12:
annual_report = self.report_generator.generate_annual_report(year)
print("报表生成完成")
def run_scheduled_tasks(self):
"""运行定时任务"""
# 每天凌晨2点处理新增发票
schedule.every().day.at("02:00").do(self.process_invoice_batch, self.config['watch_dir'])
# 每月1日生成上月报表
def generate_last_month_report():
now = datetime.now()
year = now.year
month = now.month - 1
if month == 0:
month = 12
year -= 1
self.report_generator.generate_monthly_report(year, month)
schedule.every().month.at("00:01").do(generate_last_month_report)
# 每年1月1日生成上年年度报表
def generate_last_year_report():
year = datetime.now().year - 1
self.report_generator.generate_annual_report(year)
schedule.every().year.at("01-01 00:05").do(generate_last_year_report)
print("定时任务已设置,系统运行中...")
while True:
schedule.run_pending()
time.sleep(60)
def close(self):
"""关闭系统,释放资源"""
self.db_manager.close()
print("系统已关闭")
# 使用示例
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='发票识别与归档系统')
parser.add_argument('--mode', choices=['process', 'report', 'service'], default='process',
help='运行模式:process-处理发票,report-生成报表,service-启动服务')
parser.add_argument('--input', help='输入目录,包含待处理的发票图像')
parser.add_argument('--year', type=int, help='报表年份')
parser.add_argument('--month', type=int, help='报表月份')
args = parser.parse_args()
# 系统配置
config = {
'db_path': 'invoice_system.db',
'processed_images_dir': './processed_images',
'reports_dir': './reports',
'archive_dir': './archive',
'watch_dir': './new_invoices',
'tesseract_cmd': r'C:\Program Files\Tesseract-OCR\tesseract.exe' # Windows路径示例
}
# 创建系统实例
system = InvoiceSystem(config)
try:
if args.mode == 'process':
if not args.input:
print("错误:处理模式需要指定输入目录 --input")
else:
system.process_invoice_batch(args.input)
system.generate_reports()
elif args.mode == 'report':
if args.year and args.month:
system.report_generator.generate_monthly_report(args.year, args.month)
elif args.year:
system.report_generator.generate_annual_report(args.year)
else:
print("错误:报表模式需要指定年份 --year,可选指定月份 --month")
elif args.mode == 'service':
system.run_scheduled_tasks()
finally:
system.close()
实际应用场景
1. 财务报销自动化
员工提交报销申请时,只需上传发票照片,系统自动识别发票信息、验证真伪、分类归档,并生成报销凭证,大大简化了财务人员的工作量,提高了报销处理效率。
2. 税务申报辅助
系统可以自动识别和归类增值税专用发票,计算可抵扣进项税额,生成纳税申报所需的汇总表,帮助企业更准确、高效地完成税务申报工作。
3. 财务档案数字化
将纸质发票通过扫描仪批量导入系统,实现财务档案的数字化管理,便于长期保存和随时查询,同时节省物理存储空间,降低档案管理成本。
4. 费用分析与预算控制
系统可以按部门、项目、费用类型等多维度统计和分析发票数据,生成可视化报表,帮助管理层了解费用构成,优化预算分配,加强成本控制。
进阶优化
1. 发票真伪验证
通过调用税务局提供的API接口,实现发票真伪自动验证:
python
import requests
def verify_invoice(invoice_code, invoice_number, invoice_date, amount):
"""调用税务局API验证发票真伪"""
api_url = "https://api.example.com/invoice/verify" # 替换为实际的API地址
params = {
"invoice_code": invoice_code,
"invoice_number": invoice_number,
"invoice_date": invoice_date,
"amount": amount
}
try:
response = requests.post(api_url, json=params)
result = response.json()
if result.get("status") == "valid":
return True, "发票有效"
else:
return False, result.get("message", "发票无效")
except Exception as e:
return False, f"验证过程出错: {str(e)}"
2. 智能分类与标签
使用机器学习算法对发票进行智能分类,提高分类准确率:
python
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
import joblib
class InvoiceClassifier:
"""发票智能分类器"""
def __init__(self, model_path=None):
"""初始化分类器"""
self.vectorizer = TfidfVectorizer(max_features=1000)
self.classifier = RandomForestClassifier(n_estimators=100, random_state=42)
self.model_path = model_path
if model_path and os.path.exists(model_path):
self.load_model(model_path)
def prepare_data(self, invoices):
"""准备训练数据"""
texts = []
categories = []
for invoice in invoices:
# 组合发票文本特征
text = f"{invoice.get('seller', '')} {invoice.get('invoice_type', '')} {invoice.get('ocr_text', '')}"
texts.append(text)
# 获取分类标签
categories.append(invoice.get('category', '其他'))
return texts, categories
def train(self, invoices):
"""训练分类模型"""
texts, categories = self.prepare_data(invoices)
# 文本向量化
X = self.vectorizer.fit_transform(texts)
y = np.array(categories)
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 训练模型
self.classifier.fit(X_train, y_train)
# 评估模型
y_pred = self.classifier.predict(X_test)
report = classification_report(y_test, y_pred)
print("模型评估报告:\n", report)
# 保存模型
if self.model_path:
self.save_model(self.model_path)
def predict(self, invoice):
"""预测发票分类"""
# 组合发票文本特征
text = f"{invoice.get('seller', '')} {invoice.get('invoice_type', '')} {invoice.get('ocr_text', '')}"
# 文本向量化
X = self.vectorizer.transform([text])
# 预测分类
category = self.classifier.predict(X)[0]
return category
def save_model(self, path):
"""保存模型"""
model_data = {
'vectorizer': self.vectorizer,
'classifier': self.classifier
}
joblib.dump(model_data, path)
print(f"模型已保存至: {path}")
def load_model(self, path):
"""加载模型"""
try:
model_data = joblib.load(path)
self.vectorizer = model_data['vectorizer']
self.classifier = model_data['classifier']
print(f"模型已从{path}加载")
return True
except Exception as e:
print(f"加载模型时出错: {e}")
return False
3. Web界面开发
使用Flask框架开发一个简单的Web界面,方便用户上传、查询和管理发票:
python
from flask import Flask, request, render_template, redirect, url_for, jsonify, send_file
import os
import uuid
from werkzeug.utils import secure_filename
import json
from datetime import datetime
app = Flask(__name__)
# 配置上传文件存储路径
UPLOAD_FOLDER = './uploads'
PROCESSED_FOLDER = './processed_images'
REPORTS_FOLDER = './reports'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(PROCESSED_FOLDER, exist_ok=True)
os.makedirs(REPORTS_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 限制上传文件大小为16MB
# 初始化系统组件
config = {
'db_path': 'invoice_system.db',
'processed_images_dir': PROCESSED_FOLDER,
'reports_dir': REPORTS_FOLDER,
'archive_dir': './archive',
'watch_dir': UPLOAD_FOLDER
}
system = InvoiceSystem(config)
@app.route('/')
def index():
"""首页"""
return render_template('index.html')
@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
"""上传发票图像"""
if request.method == 'POST':
# 检查是否有文件
if 'file' not in request.files:
return jsonify({'error': '没有选择文件'}), 400
files = request.files.getlist('file')
if not files or files[0].filename == '':
return jsonify({'error': '没有选择文件'}), 400
filenames = []
for file in files:
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
# 添加时间戳避免文件名冲突
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
unique_filename = f"{timestamp}_{filename}"
file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
file.save(file_path)
filenames.append(file_path)
# 处理上传的发票图像
if filenames:
try:
# 预处理图像
processed_images = system.image_processor.batch_process(app.config['UPLOAD_FOLDER'])
# OCR识别与数据提取
ocr_results = system.ocr_processor.batch_process(processed_images)
# 存入数据库
inserted_ids = system.db_manager.batch_insert(ocr_results)
return jsonify({
'success': True,
'message': f'成功处理{len(inserted_ids)}张发票',
'invoice_ids': inserted_ids
})
except Exception as e:
return jsonify({'error': f'处理发票时出错: {str(e)}'}), 500
return jsonify({'error': '没有有效的文件上传'}), 400
return render_template('upload.html')
@app.route('/invoices')
def list_invoices():
"""列出所有发票"""
try:
# 获取查询参数
criteria = {}
for key in ['invoice_code', 'invoice_number', 'seller', 'buyer', 'invoice_type', 'status']:
if key in request.args and request.args[key]:
criteria[key] = request.args[key]
if 'date_from' in request.args and request.args['date_from']:
criteria['date_from'] = request.args['date_from']
if 'date_to' in request.args and request.args['date_to']:
criteria['date_to'] = request.args['date_to']
if 'amount_min' in request.args and request.args['amount_min']:
criteria['amount_min'] = float(request.args['amount_min'])
if 'amount_max' in request.args and request.args['amount_max']:
criteria['amount_max'] = float(request.args['amount_max'])
# 查询发票
invoices = system.db_manager.search_invoices(criteria)
return render_template('invoices.html', invoices=invoices)
except Exception as e:
return jsonify({'error': f'查询发票时出错: {str(e)}'}), 500
@app.route('/invoice/<int:invoice_id>')
def view_invoice(invoice_id):
"""查看单张发票详情"""
try:
invoice = system.db_manager.get_invoice_by_id(invoice_id)
if not invoice:
return jsonify({'error': '发票不存在'}), 404
return render_template('invoice_detail.html', invoice=invoice)
except Exception as e:
return jsonify({'error': f'获取发票详情时出错: {str(e)}'}), 500
@app.route('/reports')
def list_reports():
"""列出所有报表"""
try:
reports = []
for filename in os.listdir(REPORTS_FOLDER):
if filename.endswith('.pdf'):
file_path = os.path.join(REPORTS_FOLDER, filename)
reports.append({
'filename': filename,
'path': file_path,
'size': os.path.getsize(file_path),
'created_time': datetime.fromtimestamp(os.path.getctime(file_path)).strftime('%Y-%m-%d %H:%M:%S')
})
return render_template('reports.html', reports=reports)
except Exception as e:
return jsonify({'error': f'获取报表列表时出错: {str(e)}'}), 500
@app.route('/generate_report', methods=['GET', 'POST'])
def generate_report():
"""生成报表"""
if request.method == 'POST':
try:
report_type = request.form.get('report_type')
if report_type == 'monthly':
year = int(request.form.get('year'))
month = int(request.form.get('month'))
report_path = system.report_generator.generate_monthly_report(year, month)
elif report_type == 'annual':
year = int(request.form.get('year'))
report_path = system.report_generator.generate_annual_report(year)
elif report_type == 'custom':
# 构建查询条件
criteria = {}
for key in ['invoice_code', 'invoice_number', 'seller', 'buyer', 'invoice_type', 'status']:
if key in request.form and request.form[key]:
criteria[key] = request.form[key]
if 'date_from' in request.form and request.form['date_from']:
criteria['date_from'] = request.form['date_from']
if 'date_to' in request.form and request.form['date_to']:
criteria['date_to'] = request.form['date_to']
if 'amount_min' in request.form and request.form['amount_min']:
criteria['amount_min'] = float(request.form['amount_min'])
if 'amount_max' in request.form and request.form['amount_max']:
criteria['amount_max'] = float(request.form['amount_max'])
title = request.form.get('title', '自定义发票报表')
report_path = system.report_generator.generate_custom_report(criteria, title)
else:
return jsonify({'error': '无效的报表类型'}), 400
if report_path:
return redirect(url_for('list_reports'))
else:
return jsonify({'error': '生成报表失败'}), 500
except Exception as e:
return jsonify({'error': f'生成报表时出错: {str(e)}'}), 500
return render_template('generate_report.html')
@app.route('/download_report/<path:filename>')
def download_report(filename):
"""下载报表"""
try:
return send_file(os.path.join(REPORTS_FOLDER, filename), as_attachment=True)
except Exception as e:
return jsonify({'error': f'下载报表时出错: {str(e)}'}), 500
def allowed_file(filename):
"""检查文件类型是否允许"""
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'tif', 'tiff', 'pdf'}
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
4. 移动应用集成
通过开发RESTful API,实现与移动应用的集成,使用户可以随时随地通过手机拍照上传发票:
python
@app.route('/api/upload', methods=['POST'])
def api_upload_file():
"""API接口:上传发票图像"""
# 验证API密钥
api_key = request.headers.get('X-API-Key')
if not api_key or not validate_api_key(api_key):
return jsonify({'error': '无效的API密钥'}), 401
# 检查是否有文件
if 'file' not in request.files:
return jsonify({'error': '没有选择文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '没有选择文件'}), 400
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
# 添加时间戳避免文件名冲突
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
unique_filename = f"{timestamp}_{filename}"
file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
file.save(file_path)
try:
# 预处理图像
processed_path = system.image_processor.process_image(file_path)
# OCR识别与数据提取
invoice_data = system.ocr_processor.process_invoice(processed_path)
# 存入数据库
invoice_id = system.db_manager.insert_invoice(invoice_data)
# 返回识别结果
return jsonify({
'success': True,
'invoice_id': invoice_id,
'invoice_data': invoice_data
})
except Exception as e:
return jsonify({'error': f'处理发票时出错: {str(e)}'}), 500
return jsonify({'error': '不支持的文件类型'}), 400
@app.route('/api/invoices', methods=['GET'])
def api_list_invoices():
"""API接口:获取发票列表"""
# 验证API密钥
api_key = request.headers.get('X-API-Key')
if not api_key or not validate_api_key(api_key):
return jsonify({'error': '无效的API密钥'}), 401
try:
# 获取查询参数
criteria = {}
for key in ['invoice_code', 'invoice_number', 'seller', 'buyer', 'invoice_type', 'status']:
if key in request.args and request.args[key]:
criteria[key] = request.args[key]
if 'date_from' in request.args and request.args['date_from']:
criteria['date_from'] = request.args['date_from']
if 'date_to' in request.args and request.args['date_to']:
criteria['date_to'] = request.args['date_to']
if 'amount_min' in request.args and request.args['amount_min']:
criteria['amount_min'] = float(request.args['amount_min'])
if 'amount_max' in request.args and request.args['amount_max']:
criteria['amount_max'] = float(request.args['amount_max'])
# 分页参数
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 10))
# 查询发票
invoices = system.db_manager.search_invoices(criteria)
# 简单分页
start = (page - 1) * per_page
end = start + per_page
paginated_invoices = invoices[start:end]
return jsonify({
'success': True,
'total': len(invoices),
'page': page,
'per_page': per_page,
'invoices': paginated_invoices
})
except Exception as e:
return jsonify({'error': f'查询发票时出错: {str(e)}'}), 500
def validate_api_key(api_key):
"""验证API密钥"""
# 实际应用中应该从数据库或配置文件中读取有效的API密钥
valid_keys = ['test_key_123', 'mobile_app_key_456']
return api_key in valid_keys
小结
本节介绍了如何使用Python构建一个完整的发票识别与归档系统,实现了从OCR识别发票图片、提取关键字段、存入数据库到生成PDF汇总表的全流程自动化。通过这个系统,企业可以大大提高财务处理效率,减少人工错误,实现财务档案的数字化管理。
在实际应用中,可以根据具体需求进一步扩展系统功能,如:
- 增强OCR识别准确率:针对不同类型的发票(增值税专用发票、普通发票、电子发票等)优化识别算法
- 多语言支持:扩展系统以支持多语言发票的识别和处理
- 区块链存储:利用区块链技术确保发票数据的不可篡改性和可追溯性
- 深度学习模型:使用深度学习模型进一步提高发票分类和信息提取的准确率
- 云端部署:将系统部署到云服务器,实现多地协同办公
通过本节的学习,读者应该能够掌握使用Python进行OCR识别、数据提取、数据库存储和PDF报表生成的基本技能,为构建更复杂的自动化办公系统打下基础。