Contents

OpenClaw 核心流程 Part 1:从 WebSocket 到 Agent — Gateway 路径全解析

上一个系列中,我们梳理了整体架构。现在我们深入一层——追踪一条消息从到达到响应的完整代码路径。本文是逐函数的详细 walkthrough。

/images/openclaw-core-flow.svg


服务器启动

文件: src/gateway/server.impl.ts

一切从 startGatewayServer(port = 18789) 开始。服务器以 noServer 模式创建 WebSocketServer,最大 payload 限制为 25MB,挂载 HTTP upgrade 处理器,并连接 agent 事件系统。

const httpServer = opts.tlsOptions
  ? createHttpsServer(opts.tlsOptions, handleRequest)
  : createHttpServer(handleRequest);

upgrade 处理器负责路由 WebSocket 连接,Canvas 路径有特殊处理(单独认证):

httpServer.on("upgrade", (req, socket, head) => {
  if (canvasHost?.handleUpgrade(req, socket, head)) return;
  wss.handleUpgrade(req, socket, head, (ws) => {
    wss.emit("connection", ws, req);
  });
});

服务器启动监听后,startGatewaySidecars() 会依次启动浏览器控制、Gmail 监听器、channel 运行时(Telegram、Discord、WhatsApp……)、插件服务以及 memory 后端。


WebSocket 握手

文件: src/gateway/server/ws-connection.ts

连接建立时,服务器分配 connId = randomUUID(),发送 challenge nonce,并启动 10 秒握手计时器:

Server → Client:  { type: "connect.challenge", nonce: "..." }
Client → Server:  { type: "req", method: "connect", params: ConnectParams }

ConnectParams 包含认证凭据、客户端版本、设备信息以及所请求的 scopes。authorizeGatewayConnect() 函数按以下顺序校验:

  1. 可信代理检查(如果启用)
  2. 基于 IP 的限流
  3. Tailscale 验证(如果允许)
  4. 通过恒定时间比较safeEqualSecret())匹配 token/密码

成功时:返回 hello-ok,携带服务器版本和能力信息。失败时:以相应错误码关闭 socket。


方法路由

文件: src/gateway/server-methods.ts

握手完成后,所有后续消息都通过 handleGatewayRequest() 路由:

const handler = coreGatewayHandlers[method] ?? pluginExtraHandlers[method];
if (!handler) {
  respond(false, undefined, { code: "METHOD_NOT_FOUND" });
  return;
}

每个处理器按所需授权分类:

类别 方法 所需 Scope
READ chat.history, health 基本连接
WRITE chat.send, agent 连接 + 写权限
APPROVAL exec.approval.* operator.approvals
NODE node.invoke.result role === "node"

operator.admin scope 可绕过所有检查。


chat.send 处理器

文件: src/gateway/server-methods/chat.ts

这是最热的代码路径——每条 webchat 消息都经过这里。

步骤 1:校验与清洗

if (!validateChatSendParams(params)) {
  respond(false, undefined, errorShape(...));
  return;
}
const sanitized = sanitizeChatSendMessageInput(params.message);

清洗操作会规范化 Unicode、剥离不允许的控制字符,并拒绝含有 null 字节的消息。

步骤 2:停止命令检查

if (isChatStopCommandText(sanitized)) {
  abortChatRunsForSessionKey(ops, { sessionKey, stopReason: "/stop" });
  return;
}

/stop 及其他中止触发词会取消该 session 的所有活跃 run。

步骤 3:附件处理

const parsed = await parseMessageWithAttachments(
  inboundMessage, normalizedAttachments, { maxBytes: 5_000_000 }
);

附件被解析,图片被提取,消息重新组装。

步骤 4:AbortController 设置

const abortController = new AbortController();
context.chatAbortControllers.set(clientRunId, {
  controller: abortController,
  sessionId: entry?.sessionId ?? clientRunId,
  sessionKey: rawSessionKey,
  startedAtMs: now,
  expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }),
});

过期时间计算使用有界超时 + 60 秒宽限期,限制在 2 分钟到 24 小时之间。

步骤 5:Fire-and-Forget 派发

void dispatchInboundMessage({
  ctx, cfg, dispatcher,
  replyOptions: {
    runId: clientRunId,
    abortSignal: abortController.signal,
    images: parsedImages,
    onAgentRunStart: (runId) => {
      toolEventRecipients.add(runId, client.connId);
    },
  },
});

处理器立即以 { started: true, runId } 响应。Agent 异步执行。

步骤 6:异步结果处理

.then(() => {
  broadcastChatFinal({ context, runId, sessionKey, message });
})
.catch((err) => {
  broadcastChatError({ context, runId, sessionKey, errorMessage: String(err) });
});

Agent 事件总线

文件: src/infra/agent-events.ts

Agent 进程通过一个简单的进程内 pub/sub 总线将结果传回:

export function emitAgentEvent(event) {
  const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1;
  seqByRun.set(event.runId, nextSeq);
  const enriched = { ...event, seq: nextSeq, ts: Date.now() };
  for (const listener of listeners) {
    try { listener(enriched); } catch { /* ignore */ }
  }
}

每个事件携带基于 run 的单调递增 seq。Gateway 检测序号间隔并广播错误,以便客户端识别丢失的事件。


返回路径:150ms 限流与背压

文件: src/gateway/server-chat.ts

createAgentEventHandler() 处理每个 agent 事件并将其路由到 WebSocket 客户端。

文本增量 — 150ms 限流

const now = Date.now();
const last = chatRunState.deltaSentAt.get(clientRunId) ?? 0;
if (now - last < 150) return; // 丢弃 — buffer 中已有最新文本
chatRunState.deltaSentAt.set(clientRunId, now);
broadcast("chat", payload, { dropIfSlow: true });

在上次广播 150ms 内到达的增量会被静默丢弃。buffer 始终保存完整的累积文本,因此下次广播会携带全部内容。数据不会丢失——只是 UI 更新被限流了。

Tool 事件 — 定向路由

Tool 事件广播给所有客户端,只发送给发起该 run 的 WebSocket 连接:

if (isToolEvent) {
  const recipients = toolEventRecipients.get(evt.runId);
  broadcastToConnIds("agent", toolPayload, recipients);
}

除非 verbose 级别为 "full",否则 tool payload 中的 result/partialResult 字段会被剥除。

背压 — 两级机制

文件: src/gateway/server-broadcast.ts

const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES; // 50 MB

if (slow && opts?.dropIfSlow) {
  continue; // 静默丢弃帧
}
if (slow) {
  c.socket.close(1008, "slow consumer"); // 强制断开
}
条件 dropIfSlow: true(增量) 无标志(最终消息)
Buffer ≤ 50MB 正常发送 正常发送
Buffer > 50MB 丢弃帧 强制关闭 socket

这确保了慢速客户端不会对服务器造成内存压力,同时保证最终消息能可靠到达正常客户端。


ChatRun 队列

ChatRunRegistry 为每个 session 维护一个 FIFO 队列。同一 session 的多条消息会被串行化处理——agent 按顺序处理它们:

add(sessionId, entry)    // 追加到队列
peek(sessionId)          // 队首条目(活跃 run)
shift(sessionId)         // 完成时出队
remove(sessionId, runId) // 取消特定 run

常量

常量 用途
MAX_PAYLOAD_BYTES 25 MB WebSocket 最大帧大小
MAX_BUFFERED_BYTES 50 MB 背压阈值
DEFAULT_HANDSHAKE_TIMEOUT_MS 10s WS 握手超时
TICK_INTERVAL_MS 30s 维护间隔
DEDUPE_TTL_MS 5 min 消息去重有效期
Chat history limit 6 MB 历史记录响应上限

完整路径(至此为止)

Client ──WS──► Gateway Server (port 18789)
                 │
                 ├─ connect.challenge → auth → hello-ok
                 │
                 ├─ chat.send
                 │    ├─ validate + sanitize
                 │    ├─ create AbortController
                 │    ├─ respond { started: true }
                 │    └─ dispatchInboundMessage() ──async──► [Agent]
                 │
                 ◄── agent events (seq-numbered)
                 │    ├─ assistant text → 150ms throttle → broadcast (dropIfSlow)
                 │    ├─ tool events → targeted to initiator connId only
                 │    └─ lifecycle end → flush buffer → broadcast (guaranteed)

下一篇:Part 2 将追踪 dispatchInboundMessage() 内部发生的事情——8 层路由引擎、system prompt 构建,以及 pi-agent-core 内部的双循环状态机。

分析基于 OpenClaw v2026.2.18。来源:github.com/openclaw/openclaw。研究来自 cryptocj.org