Skip to content

6.2 客户信息管理系统

客户信息管理是企业运营中的核心环节,高效的客户信息管理系统可以帮助企业更好地了解客户需求、提升服务质量、增强客户关系。本节将介绍如何使用Python构建一个自动化的客户信息管理系统,实现从客户资料导入、分类整理、生成PPT展示到自动存档的全流程自动化。

需求分析

一个完整的客户信息管理系统通常需要实现以下功能:

  1. 客户资料导入:从Excel、CSV或数据库中导入客户基础信息
  2. 数据清洗与分类:对客户数据进行清洗、去重、分类和标签化
  3. 数据分析与可视化:生成客户画像、消费趋势等分析报告
  4. PPT展示生成:自动生成客户分析PPT,用于业务汇报
  5. 数据存档与备份:定期将客户数据进行存档和备份

技术选型

我们将使用以下Python库来实现各个环节:

  • pandas:用于数据处理和分析
  • openpyxl:用于Excel文件的读写
  • python-pptx:用于生成PPT演示文稿
  • matplotlib/seaborn:用于数据可视化
  • sqlite3/sqlalchemy:用于数据存储和管理
  • schedule:用于定时任务调度

代码实现

第一步:客户资料导入

python
import pandas as pd
import numpy as np
import os
import sqlite3
from datetime import datetime

class CustomerDataImporter:
    """客户数据导入类"""
    
    def __init__(self, db_path):
        """初始化数据库连接"""
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self.create_tables()
    
    def create_tables(self):
        """创建数据库表结构"""
        cursor = self.conn.cursor()
        
        # 创建客户基本信息表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS customers (
            customer_id TEXT PRIMARY KEY,
            name TEXT,
            gender TEXT,
            age INTEGER,
            phone TEXT,
            email TEXT,
            address TEXT,
            registration_date TEXT,
            customer_type TEXT,
            source TEXT,
            last_updated TEXT
        )
        ''')
        
        # 创建客户交易记录表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS transactions (
            transaction_id TEXT PRIMARY KEY,
            customer_id TEXT,
            transaction_date TEXT,
            amount REAL,
            product_category TEXT,
            product_name TEXT,
            payment_method TEXT,
            FOREIGN KEY (customer_id) REFERENCES customers (customer_id)
        )
        ''')
        
        # 创建客户联系记录表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS contacts (
            contact_id TEXT PRIMARY KEY,
            customer_id TEXT,
            contact_date TEXT,
            contact_type TEXT,
            contact_purpose TEXT,
            contact_result TEXT,
            follow_up_needed INTEGER,
            notes TEXT,
            FOREIGN KEY (customer_id) REFERENCES customers (customer_id)
        )
        ''')
        
        self.conn.commit()
    
    def import_from_excel(self, file_path, sheet_name, table_name):
        """从Excel导入数据"""
        try:
            # 读取Excel文件
            df = pd.read_excel(file_path, sheet_name=sheet_name)
            
            # 添加最后更新时间
            if table_name == 'customers':
                df['last_updated'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            
            # 将数据写入SQLite数据库
            df.to_sql(table_name, self.conn, if_exists='append', index=False)
            
            print(f"成功从{file_path}导入{len(df)}条记录到{table_name}表")
            return len(df)
        except Exception as e:
            print(f"导入数据时出错: {e}")
            return 0
    
    def import_from_csv(self, file_path, table_name):
        """从CSV导入数据"""
        try:
            # 读取CSV文件
            df = pd.read_csv(file_path)
            
            # 添加最后更新时间
            if table_name == 'customers':
                df['last_updated'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            
            # 将数据写入SQLite数据库
            df.to_sql(table_name, self.conn, if_exists='append', index=False)
            
            print(f"成功从{file_path}导入{len(df)}条记录到{table_name}表")
            return len(df)
        except Exception as e:
            print(f"导入数据时出错: {e}")
            return 0
    
    def import_from_database(self, source_db_path, query, table_name):
        """从其他数据库导入数据"""
        try:
            # 连接源数据库
            source_conn = sqlite3.connect(source_db_path)
            
            # 执行查询
            df = pd.read_sql_query(query, source_conn)
            
            # 添加最后更新时间
            if table_name == 'customers':
                df['last_updated'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            
            # 将数据写入目标数据库
            df.to_sql(table_name, self.conn, if_exists='append', index=False)
            
            source_conn.close()
            print(f"成功从{source_db_path}导入{len(df)}条记录到{table_name}表")
            return len(df)
        except Exception as e:
            print(f"从数据库导入数据时出错: {e}")
            return 0
    
    def close(self):
        """关闭数据库连接"""
        if self.conn:
            self.conn.close()

第二步:数据清洗与分类

python
class CustomerDataProcessor:
    """客户数据处理类"""
    
    def __init__(self, db_path):
        """初始化数据库连接"""
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
    
    def clean_customer_data(self):
        """清洗客户数据"""
        # 读取客户数据
        df = pd.read_sql_query("SELECT * FROM customers", self.conn)
        
        # 数据清洗
        # 1. 去除重复记录
        original_count = len(df)
        df = df.drop_duplicates(subset=['customer_id'])
        print(f"去除了{original_count - len(df)}条重复客户记录")
        
        # 2. 处理缺失值
        df['name'] = df['name'].fillna('未知')
        df['gender'] = df['gender'].fillna('未知')
        df['age'] = df['age'].fillna(0).astype(int)
        
        # 3. 标准化电话号码格式
        df['phone'] = df['phone'].astype(str).apply(lambda x: ''.join(filter(str.isdigit, x)) if pd.notna(x) else '')
        
        # 4. 标准化邮箱地址(转为小写)
        df['email'] = df['email'].str.lower() if pd.notna(df['email']).any() else df['email']
        
        # 5. 标准化日期格式
        df['registration_date'] = pd.to_datetime(df['registration_date'], errors='coerce').dt.strftime('%Y-%m-%d')
        
        # 将清洗后的数据写回数据库
        cursor = self.conn.cursor()
        for _, row in df.iterrows():
            cursor.execute('''
            UPDATE customers SET 
                name = ?, gender = ?, age = ?, phone = ?, email = ?, 
                address = ?, registration_date = ?, customer_type = ?, 
                source = ?, last_updated = ?
            WHERE customer_id = ?
            ''', (
                row['name'], row['gender'], row['age'], row['phone'], row['email'],
                row['address'], row['registration_date'], row['customer_type'],
                row['source'], datetime.now().strftime('%Y-%m-%d %H:%M:%S'), row['customer_id']
            ))
        
        self.conn.commit()
        print("客户数据清洗完成")
    
    def categorize_customers(self):
        """对客户进行分类"""
        # 读取客户和交易数据
        customers = pd.read_sql_query("SELECT * FROM customers", self.conn)
        transactions = pd.read_sql_query("SELECT * FROM transactions", self.conn)
        
        # 合并客户和交易数据
        merged_data = pd.merge(transactions, customers, on='customer_id', how='left')
        
        # 按客户ID分组,计算每个客户的消费总额和消费次数
        customer_stats = merged_data.groupby('customer_id').agg({
            'amount': ['sum', 'count'],
            'transaction_date': ['min', 'max']
        })
        
        customer_stats.columns = ['total_spent', 'transaction_count', 'first_purchase', 'last_purchase']
        customer_stats.reset_index(inplace=True)
        
        # 计算客户生命周期价值(CLV)和购买频率
        customer_stats['first_purchase'] = pd.to_datetime(customer_stats['first_purchase'])
        customer_stats['last_purchase'] = pd.to_datetime(customer_stats['last_purchase'])
        customer_stats['customer_age_days'] = (customer_stats['last_purchase'] - customer_stats['first_purchase']).dt.days
        customer_stats['customer_age_days'] = customer_stats['customer_age_days'].apply(lambda x: max(x, 1))  # 避免除以零
        customer_stats['purchase_frequency'] = customer_stats['transaction_count'] / customer_stats['customer_age_days'] * 30  # 每月平均购买次数
        
        # 客户价值分类 (RFM分析: Recency, Frequency, Monetary)
        today = pd.to_datetime('today')
        customer_stats['recency_days'] = (today - customer_stats['last_purchase']).dt.days
        
        # 根据RFM值对客户进行分类
        def classify_customer(row):
            recency = row['recency_days']
            frequency = row['purchase_frequency']
            monetary = row['total_spent']
            
            # 高价值客户
            if recency <= 30 and frequency >= 2 and monetary >= 1000:
                return 'VIP客户'
            # 活跃客户
            elif recency <= 60 and frequency >= 1:
                return '活跃客户'
            # 一般客户
            elif recency <= 180 and monetary >= 500:
                return '一般客户'
            # 沉睡客户
            elif recency > 180 and recency <= 365:
                return '沉睡客户'
            # 流失客户
            else:
                return '流失客户'
        
        customer_stats['customer_category'] = customer_stats.apply(classify_customer, axis=1)
        
        # 更新客户分类到数据库
        cursor = self.conn.cursor()
        for _, row in customer_stats.iterrows():
            cursor.execute('''
            UPDATE customers SET 
                customer_type = ?,
                last_updated = ?
            WHERE customer_id = ?
            ''', (
                row['customer_category'],
                datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                row['customer_id']
            ))
        
        self.conn.commit()
        print("客户分类完成")
        
        # 返回分类结果统计
        category_counts = customer_stats['customer_category'].value_counts().to_dict()
        print("客户分类统计:")
        for category, count in category_counts.items():
            print(f"{category}: {count}人")
        
        return category_counts
    
    def analyze_customer_preferences(self):
        """分析客户偏好"""
        # 读取客户和交易数据
        customers = pd.read_sql_query("SELECT * FROM customers", self.conn)
        transactions = pd.read_sql_query("SELECT * FROM transactions", self.conn)
        
        # 合并客户和交易数据
        merged_data = pd.merge(transactions, customers, on='customer_id', how='left')
        
        # 按客户类型和产品类别分析消费偏好
        preference_by_type = merged_data.groupby(['customer_type', 'product_category'])['amount'].agg(['sum', 'count']).reset_index()
        preference_by_type.columns = ['customer_type', 'product_category', 'total_amount', 'purchase_count']
        
        # 计算每种客户类型的主要消费品类
        top_categories = preference_by_type.sort_values(['customer_type', 'total_amount'], ascending=[True, False])
        top_categories_by_type = {}
        
        for customer_type in top_categories['customer_type'].unique():
            top_for_type = top_categories[top_categories['customer_type'] == customer_type].head(3)
            top_categories_by_type[customer_type] = top_for_type['product_category'].tolist()
        
        print("各类客户主要消费品类:")
        for customer_type, categories in top_categories_by_type.items():
            print(f"{customer_type}: {', '.join(categories)}")
        
        return top_categories_by_type
    
    def close(self):
        """关闭数据库连接"""
        if self.conn:
            self.conn.close()

第三步:数据分析与可视化

python
import matplotlib.pyplot as plt
import seaborn as sns
import os
from matplotlib.font_manager import FontProperties

class CustomerDataAnalyzer:
    """客户数据分析类"""
    
    def __init__(self, db_path, output_folder):
        """初始化数据库连接和输出文件夹"""
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self.output_folder = output_folder
        os.makedirs(output_folder, exist_ok=True)
        
        # 设置中文字体
        plt.rcParams['font.sans-serif'] = ['SimHei']  # 用来正常显示中文标签
        plt.rcParams['axes.unicode_minus'] = False  # 用来正常显示负号
    
    def generate_customer_distribution_chart(self):
        """生成客户分布图表"""
        # 读取客户数据
        customers = pd.read_sql_query("SELECT * FROM customers", self.conn)
        
        # 1. 客户类型分布饼图
        plt.figure(figsize=(10, 6))
        customers['customer_type'].value_counts().plot(kind='pie', autopct='%1.1f%%')
        plt.title('客户类型分布')
        plt.ylabel('')
        plt.tight_layout()
        pie_chart_path = os.path.join(self.output_folder, 'customer_type_distribution.png')
        plt.savefig(pie_chart_path)
        plt.close()
        
        # 2. 客户年龄分布直方图
        plt.figure(figsize=(10, 6))
        sns.histplot(customers['age'].dropna(), bins=10, kde=True)
        plt.title('客户年龄分布')
        plt.xlabel('年龄')
        plt.ylabel('客户数量')
        plt.tight_layout()
        age_chart_path = os.path.join(self.output_folder, 'customer_age_distribution.png')
        plt.savefig(age_chart_path)
        plt.close()
        
        # 3. 客户来源分布条形图
        plt.figure(figsize=(12, 6))
        source_counts = customers['source'].value_counts()
        sns.barplot(x=source_counts.index, y=source_counts.values)
        plt.title('客户来源分布')
        plt.xlabel('来源')
        plt.ylabel('客户数量')
        plt.xticks(rotation=45)
        plt.tight_layout()
        source_chart_path = os.path.join(self.output_folder, 'customer_source_distribution.png')
        plt.savefig(source_chart_path)
        plt.close()
        
        return {
            'pie_chart_path': pie_chart_path,
            'age_chart_path': age_chart_path,
            'source_chart_path': source_chart_path
        }
    
    def generate_sales_analysis_charts(self):
        """生成销售分析图表"""
        # 读取交易数据
        transactions = pd.read_sql_query("SELECT * FROM transactions", self.conn)
        transactions['transaction_date'] = pd.to_datetime(transactions['transaction_date'])
        
        # 1. 月度销售趋势图
        monthly_sales = transactions.groupby(pd.Grouper(key='transaction_date', freq='M'))['amount'].sum().reset_index()
        plt.figure(figsize=(12, 6))
        plt.plot(monthly_sales['transaction_date'], monthly_sales['amount'], marker='o')
        plt.title('月度销售趋势')
        plt.xlabel('月份')
        plt.ylabel('销售额')
        plt.grid(True)
        plt.tight_layout()
        trend_chart_path = os.path.join(self.output_folder, 'monthly_sales_trend.png')
        plt.savefig(trend_chart_path)
        plt.close()
        
        # 2. 产品类别销售分布图
        plt.figure(figsize=(10, 6))
        category_sales = transactions.groupby('product_category')['amount'].sum().sort_values(ascending=False)
        sns.barplot(x=category_sales.index, y=category_sales.values)
        plt.title('产品类别销售分布')
        plt.xlabel('产品类别')
        plt.ylabel('销售额')
        plt.xticks(rotation=45)
        plt.tight_layout()
        category_chart_path = os.path.join(self.output_folder, 'category_sales_distribution.png')
        plt.savefig(category_chart_path)
        plt.close()
        
        # 3. 支付方式分布图
        plt.figure(figsize=(8, 6))
        payment_counts = transactions['payment_method'].value_counts()
        plt.pie(payment_counts.values, labels=payment_counts.index, autopct='%1.1f%%')
        plt.title('支付方式分布')
        plt.tight_layout()
        payment_chart_path = os.path.join(self.output_folder, 'payment_method_distribution.png')
        plt.savefig(payment_chart_path)
        plt.close()
        
        return {
            'trend_chart_path': trend_chart_path,
            'category_chart_path': category_chart_path,
            'payment_chart_path': payment_chart_path
        }
    
    def generate_customer_value_analysis(self):
        """生成客户价值分析"""
        # 读取客户和交易数据
        customers = pd.read_sql_query("SELECT * FROM customers", self.conn)
        transactions = pd.read_sql_query("SELECT * FROM transactions", self.conn)
        
        # 合并客户和交易数据
        merged_data = pd.merge(transactions, customers, on='customer_id', how='left')
        
        # 按客户类型统计平均消费金额和消费频次
        customer_value = merged_data.groupby('customer_type').agg({
            'amount': ['mean', 'sum'],
            'customer_id': ['count', 'nunique']
        })
        
        customer_value.columns = ['avg_amount', 'total_amount', 'transaction_count', 'customer_count']
        customer_value['avg_transactions_per_customer'] = customer_value['transaction_count'] / customer_value['customer_count']
        customer_value.reset_index(inplace=True)
        
        # 客户价值气泡图
        plt.figure(figsize=(10, 8))
        plt.scatter(
            customer_value['avg_amount'],
            customer_value['avg_transactions_per_customer'],
            s=customer_value['customer_count'] * 20,  # 气泡大小代表客户数量
            alpha=0.7
        )
        
        # 添加标签
        for i, row in customer_value.iterrows():
            plt.annotate(
                row['customer_type'],
                (row['avg_amount'], row['avg_transactions_per_customer']),
                fontsize=9
            )
        
        plt.title('客户价值分析')
        plt.xlabel('平均消费金额')
        plt.ylabel('平均消费频次')
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.tight_layout()
        
        value_chart_path = os.path.join(self.output_folder, 'customer_value_analysis.png')
        plt.savefig(value_chart_path)
        plt.close()
        
        return {
            'value_chart_path': value_chart_path,
            'customer_value_data': customer_value
        }
    
    def close(self):
        """关闭数据库连接"""
        if self.conn:
            self.conn.close()

第四步:生成PPT展示

python
from pptx import Presentation
from pptx.util import Inches, Pt
from pptx.enum.text import PP_ALIGN
from pptx.dml.color import RGBColor
import os
from datetime import datetime

class CustomerPresentationGenerator:
    """客户信息PPT生成类"""
    
    def __init__(self, template_path=None):
        """初始化PPT演示文稿"""
        if template_path and os.path.exists(template_path):
            self.prs = Presentation(template_path)
        else:
            self.prs = Presentation()
    
    def add_title_slide(self, title, subtitle):
        """添加标题页"""
        slide_layout = self.prs.slide_layouts[0]  # 标题页布局
        slide = self.prs.slides.add_slide(slide_layout)
        
        # 设置标题
        title_shape = slide.shapes.title
        title_shape.text = title
        
        # 设置副标题
        subtitle_shape = slide.placeholders[1]
        subtitle_shape.text = subtitle
        
        return slide
    
    def add_section_header(self, title):
        """添加章节标题页"""
        slide_layout = self.prs.slide_layouts[2]  # 章节标题布局
        slide = self.prs.slides.add_slide(slide_layout)
        
        # 设置标题
        title_shape = slide.shapes.title
        title_shape.text = title
        
        return slide
    
    def add_text_slide(self, title, content_list):
        """添加文本内容页"""
        slide_layout = self.prs.slide_layouts[1]  # 标题和内容布局
        slide = self.prs.slides.add_slide(slide_layout)
        
        # 设置标题
        title_shape = slide.shapes.title
        title_shape.text = title
        
        # 设置内容
        content_shape = slide.placeholders[1]
        text_frame = content_shape.text_frame
        
        for i, content in enumerate(content_list):
            if i == 0:
                p = text_frame.paragraphs[0]
            else:
                p = text_frame.add_paragraph()
            p.text = content
            p.level = 0
        
        return slide
    
    def add_chart_slide(self, title, image_path, description=None):
        """添加图表页"""
        slide_layout = self.prs.slide_layouts[5]  # 标题和内容布局
        slide = self.prs.slides.add_slide(slide_layout)
        
        # 设置标题
        title_shape = slide.shapes.title
        title_shape.text = title
        
        # 添加图表图片
        left = Inches(1.5)
        top = Inches(2)
        width = Inches(7)
        slide.shapes.add_picture(image_path, left, top, width=width)
        
        # 添加描述文本(如果有)
        if description:
            left = Inches(1)
            top = Inches(5.5)
            width = Inches(8)
            height = Inches(1)
            textbox = slide.shapes.add_textbox(left, top, width, height)
            text_frame = textbox.text_frame
            p = text_frame.add_paragraph()
            p.text = description
            p.alignment = PP_ALIGN.CENTER
        
        return slide
    
    def add_table_slide(self, title, data, columns):
        """添加表格页"""
        slide_layout = self.prs.slide_layouts[5]  # 标题和内容布局
        slide = self.prs.slides.add_slide(slide_layout)
        
        # 设置标题
        title_shape = slide.shapes.title
        title_shape.text = title
        
        # 添加表格
        rows = len(data) + 1  # 数据行 + 标题行
        cols = len(columns)
        
        left = Inches(1)
        top = Inches(2)
        width = Inches(8)
        height = Inches(0.8 * rows)
        
        table = slide.shapes.add_table(rows, cols, left, top, width, height).table
        
        # 设置表头
        for i, column in enumerate(columns):
            cell = table.cell(0, i)
            cell.text = column
            # 设置表头样式
            for paragraph in cell.text_frame.paragraphs:
                paragraph.font.bold = True
                paragraph.font.size = Pt(12)
        
        # 填充数据
        for i, row_data in enumerate(data):
            for j, value in enumerate(row_data):
                cell = table.cell(i + 1, j)
                cell.text = str(value)
        
        return slide
    
    def add_conclusion_slide(self, title, conclusions):
        """添加结论页"""
        slide_layout = self.prs.slide_layouts[1]  # 标题和内容布局
        slide = self.prs.slides.add_slide(slide_layout)
        
        # 设置标题
        title_shape = slide.shapes.title
        title_shape.text = title
        
        # 设置内容
        content_shape = slide.placeholders[1]
        text_frame = content_shape.text_frame
        
        for i, conclusion in enumerate(conclusions):
            if i == 0:
                p = text_frame.paragraphs[0]
            else:
                p = text_frame.add_paragraph()
            p.text = conclusion
            p.level = 0
        
        return slide
    
    def add_thank_you_slide(self):
        """添加结束页"""
        slide_layout = self.prs.slide_layouts[6]  # 仅标题布局
        slide = self.prs.slides.add_slide(slide_layout)
        
        # 设置标题
        title_shape = slide.shapes.title
        title_shape.text = "谢谢观看!"
        
        # 添加日期
        left = Inches(0)
        top = Inches(5)
        width = Inches(10)
        height = Inches(1)
        textbox = slide.shapes.add_textbox(left, top, width, height)
        text_frame = textbox.text_frame
        p = text_frame.add_paragraph()
        p.text = datetime.now().strftime("%Y年%m月%d日")
        p.alignment = PP_ALIGN.CENTER
        
        return slide
    
    def save(self, output_path):
        """保存PPT文件"""
        # 确保输出目录存在
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        self.prs.save(output_path)
        print(f"PPT已保存至: {output_path}")
        return output_path

第五步:数据存档与备份

python
import shutil
import zipfile
import os
from datetime import datetime

class CustomerDataArchiver:
    """客户数据存档类"""
    
    def __init__(self, db_path, archive_folder):
        """初始化存档路径"""
        self.db_path = db_path
        self.archive_folder = archive_folder
        os.makedirs(archive_folder, exist_ok=True)
    
    def create_database_backup(self):
        """创建数据库备份"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_filename = f"customer_db_backup_{timestamp}.db"
        backup_path = os.path.join(self.archive_folder, backup_filename)
        
        # 复制数据库文件
        shutil.copy2(self.db_path, backup_path)
        print(f"数据库已备份至: {backup_path}")
        return backup_path
    
    def export_data_to_excel(self, output_folder):
        """导出数据到Excel文件"""
        os.makedirs(output_folder, exist_ok=True)
        
        # 连接数据库
        conn = sqlite3.connect(self.db_path)
        
        # 导出客户数据
        customers_df = pd.read_sql_query("SELECT * FROM customers", conn)
        customers_path = os.path.join(output_folder, "customers_export.xlsx")
        customers_df.to_excel(customers_path, index=False)
        
        # 导出交易数据
        transactions_df = pd.read_sql_query("SELECT * FROM transactions", conn)
        transactions_path = os.path.join(output_folder, "transactions_export.xlsx")
        transactions_df.to_excel(transactions_path, index=False)
        
        # 导出联系记录
        contacts_df = pd.read_sql_query("SELECT * FROM contacts", conn)
        contacts_path = os.path.join(output_folder, "contacts_export.xlsx")
        contacts_df.to_excel(contacts_path, index=False)
        
        conn.close()
        
        print(f"数据已导出至: {output_folder}")
        return {
            'customers_path': customers_path,
            'transactions_path': transactions_path,
            'contacts_path': contacts_path
        }
    
    def create_archive_zip(self, files_to_archive):
        """创建存档ZIP文件"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_filename = f"customer_data_archive_{timestamp}.zip"
        archive_path = os.path.join(self.archive_folder, archive_filename)
        
        with zipfile.ZipFile(archive_path, 'w') as zipf:
            for file_path in files_to_archive:
                if os.path.exists(file_path):
                    # 将文件添加到ZIP中,使用相对路径作为ZIP内的路径
                    zipf.write(file_path, os.path.basename(file_path))
        
        print(f"存档文件已创建: {archive_path}")
        return archive_path
    
    def cleanup_old_archives(self, max_archives=10):
        """清理旧的存档文件,只保留最新的几个"""
        # 获取所有ZIP存档文件
        archives = [f for f in os.listdir(self.archive_folder) if f.endswith('.zip')]
        
        # 按修改时间排序
        archives.sort(key=lambda x: os.path.getmtime(os.path.join(self.archive_folder, x)), reverse=True)
        
        # 删除旧文件
        if len(archives) > max_archives:
            for old_archive in archives[max_archives:]:
                old_path = os.path.join(self.archive_folder, old_archive)
                os.remove(old_path)
                print(f"已删除旧存档: {old_path}")

第六步:整合所有功能

python
import os
import schedule
import time
from datetime import datetime

class CustomerManagementSystem:
    """客户信息管理系统主类"""
    
    def __init__(self, config):
        """初始化系统配置"""
        self.config = config
        self.db_path = config['db_path']
        self.output_folder = config['output_folder']
        self.archive_folder = config['archive_folder']
        
        # 确保必要的文件夹存在
        os.makedirs(self.output_folder, exist_ok=True)
        os.makedirs(self.archive_folder, exist_ok=True)
        os.makedirs(os.path.join(self.output_folder, 'charts'), exist_ok=True)
        os.makedirs(os.path.join(self.output_folder, 'exports'), exist_ok=True)
        os.makedirs(os.path.join(self.output_folder, 'presentations'), exist_ok=True)
    
    def import_customer_data(self, data_sources):
        """导入客户数据"""
        print("开始导入客户数据...")
        importer = CustomerDataImporter(self.db_path)
        
        total_imported = 0
        
        # 导入Excel数据
        for source in data_sources.get('excel', []):
            count = importer.import_from_excel(
                source['file_path'], 
                source['sheet_name'], 
                source['table_name']
            )
            total_imported += count
        
        # 导入CSV数据
        for source in data_sources.get('csv', []):
            count = importer.import_from_csv(
                source['file_path'], 
                source['table_name']
            )
            total_imported += count
        
        # 导入数据库数据
        for source in data_sources.get('database', []):
            count = importer.import_from_database(
                source['db_path'], 
                source['query'], 
                source['table_name']
            )
            total_imported += count
        
        importer.close()
        print(f"数据导入完成,共导入{total_imported}条记录")
    
    def process_customer_data(self):
        """处理客户数据"""
        print("开始处理客户数据...")
        processor = CustomerDataProcessor(self.db_path)
        
        # 清洗数据
        processor.clean_customer_data()
        
        # 分类客户
        category_counts = processor.categorize_customers()
        
        # 分析客户偏好
        preferences = processor.analyze_customer_preferences()
        
        processor.close()
        print("客户数据处理完成")
        
        return {
            'category_counts': category_counts,
            'preferences': preferences
        }
    
    def analyze_customer_data(self):
        """分析客户数据"""
        print("开始分析客户数据...")
        charts_folder = os.path.join(self.output_folder, 'charts')
        analyzer = CustomerDataAnalyzer(self.db_path, charts_folder)
        
        # 生成客户分布图表
        distribution_charts = analyzer.generate_customer_distribution_chart()
        
        # 生成销售分析图表
        sales_charts = analyzer.generate_sales_analysis_charts()
        
        # 生成客户价值分析
        value_analysis = analyzer.generate_customer_value_analysis()
        
        analyzer.close()
        print("客户数据分析完成")
        
        return {
            'distribution_charts': distribution_charts,
            'sales_charts': sales_charts,
            'value_analysis': value_analysis
        }
    
    def generate_presentation(self, analysis_results, processing_results):
        """生成PPT演示文稿"""
        print("开始生成PPT演示文稿...")
        
        # 初始化PPT生成器
        template_path = self.config.get('ppt_template')
        generator = CustomerPresentationGenerator(template_path)
        
        # 添加标题页
        report_date = datetime.now().strftime("%Y年%m月%d日")
        generator.add_title_slide(
            "客户信息分析报告",
            f"生成日期: {report_date}"
        )
        
        # 添加客户概况章节
        generator.add_section_header("一、客户概况")
        
        # 添加客户分类统计
        category_data = []
        for category, count in processing_results['category_counts'].items():
            category_data.append([category, count, f"{count/sum(processing_results['category_counts'].values())*100:.1f}%"])
        
        generator.add_table_slide(
            "客户分类统计",
            category_data,
            ["客户类型", "数量", "占比"]
        )
        
        # 添加客户分布图表
        generator.add_chart_slide(
            "客户类型分布",
            analysis_results['distribution_charts']['pie_chart_path'],
            "不同类型客户的数量分布情况"
        )
        
        generator.add_chart_slide(
            "客户年龄分布",
            analysis_results['distribution_charts']['age_chart_path'],
            "客户年龄段分布情况"
        )
        
        generator.add_chart_slide(
            "客户来源分布",
            analysis_results['distribution_charts']['source_chart_path'],
            "客户获取渠道分布情况"
        )
        
        # 添加销售分析章节
        generator.add_section_header("二、销售分析")
        
        generator.add_chart_slide(
            "月度销售趋势",
            analysis_results['sales_charts']['trend_chart_path'],
            "近期月度销售额变化趋势"
        )
        
        generator.add_chart_slide(
            "产品类别销售分布",
            analysis_results['sales_charts']['category_chart_path'],
            "各产品类别销售额占比情况"
        )
        
        generator.add_chart_slide(
            "支付方式分布",
            analysis_results['sales_charts']['payment_chart_path'],
            "客户偏好的支付方式分布"
        )
        
        # 添加客户价值分析章节
        generator.add_section_header("三、客户价值分析")
        
        generator.add_chart_slide(
            "客户价值矩阵",
            analysis_results['value_analysis']['value_chart_path'],
            "基于消费金额和频次的客户价值分析"
        )
        
        # 添加客户偏好分析
        preference_content = []
        for customer_type, categories in processing_results['preferences'].items():
            preference_content.append(f"{customer_type}:主要消费 {', '.join(categories)}")
        
        generator.add_text_slide(
            "客户消费偏好分析",
            preference_content
        )
        
        # 添加结论和建议
        conclusions = [
            "1. VIP客户群体虽然数量较少,但贡献了最高的销售额,应重点维护关系",
            "2. 沉睡客户数量较多,可以通过个性化营销活动唤醒",
            "3. 年轻客户(25-35岁)是消费主力,应针对此群体开发更多产品",
            "4. 线上渠道获客效果最好,建议增加线上营销投入",
            "5. 建议针对不同客户群体制定差异化的营销策略,提高客户满意度和忠诚度"
        ]
        
        generator.add_conclusion_slide("结论与建议", conclusions)
        
        # 添加结束页
        generator.add_thank_you_slide()
        
        # 保存PPT
        timestamp = datetime.now().strftime("%Y%m%d_%H%M")
        ppt_filename = f"客户分析报告_{timestamp}.pptx"
        ppt_path = os.path.join(self.output_folder, 'presentations', ppt_filename)
        generator.save(ppt_path)
        
        print(f"PPT演示文稿已生成: {ppt_path}")
        return ppt_path
    
    def archive_data(self, ppt_path):
        """存档数据"""
        print("开始存档数据...")
        archiver = CustomerDataArchiver(self.db_path, self.archive_folder)
        
        # 创建数据库备份
        db_backup = archiver.create_database_backup()
        
        # 导出数据到Excel
        export_folder = os.path.join(self.output_folder, 'exports')
        exported_files = archiver.export_data_to_excel(export_folder)
        
        # 创建存档ZIP
        files_to_archive = [
            db_backup,
            exported_files['customers_path'],
            exported_files['transactions_path'],
            exported_files['contacts_path'],
            ppt_path
        ]
        
        archive_path = archiver.create_archive_zip(files_to_archive)
        
        # 清理旧存档
        archiver.cleanup_old_archives(max_archives=self.config.get('max_archives', 10))
        
        print("数据存档完成")
        return archive_path
    
    def run_full_process(self):
        """运行完整的处理流程"""
        print("\n===== 开始客户信息管理系统处理流程 =====")
        start_time = datetime.now()
        
        # 步骤1:导入客户数据
        if self.config.get('import_data', True):
            self.import_customer_data(self.config['data_sources'])
        
        # 步骤2:处理客户数据
        processing_results = self.process_customer_data()
        
        # 步骤3:分析客户数据
        analysis_results = self.analyze_customer_data()
        
        # 步骤4:生成PPT演示文稿
        ppt_path = self.generate_presentation(analysis_results, processing_results)
        
        # 步骤5:存档数据
        archive_path = self.archive_data(ppt_path)
        
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        print(f"\n===== 处理流程完成,耗时 {duration:.2f} 秒 =====")
        print(f"PPT报告: {ppt_path}")
        print(f"数据存档: {archive_path}")
        
        return {
            'ppt_path': ppt_path,
            'archive_path': archive_path,
            'processing_results': processing_results,
            'analysis_results': analysis_results
        }

# 使用示例
if __name__ == "__main__":
    # 系统配置
    config = {
        'db_path': 'customer_database.db',
        'output_folder': 'customer_system_output',
        'archive_folder': 'customer_system_archives',
        'ppt_template': None,  # 可选的PPT模板路径
        'max_archives': 10,    # 保留的最大存档数量
        'import_data': True,   # 是否导入新数据
        'data_sources': {
            'excel': [
                {
                    'file_path': 'data/customers.xlsx',
                    'sheet_name': 'Sheet1',
                    'table_name': 'customers'
                },
                {
                    'file_path': 'data/transactions.xlsx',
                    'sheet_name': 'Sheet1',
                    'table_name': 'transactions'
                }
            ],
            'csv': [
                {
                    'file_path': 'data/contacts.csv',
                    'table_name': 'contacts'
                }
            ],
            'database': []
        }
    }
    
    # 创建并运行系统
    system = CustomerManagementSystem(config)
    results = system.run_full_process()
    
    # 设置定时任务(每周一早上9点运行)
    def scheduled_job():
        print(f"\n===== 定时任务启动: {datetime.now()} =====")
        system.run_full_process()
    
    # 设置定时任务
    schedule.every().monday.at("09:00").do(scheduled_job)
    
    print("\n系统已设置定时任务,每周一早上9点自动运行...")
    # 运行定时任务循环
    # while True:
    #     schedule.run_pending()
    #     time.sleep(60)

实际应用场景

1. 销售团队客户管理

销售团队可以使用此系统自动导入CRM系统中的客户数据,进行分类和分析,生成客户价值报告,帮助销售人员识别高价值客户和潜在客户,制定针对性的销售策略。

2. 会员管理系统

零售企业可以使用此系统管理会员信息,分析会员消费行为和偏好,生成会员分析报告,为会员营销活动提供数据支持,提高会员忠诚度和复购率。

3. 客户服务优化

客服部门可以使用此系统整合客户反馈和投诉数据,分析客户满意度和问题类型,生成客户服务质量报告,帮助企业改进产品和服务,提升客户满意度。

4. 市场营销分析

市场部门可以使用此系统分析不同渠道获取的客户质量和转化率,评估营销活动效果,生成营销ROI分析报告,优化营销资源分配和策略制定。

进阶优化

1. 添加Web界面

使用Flask或Django构建Web界面,使系统更易于使用:

python
from flask import Flask, render_template, request, send_file
import os

app = Flask(__name__)
config = {...}  # 系统配置
system = CustomerManagementSystem(config)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/import', methods=['GET', 'POST'])
def import_data():
    if request.method == 'POST':
        # 处理文件上传
        file = request.files['file']
        file_path = os.path.join('uploads', file.filename)
        file.save(file_path)
        
        # 导入数据
        data_sources = {
            'excel': [{
                'file_path': file_path,
                'sheet_name': request.form['sheet_name'],
                'table_name': request.form['table_name']
            }]
        }
        system.import_customer_data(data_sources)
        return "数据导入成功!"
    return render_template('import.html')

@app.route('/analyze')
def analyze():
    # 运行分析流程
    processing_results = system.process_customer_data()
    analysis_results = system.analyze_customer_data()
    return render_template('analysis.html', results=processing_results)

@app.route('/generate_ppt')
def generate_ppt():
    # 生成PPT
    processing_results = system.process_customer_data()
    analysis_results = system.analyze_customer_data()
    ppt_path = system.generate_presentation(analysis_results, processing_results)
    return send_file(ppt_path, as_attachment=True)

if __name__ == '__main__':
    app.run(debug=True)

2. 集成机器学习模型

添加客户流失预测和客户细分模型:

python
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np

class CustomerMLModels:
    """客户机器学习模型类"""
    
    def __init__(self, db_path):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
    
    def prepare_features(self):
        """准备特征数据"""
        # 读取客户和交易数据
        customers = pd.read_sql_query("SELECT * FROM customers", self.conn)
        transactions = pd.read_sql_query("SELECT * FROM transactions", self.conn)
        
        # 合并数据
        merged = pd.merge(transactions, customers, on='customer_id', how='left')
        
        # 按客户分组计算特征
        features = merged.groupby('customer_id').agg({
            'amount': ['sum', 'mean', 'count'],
            'transaction_date': ['min', 'max'],
            'age': 'first',
            'gender': 'first'
        })
        
        features.columns = ['total_spent', 'avg_spent', 'transaction_count', 'first_purchase', 'last_purchase', 'age', 'gender']
        features.reset_index(inplace=True)
        
        # 计算客户生命周期和购买频率
        features['first_purchase'] = pd.to_datetime(features['first_purchase'])
        features['last_purchase'] = pd.to_datetime(features['last_purchase'])
        features['customer_age_days'] = (features['last_purchase'] - features['first_purchase']).dt.days
        features['customer_age_days'] = features['customer_age_days'].apply(lambda x: max(x, 1))
        features['purchase_frequency'] = features['transaction_count'] / features['customer_age_days'] * 30
        
        # 计算最近一次购买距今天数
        today = pd.to_datetime('today')
        features['recency_days'] = (today - features['last_purchase']).dt.days
        
        # 处理性别特征
        features['gender'] = features['gender'].map({'男': 1, '女': 0, '未知': -1})
        
        return features
    
    def customer_segmentation(self):
        """客户细分模型"""
        features = self.prepare_features()
        
        # 选择用于聚类的特征
        cluster_features = features[['total_spent', 'transaction_count', 'recency_days']].copy()
        
        # 标准化特征
        scaler = StandardScaler()
        scaled_features = scaler.fit_transform(cluster_features)
        
        # K-means聚类
        kmeans = KMeans(n_clusters=4, random_state=42)
        features['cluster'] = kmeans.fit_predict(scaled_features)
        
        # 分析每个簇的特征
        cluster_analysis = features.groupby('cluster').agg({
            'total_spent': 'mean',
            'transaction_count': 'mean',
            'recency_days': 'mean',
            'customer_id': 'count'
        })
        
        cluster_analysis.columns = ['平均消费额', '平均交易次数', '平均最近购买天数', '客户数量']
        
        # 根据特征为每个簇命名
        cluster_names = {}
        for cluster in cluster_analysis.index:
            if cluster_analysis.loc[cluster, '平均消费额'] > 1000 and cluster_analysis.loc[cluster, '平均最近购买天数'] < 30:
                cluster_names[cluster] = '高价值活跃客户'
            elif cluster_analysis.loc[cluster, '平均消费额'] > 1000 and cluster_analysis.loc[cluster, '平均最近购买天数'] >= 30:
                cluster_names[cluster] = '高价值沉睡客户'
            elif cluster_analysis.loc[cluster, '平均交易次数'] > 5 and cluster_analysis.loc[cluster, '平均最近购买天数'] < 30:
                cluster_names[cluster] = '高频活跃客户'
            else:
                cluster_names[cluster] = '一般客户'
        
        # 将簇名称添加到原始数据中
        features['segment'] = features['cluster'].map(cluster_names)
        
        # 更新数据库中的客户分类
        cursor = self.conn.cursor()
        for _, row in features.iterrows():
            cursor.execute('''
            UPDATE customers SET 
                customer_type = ?,
                last_updated = ?
            WHERE customer_id = ?
            ''', (
                row['segment'],
                datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                row['customer_id']
            ))
        
        self.conn.commit()

运行上述代码,会根据特征值对客户进行分类,并更新数据库中的客户信息。

小结

本节内容主要包括以下几个部分:

需求分析:明确了客户信息管理系统需要实现的核心功能,包括数据导入、清洗分类、分析可视化、PPT生成和数据存档。

技术选型:选择了适合任务的Python库,包括pandas、sqlite3、matplotlib/seaborn、python-pptx等。

代码实现:

  • 数据导入模块:支持从Excel、CSV和数据库导入数据
  • 数据处理模块:实现了数据清洗和客户分类功能
  • 数据分析模块:生成了客户分布、销售趋势、客户价值等可视化图表
  • PPT生成模块:创建了一个结构完整的PPT演示文稿
  • 数据存档模块:实现了数据库备份、数据导出和ZIP存档功能
  • 整合所有功能:通过一个主类将所有模块整合在一起,并添加了定时任务功能。

实际应用场景:讨论了系统在销售管理、会员系统、客户服务和市场营销中的具体应用。

进阶优化:提出了添加Web界面和集成机器学习模型的扩展方向。

本系统可以广泛应用于企业客户关系管理场景,帮助企业提高客户管理效率,挖掘客户价值,制定精准营销策略。代码示例充分考虑了可扩展性和实用性,可以直接用于实际项目开发。