2. 编排协作模式:集中式编排 vs. 分布式协作
2.1 集中式编排模式
集中式编排模式采用单一的控制中心来协调所有Agent的行为,类似于传统的微服务架构中的API网关。
架构特征:
            ┌──────────────┐
            │   编排中心   │
            │ Orchestrator │
            └──────┬───────┘
                   │
     ┌─────────────┼─────────────┐
     │             │             │
┌────▼────┐   ┌────▼────┐   ┌────▼────┐
│ Agent A │   │ Agent B │   │ Agent C │
└─────────┘   └─────────┘   └─────────┘
核心实现:
class CentralizedOrchestrator:
    """Single control point that routes subtasks to agents and merges the results.

    All collaborators are duck-typed:

    * ``router`` must provide ``decompose(task)`` and ``select_agent(subtask)``.
    * ``agents`` is stored on the instance; the methods shown here do not read it.
    * ``coordinator`` must provide an awaitable ``synthesize(results)``.
    """

    def __init__(self, router, agents, coordinator):
        self.router, self.agents, self.coordinator = router, agents, coordinator

    async def process_complex_task(self, task):
        """Execute every subtask of *task* in order and return the synthesized result."""
        # Strictly sequential: an agent is selected and awaited for one subtask
        # before the next subtask is looked at.
        outputs = [
            await self.router.select_agent(piece).execute(piece)
            for piece in self.router.decompose(task)
        ]
        return await self.coordinator.synthesize(outputs)


# 核心组件深入分析
LLM Agent的架构可以抽象为六个核心层次,每一层承担不同的职责,相互协作完成复杂任务。下图展示了完整的组件架构:
graph TB
subgraph 交互层
UI[对话UI]
CTX[会话上下文管理]
MM[多模态处理器]
end
subgraph 编排与控制层
PLAN[任务规划器]
EXEC[执行控制器]
POLICY[策略引擎]
end
subgraph 知识与记忆层
STM[短期记忆]
LTM[长期记忆]
KG[知识图谱]
end
subgraph 工具与适配层
REG[工具注册中心]
CALL[调用适配器]
NORM[结果标准化]
end
subgraph 安全与治理层
AUTH[权限管理]
SANDBOX[沙箱隔离]
AUDIT[审计日志]
end
subgraph 评估与监控层
EVAL[评估引擎]
OBS[可观测性]
ALERT[告警系统]
end
UI --> CTX --> PLAN
MM --> CTX
PLAN --> EXEC --> CALL
POLICY --> EXEC
STM <--> EXEC
LTM <--> PLAN
REG --> CALL
CALL --> NORM
AUTH --> CALL
SANDBOX --> EXEC
AUDIT --> OBS
EVAL --> ALERT
1. 交互层
交互层是Agent与用户/外部系统的接口边界,负责输入解析、上下文管理和输出格式化。
1.1 会话上下文与回合管理
会话管理的核心挑战在于如何在有限的上下文窗口内维护有效的对话历史:
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
class MessageRole(Enum):
    """Author of a conversation message; each member's value is its lowercase name."""

    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
    TOOL = "tool"
@dataclass
class Message:
    """A single conversation message."""

    role: MessageRole  # who produced the message
    content: str  # message text
    metadata: dict = field(default_factory=dict)  # fresh dict per instance
    token_count: int = 0  # supplied by the caller; nothing here computes it
@dataclass
class ConversationContext:
    """Conversation history kept within a token budget by a sliding window.

    ``current_tokens`` is maintained incrementally by :meth:`add_message`; it
    is not recomputed from ``messages``.
    """

    messages: List[Message] = field(default_factory=list)  # oldest first
    max_tokens: int = 8000  # budget enforced after every append
    current_tokens: int = 0  # running sum of ``token_count`` for kept messages

    def add_message(self, message: Message) -> None:
        """Append *message*, add its tokens to the total, then trim to the budget."""
        self.messages.append(message)
        self.current_tokens += message.token_count
        self._truncate_if_needed()

    def _truncate_if_needed(self) -> None:
        """Evict the oldest evictable messages while the budget is exceeded.

        A leading SYSTEM message is pinned and never evicted. The most recent
        message is always kept, so the loop stops even if the budget cannot be
        met.
        """
        # Pin slot 0 only when it really holds the system prompt; otherwise the
        # oldest user/assistant turn would be kept forever while newer, more
        # relevant turns are dropped.
        pinned = 1 if self.messages and self.messages[0].role is MessageRole.SYSTEM else 0
        while self.current_tokens > self.max_tokens and len(self.messages) > pinned + 1:
            evicted = self.messages.pop(pinned)
            self.current_tokens -= evicted.token_count

    def get_context_for_llm(self) -> List[dict]:
        """Return the history as ``{"role": ..., "content": ...}`` dicts, oldest first."""
        return [{"role": m.role.value, "content": m.content} for m in self.messages]


# 上下文管理策略对比:
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 滑动窗口 | 实现简单,实时性好 | 可能丢失关键历史 | 短对话、实时交互 |
| 摘要压缩 | 保留语义,节省token | 摘要可能失真 | 长对话、复杂任务 |
| 分层存储 | 灵活,可按需检索 | 实现复杂度高 | 企业级应用 |
| 语义聚类 | 相关性高 | 计算开销大 | 知识密集型任务 |
1.2 多模态输入输出处理
from abc import ABC, abstractmethod
from typing import Union, Any
class ModalityProcessor(ABC):
    """Interface for converting one input/output modality to and from dicts."""

    @abstractmethod
    def encode(self, input_data: Any) -> dict:
        """Convert raw input into the dict form the agent can process."""

    @abstractmethod
    def decode(self, output_data: dict) -> Any:
        """Convert an agent output dict into the modality's target form."""
class ImageProcessor(ModalityProcessor):
    """Image modality: reads an image file and emits a base64 payload dict."""

    def encode(self, image_path: str) -> dict:
        """Read the file at *image_path* and return it as a base64 image payload."""
        from base64 import b64encode

        with open(image_path, "rb") as image_file:
            raw_bytes = image_file.read()
        return {
            "type": "image",
            "content": b64encode(raw_bytes).decode(),
            "format": "base64",
        }

    def decode(self, output_data: dict) -> str:
        """Return the ``"description"`` entry of *output_data*, or ``""`` if absent."""
        return output_data.get("description", "")
class MultiModalRouter:
    """Routes each input item to the processor registered for its modality."""

    # Class-level registry: the same processor instances are shared by every router.
    processors = {
        "text": TextProcessor(),
        "image": ImageProcessor(),
        "audio": AudioProcessor(),
        "table": TableProcessor(),
    }

    def process_input(self, inputs: List[dict]) -> List[dict]:
        """Encode every item of *inputs* with the processor for its ``"type"``.

        An item without a ``"type"`` key is treated as ``"text"``. Items whose
        modality has no registered processor are skipped, not reported.
        """
        encoded = []
        for entry in inputs:
            handler = self.processors.get(entry.get("type", "text"))
            if not handler:
                continue
            encoded.append(handler.encode(entry["data"]))
        return encoded


# 2. 编排与控制层
编排层是Agent的"大脑",负责任务分解、执行调度和策略决策。
2.1 任务规划与分解
flowchart LR
INPUT[用户请求] --> DECOMPOSE[任务分解]
DECOMPOSE --> PLAN[生成执行计划]
PLAN --> VALIDATE[计划验证]
VALIDATE -->|通过| EXEC[执行队列]
VALIDATE -->|失败| REPLAN[重新规划]
REPLAN --> PLAN
层级规划实现:
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class Task:
    """One unit of planned work."""

    id: str  # identifier that other tasks reference in ``dependencies``
    description: str
    dependencies: List[str] = field(default_factory=list)  # ids this task waits on
    tool_hint: Optional[str] = None  # optional suggested tool
    status: str = "pending"
    result: Optional[Any] = None  # unset until something assigns it
class HierarchicalPlanner:
    """Asks an LLM to split a goal into tasks and builds their dependency graph."""

    def __init__(self, llm_client):
        self.llm = llm_client

    async def decompose(self, goal: str, context: dict) -> List[Task]:
        """Prompt the LLM to break *goal* into subtasks and parse its reply.

        The reply is passed unchanged to ``self._parse_tasks``, whose result
        is returned.
        """
        # Assembled from adjacent literals; the resulting text is identical to
        # a single multi-line f-string with one prompt line per source line.
        prompt = (
            "\n"
            f"目标: {goal}\n"
            f"上下文: {context}\n"
            "请将目标分解为具体的子任务,输出JSON格式:\n"
            "[\n"
            '{"id": "task_1", "description": "...", "dependencies": [], "tool_hint": "..."},\n'
            "...\n"
            "]\n"
            "要求:\n"
            "1. 任务粒度适中,每个任务对应一次工具调用\n"
            "2. 明确标注任务间的依赖关系\n"
            "3. 指定推荐使用的工具\n"
        )
        reply = await self.llm.generate(prompt)
        return self._parse_tasks(reply)

    def build_execution_graph(self, tasks: List[Task]) -> dict:
        """Map each task id to ``{"task": task, "successors": [ids]}``.

        ``successors`` lists the ids of tasks that depend on the key task.
        Dependencies naming an id that is not in *tasks* are ignored.
        """
        nodes = {}
        for item in tasks:
            nodes[item.id] = {"task": item, "successors": []}
        for item in tasks:
            for parent_id in item.dependencies:
                parent = nodes.get(parent_id)
                if parent is not None:
                    parent["successors"].append(item.id)
        return nodes


# 2.2 执行控制器
import asyncio
from enum import Enum
class ExecutionState(Enum):
    """Task lifecycle states; ``Task.status`` stores the member's string value."""

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    RETRY = "retry"


class ExecutionController:
    """Runs tasks with timeout/retry handling and schedules plans by dependency."""

    def __init__(self, max_retries: int = 3, timeout: int = 30):
        self.max_retries = max_retries  # retries allowed after the first attempt
        self.timeout = timeout  # per-attempt limit, passed to asyncio.wait_for
        self.execution_history = []  # initialised here; not written by this class

    async def execute_with_retry(self, task: Task, executor) -> Task:
        """Run *task* via ``executor.run`` with a timeout, retrying on failure.

        At most ``max_retries + 1`` attempts are made. A timeout is retried
        immediately. A ``ToolExecutionError`` is retried after an exponential
        back-off when ``is_retryable`` is true and ends the attempts otherwise.
        Any other exception propagates to the caller.

        Returns:
            The same *task*. On success ``status`` is ``"success"`` and
            ``result`` holds the executor's return value; otherwise ``status``
            is ``"failed"`` and ``result`` is ``{"error": <last message>}``.
        """
        retries = 0
        last_error = None
        while retries <= self.max_retries:
            try:
                task.status = ExecutionState.RUNNING.value
                result = await asyncio.wait_for(
                    executor.run(task),
                    timeout=self.timeout,
                )
            except asyncio.TimeoutError:
                last_error = "Execution timeout"
                retries += 1
            except ToolExecutionError as e:
                last_error = str(e)
                if not e.is_retryable:
                    break
                retries += 1
                # Back off only when another attempt will actually follow;
                # sleeping after the final failure just delays the error.
                if retries <= self.max_retries:
                    await asyncio.sleep(2 ** retries)  # exponential back-off
            else:
                task.result = result
                task.status = ExecutionState.SUCCESS.value
                return task
        task.status = ExecutionState.FAILED.value
        task.result = {"error": last_error}
        return task

    async def execute_plan(self, tasks: List[Task], executor) -> List[Task]:
        """Execute *tasks* in dependency order, running ready tasks concurrently.

        A task is ready once every id in its ``dependencies`` has finished.
        Finished includes failed: a dependency that failed does not block its
        dependents. Tasks that can never become ready (dependency cycle or a
        dependency on an unknown id) are marked ``"failed"`` with an
        explanatory ``result`` instead of being waited on forever.

        Returns:
            The tasks in the order they finished, batch by batch, followed by
            any tasks that could not be scheduled.
        """
        completed = set()
        results = []
        while len(completed) < len(tasks):
            # Every task whose dependencies have all finished.
            ready = [
                t for t in tasks
                if t.id not in completed
                and all(d in completed for d in t.dependencies)
            ]
            if not ready:
                # No progress is possible. Without this guard the loop would
                # spin forever, because awaiting an empty gather never yields.
                for task in tasks:
                    if task.id in completed:
                        continue
                    unmet = [d for d in task.dependencies if d not in completed]
                    task.status = ExecutionState.FAILED.value
                    task.result = {"error": f"Unsatisfied dependencies: {unmet}"}
                    results.append(task)
                break
            # Run the whole ready batch concurrently.
            batch_results = await asyncio.gather(
                *[self.execute_with_retry(t, executor) for t in ready]
            )
            for task in batch_results:
                completed.add(task.id)
                results.append(task)
        return results


# 2.3 策略引擎
class PolicyEngine:
    """Policy engine combining hard rules, learned policies and human feedback."""

    def __init__(self):
        self.rules = []  # rule dicts, kept sorted by descending priority
        self.learned_policies = {}  # state key -> learned action
        self.human_overrides = {}  # state key -> action recorded from feedback

    def add_rule(self, condition: callable, action: callable, priority: int = 0):
        """Register a rule; higher *priority* rules are evaluated first.

        The sort is stable, so rules with equal priority keep insertion order.
        """
        def by_priority(rule):
            return rule["priority"]

        self.rules.append(dict(condition=condition, action=action, priority=priority))
        self.rules.sort(key=by_priority, reverse=True)

    def evaluate(self, state: dict) -> Optional[dict]:
        """Return the recommended action for *state*, or ``None`` if there is none.

        Lookup order: human override, then the first rule whose condition
        holds, then the learned policy for the state key.
        """
        key = self._hash_state(state)
        if key in self.human_overrides:
            return self.human_overrides[key]
        # Lazy scan: stops at the first rule whose condition accepts the state.
        matched = next((rule for rule in self.rules if rule["condition"](state)), None)
        if matched is not None:
            return matched["action"](state)
        return self.learned_policies.get(key)

    def record_feedback(self, state: dict, action: dict, reward: float):
        """Store *action* as the override for *state* when *reward* exceeds 0.8."""
        key = self._hash_state(state)
        if not reward > 0.8:
            return
        self.human_overrides[key] = action


# 3. 知识与记忆层
记忆系统是Agent实现持续学习和个性化的关键。
3.1 记忆架构
graph TB
subgraph 短期记忆
WM[工作记忆<br/>当前任务上下文]
CACHE[检索缓存<br/>最近RAG结果]
end
subgraph 长期记忆
VDB["(向量数据库<br/>语义检索)"]
KG["(知识图谱<br/>结构化关系)"]
DOC["(文档库<br/>原始知识)"]
end
subgraph 记忆控制器
WRITE[写入策略]
READ[检索策略]
UPDATE[更新/淘汰]
end
WM <--> READ
CACHE <--> READ
READ --> VDB
READ --> KG
WRITE --> VDB
UPDATE --> VDB
UPDATE --> KG
3.2 记忆管理实现
from datetime import datetime, timedelta
import numpy as np
@dataclass
class MemoryEntry:
    """A stored memory record together with its embedding and bookkeeping."""

    id: str
    content: str
    embedding: np.ndarray  # vector representation of ``content``
    metadata: dict
    created_at: datetime
    accessed_at: datetime
    access_count: int = 0
    importance: float = 0.5  # default used when the caller supplies none
class MemoryManager:
    """Stores memories in a vector store (long term) or per session (short term)."""

    def __init__(self, vector_store, max_entries: int = 10000):
        self.vector_store = vector_store
        self.max_entries = max_entries  # long-term capacity before eviction runs
        self.short_term = {}  # session_id -> List[MemoryEntry]

    async def store(self, content: str, metadata: dict,
                    memory_type: str = "long_term") -> str:
        """Embed *content*, score its importance and store it.

        With ``memory_type == "long_term"`` the entry is upserted into the
        vector store and eviction is checked. Any other value appends it to
        the short-term list keyed by ``metadata.get("session_id")``.

        Returns:
            The id of the new entry.
        """
        embedding = await self._embed(content)
        importance = self._calculate_importance(content, metadata)
        # One timestamp for both fields so a new entry starts with
        # created_at == accessed_at.
        now = datetime.now()
        entry = MemoryEntry(
            id=self._generate_id(),
            content=content,
            embedding=embedding,
            metadata=metadata,
            created_at=now,
            accessed_at=now,
            importance=importance,
        )
        if memory_type == "long_term":
            await self.vector_store.upsert(entry)
            await self._check_and_evict()
        else:
            session_id = metadata.get("session_id")
            self.short_term.setdefault(session_id, []).append(entry)
        return entry.id

    async def retrieve(self, query: str, top_k: int = 5,
                       filters: Optional[dict] = None) -> List[MemoryEntry]:
        """Return the *top_k* long-term entries ranked by a blended score.

        ``top_k * 3`` candidates are fetched from the vector store and
        re-ranked by ``0.5 * similarity + 0.3 * importance + 0.2 * time_decay``.
        """
        query_embedding = await self._embed(query)
        # Over-fetch so the re-ranking has candidates to choose from.
        candidates = await self.vector_store.search(
            query_embedding, top_k=top_k * 3, filters=filters
        )
        scored = []
        now = datetime.now()
        for entry, similarity in candidates:
            time_decay = self._time_decay(entry.accessed_at, now)
            final_score = (
                0.5 * similarity +
                0.3 * entry.importance +
                0.2 * time_decay
            )
            scored.append((entry, final_score))
        scored.sort(key=lambda x: x[1], reverse=True)
        return [entry for entry, _ in scored[:top_k]]

    def _time_decay(self, last_access: datetime, now: datetime) -> float:
        """Return ``exp(-0.1 * hours since last_access)``, always within ``[0, 1]``."""
        # Clamp at zero: a last_access later than now (naive local timestamps
        # across a DST change, clock skew) would otherwise give a factor above
        # 1 and let that entry outrank everything else.
        hours_elapsed = max(0.0, (now - last_access).total_seconds() / 3600)
        # Convert to a built-in float so the declared return type holds.
        return float(np.exp(-0.1 * hours_elapsed))

    async def _check_and_evict(self):
        """Evict entries once the store exceeds ``max_entries``.

        Enough entries are evicted to bring the store down to 90% of
        ``max_entries``; the vector store decides which ones.
        """
        count = await self.vector_store.count()
        if count > self.max_entries:
            evict_count = count - int(self.max_entries * 0.9)
            await self.vector_store.evict_least_important(evict_count)


# 3.3 记忆一致性与冲突处理
class MemoryConsistencyManager:
    """Detects and resolves contradictions between new and stored memories.

    The methods read ``self.memory_manager``, ``self.llm`` and
    ``self._is_contradictory``, none of which is set in the code shown here.
    """

    async def detect_conflicts(self, new_entry: MemoryEntry) -> List[MemoryEntry]:
        """Return stored memories that contradict *new_entry*.

        The ten entries retrieved for ``new_entry.content`` are each checked
        with ``self._is_contradictory``.
        """
        neighbours = await self.memory_manager.retrieve(new_entry.content, top_k=10)
        return [
            stored
            for stored in neighbours
            if self._is_contradictory(new_entry.content, stored.content)
        ]

    async def resolve_conflict(self, new_entry: MemoryEntry,
                               conflicts: List[MemoryEntry]) -> str:
        """Ask the LLM to reconcile *new_entry* with *conflicts*; return its reply.

        Only the LLM merge is implemented here. The alternatives noted in the
        original comments (newest information wins, most trusted source wins)
        are not.
        """
        existing_contents = [c.content for c in conflicts]
        resolution_prompt = (
            "\n"
            f"新信息: {new_entry.content}\n"
            f"已有信息: {existing_contents}\n"
            "请判断是否存在冲突,如果存在,请给出整合后的准确信息。\n"
        )
        return await self.llm.generate(resolution_prompt)