
1.2 Automating File Organization

In day-to-day work we often have to deal with large numbers of files: organizing photos, archiving documents, cleaning up temporary files, renaming files according to rules, and so on. Doing these tasks by hand is not only time-consuming but also error-prone. Automating them with Python frees you from tedious file-management chores so you can focus on more creative work.

Using the os Module to Work with Files and Directories

The os module is part of Python's standard library. It provides basic facilities for interacting with the operating system, including file and directory operations.
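
Before the first full example, here is a minimal warm-up sketch of the os calls the rest of this section leans on: os.path.join, os.path.exists, os.listdir, and os.path.splitext. The file and directory names are placeholders.

python
import os

# Build paths portably instead of concatenating strings by hand
path = os.path.join("sample_dir", "notes.txt")

# Check whether a path exists and what kind of entry it is
print(os.path.exists(path), os.path.isfile(path), os.path.isdir("sample_dir"))

# List the entries in the current directory
for entry in os.listdir("."):
    print(entry)

# Split a file name into base name and extension
base, ext = os.path.splitext("report.xlsx")
print(base, ext)  # report .xlsx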

Creating and Deleting Directories

python
import os
import shutil

# Create a single-level directory
def create_directory(directory):
    if not os.path.exists(directory):
        os.mkdir(directory)
        print(f"Directory created: {directory}")
    else:
        print(f"Directory already exists: {directory}")

# Create nested directories
def create_nested_directories(directory_path):
    """
    Create a multi-level directory tree; missing parent directories are created automatically
    """
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)
        print(f"Nested directories created: {directory_path}")
    else:
        print(f"Directory already exists: {directory_path}")

# Delete a directory
def remove_directory(directory_path, recursive=False):
    """
    Delete a directory
    recursive: if True, delete the directory and all of its contents
    """
    if not os.path.exists(directory_path):
        print(f"Directory does not exist: {directory_path}")
        return
    
    if recursive:
        shutil.rmtree(directory_path)
        print(f"Directory removed recursively: {directory_path}")
    else:
        try:
            os.rmdir(directory_path)
            print(f"Empty directory removed: {directory_path}")
        except OSError as e:
            print(f"Failed to remove directory: {e}")

# Usage examples
create_directory("backup")
create_nested_directories("projects/python/automation")
remove_directory("temp", recursive=True)

Traversing a Directory Tree

python
import os

def list_directory_tree(directory, indent=0):
    """
    Recursively print a directory tree
    """
    if not os.path.exists(directory):
        print(f"Directory does not exist: {directory}")
        return
    
    print(' ' * indent + os.path.basename(directory) + '/')
    
    try:
        entries = os.listdir(directory)
        for entry in sorted(entries):
            full_path = os.path.join(directory, entry)
            if os.path.isdir(full_path):
                list_directory_tree(full_path, indent + 4)
            else:
                print(' ' * (indent + 4) + entry)
    except PermissionError:
        print(' ' * (indent + 4) + "[permission denied]")

# Usage example
list_directory_tree("projects")
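
The later examples in this section use os.walk instead of explicit recursion. For comparison, here is a minimal sketch of the same kind of listing built on os.walk, with indentation derived from path depth (the "projects" directory is the same placeholder as above).

python
import os

def walk_tree(directory):
    # os.walk yields (current directory, subdirectories, files) for every directory in the tree
    for root, _, files in os.walk(directory):
        depth = root[len(directory):].count(os.sep)
        print(' ' * (4 * depth) + os.path.basename(root) + '/')
        for name in sorted(files):
            print(' ' * (4 * (depth + 1)) + name)

walk_tree("projects")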

Batch Renaming Files

python
import os
import re

def batch_rename(directory, pattern, replacement):
    """
    Batch-rename files in a directory
    directory: target directory
    pattern: regular expression to match in each file name
    replacement: replacement string
    """
    if not os.path.exists(directory) or not os.path.isdir(directory):
        print(f"Directory does not exist: {directory}")
        return
    
    renamed_count = 0
    regex = re.compile(pattern)
    
    for filename in os.listdir(directory):
        if os.path.isfile(os.path.join(directory, filename)):
            new_filename = regex.sub(replacement, filename)
            
            if new_filename != filename:
                old_path = os.path.join(directory, filename)
                new_path = os.path.join(directory, new_filename)
                
                os.rename(old_path, new_path)
                renamed_count += 1
                print(f"Renamed: {filename} -> {new_filename}")
    
    print(f"Renaming finished, {renamed_count} file(s) processed")

# Usage example: replace whitespace in file names with underscores
batch_rename("documents", r"\s+", "_")

Using the shutil Module to Work with Files and Folders

The shutil module is also part of Python's standard library. It offers higher-level file and directory operations and can be seen as a complement to the os module; it is particularly well suited to copying, moving, and archiving files.
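
As a quick reference before the longer examples, here is a minimal sketch of the shutil calls used below: copy2, move, rmtree, and make_archive. All paths are placeholders.

python
import shutil

# Copy a file and preserve its metadata (timestamps, permission bits)
shutil.copy2("report.xlsx", "backup/report.xlsx")

# Move (or rename) a file or directory
shutil.move("draft.txt", "archive/draft.txt")

# Recursively delete a directory and everything inside it
shutil.rmtree("temp")

# Create temp_backup.zip from the contents of the backup directory
shutil.make_archive("temp_backup", "zip", "backup")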

Copying and Backing Up Files

python
import os
import shutil
from datetime import datetime

def backup_files(source_dir, backup_dir, file_types=None):
    """
    Back up files of the given types
    source_dir: source directory
    backup_dir: backup directory
    file_types: list of file extensions to back up, e.g. ['.docx', '.xlsx']
    """
    if not os.path.exists(source_dir):
        print(f"Source directory does not exist: {source_dir}")
        return
    
    # Create the backup directory if it does not exist
    if not os.path.exists(backup_dir):
        os.makedirs(backup_dir)
    
    # Create a timestamped subdirectory
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_subdir = os.path.join(backup_dir, f"backup_{timestamp}")
    os.makedirs(backup_subdir)
    
    copied_count = 0
    
    # Walk every file under the source directory
    for root, _, files in os.walk(source_dir):
        for file in files:
            file_path = os.path.join(root, file)
            
            # Filter by file type
            if file_types is None or any(file.lower().endswith(ft.lower()) for ft in file_types):
                # Preserve the relative directory structure
                rel_path = os.path.relpath(root, source_dir)
                dest_dir = os.path.join(backup_subdir, rel_path)
                
                # Make sure the destination directory exists
                if not os.path.exists(dest_dir):
                    os.makedirs(dest_dir)
                
                # Copy the file
                dest_path = os.path.join(dest_dir, file)
                shutil.copy2(file_path, dest_path)  # copy2 preserves metadata such as the modification time
                copied_count += 1
                print(f"Backed up: {file_path} -> {dest_path}")
    
    print(f"Backup finished, {copied_count} file(s) copied to {backup_subdir}")
    return backup_subdir

# Usage example: back up all Excel and Word files
backup_files("work_documents", "backups", [".xlsx", ".docx"])

Moving and Reorganizing Files

python
import os
import shutil
import time

def organize_by_extension(source_dir, target_dir=None):
    """
    Organize files into folders by extension
    """
    if not os.path.exists(source_dir):
        print(f"Source directory does not exist: {source_dir}")
        return
    
    # If no target directory is given, create an "organized" subdirectory inside the source directory
    if target_dir is None:
        target_dir = os.path.join(source_dir, "organized")
    
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
    
    moved_count = 0
    
    for filename in os.listdir(source_dir):
        source_path = os.path.join(source_dir, filename)
        
        # Process files only, skip directories
        if os.path.isfile(source_path):
            # Get the extension without the leading dot
            _, ext = os.path.splitext(filename)
            ext = ext[1:].lower() if ext else "no_extension"
            
            # Create a folder for this extension
            ext_dir = os.path.join(target_dir, ext)
            if not os.path.exists(ext_dir):
                os.makedirs(ext_dir)
            
            # Move the file
            target_path = os.path.join(ext_dir, filename)
            shutil.move(source_path, target_path)
            moved_count += 1
            print(f"Moved: {filename} -> {ext}/{filename}")
    
    print(f"Organizing finished, {moved_count} file(s) moved")

def organize_by_date(source_dir, target_dir=None, date_format="%Y-%m"):
    """
    Organize files into folders by modification date
    """
    if not os.path.exists(source_dir):
        print(f"Source directory does not exist: {source_dir}")
        return
    
    # If no target directory is given, create a "by_date" subdirectory inside the source directory
    if target_dir is None:
        target_dir = os.path.join(source_dir, "by_date")
    
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
    
    moved_count = 0
    
    for filename in os.listdir(source_dir):
        source_path = os.path.join(source_dir, filename)
        
        # Process files only, skip directories
        if os.path.isfile(source_path):
            # Get the file's modification time
            mod_time = os.path.getmtime(source_path)
            date_str = time.strftime(date_format, time.localtime(mod_time))
            
            # Create a folder for this date
            date_dir = os.path.join(target_dir, date_str)
            if not os.path.exists(date_dir):
                os.makedirs(date_dir)
            
            # Move the file
            target_path = os.path.join(date_dir, filename)
            shutil.move(source_path, target_path)
            moved_count += 1
            print(f"Moved: {filename} -> {date_str}/{filename}")
    
    print(f"Organizing finished, {moved_count} file(s) moved")

# Usage examples
organize_by_extension("downloads")
organize_by_date("photos", date_format="%Y-%m")

Cleaning Up Duplicate Files

python
import os
import hashlib
from collections import defaultdict

def find_duplicate_files(directory):
    """
    Find duplicate files in a directory.
    Returns a dict mapping a content hash to the list of file paths that share that content.
    """
    if not os.path.exists(directory):
        print(f"Directory does not exist: {directory}")
        return {}
    
    # Group files by size first to avoid unnecessary hashing
    size_to_files = defaultdict(list)
    
    # Step 1: group by file size
    for root, _, files in os.walk(directory):
        for filename in files:
            file_path = os.path.join(root, filename)
            try:
                file_size = os.path.getsize(file_path)
                size_to_files[file_size].append(file_path)
            except (OSError, IOError) as e:
                print(f"Cannot access file {file_path}: {e}")
    
    # Step 2: hash only files that share a size with at least one other file
    duplicates = defaultdict(list)
    
    for size, files in size_to_files.items():
        # Only groups with more than one file of the same size can contain duplicates
        if len(files) > 1:
            for file_path in files:
                try:
                    with open(file_path, 'rb') as f:
                        file_hash = hashlib.md5(f.read()).hexdigest()
                        duplicates[file_hash].append(file_path)
                except (OSError, IOError) as e:
                    print(f"Cannot read file {file_path}: {e}")
    
    # Keep only hashes that map to more than one file
    return {hash_val: paths for hash_val, paths in duplicates.items() if len(paths) > 1}

def remove_duplicates(directory, keep_newest=True):
    """
    Delete duplicate files, keeping either the newest or the oldest copy
    """
    duplicates = find_duplicate_files(directory)
    
    if not duplicates:
        print("No duplicate files found")
        return
    
    removed_count = 0
    saved_space = 0
    
    for hash_val, file_paths in duplicates.items():
        # Sort by modification time
        sorted_paths = sorted(file_paths, key=os.path.getmtime, reverse=keep_newest)
        
        # Keep the first file (newest or oldest) and delete the rest
        keep_file = sorted_paths[0]
        files_to_remove = sorted_paths[1:]
        
        print(f"Keeping file: {keep_file}")
        
        for file_path in files_to_remove:
            try:
                file_size = os.path.getsize(file_path)
                os.remove(file_path)
                removed_count += 1
                saved_space += file_size
                print(f"Deleted duplicate file: {file_path}")
            except (OSError, IOError) as e:
                print(f"Failed to delete {file_path}: {e}")
    
    print(f"Cleanup finished, {removed_count} duplicate file(s) removed, {saved_space / (1024*1024):.2f} MB reclaimed")

# Usage example: remove duplicate files, keeping the newest copy
remove_duplicates("documents", keep_newest=True)
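
One design note: find_duplicate_files above reads each candidate file into memory in a single call, which is fine for documents but wasteful for very large files. Here is a minimal sketch of a chunked hashing helper that could stand in for that step (the 1 MB chunk size is an arbitrary choice).

python
import hashlib

def hash_file(file_path, chunk_size=1024 * 1024):
    """Hash a file incrementally so large files never have to fit in memory."""
    hasher = hashlib.md5()
    with open(file_path, 'rb') as f:
        # Read fixed-size chunks until the file is exhausted
        for chunk in iter(lambda: f.read(chunk_size), b''):
            hasher.update(chunk)
    return hasher.hexdigest()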

Practical Application Scenarios

Scenario 1: A Photo Organization Assistant

python
import os
import shutil
from datetime import datetime
from PIL import Image
import piexif

def organize_photos(photo_dir, output_dir=None):
    """
    Organize photos: sort them into folders by the date they were taken and rename them,
    resolving file-name conflicts along the way
    """
    if not os.path.exists(photo_dir):
        print(f"Photo directory does not exist: {photo_dir}")
        return
    
    if output_dir is None:
        output_dir = os.path.join(photo_dir, "organized_photos")
    
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    # Supported image formats
    image_extensions = [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff"]
    processed_count = 0
    skipped_count = 0
    
    # Walk every file in the photo directory
    for root, dirs, files in os.walk(photo_dir):
        # Skip the output directory so already-organized copies are not processed again
        dirs[:] = [d for d in dirs if os.path.join(root, d) != output_dir]
        for filename in files:
            if any(filename.lower().endswith(ext) for ext in image_extensions):
                file_path = os.path.join(root, filename)
                
                try:
                    # Try to read the capture date from the EXIF data
                    date_taken = None
                    try:
                        with Image.open(file_path) as img:
                            if "exif" in img.info:
                                exif_dict = piexif.load(img.info["exif"])
                                if piexif.ExifIFD.DateTimeOriginal in exif_dict["Exif"]:
                                    date_str = exif_dict["Exif"][piexif.ExifIFD.DateTimeOriginal].decode("utf-8")
                                    date_taken = datetime.strptime(date_str, "%Y:%m:%d %H:%M:%S")
                    except Exception as e:
                        print(f"Cannot read EXIF data from {file_path}: {e}")
                    
                    # Fall back to the file's modification time if EXIF has no date
                    if date_taken is None:
                        mod_time = os.path.getmtime(file_path)
                        date_taken = datetime.fromtimestamp(mod_time)
                    
                    # Create a year-month folder
                    year_month = date_taken.strftime("%Y-%m")
                    target_dir = os.path.join(output_dir, year_month)
                    if not os.path.exists(target_dir):
                        os.makedirs(target_dir)
                    
                    # Build the new file name: date_counter.extension
                    base_name = date_taken.strftime("%Y%m%d_%H%M%S")
                    _, ext = os.path.splitext(filename)
                    
                    # Resolve file-name conflicts
                    new_filename = f"{base_name}{ext.lower()}"
                    target_path = os.path.join(target_dir, new_filename)
                    
                    counter = 1
                    while os.path.exists(target_path):
                        new_filename = f"{base_name}_{counter}{ext.lower()}"
                        target_path = os.path.join(target_dir, new_filename)
                        counter += 1
                    
                    # Copy the file
                    shutil.copy2(file_path, target_path)
                    processed_count += 1
                    print(f"Organized: {filename} -> {year_month}/{new_filename}")
                    
                except Exception as e:
                    print(f"Failed to process {file_path}: {e}")
                    skipped_count += 1
    
    print(f"Organizing finished, {processed_count} photo(s) processed, {skipped_count} file(s) skipped")
    return output_dir

# Usage example
organize_photos("vacation_photos")

Scenario 2: A Project Archiving Tool

python
import os
import shutil
import time
import json
import zipfile
import fnmatch

def archive_project(project_dir, archive_dir=None, include_patterns=None, exclude_patterns=None):
    """
    Archive a project: copy it, record metadata, compress it, and skip temporary files
    """
    if not os.path.exists(project_dir):
        print(f"Project directory does not exist: {project_dir}")
        return
    
    project_name = os.path.basename(project_dir)
    
    # Default to an "archives" folder next to the project directory
    if archive_dir is None:
        parent_dir = os.path.dirname(project_dir)
        archive_dir = os.path.join(parent_dir, "archives")
    
    if not os.path.exists(archive_dir):
        os.makedirs(archive_dir)
    
    # Include every file by default
    if include_patterns is None:
        include_patterns = ["*"]
    
    # Exclude common temporary files and directories by default
    if exclude_patterns is None:
        exclude_patterns = [
            "__pycache__", ".git", ".svn", ".DS_Store", "*.pyc", 
            "*.tmp", "*.temp", "*.log", "node_modules", "venv", "env",
            "*.bak", "~*"
        ]
    
    # Create a timestamped archive name
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    archive_name = f"{project_name}_{timestamp}"
    archive_path = os.path.join(archive_dir, archive_name)
    os.makedirs(archive_path)
    
    # Collect project metadata
    metadata = {
        "project_name": project_name,
        "archive_date": time.strftime("%Y-%m-%d %H:%M:%S"),
        "original_path": os.path.abspath(project_dir),
        "file_count": 0,
        "total_size": 0,
        "file_types": {}
    }
    
    # Copy files into the archive directory
    for root, dirs, files in os.walk(project_dir):
        # Prune excluded directories (fnmatch handles both literal names and wildcard patterns)
        dirs[:] = [d for d in dirs if not any(fnmatch.fnmatch(d, pattern) for pattern in exclude_patterns)]
        
        # Process files
        for filename in files:
            # Skip excluded files and files that match no include pattern
            if any(fnmatch.fnmatch(filename, pattern) for pattern in exclude_patterns):
                continue
            if not any(fnmatch.fnmatch(filename, pattern) for pattern in include_patterns):
                continue
            
            file_path = os.path.join(root, filename)
            rel_path = os.path.relpath(file_path, project_dir)
            target_path = os.path.join(archive_path, rel_path)
            
            # Make sure the target directory exists
            os.makedirs(os.path.dirname(target_path), exist_ok=True)
            
            # Copy the file
            shutil.copy2(file_path, target_path)
            
            # Update the metadata
            file_size = os.path.getsize(file_path)
            metadata["file_count"] += 1
            metadata["total_size"] += file_size
            
            # Count file types
            _, ext = os.path.splitext(filename)
            ext = ext.lower() if ext else "no_extension"
            if ext in metadata["file_types"]:
                metadata["file_types"][ext] += 1
            else:
                metadata["file_types"][ext] = 1
    
    # Save the metadata
    metadata_path = os.path.join(archive_path, "archive_metadata.json")
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2)
    
    # Create the ZIP archive
    zip_path = f"{archive_path}.zip"
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(archive_path):
            for file in files:
                file_path = os.path.join(root, file)
                rel_path = os.path.relpath(file_path, archive_path)
                zipf.write(file_path, rel_path)
    
    # Remove the temporary (uncompressed) staging directory
    shutil.rmtree(archive_path)
    
    print(f"Project archived: {zip_path}")
    print(f"Archived {metadata['file_count']} file(s), total size {metadata['total_size'] / (1024*1024):.2f} MB")
    return zip_path

# Usage example
archive_project("my_project", exclude_patterns=["__pycache__", ".git", "*.pyc", "venv"])
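
If you do not need fine-grained control over the ZIP contents, the manual zipfile loop above could in principle be replaced by a single shutil.make_archive call over the staged directory. A minimal sketch, reusing the archive_path variable from the function above:

python
import shutil

# Create archive_path.zip from the staged directory in one call,
# standing in for the manual zipfile.ZipFile loop
zip_path = shutil.make_archive(archive_path, "zip", archive_path)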

With these practical examples you can automate a wide range of file-organization tasks and work far more efficiently. Whether it is everyday file management or archiving an entire project, these techniques save a great deal of time and keep your files in good order.