OpenClaw 核心流程 Part 1:从 WebSocket 到 Agent — Gateway 路径全解析
在上一个系列中,我们梳理了整体架构。现在我们深入一层——追踪一条消息从到达到响应的完整代码路径。本文是逐函数的详细 walkthrough。
- Part 1(本文): Gateway — WebSocket 连接、
chat.send、dispatch 到 agent - Part 2:双循环 — pi-agent-core 的状态机
- Part 3:Tools、Memory 与返回路径
服务器启动
文件: src/gateway/server.impl.ts
一切从 startGatewayServer(port = 18789) 开始。服务器以 noServer 模式创建 WebSocketServer,最大 payload 限制为 25MB,挂载 HTTP upgrade 处理器,并连接 agent 事件系统。
const httpServer = opts.tlsOptions
? createHttpsServer(opts.tlsOptions, handleRequest)
: createHttpServer(handleRequest);upgrade 处理器负责路由 WebSocket 连接,Canvas 路径有特殊处理(单独认证):
httpServer.on("upgrade", (req, socket, head) => {
if (canvasHost?.handleUpgrade(req, socket, head)) return;
wss.handleUpgrade(req, socket, head, (ws) => {
wss.emit("connection", ws, req);
});
});服务器启动监听后,startGatewaySidecars() 会依次启动浏览器控制、Gmail 监听器、channel 运行时(Telegram、Discord、WhatsApp……)、插件服务以及 memory 后端。
WebSocket 握手
文件: src/gateway/server/ws-connection.ts
连接建立时,服务器分配 connId = randomUUID(),发送 challenge nonce,并启动 10 秒握手计时器:
Server → Client: { type: "connect.challenge", nonce: "..." }
Client → Server: { type: "req", method: "connect", params: ConnectParams }ConnectParams 包含认证凭据、客户端版本、设备信息以及所请求的 scopes。authorizeGatewayConnect() 函数按以下顺序校验:
- 可信代理检查(如果启用)
- 基于 IP 的限流
- Tailscale 验证(如果允许)
- 通过恒定时间比较(
safeEqualSecret())匹配 token/密码
成功时:返回 hello-ok,携带服务器版本和能力信息。失败时:以相应错误码关闭 socket。
方法路由
文件: src/gateway/server-methods.ts
握手完成后,所有后续消息都通过 handleGatewayRequest() 路由:
const handler = coreGatewayHandlers[method] ?? pluginExtraHandlers[method];
if (!handler) {
respond(false, undefined, { code: "METHOD_NOT_FOUND" });
return;
}每个处理器按所需授权分类:
| 类别 | 方法 | 所需 Scope |
|---|---|---|
| READ | chat.history, health |
基本连接 |
| WRITE | chat.send, agent |
连接 + 写权限 |
| APPROVAL | exec.approval.* |
operator.approvals |
| NODE | node.invoke.result |
role === "node" |
operator.admin scope 可绕过所有检查。
chat.send 处理器
文件: src/gateway/server-methods/chat.ts
这是最热的代码路径——每条 webchat 消息都经过这里。
步骤 1:校验与清洗
if (!validateChatSendParams(params)) {
respond(false, undefined, errorShape(...));
return;
}
const sanitized = sanitizeChatSendMessageInput(params.message);清洗操作会规范化 Unicode、剥离不允许的控制字符,并拒绝含有 null 字节的消息。
步骤 2:停止命令检查
if (isChatStopCommandText(sanitized)) {
abortChatRunsForSessionKey(ops, { sessionKey, stopReason: "/stop" });
return;
}/stop 及其他中止触发词会取消该 session 的所有活跃 run。
步骤 3:附件处理
const parsed = await parseMessageWithAttachments(
inboundMessage, normalizedAttachments, { maxBytes: 5_000_000 }
);附件被解析,图片被提取,消息重新组装。
步骤 4:AbortController 设置
const abortController = new AbortController();
context.chatAbortControllers.set(clientRunId, {
controller: abortController,
sessionId: entry?.sessionId ?? clientRunId,
sessionKey: rawSessionKey,
startedAtMs: now,
expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }),
});过期时间计算使用有界超时 + 60 秒宽限期,限制在 2 分钟到 24 小时之间。
步骤 5:Fire-and-Forget 派发
void dispatchInboundMessage({
ctx, cfg, dispatcher,
replyOptions: {
runId: clientRunId,
abortSignal: abortController.signal,
images: parsedImages,
onAgentRunStart: (runId) => {
toolEventRecipients.add(runId, client.connId);
},
},
});处理器立即以 { started: true, runId } 响应。Agent 异步执行。
步骤 6:异步结果处理
.then(() => {
broadcastChatFinal({ context, runId, sessionKey, message });
})
.catch((err) => {
broadcastChatError({ context, runId, sessionKey, errorMessage: String(err) });
});Agent 事件总线
文件: src/infra/agent-events.ts
Agent 进程通过一个简单的进程内 pub/sub 总线将结果传回:
export function emitAgentEvent(event) {
const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1;
seqByRun.set(event.runId, nextSeq);
const enriched = { ...event, seq: nextSeq, ts: Date.now() };
for (const listener of listeners) {
try { listener(enriched); } catch { /* ignore */ }
}
}每个事件携带基于 run 的单调递增 seq。Gateway 检测序号间隔并广播错误,以便客户端识别丢失的事件。
返回路径:150ms 限流与背压
文件: src/gateway/server-chat.ts
createAgentEventHandler() 处理每个 agent 事件并将其路由到 WebSocket 客户端。
文本增量 — 150ms 限流
const now = Date.now();
const last = chatRunState.deltaSentAt.get(clientRunId) ?? 0;
if (now - last < 150) return; // 丢弃 — buffer 中已有最新文本
chatRunState.deltaSentAt.set(clientRunId, now);
broadcast("chat", payload, { dropIfSlow: true });在上次广播 150ms 内到达的增量会被静默丢弃。buffer 始终保存完整的累积文本,因此下次广播会携带全部内容。数据不会丢失——只是 UI 更新被限流了。
Tool 事件 — 定向路由
Tool 事件不广播给所有客户端,只发送给发起该 run 的 WebSocket 连接:
if (isToolEvent) {
const recipients = toolEventRecipients.get(evt.runId);
broadcastToConnIds("agent", toolPayload, recipients);
}除非 verbose 级别为 "full",否则 tool payload 中的 result/partialResult 字段会被剥除。
背压 — 两级机制
文件: src/gateway/server-broadcast.ts
const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES; // 50 MB
if (slow && opts?.dropIfSlow) {
continue; // 静默丢弃帧
}
if (slow) {
c.socket.close(1008, "slow consumer"); // 强制断开
}| 条件 | dropIfSlow: true(增量) |
无标志(最终消息) |
|---|---|---|
| Buffer ≤ 50MB | 正常发送 | 正常发送 |
| Buffer > 50MB | 丢弃帧 | 强制关闭 socket |
这确保了慢速客户端不会对服务器造成内存压力,同时保证最终消息能可靠到达正常客户端。
ChatRun 队列
ChatRunRegistry 为每个 session 维护一个 FIFO 队列。同一 session 的多条消息会被串行化处理——agent 按顺序处理它们:
add(sessionId, entry) // 追加到队列
peek(sessionId) // 队首条目(活跃 run)
shift(sessionId) // 完成时出队
remove(sessionId, runId) // 取消特定 run
常量
| 常量 | 值 | 用途 |
|---|---|---|
MAX_PAYLOAD_BYTES |
25 MB | WebSocket 最大帧大小 |
MAX_BUFFERED_BYTES |
50 MB | 背压阈值 |
DEFAULT_HANDSHAKE_TIMEOUT_MS |
10s | WS 握手超时 |
TICK_INTERVAL_MS |
30s | 维护间隔 |
DEDUPE_TTL_MS |
5 min | 消息去重有效期 |
| Chat history limit | 6 MB | 历史记录响应上限 |
完整路径(至此为止)
Client ──WS──► Gateway Server (port 18789)
│
├─ connect.challenge → auth → hello-ok
│
├─ chat.send
│ ├─ validate + sanitize
│ ├─ create AbortController
│ ├─ respond { started: true }
│ └─ dispatchInboundMessage() ──async──► [Agent]
│
◄── agent events (seq-numbered)
│ ├─ assistant text → 150ms throttle → broadcast (dropIfSlow)
│ ├─ tool events → targeted to initiator connId only
│ └─ lifecycle end → flush buffer → broadcast (guaranteed)下一篇:Part 2 将追踪 dispatchInboundMessage() 内部发生的事情——8 层路由引擎、system prompt 构建,以及 pi-agent-core 内部的双循环状态机。
分析基于 OpenClaw v2026.2.18。来源:github.com/openclaw/openclaw。研究来自 cryptocj.org。