
2025即将结束,2026年我们要清楚一点:效率革命始于工具,终于习惯。用塑造下一代自动化的工具,武装你的代码栈。
我还记得那个深夜,我手动重命名了231个文件,只由于觉得写自动化脚本“太小题大做”。
那时凌晨2点,我学习Python已有4年,心高气傲,逻辑却暂时离线。我告知自己:“我能写任何代码,但这种小事?我自己来就行。” 快进90分钟,手腕发麻,信心动摇——我终于写下了人生第一个真正的自动化脚本。一个循环,两个库,然后……嘭!我感觉自己像个刚发现火的原始人。
那个时刻至今影响着我选择库的标准:不追求最酷的技术,而是先看它解决了什么问题。 由于未来不会奖励最机智的程序员,而会奖励那些能节省时间且不牺牲智慧的程序员。
所以,如果你正在阅读本文——在这里,Python自动化不是炫技,而是像氧气一样的存在。
核心提议:“选择能放大你能力的工具,而不是让你分心的工具。” 每位周末程序员都该把这句话裱起来。
自动化新纪元:从“能做”到“优雅地做”
是的,马上到2026年了,编码助手将处理40%到70%的样板代码。真正的护城河变成了:你的库栈和像忍者一样自动化的能力。
今天,云朵君和大家来聊聊将主导自动化工作流的12个库。它们不是最“热门”的,但却是最能实实在在提升你效率的。
12个未来必备的自动化神器
1. Kedro:为机器学习管道注入秩序
ML项目的GPS导航系统,强制让你的工作流保持清醒。
当你的ML项目从“几个脚本”演变成“一堆相互依赖的实验”时,混乱随之而来。Kedro通过强制执行项目结构、数据目录和模块化管道,将秩序带回你的自动化工作流。
为什么重大?
- 可复现性:确保三个月后你(或同事)还能理解并运行整个流程
- 模块化:像搭积木一样组合数据处理、训练和评估步骤
- 生产就绪:从实验到生产部署的路径清晰
# 安装:pip install kedro
from kedro.pipeline import Pipeline, node
def clean_data(df):
"""清理数据:删除空值"""
return df.dropna()
def process_features(df):
"""特征工程:添加新特征"""
df["feature_ratio"] = df["feature_a"] / (df["feature_b"] + 1e-10)
return df
# 定义管道节点
pipeline = Pipeline([
node(clean_data, "raw_data", "cleaned_data", name="数据清理"),
node(process_features, "cleaned_data", "processed_data", name="特征工程")
])
# 在Kedro项目中,这个管道会自动被组织、可视化和管理
专家提示:如果自动化需要一个大脑,它会选择Kedro。特别适合团队协作的中大型ML项目。
2. Prefect:下一代工作流编排,让Cron退休
Cron是恐龙,Prefect是那颗陨石。
还在用crontab调度Python脚本?是时候升级了。Prefect不仅调度任务,还提供重试机制、状态监控、参数化工作流等企业级功能,而且代码极其优雅。
# 安装:pip install prefect
from prefect import flow, task
from datetime import timedelta
import pandas as pd
@task(retries=3, retry_delay_seconds=10)
def extract_data(api_url: str):
"""从API提取数据(自动重试3次)"""
# 模拟API调用
print(f"从 {api_url} 提取数据...")
return pd.DataFrame({"data": [1, 2, 3]})
@task
def transform_data(df: pd.DataFrame):
"""转换数据:加倍所有值"""
return df * 2
@flow(name="每日数据处理流水线")
def daily_data_pipeline(api_endpoint: str = "https://api.example.com/data"):
"""主工作流:提取 -> 转换 -> 保存"""
raw_data = extract_data(api_endpoint)
processed_data = transform_data(raw_data)
# 保存结果(真实场景中可能是存数据库)
processed_data.to_csv("processed_data.csv", index=False)
print("数据处理完成!")
return processed_data
# 运行这个工作流
if __name__ == "__main__":
# 本地运行
result = daily_data_pipeline()
# 部署到Prefect Cloud后,可以设置:
# - 定时调度(每天凌晨2点)
# - 失败通知(Slack/邮件)
# - 依赖触发(上游任务完成后自动运行)
对比优势:
- vs Cron:错误处理、日志记录、可视化监控
- vs Airflow:更简单的API、动态工作流、更好的开发体验
3. Pywinauto:桌面GUI自动化之王
当你的自动化需要“点击按钮”而不是“调用API”时。
尽管PyAutoGUI很流行,但在复杂的Windows桌面应用自动化中,Pywinauto更加强劲和稳定。它理解Windows控件的内部结构,而不是简单的屏幕坐标。
# 安装:pip install pywinauto
from pywinauto import Application
import time
# 启动记事本
app = Application().start("notepad.exe")
# 连接到一个已存在的应用程序
# app = Application().connect(title="无标题 - 记事本")
# 获取主窗口
main_window = app["无标题 - 记事本"]
# 输入文本
main_window.type_keys("你好,自动化世界!{ENTER}")
main_window.type_keys("这是通过Pywinauto自动输入的文字。")
# 操作菜单
main_window.menu_select("文件(F)->另存为(A)...")
# 获取保存对话框
save_dialog = app["另存为"]
# 在文件名输入框中输入
save_dialog["文件名:Edit"].set_text("automated_file.txt")
time.sleep(1) # 等待一下
# 点击保存按钮
save_dialog["保存(S)"].click()
# 等待保存完成,然后关闭
time.sleep(1)
main_window.close()
# 如果出现"是否保存"的提示,选择不保存
try:
app["记事本"]["不保存(N)"].click()
except:
pass
适用场景:
- 自动化老旧的桌面软件操作
- 测试Windows应用程序
- 批量处理只能通过GUI操作的任务
4. Swifter:Pandas的涡轮增压器
自动为你的Pandas操作选择最佳并行策略。
大数据处理时,普通的.apply()太慢?Swifter自动检测你的操作,智能选择使用Dask、Ray还是Pandas原生的向量化方法。
# 安装:pip install swifter
import pandas as pd
import swifter
import numpy as np
# 创建一个大型DataFrame
df = pd.DataFrame({
"id": range(1_000_000),
"values": np.random.randn(1_000_000) * 100
})
# 传统方式 - 可能很慢
def complex_calculation(x):
"""复杂的计算函数"""
return np.sin(x) * np.log(abs(x) + 1) + np.sqrt(abs(x))
# 使用Swifter - 自动并行化
print("开始Swifter加速计算...")
df["result"] = df["values"].swifter.apply(complex_calculation)
print(f"计算完成!前5个结果:{df['result'].head().tolist()}")
print(f"DataFrame形状:{df.shape}")
# 比较性能
import time
# 普通apply
start = time.time()
df["traditional"] = df["values"].apply(complex_calculation)
traditional_time = time.time() - start
# Swifter apply(一般更快,特别是大数据集)
start = time.time()
df["swifter"] = df["values"].swifter.apply(complex_calculation)
swifter_time = time.time() - start
print(f"
⏱️ 性能对比:")
print(f"传统.apply()耗时:{traditional_time:.2f}秒")
print(f"Swifter.apply()耗时:{swifter_time:.2f}秒")
print(f"加速比:{traditional_time/swifter_time:.1f}倍")
工作原理:Swifter会先尝试向量化操作,如果不支持,再尝试使用并行计算(Dask),最后才回退到普通apply。
5. DagFactory:Airflow DAG的自动化生成器
自动化你的自动化配置。
还在手动编写Airflow DAG文件?DagFactory让你可以用YAML配置文件生成DAG,特别适合模式类似的多个数据管道。
# 安装:pip install dag-factory
from airflow import DAG
from dagfactory import DagFactory
# 创建配置文件(一般放在单独的YAML文件中)
dag_config = {
"example_dag": {
"default_args": {
"owner": "data_team",
"start_date": "2024-01-01",
"retries": 1,
"retry_delay_sec": 300
},
"schedule_interval": "0 2 * * *", # 每天凌晨2点
"tasks": {
"extract_data": {
"operator": "airflow.operators.python_operator.PythonOperator",
"python_callable": "data_pipeline.extract",
"op_args": ["{{ ds }}"]
},
"transform_data": {
"operator": "airflow.operators.python_operator.PythonOperator",
"python_callable": "data_pipeline.transform",
"op_args": ["{{ ti.xcom_pull(task_ids='extract_data') }}"],
"dependencies": ["extract_data"]
},
"load_data": {
"operator": "airflow.operators.python_operator.PythonOperator",
"python_callable": "data_pipeline.load",
"op_args": ["{{ ti.xcom_pull(task_ids='transform_data') }}"],
"dependencies": ["transform_data"]
}
}
}
}
# 保存配置到文件(实际使用中)
import yaml
with open("dag_config.yml", "w") as f:
yaml.dump(dag_config, f)
# 在Airflow的DAG文件夹中
# from dagfactory import DagFactory
# factory = DagFactory("dag_config.yml")
# factory.generate_dags(globals())
优点:
- 一致性:确保所有DAG遵循一样模式
- 维护简单:修改配置即可更新所有相关DAG
- 新人友善:不需要深入理解Airflow内部也能创建DAG
6. Schedule:人类友善的调度库
比crontab更人性化,比Celery更轻量。
虽然资料中提到了一个不存在的schedulezen,但Python中早已有schedule库,提供了极其人性化的调度API。
# 安装:pip install schedule
import schedule
import time
from datetime import datetime
def job_that_runs_every_minute():
"""每分钟运行的任务"""
print(f"[{datetime.now().strftime('%H:%M:%S')}] 心跳检测 - 系统正常")
def daily_report():
"""每天上午9点运行的日报"""
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] 生成每日报告...")
# 这里添加实际报告生成逻辑
def hourly_data_sync():
"""每小时的第30分钟运行数据同步"""
print(f"[{datetime.now()}] 开始数据同步...")
# 这里添加数据同步逻辑
# 设置调度规则
schedule.every(1).minutes.do(job_that_runs_every_minute)
schedule.every().day.at("09:00").do(daily_report)
schedule.every().hour.at(":30").do(hourly_data_sync)
# 还可以设置更复杂的规则
schedule.every().monday.at("08:00").do(lambda: print("周一早上,启动周计划!"))
schedule.every().wednesday.at("14:30").do(lambda: print("周三下午,中期检查"))
print(" 调度器已启动,按Ctrl+C停止")
print("当前调度任务:")
for job in schedule.get_jobs():
print(f" - {job}")
# 运行调度器
try:
whileTrue:
schedule.run_pending()
time.sleep(1) # 每秒检查一次
except KeyboardInterrupt:
print("
调度器已停止")
适用场景:轻量级的后台任务调度,不需要分布式或复杂监控的场景。
7. Tenacity:优雅的重试机制
让失败和重试变得优雅。
网络请求、数据库连接、API调用——在分布式系统中,失败不是异常,而是常态。Tenacity提供了强劲而灵活的重试机制。
# 安装:pip install tenacity
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests
import random
# 场景1:基本的重试装饰器
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=10))
def fetch_data_from_api(url):
"""从API获取数据,失败时重试最多5次"""
print(f"尝试请求: {url}")
# 模拟随机失败(真实场景中是网络问题)
if random.random() < 0.7: # 70%的失败率
raise ConnectionError("模拟网络错误")
response = requests.get(url, timeout=5)
response.raise_for_status() # 如果HTTP错误会抛出异常
return response.json()
# 场景2:更复杂的重试逻辑
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((ConnectionError, TimeoutError)),
before_sleep=lambda retry_state: print(f"第{retry_state.attempt_number}次尝试失败,{retry_state.outcome.exception()}"),
after=lambda retry_state: print(f"最终{'成功' if retry_state.outcome else '失败'}") if retry_state.attempt_number > 1elseNone
)
def critical_database_operation():
"""关键数据库操作,只对特定异常重试"""
print("执行关键操作...")
# 模拟不同的异常
errors = [ConnectionError, TimeoutError, ValueError]
error = random.choice(errors)
if error != ValueError: # 只重试ConnectionError和TimeoutError
raise error("模拟错误")
else:
raise error("值错误(不会重试)")
# 测试
try:
# 这个会重试直到成功或达到最大尝试次数
# data = fetch_data_from_api("https://api.example.com/data")
# print(f"获取的数据: {data}")
# 这个只会对特定异常重试
critical_database_operation()
except Exception as e:
print(f"最终捕获的异常: {type(e).__name__}: {e}")
核心特性:
- 多种停止条件:尝试次数、总时间等
- 多种等待策略:指数退避、固定间隔、随机间隔
- 条件重试:只对特定异常类型重试
- 钩子函数:重试前后执行自定义逻辑
8. Beanie:异步MongoDB的现代接口
用Pydantic的优雅来操作MongoDB。
如果你在使用MongoDB和异步Python(FastAPI等),Beanie提供了基于Pydantic模型的ODM,代码既安全又优雅。
# 安装:pip install beanie
from beanie import Document, init_beanie
from pydantic import Field
from datetime import datetime
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
# 定义文档模型
class User(Document):
name: str
email: str = Field(unique=True)
age: int = Field(ge=0, le=150)
is_active: bool = True
created_at: datetime = Field(default_factory=datetime.utcnow)
class Settings:
name = "users"# MongoDB集合名
def greet(self):
returnf"Hello, {self.name}!"
class Product(Document):
name: str
price: float = Field(ge=0)
in_stock: bool = True
tags: list[str] = []
# 异步数据库操作
asyncdef main():
# 连接MongoDB
client = AsyncIOMotorClient("mongodb://localhost:27017")
# 初始化Beanie
await init_beanie(
database=client.my_database,
document_models=[User, Product]
)
# 创建用户
user = User(name="张三", email="zhangsan@example.com", age=28)
await user.insert()
print(f"创建用户: {user.greet()}")
# 批量插入
users = [
User(name="李四", email="lisi@example.com", age=32),
User(name="王五", email="wangwu@example.com", age=25)
]
await User.insert_many(users)
# 查询
# 查找所有活跃用户
active_users = await User.find(User.is_active == True).to_list()
print(f"活跃用户数: {len(active_users)}")
# 带条件的查询
young_users = await User.find(User.age < 30).sort(User.created_at).to_list()
print(f"30岁以下的用户: {[u.name for u in young_users]}")
# 更新
user.age = 29
await user.save() # 更新单个字段
# 使用更新操作符
await User.find(User.age < 18).update({"$set": {"is_active": False}})
# 事务支持(需要MongoDB副本集)
asyncwithawait client.start_session() as session:
asyncwith session.start_transaction():
user1 = await User.find_one(User.name == "张三", session=session)
user2 = await User.find_one(User.name == "李四", session=session)
# 执行事务操作
user1.age += 1
user2.age += 1
await user1.save(session=session)
await user2.save(session=session)
# 运行
if __name__ == "__main__":
asyncio.run(main())
为什么选择Beanie:
- 类型安全:基于Pydantic,有完整的类型提示
- 异步优先:为async/await设计
- 直观的API:查询语法类似Pydantic验证
- 迁移友善:模型变更易于管理
9. Helium:最人性化的浏览器自动化
Selenium的强劲,但API像英语句子一样可读。
Playwright和Selenium都很强劲,但Helium的API简洁到令人发指——特别适合快速原型或简单的Web自动化。
# 安装:pip install helium
from helium import *
import time
# 启动浏览器并访问Google
start_chrome() # 或 start_firefox()
go_to("https://www.google.com")
# 搜索Python自动化
write("Python自动化2026", into="Google 搜索")
press(ENTER)
# 等待结果加载
wait_until(Text("Python").exists)
# 点击第一个结果
click(Text("Python"))
# 更多操作示例
"""
# 填写表单
write("username", into="用户名")
write("password123", into="密码")
click("登录")
# 处理下拉菜单
click("选择国家")
click("中国")
# 上传文件
click("选择文件")
write(r"C:UsersMefile.txt") # 文件路径
press(ENTER)
# 拖放
drag_file(r"C:UsersMefile.txt", to="上传区域")
# 截图
highlight("重大区域")
take_screenshot("page_screenshot.png")
"""
# 滚动页面
scroll_down(500) # 向下滚动500像素
scroll_up(200) # 向上滚动200像素
# 获取元素文本
first_link = find_all(S("#search a"))[0]
print(f"第一个链接文本: {first_link.web_element.text}")
# 关闭浏览器
kill_browser()
适用场景:
- 快速原型:验证Web流程
- 简单爬虫:不需要复杂反爬的情况
- 自动化测试:特别是验收测试
- 数据提取:从无法提供API的网站获取数据
优势对比:
- vs Selenium:代码更简洁,更易读
- vs Playwright:学习曲线更低
- 特别适合:非专业自动化工程师的日常任务
10. PyFilesystem2:统一所有文件系统的API
用同一套代码操作本地文件、ZIP、S3、FTP等。
在云原生时代,你的数据可能分布在本地、S3、Google Cloud Storage等多个地方。PyFilesystem2提供了一个统一的抽象层。
# 安装:pip install fs
from fs import open_fs
import fs.memoryfs
import fs.osfs
import json
# 1. 操作内存文件系统(测试用)
mem_fs = fs.memoryfs.MemoryFS()
mem_fs.writetext("hello.txt", "Hello, Memory FS!")
print(f"内存文件内容: {mem_fs.readtext('hello.txt')}")
# 2. 操作本地文件系统
with open_fs(".") as local_fs: # 当前目录
# 列出文件
print(f"当前目录文件: {list(local_fs.listdir('./'))}")
# 创建目录
local_fs.makedirs("test_folder", recreate=True)
# 写入文件
local_fs.writetext("test_folder/test.json", json.dumps({"name": "test", "value": 123}))
# 读取JSON文件
data = json.loads(local_fs.readtext("test_folder/test.json"))
print(f"JSON数据: {data}")
# 3. 操作ZIP文件(像普通文件夹一样)
from fs.zipfs import ZipFS
# 创建ZIP文件
with ZipFS("archive.zip", write=True) as zip_fs:
zip_fs.writetext("doc1.txt", "这是一个文档")
zip_fs.makedirs("data")
zip_fs.writetext("data/numbers.txt", "1
2
3
4
5")
# 读取ZIP文件
with ZipFS("archive.zip") as zip_fs:
print(f"ZIP内容: {list(zip_fs.walk.files())}")
print(f"文档内容: {zip_fs.readtext('doc1.txt')}")
# 4. 操作S3(需要boto3)
"""
from fs_s3fs import S3FS
# 连接到S3
s3_fs = S3FS(
"my-bucket",
aws_access_key_id="YOUR_KEY",
aws_secret_access_key="YOUR_SECRET"
)
# 像操作本地文件一样操作S3
s3_fs.upload("local_file.txt", "remote_file.txt")
s3_files = list(s3_fs.listdir("/"))
print(f"S3中的文件: {s3_files}")
"""
# 5. 文件系统操作(跨所有FS类型都一样)
def process_files(filesystem, folder_path):
"""处理文件系统中的一个文件夹"""
for path in filesystem.listdir(folder_path):
full_path = f"{folder_path}/{path}"
if filesystem.isdir(full_path):
print(f"目录: {full_path}")
process_files(filesystem, full_path)
else:
print(f"文件: {full_path}")
# 这里可以对文件进行处理
# 这个函数可以用于本地、ZIP、S3等任何文件系统!
核心价值:
- 代码复用:同一套代码处理不同存储后端
- 简化测试:用内存文件系统测试文件操作逻辑
- 灵活迁移:从本地存储迁移到云存储只需改一行代码
11. Ruff:极速的Python代码检查与格式化
用Rust重写的Flake8+isort+black,速度快到飞起。
代码质量是自动化的基础。Ruff集成了linting和formatting,速度是传统工具的10-100倍。
# 安装:pip install ruff
# 或者用pipx: pipx install ruff
"""
Ruff主要用法(命令行):
"""
# 1. 检查代码(类似flake8)
# ruff check .
# 2. 自动修复可修复的问题
# ruff check --fix .
# 3. 格式化代码(类似black)
# ruff format .
# 4. 按类型排序imports(类似isort)
# ruff check --select I --fix .
# 5. 检查单个文件
# ruff check path/to/file.py
"""
配置(在pyproject.toml或ruff.toml中):
"""
# 示例ruff.toml配置
"""
[tool.ruff]
line-length = 88 # 与black兼容
target-version = "py310" # Python目标版本
# 启用/禁用规则
select = [
"E", # pycodestyle错误
"W", # pycodestyle警告
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
]
ignore = [
"E501", # 忽略行长度限制(有时需要)
"F841", # 忽略未使用的变量(在开发中常见)
]
[tool.ruff.per-file-ignores]
"__init__.py" = ["F401"] # 在__init__.py中允许未使用的导入
[tool.ruff.isort]
known-first-party = ["myapp"]
"""
"""
集成到pre-commit(.pre-commit-config.yaml):
"""
"""
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.0 # 使用最新版本
hooks:
- id: ruff
args: [--fix]
- id: ruff-format
"""
"""
性能对比示例:
"""
# 传统工具链:
# flake8 . # 5-10秒(大型代码库)
# black . # 3-8秒
# isort . # 2-5秒
# 总计:10-23秒
# Ruff:
# ruff check . # 0.1-0.5秒
# ruff format . # 0.1-0.3秒
# 总计:0.2-0.8秒(10-100倍提速)
"""
为什么选择Ruff:
1. 极速:Rust编写,并行处理
2. 一体化:一个工具替代多个工具
3. 兼容性:与现有工具链(flake8, black, isort)兼容
4. 可配置:精细的规则控制
"""
# 在Python代码中使用Ruff API(高级用法)
"""
import ruff
from pathlib import Path
# 检查代码
result = ruff.check_code("def foo(): pass", filename="test.py")
print(f"发现问题: {result}")
# 格式化代码
formatted = ruff.format_code("def foo( ):
pass")
print(f"格式化后: {formatted}")
"""
实际收益:在CI/CD流水线中,代码检查从分钟级降到秒级,大大加快反馈循环。
12. Zappa:无服务器部署的终极简化
一行命令部署Python Web应用到AWS Lambda。
如果你有Flask、Django或FastAPI应用,Zappa让你无需学习Docker、Kubernetes或AWS复杂配置,就能部署到无服务器环境。
# 安装:pip install zappa
# 注意:Zappa主要用于部署,而不是开发
"""
使用步骤:
"""
# 1. 第一创建一个简单的Flask应用
# app.py
"""
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello():
return 'Hello from Zappa!'
@app.route('/api/data')
def get_data():
return {'status': 'success', 'data': [1, 2, 3]}
if __name__ == '__main__':
app.run()
"""
# 2. 初始化Zappa配置
# zappa init
# 这会创建zappa_settings.json文件
# 3. 配置示例(zappa_settings.json)
"""
{
"dev": {
"app_function": "app.app", # Flask应用对象路径
"aws_region": "us-east-1",
"project_name": "my-flask-app",
"runtime": "python3.10",
"s3_bucket": "zappa-deploy-bucket-unique-name",
"timeout_seconds": 30,
"memory_size": 512,
"keep_warm": false,
"environment_variables": {
"MY_ENV_VAR": "value"
}
}
}
"""
# 4. 部署到 AWS Lambda
# zappa deploy dev
# 5. 更新部署(代码变更后)
# zappa update dev
# 6. 查看日志
# zappa tail dev
# 7. 计划任务(Cron表达式)
"""
{
"dev": {
...其他配置...
"events": [{
"function": "your_module.your_function", # 要运行的函数
"expression": "rate(1 hour)" # 每小时运行一次
}, {
"function": "your_module.another_function",
"expression": "cron(0 12 * * ? *)" # 每天中午12点
}]
}
}
"""
# 8. 对于Django项目也类似
# zappa_settings.json for Django
"""
{
"dev": {
"django_settings": "myproject.settings",
"aws_region": "us-east-1",
"profile_name": "default",
"project_name": "myproject",
"runtime": "python3.10",
"s3_bucket": "zappa-myproject"
}
}
"""
# 9. 数据库连接注意事项
# 无服务器环境需要连接池管理
"""
import psycopg2
from psycopg2 import pool
# 创建连接池
connection_pool = pool.SimpleConnectionPool(
1, 20, # 最小1个,最大20个连接
host='your-db-host',
database='your-db',
user='your-user',
password='your-password'
)
@app.route('/db-test')
def db_test():
conn = connection_pool.getconn()
try:
with conn.cursor() as cur:
cur.execute('SELECT NOW()')
result = cur.fetchone()
return {'time': result[0]}
finally:
connection_pool.putconn(conn)
"""
成本优势:无服务器按使用量计费,对于中小型应用,每月成本可能只有几美元。
自动化专家的思维模式
从我4年多的Python自动化经验中,我学到的最重大一课:
初学者和专家级自动化的区别在于:可读性、错误处理能力和管道思维。
你不需要掌握20个库,只需要正确的10个,并巧妙地将它们组合起来解决实际问题。
自动化组合拳示例
"""
一个真实的数据管道示例,结合多个库:
1. Prefect - 工作流编排
2. Tenacity - 重试机制
3. PyFilesystem2 - 多存储支持
4. Swifter - 数据处理加速
"""
import pandas as pd
from prefect import flow, task
from tenacity import retry, stop_after_attempt
from fs import open_fs
import swifter
@retry(stop=stop_after_attempt(3))
@task
def extract_data_from_source(source_config):
"""从数据源提取数据(带重试)"""
fs = open_fs(source_config["type"])
with fs.open(source_config["path"], "r") as f:
data = pd.read_csv(f)
return data
@task
def transform_data(df):
"""转换数据(自动并行化)"""
# 使用Swifter加速复杂操作
df["processed_value"] = df["raw_value"].swifter.apply(
lambda x: complex_transformation(x)
)
return df
@task
def load_data_to_destination(df, dest_config):
"""加载数据到目标(支持多种存储)"""
fs = open_fs(dest_config["type"])
with fs.open(dest_config["path"], "w") as f:
df.to_parquet(f) # 或to_csv, to_json等
returnTrue
@flow(name="企业级数据管道")
def enterprise_data_pipeline(source_config, dest_config):
"""端到端数据管道"""
# 提取
raw_data = extract_data_from_source(source_config)
# 转换
processed_data = transform_data(raw_data)
# 加载
success = load_data_to_destination(processed_data, dest_config)
# 日志和监控(Prefect自动处理)
return success
def complex_transformation(x):
"""复杂的业务逻辑转换"""
# 这里可以是任何复杂的计算
return x ** 2 + 2 * x + 1
写在最后
自动化选择的黄金法则
- 解决问题优先:不要由于某个库热门而使用它
- 可维护性至上:你的代码会被未来的你(或同事)阅读
- 渐进式采用:一次引入1-2个新库,充分掌握后再添加
- 社区活跃度:选择有持续维护和良好文档的库
你在工作中遇到过哪些重复性任务?尝试过用哪些Python库解决?效果如何?
查了下helium,项目维护不积极,项目基于selenium封装,非常不推荐使用
收藏了,感谢分享