LLM Agent架构设计模式与核心组件分析 - Part 7 进阶模式分析

📑 目录

进阶模式分析

3. 多Agent协作模式(Router + Worker + Critic)

通过多个专业化Agent的协作来提高复杂任务的处理能力。

class RouterAgent:
    def __init__(self):
        self.workers = {
            'code': CodeAgent(),
            'analysis': AnalysisAgent(),
            'search': SearchAgent(),
            'writing': WritingAgent()
        }
        self.critic = CriticAgent()
    
    async def process(self, query: str) -> str:
        # 步骤1: 路由决策
        route = await self._route_query(query)
        if not route:
            return "无法确定合适的处理路径"
        
        # 步骤2: 分配给专业Agent
        worker = self.workers[route.domain]
        result = await worker.process(route.augmented_query)
        
        # 步骤3: 质量评估与修正
        evaluation = await self.critic.evaluate(query, result)
        
        if evaluation.needs_improvement:
            # 递归改进或选择其他Agent
            return await self._handle_improvement(query, result, evaluation)
        
        return result
    
    async def _route_query(self, query: str) -> Route:
        prompt = f"""
        分析查询并选择最佳处理路径:
        查询: {query}
        
        可用Agent:
        - code: 代码生成、调试、审查
        - analysis: 数据分析、趋势分析
        - search: 信息检索、知识查询  
        - writing: 文档写作、内容创作
        
        输出JSON格式的路由决策
        """
        response = await self.llm.chat(prompt)
        return Route.parse(response)

class CodeAgent:
    async def process(self, query: str) -> str:
        # 专业化的代码相关处理
        pass

class CriticAgent:
    async def evaluate(self, original_query: str, result: str) -> Evaluation:
        # 质量评估逻辑
        pass

适用场景

  • 复杂的多领域任务
  • 需要专业知识的场景
  • 对质量要求极高的场景

优势

  • 专业化提升处理质量
  • 支持复杂任务分解
  • 便于扩展新的专业能力

局限性

  • 系统复杂度增加
  • 协调成本较高
  • 需要更复杂的治理机制

4. RAG增强Agent模式

通过检索增强生成来提高知识的准确性和时效性。

class RAGAgent:
    def __init__(self, vector_store: VectorStore, knowledge_base: KnowledgeBase):
        self.llm = ChatGPT()
        self.vector_store = vector_store
        self.knowledge_base = knowledge_base
        self.retrieval_cache = {}
    
    async def process(self, query: str) -> str:
        # 步骤1: 查询理解和重写
        enhanced_query = await self._enhance_query(query)
        
        # 步骤2: 多路检索
        retrieval_results = await self._multi_retrieval(enhanced_query)
        
        # 步骤3: 检索结果融合与重排序
        fused_context = await self._fuse_retrieval_results(retrieval_results)
        
        # 步骤4: 基于上下文的生成
        prompt = self._build_rag_prompt(query, fused_context)
        response = await self.llm.chat(prompt)
        
        # 步骤5: 结果验证和引用生成
        return await self._verify_and_cite(response, retrieval_results)
    
    async def _multi_retrieval(self, query: str) -> List[RetrievalResult]:
        # 向量检索
        vector_results = await self.vector_store.similarity_search(
            query, top_k=10
        )
        
        # 关键词检索
        keyword_results = await self.knowledge_base.keyword_search(
            query, top_k=5
        )
        
        # 结构化检索
        structured_results = await self.knowledge_base.structured_search(
            query, tables=['documents', 'faq']
        )
        
        return vector_results + keyword_results + structured_results

基础模式分析

在LLM Agent的架构设计中,基础模式是理解更复杂系统的前提。本节深入分析两种最核心的架构模式:单Agent + 工具调用模式规划—执行—评估模式,通过详细的技术分析、代码示例和架构对比,帮助读者建立扎实的模式理解基础。

单Agent + 工具调用模式

模式概述

单Agent + 工具调用是最简单也是最实用的Agent架构模式。在这种模式下,一个LLM实例负责接收用户请求、理解任务意图、选择合适的工具执行,并将结果整合返回给用户。

核心架构组件

graph TB
    A[用户输入] --> B[LLM Agent]
    B --> C{工具选择}
    C -->|查询API| D[工具执行器]
    C -->|计算任务| E[本地计算]
    C -->|信息检索| F[搜索工具]
    D --> G[结果整合]
    E --> G
    F --> G
    G --> H[响应输出]

详细实现分析

1. 工具注册与管理

from typing import Dict, List, Callable, Any
import json
import asyncio
from dataclasses import dataclass
from enum import Enum

class ToolType(Enum):
    API_CALL = "api_call"
    LOCAL_FUNCTION = "local_function"
    SEARCH = "search"
    CALCULATION = "calculation"

@dataclass
class ToolDefinition:
    name: str
    description: str
    parameters_schema: Dict[str, Any]
    function: Callable
    tool_type: ToolType
    timeout: int = 30
    retry_count: int = 3

class ToolRegistry:
    def __init__(self):
        self.tools: Dict[str, ToolDefinition] = {}
    
    def register_tool(self, tool: ToolDefinition):
        """注册工具到工具库"""
        self.tools[tool.name] = tool
    
    def get_tool(self, name: str) -> ToolDefinition:
        """获取指定工具"""
        return self.tools.get(name)
    
    def list_tools(self) -> List[ToolDefinition]:
        """列出所有可用工具"""
        return list(self.tools.values())
    
    def get_schema_for_llm(self) -> Dict[str, Any]:
        """为LLM生成工具调用Schema"""
        schema = {
            "type": "function",
            "functions": []
        }
        
        for tool in self.tools.values():
            function_schema = {
                "name": tool.name,
                "description": tool.description,
                "parameters": {
                    "type": "object",
                    "properties": tool.parameters_schema.get("properties", {}),
                    "required": tool.parameters_schema.get("required", [])
                }
            }
            schema["functions"].append(function_schema)
        
        return schema

2. 工具执行器实现

import aiohttp
import json
from typing import Optional
import logging

class ToolExecutor:
    def __init__(self, tool_registry: ToolRegistry):
        self.registry = tool_registry
        self.logger = logging.getLogger(__name__)
    
    async def execute_tool(self, tool_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """执行指定工具"""
        tool = self.registry.get_tool(tool_name)
        if not tool:
            raise ValueError(f"Tool {tool_name} not found")
        
        try:
            if tool.tool_type == ToolType.API_CALL:
                return await self._execute_api_tool(tool, parameters)
            elif tool.tool_type == ToolType.LOCAL_FUNCTION:
                return await self._execute_local_tool(tool, parameters)
            elif tool.tool_type == ToolType.SEARCH:
                return await self._execute_search_tool(tool, parameters)
            elif tool.tool_type == ToolType.CALCULATION:
                return await self._execute_calc_tool(tool, parameters)
            else:
                raise ValueError(f"Unknown tool type: {tool.tool_type}")
        
        except Exception as e:
            self.logger.error(f"Tool execution failed: {tool_name}, Error: {str(e)}")
            return {
                "error": str(e),
                "status": "failed",
                "tool": tool_name
            }
    
    async def _execute_api_tool(self, tool: ToolDefinition, params: Dict[str, Any]) -> Dict[str, Any]:
        """执行API调用工具"""
        timeout = aiohttp.ClientTimeout(total=tool.timeout)
        
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # 假设API工具需要url和method参数
            url = params.get("url")
            method = params.get("method", "GET")
            headers = params.get("headers", {})
            data = params.get("data")
            
            async with session.request(method, url, headers=headers, json=data) as response:
                response_data = await response.json()
                return {
                    "status": "success",
                    "data": response_data,
                    "tool": tool.name,
                    "metadata": {
                        "status_code": response.status,
                        "response_size": len(str(response_data))
                    }
                }
    
    async def _execute_local_tool(self, tool: ToolDefinition, params: Dict[str, Any]) -> Dict[str, Any]:
        """执行本地函数工具"""
        result = tool.function(**params)
        
        # 如果是异步函数
        if asyncio.iscoroutinefunction(tool.function):
            result = await result
        
        return {
            "status": "success",
            "data": result,
            "tool": tool.name,
            "metadata": {
                "execution_time": "N/A",  # 可添加计时逻辑
                "function": tool.function.__name__
            }
        }
    
    async def _execute_search_tool(self, tool: ToolDefinition, params: Dict[str, Any]) -> Dict[str, Any]:
        """执行搜索工具"""
        # 这里可以实现各种搜索逻辑
        query = params.get("query")
        max_results = params.get("max_results", 10)
        
        # 模拟搜索结果
        search_results = {
            "query": query,
            "results": [
                {"title": f"Result {i}", "url": f"https://example.com/{i}", "snippet": f"Description for result {i}"}
                for i in range(max_results)
            ],
            "total": max_results
        }
        
        return {
            "status": "success",
            "data": search_results,
            "tool": tool.name
        }

3. 单Agent实现

import openai
from typing import List, Dict, Any
import json

class SingleAgent:
    def __init__(self, tool_registry: ToolRegistry, tool_executor: ToolExecutor, model_name: str = "gpt-4"):
        self.tool_registry = tool_registry
        self.tool_executor = tool_executor
        self.model_name = model_name
        self.conversation_history: List[Dict[str, str]] = []
    
    def _build_system_prompt(self) -> str:
        """构建系统提示"""
        tools_schema = self.tool_registry.get_schema_for_llm()
        
        system_prompt = f"""你是一个智能助手,可以使用各种工具来帮助用户完成任务。

可用的工具:
{json.dumps(tools_schema, ensure_ascii=False, indent=2)}

使用指南:
1. 分析用户请求,理解需要执行的任务
2. 如果需要使用工具,调用相应的工具函数
3. 等待工具执行结果,然后根据结果回答用户
4. 如果任务需要多个步骤,可以按顺序执行多个工具调用
5. 始终以清晰、简洁的方式回答用户

工具调用的格式要求:
- 如果需要调用工具,使用function_call格式
- 参数必须符合工具定义的要求
- 不要自己编造参数值
"""
        return system_prompt
    
    async def process_message(self, user_message: str) -> str:
        """处理用户消息"""
        # 添加用户消息到对话历史
        self.conversation_history.append({"role": "user", "content": user_message})
        
        # 构建消息列表
        messages = [{"role": "system", "content": self._build_system_prompt()}]
        messages.extend(self.conversation_history)
        
        # 调用LLM
        response = await self._call_llm(messages)
        return str(response)
    
    async def _call_llm(self, messages: List[Dict[str, str]]) -> str:
        return "LLM response"