🚀 快速安装

复制以下命令并运行,立即安装此 Skill:

npx @anthropic-ai/skills install wshobson/agents/python-performance-optimization

💡 提示:需要 Node.js 和 NPM

Python 性能优化

用于分析、评估和优化 Python 代码的全面指南,涵盖 CPU 分析、内存优化和实现最佳实践。

何时使用该技能

  • 识别 Python 应用中的性能瓶颈
  • 降低应用延迟和响应时间
  • 优化 CPU 密集型操作
  • 减少内存消耗和内存泄漏
  • 提升数据库查询性能
  • 优化 I/O 操作
  • 加速数据处理管道
  • 实现高性能算法
  • 分析生产环境应用

核心概念

1. 分析类型

  • CPU 分析:识别耗时函数
  • 内存分析:跟踪内存分配和泄漏
  • 逐行分析:以行级粒度进行分析
  • 调用图:可视化函数调用关系

2. 性能指标

  • 执行时间:操作耗时
  • 内存使用:峰值和平均内存消耗
  • CPU 利用率:处理器使用模式
  • I/O 等待:I/O 操作耗时

3. 优化策略

  • 算法优化:使用更好的算法和数据结构
  • 实现优化:更高效的代码模式
  • 并行化:多线程/多进程
  • 缓存:避免重复计算
  • 原生扩展:对关键路径使用 C/Rust

快速入门

基础计时

import time

def measure_time():
    """简单的计时测量。"""
    start = time.time()

    # 你的代码在这里
    result = sum(range(1000000))

    elapsed = time.time() - start
    print(f"执行时间: {elapsed:.4f} 秒")
    return result

# 更好:使用 timeit 进行精确测量
import timeit

execution_time = timeit.timeit(
    "sum(range(1000000))",
    number=100
)
print(f"平均时间: {execution_time/100:.6f} 秒")

分析工具

模式 1:cProfile – CPU 分析

import cProfile
import pstats
from pstats import SortKey

def slow_function():
    """要分析的函数。"""
    total = 0
    for i in range(1000000):
        total += i
    return total

def another_function():
    """另一个函数。"""
    return [i**2 for i in range(100000)]

def main():
    """要分析的主函数。"""
    result1 = slow_function()
    result2 = another_function()
    return result1, result2

# 分析代码
if __name__ == "__main__":
    profiler = cProfile.Profile()
    profiler.enable()

    main()

    profiler.disable()

    # 打印统计信息
    stats = pstats.Stats(profiler)
    stats.sort_stats(SortKey.CUMULATIVE)
    stats.print_stats(10)  # 前 10 个函数

    # 保存到文件供后续分析
    stats.dump_stats("profile_output.prof")

命令行分析:

# 分析脚本
python -m cProfile -o output.prof script.py

# 查看结果
python -m pstats output.prof
# 在 pstats 中:
# sort cumtime
# stats 10

模式 2:line_profiler – 逐行分析

# 安装:pip install line-profiler

# 添加 @profile 装饰器(由 line_profiler 提供)
@profile
def process_data(data):
    """使用逐行分析处理数据。"""
    result = []
    for item in data:
        processed = item * 2
        result.append(processed)
    return result

# 使用以下命令运行:
# kernprof -l -v script.py

手动逐行分析:

from line_profiler import LineProfiler

def process_data(data):
    """要分析的函数。"""
    result = []
    for item in data:
        processed = item * 2
        result.append(processed)
    return result

if __name__ == "__main__":
    lp = LineProfiler()
    lp.add_function(process_data)

    data = list(range(100000))

    lp_wrapper = lp(process_data)
    lp_wrapper(data)

    lp.print_stats()

模式 3:memory_profiler – 内存使用

# 安装:pip install memory-profiler

from memory_profiler import profile

@profile
def memory_intensive():
    """消耗大量内存的函数。"""
    # 创建大列表
    big_list = [i for i in range(1000000)]

    # 创建大字典
    big_dict = {i: i**2 for i in range(100000)}

    # 处理数据
    result = sum(big_list)

    return result

if __name__ == "__main__":
    memory_intensive()

# 使用以下命令运行:
# python -m memory_profiler script.py

模式 4:py-spy – 生产环境分析

# 安装:pip install py-spy

# 分析正在运行的 Python 进程
py-spy top --pid 12345

# 生成火焰图
py-spy record -o profile.svg --pid 12345

# 分析脚本
py-spy record -o profile.svg -- python script.py

# 导出当前调用堆栈
py-spy dump --pid 12345

优化模式

模式 5:列表推导式 vs 循环

import timeit

# 慢:传统循环
def slow_squares(n):
    """使用循环创建平方数列表。"""
    result = []
    for i in range(n):
        result.append(i**2)
    return result

# 快:列表推导式
def fast_squares(n):
    """使用列表推导式创建平方数列表。"""
    return [i**2 for i in range(n)]

# 基准测试
n = 100000

slow_time = timeit.timeit(lambda: slow_squares(n), number=100)
fast_time = timeit.timeit(lambda: fast_squares(n), number=100)

print(f"循环: {slow_time:.4f}秒")
print(f"列表推导式: {fast_time:.4f}秒")
print(f"加速比: {slow_time/fast_time:.2f}倍")

# 对于简单操作,map 甚至可以更快
def faster_squares(n):
    """使用 map 获得更好性能。"""
    return list(map(lambda x: x**2, range(n)))

模式 6:生成器表达式节省内存

import sys

def list_approach():
    """内存密集型的列表方法。"""
    data = [i**2 for i in range(1000000)]
    return sum(data)

def generator_approach():
    """内存高效的生成器方法。"""
    data = (i**2 for i in range(1000000))
    return sum(data)

# 内存对比
list_data = [i for i in range(1000000)]
gen_data = (i for i in range(1000000))

print(f"列表大小: {sys.getsizeof(list_data)} 字节")
print(f"生成器大小: {sys.getsizeof(gen_data)} 字节")

# 生成器无论大小都使用常量内存

模式 7:字符串拼接

import timeit

def slow_concat(items):
    """慢速字符串拼接。"""
    result = ""
    for item in items:
        result += str(item)
    return result

def fast_concat(items):
    """使用 join 快速拼接。"""
    return "".join(str(item) for item in items)

def faster_concat(items):
    """使用列表可以更快。"""
    parts = [str(item) for item in items]
    return "".join(parts)

items = list(range(10000))

# 基准测试
slow = timeit.timeit(lambda: slow_concat(items), number=100)
fast = timeit.timeit(lambda: fast_concat(items), number=100)
faster = timeit.timeit(lambda: faster_concat(items), number=100)

print(f"拼接 (+): {slow:.4f}秒")
print(f"join (生成器): {fast:.4f}秒")
print(f"join (列表): {faster:.4f}秒")

模式 8:字典查找 vs 列表搜索

import timeit

# 创建测试数据
size = 10000
items = list(range(size))
lookup_dict = {i: i for i in range(size)}

def list_search(items, target):
    """O(n) 列表搜索。"""
    return target in items

def dict_search(lookup_dict, target):
    """O(1) 字典搜索。"""
    return target in lookup_dict

target = size - 1  # 列表的最坏情况

# 基准测试
list_time = timeit.timeit(
    lambda: list_search(items, target),
    number=1000
)
dict_time = timeit.timeit(
    lambda: dict_search(lookup_dict, target),
    number=1000
)

print(f"列表搜索: {list_time:.6f}秒")
print(f"字典搜索: {dict_time:.6f}秒")
print(f"加速比: {list_time/dict_time:.0f}倍")

模式 9:局部变量访问

import timeit

# 全局变量(慢)
GLOBAL_VALUE = 100

def use_global():
    """访问全局变量。"""
    total = 0
    for i in range(10000):
        total += GLOBAL_VALUE
    return total

def use_local():
    """使用局部变量。"""
    local_value = 100
    total = 0
    for i in range(10000):
        total += local_value
    return total

# 局部变量更快
global_time = timeit.timeit(use_global, number=1000)
local_time = timeit.timeit(use_local, number=1000)

print(f"全局访问: {global_time:.4f}秒")
print(f"局部访问: {local_time:.4f}秒")
print(f"加速比: {global_time/local_time:.2f}倍")

模式 10:函数调用开销

import timeit

def calculate_inline():
    """内联计算。"""
    total = 0
    for i in range(10000):
        total += i * 2 + 1
    return total

def helper_function(x):
    """辅助函数。"""
    return x * 2 + 1

def calculate_with_function():
    """带函数调用的计算。"""
    total = 0
    for i in range(10000):
        total += helper_function(i)
    return total

# 内联更快,因为没有调用开销
inline_time = timeit.timeit(calculate_inline, number=1000)
function_time = timeit.timeit(calculate_with_function, number=1000)

print(f"内联: {inline_time:.4f}秒")
print(f"函数调用: {function_time:.4f}秒")

高级优化

模式 11:NumPy 用于数值运算

import timeit
import numpy as np

def python_sum(n):
    """使用纯 Python 求和。"""
    return sum(range(n))

def numpy_sum(n):
    """使用 NumPy 求和。"""
    return np.arange(n).sum()

n = 1000000

python_time = timeit.timeit(lambda: python_sum(n), number=100)
numpy_time = timeit.timeit(lambda: numpy_sum(n), number=100)

print(f"Python: {python_time:.4f}秒")
print(f"NumPy: {numpy_time:.4f}秒")
print(f"加速比: {python_time/numpy_time:.2f}倍")

# 向量化操作
def python_multiply():
    """Python 中的逐元素乘法。"""
    a = list(range(100000))
    b = list(range(100000))
    return [x * y for x, y in zip(a, b)]

def numpy_multiply():
    """NumPy 中的向量化乘法。"""
    a = np.arange(100000)
    b = np.arange(100000)
    return a * b

py_time = timeit.timeit(python_multiply, number=100)
np_time = timeit.timeit(numpy_multiply, number=100)

print(f"\nPython 乘法: {py_time:.4f}秒")
print(f"NumPy 乘法: {np_time:.4f}秒")
print(f"加速比: {py_time/np_time:.2f}倍")

模式 12:使用 functools.lru_cache 缓存

from functools import lru_cache
import timeit

def fibonacci_slow(n):
    """无缓存的递归斐波那契。"""
    if n < 2:
        return n
    return fibonacci_slow(n-1) + fibonacci_slow(n-2)

@lru_cache(maxsize=None)
def fibonacci_fast(n):
    """带缓存的递归斐波那契。"""
    if n < 2:
        return n
    return fibonacci_fast(n-1) + fibonacci_fast(n-2)

# 递归算法的大幅加速
n = 30

slow_time = timeit.timeit(lambda: fibonacci_slow(n), number=1)
fast_time = timeit.timeit(lambda: fibonacci_fast(n), number=1000)

print(f"无缓存 (1次运行): {slow_time:.4f}秒")
print(f"有缓存 (1000次运行): {fast_time:.4f}秒")

# 缓存信息
print(f"缓存信息: {fibonacci_fast.cache_info()}")

模式 13:使用 __slots__ 节省内存

import sys

class RegularClass:
    """带 __dict__ 的普通类。"""
    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z

class SlottedClass:
    """使用 __slots__ 节省内存的类。"""
    __slots__ = ['x', 'y', 'z']

    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z

# 内存对比
regular = RegularClass(1, 2, 3)
slotted = SlottedClass(1, 2, 3)

print(f"普通类大小: {sys.getsizeof(regular)} 字节")
print(f"插槽类大小: {sys.getsizeof(slotted)} 字节")

# 对于大量实例,节省显著
regular_objects = [RegularClass(i, i+1, i+2) for i in range(10000)]
slotted_objects = [SlottedClass(i, i+1, i+2) for i in range(10000)]

print(f"\n10000 个普通对象内存: ~{sys.getsizeof(regular) * 10000} 字节")
print(f"10000 个插槽类对象内存: ~{sys.getsizeof(slotted) * 10000} 字节")

模式 14:多进程处理 CPU 密集型任务

import multiprocessing as mp
import time

def cpu_intensive_task(n):
    """CPU 密集型计算。"""
    return sum(i**2 for i in range(n))

def sequential_processing():
    """顺序处理任务。"""
    start = time.time()
    results = [cpu_intensive_task(1000000) for _ in range(4)]
    elapsed = time.time() - start
    return elapsed, results

def parallel_processing():
    """并行处理任务。"""
    start = time.time()
    with mp.Pool(processes=4) as pool:
        results = pool.map(cpu_intensive_task, [1000000] * 4)
    elapsed = time.time() - start
    return elapsed, results

if __name__ == "__main__":
    seq_time, seq_results = sequential_processing()
    par_time, par_results = parallel_processing()

    print(f"顺序处理: {seq_time:.2f}秒")
    print(f"并行处理: {par_time:.2f}秒")
    print(f"加速比: {seq_time/par_time:.2f}倍")

模式 15:异步 I/O 处理 I/O 密集型任务

import asyncio
import aiohttp
import time
import requests

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

def synchronous_requests():
    """同步 HTTP 请求。"""
    start = time.time()
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(response.status_code)
    elapsed = time.time() - start
    return elapsed, results

async def async_fetch(session, url):
    """异步 HTTP 请求。"""
    async with session.get(url) as response:
        return response.status

async def asynchronous_requests():
    """异步 HTTP 请求。"""
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [async_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    return elapsed, results

# 对于 I/O 密集型工作,异步快得多
sync_time, sync_results = synchronous_requests()
async_time, async_results = asyncio.run(asynchronous_requests())

print(f"同步: {sync_time:.2f}秒")
print(f"异步: {async_time:.2f}秒")
print(f"加速比: {sync_time/async_time:.2f}倍")

数据库优化

模式 16:批量数据库操作

import sqlite3
import time

def create_db():
    """创建测试数据库。"""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    return conn

def slow_inserts(conn, count):
    """一次插入一条记录。"""
    start = time.time()
    cursor = conn.cursor()
    for i in range(count):
        cursor.execute("INSERT INTO users (name) VALUES (?)", (f"User {i}",))
        conn.commit()  # 每次插入都提交
    elapsed = time.time() - start
    return elapsed

def fast_inserts(conn, count):
    """批量插入,单次提交。"""
    start = time.time()
    cursor = conn.cursor()
    data = [(f"User {i}",) for i in range(count)]
    cursor.executemany("INSERT INTO users (name) VALUES (?)", data)
    conn.commit()  # 单次提交
    elapsed = time.time() - start
    return elapsed

# 基准测试
conn1 = create_db()
slow_time = slow_inserts(conn1, 1000)

conn2 = create_db()
fast_time = fast_inserts(conn2, 1000)

print(f"单条插入: {slow_time:.4f}秒")
print(f"批量插入: {fast_time:.4f}秒")
print(f"加速比: {slow_time/fast_time:.2f}倍")

模式 17:查询优化

# 为频繁查询的列使用索引
"""
-- 慢:无索引
SELECT * FROM users WHERE email = 'user@example.com';

-- 快:有索引
CREATE INDEX idx_users_email ON users(email);
SELECT * FROM users WHERE email = 'user@example.com';
"""

# 使用查询计划
import sqlite3

conn = sqlite3.connect("example.db")
cursor = conn.cursor()

# 分析查询性能
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?", ("test@example.com",))
print(cursor.fetchall())

# 只 SELECT 需要的列
# 慢:SELECT *
# 快:SELECT id, name

内存优化

模式 18:检测内存泄漏

import tracemalloc
import gc

def memory_leak_example():
    """内存泄漏示例。"""
    leaked_objects = []

    for i in range(100000):
        # 对象被添加但从未移除
        leaked_objects.append([i] * 100)

    # 在实际代码中,这将是一个意外的引用

def track_memory_usage():
    """跟踪内存分配。"""
    tracemalloc.start()

    # 之前拍摄快照
    snapshot1 = tracemalloc.take_snapshot()

    # 运行代码
    memory_leak_example()

    # 之后拍摄快照
    snapshot2 = tracemalloc.take_snapshot()

    # 比较
    top_stats = snapshot2.compare_to(snapshot1, 'lineno')

    print("前 10 个内存分配:")
    for stat in top_stats[:10]:
        print(stat)

    tracemalloc.stop()

# 监控内存
track_memory_usage()

# 强制垃圾回收
gc.collect()

模式 19:迭代器 vs 列表

import sys

def process_file_list(filename):
    """将整个文件加载到内存。"""
    with open(filename) as f:
        lines = f.readlines()  # 加载所有行
        return sum(1 for line in lines if line.strip())

def process_file_iterator(filename):
    """逐行处理文件。"""
    with open(filename) as f:
        return sum(1 for line in f if line.strip())

# 迭代器使用常量内存
# 列表将整个文件加载到内存

模式 20:使用弱引用缓存

import weakref

class CachedResource:
    """可被垃圾回收的资源。"""
    def __init__(self, data):
        self.data = data

# 普通缓存阻止垃圾回收
regular_cache = {}

def get_resource_regular(key):
    """从普通缓存获取资源。"""
    if key not in regular_cache:
        regular_cache[key] = CachedResource(f"Data for {key}")
    return regular_cache[key]

# 弱引用缓存允许垃圾回收
weak_cache = weakref.WeakValueDictionary()

def get_resource_weak(key):
    """从弱引用缓存获取资源。"""
    resource = weak_cache.get(key)
    if resource is None:
        resource = CachedResource(f"Data for {key}")
        weak_cache[key] = resource
    return resource

# 当没有强引用时,对象可以被垃圾回收

基准测试工具

自定义基准测试装饰器

import time
from functools import wraps

def benchmark(func):
    """用于基准测试函数执行的装饰器。"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} 耗时 {elapsed:.6f} 秒")
        return result
    return wrapper

@benchmark
def slow_function():
    """要基准测试的函数。"""
    time.sleep(0.5)
    return sum(range(1000000))

result = slow_function()

使用 pytest-benchmark 进行性能测试

# 安装:pip install pytest-benchmark

def test_list_comprehension(benchmark):
    """基准测试列表推导式。"""
    result = benchmark(lambda: [i**2 for i in range(10000)])
    assert len(result) == 10000

def test_map_function(benchmark):
    """基准测试 map 函数。"""
    result = benchmark(lambda: list(map(lambda x: x**2, range(10000))))
    assert len(result) == 10000

# 使用以下命令运行:pytest test_performance.py --benchmark-compare

最佳实践

  1. 先分析再优化 – 测量以找到真正的瓶颈
  2. 关注热点路径 – 优化最频繁运行的代码
  3. 使用合适的数据结构 – 查找用字典,成员检查用集合
  4. 避免过早优化 – 先保证清晰,再优化
  5. 使用内置函数 – 它们是用 C 实现的
  6. 缓存昂贵计算 – 使用 lru_cache
  7. 批处理 I/O 操作 – 减少系统调用
  8. 处理大数据集使用生成器
  9. 数值运算考虑 NumPy
  10. 分析生产代码 – 对实时系统使用 py-spy

常见陷阱

  • 未分析就优化
  • 不必要地使用全局变量
  • 未使用合适的数据结构
  • 创建不必要的数据副本
  • 未对数据库使用连接池
  • 忽略算法复杂度
  • 过度优化罕见代码路径
  • 未考虑内存使用

📄 原始文档

完整文档(英文):

https://skills.sh/wshobson/agents/python-performance-optimization

💡 提示:点击上方链接查看 skills.sh 原始英文文档,方便对照翻译。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。