进阶模式分析
3. 多Agent协作模式(Router + Worker + Critic)
通过多个专业化Agent的协作来提高复杂任务的处理能力。
class RouterAgent:
def __init__(self):
self.workers = {
'code': CodeAgent(),
'analysis': AnalysisAgent(),
'search': SearchAgent(),
'writing': WritingAgent()
}
self.critic = CriticAgent()
async def process(self, query: str) -> str:
# 步骤1: 路由决策
route = await self._route_query(query)
if not route:
return "无法确定合适的处理路径"
# 步骤2: 分配给专业Agent
worker = self.workers[route.domain]
result = await worker.process(route.augmented_query)
# 步骤3: 质量评估与修正
evaluation = await self.critic.evaluate(query, result)
if evaluation.needs_improvement:
# 递归改进或选择其他Agent
return await self._handle_improvement(query, result, evaluation)
return result
async def _route_query(self, query: str) -> Route:
prompt = f"""
分析查询并选择最佳处理路径:
查询: {query}
可用Agent:
- code: 代码生成、调试、审查
- analysis: 数据分析、趋势分析
- search: 信息检索、知识查询
- writing: 文档写作、内容创作
输出JSON格式的路由决策
"""
response = await self.llm.chat(prompt)
return Route.parse(response)
class CodeAgent:
async def process(self, query: str) -> str:
# 专业化的代码相关处理
pass
class CriticAgent:
async def evaluate(self, original_query: str, result: str) -> Evaluation:
# 质量评估逻辑
pass适用场景:
- 复杂的多领域任务
- 需要专业知识的场景
- 对质量要求极高的场景
优势:
- 专业化提升处理质量
- 支持复杂任务分解
- 便于扩展新的专业能力
局限性:
- 系统复杂度增加
- 协调成本较高
- 需要更复杂的治理机制
4. RAG增强Agent模式
通过检索增强生成来提高知识的准确性和时效性。
class RAGAgent:
def __init__(self, vector_store: VectorStore, knowledge_base: KnowledgeBase):
self.llm = ChatGPT()
self.vector_store = vector_store
self.knowledge_base = knowledge_base
self.retrieval_cache = {}
async def process(self, query: str) -> str:
# 步骤1: 查询理解和重写
enhanced_query = await self._enhance_query(query)
# 步骤2: 多路检索
retrieval_results = await self._multi_retrieval(enhanced_query)
# 步骤3: 检索结果融合与重排序
fused_context = await self._fuse_retrieval_results(retrieval_results)
# 步骤4: 基于上下文的生成
prompt = self._build_rag_prompt(query, fused_context)
response = await self.llm.chat(prompt)
# 步骤5: 结果验证和引用生成
return await self._verify_and_cite(response, retrieval_results)
async def _multi_retrieval(self, query: str) -> List[RetrievalResult]:
# 向量检索
vector_results = await self.vector_store.similarity_search(
query, top_k=10
)
# 关键词检索
keyword_results = await self.knowledge_base.keyword_search(
query, top_k=5
)
# 结构化检索
structured_results = await self.knowledge_base.structured_search(
query, tables=['documents', 'faq']
)
return vector_results + keyword_results + structured_results基础模式分析
在LLM Agent的架构设计中,基础模式是理解更复杂系统的前提。本节深入分析两种最核心的架构模式:单Agent + 工具调用模式和规划—执行—评估模式,通过详细的技术分析、代码示例和架构对比,帮助读者建立扎实的模式理解基础。
单Agent + 工具调用模式
模式概述
单Agent + 工具调用是最简单也是最实用的Agent架构模式。在这种模式下,一个LLM实例负责接收用户请求、理解任务意图、选择合适的工具执行,并将结果整合返回给用户。
核心架构组件
graph TB
A[用户输入] --> B[LLM Agent]
B --> C{工具选择}
C -->|查询API| D[工具执行器]
C -->|计算任务| E[本地计算]
C -->|信息检索| F[搜索工具]
D --> G[结果整合]
E --> G
F --> G
G --> H[响应输出]详细实现分析
1. 工具注册与管理
from typing import Dict, List, Callable, Any
import json
import asyncio
from dataclasses import dataclass
from enum import Enum
class ToolType(Enum):
API_CALL = "api_call"
LOCAL_FUNCTION = "local_function"
SEARCH = "search"
CALCULATION = "calculation"
@dataclass
class ToolDefinition:
name: str
description: str
parameters_schema: Dict[str, Any]
function: Callable
tool_type: ToolType
timeout: int = 30
retry_count: int = 3
class ToolRegistry:
def __init__(self):
self.tools: Dict[str, ToolDefinition] = {}
def register_tool(self, tool: ToolDefinition):
"""注册工具到工具库"""
self.tools[tool.name] = tool
def get_tool(self, name: str) -> ToolDefinition:
"""获取指定工具"""
return self.tools.get(name)
def list_tools(self) -> List[ToolDefinition]:
"""列出所有可用工具"""
return list(self.tools.values())
def get_schema_for_llm(self) -> Dict[str, Any]:
"""为LLM生成工具调用Schema"""
schema = {
"type": "function",
"functions": []
}
for tool in self.tools.values():
function_schema = {
"name": tool.name,
"description": tool.description,
"parameters": {
"type": "object",
"properties": tool.parameters_schema.get("properties", {}),
"required": tool.parameters_schema.get("required", [])
}
}
schema["functions"].append(function_schema)
return schema2. 工具执行器实现
import aiohttp
import json
from typing import Optional
import logging
class ToolExecutor:
def __init__(self, tool_registry: ToolRegistry):
self.registry = tool_registry
self.logger = logging.getLogger(__name__)
async def execute_tool(self, tool_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""执行指定工具"""
tool = self.registry.get_tool(tool_name)
if not tool:
raise ValueError(f"Tool {tool_name} not found")
try:
if tool.tool_type == ToolType.API_CALL:
return await self._execute_api_tool(tool, parameters)
elif tool.tool_type == ToolType.LOCAL_FUNCTION:
return await self._execute_local_tool(tool, parameters)
elif tool.tool_type == ToolType.SEARCH:
return await self._execute_search_tool(tool, parameters)
elif tool.tool_type == ToolType.CALCULATION:
return await self._execute_calc_tool(tool, parameters)
else:
raise ValueError(f"Unknown tool type: {tool.tool_type}")
except Exception as e:
self.logger.error(f"Tool execution failed: {tool_name}, Error: {str(e)}")
return {
"error": str(e),
"status": "failed",
"tool": tool_name
}
async def _execute_api_tool(self, tool: ToolDefinition, params: Dict[str, Any]) -> Dict[str, Any]:
"""执行API调用工具"""
timeout = aiohttp.ClientTimeout(total=tool.timeout)
async with aiohttp.ClientSession(timeout=timeout) as session:
# 假设API工具需要url和method参数
url = params.get("url")
method = params.get("method", "GET")
headers = params.get("headers", {})
data = params.get("data")
async with session.request(method, url, headers=headers, json=data) as response:
response_data = await response.json()
return {
"status": "success",
"data": response_data,
"tool": tool.name,
"metadata": {
"status_code": response.status,
"response_size": len(str(response_data))
}
}
async def _execute_local_tool(self, tool: ToolDefinition, params: Dict[str, Any]) -> Dict[str, Any]:
"""执行本地函数工具"""
result = tool.function(**params)
# 如果是异步函数
if asyncio.iscoroutinefunction(tool.function):
result = await result
return {
"status": "success",
"data": result,
"tool": tool.name,
"metadata": {
"execution_time": "N/A", # 可添加计时逻辑
"function": tool.function.__name__
}
}
async def _execute_search_tool(self, tool: ToolDefinition, params: Dict[str, Any]) -> Dict[str, Any]:
"""执行搜索工具"""
# 这里可以实现各种搜索逻辑
query = params.get("query")
max_results = params.get("max_results", 10)
# 模拟搜索结果
search_results = {
"query": query,
"results": [
{"title": f"Result {i}", "url": f"https://example.com/{i}", "snippet": f"Description for result {i}"}
for i in range(max_results)
],
"total": max_results
}
return {
"status": "success",
"data": search_results,
"tool": tool.name
}3. 单Agent实现
import openai
from typing import List, Dict, Any
import json
class SingleAgent:
def __init__(self, tool_registry: ToolRegistry, tool_executor: ToolExecutor, model_name: str = "gpt-4"):
self.tool_registry = tool_registry
self.tool_executor = tool_executor
self.model_name = model_name
self.conversation_history: List[Dict[str, str]] = []
def _build_system_prompt(self) -> str:
"""构建系统提示"""
tools_schema = self.tool_registry.get_schema_for_llm()
system_prompt = f"""你是一个智能助手,可以使用各种工具来帮助用户完成任务。
可用的工具:
{json.dumps(tools_schema, ensure_ascii=False, indent=2)}
使用指南:
1. 分析用户请求,理解需要执行的任务
2. 如果需要使用工具,调用相应的工具函数
3. 等待工具执行结果,然后根据结果回答用户
4. 如果任务需要多个步骤,可以按顺序执行多个工具调用
5. 始终以清晰、简洁的方式回答用户
工具调用的格式要求:
- 如果需要调用工具,使用function_call格式
- 参数必须符合工具定义的要求
- 不要自己编造参数值
"""
return system_prompt
async def process_message(self, user_message: str) -> str:
"""处理用户消息"""
# 添加用户消息到对话历史
self.conversation_history.append({"role": "user", "content": user_message})
# 构建消息列表
messages = [{"role": "system", "content": self._build_system_prompt()}]
messages.extend(self.conversation_history)
# 调用LLM
response = await self._call_llm(messages)
return str(response)
async def _call_llm(self, messages: List[Dict[str, str]]) -> str:
return "LLM response"