python每天一个小技巧——Day 15:生成器与 yield

今天是我们 15天 Python 技巧之旅的最后一天,我们将学习 Day 15:生成器与 yield。这是 Python 中处理大数据流和惰性计算的核心技术。

Day 15:生成器与yield

深度解析:生成器(Generators)

1. 为什么需要生成器?

传统列表的问题:

# 创建包含100万个数字的列表
def create_large_list(n):
    result = []
    for i in range(n):
        result.append(i ** 2)
    return result

# 占用大量内存
large_list = create_large_list(1000000)  # 立即占用内存

生成器解决方案:

def generate_squares(n):
    for i in range(n):
        yield i ** 2  # 惰性计算,不立即占用内存

# 几乎不占用内存
squares_gen = generate_squares(1000000)  # 只是创建生成器对象

2. 生成器函数 vs 普通函数

普通函数:

def normal_function():
    result = []
    for i in range(5):
        result.append(i)
    return result  # 一次性返回所有结果

print(normal_function())  # [0, 1, 2, 3, 4]

生成器函数:

def generator_function():
    for i in range(5):
        yield i  # 每次 yield 一个值

gen = generator_function()
print(next(gen))  # 0
print(next(gen))  # 1
print(next(gen))  # 2
# 可以继续调用 next()

3. 基本用法和语法

简单的生成器示例:

def countdown(n):
    print("开始倒计时")
    while n > 0:
        yield n
        n -= 1
    print("发射!")

# 使用生成器
counter = countdown(5)
print(next(counter))  # 开始倒计时 → 5
print(next(counter))  # 4
print(next(counter))  # 3
print(next(counter))  # 2
print(next(counter))  # 1
print(next(counter))  # 发射! → StopIteration

使用 for 循环遍历生成器:

def fibonacci(limit):
    a, b = 0, 1
    count = 0
    while count < limit:
        yield a
        a, b = b, a + b
        count += 1

# 自动处理 StopIteration
for num in fibonacci(10):
    print(num, end=" ")
# 输出: 0 1 1 2 3 5 8 13 21 34

4. 生成器表达式

类似列表推导式,但返回生成器:

# 列表推导式 - 立即计算
squares_list = [x**2 for x in range(1000000)]  # 占用大量内存

# 生成器表达式 - 惰性计算
squares_gen = (x**2 for x in range(1000000))  # 几乎不占内存

# 使用
print(next(squares_gen))  # 0
print(next(squares_gen))  # 1

# 可以转换为列表(如果需要)
first_10 = [next(squares_gen) for _ in range(10)]

5. 实际应用场景

场景1:大文件读取

def read_large_file(file_path):
    """逐行读取大文件,不加载到内存"""
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            yield line.strip()

# 使用
for line in read_large_file('huge_file.txt'):
    if 'error' in line:
        print(f"发现错误行: {line}")
    # 处理每一行,不会耗尽内存

场景2:数据流处理

def data_pipeline(data_stream):
    """数据处理管道"""
    # 过滤
    filtered = (item for item in data_stream if item['active'])
    # 转换
    transformed = ({'id': item['id'], 'name': item['name'].upper()} for item in filtered)
    # 分批处理
    return batched(transformed, batch_size=100)

def batched(iterable, batch_size):
    """将数据流分批"""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # 最后一批
        yield batch

# 模拟数据流
def generate_data():
    import random
    for i in range(1000):
        yield {
            'id': i,
            'name': f'user_{i}',
            'active': random.choice([True, False])
        }

# 使用管道
for batch in data_pipeline(generate_data()):
    print(f"处理批次: {len(batch)} 条记录")

场景3:无限序列生成

def infinite_sequence():
    """生成无限序列"""
    num = 0
    while True:
        yield num
        num += 1

def prime_numbers():
    """生成无限质数序列"""
    yield 2
    primes = [2]
    candidate = 3
    while True:
        is_prime = True
        for prime in primes:
            if prime * prime > candidate:
                break
            if candidate % prime == 0:
                is_prime = False
                break
        if is_prime:
            primes.append(candidate)
            yield candidate
        candidate += 2

# 使用
primes = prime_numbers()
first_10_primes = [next(primes) for _ in range(10)]
print(f"前10个质数: {first_10_primes}")

6. 高级技巧:生成器状态管理

send() 方法通信:

def running_average():
    """使用 send() 接收新值并返回当前平均值"""
    total = 0
    count = 0
    while True:
        value = yield total / count if count > 0 else 0
        total += value
        count += 1

# 使用
avg_gen = running_average()
next(avg_gen)  # 启动生成器

print(avg_gen.send(10))  # 10.0
print(avg_gen.send(20))  # 15.0
print(avg_gen.send(30))  # 20.0

throw() 和 close() 方法:

def resilient_generator():
    """具有错误恢复能力的生成器"""
    try:
        for i in range(10):
            try:
                yield i
            except ValueError as e:
                print(f"处理值错误: {e}")
                continue
    except GeneratorExit:
        print("生成器被关闭,执行清理操作")

gen = resilient_generator()
print(next(gen))  # 0
print(gen.throw(ValueError("测试错误")))  # 处理错误后继续 → 1
gen.close()  # 触发 GeneratorExit

7. 生成器与协程

简单的协程模式:

def coroutine_pattern():
    """协程模式示例"""
    while True:
        received = yield
        print(f"接收到: {received}")

# 使用
coro = coroutine_pattern()
next(coro)  # 启动协程
coro.send("Hello")  # 接收到: Hello
coro.send("World")   # 接收到: World

管道处理链:

def producer(target):
    """数据生产者"""
    for i in range(5):
        print(f"生产: {i}")
        target.send(i)
    target.close()

@coroutine
def filter_evens(target):
    """过滤偶数"""
    try:
        while True:
            value = (yield)
            if value % 2 == 0:
                target.send(value)
    except GeneratorExit:
        target.close()

@coroutine
def consumer():
    """数据消费者"""
    try:
        while True:
            value = (yield)
            print(f"消费: {value}")
    except GeneratorExit:
        print("消费者结束")

def coroutine(func):
    """装饰器自动启动协程"""
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return start

# 构建处理链
cons = consumer()
filt = filter_evens(cons)
producer(filt)

8. 性能对比和内存分析

内存使用对比:

import sys

def memory_compare():
    # 列表方式
    def make_list(n):
        return [x**2 for x in range(n)]
    
    # 生成器方式
    def make_generator(n):
        return (x**2 for x in range(n))
    
    n = 1000000
    
    # 内存占用对比
    list_memory = sys.getsizeof(make_list(1000)) * (n // 1000)  # 估算
    gen_memory = sys.getsizeof(make_generator(n))
    
    print(f"列表内存占用: ~{list_memory / 1024 / 1024:.2f} MB")
    print(f"生成器内存占用: {gen_memory} 字节")

memory_compare()

9. 实际项目案例

案例1:日志文件实时监控

import time
import os

def log_monitor(log_file):
    """实时监控日志文件新增内容"""
    # 移动到文件末尾
    with open(log_file, 'r') as f:
        f.seek(0, os.SEEK_END)
    
    while True:
        with open(log_file, 'r') as f:
            # 获取当前文件大小
            current_size = os.path.getsize(log_file)
            f.seek(0, os.SEEK_END)
            
            while True:
                line = f.readline()
                if not line:
                    break
                yield line.strip()
        
        # 等待新内容
        time.sleep(0.1)

# 使用示例
def alert_system():
    """日志告警系统"""
    for line in log_monitor('application.log'):
        if 'ERROR' in line:
            print(f" 发现错误: {line}")
        elif 'WARNING' in line:
            print(f"⚠️  发现警告: {line}")

# 在后台线程运行
# import threading
# threading.Thread(target=alert_system, daemon=True).start()

案例2:分页API数据获取

import requests

def paginated_api_client(base_url, page_size=100):
    """分页获取API数据的生成器"""
    page = 1
    while True:
        response = requests.get(f"{base_url}?page={page}&size={page_size}")
        data = response.json()
        
        if not data.get('results'):
            break  # 没有更多数据
        
        for item in data['results']:
            yield item
        
        if not data.get('has_next'):
            break  # 最后一页
        
        page += 1

# 使用
def process_all_users():
    """处理所有用户数据"""
    for user in paginated_api_client('https://api.example.com/users'):
        print(f"处理用户: {user['name']}")
        # 可以在这里添加处理逻辑,内存占用恒定

案例3:数据流分析引擎

from typing import Iterator, List, Any
import statistics

class StreamAnalyzer:
    """数据流实时分析器"""
    
    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.buffer = []
    
    def process_stream(self, data_stream: Iterator[float]) -> Iterator[dict]:
        """处理数据流并返回实时统计信息"""
        for value in data_stream:
            self.buffer.append(value)
            
            # 保持窗口大小
            if len(self.buffer) > self.window_size:
                self.buffer.pop(0)
            
            if len(self.buffer) >= 10:  # 有足够数据时开始分析
                yield {
                    'timestamp': time.time(),
                    'current': value,
                    'average': statistics.mean(self.buffer),
                    'std_dev': statistics.stdev(self.buffer) if len(self.buffer) > 1 else 0,
                    'min': min(self.buffer),
                    'max': max(self.buffer),
                    'window_size': len(self.buffer)
                }

def generate_sensor_data():
    """模拟传感器数据流"""
    import random
    while True:
        yield random.gauss(100, 10)  # 正态分布数据
        time.sleep(0.01)  # 10ms 间隔

# 使用
analyzer = StreamAnalyzer(window_size=500)
for stats in analyzer.process_stream(generate_sensor_data()):
    print(f"实时统计: {stats}")
    if stats['std_dev'] > 15:  # 检测异常波动
        print("⚠️  检测到异常波动!")

今日练习

练习1:创建基础生成器

# 1. 创建生成斐波那契数列的生成器
# 2. 创建读取大文件并过滤包含关键字的行的生成器
# 3. 创建无限循环的计数器生成器

练习2:生成器管道

# 创建数据处理管道:
# 1. 数据生成 → 2. 过滤 → 3. 转换 → 4. 分批
# 每个步骤都使用生成器实现

练习3:实现协程模式

# 实现一个简单的消息广播系统:
# 多个消费者可以订阅生产者,生产者发送的消息会被所有消费者接收

练习答案:

# 练习1答案:
def fibonacci():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

def filter_file_lines(file_path, keyword):
    with open(file_path, 'r') as f:
        for line in f:
            if keyword in line:
                yield line.strip()

def infinite_counter(start=0, step=1):
    current = start
    while True:
        yield current
        current += step

# 练习2答案:
def data_generator(n):
    for i in range(n):
        yield {'id': i, 'value': i * 10}

def data_filter(data_stream):
    for item in data_stream:
        if item['value'] % 20 == 0:  # 过滤条件
            yield item

def data_transformer(data_stream):
    for item in data_stream:
        yield {'id': item['id'], 'transformed': item['value'] * 2}

def batcher(data_stream, batch_size):
    batch = []
    for item in data_stream:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

# 构建管道
pipeline = batcher(data_transformer(data_filter(data_generator(100))), 10)
for batch in pipeline:
    print(f"批次: {batch}")

# 练习3答案:
def broadcast_consumer(name):
    try:
        while True:
            message = yield
            print(f"{name} 收到: {message}")
    except GeneratorExit:
        print(f"{name} 退出")

def broadcast_producer(consumers):
    for i in range(5):
        message = f"消息 {i}"
        print(f"广播: {message}")
        for consumer in consumers:
            consumer.send(message)

# 使用
consumers = [broadcast_consumer(f"消费者{i}") for i in range(3)]
for consumer in consumers:
    next(consumer)  # 启动协程

broadcast_producer(consumers)
for consumer in consumers:
    consumer.close()

今日总结

  • 内存高效: 惰性计算,适合处理大数据流
  • 状态保持: 生成器在 yield 时暂停,恢复时继续
  • 管道处理: 可以构建复杂的数据处理管道
  • 无限序列: 可以表明无限或超级大的序列

生成器使用场景:

  • ✅ 处理大型数据集或文件
  • ✅ 表明无限序列
  • ✅ 构建数据处理管道
  • ✅ 实现协程和状态机
  • ❌ 需要随机访问或多次遍历的数据

最佳实践:

  1. 使用生成器表达式替代列表推导式处理大数据
  2. 使用 yield from 委托子生成器
  3. 合理使用 send(), throw(), close() 方法
  4. 注意生成器的一次性使用特性

15天 Python 技巧之旅总结

祝贺你完成了15天的 Python 学习之旅!让我们回顾一下学到的核心技巧:

15天学习路线回顾

天数

主题

核心价值

Day 1

优雅的合并字典

更简洁的字典操作

Day 2

条件表达式

一行代码的 if-else

Day 3

enumerate 获取索引

优雅的索引遍历

Day 4

zip 并行迭代

多序列同步处理

Day 5

推导式

简洁的数据结构创建

Day 6

any() 和 all()

高效的逻辑判断

Day 7

f-string 格式化

现代化的字符串格式化

Day 8

collections 模块

强劲的数据结构工具

Day 9

上下文管理器

安全的资源管理

Day 10

pathlib 路径处理

面向对象的路径操作

Day 11

装饰器

函数增强和元编程

Day 12

lambda 匿名函数

简洁的函数式编程

Day 13

@dataclass

自动生成数据类

Day 14

类型提示

代码可读性和可靠性

Day 15

生成器与 yield

内存高效的数据流处理

下一步学习提议

  1. 项目实践: 选择一个小项目应用这些技巧
  2. 深入框架: 学习 Django、FastAPI 等框架
  3. 算法数据结构: 夯实计算机科学基础
  4. 并发编程: 学习异步编程 asyncio
  5. 测试开发: 掌握 pytest、unittest
  6. 开源贡献: 参与开源项目提升实战能力

持续学习资源

  • 官方文档: docs.python.org
  • Awesome Python: GitHub 上的优秀资源集合
  • Real Python: 高质量的教程网站
  • Python Weekly: 每周技术资讯

记住: 编程技能的提升在于持续实践和不断学习。每天花30分钟编写代码,比一次性学习几个小时更有效!

祝你 Python 编程之旅愉快,期待看到你创造出优秀的作品!

© 版权声明

相关文章

1 条评论

  • 头像
    请叫我西瓜瓜君 读者

    收藏了,感谢分享

    无记录
    回复