Appearance
6.2 客户信息管理系统
客户信息管理是企业运营中的核心环节,高效的客户信息管理系统可以帮助企业更好地了解客户需求、提升服务质量、增强客户关系。本节将介绍如何使用Python构建一个自动化的客户信息管理系统,实现从客户资料导入、分类整理、生成PPT展示到自动存档的全流程自动化。
需求分析
一个完整的客户信息管理系统通常需要实现以下功能:
- 客户资料导入:从Excel、CSV或数据库中导入客户基础信息
- 数据清洗与分类:对客户数据进行清洗、去重、分类和标签化
- 数据分析与可视化:生成客户画像、消费趋势等分析报告
- PPT展示生成:自动生成客户分析PPT,用于业务汇报
- 数据存档与备份:定期将客户数据进行存档和备份
技术选型
我们将使用以下Python库来实现各个环节:
- pandas:用于数据处理和分析
- openpyxl:用于Excel文件的读写
- python-pptx:用于生成PPT演示文稿
- matplotlib/seaborn:用于数据可视化
- sqlite3/sqlalchemy:用于数据存储和管理
- schedule:用于定时任务调度
代码实现
第一步:客户资料导入
python
import pandas as pd
import numpy as np
import os
import sqlite3
from datetime import datetime
class CustomerDataImporter:
"""客户数据导入类"""
def __init__(self, db_path):
"""初始化数据库连接"""
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
self.create_tables()
def create_tables(self):
"""创建数据库表结构"""
cursor = self.conn.cursor()
# 创建客户基本信息表
cursor.execute('''
CREATE TABLE IF NOT EXISTS customers (
customer_id TEXT PRIMARY KEY,
name TEXT,
gender TEXT,
age INTEGER,
phone TEXT,
email TEXT,
address TEXT,
registration_date TEXT,
customer_type TEXT,
source TEXT,
last_updated TEXT
)
''')
# 创建客户交易记录表
cursor.execute('''
CREATE TABLE IF NOT EXISTS transactions (
transaction_id TEXT PRIMARY KEY,
customer_id TEXT,
transaction_date TEXT,
amount REAL,
product_category TEXT,
product_name TEXT,
payment_method TEXT,
FOREIGN KEY (customer_id) REFERENCES customers (customer_id)
)
''')
# 创建客户联系记录表
cursor.execute('''
CREATE TABLE IF NOT EXISTS contacts (
contact_id TEXT PRIMARY KEY,
customer_id TEXT,
contact_date TEXT,
contact_type TEXT,
contact_purpose TEXT,
contact_result TEXT,
follow_up_needed INTEGER,
notes TEXT,
FOREIGN KEY (customer_id) REFERENCES customers (customer_id)
)
''')
self.conn.commit()
def import_from_excel(self, file_path, sheet_name, table_name):
"""从Excel导入数据"""
try:
# 读取Excel文件
df = pd.read_excel(file_path, sheet_name=sheet_name)
# 添加最后更新时间
if table_name == 'customers':
df['last_updated'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 将数据写入SQLite数据库
df.to_sql(table_name, self.conn, if_exists='append', index=False)
print(f"成功从{file_path}导入{len(df)}条记录到{table_name}表")
return len(df)
except Exception as e:
print(f"导入数据时出错: {e}")
return 0
def import_from_csv(self, file_path, table_name):
"""从CSV导入数据"""
try:
# 读取CSV文件
df = pd.read_csv(file_path)
# 添加最后更新时间
if table_name == 'customers':
df['last_updated'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 将数据写入SQLite数据库
df.to_sql(table_name, self.conn, if_exists='append', index=False)
print(f"成功从{file_path}导入{len(df)}条记录到{table_name}表")
return len(df)
except Exception as e:
print(f"导入数据时出错: {e}")
return 0
def import_from_database(self, source_db_path, query, table_name):
"""从其他数据库导入数据"""
try:
# 连接源数据库
source_conn = sqlite3.connect(source_db_path)
# 执行查询
df = pd.read_sql_query(query, source_conn)
# 添加最后更新时间
if table_name == 'customers':
df['last_updated'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 将数据写入目标数据库
df.to_sql(table_name, self.conn, if_exists='append', index=False)
source_conn.close()
print(f"成功从{source_db_path}导入{len(df)}条记录到{table_name}表")
return len(df)
except Exception as e:
print(f"从数据库导入数据时出错: {e}")
return 0
def close(self):
"""关闭数据库连接"""
if self.conn:
self.conn.close()
第二步:数据清洗与分类
python
class CustomerDataProcessor:
"""客户数据处理类"""
def __init__(self, db_path):
"""初始化数据库连接"""
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
def clean_customer_data(self):
"""清洗客户数据"""
# 读取客户数据
df = pd.read_sql_query("SELECT * FROM customers", self.conn)
# 数据清洗
# 1. 去除重复记录
original_count = len(df)
df = df.drop_duplicates(subset=['customer_id'])
print(f"去除了{original_count - len(df)}条重复客户记录")
# 2. 处理缺失值
df['name'] = df['name'].fillna('未知')
df['gender'] = df['gender'].fillna('未知')
df['age'] = df['age'].fillna(0).astype(int)
# 3. 标准化电话号码格式
df['phone'] = df['phone'].astype(str).apply(lambda x: ''.join(filter(str.isdigit, x)) if pd.notna(x) else '')
# 4. 标准化邮箱地址(转为小写)
df['email'] = df['email'].str.lower() if pd.notna(df['email']).any() else df['email']
# 5. 标准化日期格式
df['registration_date'] = pd.to_datetime(df['registration_date'], errors='coerce').dt.strftime('%Y-%m-%d')
# 将清洗后的数据写回数据库
cursor = self.conn.cursor()
for _, row in df.iterrows():
cursor.execute('''
UPDATE customers SET
name = ?, gender = ?, age = ?, phone = ?, email = ?,
address = ?, registration_date = ?, customer_type = ?,
source = ?, last_updated = ?
WHERE customer_id = ?
''', (
row['name'], row['gender'], row['age'], row['phone'], row['email'],
row['address'], row['registration_date'], row['customer_type'],
row['source'], datetime.now().strftime('%Y-%m-%d %H:%M:%S'), row['customer_id']
))
self.conn.commit()
print("客户数据清洗完成")
def categorize_customers(self):
"""对客户进行分类"""
# 读取客户和交易数据
customers = pd.read_sql_query("SELECT * FROM customers", self.conn)
transactions = pd.read_sql_query("SELECT * FROM transactions", self.conn)
# 合并客户和交易数据
merged_data = pd.merge(transactions, customers, on='customer_id', how='left')
# 按客户ID分组,计算每个客户的消费总额和消费次数
customer_stats = merged_data.groupby('customer_id').agg({
'amount': ['sum', 'count'],
'transaction_date': ['min', 'max']
})
customer_stats.columns = ['total_spent', 'transaction_count', 'first_purchase', 'last_purchase']
customer_stats.reset_index(inplace=True)
# 计算客户生命周期价值(CLV)和购买频率
customer_stats['first_purchase'] = pd.to_datetime(customer_stats['first_purchase'])
customer_stats['last_purchase'] = pd.to_datetime(customer_stats['last_purchase'])
customer_stats['customer_age_days'] = (customer_stats['last_purchase'] - customer_stats['first_purchase']).dt.days
customer_stats['customer_age_days'] = customer_stats['customer_age_days'].apply(lambda x: max(x, 1)) # 避免除以零
customer_stats['purchase_frequency'] = customer_stats['transaction_count'] / customer_stats['customer_age_days'] * 30 # 每月平均购买次数
# 客户价值分类 (RFM分析: Recency, Frequency, Monetary)
today = pd.to_datetime('today')
customer_stats['recency_days'] = (today - customer_stats['last_purchase']).dt.days
# 根据RFM值对客户进行分类
def classify_customer(row):
recency = row['recency_days']
frequency = row['purchase_frequency']
monetary = row['total_spent']
# 高价值客户
if recency <= 30 and frequency >= 2 and monetary >= 1000:
return 'VIP客户'
# 活跃客户
elif recency <= 60 and frequency >= 1:
return '活跃客户'
# 一般客户
elif recency <= 180 and monetary >= 500:
return '一般客户'
# 沉睡客户
elif recency > 180 and recency <= 365:
return '沉睡客户'
# 流失客户
else:
return '流失客户'
customer_stats['customer_category'] = customer_stats.apply(classify_customer, axis=1)
# 更新客户分类到数据库
cursor = self.conn.cursor()
for _, row in customer_stats.iterrows():
cursor.execute('''
UPDATE customers SET
customer_type = ?,
last_updated = ?
WHERE customer_id = ?
''', (
row['customer_category'],
datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
row['customer_id']
))
self.conn.commit()
print("客户分类完成")
# 返回分类结果统计
category_counts = customer_stats['customer_category'].value_counts().to_dict()
print("客户分类统计:")
for category, count in category_counts.items():
print(f"{category}: {count}人")
return category_counts
def analyze_customer_preferences(self):
"""分析客户偏好"""
# 读取客户和交易数据
customers = pd.read_sql_query("SELECT * FROM customers", self.conn)
transactions = pd.read_sql_query("SELECT * FROM transactions", self.conn)
# 合并客户和交易数据
merged_data = pd.merge(transactions, customers, on='customer_id', how='left')
# 按客户类型和产品类别分析消费偏好
preference_by_type = merged_data.groupby(['customer_type', 'product_category'])['amount'].agg(['sum', 'count']).reset_index()
preference_by_type.columns = ['customer_type', 'product_category', 'total_amount', 'purchase_count']
# 计算每种客户类型的主要消费品类
top_categories = preference_by_type.sort_values(['customer_type', 'total_amount'], ascending=[True, False])
top_categories_by_type = {}
for customer_type in top_categories['customer_type'].unique():
top_for_type = top_categories[top_categories['customer_type'] == customer_type].head(3)
top_categories_by_type[customer_type] = top_for_type['product_category'].tolist()
print("各类客户主要消费品类:")
for customer_type, categories in top_categories_by_type.items():
print(f"{customer_type}: {', '.join(categories)}")
return top_categories_by_type
def close(self):
"""关闭数据库连接"""
if self.conn:
self.conn.close()
第三步:数据分析与可视化
python
import matplotlib.pyplot as plt
import seaborn as sns
import os
from matplotlib.font_manager import FontProperties
class CustomerDataAnalyzer:
"""客户数据分析类"""
def __init__(self, db_path, output_folder):
"""初始化数据库连接和输出文件夹"""
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
self.output_folder = output_folder
os.makedirs(output_folder, exist_ok=True)
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
def generate_customer_distribution_chart(self):
"""生成客户分布图表"""
# 读取客户数据
customers = pd.read_sql_query("SELECT * FROM customers", self.conn)
# 1. 客户类型分布饼图
plt.figure(figsize=(10, 6))
customers['customer_type'].value_counts().plot(kind='pie', autopct='%1.1f%%')
plt.title('客户类型分布')
plt.ylabel('')
plt.tight_layout()
pie_chart_path = os.path.join(self.output_folder, 'customer_type_distribution.png')
plt.savefig(pie_chart_path)
plt.close()
# 2. 客户年龄分布直方图
plt.figure(figsize=(10, 6))
sns.histplot(customers['age'].dropna(), bins=10, kde=True)
plt.title('客户年龄分布')
plt.xlabel('年龄')
plt.ylabel('客户数量')
plt.tight_layout()
age_chart_path = os.path.join(self.output_folder, 'customer_age_distribution.png')
plt.savefig(age_chart_path)
plt.close()
# 3. 客户来源分布条形图
plt.figure(figsize=(12, 6))
source_counts = customers['source'].value_counts()
sns.barplot(x=source_counts.index, y=source_counts.values)
plt.title('客户来源分布')
plt.xlabel('来源')
plt.ylabel('客户数量')
plt.xticks(rotation=45)
plt.tight_layout()
source_chart_path = os.path.join(self.output_folder, 'customer_source_distribution.png')
plt.savefig(source_chart_path)
plt.close()
return {
'pie_chart_path': pie_chart_path,
'age_chart_path': age_chart_path,
'source_chart_path': source_chart_path
}
def generate_sales_analysis_charts(self):
"""生成销售分析图表"""
# 读取交易数据
transactions = pd.read_sql_query("SELECT * FROM transactions", self.conn)
transactions['transaction_date'] = pd.to_datetime(transactions['transaction_date'])
# 1. 月度销售趋势图
monthly_sales = transactions.groupby(pd.Grouper(key='transaction_date', freq='M'))['amount'].sum().reset_index()
plt.figure(figsize=(12, 6))
plt.plot(monthly_sales['transaction_date'], monthly_sales['amount'], marker='o')
plt.title('月度销售趋势')
plt.xlabel('月份')
plt.ylabel('销售额')
plt.grid(True)
plt.tight_layout()
trend_chart_path = os.path.join(self.output_folder, 'monthly_sales_trend.png')
plt.savefig(trend_chart_path)
plt.close()
# 2. 产品类别销售分布图
plt.figure(figsize=(10, 6))
category_sales = transactions.groupby('product_category')['amount'].sum().sort_values(ascending=False)
sns.barplot(x=category_sales.index, y=category_sales.values)
plt.title('产品类别销售分布')
plt.xlabel('产品类别')
plt.ylabel('销售额')
plt.xticks(rotation=45)
plt.tight_layout()
category_chart_path = os.path.join(self.output_folder, 'category_sales_distribution.png')
plt.savefig(category_chart_path)
plt.close()
# 3. 支付方式分布图
plt.figure(figsize=(8, 6))
payment_counts = transactions['payment_method'].value_counts()
plt.pie(payment_counts.values, labels=payment_counts.index, autopct='%1.1f%%')
plt.title('支付方式分布')
plt.tight_layout()
payment_chart_path = os.path.join(self.output_folder, 'payment_method_distribution.png')
plt.savefig(payment_chart_path)
plt.close()
return {
'trend_chart_path': trend_chart_path,
'category_chart_path': category_chart_path,
'payment_chart_path': payment_chart_path
}
def generate_customer_value_analysis(self):
"""生成客户价值分析"""
# 读取客户和交易数据
customers = pd.read_sql_query("SELECT * FROM customers", self.conn)
transactions = pd.read_sql_query("SELECT * FROM transactions", self.conn)
# 合并客户和交易数据
merged_data = pd.merge(transactions, customers, on='customer_id', how='left')
# 按客户类型统计平均消费金额和消费频次
customer_value = merged_data.groupby('customer_type').agg({
'amount': ['mean', 'sum'],
'customer_id': ['count', 'nunique']
})
customer_value.columns = ['avg_amount', 'total_amount', 'transaction_count', 'customer_count']
customer_value['avg_transactions_per_customer'] = customer_value['transaction_count'] / customer_value['customer_count']
customer_value.reset_index(inplace=True)
# 客户价值气泡图
plt.figure(figsize=(10, 8))
plt.scatter(
customer_value['avg_amount'],
customer_value['avg_transactions_per_customer'],
s=customer_value['customer_count'] * 20, # 气泡大小代表客户数量
alpha=0.7
)
# 添加标签
for i, row in customer_value.iterrows():
plt.annotate(
row['customer_type'],
(row['avg_amount'], row['avg_transactions_per_customer']),
fontsize=9
)
plt.title('客户价值分析')
plt.xlabel('平均消费金额')
plt.ylabel('平均消费频次')
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
value_chart_path = os.path.join(self.output_folder, 'customer_value_analysis.png')
plt.savefig(value_chart_path)
plt.close()
return {
'value_chart_path': value_chart_path,
'customer_value_data': customer_value
}
def close(self):
"""关闭数据库连接"""
if self.conn:
self.conn.close()
第四步:生成PPT展示
python
from pptx import Presentation
from pptx.util import Inches, Pt
from pptx.enum.text import PP_ALIGN
from pptx.dml.color import RGBColor
import os
from datetime import datetime
class CustomerPresentationGenerator:
"""客户信息PPT生成类"""
def __init__(self, template_path=None):
"""初始化PPT演示文稿"""
if template_path and os.path.exists(template_path):
self.prs = Presentation(template_path)
else:
self.prs = Presentation()
def add_title_slide(self, title, subtitle):
"""添加标题页"""
slide_layout = self.prs.slide_layouts[0] # 标题页布局
slide = self.prs.slides.add_slide(slide_layout)
# 设置标题
title_shape = slide.shapes.title
title_shape.text = title
# 设置副标题
subtitle_shape = slide.placeholders[1]
subtitle_shape.text = subtitle
return slide
def add_section_header(self, title):
"""添加章节标题页"""
slide_layout = self.prs.slide_layouts[2] # 章节标题布局
slide = self.prs.slides.add_slide(slide_layout)
# 设置标题
title_shape = slide.shapes.title
title_shape.text = title
return slide
def add_text_slide(self, title, content_list):
"""添加文本内容页"""
slide_layout = self.prs.slide_layouts[1] # 标题和内容布局
slide = self.prs.slides.add_slide(slide_layout)
# 设置标题
title_shape = slide.shapes.title
title_shape.text = title
# 设置内容
content_shape = slide.placeholders[1]
text_frame = content_shape.text_frame
for i, content in enumerate(content_list):
if i == 0:
p = text_frame.paragraphs[0]
else:
p = text_frame.add_paragraph()
p.text = content
p.level = 0
return slide
def add_chart_slide(self, title, image_path, description=None):
"""添加图表页"""
slide_layout = self.prs.slide_layouts[5] # 标题和内容布局
slide = self.prs.slides.add_slide(slide_layout)
# 设置标题
title_shape = slide.shapes.title
title_shape.text = title
# 添加图表图片
left = Inches(1.5)
top = Inches(2)
width = Inches(7)
slide.shapes.add_picture(image_path, left, top, width=width)
# 添加描述文本(如果有)
if description:
left = Inches(1)
top = Inches(5.5)
width = Inches(8)
height = Inches(1)
textbox = slide.shapes.add_textbox(left, top, width, height)
text_frame = textbox.text_frame
p = text_frame.add_paragraph()
p.text = description
p.alignment = PP_ALIGN.CENTER
return slide
def add_table_slide(self, title, data, columns):
"""添加表格页"""
slide_layout = self.prs.slide_layouts[5] # 标题和内容布局
slide = self.prs.slides.add_slide(slide_layout)
# 设置标题
title_shape = slide.shapes.title
title_shape.text = title
# 添加表格
rows = len(data) + 1 # 数据行 + 标题行
cols = len(columns)
left = Inches(1)
top = Inches(2)
width = Inches(8)
height = Inches(0.8 * rows)
table = slide.shapes.add_table(rows, cols, left, top, width, height).table
# 设置表头
for i, column in enumerate(columns):
cell = table.cell(0, i)
cell.text = column
# 设置表头样式
for paragraph in cell.text_frame.paragraphs:
paragraph.font.bold = True
paragraph.font.size = Pt(12)
# 填充数据
for i, row_data in enumerate(data):
for j, value in enumerate(row_data):
cell = table.cell(i + 1, j)
cell.text = str(value)
return slide
def add_conclusion_slide(self, title, conclusions):
"""添加结论页"""
slide_layout = self.prs.slide_layouts[1] # 标题和内容布局
slide = self.prs.slides.add_slide(slide_layout)
# 设置标题
title_shape = slide.shapes.title
title_shape.text = title
# 设置内容
content_shape = slide.placeholders[1]
text_frame = content_shape.text_frame
for i, conclusion in enumerate(conclusions):
if i == 0:
p = text_frame.paragraphs[0]
else:
p = text_frame.add_paragraph()
p.text = conclusion
p.level = 0
return slide
def add_thank_you_slide(self):
"""添加结束页"""
slide_layout = self.prs.slide_layouts[6] # 仅标题布局
slide = self.prs.slides.add_slide(slide_layout)
# 设置标题
title_shape = slide.shapes.title
title_shape.text = "谢谢观看!"
# 添加日期
left = Inches(0)
top = Inches(5)
width = Inches(10)
height = Inches(1)
textbox = slide.shapes.add_textbox(left, top, width, height)
text_frame = textbox.text_frame
p = text_frame.add_paragraph()
p.text = datetime.now().strftime("%Y年%m月%d日")
p.alignment = PP_ALIGN.CENTER
return slide
def save(self, output_path):
"""保存PPT文件"""
# 确保输出目录存在
os.makedirs(os.path.dirname(output_path), exist_ok=True)
self.prs.save(output_path)
print(f"PPT已保存至: {output_path}")
return output_path
第五步:数据存档与备份
python
import shutil
import zipfile
import os
from datetime import datetime
class CustomerDataArchiver:
"""客户数据存档类"""
def __init__(self, db_path, archive_folder):
"""初始化存档路径"""
self.db_path = db_path
self.archive_folder = archive_folder
os.makedirs(archive_folder, exist_ok=True)
def create_database_backup(self):
"""创建数据库备份"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_filename = f"customer_db_backup_{timestamp}.db"
backup_path = os.path.join(self.archive_folder, backup_filename)
# 复制数据库文件
shutil.copy2(self.db_path, backup_path)
print(f"数据库已备份至: {backup_path}")
return backup_path
def export_data_to_excel(self, output_folder):
"""导出数据到Excel文件"""
os.makedirs(output_folder, exist_ok=True)
# 连接数据库
conn = sqlite3.connect(self.db_path)
# 导出客户数据
customers_df = pd.read_sql_query("SELECT * FROM customers", conn)
customers_path = os.path.join(output_folder, "customers_export.xlsx")
customers_df.to_excel(customers_path, index=False)
# 导出交易数据
transactions_df = pd.read_sql_query("SELECT * FROM transactions", conn)
transactions_path = os.path.join(output_folder, "transactions_export.xlsx")
transactions_df.to_excel(transactions_path, index=False)
# 导出联系记录
contacts_df = pd.read_sql_query("SELECT * FROM contacts", conn)
contacts_path = os.path.join(output_folder, "contacts_export.xlsx")
contacts_df.to_excel(contacts_path, index=False)
conn.close()
print(f"数据已导出至: {output_folder}")
return {
'customers_path': customers_path,
'transactions_path': transactions_path,
'contacts_path': contacts_path
}
def create_archive_zip(self, files_to_archive):
"""创建存档ZIP文件"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
archive_filename = f"customer_data_archive_{timestamp}.zip"
archive_path = os.path.join(self.archive_folder, archive_filename)
with zipfile.ZipFile(archive_path, 'w') as zipf:
for file_path in files_to_archive:
if os.path.exists(file_path):
# 将文件添加到ZIP中,使用相对路径作为ZIP内的路径
zipf.write(file_path, os.path.basename(file_path))
print(f"存档文件已创建: {archive_path}")
return archive_path
def cleanup_old_archives(self, max_archives=10):
"""清理旧的存档文件,只保留最新的几个"""
# 获取所有ZIP存档文件
archives = [f for f in os.listdir(self.archive_folder) if f.endswith('.zip')]
# 按修改时间排序
archives.sort(key=lambda x: os.path.getmtime(os.path.join(self.archive_folder, x)), reverse=True)
# 删除旧文件
if len(archives) > max_archives:
for old_archive in archives[max_archives:]:
old_path = os.path.join(self.archive_folder, old_archive)
os.remove(old_path)
print(f"已删除旧存档: {old_path}")
第六步:整合所有功能
python
import os
import schedule
import time
from datetime import datetime
class CustomerManagementSystem:
"""客户信息管理系统主类"""
def __init__(self, config):
"""初始化系统配置"""
self.config = config
self.db_path = config['db_path']
self.output_folder = config['output_folder']
self.archive_folder = config['archive_folder']
# 确保必要的文件夹存在
os.makedirs(self.output_folder, exist_ok=True)
os.makedirs(self.archive_folder, exist_ok=True)
os.makedirs(os.path.join(self.output_folder, 'charts'), exist_ok=True)
os.makedirs(os.path.join(self.output_folder, 'exports'), exist_ok=True)
os.makedirs(os.path.join(self.output_folder, 'presentations'), exist_ok=True)
def import_customer_data(self, data_sources):
"""导入客户数据"""
print("开始导入客户数据...")
importer = CustomerDataImporter(self.db_path)
total_imported = 0
# 导入Excel数据
for source in data_sources.get('excel', []):
count = importer.import_from_excel(
source['file_path'],
source['sheet_name'],
source['table_name']
)
total_imported += count
# 导入CSV数据
for source in data_sources.get('csv', []):
count = importer.import_from_csv(
source['file_path'],
source['table_name']
)
total_imported += count
# 导入数据库数据
for source in data_sources.get('database', []):
count = importer.import_from_database(
source['db_path'],
source['query'],
source['table_name']
)
total_imported += count
importer.close()
print(f"数据导入完成,共导入{total_imported}条记录")
def process_customer_data(self):
"""处理客户数据"""
print("开始处理客户数据...")
processor = CustomerDataProcessor(self.db_path)
# 清洗数据
processor.clean_customer_data()
# 分类客户
category_counts = processor.categorize_customers()
# 分析客户偏好
preferences = processor.analyze_customer_preferences()
processor.close()
print("客户数据处理完成")
return {
'category_counts': category_counts,
'preferences': preferences
}
def analyze_customer_data(self):
"""分析客户数据"""
print("开始分析客户数据...")
charts_folder = os.path.join(self.output_folder, 'charts')
analyzer = CustomerDataAnalyzer(self.db_path, charts_folder)
# 生成客户分布图表
distribution_charts = analyzer.generate_customer_distribution_chart()
# 生成销售分析图表
sales_charts = analyzer.generate_sales_analysis_charts()
# 生成客户价值分析
value_analysis = analyzer.generate_customer_value_analysis()
analyzer.close()
print("客户数据分析完成")
return {
'distribution_charts': distribution_charts,
'sales_charts': sales_charts,
'value_analysis': value_analysis
}
def generate_presentation(self, analysis_results, processing_results):
"""生成PPT演示文稿"""
print("开始生成PPT演示文稿...")
# 初始化PPT生成器
template_path = self.config.get('ppt_template')
generator = CustomerPresentationGenerator(template_path)
# 添加标题页
report_date = datetime.now().strftime("%Y年%m月%d日")
generator.add_title_slide(
"客户信息分析报告",
f"生成日期: {report_date}"
)
# 添加客户概况章节
generator.add_section_header("一、客户概况")
# 添加客户分类统计
category_data = []
for category, count in processing_results['category_counts'].items():
category_data.append([category, count, f"{count/sum(processing_results['category_counts'].values())*100:.1f}%"])
generator.add_table_slide(
"客户分类统计",
category_data,
["客户类型", "数量", "占比"]
)
# 添加客户分布图表
generator.add_chart_slide(
"客户类型分布",
analysis_results['distribution_charts']['pie_chart_path'],
"不同类型客户的数量分布情况"
)
generator.add_chart_slide(
"客户年龄分布",
analysis_results['distribution_charts']['age_chart_path'],
"客户年龄段分布情况"
)
generator.add_chart_slide(
"客户来源分布",
analysis_results['distribution_charts']['source_chart_path'],
"客户获取渠道分布情况"
)
# 添加销售分析章节
generator.add_section_header("二、销售分析")
generator.add_chart_slide(
"月度销售趋势",
analysis_results['sales_charts']['trend_chart_path'],
"近期月度销售额变化趋势"
)
generator.add_chart_slide(
"产品类别销售分布",
analysis_results['sales_charts']['category_chart_path'],
"各产品类别销售额占比情况"
)
generator.add_chart_slide(
"支付方式分布",
analysis_results['sales_charts']['payment_chart_path'],
"客户偏好的支付方式分布"
)
# 添加客户价值分析章节
generator.add_section_header("三、客户价值分析")
generator.add_chart_slide(
"客户价值矩阵",
analysis_results['value_analysis']['value_chart_path'],
"基于消费金额和频次的客户价值分析"
)
# 添加客户偏好分析
preference_content = []
for customer_type, categories in processing_results['preferences'].items():
preference_content.append(f"{customer_type}:主要消费 {', '.join(categories)}")
generator.add_text_slide(
"客户消费偏好分析",
preference_content
)
# 添加结论和建议
conclusions = [
"1. VIP客户群体虽然数量较少,但贡献了最高的销售额,应重点维护关系",
"2. 沉睡客户数量较多,可以通过个性化营销活动唤醒",
"3. 年轻客户(25-35岁)是消费主力,应针对此群体开发更多产品",
"4. 线上渠道获客效果最好,建议增加线上营销投入",
"5. 建议针对不同客户群体制定差异化的营销策略,提高客户满意度和忠诚度"
]
generator.add_conclusion_slide("结论与建议", conclusions)
# 添加结束页
generator.add_thank_you_slide()
# 保存PPT
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
ppt_filename = f"客户分析报告_{timestamp}.pptx"
ppt_path = os.path.join(self.output_folder, 'presentations', ppt_filename)
generator.save(ppt_path)
print(f"PPT演示文稿已生成: {ppt_path}")
return ppt_path
def archive_data(self, ppt_path):
"""存档数据"""
print("开始存档数据...")
archiver = CustomerDataArchiver(self.db_path, self.archive_folder)
# 创建数据库备份
db_backup = archiver.create_database_backup()
# 导出数据到Excel
export_folder = os.path.join(self.output_folder, 'exports')
exported_files = archiver.export_data_to_excel(export_folder)
# 创建存档ZIP
files_to_archive = [
db_backup,
exported_files['customers_path'],
exported_files['transactions_path'],
exported_files['contacts_path'],
ppt_path
]
archive_path = archiver.create_archive_zip(files_to_archive)
# 清理旧存档
archiver.cleanup_old_archives(max_archives=self.config.get('max_archives', 10))
print("数据存档完成")
return archive_path
def run_full_process(self):
"""运行完整的处理流程"""
print("\n===== 开始客户信息管理系统处理流程 =====")
start_time = datetime.now()
# 步骤1:导入客户数据
if self.config.get('import_data', True):
self.import_customer_data(self.config['data_sources'])
# 步骤2:处理客户数据
processing_results = self.process_customer_data()
# 步骤3:分析客户数据
analysis_results = self.analyze_customer_data()
# 步骤4:生成PPT演示文稿
ppt_path = self.generate_presentation(analysis_results, processing_results)
# 步骤5:存档数据
archive_path = self.archive_data(ppt_path)
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print(f"\n===== 处理流程完成,耗时 {duration:.2f} 秒 =====")
print(f"PPT报告: {ppt_path}")
print(f"数据存档: {archive_path}")
return {
'ppt_path': ppt_path,
'archive_path': archive_path,
'processing_results': processing_results,
'analysis_results': analysis_results
}
# 使用示例
if __name__ == "__main__":
# 系统配置
config = {
'db_path': 'customer_database.db',
'output_folder': 'customer_system_output',
'archive_folder': 'customer_system_archives',
'ppt_template': None, # 可选的PPT模板路径
'max_archives': 10, # 保留的最大存档数量
'import_data': True, # 是否导入新数据
'data_sources': {
'excel': [
{
'file_path': 'data/customers.xlsx',
'sheet_name': 'Sheet1',
'table_name': 'customers'
},
{
'file_path': 'data/transactions.xlsx',
'sheet_name': 'Sheet1',
'table_name': 'transactions'
}
],
'csv': [
{
'file_path': 'data/contacts.csv',
'table_name': 'contacts'
}
],
'database': []
}
}
# 创建并运行系统
system = CustomerManagementSystem(config)
results = system.run_full_process()
# 设置定时任务(每周一早上9点运行)
def scheduled_job():
print(f"\n===== 定时任务启动: {datetime.now()} =====")
system.run_full_process()
# 设置定时任务
schedule.every().monday.at("09:00").do(scheduled_job)
print("\n系统已设置定时任务,每周一早上9点自动运行...")
# 运行定时任务循环
# while True:
# schedule.run_pending()
# time.sleep(60)
实际应用场景
1. 销售团队客户管理
销售团队可以使用此系统自动导入CRM系统中的客户数据,进行分类和分析,生成客户价值报告,帮助销售人员识别高价值客户和潜在客户,制定针对性的销售策略。
2. 会员管理系统
零售企业可以使用此系统管理会员信息,分析会员消费行为和偏好,生成会员分析报告,为会员营销活动提供数据支持,提高会员忠诚度和复购率。
3. 客户服务优化
客服部门可以使用此系统整合客户反馈和投诉数据,分析客户满意度和问题类型,生成客户服务质量报告,帮助企业改进产品和服务,提升客户满意度。
4. 市场营销分析
市场部门可以使用此系统分析不同渠道获取的客户质量和转化率,评估营销活动效果,生成营销ROI分析报告,优化营销资源分配和策略制定。
进阶优化
1. 添加Web界面
使用Flask或Django构建Web界面,使系统更易于使用:
python
from flask import Flask, render_template, request, send_file
import os
app = Flask(__name__)
config = {...} # 系统配置
system = CustomerManagementSystem(config)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/import', methods=['GET', 'POST'])
def import_data():
if request.method == 'POST':
# 处理文件上传
file = request.files['file']
file_path = os.path.join('uploads', file.filename)
file.save(file_path)
# 导入数据
data_sources = {
'excel': [{
'file_path': file_path,
'sheet_name': request.form['sheet_name'],
'table_name': request.form['table_name']
}]
}
system.import_customer_data(data_sources)
return "数据导入成功!"
return render_template('import.html')
@app.route('/analyze')
def analyze():
# 运行分析流程
processing_results = system.process_customer_data()
analysis_results = system.analyze_customer_data()
return render_template('analysis.html', results=processing_results)
@app.route('/generate_ppt')
def generate_ppt():
# 生成PPT
processing_results = system.process_customer_data()
analysis_results = system.analyze_customer_data()
ppt_path = system.generate_presentation(analysis_results, processing_results)
return send_file(ppt_path, as_attachment=True)
if __name__ == '__main__':
app.run(debug=True)
2. 集成机器学习模型
添加客户流失预测和客户细分模型:
python
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
class CustomerMLModels:
"""客户机器学习模型类"""
def __init__(self, db_path):
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
def prepare_features(self):
"""准备特征数据"""
# 读取客户和交易数据
customers = pd.read_sql_query("SELECT * FROM customers", self.conn)
transactions = pd.read_sql_query("SELECT * FROM transactions", self.conn)
# 合并数据
merged = pd.merge(transactions, customers, on='customer_id', how='left')
# 按客户分组计算特征
features = merged.groupby('customer_id').agg({
'amount': ['sum', 'mean', 'count'],
'transaction_date': ['min', 'max'],
'age': 'first',
'gender': 'first'
})
features.columns = ['total_spent', 'avg_spent', 'transaction_count', 'first_purchase', 'last_purchase', 'age', 'gender']
features.reset_index(inplace=True)
# 计算客户生命周期和购买频率
features['first_purchase'] = pd.to_datetime(features['first_purchase'])
features['last_purchase'] = pd.to_datetime(features['last_purchase'])
features['customer_age_days'] = (features['last_purchase'] - features['first_purchase']).dt.days
features['customer_age_days'] = features['customer_age_days'].apply(lambda x: max(x, 1))
features['purchase_frequency'] = features['transaction_count'] / features['customer_age_days'] * 30
# 计算最近一次购买距今天数
today = pd.to_datetime('today')
features['recency_days'] = (today - features['last_purchase']).dt.days
# 处理性别特征
features['gender'] = features['gender'].map({'男': 1, '女': 0, '未知': -1})
return features
def customer_segmentation(self):
"""客户细分模型"""
features = self.prepare_features()
# 选择用于聚类的特征
cluster_features = features[['total_spent', 'transaction_count', 'recency_days']].copy()
# 标准化特征
scaler = StandardScaler()
scaled_features = scaler.fit_transform(cluster_features)
# K-means聚类
kmeans = KMeans(n_clusters=4, random_state=42)
features['cluster'] = kmeans.fit_predict(scaled_features)
# 分析每个簇的特征
cluster_analysis = features.groupby('cluster').agg({
'total_spent': 'mean',
'transaction_count': 'mean',
'recency_days': 'mean',
'customer_id': 'count'
})
cluster_analysis.columns = ['平均消费额', '平均交易次数', '平均最近购买天数', '客户数量']
# 根据特征为每个簇命名
cluster_names = {}
for cluster in cluster_analysis.index:
if cluster_analysis.loc[cluster, '平均消费额'] > 1000 and cluster_analysis.loc[cluster, '平均最近购买天数'] < 30:
cluster_names[cluster] = '高价值活跃客户'
elif cluster_analysis.loc[cluster, '平均消费额'] > 1000 and cluster_analysis.loc[cluster, '平均最近购买天数'] >= 30:
cluster_names[cluster] = '高价值沉睡客户'
elif cluster_analysis.loc[cluster, '平均交易次数'] > 5 and cluster_analysis.loc[cluster, '平均最近购买天数'] < 30:
cluster_names[cluster] = '高频活跃客户'
else:
cluster_names[cluster] = '一般客户'
# 将簇名称添加到原始数据中
features['segment'] = features['cluster'].map(cluster_names)
# 更新数据库中的客户分类
cursor = self.conn.cursor()
for _, row in features.iterrows():
cursor.execute('''
UPDATE customers SET
customer_type = ?,
last_updated = ?
WHERE customer_id = ?
''', (
row['segment'],
datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
row['customer_id']
))
self.conn.commit()
运行上述代码,会根据特征值对客户进行分类,并更新数据库中的客户信息。
小结
本节内容主要包括以下几个部分:
需求分析:明确了客户信息管理系统需要实现的核心功能,包括数据导入、清洗分类、分析可视化、PPT生成和数据存档。
技术选型:选择了适合任务的Python库,包括pandas、sqlite3、matplotlib/seaborn、python-pptx等。
代码实现:
- 数据导入模块:支持从Excel、CSV和数据库导入数据
- 数据处理模块:实现了数据清洗和客户分类功能
- 数据分析模块:生成了客户分布、销售趋势、客户价值等可视化图表
- PPT生成模块:创建了一个结构完整的PPT演示文稿
- 数据存档模块:实现了数据库备份、数据导出和ZIP存档功能
- 整合所有功能:通过一个主类将所有模块整合在一起,并添加了定时任务功能。
实际应用场景:讨论了系统在销售管理、会员系统、客户服务和市场营销中的具体应用。
进阶优化:提出了添加Web界面和集成机器学习模型的扩展方向。
本系统可以广泛应用于企业客户关系管理场景,帮助企业提高客户管理效率,挖掘客户价值,制定精准营销策略。代码示例充分考虑了可扩展性和实用性,可以直接用于实际项目开发。