Python Async OpenAI 流式编程入门

内容分享19小时前发布
0 1 0
全能 AI 聚合平台 免费

一站式接入主流 AI 大模型,支持对话 · 生图 · 生视频,即开即用

ChatGPT Claude Gemini Grok DeepSeek 通义千问 Ollama
AI对话 AI生图 AI视频
免费使用 →

Python Async OpenAI 流式编程入门

Python Async OpenAI 流式编程

在大语言模型(LLM)应用爆发的今天,用户体验就是一切。想象一下,用户问了一个复杂问题,你的应用却卡住了 10 秒,最后才一次性“蹦”出大段文字。这种体验是很糟糕的。

用户希望感受到“即时反馈”,希望看到 AI 像打字机一样,一个字一个字地把想法呈现出来。这就是 Streaming(流式传输) 的魔力。

而为了让这种体验更加极致,我们需要 Python 的并发利器 —— asyncio。它能确保你的程序在等待 AI 生成下一个字的时候,还能腾出手来响应其他用户的操作,而不是傻傻地阻塞在那里。

今天这篇指南,我们将结合现代化的 Python 工具链,手把手带你掌握 Asyncio + OpenAI Streaming 的核心开发技巧。打造一个丝般顺滑的 AI 对话应用。


1. 核心概念速览

在动代码之前,我们先扫清两个关键概念:

1.1 Python 的 Asyncio (异步 I/O)

传统的 Python 代码是同步的(Blocking),就像排队在窗口办事,一个人没办完,后面的人都得等着。

asyncio 是 Python 处理并发的标准库。它允许你的程序在遇到耗时操作(列如等待网络请求返回)时,暂时“挂起”当前任务,去执行其他任务,等数据回来了再接着处理。

在 OpenAI 开发中,我们只需记住三个关键词:

  • async def: 定义一个异步函数(协程)。
  • await: 告知程序:“这里可能会卡住,你先去忙别的,好了叫我”。
  • async for: 专门用于遍历那种“陆陆续续到达”的数据流。

1.2 OpenAI Streaming 原理

普通 API 请求是“全量返回”:你点菜,厨师做好全部菜品,一次性端上来。 Streaming 模式是基于 SSE (Server-Sent Events):你点菜,厨师做好一道菜就立刻端上来一道,源源不断,直到上完。

结合点:我们需要使用 OpenAI 的异步客户端发起一个流式请求,然后用一个异步循环 (async for) 去“接住”服务器抛过来的一个个数据包(Chunk)。


️ 2. 现代化环境初始化:使用 uv

Python Async OpenAI 流式编程入门

基于uv的python虚拟开发环境

告别繁琐的 venv 和慢吞吞的 pip,我们将使用 Python 界的新宠 —— uv 来极速管理我们的项目环境。

2.1 安装 uv (如果还没安装)

# macOS / Linux
curl -LsSf https://astral.sh/uv/install.sh | sh
# Windows (PowerShell)
powershell -c "irm https://astral.sh/uv/install.ps1 | iex"

2.2 初始化项目

在终端执行以下命令,创建一个新项目并进入目录:

uv init async-streaming-demo
cd async-streaming-demo

2.3 添加依赖

我们需要 openai 官方库,以及用于方便加载环境变量的 python-dotenv。

# uv 会自动创建虚拟环境并飞速安装依赖
uv add openai python-dotenv

注:asyncio 和 os 是 Python 标准库,无需安装。


⚙️ 3. 灵活的配置:环境变量与客户端初始化

Python Async OpenAI 流式编程入门

OpenAI兼容的模型服务

在实际开发中,为了安全和灵活性(列如切换到 Azure OpenAI、DeepSeek 或本地的 Ollama 等兼容服务),我们绝不能把 API Key 和地址硬编码在代码里。

我们将遵循最佳实践,通过环境变量来配置客户端。我们需要以下三个变量:

  • OPENAPI_ENDPOINT: API 的基础地址(Base URL)。
  • OPENAPI_API_KEY: 你的 API 密钥。
  • OPENAPI_MODEL: 你想调用的模型名称。

3.1 创建配置文件 .env

在项目根目录下创建一个名为 .env 的文件,填入你的实际配置。

示例 1:使用官方 OpenAI(绝大多数国内模型服务都OpenAI兼容)

OPENAPI_ENDPOINT="https://api.openai.com/v1"
OPENAPI_API_KEY="sk-你的OpenAI密钥"
OPENAPI_MODEL="gpt-3.5-turbo"

示例 2:使用本地 Ollama (举例)

OPENAPI_ENDPOINT="http://localhost:11434/v1"
OPENAPI_API_KEY="ollama" # 本地服务一般随意填一个非空值即可
OPENAPI_MODEL="llama3"

3.2 代码中的初始化

在代码中,我们会使用 dotenv 加载这些变量,并初始化 AsyncOpenAI 客户端。注意要使用 AsyncOpenAI 而不是同步的 OpenAI。

Python

# (代码片段,完整代码在最后)
import os
from dotenv import load_dotenv
from openai import AsyncOpenAI

# 加载 .env 文件
load_dotenv()

# 初始化异步客户端,传入自定义的 endpoint 和 key
client = AsyncOpenAI(
    api_key=os.getenv("OPENAPI_API_KEY"),
    base_url=os.getenv("OPENAPI_ENDPOINT")
)
# 获取模型名称
target_model = os.getenv("OPENAPI_MODEL")

4. 解构 Streaming Message

当你设置 stream=True 后,API 返回的不再是一个巨大的 JSON,而是一系列微小的 Chunk(数据块)

Python Async OpenAI 流式编程入门

流式 vs 非流式

我们需要重点关注结构上的差异:

  • 非流式:完整的回复内容在 message.content 里。
  • 流式:每次返回的增量内容在 delta.content 里。

一个典型的 Chunk 结构示例:

{
  "id": "chatcmpl-xyz",
  "object": "chat.completion.chunk",
  "created": 123456789,
  "model": "gpt-3.5-turbo",
  "choices": [
    {
      "index": 0,
      "delta": {
        "content": "好"  // <-- 我们要的就是这个!
      },
      "finish_reason": null
    }
  ]
}

处理关键点

  1. 使用 async for chunk in stream: 遍历数据流。
  2. 提取 chunk.choices[0].delta.content。
  3. 务必判断 content 是否为空。由于流的开头(角色信息)和结尾(结束标志)的数据包里,content 往往是 None。
  4. 打印时使用 flush=True,确保内容立即显示,不要被终端缓存,否则就没“打字机”效果了。

5. 完整实战 Demo

好了,万事俱备。将下面的代码复制到你的 hello.py (或 main.py) 文件中。确保你已经配置好了 .env 文件。

然后在终端运行:uv run hello.py

import asyncio
import os
import sys
from dotenv import load_dotenv
from openai import AsyncOpenAI

# --- 1. 配置加载 ---
# 加载项目根目录下的 .env 文件中的环境变量
load_dotenv()

# 读取必要的配置项
api_endpoint = os.getenv("OPENAPI_ENDPOINT")
api_key = os.getenv("OPENAPI_API_KEY")
target_model = os.getenv("OPENAPI_MODEL")

# 简单的健壮性检查:确保环境变量已设置
if not all([api_endpoint, api_key, target_model]):
    print("❌ 错误: 缺少必要的环境变量。")
    print("请确保在 .env 文件中设置了 OPENAPI_ENDPOINT, OPENAPI_API_KEY, 和 OPENAPI_MODEL。")
    sys.exit(1)

print(f" 配置已加载:")
print(f"   - Endpoint: {api_endpoint}")
print(f"   - Model: {target_model}")
print("-" * 40)

# --- 2. 初始化异步客户端 ---
# 关键点:
# 1. 使用 AsyncOpenAI 而不是同步的 OpenAI
# 2. 显式传入 base_url,使其能连接到自定义的服务端点 (官方或第三方)
client = AsyncOpenAI(
    api_key=api_key,
    base_url=api_endpoint
)

async def stream_chat(prompt: str):
    """
    异步发送 prompt 并以流式打印回复的核心协程。
    """
    print(f" User: {prompt}
")
    print(f" AI ({target_model}): ", end="", flush=True) # 准备开始输出,提示当前模型

    try:
        # --- 3. 发起流式请求 ---
        # 使用 await 等待连接建立
        stream = await client.chat.completions.create(
            model=target_model,  # 使用环境变量中指定的模型
            messages=[
                {"role": "system", "content": "你是一个思维灵敏、回答简洁的AI助手。"},
                {"role": "user", "content": prompt},
            ],
            stream=True, # <--- 【核心】开启流式模式
        )

        # --- 4. 异步迭代处理响应流 ---
        # stream 是一个异步迭代器,必须用 async for 来遍历
        # 程序会在等待下一个 chunk 时释放控制权,不会阻塞
        async for chunk in stream:
            # 提取增量内容。注意:流式模式下内容在 'delta' 中,而不是 'message' 中
            content = chunk.choices[0].delta.content
            
            # 必须检查 content 是否存在,由于第一个和最后一个 chunk 的 content 可能是 None
            if content:
                # 实时打印出来
                # end="" 防止print自动换行
                # flush=True 强制刷新缓冲区,确保立刻看到字符跳出,实现打字机效果
                print(content, end="", flush=True)
        
        print("

✅ 回复结束")

    except Exception as e:
        # 捕获网络错误、认证错误等
        print(f"

❌ 发生错误: {e}")
        print("请检查你的网络连接、API Key 以及 Endpoint 设置是否正确。")

async def main():
    """
    主入口协程
    """
    print(" 程序启动 (Async Mode)...")
    
    # 这里可以放入你想测试的问题
    test_prompt = "请用 Python 写一个斐波那契数列生成器,并简单解释原理。"
    
    # 等待流式对话任务完成
    await stream_chat(test_prompt)
    
    print(" 程序退出")

if __name__ == "__main__":
    # --- 5. 启动异步事件循环 ---
    # asyncio.run 是运行最高层级入口点的标准方法
    asyncio.run(main())

总结

运行上面的代码,你将看到 AI 的回复像变魔术一样逐字显现。

通过这篇教程,你已经掌握了构建现代高性能 LLM 应用的基石:

  1. 使用 uv 管理环境。
  2. 使用 .env 灵活配置服务地址和模型。
  3. 使用 AsyncOpenAIasync for 实现非阻塞的流式输出。

6. 核心参考文档 (Reference Links)

在深入代码之前,提议将这些官方文档加入书签,遇到问题时随时查阅:

  • OpenAI 官方 API 文档 (Streaming 部分)
    • OpenAI API Reference – Chat Completion
    • 说明:官方对 stream 参数的解释以及返回数据的结构定义。
  • OpenAI Python SDK (GitHub)
    • openai-python Library
    • 说明:SDK 的源码和官方示例。特别是 Async usage 部分的说明。
  • Python Asyncio 官方文档
    • Python asyncio — Asynchronous I/O
    • 说明:Python 标准库文档。重点看 Coroutines and Tasks (协程与任务) 和 Asynchronous Iterators (异步迭代器) 章节。
© 版权声明

相关文章

1 条评论

  • 头像
    明日之月 读者

    [db:评论]

    无记录
    回复