Contents

OpenClaw 核心流程第二部分:双重循环 — 路由、提示词与 Agent 状态机

第一部分 介绍了从 WebSocket 到 dispatchInboundMessage() 的 gateway 路径。本篇继续深入内部——从将消息路由到正确的 agent,到系统提示词构建,再到核心 LLM 循环。


Channel → MsgContext 规范化

当消息从任意 channel(WhatsApp、Telegram、Discord……)到达时,会被规范化为 MsgContext——这是一个定义在 src/auto-reply/templating.ts 中、包含 80 多个字段的综合类型:

type MsgContext = {
  Body?: string;           // 标准消息内容
  BodyForAgent?: string;   // 可能包含信封/历史/上下文
  Channel?: string;        // "whatsapp"、"telegram" 等
  From?: string;           // 发送者 ID
  ChatType?: string;       // "direct" | "group" | "channel"
  GroupId?: string;
  ReplyToId?: string;
  MediaPath?: string;
  CommandAuthorized?: boolean;  // 默认为 false(默认拒绝)
  // ... 70+ 个其他字段
};

src/auto-reply/reply/inbound-context.ts 中的 finalizeInboundContext() 负责执行规范化:

  1. 换行符规范化 — 对所有文本字段统一处理
  2. ChatType 标准化 — 统一为 "direct" | "group" | "channel"
  3. BodyForAgent 选取 — 优先级链:强制值 → 已有值 → CommandBody → RawBody → Body
  4. CommandAuthorized — 默认为 false,即显式默认拒绝
  5. 媒体处理 — 将 MediaTypes 数组补齐以匹配媒体数量,默认类型为 application/octet-stream

8 层路由解析

文件: src/routing/resolve-route.ts

路由引擎按严格优先级顺序对绑定进行匹配,先匹配者获胜:

const tiers = [
  { matchedBy: "binding.peer",          // 第 1 层:直接 peer 匹配
    predicate: (c) => c.match.peer.state === "valid" },
  { matchedBy: "binding.peer.parent",   // 第 2 层:父级 peer(线程 → 父级)
    predicate: (c) => c.match.peer.state === "valid" },
  { matchedBy: "binding.guild+roles",   // 第 3 层:Discord 服务器 + 身份组
    predicate: (c) => hasGuildConstraint(c.match) && hasRolesConstraint(c.match) },
  { matchedBy: "binding.guild",         // 第 4 层:无身份组的服务器
    predicate: (c) => hasGuildConstraint(c.match) && !hasRolesConstraint(c.match) },
  { matchedBy: "binding.team",          // 第 5 层:Slack 工作区
    predicate: (c) => hasTeamConstraint(c.match) },
  { matchedBy: "binding.account",       // 第 6 层:账号(非通配符)
    predicate: (c) => c.match.accountPattern !== "*" },
  { matchedBy: "binding.channel",       // 第 7 层:Channel 通配符
    predicate: (c) => c.match.accountPattern === "*" },
];
// 第 8 层:resolveDefaultAgentId(cfg) — 全局回退

绑定会按 (channel, accountId) 对预先求值并缓存,使用 WeakMap + LRU 淘汰策略,上限为 2000 个键。

会话键生成

会话键的格式依据 dmScope 确定:

dmScope="main":                   agent:{id}:main
dmScope="per-peer":               agent:{id}:direct:{peerId}
dmScope="per-channel-peer":       agent:{id}:{channel}:direct:{peerId}
dmScope="per-account-channel-peer": agent:{id}:{channel}:{account}:direct:{peerId}

身份链接 支持跨 channel 用户合并。如果 Alice 同时使用 Telegram 和 WhatsApp,identityLinks 会将两个 ID 映射到同一个规范名称——共享同一个会话和记忆。


分发管线

dispatchReplyFromConfig() 编排完整的入站流程:

  1. 去重检查 — 跳过重复的入站消息
  2. message_received 钩子 — 异步触发(void,并行执行)
  3. 模型解析 — 触发 before_model_resolve 钩子(可修改,顺序执行)
  4. Workspace + 会话初始化
  5. 媒体/链接理解 应用
  6. 指令解析 — 处理用户命令,如 /think/model
  7. before_prompt_build 钩子 触发
  8. runPreparedReply() — 组装 40 多个参数用于 agent 执行
  9. runReplyAgent() — 实际的 agent 轮次

OpenClaw → pi-agent-core 桥接

文件: src/agents/pi-embedded-runner/run/attempt.ts

这是 OpenClaw 调用 pi-coding-agent SDK 的入口:

const { builtInTools, customTools } = splitSdkTools({ tools, sandboxEnabled });

const { session } = await createAgentSession({
  cwd: resolvedWorkspace,
  model: params.model,
  thinkingLevel: mapThinkingLevel(params.thinkLevel),
  tools: builtInTools,
  customTools: [...customTools, ...clientToolDefs],
  sessionManager,
  settingsManager,
});

OpenClaw 随后订阅会话的事件流:

const { unsubscribe } = subscribeEmbeddedPiSession({
  session,
  onBlockReply: ({ text, mediaUrls }) => { /* 流式传输给用户 */ },
  onToolResult: ({ text }) => { /* 发出工具事件 */ },
  onReasoningStream: ({ text }) => { /* 流式传输思考过程 */ },
});

深入 pi-agent-core:Agent 类

文件: packages/agent/src/agent.ts(pi-mono 仓库)

export class Agent {
  private _state: AgentState;
  private steeringQueue: AgentMessage[] = [];
  private followUpQueue: AgentMessage[] = [];
  private abortController?: AbortController;

  async prompt(input) {
    if (this._state.isStreaming) {
      throw new Error("Agent is already processing. Use steer() or followUp().");
    }
    await this._runLoop(messages);
  }

  steer(m: AgentMessage)   { this.steeringQueue.push(m); }
  followUp(m: AgentMessage) { this.followUpQueue.push(m); }
  abort() { this.abortController?.abort(); }
}

三种控制方法:

  • prompt() — 开始新的轮次(若已在流式传输则阻塞)
  • steer() — 在运行中途打断,在当前工具完成后注入,跳过剩余工具
  • followUp() — 将工作排入队列,在当前运行完成后执行

双重循环

文件: packages/agent/src/agent-loop.ts

这是核心状态机。它是一个嵌套循环——内层循环处理工具调用 + 转向,外层循环处理后续消息:

外层循环(后续消息): while(true) {

  内层循环(工具调用 + 转向): while(hasMoreToolCalls || pendingMessages) {

    [1] 将待处理的转向/后续消息注入上下文
    [2] streamAssistantResponse() → 调用 LLM
    [3] 检查 stopReason:error/aborted → 退出
    [4] 从响应中提取工具调用
    [5] 若有工具:executeToolCalls() → 每次调用间检查转向消息
    [6] 发出 turn_end
    [7] 轮询 getSteeringMessages()
  }

  [8] 检查 getFollowUpMessages()
      若存在后续消息 → 继续外层循环
      否则 → 跳出
}
emit agent_end

streamAssistantResponse() — LLM 调用

async function streamAssistantResponse(context, config, signal, stream) {
  // 1. 应用 transformContext(扩展修改上下文)
  let messages = config.transformContext
    ? await config.transformContext(context.messages, signal)
    : context.messages;

  // 2. 将 AgentMessage 转换为 LLM Message
  const llmMessages = await config.convertToLlm(messages);

  // 3. 从 LLM 流式获取响应
  const response = await streamFn(config.model, { systemPrompt, messages: llmMessages, tools }, { signal });

  // 4. 处理流式事件
  for await (const event of response) {
    switch (event.type) {
      case "start":      stream.push({ type: "message_start", message: event.partial }); break;
      case "text_delta": stream.push({ type: "message_update", ... }); break;
      case "done":       stream.push({ type: "message_end", message: finalMessage }); break;
    }
  }
}

消息转换:convertToLlm()

自定义的 AgentMessage 类型被规范化为标准 LLM Message[]

AgentMessage 角色 转换结果
userassistanttoolResult 直接透传,不作修改
bashExecution 转为带文本表示的 user 消息
branchSummary 转为 user 消息,内容为 BRANCH_SUMMARY_PREFIX + summary
compactionSummary 转为 user 消息,内容为 COMPACTION_SUMMARY_PREFIX + summary
custom 转为带规范化内容的 user 消息

这种两阶段方式(transformContextconvertToLlm)允许扩展在保持 LLM 兼容性的同时,操作富类型的 AgentMessage[] 格式。


带转向功能的工具执行

async function executeToolCalls(tools, assistantMessage, signal, stream, getSteeringMessages) {
  for (let i = 0; i < toolCalls.length; i++) {
    const toolCall = toolCalls[i];
    stream.push({ type: "tool_execution_start", ... });

    try {
      const validatedArgs = validateToolArguments(tool, toolCall);
      result = await tool.execute(toolCall.id, validatedArgs, signal, (partial) => {
        stream.push({ type: "tool_execution_update", partialResult: partial });
      });
    } catch (e) {
      result = { content: [{ type: "text", text: e.message }] };
      isError = true;  // LLM 可见错误,可重试
    }

    stream.push({ type: "tool_execution_end", ... });

    // 工具调用之间的转向检查:
    const steering = await getSteeringMessages();
    if (steering.length > 0) {
      // 跳过剩余工具,注入转向消息
      for (const skipped of toolCalls.slice(i + 1)) {
        results.push(skipToolCall(skipped));
      }
      return { toolResults: results, steeringMessages: steering };
    }
  }
}

工具调用之间的转向检查是实现用户打断的关键机制。当用户发送转向消息时,剩余的工具调用会被跳过(使用占位结果),新消息会在下一次 LLM 轮次之前注入。


错误恢复

错误类型 处理方式
工具执行错误 捕获后作为 toolResult 返回,isError: true。LLM 可见该错误并可重试。
LLM 流式传输错误 返回 stopReason: "error",内层循环退出。
上下文溢出 压缩系统分阶段对较旧的消息进行摘要。若摘要失败则回退为仅元数据模式。
中止 AbortController.abort() 传播至工具执行和 LLM 流式传输,以 stopReason: "aborted" 干净退出。
校验错误 validateToolArguments() 失败结果作为工具返回值返回——LLM 使用修正后的参数重试。

事件流汇总

Agent._runLoop()
  └─ agentLoop() → EventStream<AgentEvent>
       └─ runLoop() — 双重循环
            ├─ streamAssistantResponse()
            │    ├─ transformContext()
            │    ├─ convertToLlm()
            │    └─ streamFn(model, context) → LLM API
            │
            └─ executeToolCalls()
                 └─ tool.execute() → 各工具结果
                      └─ getSteeringMessages() → 检查打断

事件流向:
  agentLoop → EventStream → Agent._runLoop() → Agent.emit()
    → AgentSession._handleAgentEvent()
      → OpenClaw subscribeEmbeddedPiSession()
        → emitAgentEvent() → Gateway 事件总线
          → createAgentEventHandler() → WebSocket 广播

每一层都会增加相应处理:会话持久化、扩展钩子、块回复分块、用量追踪、150ms 限流以及背压控制。


下一篇:第三部分 将介绍工具层——sandbox Docker 执行、SSRF 防护的 web fetch、带 MMR 重排序的混合记忆搜索,以及 13 步钩子执行顺序。

分析基于 OpenClaw v2026.2.18 与 pi-mono v0.53.0。资料来源:openclaw/openclawbadlogic/pi-mono。研究来自 cryptocj.org