LLM Agent Architecture Design Patterns and Core Component Analysis - Part 5: Orchestration and Collaboration Patterns


2. Orchestration and Collaboration Patterns: Centralized Orchestration vs. Distributed Collaboration

2.1 Centralized Orchestration Pattern

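The centralized orchestration pattern uses a single control center to coordinate the behavior of all Agents, much like the API gateway in a traditional microservice architecture.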

Architecture:

             ┌──────────────┐
             │ Orchestrator │
             └─────┬────────┘
                   │
     ┌─────────────┼─────────────┐
     │             │             │
┌────▼────┐   ┌────▼────┐   ┌────▼────┐
│ Agent A │   │ Agent B │   │ Agent C │
└─────────┘   └─────────┘   └─────────┘

Core implementation:

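class CentralizedOrchestrator:
    """Single control point: decompose the task, route each subtask, then merge the results."""

    def __init__(self, router, agents, coordinator):
        self.router = router            # splits tasks and picks an agent for each subtask
        self.agents = agents            # pool of worker agents
        self.coordinator = coordinator  # merges partial results into the final answer

    async def process_complex_task(self, task):
        subtasks = self.router.decompose(task)
        results = []
        for subtask in subtasks:
            # Subtasks run one after another; every call goes through the orchestrator
            agent = self.router.select_agent(subtask)
            results.append(await agent.execute(subtask))
        return await self.coordinator.synthesize(results)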
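To trace the control flow end to end, here is a minimal, self-contained sketch. `KeywordRouter`, `EchoAgent`, and `JoinCoordinator` are hypothetical stand-ins that do not appear in the original. Each one implements only the method `CentralizedOrchestrator` calls on it: `decompose`, `select_agent`, `execute`, or `synthesize`. The orchestrator is repeated unchanged so the block runs on its own.

```python
import asyncio
from typing import Dict, List


class EchoAgent:
    """Hypothetical agent: tags each subtask with its own name."""
    def __init__(self, name: str):
        self.name = name

    async def execute(self, subtask: str) -> str:
        await asyncio.sleep(0)  # stand-in for an LLM or tool call
        return f"{self.name}:{subtask}"


class KeywordRouter:
    """Hypothetical router: splits on ';' and routes by the first word."""
    def __init__(self, agents: Dict[str, EchoAgent], default: str):
        self.agents = agents
        self.default = default

    def decompose(self, task: str) -> List[str]:
        return [part.strip() for part in task.split(";") if part.strip()]

    def select_agent(self, subtask: str) -> EchoAgent:
        keyword = subtask.split()[0]
        return self.agents.get(keyword, self.agents[self.default])


class JoinCoordinator:
    """Hypothetical coordinator: joins partial results in order."""
    async def synthesize(self, results: List[str]) -> str:
        return " | ".join(results)


class CentralizedOrchestrator:
    def __init__(self, router, agents, coordinator):
        self.router = router
        self.agents = agents
        self.coordinator = coordinator

    async def process_complex_task(self, task):
        subtasks = self.router.decompose(task)
        results = []
        for subtask in subtasks:
            agent = self.router.select_agent(subtask)
            results.append(await agent.execute(subtask))
        return await self.coordinator.synthesize(results)


agents = {"search": EchoAgent("A"), "code": EchoAgent("B"), "write": EchoAgent("C")}
orchestrator = CentralizedOrchestrator(
    KeywordRouter(agents, default="write"), agents, JoinCoordinator()
)
print(asyncio.run(orchestrator.process_complex_task(
    "search papers; code a demo; summarize findings"
)))
# → A:search papers | B:code a demo | C:summarize findings
```

Because every subtask goes through the orchestrator, routing and result merging stay easy to inspect. The cost is that the orchestrator is a single point of failure and a potential throughput bottleneck.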

In-Depth Analysis of Core Components

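The architecture of an LLM Agent can be abstracted into six core layers. Each layer has its own responsibilities, and the layers cooperate to complete complex tasks. The diagram below shows the full component architecture: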

graph TB
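    subgraph INTERACTION [Interaction Layer]
        UI[Chat UI]
        CTX[Session Context Management]
        MM[Multimodal Processor]
    end
    
    subgraph ORCHESTRATION [Orchestration and Control Layer]
        PLAN[Task Planner]
        EXEC[Execution Controller]
        POLICY[Policy Engine]
    end
    
    subgraph KNOWLEDGE [Knowledge and Memory Layer]
        STM[Short-Term Memory]
        LTM[Long-Term Memory]
        KG[Knowledge Graph]
    end
    
    subgraph TOOLS [Tool and Adapter Layer]
        REG[Tool Registry]
        CALL[Invocation Adapter]
        NORM[Result Normalization]
    end
    
    subgraph SECURITY [Security and Governance Layer]
        AUTH[Access Control]
        SANDBOX[Sandbox Isolation]
        AUDIT[Audit Log]
    end
    
    subgraph MONITORING [Evaluation and Monitoring Layer]
        EVAL[Evaluation Engine]
        OBS[Observability]
        ALERT[Alerting System]
    end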
    
    UI --> CTX --> PLAN
    MM --> CTX
    PLAN --> EXEC --> CALL
    POLICY --> EXEC
    STM <--> EXEC
    LTM <--> PLAN
    REG --> CALL
    CALL --> NORM
    AUTH --> CALL
    SANDBOX --> EXEC
    AUDIT --> OBS
    EVAL --> ALERT

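1. Interaction Layer

The interaction layer is the interface boundary between the Agent and its users or external systems. It handles input parsing, context management, and output formatting.

1.1 Session Context and Turn Management

The core challenge of session management is keeping a useful conversation history inside a limited context window: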

from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum

class MessageRole(Enum):
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"

@dataclass
class Message:
    role: MessageRole
    content: str
    metadata: dict = field(default_factory=dict)
    token_count: int = 0

@dataclass
class ConversationContext:
    messages: List[Message] = field(default_factory=list)
    max_tokens: int = 8000
    current_tokens: int = 0
    
    def add_message(self, message: Message) -> None:
        self.messages.append(message)
        self.current_tokens += message.token_count
        self._truncate_if_needed()
    
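    def _truncate_if_needed(self) -> None:
        """Sliding-window strategy: keep the system prompt and the most recent messages."""
        # Only protect messages[0] if it really is a system prompt
        start = 1 if self.messages and self.messages[0].role == MessageRole.SYSTEM else 0
        while self.current_tokens > self.max_tokens and len(self.messages) > start + 1:
            # Drop the oldest non-system message, always keeping at least the newest one
            removed = self.messages.pop(start)
            self.current_tokens -= removed.token_count

    def get_context_for_llm(self) -> List[dict]:
        """Build the message list in the format an LLM chat API expects."""
        return [{"role": m.role.value, "content": m.content} for m in self.messages]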

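Comparison of context management strategies:

| Strategy | Pros | Cons | Best suited for |
|---|---|---|---|
| Sliding window | Simple to implement, low latency | May drop critical history | Short conversations, real-time interaction |
| Summary compression | Preserves meaning, saves tokens | Summaries may distort details | Long conversations, complex tasks |
| Tiered storage | Flexible, supports on-demand retrieval | High implementation complexity | Enterprise applications |
| Semantic clustering | High relevance | High computational overhead | Knowledge-intensive tasks |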
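The summary-compression row can be sketched as follows. This is a minimal sketch under stated assumptions, not a production strategy:

- `summarize` is a hypothetical callable standing in for an LLM summarization call.
- Token counts are approximated by whitespace-separated words.
- Messages are plain role/content dicts.

Once the budget is exceeded, the function folds the oldest messages into a single summary message. It keeps a leading system prompt and the last `keep_recent` messages verbatim.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]  # {"role": ..., "content": ...}


def count_tokens(text: str) -> int:
    # Crude approximation: one token per whitespace-separated word
    return len(text.split())


def compress_history(
    messages: List[Message],
    max_tokens: int,
    summarize: Callable[[List[Message]], str],  # hypothetical LLM summarizer
    keep_recent: int = 2,
) -> List[Message]:
    """Fold older messages into one summary once the token budget is exceeded."""
    total = sum(count_tokens(m["content"]) for m in messages)
    if total <= max_tokens or not messages:
        return messages

    # Keep a leading system prompt untouched, if there is one
    head = messages[:1] if messages[0]["role"] == "system" else []
    body = messages[len(head):]
    split = max(0, len(body) - keep_recent)
    old, recent = body[:split], body[split:]
    if not old:
        return messages  # nothing old enough to compress

    summary = {"role": "system", "content": "Summary of earlier turns: " + summarize(old)}
    return head + [summary] + recent


def naive_summarize(msgs: List[Message]) -> str:
    # Placeholder: a real system would call an LLM here
    return "; ".join(m["content"][:20] for m in msgs)


history = [
    {"role": "system", "content": "You are a helpful agent."},
    {"role": "user", "content": "Find flights from Berlin to Rome next week"},
    {"role": "assistant", "content": "I found three options on Tuesday and Thursday"},
    {"role": "user", "content": "Book the cheapest one"},
    {"role": "assistant", "content": "Booked the Tuesday flight"},
]
compressed = compress_history(history, max_tokens=20, summarize=naive_summarize)
print(len(compressed))  # → 4
```

Storing the summary as a `system` message is a design choice here. It keeps the summary separate from real user turns, but some chat APIs may restrict where system messages can appear.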

1.2 Multimodal Input/Output Processing

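from abc import ABC, abstractmethod
import base64
from typing import Any, Dict, List, Optional

class ModalityProcessor(ABC):
    @abstractmethod
    def encode(self, input_data: Any) -> dict:
        """Encode raw input into a format the Agent can process."""

    @abstractmethod
    def decode(self, output_data: dict) -> Any:
        """Decode Agent output into the target format."""

class TextProcessor(ModalityProcessor):
    def encode(self, text: str) -> dict:
        # Plain text passes through unchanged
        return {"type": "text", "content": text}

    def decode(self, output_data: dict) -> str:
        return output_data.get("content", "")

class ImageProcessor(ModalityProcessor):
    def encode(self, image_path: str) -> dict:
        # Encode the image as base64 (a URL reference would also work)
        with open(image_path, "rb") as f:
            encoded = base64.b64encode(f.read()).decode()
        return {
            "type": "image",
            "content": encoded,
            "format": "base64"
        }

    def decode(self, output_data: dict) -> str:
        # Extract the image description or generated result
        return output_data.get("description", "")

class MultiModalRouter:
    """Multimodal input router: dispatches each input to the processor for its modality."""

    def __init__(self, processors: Optional[Dict[str, ModalityProcessor]] = None):
        # AudioProcessor, TableProcessor, etc. are not defined here; once implemented,
        # pass a full mapping via `processors`, e.g. {"audio": ..., "table": ...}
        self.processors = processors or {
            "text": TextProcessor(),
            "image": ImageProcessor(),
        }

    def process_input(self, inputs: List[dict]) -> List[dict]:
        processed = []
        for item in inputs:
            modality = item.get("type", "text")
            processor = self.processors.get(modality)
            if processor:
                processed.append(processor.encode(item["data"]))
            # Items whose modality has no registered processor are skipped
        return processed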

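2. Orchestration and Control Layer

The orchestration layer is the Agent's "brain". It handles task decomposition, execution scheduling, and policy decisions.

2.1 Task Planning and Decomposition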

flowchart LR
    INPUT[User request] --> DECOMPOSE[Task decomposition]
    DECOMPOSE --> PLAN[Generate execution plan]
    PLAN --> VALIDATE[Plan validation]
    VALIDATE -->|Pass| EXEC[Execution queue]
    VALIDATE -->|Fail| REPLAN[Replan]
    REPLAN --> PLAN
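The validate/replan loop in the flowchart can be sketched as below. The following are assumptions, not taken from the original:

- A plan is a list of dicts with `id` and `dependencies`.
- Validation checks only two things: unknown dependency ids and dependency cycles.
- `plan_fn` is a hypothetical callable that produces a plan. It receives the error message from the previous failed validation, or `None` on the first attempt.

The loop gives up after `max_attempts`.

```python
from typing import Callable, Dict, List, Optional

Plan = List[Dict]  # each step: {"id": str, "dependencies": [str, ...]}


def validate_plan(plan: Plan) -> Optional[str]:
    """Return an error message, or None if the plan is executable."""
    ids = {step["id"] for step in plan}
    for step in plan:
        for dep in step.get("dependencies", []):
            if dep not in ids:
                return f"{step['id']} depends on unknown task {dep}"

    # Depth-first search with three states to detect dependency cycles
    deps = {step["id"]: step.get("dependencies", []) for step in plan}
    state: Dict[str, int] = {}  # absent = unvisited, 1 = on current path, 2 = done

    def has_cycle(node: str) -> bool:
        state[node] = 1
        for dep in deps[node]:
            if state.get(dep) == 1:
                return True
            if dep not in state and has_cycle(dep):
                return True
        state[node] = 2
        return False

    for node in deps:
        if node not in state and has_cycle(node):
            return f"dependency cycle involving {node}"
    return None


def plan_with_validation(
    plan_fn: Callable[[Optional[str]], Plan], max_attempts: int = 3
) -> Plan:
    """Generate, validate, and replan with feedback, as in the flowchart."""
    feedback = None
    for _ in range(max_attempts):
        plan = plan_fn(feedback)
        feedback = validate_plan(plan)
        if feedback is None:
            return plan  # passed: hand over to the execution queue
    raise RuntimeError(f"planning failed after {max_attempts} attempts: {feedback}")


attempts = []

def fake_planner(feedback: Optional[str]) -> Plan:
    # Hypothetical planner: returns a cyclic plan first, then a fixed one
    attempts.append(feedback)
    if feedback is None:
        return [{"id": "a", "dependencies": ["b"]}, {"id": "b", "dependencies": ["a"]}]
    return [{"id": "a", "dependencies": []}, {"id": "b", "dependencies": ["a"]}]

plan = plan_with_validation(fake_planner)
print(attempts)  # → [None, 'dependency cycle involving a']
```

Passing the validation error back to the planner gives an LLM-based planner a concrete reason to revise its plan, rather than just re-sampling blindly.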

Hierarchical planner implementation:

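import json
from dataclasses import dataclass, field
from typing import Any, List, Optional

@dataclass
class Task:
    id: str
    description: str
    dependencies: List[str] = field(default_factory=list)
    tool_hint: Optional[str] = None
    status: str = "pending"
    result: Optional[Any] = None

class HierarchicalPlanner:
    def __init__(self, llm_client):
        # Assumed: llm_client.generate(prompt) is async and returns a string
        self.llm = llm_client

    async def decompose(self, goal: str, context: dict) -> List[Task]:
        """Decompose a high-level goal into executable subtasks."""
        prompt = f"""
        Goal: {goal}
        Context: {context}

        Decompose the goal into concrete subtasks and output them as JSON:
        [
            {{"id": "task_1", "description": "...", "dependencies": [], "tool_hint": "..."}},
            ...
        ]
        Requirements:
        1. Keep tasks moderately sized: each task should map to one tool call
        2. Explicitly mark the dependencies between tasks
        3. Specify the recommended tool for each task
        """
        response = await self.llm.generate(prompt)
        return self._parse_tasks(response)

    def _parse_tasks(self, response: str) -> List[Task]:
        """Parse the LLM's JSON array into Task objects (raises on malformed JSON)."""
        return [
            Task(
                id=item["id"],
                description=item["description"],
                dependencies=item.get("dependencies", []),
                tool_hint=item.get("tool_hint"),
            )
            for item in json.loads(response)
        ]

    def build_execution_graph(self, tasks: List[Task]) -> dict:
        """Build the task dependency graph so independent tasks can run in parallel."""
        graph = {t.id: {"task": t, "successors": []} for t in tasks}
        for task in tasks:
            for dep_id in task.dependencies:
                if dep_id in graph:
                    graph[dep_id]["successors"].append(task.id)
        return graph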
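`build_execution_graph` records each task's successors. To actually run independent tasks in parallel, the graph can be layered into "waves" with Kahn's topological sort. Every task in a wave depends only on tasks in earlier waves. This is a minimal sketch: it uses plain `id -> dependencies` dicts instead of `Task` objects so it runs on its own, and `execution_waves` is a hypothetical helper name.

```python
from typing import Dict, List


def execution_waves(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Group task ids into waves; tasks within one wave can run concurrently."""
    # In-degree = number of unfinished known dependencies
    indegree = {tid: 0 for tid in deps}
    successors: Dict[str, List[str]] = {tid: [] for tid in deps}
    for tid, ds in deps.items():
        for d in ds:
            if d in deps:  # like build_execution_graph, ignore unknown ids
                indegree[tid] += 1
                successors[d].append(tid)

    wave = sorted(tid for tid, n in indegree.items() if n == 0)
    waves: List[List[str]] = []
    scheduled = 0
    while wave:
        waves.append(wave)
        scheduled += len(wave)
        next_wave = []
        for tid in wave:
            for succ in successors[tid]:
                indegree[succ] -= 1
                if indegree[succ] == 0:  # all of succ's dependencies are done
                    next_wave.append(succ)
        wave = sorted(next_wave)

    if scheduled != len(deps):
        raise ValueError("dependency cycle detected")
    return waves


plan = {
    "fetch_data": [],
    "fetch_docs": [],
    "analyze": ["fetch_data"],
    "summarize": ["analyze", "fetch_docs"],
}
print(execution_waves(plan))
# → [['fetch_data', 'fetch_docs'], ['analyze'], ['summarize']]
```

Sorting each wave only makes the output deterministic. Any order within a wave is valid.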

2.2 Execution Controller

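import asyncio
from enum import Enum
from typing import List

class ToolExecutionError(Exception):
    """Error raised by a tool; `is_retryable` marks transient failures (assumed interface)."""

    def __init__(self, message: str, is_retryable: bool = False):
        super().__init__(message)
        self.is_retryable = is_retryable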

class ExecutionState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    RETRY = "retry"

class ExecutionController:
    def __init__(self, max_retries: int = 3, timeout: int = 30):
        self.max_retries = max_retries
        self.timeout = timeout
        self.execution_history = []
    
    async def execute_with_retry(self, task: Task, executor) -> Task:
        """Run a task with retries and exponential backoff; mark it failed once retries run out."""
        retries = 0
        last_error = None

        while retries <= self.max_retries:
            try:
                task.status = ExecutionState.RUNNING.value
                result = await asyncio.wait_for(
                    executor.run(task),
                    timeout=self.timeout
                )
                task.result = result
                task.status = ExecutionState.SUCCESS.value
                return task

            except asyncio.TimeoutError:
                last_error = "Execution timeout"
                retries += 1
            except ToolExecutionError as e:
                last_error = str(e)
                if not e.is_retryable:
                    break  # permanent error: retrying will not help
                retries += 1
                if retries <= self.max_retries:
                    await asyncio.sleep(2 ** retries)  # exponential backoff: 2s, 4s, 8s, ...

        task.status = ExecutionState.FAILED.value
        task.result = {"error": last_error}
        return task
    
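    async def execute_plan(self, tasks: List[Task], executor) -> List[Task]:
        """Execute the task plan in dependency order, running ready tasks in parallel."""
        completed = set()
        results = []

        while len(completed) < len(tasks):
            # Find every task whose dependencies have all completed
            ready = [
                t for t in tasks
                if t.id not in completed
                and all(d in completed for d in t.dependencies)
            ]
            if not ready:
                # Cyclic or unknown dependencies: no task can make progress, so stop
                for t in tasks:
                    if t.id not in completed:
                        t.status = ExecutionState.FAILED.value
                        t.result = {"error": "Unsatisfiable dependencies"}
                        results.append(t)
                break
            # Run all ready tasks concurrently
            batch_results = await asyncio.gather(
                *[self.execute_with_retry(t, executor) for t in ready]
            )
            for task in batch_results:
                completed.add(task.id)
                results.append(task)

        return results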
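The retry policy in `execute_with_retry` can be exercised in isolation. The sketch below is a hypothetical standalone helper; `retry_with_backoff`, `TransientError`, and `flaky` are illustrative names. Its behavior:

- It retries only errors marked transient and lets other exceptions propagate immediately.
- It waits `base_delay * 2**attempt` seconds before each retry.
- It re-raises the last error after `max_retries` retries.

Unlike the controller above, the first wait equals `base_delay` rather than 2 seconds. The demo keeps `base_delay` tiny so it runs instantly.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical error type for failures that are worth retrying."""


async def retry_with_backoff(
    op: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    delays: Optional[List[float]] = None,  # optional sink recording each wait
) -> T:
    """Run `op`, retrying TransientError with exponentially growing waits."""
    attempt = 0
    while True:
        try:
            return await op()
        except TransientError:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            delay = base_delay * 2 ** attempt  # base, 2x base, 4x base, ...
            if delays is not None:
                delays.append(delay)
            await asyncio.sleep(delay)
            attempt += 1


calls = 0

async def flaky() -> str:
    # Fails twice, then succeeds on the third call
    global calls
    calls += 1
    if calls < 3:
        raise TransientError(f"attempt {calls} failed")
    return "ok"


observed: List[float] = []
result = asyncio.run(retry_with_backoff(flaky, base_delay=0.01, delays=observed))
print(result, calls, observed)  # → ok 3 [0.01, 0.02]
```

Separating transient from permanent errors matters more than the exact delays. Retrying a permanent failure, such as a malformed tool argument, only burns time and tokens.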

2.3 Policy Engine

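import json
from typing import Callable, Optional

class PolicyEngine:
    """Policy engine: combines hard-coded rules, learned policies, and human feedback."""

    def __init__(self):
        self.rules = []             # hard-coded rules
        self.learned_policies = {}  # policies learned from data
        self.human_overrides = {}   # overrides derived from human feedback

    def add_rule(self, condition: Callable[[dict], bool],
                 action: Callable[[dict], dict], priority: int = 0):
        self.rules.append({"condition": condition, "action": action, "priority": priority})
        self.rules.sort(key=lambda x: x["priority"], reverse=True)  # highest priority first

    def evaluate(self, state: dict) -> Optional[dict]:
        """Evaluate the current state and return the recommended action."""
        # 1. Human overrides take precedence
        state_key = self._hash_state(state)
        if state_key in self.human_overrides:
            return self.human_overrides[state_key]

        # 2. Then the first matching rule, in priority order
        for rule in self.rules:
            if rule["condition"](state):
                return rule["action"](state)

        # 3. Fall back to the learned policy, if any
        return self.learned_policies.get(state_key)

    def record_feedback(self, state: dict, action: dict, reward: float):
        """Record human feedback for policy learning; high-reward actions become overrides."""
        state_key = self._hash_state(state)
        if reward > 0.8:  # high-confidence positive feedback
            self.human_overrides[state_key] = action

    @staticmethod
    def _hash_state(state: dict) -> str:
        # Canonical JSON, so equal states share one key regardless of key order
        return json.dumps(state, sort_keys=True, default=str)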

3. Knowledge and Memory Layer

The memory system is what lets an Agent learn continuously and personalize its behavior.

3.1 Memory Architecture

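graph TB
    subgraph SHORT [Short-Term Memory]
        WM[Working memory<br/>current task context]
        CACHE[Retrieval cache<br/>recent RAG results]
    end
    subgraph LONG [Long-Term Memory]
        VDB[(Vector database<br/>semantic retrieval)]
        KG[(Knowledge graph<br/>structured relations)]
        DOC[(Document store<br/>raw knowledge)]
    end
    subgraph CTRL [Memory Controller]
        WRITE[Write policy]
        READ[Retrieval policy]
        UPDATE[Update/eviction]
    end
    WM <--> READ
    CACHE <--> READ
    READ --> VDB
    READ --> KG
    WRITE --> VDB
    UPDATE --> VDB
    UPDATE --> KG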

3.2 Memory Manager Implementation

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional

import numpy as np

@dataclass
class MemoryEntry:
    id: str
    content: str
    embedding: np.ndarray
    metadata: dict
    created_at: datetime
    accessed_at: datetime
    access_count: int = 0
    importance: float = 0.5

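class MemoryManager:
    # Assumed interfaces (not shown): vector_store exposes async upsert/search/count/
    # evict_least_important, and _embed, _calculate_importance and _generate_id are
    # implemented elsewhere (e.g. _embed wraps an embedding model).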
    def __init__(self, vector_store, max_entries: int = 10000):
        self.vector_store = vector_store
        self.max_entries = max_entries
        self.short_term = {}  # session_id -> List[MemoryEntry]
        
    async def store(self, content: str, metadata: dict, 
                    memory_type: str = "long_term") -> str:
        """Store a memory, computing its importance automatically."""
        embedding = await self._embed(content)
        importance = self._calculate_importance(content, metadata)
        
        entry = MemoryEntry(
            id=self._generate_id(),
            content=content,
            embedding=embedding,
            metadata=metadata,
            created_at=datetime.now(),
            accessed_at=datetime.now(),
            importance=importance
        )
        
        if memory_type == "long_term":
            await self.vector_store.upsert(entry)
            await self._check_and_evict()
        else:
            session_id = metadata.get("session_id")
            self.short_term.setdefault(session_id, []).append(entry)
        
        return entry.id
    
    async def retrieve(self, query: str, top_k: int = 5, 
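                       filters: Optional[dict] = None) -> List[MemoryEntry]:
        """Hybrid retrieval: semantic similarity + time decay + importance."""
        query_embedding = await self._embed(query)

        # Vector search: over-fetch 3x candidates for re-ranking
        candidates = await self.vector_store.search(
            query_embedding, top_k=top_k * 3, filters=filters
        )

        # Re-rank by combining similarity, time decay, and importance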
        scored = []
        now = datetime.now()
        for entry, similarity in candidates:
            time_decay = self._time_decay(entry.accessed_at, now)
            final_score = (
                0.5 * similarity + 
                0.3 * entry.importance + 
                0.2 * time_decay
            )
            scored.append((entry, final_score))
        
        scored.sort(key=lambda x: x[1], reverse=True)
        return [entry for entry, _ in scored[:top_k]]
    
    def _time_decay(self, last_access: datetime, now: datetime) -> float:
        """Time decay: exp(-0.1 * hours since last access); halves roughly every 6.9 hours."""
        hours_elapsed = (now - last_access).total_seconds() / 3600
        return np.exp(-0.1 * hours_elapsed)
    
    async def _check_and_evict(self):
        """Eviction policy based on LRU + importance."""
        count = await self.vector_store.count()
        if count > self.max_entries:
            # Trim to 90% of capacity, evicting low-importance, long-unaccessed entries first
            evict_count = count - int(self.max_entries * 0.9)
            await self.vector_store.evict_least_important(evict_count)
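The re-ranking step in `retrieve` combines three signals with fixed weights: score = 0.5 × similarity + 0.3 × importance + 0.2 × exp(-0.1 × h), where h is the number of hours since last access. Because of the exp(-0.1 × h) term, the recency contribution halves roughly every ln 2 / 0.1 ≈ 6.9 hours.

Below is a self-contained NumPy sketch. It assumes cosine similarity as the similarity measure and uses in-memory candidates instead of a vector store. `Entry`, `rerank`, and the sample entries are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Tuple

import numpy as np


@dataclass
class Entry:
    content: str
    embedding: np.ndarray
    importance: float
    accessed_at: datetime


def cosine(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def rerank(query: np.ndarray, entries: List[Entry], now: datetime,
           top_k: int = 2) -> List[Tuple[str, float]]:
    """Score = 0.5*similarity + 0.3*importance + 0.2*time decay, highest first."""
    scored = []
    for e in entries:
        hours = (now - e.accessed_at).total_seconds() / 3600
        decay = float(np.exp(-0.1 * hours))  # same decay as _time_decay
        score = 0.5 * cosine(query, e.embedding) + 0.3 * e.importance + 0.2 * decay
        scored.append((e.content, score))
    scored.sort(key=lambda x: x[1], reverse=True)
    return scored[:top_k]


now = datetime(2024, 1, 1, 12, 0)
q = np.array([1.0, 0.0])
entries = [
    Entry("recent, relevant", np.array([1.0, 0.0]), 0.5, now),
    Entry("old, relevant", np.array([1.0, 0.0]), 0.5, now - timedelta(hours=48)),
    Entry("recent, off-topic", np.array([0.0, 1.0]), 0.9, now),
]
for content, score in rerank(q, entries, now, top_k=3):
    print(f"{content}: {score:.3f}")
```

With these weights, a highly important but off-topic memory (0.47) still ranks below a relevant memory that was last accessed two days ago (about 0.65). Similarity dominates, and the decay term mainly breaks ties among recent candidates.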

3.3 Memory Consistency and Conflict Handling

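class MemoryConsistencyManager:
    """Handles memory conflicts and consistency."""

    def __init__(self, memory_manager, llm):
        self.memory_manager = memory_manager  # a MemoryManager instance
        self.llm = llm  # assumed: async generate(prompt) -> str

    async def detect_conflicts(self, new_entry: MemoryEntry) -> List[MemoryEntry]:
        """Find existing memories that conflict with the new one."""
        # Retrieve semantically similar memories
        similar = await self.memory_manager.retrieve(
            new_entry.content, top_k=10
        )

        conflicts = []
        for existing in similar:
            # _is_contradictory is assumed (e.g. an NLI model or an LLM judgment)
            if self._is_contradictory(new_entry.content, existing.content):
                conflicts.append(existing)
        return conflicts

    async def resolve_conflict(self, new_entry: MemoryEntry,
                               conflicts: List[MemoryEntry]) -> str:
        """Conflict-resolution strategies."""
        # Strategy 1: recency first - newer information overwrites older information
        # Strategy 2: authority first - decide by source credibility
        # Strategy 3: merge - use an LLM to integrate the information (used below)

        resolution_prompt = f"""
        New information: {new_entry.content}
        Existing information: {[c.content for c in conflicts]}

        Determine whether there is a conflict. If there is, provide the accurate, integrated information.
        """
        merged = await self.llm.generate(resolution_prompt)
        return merged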
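Strategies 1 and 2 from the comments above need no LLM call, so they could serve as a cheap first pass before falling back to LLM merging. This is a minimal sketch under these assumptions:

- Each memory carries a `created_at` timestamp and a `source`.
- Each source's trust level comes from a hypothetical `SOURCE_TRUST` table.
- `Fact`, `resolve_by_recency`, and `resolve_by_authority` are illustrative names, not part of the original design.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List

# Hypothetical trust scores per source; unknown sources default to 0.0
SOURCE_TRUST: Dict[str, float] = {"user_stated": 0.9, "inferred": 0.5}


@dataclass
class Fact:
    content: str
    source: str
    created_at: datetime


def resolve_by_recency(candidates: List[Fact]) -> Fact:
    """Strategy 1: the newest fact wins."""
    return max(candidates, key=lambda f: f.created_at)


def resolve_by_authority(candidates: List[Fact]) -> Fact:
    """Strategy 2: the most trusted source wins; ties go to the newer fact."""
    return max(candidates, key=lambda f: (SOURCE_TRUST.get(f.source, 0.0), f.created_at))


facts = [
    Fact("User prefers Python", "user_stated", datetime(2024, 1, 10)),
    Fact("User prefers Go", "inferred", datetime(2024, 3, 2)),
]
print(resolve_by_recency(facts).content)    # → User prefers Go
print(resolve_by_authority(facts).content)  # → User prefers Python
```

The two strategies can disagree, as they do here. Which one should take priority is a product decision: explicit user statements are often safer to trust than newer but inferred facts.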