OpenClaw 核心流程第二部分:双重循环 — 路由、提示词与 Agent 状态机
第一部分 介绍了从 WebSocket 到 dispatchInboundMessage() 的 gateway 路径。本篇继续深入内部——从将消息路由到正确的 agent,到系统提示词构建,再到核心 LLM 循环。
- 第一部分:Gateway — 从 WebSocket 到 Agent 分发
- 第二部分(本篇): 路由、提示词与 Agent 状态机
- 第三部分:工具、记忆与返回路径
Channel → MsgContext 规范化
当消息从任意 channel(WhatsApp、Telegram、Discord……)到达时,会被规范化为 MsgContext——这是一个定义在 src/auto-reply/templating.ts 中、包含 80 多个字段的综合类型:
type MsgContext = {
Body?: string; // 标准消息内容
BodyForAgent?: string; // 可能包含信封/历史/上下文
Channel?: string; // "whatsapp"、"telegram" 等
From?: string; // 发送者 ID
ChatType?: string; // "direct" | "group" | "channel"
GroupId?: string;
ReplyToId?: string;
MediaPath?: string;
CommandAuthorized?: boolean; // 默认为 false(默认拒绝)
// ... 70+ 个其他字段
};src/auto-reply/reply/inbound-context.ts 中的 finalizeInboundContext() 负责执行规范化:
- 换行符规范化 — 对所有文本字段统一处理
- ChatType 标准化 — 统一为
"direct"|"group"|"channel" - BodyForAgent 选取 — 优先级链:强制值 → 已有值 → CommandBody → RawBody → Body
- CommandAuthorized — 默认为
false,即显式默认拒绝 - 媒体处理 — 将 MediaTypes 数组补齐以匹配媒体数量,默认类型为
application/octet-stream
8 层路由解析
文件: src/routing/resolve-route.ts
路由引擎按严格优先级顺序对绑定进行匹配,先匹配者获胜:
const tiers = [
{ matchedBy: "binding.peer", // 第 1 层:直接 peer 匹配
predicate: (c) => c.match.peer.state === "valid" },
{ matchedBy: "binding.peer.parent", // 第 2 层:父级 peer(线程 → 父级)
predicate: (c) => c.match.peer.state === "valid" },
{ matchedBy: "binding.guild+roles", // 第 3 层:Discord 服务器 + 身份组
predicate: (c) => hasGuildConstraint(c.match) && hasRolesConstraint(c.match) },
{ matchedBy: "binding.guild", // 第 4 层:无身份组的服务器
predicate: (c) => hasGuildConstraint(c.match) && !hasRolesConstraint(c.match) },
{ matchedBy: "binding.team", // 第 5 层:Slack 工作区
predicate: (c) => hasTeamConstraint(c.match) },
{ matchedBy: "binding.account", // 第 6 层:账号(非通配符)
predicate: (c) => c.match.accountPattern !== "*" },
{ matchedBy: "binding.channel", // 第 7 层:Channel 通配符
predicate: (c) => c.match.accountPattern === "*" },
];
// 第 8 层:resolveDefaultAgentId(cfg) — 全局回退
绑定会按 (channel, accountId) 对预先求值并缓存,使用 WeakMap + LRU 淘汰策略,上限为 2000 个键。
会话键生成
会话键的格式依据 dmScope 确定:
dmScope="main": agent:{id}:main
dmScope="per-peer": agent:{id}:direct:{peerId}
dmScope="per-channel-peer": agent:{id}:{channel}:direct:{peerId}
dmScope="per-account-channel-peer": agent:{id}:{channel}:{account}:direct:{peerId}身份链接 支持跨 channel 用户合并。如果 Alice 同时使用 Telegram 和 WhatsApp,identityLinks 会将两个 ID 映射到同一个规范名称——共享同一个会话和记忆。
分发管线
dispatchReplyFromConfig() 编排完整的入站流程:
- 去重检查 — 跳过重复的入站消息
message_received钩子 — 异步触发(void,并行执行)- 模型解析 — 触发
before_model_resolve钩子(可修改,顺序执行) - Workspace + 会话初始化
- 媒体/链接理解 应用
- 指令解析 — 处理用户命令,如
/think、/model before_prompt_build钩子 触发runPreparedReply()— 组装 40 多个参数用于 agent 执行runReplyAgent()— 实际的 agent 轮次
OpenClaw → pi-agent-core 桥接
文件: src/agents/pi-embedded-runner/run/attempt.ts
这是 OpenClaw 调用 pi-coding-agent SDK 的入口:
const { builtInTools, customTools } = splitSdkTools({ tools, sandboxEnabled });
const { session } = await createAgentSession({
cwd: resolvedWorkspace,
model: params.model,
thinkingLevel: mapThinkingLevel(params.thinkLevel),
tools: builtInTools,
customTools: [...customTools, ...clientToolDefs],
sessionManager,
settingsManager,
});OpenClaw 随后订阅会话的事件流:
const { unsubscribe } = subscribeEmbeddedPiSession({
session,
onBlockReply: ({ text, mediaUrls }) => { /* 流式传输给用户 */ },
onToolResult: ({ text }) => { /* 发出工具事件 */ },
onReasoningStream: ({ text }) => { /* 流式传输思考过程 */ },
});深入 pi-agent-core:Agent 类
文件: packages/agent/src/agent.ts(pi-mono 仓库)
export class Agent {
private _state: AgentState;
private steeringQueue: AgentMessage[] = [];
private followUpQueue: AgentMessage[] = [];
private abortController?: AbortController;
async prompt(input) {
if (this._state.isStreaming) {
throw new Error("Agent is already processing. Use steer() or followUp().");
}
await this._runLoop(messages);
}
steer(m: AgentMessage) { this.steeringQueue.push(m); }
followUp(m: AgentMessage) { this.followUpQueue.push(m); }
abort() { this.abortController?.abort(); }
}三种控制方法:
prompt()— 开始新的轮次(若已在流式传输则阻塞)steer()— 在运行中途打断,在当前工具完成后注入,跳过剩余工具followUp()— 将工作排入队列,在当前运行完成后执行
双重循环
文件: packages/agent/src/agent-loop.ts
这是核心状态机。它是一个嵌套循环——内层循环处理工具调用 + 转向,外层循环处理后续消息:
外层循环(后续消息): while(true) {
内层循环(工具调用 + 转向): while(hasMoreToolCalls || pendingMessages) {
[1] 将待处理的转向/后续消息注入上下文
[2] streamAssistantResponse() → 调用 LLM
[3] 检查 stopReason:error/aborted → 退出
[4] 从响应中提取工具调用
[5] 若有工具:executeToolCalls() → 每次调用间检查转向消息
[6] 发出 turn_end
[7] 轮询 getSteeringMessages()
}
[8] 检查 getFollowUpMessages()
若存在后续消息 → 继续外层循环
否则 → 跳出
}
emit agent_endstreamAssistantResponse() — LLM 调用
async function streamAssistantResponse(context, config, signal, stream) {
// 1. 应用 transformContext(扩展修改上下文)
let messages = config.transformContext
? await config.transformContext(context.messages, signal)
: context.messages;
// 2. 将 AgentMessage 转换为 LLM Message
const llmMessages = await config.convertToLlm(messages);
// 3. 从 LLM 流式获取响应
const response = await streamFn(config.model, { systemPrompt, messages: llmMessages, tools }, { signal });
// 4. 处理流式事件
for await (const event of response) {
switch (event.type) {
case "start": stream.push({ type: "message_start", message: event.partial }); break;
case "text_delta": stream.push({ type: "message_update", ... }); break;
case "done": stream.push({ type: "message_end", message: finalMessage }); break;
}
}
}消息转换:convertToLlm()
自定义的 AgentMessage 类型被规范化为标准 LLM Message[]:
| AgentMessage 角色 | 转换结果 |
|---|---|
user、assistant、toolResult |
直接透传,不作修改 |
bashExecution |
转为带文本表示的 user 消息 |
branchSummary |
转为 user 消息,内容为 BRANCH_SUMMARY_PREFIX + summary |
compactionSummary |
转为 user 消息,内容为 COMPACTION_SUMMARY_PREFIX + summary |
custom |
转为带规范化内容的 user 消息 |
这种两阶段方式(transformContext → convertToLlm)允许扩展在保持 LLM 兼容性的同时,操作富类型的 AgentMessage[] 格式。
带转向功能的工具执行
async function executeToolCalls(tools, assistantMessage, signal, stream, getSteeringMessages) {
for (let i = 0; i < toolCalls.length; i++) {
const toolCall = toolCalls[i];
stream.push({ type: "tool_execution_start", ... });
try {
const validatedArgs = validateToolArguments(tool, toolCall);
result = await tool.execute(toolCall.id, validatedArgs, signal, (partial) => {
stream.push({ type: "tool_execution_update", partialResult: partial });
});
} catch (e) {
result = { content: [{ type: "text", text: e.message }] };
isError = true; // LLM 可见错误,可重试
}
stream.push({ type: "tool_execution_end", ... });
// 工具调用之间的转向检查:
const steering = await getSteeringMessages();
if (steering.length > 0) {
// 跳过剩余工具,注入转向消息
for (const skipped of toolCalls.slice(i + 1)) {
results.push(skipToolCall(skipped));
}
return { toolResults: results, steeringMessages: steering };
}
}
}工具调用之间的转向检查是实现用户打断的关键机制。当用户发送转向消息时,剩余的工具调用会被跳过(使用占位结果),新消息会在下一次 LLM 轮次之前注入。
错误恢复
| 错误类型 | 处理方式 |
|---|---|
| 工具执行错误 | 捕获后作为 toolResult 返回,isError: true。LLM 可见该错误并可重试。 |
| LLM 流式传输错误 | 返回 stopReason: "error",内层循环退出。 |
| 上下文溢出 | 压缩系统分阶段对较旧的消息进行摘要。若摘要失败则回退为仅元数据模式。 |
| 中止 | AbortController.abort() 传播至工具执行和 LLM 流式传输,以 stopReason: "aborted" 干净退出。 |
| 校验错误 | validateToolArguments() 失败结果作为工具返回值返回——LLM 使用修正后的参数重试。 |
事件流汇总
Agent._runLoop()
└─ agentLoop() → EventStream<AgentEvent>
└─ runLoop() — 双重循环
├─ streamAssistantResponse()
│ ├─ transformContext()
│ ├─ convertToLlm()
│ └─ streamFn(model, context) → LLM API
│
└─ executeToolCalls()
└─ tool.execute() → 各工具结果
└─ getSteeringMessages() → 检查打断
事件流向:
agentLoop → EventStream → Agent._runLoop() → Agent.emit()
→ AgentSession._handleAgentEvent()
→ OpenClaw subscribeEmbeddedPiSession()
→ emitAgentEvent() → Gateway 事件总线
→ createAgentEventHandler() → WebSocket 广播每一层都会增加相应处理:会话持久化、扩展钩子、块回复分块、用量追踪、150ms 限流以及背压控制。
下一篇:第三部分 将介绍工具层——sandbox Docker 执行、SSRF 防护的 web fetch、带 MMR 重排序的混合记忆搜索,以及 13 步钩子执行顺序。
分析基于 OpenClaw v2026.2.18 与 pi-mono v0.53.0。资料来源:openclaw/openclaw、badlogic/pi-mono。研究来自 cryptocj.org。