引入:现实世界中的文件格式
在真实项目中,我们很少只处理纯文本文件。常见的文件格式有:
- CSV文件:表格数据
- JSON文件:配置和数据交换
- 文本文件:日志和文档
- 其他格式:XML、Excel等
今天我们先重点学习最常用的 CSV文件 和 JSON文件!
1. CSV 文件处理
什么是CSV文件?
CSV = Comma-Separated Values(逗号分隔值)
import csv
# 读取CSV文件
def 读取csv文件(文件名):
"""使用csv模块读取CSV文件"""
with open(文件名, 'r', encoding='utf-8') as file:
# 创建csv读取器
csv_reader = csv.reader(file)
print(" CSV文件内容:")
for 行号, 行 in enumerate(csv_reader, 1):
print(f"第{行号}行: {行}")
# 创建示例CSV文件
def 创建示例csv():
"""创建一个示例CSV文件"""
数据 = [
['姓名', '年龄', '城市', '职业'],
['张三', '25', '北京', '工程师'],
['李四', '30', '上海', '设计师'],
['王五', '28', '广州', '教师'],
['赵六', '35', '深圳', '产品经理']
]
with open('员工信息.csv', 'w', encoding='utf-8', newline='') as file:
writer = csv.writer(file)
writer.writerows(数据)
print("✅ 示例CSV文件创建完成!")
# 使用示例
创建示例csv()
读取csv文件('员工信息.csv')
1.2 更友善的CSV读取方式
import csv
def 读取csv为字典(文件名):
"""将CSV文件读取为字典列表,更易处理"""
with open(文件名, 'r', encoding='utf-8') as file:
# 使用DictReader,第一行作为字段名
csv_reader = csv.DictReader(file)
print(" 员工信息表:")
print("=" * 50)
员工列表 = []
for 行号, 行 in enumerate(csv_reader, 1):
员工列表.append(行)
print(f"{行号}. 姓名:{行['姓名']},年龄:{行['年龄']},城市:{行['城市']},职业:{行['职业']}")
return 员工列表
def 分析csv数据(文件名):
"""分析CSV文件中的数据"""
员工列表 = 读取csv为字典(文件名)
if not 员工列表:
return
# 统计信息
总人数 = len(员工列表)
平均年龄 = sum(int(员工['年龄']) for 员工 in 员工列表) / 总人数
城市统计 = {}
for 员工 in 员工列表:
城市 = 员工['城市']
城市统计[城市] = 城市统计.get(城市, 0) + 1
print(f"
数据分析结果:")
print(f" 总员工数:{总人数}")
print(f" 平均年龄:{平均年龄:.1f}岁")
print(f" 城市分布:{城市统计}")
# 使用示例
分析csv数据('员工信息.csv')
1.3 写入CSV文件
import csv
def 添加新员工(文件名):
"""向CSV文件添加新员工"""
print(" 添加新员工信息")
print("-" * 30)
姓名 = input("姓名: ")
年龄 = input("年龄: ")
城市 = input("城市: ")
职业 = input("职业: ")
# 准备新数据
新员工 = [姓名, 年龄, 城市, 职业]
# 追加到CSV文件
with open(文件名, 'a', encoding='utf-8', newline='') as file:
writer = csv.writer(file)
writer.writerow(新员工)
print("✅ 新员工信息已添加!")
def 创建复杂csv文件():
"""创建包含复杂数据的CSV文件"""
# 模拟销售数据
销售数据 = [
['日期', '产品', '数量', '单价', '销售员'],
['2024-01-01', '手机', '5', '2999', '张三'],
['2024-01-01', '电脑', '3', '5999', '李四'],
['2024-01-02', '平板', '2', '3999', '王五'],
['2024-01-02', '手机', '8', '2999', '张三'],
['2024-01-03', '电脑', '4', '5999', '李四']
]
with open('销售记录.csv', 'w', encoding='utf-8', newline='') as file:
writer = csv.writer(file)
writer.writerows(销售数据)
print("✅ 销售记录CSV文件创建完成!")
def 分析销售数据(文件名):
"""分析销售数据"""
with open(文件名, 'r', encoding='utf-8') as file:
csv_reader = csv.DictReader(file)
销售记录 = list(csv_reader)
# 计算总销售额
总销售额 = 0
销售员业绩 = {}
for 记录 in 销售记录:
销售额 = int(记录['数量']) * float(记录['单价'])
总销售额 += 销售额
销售员 = 记录['销售员']
销售员业绩[销售员] = 销售员业绩.get(销售员, 0) + 销售额
print(f"
销售分析报告:")
print(f" 总销售额:{总销售额:.2f}元")
print(f" 销售员业绩排名:")
for 销售员, 业绩 in sorted(销售员业绩.items(), key=lambda x: x[1], reverse=True):
print(f" {销售员}:{业绩:.2f}元")
# 使用示例
创建复杂csv文件()
分析销售数据('销售记录.csv')
2. JSON 文件处理
什么是JSON文件?
JSON = JavaScript Object Notation
{
"姓名": "张三",
"年龄": 25,
"城市": "北京",
"爱好": ["编程", "阅读", "运动"],
"教育经历": {
"大学": "清华大学",
"专业": "计算机科学"
}
}
2.1 读取和写入JSON文件
import json
def 创建示例json文件():
"""创建示例JSON文件"""
用户数据 = {
"用户列表": [
{
"id": 1,
"姓名": "张三",
"年龄": 25,
"邮箱": "zhangsan@example.com",
"活跃": True,
"技能": ["Python", "Java", "SQL"],
"地址": {
"城市": "北京",
"街道": "中关村大街"
}
},
{
"id": 2,
"姓名": "李四",
"年龄": 30,
"邮箱": "lisi@example.com",
"活跃": False,
"技能": ["JavaScript", "HTML", "CSS"],
"地址": {
"城市": "上海",
"街道": "浦东新区"
}
}
],
"统计信息": {
"总用户数": 2,
"平均年龄": 27.5,
"活跃用户": 1
}
}
# 写入JSON文件
with open('用户数据.json', 'w', encoding='utf-8') as file:
json.dump(用户数据, file, ensure_ascii=False, indent=2)
print("✅ JSON文件创建完成!")
def 读取json文件(文件名):
"""读取JSON文件"""
with open(文件名, 'r', encoding='utf-8') as file:
数据 = json.load(file)
return 数据
def 显示json内容(文件名):
"""格式化显示JSON内容"""
数据 = 读取json文件(文件名)
print(" 用户信息:")
print("=" * 50)
for 用户 in 数据["用户列表"]:
print(f"ID: {用户['id']}")
print(f"姓名: {用户['姓名']}")
print(f"年龄: {用户['年龄']}")
print(f"邮箱: {用户['邮箱']}")
print(f"活跃: {'是' if 用户['活跃'] else '否'}")
print(f"技能: {', '.join(用户['技能'])}")
print(f"地址: {用户['地址']['城市']} - {用户['地址']['街道']}")
print("-" * 30)
# 显示统计信息
统计 = 数据["统计信息"]
print(f"
统计信息:")
print(f" 总用户数: {统计['总用户数']}")
print(f" 平均年龄: {统计['平均年龄']}")
print(f" 活跃用户: {统计['活跃用户']}")
# 使用示例
创建示例json文件()
显示json内容('用户数据.json')
2.2 JSON数据的增删改查
import json
def 添加新用户(文件名, 新用户):
"""向JSON文件添加新用户"""
# 读取现有数据
with open(文件名, 'r', encoding='utf-8') as file:
数据 = json.load(file)
# 生成新ID
新id = max(用户['id'] for 用户 in 数据['用户列表']) + 1
新用户['id'] = 新id
# 添加新用户
数据['用户列表'].append(新用户)
# 更新统计信息
数据['统计信息']['总用户数'] = len(数据['用户列表'])
数据['统计信息']['平均年龄'] = sum(用户['年龄'] for 用户 in 数据['用户列表']) / len(数据['用户列表'])
数据['统计信息']['活跃用户'] = sum(1 for 用户 in 数据['用户列表'] if 用户['活跃'])
# 写回文件
with open(文件名, 'w', encoding='utf-8') as file:
json.dump(数据, file, ensure_ascii=False, indent=2)
print(f"✅ 新用户添加成功!ID: {新id}")
def 交互式添加用户(文件名):
"""交互式添加用户"""
print(" 添加新用户")
print("-" * 30)
新用户 = {
"姓名": input("姓名: "),
"年龄": int(input("年龄: ")),
"邮箱": input("邮箱: "),
"活跃": input("是否活跃 (是/否): ").lower() == '是',
"技能": input("技能 (用逗号分隔): ").split(','),
"地址": {
"城市": input("城市: "),
"街道": input("街道: ")
}
}
添加新用户(文件名, 新用户)
def 搜索用户(文件名, 关键词):
"""搜索用户"""
数据 = 读取json文件(文件名)
结果 = []
for 用户 in 数据['用户列表']:
# 在姓名、邮箱、技能、城市中搜索
if (关键词.lower() in 用户['姓名'].lower() or
关键词.lower() in 用户['邮箱'].lower() or
any(关键词.lower() in 技能.lower() for 技能 in 用户['技能']) or
关键词.lower() in 用户['地址']['城市'].lower()):
结果.append(用户)
if 结果:
print(f"
找到 {len(结果)} 个匹配用户:")
for 用户 in 结果:
print(f" - {用户['姓名']} (ID: {用户['id']})")
else:
print("❌ 没有找到匹配的用户")
# 使用示例
交互式添加用户('用户数据.json')
搜索用户('用户数据.json', '北京')
3. 配置文件处理实战
3.1 使用JSON作为配置文件
import json
import os
class 配置管理器:
"""使用JSON管理程序配置"""
def __init__(self, 配置文件='config.json'):
self.配置文件 = 配置文件
self.配置 = self.加载配置()
def 加载配置(self):
"""加载配置,如果文件不存在则创建默认配置"""
if os.path.exists(self.配置文件):
with open(self.配置文件, 'r', encoding='utf-8') as file:
return json.load(file)
else:
# 默认配置
默认配置 = {
"程序设置": {
"语言": "zh-CN",
"主题": "dark",
"自动保存": True,
"保存间隔": 300
},
"用户设置": {
"用户名": "默认用户",
"字体大小": 14,
"显示行号": True
},
"数据路径": {
"工作目录": "./workspace",
"备份目录": "./backups"
}
}
# 保存默认配置
self.保存配置(默认配置)
return 默认配置
def 保存配置(self, 配置=None):
"""保存配置到文件"""
if 配置 is None:
配置 = self.配置
with open(self.配置文件, 'w', encoding='utf-8') as file:
json.dump(配置, file, ensure_ascii=False, indent=2)
print("✅ 配置已保存")
def 获取配置项(self, 路径):
"""获取特定配置项,支持点分隔路径"""
键列表 = 路径.split('.')
当前值 = self.配置
for 键 in 键列表:
当前值 = 当前值.get(键)
if 当前值 is None:
return None
return 当前值
def 设置配置项(self, 路径, 值):
"""设置特定配置项"""
键列表 = 路径.split('.')
当前字典 = self.配置
# 遍历到最后一个键的父级
for 键 in 键列表[:-1]:
if 键 not in 当前字典:
当前字典[键] = {}
当前字典 = 当前字典[键]
# 设置最终的值
当前字典[键列表[-1]] = 值
self.保存配置()
def 显示配置(self):
"""显示当前所有配置"""
print("⚙️ 当前配置:")
print(json.dumps(self.配置, ensure_ascii=False, indent=2))
# 使用示例
配置 = 配置管理器()
配置.显示配置()
# 修改一些配置
配置.设置配置项('程序设置.语言', 'en-US')
配置.设置配置项('用户设置.字体大小', 16)
print(f"
当前语言:{配置.获取配置项('程序设置.语言')}")
4. 综合实战:学生成绩管理系统
import csv
import json
import os
class 学生成绩管理系统:
"""使用CSV和JSON管理学生成绩"""
def __init__(self):
self.csv文件 = '学生成绩.csv'
self.json文件 = '学生档案.json'
self.初始化数据文件()
def 初始化数据文件(self):
"""初始化数据文件"""
# 初始化CSV文件(如果不存在)
if not os.path.exists(self.csv文件):
with open(self.csv文件, 'w', encoding='utf-8', newline='') as file:
writer = csv.writer(file)
writer.writerow(['学号', '姓名', '语文', '数学', '英语', '总分'])
print("✅ CSV文件初始化完成")
# 初始化JSON文件(如果不存在)
if not os.path.exists(self.json文件):
初始数据 = {
"班级信息": {
"班级名称": "高一(1)班",
"班主任": "张老师",
"学生总数": 0
},
"成绩统计": {
"平均分": {},
"最高分": {},
"最低分": {}
}
}
with open(self.json文件, 'w', encoding='utf-8') as file:
json.dump(初始数据, file, ensure_ascii=False, indent=2)
print("✅ JSON文件初始化完成")
def 添加学生成绩(self):
"""添加学生成绩"""
print(" 添加学生成绩")
print("-" * 30)
学号 = input("学号: ")
姓名 = input("姓名: ")
语文 = float(input("语文成绩: "))
数学 = float(input("数学成绩: "))
英语 = float(input("英语成绩: "))
总分 = 语文 + 数学 + 英语
# 添加到CSV文件
with open(self.csv文件, 'a', encoding='utf-8', newline='') as file:
writer = csv.writer(file)
writer.writerow([学号, 姓名, 语文, 数学, 英语, 总分])
print(f"✅ 学生 {姓名} 的成绩已添加,总分: {总分}")
# 更新统计信息
self.更新统计信息()
def 显示所有成绩(self):
"""显示所有学生成绩"""
with open(self.csv文件, 'r', encoding='utf-8') as file:
csv_reader = csv.DictReader(file)
成绩列表 = list(csv_reader)
if not 成绩列表:
print("还没有学生成绩记录!")
return
print("
学生成绩表:")
print("=" * 70)
print(f"{'学号':<10} {'姓名':<8} {'语文':<6} {'数学':<6} {'英语':<6} {'总分':<6}")
print("-" * 70)
for 成绩 in 成绩列表:
print(f"{成绩['学号']:<10} {成绩['姓名']:<8} {成绩['语文']:<6} {成绩['数学']:<6} {成绩['英语']:<6} {成绩['总分']:<6}")
def 更新统计信息(self):
"""更新成绩统计信息"""
with open(self.csv文件, 'r', encoding='utf-8') as file:
csv_reader = csv.DictReader(file)
成绩列表 = list(csv_reader)
if not 成绩列表:
return
# 读取现有JSON数据
with open(self.json文件, 'r', encoding='utf-8') as file:
json数据 = json.load(file)
# 更新统计信息
json数据['班级信息']['学生总数'] = len(成绩列表)
# 计算各科统计
科目列表 = ['语文', '数学', '英语']
for 科目 in 科目列表:
成绩列 = [float(成绩[科目]) for 成绩 in 成绩列表]
json数据['成绩统计']['平均分'][科目] = sum(成绩列) / len(成绩列)
json数据['成绩统计']['最高分'][科目] = max(成绩列)
json数据['成绩统计']['最低分'][科目] = min(成绩列)
# 保存更新后的JSON数据
with open(self.json文件, 'w', encoding='utf-8') as file:
json.dump(json数据, file, ensure_ascii=False, indent=2)
def 显示统计报告(self):
"""显示成绩统计报告"""
with open(self.json文件, 'r', encoding='utf-8') as file:
json数据 = json.load(file)
print("
成绩统计报告:")
print("=" * 50)
print(f"班级: {json数据['班级信息']['班级名称']}")
print(f"班主任: {json数据['班级信息']['班主任']}")
print(f"学生总数: {json数据['班级信息']['学生总数']}")
print("
各科成绩统计:")
for 科目 in ['语文', '数学', '英语']:
平均分 = json数据['成绩统计']['平均分'].get(科目, 0)
最高分 = json_data['成绩统计']['最高分'].get(科目, 0)
最低分 = json_data['成绩统计']['最低分'].get(科目, 0)
print(f" {科目}: 平均{平均分:.1f}分, 最高{最高分}分, 最低{最低分}分")
# 使用示例
def 主菜单():
"""主菜单"""
系统 = 学生成绩管理系统()
while True:
print("
学生成绩管理系统")
print("=" * 30)
print("1. 添加学生成绩")
print("2. 查看所有成绩")
print("3. 查看统计报告")
print("4. 退出系统")
print("=" * 30)
选择 = input("请选择操作 (1-4): ")
if 选择 == '1':
系统.添加学生成绩()
elif 选择 == '2':
系统.显示所有成绩()
elif 选择 == '3':
系统.显示统计报告()
elif 选择 == '4':
print("感谢使用!再见!")
break
else:
print("❌ 无效选择,请重新输入!")
# 运行系统
主菜单()
重点总结
CSV文件处理:
- 使用 csv 模块:专门处理CSV格式
- csv.reader():读取为列表
- csv.DictReader():读取为字典(推荐!)
- newline='':防止额外的空行
JSON文件处理:
- json.load():读取JSON文件
- json.dump():写入JSON文件
- ensure_ascii=False:支持中文
- indent=2:美化输出格式
最佳实践:
- 总是指定编码:encoding='utf-8'
- 使用 with 语句:自动管理文件
- 异常处理:处理文件不存在等情况
- 数据验证:确保数据格式正确
© 版权声明
文章版权归作者所有,未经允许请勿转载。



收藏了,感谢分享