🚀 快速安装
复制以下命令并运行,立即安装此 Skill:
npx skills add https://github.com/wshobson/agents --skill langchain-architecture
💡 提示:需要 Node.js 和 NPM
LangChain & LangGraph 架构
掌握现代 LangChain 1.x 和 LangGraph,以构建具有代理、状态管理、内存和工具集成的复杂 LLM 应用程序。
何时使用此技能
- 构建具有工具访问权限的自主 AI 代理
- 实现复杂的多步骤 LLM 工作流
- 管理对话内存和状态
- 将 LLM 与外部数据源和 API 集成
- 创建模块化、可重用的 LLM 应用程序组件
- 实现文档处理管道
- 构建生产级 LLM 应用程序
包结构(LangChain 1.x)
langchain (1.2.x) # 高级编排
langchain-core (1.2.x) # 核心抽象(消息、提示、工具)
langchain-community # 第三方集成
langgraph # 代理编排和状态管理
langchain-openai # OpenAI 集成
langchain-anthropic # Anthropic/Claude 集成
langchain-voyageai # Voyage AI 嵌入模型
langchain-pinecone # Pinecone 向量存储
核心概念
1. LangGraph 代理
LangGraph 是 2026 年构建代理的标准。它提供:
关键特性:
- 状态图:具有类型化状态的显式状态管理
- 持久化执行:代理在失败后仍然存在
- 人在回路:在任何时候检查和修改状态
- 内存:跨会话的短期和长期内存
- 检查点:保存和恢复代理状态
代理模式:
- ReAct:使用
create_react_agent进行推理和行动 - 计划与执行:分离规划和执行节点
- 多代理:主管在专门代理之间进行路由
- 工具调用:使用 Pydantic 模式进行结构化工具调用
2. 状态管理
LangGraph 使用 TypedDict 进行显式状态管理:
from typing import Annotated, TypedDict
from langgraph.graph import MessagesState
# 基于消息的简单状态
class AgentState(MessagesState):
"""扩展 MessagesState 并添加自定义字段。"""
context: Annotated[list, "检索到的文档"]
# 用于复杂代理的自定义状态
class CustomState(TypedDict):
messages: Annotated[list, "对话历史"]
context: Annotated[dict, "检索到的上下文"]
current_step: str
results: list
3. 内存系统
现代内存实现:
- 会话缓冲内存:存储所有消息(简短对话)
- 会话摘要内存:总结较旧的消息(长对话)
- 会话令牌缓冲内存:基于令牌的窗口
- 向量存储检索器内存:基于语义相似性的检索
- LangGraph 检查点:跨会话的持久化状态
4. 文档处理
加载、转换和存储文档:
组件:
- 文档加载器:从各种来源加载
- 文本分割器:智能地对文档进行分块
- 向量存储:存储和检索嵌入向量
- 检索器:获取相关文档
5. 回调与追踪
LangSmith 是可观测性的标准:
- 请求/响应日志记录
- 令牌使用情况跟踪
- 延迟监控
- 错误跟踪
- 追踪可视化
快速开始
使用 LangGraph 的现代 ReAct 代理
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from langchain_anthropic import ChatAnthropic
from langchain_core.tools import tool
import ast
import operator
# 初始化 LLM(推荐使用 Claude Sonnet 4.6)
llm = ChatAnthropic(model="claude-sonnet-4-6", temperature=0)
# 使用 Pydantic 模式定义工具
@tool
def search_database(query: str) -> str:
"""搜索内部数据库以获取信息。"""
# 你的数据库搜索逻辑
return f"Results for: {query}"
@tool
def calculate(expression: str) -> str:
"""安全地计算数学表达式。
支持:+, -, *, /, **, %, 括号
示例:'(2 + 3) * 4' 返回 '20'
"""
# 使用 ast 进行安全数学计算
allowed_operators = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.Mod: operator.mod,
ast.USub: operator.neg,
}
def _eval(node):
if isinstance(node, ast.Constant):
return node.value
elif isinstance(node, ast.BinOp):
left = _eval(node.left)
right = _eval(node.right)
return allowed_operators[type(node.op)](left, right)
elif isinstance(node, ast.UnaryOp):
operand = _eval(node.operand)
return allowed_operators[type(node.op)](operand)
else:
raise ValueError(f"Unsupported operation: {type(node)}")
try:
tree = ast.parse(expression, mode='eval')
return str(_eval(tree.body))
except Exception as e:
return f"Error: {e}"
tools = [search_database, calculate]
# 创建用于内存持久化的检查点
checkpointer = MemorySaver()
# 创建 ReAct 代理
agent = create_react_agent(
llm,
tools,
checkpointer=checkpointer
)
# 使用线程 ID 运行代理以进行内存管理
config = {"configurable": {"thread_id": "user-123"}}
result = await agent.ainvoke(
{"messages": [("user", "Search for Python tutorials and calculate 25 * 4")]},
config=config
)
架构模式
模式 1:使用 LangGraph 的 RAG
from langgraph.graph import StateGraph, START, END
from langchain_anthropic import ChatAnthropic
from langchain_voyageai import VoyageAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from typing import TypedDict, Annotated
class RAGState(TypedDict):
question: str
context: Annotated[list[Document], "检索到的文档"]
answer: str
# 初始化组件
llm = ChatAnthropic(model="claude-sonnet-4-6")
embeddings = VoyageAIEmbeddings(model="voyage-3-large")
vectorstore = PineconeVectorStore(index_name="docs", embedding=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
# 定义节点
async def retrieve(state: RAGState) -> RAGState:
"""检索相关文档。"""
docs = await retriever.ainvoke(state["question"])
return {"context": docs}
async def generate(state: RAGState) -> RAGState:
"""根据上下文生成答案。"""
prompt = ChatPromptTemplate.from_template(
"""根据以下上下文回答问题。如果你无法回答,请如实说明。
上下文:{context}
问题:{question}
答案:"""
)
context_text = "\n\n".join(doc.page_content for doc in state["context"])
response = await llm.ainvoke(
prompt.format(context=context_text, question=state["question"])
)
return {"answer": response.content}
# 构建图
builder = StateGraph(RAGState)
builder.add_node("retrieve", retrieve)
builder.add_node("generate", generate)
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
builder.add_edge("generate", END)
rag_chain = builder.compile()
# 使用链
result = await rag_chain.ainvoke({"question": "主要主题是什么?"})
模式 2:具有结构化工具的自定义代理
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
class SearchInput(BaseModel):
"""数据库搜索的输入。"""
query: str = Field(description="搜索查询")
filters: dict = Field(default={}, description="可选的过滤器")
class EmailInput(BaseModel):
"""发送电子邮件的输入。"""
recipient: str = Field(description="邮件收件人")
subject: str = Field(description="邮件主题")
content: str = Field(description="邮件正文")
async def search_database(query: str, filters: dict = {}) -> str:
"""搜索内部数据库以获取信息。"""
# 你的数据库搜索逻辑
return f"Results for '{query}' with filters {filters}"
async def send_email(recipient: str, subject: str, content: str) -> str:
"""向指定收件人发送电子邮件。"""
# 邮件发送逻辑
return f"Email sent to {recipient}"
tools = [
StructuredTool.from_function(
coroutine=search_database,
name="search_database",
description="搜索内部数据库",
args_schema=SearchInput
),
StructuredTool.from_function(
coroutine=send_email,
name="send_email",
description="发送电子邮件",
args_schema=EmailInput
)
]
agent = create_react_agent(llm, tools)
模式 3:使用状态图的多步骤工作流
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Literal
class WorkflowState(TypedDict):
text: str
entities: list
analysis: str
summary: str
current_step: str
async def extract_entities(state: WorkflowState) -> WorkflowState:
"""从文本中提取关键实体。"""
prompt = f"从以下文本中提取关键实体:{state['text']}\n\n以 JSON 列表形式返回。"
response = await llm.ainvoke(prompt)
return {"entities": response.content, "current_step": "analyze"}
async def analyze_entities(state: WorkflowState) -> WorkflowState:
"""分析提取的实体。"""
prompt = f"分析这些实体:{state['entities']}\n\n提供见解。"
response = await llm.ainvoke(prompt)
return {"analysis": response.content, "current_step": "summarize"}
async def generate_summary(state: WorkflowState) -> WorkflowState:
"""生成最终摘要。"""
prompt = f"""总结:
实体:{state['entities']}
分析:{state['analysis']}
提供简洁的摘要。"""
response = await llm.ainvoke(prompt)
return {"summary": response.content, "current_step": "complete"}
def route_step(state: WorkflowState) -> Literal["analyze", "summarize", "end"]:
"""根据当前状态路由到下一步。"""
step = state.get("current_step", "extract")
if step == "analyze":
return "analyze"
elif step == "summarize":
return "summarize"
return "end"
# 构建工作流
builder = StateGraph(WorkflowState)
builder.add_node("extract", extract_entities)
builder.add_node("analyze", analyze_entities)
builder.add_node("summarize", generate_summary)
builder.add_edge(START, "extract")
builder.add_conditional_edges("extract", route_step, {
"analyze": "analyze",
"summarize": "summarize",
"end": END
})
builder.add_conditional_edges("analyze", route_step, {
"summarize": "summarize",
"end": END
})
builder.add_edge("summarize", END)
workflow = builder.compile()
模式 4:多代理编排
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import HumanMessage
from typing import Literal
class MultiAgentState(TypedDict):
messages: list
next_agent: str
# 创建专门的代理
researcher = create_react_agent(llm, research_tools)
writer = create_react_agent(llm, writing_tools)
reviewer = create_react_agent(llm, review_tools)
async def supervisor(state: MultiAgentState) -> MultiAgentState:
"""根据任务路由到适当的代理。"""
prompt = f"""根据对话,哪个代理应该处理这个?
选项:
- researcher:用于查找信息
- writer:用于创建内容
- reviewer:用于审查和编辑
- FINISH:任务已完成
消息:{state['messages']}
仅回复代理名称。"""
response = await llm.ainvoke(prompt)
return {"next_agent": response.content.strip().lower()}
def route_to_agent(state: MultiAgentState) -> Literal["researcher", "writer", "reviewer", "end"]:
"""根据主管决策进行路由。"""
next_agent = state.get("next_agent", "").lower()
if next_agent == "finish":
return "end"
return next_agent if next_agent in ["researcher", "writer", "reviewer"] else "end"
# 构建多代理图
builder = StateGraph(MultiAgentState)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_node("reviewer", reviewer)
builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", route_to_agent, {
"researcher": "researcher",
"writer": "writer",
"reviewer": "reviewer",
"end": END
})
# 每个代理返回到主管
for agent in ["researcher", "writer", "reviewer"]:
builder.add_edge(agent, "supervisor")
multi_agent = builder.compile()
内存管理
使用 LangGraph 的基于令牌的内存
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
# 内存检查点(开发环境)
checkpointer = MemorySaver()
# 创建具有持久内存的代理
agent = create_react_agent(llm, tools, checkpointer=checkpointer)
# 每个 thread_id 维护独立的对话
config = {"configurable": {"thread_id": "session-abc123"}}
# 使用相同的 thread_id,消息在多次调用中持久存在
result1 = await agent.ainvoke({"messages": [("user", "我的名字是 Alice")]}, config)
result2 = await agent.ainvoke({"messages": [("user", "我的名字是什么?")]}, config)
# 代理记住:"你的名字是 Alice"
使用 PostgreSQL 的生产环境内存
from langgraph.checkpoint.postgres import PostgresSaver
# 生产环境检查点
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/langgraph"
)
agent = create_react_agent(llm, tools, checkpointer=checkpointer)
用于长期上下文的向量存储内存
from langchain_community.vectorstores import Chroma
from langchain_voyageai import VoyageAIEmbeddings
embeddings = VoyageAIEmbeddings(model="voyage-3-large")
memory_store = Chroma(
collection_name="conversation_memory",
embedding_function=embeddings,
persist_directory="./memory_db"
)
async def retrieve_relevant_memory(query: str, k: int = 5) -> list:
"""检索相关的过去对话。"""
docs = await memory_store.asimilarity_search(query, k=k)
return [doc.page_content for doc in docs]
async def store_memory(content: str, metadata: dict = {}):
"""将对话存储在长期内存中。"""
await memory_store.aadd_texts([content], metadatas=[metadata])
回调系统和 LangSmith
LangSmith 追踪
import os
from langchain_anthropic import ChatAnthropic
# 启用 LangSmith 追踪
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-project"
# 所有 LangChain/LangGraph 操作都会自动被追踪
llm = ChatAnthropic(model="claude-sonnet-4-6")
自定义回调处理器
from langchain_core.callbacks import BaseCallbackHandler
from typing import Any, Dict, List
class CustomCallbackHandler(BaseCallbackHandler):
def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs
) -> None:
print(f"LLM started with {len(prompts)} prompts")
def on_llm_end(self, response, **kwargs) -> None:
print(f"LLM completed: {len(response.generations)} generations")
def on_llm_error(self, error: Exception, **kwargs) -> None:
print(f"LLM error: {error}")
def on_tool_start(
self, serialized: Dict[str, Any], input_str: str, **kwargs
) -> None:
print(f"Tool started: {serialized.get('name')}")
def on_tool_end(self, output: str, **kwargs) -> None:
print(f"Tool completed: {output[:100]}...")
# 使用回调
result = await agent.ainvoke(
{"messages": [("user", "query")]},
config={"callbacks": [CustomCallbackHandler()]}
)
流式响应
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4-6", streaming=True)
# 流式传输令牌
async for chunk in llm.astream("给我讲个故事"):
print(chunk.content, end="", flush=True)
# 流式传输代理事件
async for event in agent.astream_events(
{"messages": [("user", "搜索并总结")]},
version="v2"
):
if event["event"] == "on_chat_model_stream":
print(event["data"]["chunk"].content, end="")
elif event["event"] == "on_tool_start":
print(f"\n[正在使用工具:{event['name']}]")
测试策略
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_agent_tool_selection():
"""测试代理选择正确的工具。"""
with patch.object(llm, 'ainvoke') as mock_llm:
mock_llm.return_value = AsyncMock(content="Using search_database")
result = await agent.ainvoke({
"messages": [("user", "search for documents")]
})
# 验证工具是否被调用
assert "search_database" in str(result)
@pytest.mark.asyncio
async def test_memory_persistence():
"""测试内存跨调用持久化。"""
config = {"configurable": {"thread_id": "test-thread"}}
# 第一条消息
await agent.ainvoke(
{"messages": [("user", "记住:密码是 12345")]},
config
)
# 第二条消息应该记住
result = await agent.ainvoke(
{"messages": [("user", "密码是什么?")]},
config
)
assert "12345" in result["messages"][-1].content
性能优化
1. 使用 Redis 缓存
from langchain_community.cache import RedisCache
from langchain_core.globals import set_llm_cache
import redis
redis_client = redis.Redis.from_url("redis://localhost:6379")
set_llm_cache(RedisCache(redis_client))
2. 异步批处理
import asyncio
from langchain_core.documents import Document
async def process_documents(documents: list[Document]) -> list:
"""并行处理文档。"""
tasks = [process_single(doc) for doc in documents]
return await asyncio.gather(*tasks)
async def process_single(doc: Document) -> dict:
"""处理单个文档。"""
chunks = text_splitter.split_documents([doc])
embeddings = await embeddings_model.aembed_documents(
[c.page_content for c in chunks]
)
return {"doc_id": doc.metadata.get("id"), "embeddings": embeddings}
3. 连接池
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone
# 重用 Pinecone 客户端
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
index = pc.Index("my-index")
# 使用现有索引创建向量存储
vectorstore = PineconeVectorStore(index=index, embedding=embeddings)
📄 原始文档
完整文档(英文):
https://skills.sh/wshobson/agents/langchain-architecture
💡 提示:点击上方链接查看 skills.sh 原始英文文档,方便对照翻译。
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

评论(0)