🚀 快速安装
复制以下命令并运行,立即安装此 Skill:
npx @anthropic-ai/skills install wshobson/agents/python-performance-optimization
💡 提示:需要 Node.js 和 NPM
Python 性能优化
用于分析、评估和优化 Python 代码的全面指南,涵盖 CPU 分析、内存优化和实现最佳实践。
何时使用该技能
- 识别 Python 应用中的性能瓶颈
- 降低应用延迟和响应时间
- 优化 CPU 密集型操作
- 减少内存消耗和内存泄漏
- 提升数据库查询性能
- 优化 I/O 操作
- 加速数据处理管道
- 实现高性能算法
- 分析生产环境应用
核心概念
1. 分析类型
- CPU 分析:识别耗时函数
- 内存分析:跟踪内存分配和泄漏
- 逐行分析:以行级粒度进行分析
- 调用图:可视化函数调用关系
2. 性能指标
- 执行时间:操作耗时
- 内存使用:峰值和平均内存消耗
- CPU 利用率:处理器使用模式
- I/O 等待:I/O 操作耗时
3. 优化策略
- 算法优化:使用更好的算法和数据结构
- 实现优化:更高效的代码模式
- 并行化:多线程/多进程
- 缓存:避免重复计算
- 原生扩展:对关键路径使用 C/Rust
快速入门
基础计时
import time
def measure_time():
"""简单的计时测量。"""
start = time.time()
# 你的代码在这里
result = sum(range(1000000))
elapsed = time.time() - start
print(f"执行时间: {elapsed:.4f} 秒")
return result
# 更好:使用 timeit 进行精确测量
import timeit
execution_time = timeit.timeit(
"sum(range(1000000))",
number=100
)
print(f"平均时间: {execution_time/100:.6f} 秒")
分析工具
模式 1:cProfile – CPU 分析
import cProfile
import pstats
from pstats import SortKey
def slow_function():
"""要分析的函数。"""
total = 0
for i in range(1000000):
total += i
return total
def another_function():
"""另一个函数。"""
return [i**2 for i in range(100000)]
def main():
"""要分析的主函数。"""
result1 = slow_function()
result2 = another_function()
return result1, result2
# 分析代码
if __name__ == "__main__":
profiler = cProfile.Profile()
profiler.enable()
main()
profiler.disable()
# 打印统计信息
stats = pstats.Stats(profiler)
stats.sort_stats(SortKey.CUMULATIVE)
stats.print_stats(10) # 前 10 个函数
# 保存到文件供后续分析
stats.dump_stats("profile_output.prof")
命令行分析:
# 分析脚本
python -m cProfile -o output.prof script.py
# 查看结果
python -m pstats output.prof
# 在 pstats 中:
# sort cumtime
# stats 10
模式 2:line_profiler – 逐行分析
# 安装:pip install line-profiler
# 添加 @profile 装饰器(由 line_profiler 提供)
@profile
def process_data(data):
"""使用逐行分析处理数据。"""
result = []
for item in data:
processed = item * 2
result.append(processed)
return result
# 使用以下命令运行:
# kernprof -l -v script.py
手动逐行分析:
from line_profiler import LineProfiler
def process_data(data):
"""要分析的函数。"""
result = []
for item in data:
processed = item * 2
result.append(processed)
return result
if __name__ == "__main__":
lp = LineProfiler()
lp.add_function(process_data)
data = list(range(100000))
lp_wrapper = lp(process_data)
lp_wrapper(data)
lp.print_stats()
模式 3:memory_profiler – 内存使用
# 安装:pip install memory-profiler
from memory_profiler import profile
@profile
def memory_intensive():
"""消耗大量内存的函数。"""
# 创建大列表
big_list = [i for i in range(1000000)]
# 创建大字典
big_dict = {i: i**2 for i in range(100000)}
# 处理数据
result = sum(big_list)
return result
if __name__ == "__main__":
memory_intensive()
# 使用以下命令运行:
# python -m memory_profiler script.py
模式 4:py-spy – 生产环境分析
# 安装:pip install py-spy
# 分析正在运行的 Python 进程
py-spy top --pid 12345
# 生成火焰图
py-spy record -o profile.svg --pid 12345
# 分析脚本
py-spy record -o profile.svg -- python script.py
# 导出当前调用堆栈
py-spy dump --pid 12345
优化模式
模式 5:列表推导式 vs 循环
import timeit
# 慢:传统循环
def slow_squares(n):
"""使用循环创建平方数列表。"""
result = []
for i in range(n):
result.append(i**2)
return result
# 快:列表推导式
def fast_squares(n):
"""使用列表推导式创建平方数列表。"""
return [i**2 for i in range(n)]
# 基准测试
n = 100000
slow_time = timeit.timeit(lambda: slow_squares(n), number=100)
fast_time = timeit.timeit(lambda: fast_squares(n), number=100)
print(f"循环: {slow_time:.4f}秒")
print(f"列表推导式: {fast_time:.4f}秒")
print(f"加速比: {slow_time/fast_time:.2f}倍")
# 对于简单操作,map 甚至可以更快
def faster_squares(n):
"""使用 map 获得更好性能。"""
return list(map(lambda x: x**2, range(n)))
模式 6:生成器表达式节省内存
import sys
def list_approach():
"""内存密集型的列表方法。"""
data = [i**2 for i in range(1000000)]
return sum(data)
def generator_approach():
"""内存高效的生成器方法。"""
data = (i**2 for i in range(1000000))
return sum(data)
# 内存对比
list_data = [i for i in range(1000000)]
gen_data = (i for i in range(1000000))
print(f"列表大小: {sys.getsizeof(list_data)} 字节")
print(f"生成器大小: {sys.getsizeof(gen_data)} 字节")
# 生成器无论大小都使用常量内存
模式 7:字符串拼接
import timeit
def slow_concat(items):
"""慢速字符串拼接。"""
result = ""
for item in items:
result += str(item)
return result
def fast_concat(items):
"""使用 join 快速拼接。"""
return "".join(str(item) for item in items)
def faster_concat(items):
"""使用列表可以更快。"""
parts = [str(item) for item in items]
return "".join(parts)
items = list(range(10000))
# 基准测试
slow = timeit.timeit(lambda: slow_concat(items), number=100)
fast = timeit.timeit(lambda: fast_concat(items), number=100)
faster = timeit.timeit(lambda: faster_concat(items), number=100)
print(f"拼接 (+): {slow:.4f}秒")
print(f"join (生成器): {fast:.4f}秒")
print(f"join (列表): {faster:.4f}秒")
模式 8:字典查找 vs 列表搜索
import timeit
# 创建测试数据
size = 10000
items = list(range(size))
lookup_dict = {i: i for i in range(size)}
def list_search(items, target):
"""O(n) 列表搜索。"""
return target in items
def dict_search(lookup_dict, target):
"""O(1) 字典搜索。"""
return target in lookup_dict
target = size - 1 # 列表的最坏情况
# 基准测试
list_time = timeit.timeit(
lambda: list_search(items, target),
number=1000
)
dict_time = timeit.timeit(
lambda: dict_search(lookup_dict, target),
number=1000
)
print(f"列表搜索: {list_time:.6f}秒")
print(f"字典搜索: {dict_time:.6f}秒")
print(f"加速比: {list_time/dict_time:.0f}倍")
模式 9:局部变量访问
import timeit
# 全局变量(慢)
GLOBAL_VALUE = 100
def use_global():
"""访问全局变量。"""
total = 0
for i in range(10000):
total += GLOBAL_VALUE
return total
def use_local():
"""使用局部变量。"""
local_value = 100
total = 0
for i in range(10000):
total += local_value
return total
# 局部变量更快
global_time = timeit.timeit(use_global, number=1000)
local_time = timeit.timeit(use_local, number=1000)
print(f"全局访问: {global_time:.4f}秒")
print(f"局部访问: {local_time:.4f}秒")
print(f"加速比: {global_time/local_time:.2f}倍")
模式 10:函数调用开销
import timeit
def calculate_inline():
"""内联计算。"""
total = 0
for i in range(10000):
total += i * 2 + 1
return total
def helper_function(x):
"""辅助函数。"""
return x * 2 + 1
def calculate_with_function():
"""带函数调用的计算。"""
total = 0
for i in range(10000):
total += helper_function(i)
return total
# 内联更快,因为没有调用开销
inline_time = timeit.timeit(calculate_inline, number=1000)
function_time = timeit.timeit(calculate_with_function, number=1000)
print(f"内联: {inline_time:.4f}秒")
print(f"函数调用: {function_time:.4f}秒")
高级优化
模式 11:NumPy 用于数值运算
import timeit
import numpy as np
def python_sum(n):
"""使用纯 Python 求和。"""
return sum(range(n))
def numpy_sum(n):
"""使用 NumPy 求和。"""
return np.arange(n).sum()
n = 1000000
python_time = timeit.timeit(lambda: python_sum(n), number=100)
numpy_time = timeit.timeit(lambda: numpy_sum(n), number=100)
print(f"Python: {python_time:.4f}秒")
print(f"NumPy: {numpy_time:.4f}秒")
print(f"加速比: {python_time/numpy_time:.2f}倍")
# 向量化操作
def python_multiply():
"""Python 中的逐元素乘法。"""
a = list(range(100000))
b = list(range(100000))
return [x * y for x, y in zip(a, b)]
def numpy_multiply():
"""NumPy 中的向量化乘法。"""
a = np.arange(100000)
b = np.arange(100000)
return a * b
py_time = timeit.timeit(python_multiply, number=100)
np_time = timeit.timeit(numpy_multiply, number=100)
print(f"\nPython 乘法: {py_time:.4f}秒")
print(f"NumPy 乘法: {np_time:.4f}秒")
print(f"加速比: {py_time/np_time:.2f}倍")
模式 12:使用 functools.lru_cache 缓存
from functools import lru_cache
import timeit
def fibonacci_slow(n):
"""无缓存的递归斐波那契。"""
if n < 2:
return n
return fibonacci_slow(n-1) + fibonacci_slow(n-2)
@lru_cache(maxsize=None)
def fibonacci_fast(n):
"""带缓存的递归斐波那契。"""
if n < 2:
return n
return fibonacci_fast(n-1) + fibonacci_fast(n-2)
# 递归算法的大幅加速
n = 30
slow_time = timeit.timeit(lambda: fibonacci_slow(n), number=1)
fast_time = timeit.timeit(lambda: fibonacci_fast(n), number=1000)
print(f"无缓存 (1次运行): {slow_time:.4f}秒")
print(f"有缓存 (1000次运行): {fast_time:.4f}秒")
# 缓存信息
print(f"缓存信息: {fibonacci_fast.cache_info()}")
模式 13:使用 __slots__ 节省内存
import sys
class RegularClass:
"""带 __dict__ 的普通类。"""
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
class SlottedClass:
"""使用 __slots__ 节省内存的类。"""
__slots__ = ['x', 'y', 'z']
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
# 内存对比
regular = RegularClass(1, 2, 3)
slotted = SlottedClass(1, 2, 3)
print(f"普通类大小: {sys.getsizeof(regular)} 字节")
print(f"插槽类大小: {sys.getsizeof(slotted)} 字节")
# 对于大量实例,节省显著
regular_objects = [RegularClass(i, i+1, i+2) for i in range(10000)]
slotted_objects = [SlottedClass(i, i+1, i+2) for i in range(10000)]
print(f"\n10000 个普通对象内存: ~{sys.getsizeof(regular) * 10000} 字节")
print(f"10000 个插槽类对象内存: ~{sys.getsizeof(slotted) * 10000} 字节")
模式 14:多进程处理 CPU 密集型任务
import multiprocessing as mp
import time
def cpu_intensive_task(n):
"""CPU 密集型计算。"""
return sum(i**2 for i in range(n))
def sequential_processing():
"""顺序处理任务。"""
start = time.time()
results = [cpu_intensive_task(1000000) for _ in range(4)]
elapsed = time.time() - start
return elapsed, results
def parallel_processing():
"""并行处理任务。"""
start = time.time()
with mp.Pool(processes=4) as pool:
results = pool.map(cpu_intensive_task, [1000000] * 4)
elapsed = time.time() - start
return elapsed, results
if __name__ == "__main__":
seq_time, seq_results = sequential_processing()
par_time, par_results = parallel_processing()
print(f"顺序处理: {seq_time:.2f}秒")
print(f"并行处理: {par_time:.2f}秒")
print(f"加速比: {seq_time/par_time:.2f}倍")
模式 15:异步 I/O 处理 I/O 密集型任务
import asyncio
import aiohttp
import time
import requests
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
def synchronous_requests():
"""同步 HTTP 请求。"""
start = time.time()
results = []
for url in urls:
response = requests.get(url)
results.append(response.status_code)
elapsed = time.time() - start
return elapsed, results
async def async_fetch(session, url):
"""异步 HTTP 请求。"""
async with session.get(url) as response:
return response.status
async def asynchronous_requests():
"""异步 HTTP 请求。"""
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [async_fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
return elapsed, results
# 对于 I/O 密集型工作,异步快得多
sync_time, sync_results = synchronous_requests()
async_time, async_results = asyncio.run(asynchronous_requests())
print(f"同步: {sync_time:.2f}秒")
print(f"异步: {async_time:.2f}秒")
print(f"加速比: {sync_time/async_time:.2f}倍")
数据库优化
模式 16:批量数据库操作
import sqlite3
import time
def create_db():
"""创建测试数据库。"""
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
return conn
def slow_inserts(conn, count):
"""一次插入一条记录。"""
start = time.time()
cursor = conn.cursor()
for i in range(count):
cursor.execute("INSERT INTO users (name) VALUES (?)", (f"User {i}",))
conn.commit() # 每次插入都提交
elapsed = time.time() - start
return elapsed
def fast_inserts(conn, count):
"""批量插入,单次提交。"""
start = time.time()
cursor = conn.cursor()
data = [(f"User {i}",) for i in range(count)]
cursor.executemany("INSERT INTO users (name) VALUES (?)", data)
conn.commit() # 单次提交
elapsed = time.time() - start
return elapsed
# 基准测试
conn1 = create_db()
slow_time = slow_inserts(conn1, 1000)
conn2 = create_db()
fast_time = fast_inserts(conn2, 1000)
print(f"单条插入: {slow_time:.4f}秒")
print(f"批量插入: {fast_time:.4f}秒")
print(f"加速比: {slow_time/fast_time:.2f}倍")
模式 17:查询优化
# 为频繁查询的列使用索引
"""
-- 慢:无索引
SELECT * FROM users WHERE email = 'user@example.com';
-- 快:有索引
CREATE INDEX idx_users_email ON users(email);
SELECT * FROM users WHERE email = 'user@example.com';
"""
# 使用查询计划
import sqlite3
conn = sqlite3.connect("example.db")
cursor = conn.cursor()
# 分析查询性能
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?", ("test@example.com",))
print(cursor.fetchall())
# 只 SELECT 需要的列
# 慢:SELECT *
# 快:SELECT id, name
内存优化
模式 18:检测内存泄漏
import tracemalloc
import gc
def memory_leak_example():
"""内存泄漏示例。"""
leaked_objects = []
for i in range(100000):
# 对象被添加但从未移除
leaked_objects.append([i] * 100)
# 在实际代码中,这将是一个意外的引用
def track_memory_usage():
"""跟踪内存分配。"""
tracemalloc.start()
# 之前拍摄快照
snapshot1 = tracemalloc.take_snapshot()
# 运行代码
memory_leak_example()
# 之后拍摄快照
snapshot2 = tracemalloc.take_snapshot()
# 比较
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
print("前 10 个内存分配:")
for stat in top_stats[:10]:
print(stat)
tracemalloc.stop()
# 监控内存
track_memory_usage()
# 强制垃圾回收
gc.collect()
模式 19:迭代器 vs 列表
import sys
def process_file_list(filename):
"""将整个文件加载到内存。"""
with open(filename) as f:
lines = f.readlines() # 加载所有行
return sum(1 for line in lines if line.strip())
def process_file_iterator(filename):
"""逐行处理文件。"""
with open(filename) as f:
return sum(1 for line in f if line.strip())
# 迭代器使用常量内存
# 列表将整个文件加载到内存
模式 20:使用弱引用缓存
import weakref
class CachedResource:
"""可被垃圾回收的资源。"""
def __init__(self, data):
self.data = data
# 普通缓存阻止垃圾回收
regular_cache = {}
def get_resource_regular(key):
"""从普通缓存获取资源。"""
if key not in regular_cache:
regular_cache[key] = CachedResource(f"Data for {key}")
return regular_cache[key]
# 弱引用缓存允许垃圾回收
weak_cache = weakref.WeakValueDictionary()
def get_resource_weak(key):
"""从弱引用缓存获取资源。"""
resource = weak_cache.get(key)
if resource is None:
resource = CachedResource(f"Data for {key}")
weak_cache[key] = resource
return resource
# 当没有强引用时,对象可以被垃圾回收
基准测试工具
自定义基准测试装饰器
import time
from functools import wraps
def benchmark(func):
"""用于基准测试函数执行的装饰器。"""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f"{func.__name__} 耗时 {elapsed:.6f} 秒")
return result
return wrapper
@benchmark
def slow_function():
"""要基准测试的函数。"""
time.sleep(0.5)
return sum(range(1000000))
result = slow_function()
使用 pytest-benchmark 进行性能测试
# 安装:pip install pytest-benchmark
def test_list_comprehension(benchmark):
"""基准测试列表推导式。"""
result = benchmark(lambda: [i**2 for i in range(10000)])
assert len(result) == 10000
def test_map_function(benchmark):
"""基准测试 map 函数。"""
result = benchmark(lambda: list(map(lambda x: x**2, range(10000))))
assert len(result) == 10000
# 使用以下命令运行:pytest test_performance.py --benchmark-compare
最佳实践
- 先分析再优化 – 测量以找到真正的瓶颈
- 关注热点路径 – 优化最频繁运行的代码
- 使用合适的数据结构 – 查找用字典,成员检查用集合
- 避免过早优化 – 先保证清晰,再优化
- 使用内置函数 – 它们是用 C 实现的
- 缓存昂贵计算 – 使用 lru_cache
- 批处理 I/O 操作 – 减少系统调用
- 处理大数据集使用生成器
- 数值运算考虑 NumPy
- 分析生产代码 – 对实时系统使用 py-spy
常见陷阱
- 未分析就优化
- 不必要地使用全局变量
- 未使用合适的数据结构
- 创建不必要的数据副本
- 未对数据库使用连接池
- 忽略算法复杂度
- 过度优化罕见代码路径
- 未考虑内存使用
📄 原始文档
完整文档(英文):
https://skills.sh/wshobson/agents/python-performance-optimization
💡 提示:点击上方链接查看 skills.sh 原始英文文档,方便对照翻译。
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

评论(0)