Skip to content

1.1.读写文件自动化

在日常工作中,文件操作是最基础也是最常见的任务之一。无论是批量处理数据文件,还是整理工作文档,掌握高效的文件读写技巧都能极大提升工作效率。本文将介绍几种实用的Python文件操作方法,帮助你轻松应对各种文件处理需求。

使用pathlib库操作文件

传统的文件路径处理往往依赖于os和os.path模块,代码繁琐且平台兼容性差。而Python 3.4引入的pathlib库提供了面向对象的文件系统路径处理方式,使代码更简洁、更易读。

基本路径操作

python
from pathlib import Path

# 创建路径对象
file_path = Path('工作报告.docx')
project_dir = Path('/Users/sunlei/projects')

# 路径拼接(无需担心斜杠问题)
doc_path = project_dir / 'documents' / file_path
print(doc_path)  # 输出: /Users/sunlei/projects/documents/工作报告.docx

# 获取路径信息
print(doc_path.name)      # 输出: 工作报告.docx
print(doc_path.suffix)    # 输出: .docx
print(doc_path.stem)      # 输出: 工作报告
print(doc_path.parent)    # 输出: /Users/sunlei/projects/documents

文件重命名与删除

对文件进行重命名是常见的需求,通过pathlib库的Path.rename方法可以轻松实现对某文件的重命名操作。

python
from pathlib import Path
import datetime

# 获取当前日期
today = datetime.date.today().strftime('%Y%m%d')

# 批量重命名文件(添加日期前缀)
def rename_with_date(directory, pattern='*.txt'):
    dir_path = Path(directory)
    for file_path in dir_path.glob(pattern):
        # 构建新文件名
        new_name = f"{today}_{file_path.name}"
        new_path = file_path.with_name(new_name)
        
        # 执行重命名
        file_path.rename(new_path)
        print(f"已重命名: {file_path.name} -> {new_name}")

# 使用示例
rename_with_date('./reports')

Path.unlink方法等价于os.remove方法,用于删除已存在的文件;Path.rmdir方法等价于os.rmdir方法,用于删除空的目录,如果目录非空,该方法会抛出异常。

python
# 安全删除文件
def safe_delete(file_path):
    path = Path(file_path)
    if path.exists():
        if path.is_file():
            path.unlink()
            print(f"已删除文件: {path}")
        elif path.is_dir() and not any(path.iterdir()):
            path.rmdir()
            print(f"已删除空目录: {path}")
        else:
            print(f"目录非空,无法删除: {path}")
    else:
        print(f"路径不存在: {path}")

文件查找与遍历

使用的listdir()函数返回的只是文件和子文件夹的名称,而pathlib的glob()函数返回的则是文件和子文件夹的完整路径对象,更加方便操作。

python
# 查找所有Excel文件并按修改时间排序
def find_excel_files(directory):
    dir_path = Path(directory)
    excel_files = list(dir_path.glob('**/*.xlsx')) + list(dir_path.glob('**/*.xls'))
    
    # 按修改时间排序
    excel_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
    
    print(f"找到 {len(excel_files)} 个Excel文件:")
    for file in excel_files[:5]:  # 只显示前5个
        mod_time = datetime.datetime.fromtimestamp(file.stat().st_mtime)
        print(f"{file.name} - 修改时间: {mod_time.strftime('%Y-%m-%d %H:%M')}")
    
    return excel_files

文件读写操作

pathlib还提供了简便的文件读写方法,无需传统的open()函数:

python
# 读取文本文件
def read_log_file(log_path):
    path = Path(log_path)
    if path.exists() and path.is_file():
        # 直接读取文本内容
        content = path.read_text(encoding='utf-8')
        lines = content.split('\n')
        print(f"日志共 {len(lines)} 行")
        
        # 查找错误信息
        error_lines = [line for line in lines if 'ERROR' in line]
        if error_lines:
            print(f"发现 {len(error_lines)} 条错误记录:")
            for line in error_lines[:3]:  # 只显示前3条
                print(f"- {line}")
    else:
        print(f"日志文件不存在: {path}")

# 写入文本文件
def append_to_log(log_path, message):
    path = Path(log_path)
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[{timestamp}] {message}\n"
    
    # 追加内容到文件
    path.write_text(log_entry, encoding='utf-8') if not path.exists() else path.open('a', encoding='utf-8').write(log_entry)
    print(f"日志已更新: {path}")

使用zipfile、tarfile压缩解压文件

在处理大量文件时,压缩和解压是常见的需求。Python中提供了zipfile与tarfile内置库来分别实现对两种常见压缩文件格式的操作。

ZIP文件操作

python
import zipfile
from pathlib import Path
import os

# 创建ZIP压缩文件
def create_zip_archive(directory, zip_name=None):
    dir_path = Path(directory)
    
    # 如果没有指定压缩包名称,使用目录名
    if zip_name is None:
        zip_name = f"{dir_path.name}.zip"
    
    zip_path = dir_path.parent / zip_name
    
    # 创建压缩文件
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # 遍历目录下所有文件
        for file_path in dir_path.rglob('*'):
            if file_path.is_file():
                # 计算相对路径作为压缩包内路径
                rel_path = file_path.relative_to(dir_path.parent)
                zipf.write(file_path, rel_path)
                print(f"已添加: {rel_path}")
    
    print(f"压缩完成: {zip_path},大小: {zip_path.stat().st_size / 1024:.2f} KB")
    return zip_path

# 解压ZIP文件
def extract_zip_archive(zip_path, extract_to=None):
    zip_path = Path(zip_path)
    
    # 如果没有指定解压目录,使用当前目录
    if extract_to is None:
        extract_to = zip_path.parent / zip_path.stem
    
    extract_path = Path(extract_to)
    extract_path.mkdir(exist_ok=True)
    
    with zipfile.ZipFile(zip_path, 'r') as zipf:
        # 获取压缩包内文件列表
        file_list = zipf.namelist()
        print(f"压缩包内共 {len(file_list)} 个文件")
        
        # 解压所有文件
        zipf.extractall(extract_path)
        print(f"解压完成: {extract_path}")
    
    return extract_path

TAR文件操作

python
import tarfile
from pathlib import Path

# 创建TAR压缩文件
def create_tar_archive(directory, tar_name=None, compression='gz'):
    dir_path = Path(directory)
    
    # 如果没有指定压缩包名称,使用目录名
    if tar_name is None:
        tar_name = f"{dir_path.name}.tar.{compression}"
    
    tar_path = dir_path.parent / tar_name
    
    # 设置压缩模式
    mode = f"w:{compression}" if compression else "w"
    
    # 创建压缩文件
    with tarfile.open(tar_path, mode) as tarf:
        # 添加整个目录
        tarf.add(dir_path, arcname=dir_path.name)
        print(f"已添加目录: {dir_path}")
    
    print(f"压缩完成: {tar_path},大小: {tar_path.stat().st_size / 1024:.2f} KB")
    return tar_path

# 解压TAR文件
def extract_tar_archive(tar_path, extract_to=None):
    tar_path = Path(tar_path)
    
    # 如果没有指定解压目录,使用当前目录
    if extract_to is None:
        extract_to = tar_path.parent
    
    extract_path = Path(extract_to)
    extract_path.mkdir(exist_ok=True)
    
    # 自动检测压缩格式
    with tarfile.open(tar_path, 'r:*') as tarf:
        # 获取压缩包内文件列表
        file_list = tarf.getnames()
        print(f"压缩包内共 {len(file_list)} 个文件/目录")
        
        # 解压所有文件
        tarf.extractall(extract_path)
        print(f"解压完成: {extract_path}")
    
    return extract_path

实际应用场景

场景一:日志文件自动归档

python
from pathlib import Path
import zipfile
import datetime
import shutil

def archive_logs(log_dir, days_to_keep=30):
    """自动归档超过指定天数的日志文件"""
    log_path = Path(log_dir)
    today = datetime.datetime.now()
    archive_dir = log_path / 'archives'
    archive_dir.mkdir(exist_ok=True)
    
    # 获取所有日志文件
    log_files = list(log_path.glob('*.log'))
    archived_count = 0
    
    for log_file in log_files:
        # 获取文件修改时间
        mtime = datetime.datetime.fromtimestamp(log_file.stat().st_mtime)
        days_old = (today - mtime).days
        
        # 如果文件超过保留天数,进行归档
        if days_old > days_to_keep:
            # 创建年月子目录
            year_month = mtime.strftime('%Y-%m')
            month_dir = archive_dir / year_month
            month_dir.mkdir(exist_ok=True)
            
            # 创建压缩文件
            zip_name = f"{log_file.stem}_{mtime.strftime('%Y%m%d')}.zip"
            zip_path = month_dir / zip_name
            
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                zipf.write(log_file, log_file.name)
            
            # 删除原日志文件
            log_file.unlink()
            archived_count += 1
            print(f"已归档: {log_file.name} -> {zip_path}")
    
    print(f"归档完成,共处理 {archived_count} 个日志文件")
    return archived_count

场景二:批量文件格式转换

python
from pathlib import Path
import csv
import json

def convert_csv_to_json(csv_dir, output_dir=None):
    """批量将CSV文件转换为JSON格式"""
    csv_path = Path(csv_dir)
    
    # 如果没有指定输出目录,在原目录创建json子目录
    if output_dir is None:
        output_dir = csv_path / 'json_output'
    
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)
    
    # 获取所有CSV文件
    csv_files = list(csv_path.glob('*.csv'))
    converted_count = 0
    
    for csv_file in csv_files:
        # 读取CSV文件
        data = []
        try:
            with open(csv_file, 'r', encoding='utf-8', newline='') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    data.append(row)
            
            # 创建对应的JSON文件
            json_file = output_path / f"{csv_file.stem}.json"
            with open(json_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            
            converted_count += 1
            print(f"已转换: {csv_file.name} -> {json_file.name}")
        except Exception as e:
            print(f"转换失败: {csv_file.name} - {str(e)}")
    
    print(f"转换完成,共处理 {converted_count} 个CSV文件")
    return converted_count

通过这些实用的代码示例,你可以轻松实现各种文件操作自动化,大幅提高工作效率。无论是日常的文件整理,还是批量的数据处理,这些技巧都能帮你节省大量时间。