OpenClaw Core Flow Part 1: From WebSocket to Agent — The Gateway Path
In our previous series we mapped the architecture. Now we go deeper — tracing the actual code path a message takes from arrival to response. This is the function-by-function walkthrough.
- Part 1 (this post): Gateway — WebSocket connection, chat.send, dispatch to agent
- Part 2: The Double Loop — pi-agent-core’s State Machine
- Part 3: Tools, Memory & the Return Path
Server Bootstrap
File: src/gateway/server.impl.ts
Everything starts with startGatewayServer(port = 18789). The server creates a WebSocketServer in noServer mode with a 25MB max payload limit, attaches the HTTP upgrade handler, and wires the agent event system.
const httpServer = opts.tlsOptions
  ? createHttpsServer(opts.tlsOptions, handleRequest)
  : createHttpServer(handleRequest);

The upgrade handler routes WebSocket connections, with special handling for Canvas paths (authenticated separately):
httpServer.on("upgrade", (req, socket, head) => {
if (canvasHost?.handleUpgrade(req, socket, head)) return;
wss.handleUpgrade(req, socket, head, (ws) => {
wss.emit("connection", ws, req);
});
});After the server is listening, startGatewaySidecars() kicks off browser control, Gmail watcher, channel runtimes (Telegram, Discord, WhatsApp…), plugin services, and the memory backend.
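For reference, the noServer setup with the 25 MB payload cap described above would look roughly like this with the ws package (a sketch, not the exact OpenClaw code):

```ts
import { WebSocketServer } from "ws";

// Sketch only: a WebSocketServer in noServer mode with a 25 MB payload cap.
// The real server.impl.ts wires this together with TLS options and sidecars.
const wss = new WebSocketServer({
  noServer: true,               // we attach to the HTTP "upgrade" event ourselves
  maxPayload: 25 * 1024 * 1024, // MAX_PAYLOAD_BYTES: reject frames over 25 MB
});
```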
WebSocket Handshake
File: src/gateway/server/ws-connection.ts
On connection, the server assigns a connId = randomUUID(), sends a challenge nonce, and starts a 10-second handshake timer:
Server → Client: { type: "connect.challenge", nonce: "..." }
Client → Server: { type: "req", method: "connect", params: ConnectParams }

ConnectParams includes auth credentials, client version, device info, and requested scopes. The authorizeGatewayConnect() function validates in this order:
- Trusted proxy check (if enabled)
- Rate limiting per IP
- Tailscale verification (if allowed)
- Token/password matching via constant-time comparison (safeEqualSecret())
On success: hello-ok with server version and capabilities. On failure: socket closed with appropriate error code.
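The constant-time comparison deserves a note: a helper like safeEqualSecret() can be built on Node's crypto.timingSafeEqual. The sketch below illustrates the idea and is not the actual implementation:

```ts
import { createHash, timingSafeEqual } from "node:crypto";

// Illustrative sketch of a constant-time secret check in the spirit of
// safeEqualSecret(). Hashing both values first yields equal-length buffers,
// so timingSafeEqual never throws and the comparison leaks no length or timing info.
function safeEqualSecretSketch(provided: string, expected: string): boolean {
  const a = createHash("sha256").update(provided).digest();
  const b = createHash("sha256").update(expected).digest();
  return timingSafeEqual(a, b);
}
```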
Method Routing
File: src/gateway/server-methods.ts
After the handshake, all subsequent messages route through handleGatewayRequest():
const handler = coreGatewayHandlers[method] ?? pluginExtraHandlers[method];
if (!handler) {
  respond(false, undefined, { code: "METHOD_NOT_FOUND" });
  return;
}

Each handler is categorized by required authorization:
| Category | Methods | Required Scope |
|---|---|---|
| READ | chat.history, health | basic connection |
| WRITE | chat.send, agent | connection + write |
| APPROVAL | exec.approval.* | operator.approvals |
| NODE | node.invoke.result | role === "node" |
The operator.admin scope bypasses all checks.
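As a rough mental model, the authorization gate behaves like the sketch below (names and shapes are illustrative, not the real handler metadata):

```ts
// Hypothetical sketch of the authorization gate implied by the table above.
type Category = "read" | "write" | "approval" | "node";

interface ConnInfo {
  authed: boolean;
  scopes: Set<string>;
  role?: "node" | "client";
}

function isAuthorized(cat: Category, conn: ConnInfo): boolean {
  if (conn.scopes.has("operator.admin")) return true;      // admin bypasses all checks
  if (!conn.authed) return false;                          // every category needs a connection
  switch (cat) {
    case "read":     return true;                          // basic connection is enough
    case "write":    return conn.scopes.has("write");      // connection + write scope
    case "approval": return conn.scopes.has("operator.approvals");
    case "node":     return conn.role === "node";          // role check, not a scope
  }
}
```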
The chat.send Handler
File: src/gateway/server-methods/chat.ts
This is the hottest code path — every webchat message passes through here.
Step 1: Validate & Sanitize
if (!validateChatSendParams(params)) {
  respond(false, undefined, errorShape(...));
  return;
}
const sanitized = sanitizeChatSendMessageInput(params.message);

Sanitization normalizes Unicode, strips disallowed control characters, and rejects messages with null bytes.
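A minimal sketch of the same policy (not the real sanitizeChatSendMessageInput) looks like this:

```ts
// Sketch of the sanitization policy described above: NFC-normalize, reject null
// bytes outright, and strip the remaining disallowed control characters.
function sanitizeSketch(text: string): string {
  const normalized = text.normalize("NFC");
  if (normalized.includes("\u0000")) {
    throw new Error("message contains null bytes");
  }
  // Drop C0/C1 controls except tab (\t), newline (\n), and carriage return (\r).
  return normalized.replace(/[\u0001-\u0008\u000B\u000C\u000E-\u001F\u007F-\u009F]/g, "");
}
```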
Step 2: Stop Command Check
if (isChatStopCommandText(sanitized)) {
  abortChatRunsForSessionKey(ops, { sessionKey, stopReason: "/stop" });
  return;
}

/stop and other abort triggers cancel all active runs for that session.
Step 3: Attachment Processing
const parsed = await parseMessageWithAttachments(
  inboundMessage, normalizedAttachments, { maxBytes: 5_000_000 }
);

Attachments are parsed, images extracted, and the message is reassembled.
Step 4: AbortController Setup
const abortController = new AbortController();
context.chatAbortControllers.set(clientRunId, {
  controller: abortController,
  sessionId: entry?.sessionId ?? clientRunId,
  sessionKey: rawSessionKey,
  startedAtMs: now,
  expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }),
});

The expiry calculation uses the bounded timeout plus a 60s grace period, clamped between 2 minutes and 24 hours.
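In other words (a sketch of the rule, with illustrative constant names):

```ts
// Expiry rule described above: requested timeout plus a 60s grace period,
// clamped to the [2 minutes, 24 hours] range.
const MIN_RUN_MS = 2 * 60 * 1000;
const MAX_RUN_MS = 24 * 60 * 60 * 1000;
const GRACE_MS = 60 * 1000;

function resolveExpiresAtSketch(now: number, timeoutMs: number): number {
  const bounded = Math.min(Math.max(timeoutMs + GRACE_MS, MIN_RUN_MS), MAX_RUN_MS);
  return now + bounded;
}
```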
Step 5: Fire-and-Forget Dispatch
void dispatchInboundMessage({
  ctx, cfg, dispatcher,
  replyOptions: {
    runId: clientRunId,
    abortSignal: abortController.signal,
    images: parsedImages,
    onAgentRunStart: (runId) => {
      toolEventRecipients.add(runId, client.connId);
    },
  },
});

The handler responds immediately with { started: true, runId }. The agent executes asynchronously.
Step 6: Async Result Handling
.then(() => {
  broadcastChatFinal({ context, runId, sessionKey, message });
})
.catch((err) => {
  broadcastChatError({ context, runId, sessionKey, errorMessage: String(err) });
});

The Agent Event Bus
File: src/infra/agent-events.ts
The agent process communicates back through a simple in-process pub/sub bus:
export function emitAgentEvent(event) {
  const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1;
  seqByRun.set(event.runId, nextSeq);
  const enriched = { ...event, seq: nextSeq, ts: Date.now() };
  for (const listener of listeners) {
    try { listener(enriched); } catch { /* ignore */ }
  }
}

Every event carries a monotonic seq per run. The gateway detects sequence gaps and broadcasts errors so clients can identify missed events.
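Gap detection on the consuming side reduces to a per-run counter check; the sketch below illustrates the idea rather than the gateway's exact code:

```ts
// If an event's seq is not exactly lastSeen + 1 for its run, an event was missed
// (or delivered out of order) and an error can be surfaced to clients.
const lastSeqByRun = new Map<string, number>();

function hasSequenceGap(runId: string, seq: number): boolean {
  const last = lastSeqByRun.get(runId) ?? 0;
  lastSeqByRun.set(runId, seq);
  return seq !== last + 1;
}
```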
The Return Path: 150ms Throttle & Backpressure
File: src/gateway/server-chat.ts
createAgentEventHandler() processes every agent event and routes it to WebSocket clients.
Text Deltas — 150ms Throttle
const now = Date.now();
const last = chatRunState.deltaSentAt.get(clientRunId) ?? 0;
if (now - last < 150) return; // DROP — buffer has latest text
chatRunState.deltaSentAt.set(clientRunId, now);
broadcast("chat", payload, { dropIfSlow: true });

Deltas arriving within 150ms of the last broadcast are silently dropped. The buffer always holds the complete accumulated text, so the next broadcast carries everything. No data is lost — just UI updates are throttled.
Tool Events — Targeted Routing
Tool events are not broadcast to all clients. They go only to the WebSocket connection that initiated the run:
if (isToolEvent) {
  const recipients = toolEventRecipients.get(evt.runId);
  broadcastToConnIds("agent", toolPayload, recipients);
}

Tool payloads are also stripped of result/partialResult unless the verbose level is "full".
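Conceptually, toolEventRecipients is just a runId-to-connId registry, populated by the onAgentRunStart callback seen earlier. A minimal sketch (the real structure likely also handles cleanup when runs end):

```ts
// Minimal sketch of a run-scoped recipient registry for targeted tool events.
const recipientsByRun = new Map<string, Set<string>>();

function addRecipient(runId: string, connId: string): void {
  let set = recipientsByRun.get(runId);
  if (!set) {
    set = new Set<string>();
    recipientsByRun.set(runId, set);
  }
  set.add(connId);
}

function getRecipients(runId: string): string[] {
  return [...(recipientsByRun.get(runId) ?? [])];
}
```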
Backpressure — Two Tiers
File: src/gateway/server-broadcast.ts
const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES; // 50 MB
if (slow && opts?.dropIfSlow) {
  continue; // Silently drop frame
}
if (slow) {
  c.socket.close(1008, "slow consumer"); // Force disconnect
}

| Condition | dropIfSlow: true (deltas) | No flag (final messages) |
|---|---|---|
| Buffer ≤ 50MB | Send normally | Send normally |
| Buffer > 50MB | Drop frame | Force-close socket |
This ensures slow clients don’t cause memory pressure on the server, while final messages are guaranteed to reach healthy clients.
ChatRun Queue
The ChatRunRegistry maintains a per-session FIFO queue. Multiple messages to the same session are serialized — the agent processes them in order:
add(sessionId, entry) // Append to queue
peek(sessionId) // Front entry (active run)
shift(sessionId) // Dequeue on completion
remove(sessionId, runId) // Cancel specific run
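A minimal version of that registry, assuming a simple entry shape (the real ChatRunRegistry tracks more per-run state):

```ts
// Per-session FIFO queue with the four operations listed above.
interface ChatRunEntry {
  runId: string;
  startedAtMs: number;
}

class ChatRunRegistrySketch {
  private queues = new Map<string, ChatRunEntry[]>();

  add(sessionId: string, entry: ChatRunEntry): void {
    const q = this.queues.get(sessionId) ?? [];
    q.push(entry);
    this.queues.set(sessionId, q);
  }

  peek(sessionId: string): ChatRunEntry | undefined {
    return this.queues.get(sessionId)?.[0];
  }

  shift(sessionId: string): ChatRunEntry | undefined {
    return this.queues.get(sessionId)?.shift();
  }

  remove(sessionId: string, runId: string): void {
    const q = this.queues.get(sessionId);
    if (!q) return;
    this.queues.set(sessionId, q.filter((e) => e.runId !== runId));
  }
}
```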
Constants
| Constant | Value | Purpose |
|---|---|---|
| MAX_PAYLOAD_BYTES | 25 MB | Max WebSocket frame |
| MAX_BUFFERED_BYTES | 50 MB | Backpressure threshold |
| DEFAULT_HANDSHAKE_TIMEOUT_MS | 10s | WS handshake timeout |
| TICK_INTERVAL_MS | 30s | Maintenance interval |
| DEDUPE_TTL_MS | 5 min | Message deduplication |
| Chat history limit | 6 MB | History response cap |
The Complete Path (So Far)
Client ──WS──► Gateway Server (port 18789)
│
├─ connect.challenge → auth → hello-ok
│
├─ chat.send
│ ├─ validate + sanitize
│ ├─ create AbortController
│ ├─ respond { started: true }
│ └─ dispatchInboundMessage() ──async──► [Agent]
│
◄── agent events (seq-numbered)
│ ├─ assistant text → 150ms throttle → broadcast (dropIfSlow)
│ ├─ tool events → targeted to initiator connId only
│ └─ lifecycle end → flush buffer → broadcast (guaranteed)

Next: Part 2 traces what happens inside dispatchInboundMessage() — the 8-level routing engine, system prompt construction, and the double-loop state machine inside pi-agent-core.
Analysis based on OpenClaw v2026.2.18. Source: github.com/openclaw/openclaw. Research from cryptocj.org.