今天是我们 15天 Python 技巧之旅的最后一天,我们将学习 Day 15:生成器与 yield。这是 Python 中处理大数据流和惰性计算的核心技术。
Day 15:生成器与yield
深度解析:生成器(Generators)
1. 为什么需要生成器?
传统列表的问题:
# 创建包含100万个数字的列表
def create_large_list(n):
result = []
for i in range(n):
result.append(i ** 2)
return result
# 占用大量内存
large_list = create_large_list(1000000) # 立即占用内存
生成器解决方案:
def generate_squares(n):
for i in range(n):
yield i ** 2 # 惰性计算,不立即占用内存
# 几乎不占用内存
squares_gen = generate_squares(1000000) # 只是创建生成器对象
2. 生成器函数 vs 普通函数
普通函数:
def normal_function():
result = []
for i in range(5):
result.append(i)
return result # 一次性返回所有结果
print(normal_function()) # [0, 1, 2, 3, 4]
生成器函数:
def generator_function():
for i in range(5):
yield i # 每次 yield 一个值
gen = generator_function()
print(next(gen)) # 0
print(next(gen)) # 1
print(next(gen)) # 2
# 可以继续调用 next()
3. 基本用法和语法
简单的生成器示例:
def countdown(n):
print("开始倒计时")
while n > 0:
yield n
n -= 1
print("发射!")
# 使用生成器
counter = countdown(5)
print(next(counter)) # 开始倒计时 → 5
print(next(counter)) # 4
print(next(counter)) # 3
print(next(counter)) # 2
print(next(counter)) # 1
print(next(counter)) # 发射! → StopIteration
使用 for 循环遍历生成器:
def fibonacci(limit):
a, b = 0, 1
count = 0
while count < limit:
yield a
a, b = b, a + b
count += 1
# 自动处理 StopIteration
for num in fibonacci(10):
print(num, end=" ")
# 输出: 0 1 1 2 3 5 8 13 21 34
4. 生成器表达式
类似列表推导式,但返回生成器:
# 列表推导式 - 立即计算
squares_list = [x**2 for x in range(1000000)] # 占用大量内存
# 生成器表达式 - 惰性计算
squares_gen = (x**2 for x in range(1000000)) # 几乎不占内存
# 使用
print(next(squares_gen)) # 0
print(next(squares_gen)) # 1
# 可以转换为列表(如果需要)
first_10 = [next(squares_gen) for _ in range(10)]
5. 实际应用场景
场景1:大文件读取
def read_large_file(file_path):
"""逐行读取大文件,不加载到内存"""
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
yield line.strip()
# 使用
for line in read_large_file('huge_file.txt'):
if 'error' in line:
print(f"发现错误行: {line}")
# 处理每一行,不会耗尽内存
场景2:数据流处理
def data_pipeline(data_stream):
"""数据处理管道"""
# 过滤
filtered = (item for item in data_stream if item['active'])
# 转换
transformed = ({'id': item['id'], 'name': item['name'].upper()} for item in filtered)
# 分批处理
return batched(transformed, batch_size=100)
def batched(iterable, batch_size):
"""将数据流分批"""
batch = []
for item in iterable:
batch.append(item)
if len(batch) == batch_size:
yield batch
batch = []
if batch: # 最后一批
yield batch
# 模拟数据流
def generate_data():
import random
for i in range(1000):
yield {
'id': i,
'name': f'user_{i}',
'active': random.choice([True, False])
}
# 使用管道
for batch in data_pipeline(generate_data()):
print(f"处理批次: {len(batch)} 条记录")
场景3:无限序列生成
def infinite_sequence():
"""生成无限序列"""
num = 0
while True:
yield num
num += 1
def prime_numbers():
"""生成无限质数序列"""
yield 2
primes = [2]
candidate = 3
while True:
is_prime = True
for prime in primes:
if prime * prime > candidate:
break
if candidate % prime == 0:
is_prime = False
break
if is_prime:
primes.append(candidate)
yield candidate
candidate += 2
# 使用
primes = prime_numbers()
first_10_primes = [next(primes) for _ in range(10)]
print(f"前10个质数: {first_10_primes}")
6. 高级技巧:生成器状态管理
send() 方法通信:
def running_average():
"""使用 send() 接收新值并返回当前平均值"""
total = 0
count = 0
while True:
value = yield total / count if count > 0 else 0
total += value
count += 1
# 使用
avg_gen = running_average()
next(avg_gen) # 启动生成器
print(avg_gen.send(10)) # 10.0
print(avg_gen.send(20)) # 15.0
print(avg_gen.send(30)) # 20.0
throw() 和 close() 方法:
def resilient_generator():
"""具有错误恢复能力的生成器"""
try:
for i in range(10):
try:
yield i
except ValueError as e:
print(f"处理值错误: {e}")
continue
except GeneratorExit:
print("生成器被关闭,执行清理操作")
gen = resilient_generator()
print(next(gen)) # 0
print(gen.throw(ValueError("测试错误"))) # 处理错误后继续 → 1
gen.close() # 触发 GeneratorExit
7. 生成器与协程
简单的协程模式:
def coroutine_pattern():
"""协程模式示例"""
while True:
received = yield
print(f"接收到: {received}")
# 使用
coro = coroutine_pattern()
next(coro) # 启动协程
coro.send("Hello") # 接收到: Hello
coro.send("World") # 接收到: World
管道处理链:
def producer(target):
"""数据生产者"""
for i in range(5):
print(f"生产: {i}")
target.send(i)
target.close()
@coroutine
def filter_evens(target):
"""过滤偶数"""
try:
while True:
value = (yield)
if value % 2 == 0:
target.send(value)
except GeneratorExit:
target.close()
@coroutine
def consumer():
"""数据消费者"""
try:
while True:
value = (yield)
print(f"消费: {value}")
except GeneratorExit:
print("消费者结束")
def coroutine(func):
"""装饰器自动启动协程"""
def start(*args, **kwargs):
cr = func(*args, **kwargs)
next(cr)
return cr
return start
# 构建处理链
cons = consumer()
filt = filter_evens(cons)
producer(filt)
8. 性能对比和内存分析
内存使用对比:
import sys
def memory_compare():
# 列表方式
def make_list(n):
return [x**2 for x in range(n)]
# 生成器方式
def make_generator(n):
return (x**2 for x in range(n))
n = 1000000
# 内存占用对比
list_memory = sys.getsizeof(make_list(1000)) * (n // 1000) # 估算
gen_memory = sys.getsizeof(make_generator(n))
print(f"列表内存占用: ~{list_memory / 1024 / 1024:.2f} MB")
print(f"生成器内存占用: {gen_memory} 字节")
memory_compare()
9. 实际项目案例
案例1:日志文件实时监控
import time
import os
def log_monitor(log_file):
"""实时监控日志文件新增内容"""
# 移动到文件末尾
with open(log_file, 'r') as f:
f.seek(0, os.SEEK_END)
while True:
with open(log_file, 'r') as f:
# 获取当前文件大小
current_size = os.path.getsize(log_file)
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if not line:
break
yield line.strip()
# 等待新内容
time.sleep(0.1)
# 使用示例
def alert_system():
"""日志告警系统"""
for line in log_monitor('application.log'):
if 'ERROR' in line:
print(f" 发现错误: {line}")
elif 'WARNING' in line:
print(f"⚠️ 发现警告: {line}")
# 在后台线程运行
# import threading
# threading.Thread(target=alert_system, daemon=True).start()
案例2:分页API数据获取
import requests
def paginated_api_client(base_url, page_size=100):
"""分页获取API数据的生成器"""
page = 1
while True:
response = requests.get(f"{base_url}?page={page}&size={page_size}")
data = response.json()
if not data.get('results'):
break # 没有更多数据
for item in data['results']:
yield item
if not data.get('has_next'):
break # 最后一页
page += 1
# 使用
def process_all_users():
"""处理所有用户数据"""
for user in paginated_api_client('https://api.example.com/users'):
print(f"处理用户: {user['name']}")
# 可以在这里添加处理逻辑,内存占用恒定
案例3:数据流分析引擎
from typing import Iterator, List, Any
import statistics
class StreamAnalyzer:
"""数据流实时分析器"""
def __init__(self, window_size: int = 1000):
self.window_size = window_size
self.buffer = []
def process_stream(self, data_stream: Iterator[float]) -> Iterator[dict]:
"""处理数据流并返回实时统计信息"""
for value in data_stream:
self.buffer.append(value)
# 保持窗口大小
if len(self.buffer) > self.window_size:
self.buffer.pop(0)
if len(self.buffer) >= 10: # 有足够数据时开始分析
yield {
'timestamp': time.time(),
'current': value,
'average': statistics.mean(self.buffer),
'std_dev': statistics.stdev(self.buffer) if len(self.buffer) > 1 else 0,
'min': min(self.buffer),
'max': max(self.buffer),
'window_size': len(self.buffer)
}
def generate_sensor_data():
"""模拟传感器数据流"""
import random
while True:
yield random.gauss(100, 10) # 正态分布数据
time.sleep(0.01) # 10ms 间隔
# 使用
analyzer = StreamAnalyzer(window_size=500)
for stats in analyzer.process_stream(generate_sensor_data()):
print(f"实时统计: {stats}")
if stats['std_dev'] > 15: # 检测异常波动
print("⚠️ 检测到异常波动!")
今日练习
练习1:创建基础生成器
# 1. 创建生成斐波那契数列的生成器
# 2. 创建读取大文件并过滤包含关键字的行的生成器
# 3. 创建无限循环的计数器生成器
练习2:生成器管道
# 创建数据处理管道:
# 1. 数据生成 → 2. 过滤 → 3. 转换 → 4. 分批
# 每个步骤都使用生成器实现
练习3:实现协程模式
# 实现一个简单的消息广播系统:
# 多个消费者可以订阅生产者,生产者发送的消息会被所有消费者接收
练习答案:
# 练习1答案:
def fibonacci():
a, b = 0, 1
while True:
yield a
a, b = b, a + b
def filter_file_lines(file_path, keyword):
with open(file_path, 'r') as f:
for line in f:
if keyword in line:
yield line.strip()
def infinite_counter(start=0, step=1):
current = start
while True:
yield current
current += step
# 练习2答案:
def data_generator(n):
for i in range(n):
yield {'id': i, 'value': i * 10}
def data_filter(data_stream):
for item in data_stream:
if item['value'] % 20 == 0: # 过滤条件
yield item
def data_transformer(data_stream):
for item in data_stream:
yield {'id': item['id'], 'transformed': item['value'] * 2}
def batcher(data_stream, batch_size):
batch = []
for item in data_stream:
batch.append(item)
if len(batch) == batch_size:
yield batch
batch = []
if batch:
yield batch
# 构建管道
pipeline = batcher(data_transformer(data_filter(data_generator(100))), 10)
for batch in pipeline:
print(f"批次: {batch}")
# 练习3答案:
def broadcast_consumer(name):
try:
while True:
message = yield
print(f"{name} 收到: {message}")
except GeneratorExit:
print(f"{name} 退出")
def broadcast_producer(consumers):
for i in range(5):
message = f"消息 {i}"
print(f"广播: {message}")
for consumer in consumers:
consumer.send(message)
# 使用
consumers = [broadcast_consumer(f"消费者{i}") for i in range(3)]
for consumer in consumers:
next(consumer) # 启动协程
broadcast_producer(consumers)
for consumer in consumers:
consumer.close()
今日总结
- 内存高效: 惰性计算,适合处理大数据流
- 状态保持: 生成器在 yield 时暂停,恢复时继续
- 管道处理: 可以构建复杂的数据处理管道
- 无限序列: 可以表明无限或超级大的序列
生成器使用场景:
- ✅ 处理大型数据集或文件
- ✅ 表明无限序列
- ✅ 构建数据处理管道
- ✅ 实现协程和状态机
- ❌ 需要随机访问或多次遍历的数据
最佳实践:
- 使用生成器表达式替代列表推导式处理大数据
- 使用 yield from 委托子生成器
- 合理使用 send(), throw(), close() 方法
- 注意生成器的一次性使用特性
15天 Python 技巧之旅总结
祝贺你完成了15天的 Python 学习之旅!让我们回顾一下学到的核心技巧:
15天学习路线回顾
|
天数 |
主题 |
核心价值 |
|
Day 1 |
优雅的合并字典 |
更简洁的字典操作 |
|
Day 2 |
条件表达式 |
一行代码的 if-else |
|
Day 3 |
enumerate 获取索引 |
优雅的索引遍历 |
|
Day 4 |
zip 并行迭代 |
多序列同步处理 |
|
Day 5 |
推导式 |
简洁的数据结构创建 |
|
Day 6 |
any() 和 all() |
高效的逻辑判断 |
|
Day 7 |
f-string 格式化 |
现代化的字符串格式化 |
|
Day 8 |
collections 模块 |
强劲的数据结构工具 |
|
Day 9 |
上下文管理器 |
安全的资源管理 |
|
Day 10 |
pathlib 路径处理 |
面向对象的路径操作 |
|
Day 11 |
装饰器 |
函数增强和元编程 |
|
Day 12 |
lambda 匿名函数 |
简洁的函数式编程 |
|
Day 13 |
@dataclass |
自动生成数据类 |
|
Day 14 |
类型提示 |
代码可读性和可靠性 |
|
Day 15 |
生成器与 yield |
内存高效的数据流处理 |
下一步学习提议
- 项目实践: 选择一个小项目应用这些技巧
- 深入框架: 学习 Django、FastAPI 等框架
- 算法数据结构: 夯实计算机科学基础
- 并发编程: 学习异步编程 asyncio
- 测试开发: 掌握 pytest、unittest
- 开源贡献: 参与开源项目提升实战能力
持续学习资源
- 官方文档: docs.python.org
- Awesome Python: GitHub 上的优秀资源集合
- Real Python: 高质量的教程网站
- Python Weekly: 每周技术资讯
记住: 编程技能的提升在于持续实践和不断学习。每天花30分钟编写代码,比一次性学习几个小时更有效!
祝你 Python 编程之旅愉快,期待看到你创造出优秀的作品!

收藏了,感谢分享