gateway: harden ws protocol and liveness

main
Peter Steinberger 2025-12-09 17:02:58 +01:00
parent 20d247b3f7
commit 72eb240c3b
6 changed files with 108 additions and 42 deletions

View File

@ -26,6 +26,8 @@ private actor GatewayChannelActor {
private var backoffMs: Double = 500 private var backoffMs: Double = 500
private var shouldReconnect = true private var shouldReconnect = true
private var lastSeq: Int? private var lastSeq: Int?
private var lastTick: Date?
private var tickIntervalMs: Double = 30_000
private let decoder = JSONDecoder() private let decoder = JSONDecoder()
private let encoder = JSONEncoder() private let encoder = JSONEncoder()
@ -49,8 +51,8 @@ private actor GatewayChannelActor {
private func sendHello() async throws { private func sendHello() async throws {
let hello: [String: Any] = [ let hello: [String: Any] = [
"type": "hello", "type": "hello",
"minProtocol": 1, "minProtocol": GATEWAY_PROTOCOL_VERSION,
"maxProtocol": 1, "maxProtocol": GATEWAY_PROTOCOL_VERSION,
"client": [ "client": [
"name": "clawdis-mac", "name": "clawdis-mac",
"version": Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String ?? "dev", "version": Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String ?? "dev",
@ -80,6 +82,12 @@ private actor GatewayChannelActor {
guard let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], guard let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
let type = obj["type"] as? String else { return false } let type = obj["type"] as? String else { return false }
if type == "hello-ok" { if type == "hello-ok" {
if let policy = obj["policy"] as? [String: Any],
let tick = policy["tickIntervalMs"] as? Double {
self.tickIntervalMs = tick
}
self.lastTick = Date()
Task { await self.watchTicks() }
NotificationCenter.default.post(name: .gatewaySnapshot, object: nil, userInfo: obj) NotificationCenter.default.post(name: .gatewaySnapshot, object: nil, userInfo: obj)
return true return true
} }
@ -134,14 +142,33 @@ private actor GatewayChannelActor {
} }
self.lastSeq = seq self.lastSeq = seq
} }
if evt.event == "tick" { self.lastTick = Date() }
NotificationCenter.default.post(name: .gatewayEvent, object: frame) NotificationCenter.default.post(name: .gatewayEvent, object: frame)
case .helloOk: case .helloOk:
self.lastTick = Date()
NotificationCenter.default.post(name: .gatewaySnapshot, object: frame) NotificationCenter.default.post(name: .gatewaySnapshot, object: frame)
default: default:
break break
} }
} }
private func watchTicks() async {
let tolerance = self.tickIntervalMs * 2
while self.connected {
try? await Task.sleep(nanoseconds: UInt64(tolerance * 1_000_000))
guard self.connected else { return }
if let last = self.lastTick {
let delta = Date().timeIntervalSince(last) * 1000
if delta > tolerance {
self.logger.error("gateway tick missed; reconnecting")
self.connected = false
await self.scheduleReconnect()
return
}
}
}
}
private func scheduleReconnect() async { private func scheduleReconnect() async {
guard self.shouldReconnect else { return } guard self.shouldReconnect else { return }
let delay = self.backoffMs / 1000 let delay = self.backoffMs / 1000

View File

@ -25,7 +25,6 @@ import { emitAgentEvent } from "../infra/agent-events.js";
import { runCommandWithTimeout } from "../process/exec.js"; import { runCommandWithTimeout } from "../process/exec.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js"; import { normalizeE164 } from "../utils.js";
import { sendViaIpc } from "../web/ipc.js";
type AgentCommandOpts = { type AgentCommandOpts = {
message: string; message: string;
@ -420,24 +419,14 @@ export async function agentCommand(
if (deliver && targetTo) { if (deliver && targetTo) {
const text = payload.text ?? ""; const text = payload.text ?? "";
const media = mediaList; const media = mediaList;
// Prefer IPC to reuse the running relay; fall back to direct web send. if (!text && media.length === 0) continue;
let sentViaIpc = false;
const ipcResult = await sendViaIpc(targetTo, text, media[0]); const primaryMedia = media[0];
if (ipcResult) {
sentViaIpc = ipcResult.success;
if (ipcResult.success && media.length > 1) {
for (const extra of media.slice(1)) {
await sendViaIpc(targetTo, "", extra);
}
}
}
if (!sentViaIpc) {
if (text || media.length === 0) {
await deps.sendMessageWhatsApp(targetTo, text, { await deps.sendMessageWhatsApp(targetTo, text, {
verbose: false, verbose: false,
mediaUrl: media[0], mediaUrl: primaryMedia,
}); });
}
for (const extra of media.slice(1)) { for (const extra of media.slice(1)) {
await deps.sendMessageWhatsApp(targetTo, "", { await deps.sendMessageWhatsApp(targetTo, "", {
verbose: false, verbose: false,
@ -446,5 +435,4 @@ export async function agentCommand(
} }
} }
} }
}
} }

View File

@ -1,5 +1,6 @@
import { randomUUID } from "node:crypto"; import { randomUUID } from "node:crypto";
import { GatewayClient } from "./client.js"; import { GatewayClient } from "./client.js";
import { PROTOCOL_VERSION } from "./protocol/index.js";
export type CallGatewayOptions = { export type CallGatewayOptions = {
url?: string; url?: string;
@ -39,8 +40,8 @@ export async function callGateway<T = unknown>(
clientVersion: opts.clientVersion ?? "dev", clientVersion: opts.clientVersion ?? "dev",
platform: opts.platform, platform: opts.platform,
mode: opts.mode ?? "cli", mode: opts.mode ?? "cli",
minProtocol: opts.minProtocol ?? 1, minProtocol: opts.minProtocol ?? PROTOCOL_VERSION,
maxProtocol: opts.maxProtocol ?? 1, maxProtocol: opts.maxProtocol ?? PROTOCOL_VERSION,
onHelloOk: async () => { onHelloOk: async () => {
try { try {
const result = await client.request<T>(opts.method, opts.params, { const result = await client.request<T>(opts.method, opts.params, {

View File

@ -39,6 +39,10 @@ export class GatewayClient {
private backoffMs = 1000; private backoffMs = 1000;
private closed = false; private closed = false;
private lastSeq: number | null = null; private lastSeq: number | null = null;
// Track last tick to detect silent stalls.
private lastTick: number | null = null;
private tickIntervalMs = 30_000;
private tickTimer: NodeJS.Timeout | null = null;
constructor(opts: GatewayClientOptions) { constructor(opts: GatewayClientOptions) {
this.opts = opts; this.opts = opts;
@ -66,6 +70,10 @@ export class GatewayClient {
stop() { stop() {
this.closed = true; this.closed = true;
if (this.tickTimer) {
clearInterval(this.tickTimer);
this.tickTimer = null;
}
this.ws?.close(); this.ws?.close();
this.ws = null; this.ws = null;
this.flushPendingErrors(new Error("gateway client stopped")); this.flushPendingErrors(new Error("gateway client stopped"));
@ -94,6 +102,12 @@ export class GatewayClient {
const parsed = JSON.parse(raw); const parsed = JSON.parse(raw);
if (parsed?.type === "hello-ok") { if (parsed?.type === "hello-ok") {
this.backoffMs = 1000; this.backoffMs = 1000;
this.tickIntervalMs =
typeof parsed.policy?.tickIntervalMs === "number"
? parsed.policy.tickIntervalMs
: 30_000;
this.lastTick = Date.now();
this.startTickWatch();
this.opts.onHelloOk?.(parsed as HelloOk); this.opts.onHelloOk?.(parsed as HelloOk);
return; return;
} }
@ -111,6 +125,9 @@ export class GatewayClient {
} }
this.lastSeq = seq; this.lastSeq = seq;
} }
if (evt.event === "tick") {
this.lastTick = Date.now();
}
this.opts.onEvent?.(evt); this.opts.onEvent?.(evt);
return; return;
} }
@ -134,6 +151,10 @@ export class GatewayClient {
private scheduleReconnect() { private scheduleReconnect() {
if (this.closed) return; if (this.closed) return;
if (this.tickTimer) {
clearInterval(this.tickTimer);
this.tickTimer = null;
}
const delay = this.backoffMs; const delay = this.backoffMs;
this.backoffMs = Math.min(this.backoffMs * 2, 30_000); this.backoffMs = Math.min(this.backoffMs * 2, 30_000);
setTimeout(() => this.start(), delay).unref(); setTimeout(() => this.start(), delay).unref();
@ -146,6 +167,19 @@ export class GatewayClient {
this.pending.clear(); this.pending.clear();
} }
private startTickWatch() {
if (this.tickTimer) clearInterval(this.tickTimer);
const interval = Math.max(this.tickIntervalMs, 1000);
this.tickTimer = setInterval(() => {
if (this.closed) return;
if (!this.lastTick) return;
const gap = Date.now() - this.lastTick;
if (gap > this.tickIntervalMs * 2) {
this.ws?.close(4000, "tick timeout");
}
}, interval);
}
async request<T = unknown>( async request<T = unknown>(
method: string, method: string,
params?: unknown, params?: unknown,

View File

@ -40,7 +40,6 @@ const METHODS = [
"status", "status",
"system-presence", "system-presence",
"system-event", "system-event",
"set-heartbeats",
"send", "send",
"agent", "agent",
]; ];
@ -54,6 +53,8 @@ export type GatewayServer = {
let presenceVersion = 1; let presenceVersion = 1;
let healthVersion = 1; let healthVersion = 1;
let seq = 0; let seq = 0;
// Track per-run sequence to detect out-of-order/lost agent events.
const agentRunSeq = new Map<string, number>();
function buildSnapshot(): Snapshot { function buildSnapshot(): Snapshot {
const presence = listSystemPresence(); const presence = listSystemPresence();
@ -147,6 +148,21 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
}, 60_000); }, 60_000);
const agentUnsub = onAgentEvent((evt) => { const agentUnsub = onAgentEvent((evt) => {
const last = agentRunSeq.get(evt.runId) ?? 0;
if (evt.seq !== last + 1) {
// Fan out an error event so clients can refresh the stream on gaps.
broadcast("agent", {
runId: evt.runId,
stream: "error",
ts: Date.now(),
data: {
reason: "seq gap",
expected: last + 1,
received: evt.seq,
},
});
}
agentRunSeq.set(evt.runId, evt.seq);
broadcast("agent", evt); broadcast("agent", evt);
}); });
@ -247,14 +263,15 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
client = { socket, hello, connId }; client = { socket, hello, connId };
clients.add(client); clients.add(client);
// synthesize presence entry for this connection // synthesize presence entry for this connection (client fingerprint)
const presenceKey = hello.client.instanceId || connId; const presenceKey = hello.client.instanceId || connId;
const remoteAddr = (
socket as WebSocket & { _socket?: { remoteAddress?: string } }
)._socket?.remoteAddress;
upsertPresence(presenceKey, { upsertPresence(presenceKey, {
host: os.hostname(), host: hello.client.name || os.hostname(),
version: ip: remoteAddr,
process.env.CLAWDIS_VERSION ?? version: hello.client.version,
process.env.npm_package_version ??
"dev",
mode: hello.client.mode, mode: hello.client.mode,
instanceId: hello.client.instanceId, instanceId: hello.client.instanceId,
reason: "connect", reason: "connect",
@ -352,10 +369,6 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
respond(true, { ok: true }, undefined); respond(true, { ok: true }, undefined);
break; break;
} }
case "set-heartbeats": {
respond(true, { ok: true }, undefined);
break;
}
case "send": { case "send": {
const p = (req.params ?? {}) as Record<string, unknown>; const p = (req.params ?? {}) as Record<string, unknown>;
if (!validateSendParams(p)) { if (!validateSendParams(p)) {

View File

@ -6,13 +6,16 @@ export type AgentEventPayload = {
data: Record<string, unknown>; data: Record<string, unknown>;
}; };
let seq = 0; // Keep per-run counters so streams stay strictly monotonic per runId.
const seqByRun = new Map<string, number>();
const listeners = new Set<(evt: AgentEventPayload) => void>(); const listeners = new Set<(evt: AgentEventPayload) => void>();
export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) { export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
const nextSeq = (seqByRun.get(event.runId) ?? 0) + 1;
seqByRun.set(event.runId, nextSeq);
const enriched: AgentEventPayload = { const enriched: AgentEventPayload = {
...event, ...event,
seq: ++seq, seq: nextSeq,
ts: Date.now(), ts: Date.now(),
}; };
for (const listener of listeners) { for (const listener of listeners) {