fix(webchat): reconnect gateway ws

main
Peter Steinberger 2025-12-14 03:26:38 +00:00
parent 7a87f3cfb8
commit 7dbd5acbb1
2 changed files with 202 additions and 43 deletions

View File

@ -43,10 +43,29 @@ class GatewaySocket {
this.ws = null; this.ws = null;
this.pending = new Map(); this.pending = new Map();
this.handlers = new Map(); this.handlers = new Map();
this.connectPromise = null;
this.reconnectTimer = null;
this.retryMs = 500;
this.maxRetryMs = 10_000;
this.hello = null;
} }
async connect() { async connect() {
return new Promise((resolve, reject) => { if (this.ws && this.ws.readyState === WebSocket.OPEN && this.hello) {
return this.hello;
}
if (this.connectPromise) return this.connectPromise;
this.connectPromise = new Promise((resolve, reject) => {
let settled = false;
const settle = (err, value) => {
if (settled) return;
settled = true;
this.connectPromise = null;
if (err) reject(err);
else resolve(value);
};
const ws = new WebSocket(this.url); const ws = new WebSocket(this.url);
this.ws = ws; this.ws = ws;
@ -66,24 +85,41 @@ class GatewaySocket {
caps: [], caps: [],
}; };
ws.send(JSON.stringify({ type: "req", id, method: "connect", params })); ws.send(JSON.stringify({ type: "req", id, method: "connect", params }));
this.pending.set(id, { resolve, reject, _handshake: true }); this.pending.set(id, {
resolve: (value) => settle(null, value),
reject: (err) => settle(err),
_handshake: true,
});
}; };
ws.onerror = (err) => { ws.onerror = (err) => {
logStatus(`ws: error ${formatError(err)}`); logStatus(`ws: error ${formatError(err)}`);
reject(err); settle(err);
}; };
ws.onclose = (ev) => { ws.onclose = (ev) => {
logStatus( logStatus(
`ws: close code=${ev.code} reason=${ev.reason || "n/a"} clean=${ev.wasClean}`, `ws: close code=${ev.code} reason=${ev.reason || "n/a"} clean=${ev.wasClean}`,
); );
if (this.ws === ws) {
this.ws = null;
this.hello = null;
}
if (this.pending.size > 0) { if (this.pending.size > 0) {
for (const [, p] of this.pending) for (const [, p] of this.pending)
p.reject(new Error("gateway closed")); p.reject(new Error("gateway closed"));
this.pending.clear(); this.pending.clear();
} }
if (ev.code !== 1000) reject(new Error(`gateway closed ${ev.code}`));
if (ev.code !== 1000) {
settle(new Error(`gateway closed ${ev.code}`));
} else {
settle(new Error("gateway closed"));
}
this.scheduleReconnect();
}; };
ws.onmessage = (ev) => { ws.onmessage = (ev) => {
@ -109,6 +145,12 @@ class GatewaySocket {
`ws: hello-ok presence=${helloOk?.snapshot?.presence?.length ?? 0} healthOk=${helloOk?.snapshot?.health?.ok ?? "n/a"}`, `ws: hello-ok presence=${helloOk?.snapshot?.presence?.length ?? 0} healthOk=${helloOk?.snapshot?.health?.ok ?? "n/a"}`,
); );
this.handlers.set("snapshot", helloOk.snapshot); this.handlers.set("snapshot", helloOk.snapshot);
this.hello = helloOk;
this.retryMs = 500;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
pending.resolve(helloOk); pending.resolve(helloOk);
} else { } else {
pending.resolve(msg.payload); pending.resolve(msg.payload);
@ -119,28 +161,73 @@ class GatewaySocket {
} }
}; };
}); });
return this.connectPromise;
} }
on(event, handler) { on(event, handler) {
this.handlers.set(event, handler); this.handlers.set(event, handler);
} }
async request(method, params, { timeoutMs = 30_000 } = {}) { scheduleReconnect() {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { if (this.reconnectTimer) return;
const delay = this.retryMs;
this.retryMs = Math.min(this.retryMs * 2, this.maxRetryMs);
logStatus(`ws: reconnect in ${delay}ms`);
this.reconnectTimer = setTimeout(async () => {
this.reconnectTimer = null;
try {
await this.connect();
} catch {
this.scheduleReconnect();
}
}, delay);
}
async ensureConnected() {
if (this.ws && this.ws.readyState === WebSocket.OPEN) return;
try {
await this.connect();
} catch (err) {
logStatus(`ws: connect failed ${formatError(err)}`);
this.scheduleReconnect();
throw new Error("gateway not connected"); throw new Error("gateway not connected");
} }
const id = randomId(); if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
const frame = { type: "req", id, method, params }; this.scheduleReconnect();
this.ws.send(JSON.stringify(frame)); throw new Error("gateway not connected");
return new Promise((resolve, reject) => { }
this.pending.set(id, { resolve, reject }); }
setTimeout(() => {
if (this.pending.has(id)) { async request(method, params, { timeoutMs = 30_000 } = {}) {
this.pending.delete(id); let lastErr = null;
reject(new Error(`${method} timed out`)); for (let attempt = 0; attempt < 2; attempt++) {
try {
await this.ensureConnected();
const ws = this.ws;
if (!ws || ws.readyState !== WebSocket.OPEN) {
throw new Error("gateway not connected");
} }
}, timeoutMs);
}); const id = randomId();
const frame = { type: "req", id, method, params };
ws.send(JSON.stringify(frame));
return await new Promise((resolve, reject) => {
this.pending.set(id, { resolve, reject });
setTimeout(() => {
if (this.pending.has(id)) {
this.pending.delete(id);
reject(new Error(`${method} timed out`));
}
}, timeoutMs);
});
} catch (err) {
lastErr = err;
this.scheduleReconnect();
}
}
throw lastErr instanceof Error ? lastErr : new Error("gateway not connected");
} }
} }

View File

@ -196592,9 +196592,26 @@ var GatewaySocket = class {
this.ws = null; this.ws = null;
this.pending = new Map(); this.pending = new Map();
this.handlers = new Map(); this.handlers = new Map();
this.connectPromise = null;
this.reconnectTimer = null;
this.retryMs = 500;
this.maxRetryMs = 1e4;
this.hello = null;
} }
async connect() { async connect() {
return new Promise((resolve, reject) => { if (this.ws && this.ws.readyState === WebSocket.OPEN && this.hello) {
return this.hello;
}
if (this.connectPromise) return this.connectPromise;
this.connectPromise = new Promise((resolve, reject) => {
let settled = false;
const settle = (err, value) => {
if (settled) return;
settled = true;
this.connectPromise = null;
if (err) reject(err);
else resolve(value);
};
const ws = new WebSocket(this.url); const ws = new WebSocket(this.url);
this.ws = ws; this.ws = ws;
ws.onopen = () => { ws.onopen = () => {
@ -196619,22 +196636,31 @@ var GatewaySocket = class {
params params
})); }));
this.pending.set(id, { this.pending.set(id, {
resolve, resolve: (value) => settle(null, value),
reject, reject: (err) => settle(err),
_handshake: true _handshake: true
}); });
}; };
ws.onerror = (err) => { ws.onerror = (err) => {
logStatus(`ws: error ${formatError(err)}`); logStatus(`ws: error ${formatError(err)}`);
reject(err); settle(err);
}; };
ws.onclose = (ev) => { ws.onclose = (ev) => {
logStatus(`ws: close code=${ev.code} reason=${ev.reason || "n/a"} clean=${ev.wasClean}`); logStatus(`ws: close code=${ev.code} reason=${ev.reason || "n/a"} clean=${ev.wasClean}`);
if (this.ws === ws) {
this.ws = null;
this.hello = null;
}
if (this.pending.size > 0) { if (this.pending.size > 0) {
for (const [, p$3] of this.pending) p$3.reject(new Error("gateway closed")); for (const [, p$3] of this.pending) p$3.reject(new Error("gateway closed"));
this.pending.clear(); this.pending.clear();
} }
if (ev.code !== 1e3) reject(new Error(`gateway closed ${ev.code}`)); if (ev.code !== 1e3) {
settle(new Error(`gateway closed ${ev.code}`));
} else {
settle(new Error("gateway closed"));
}
this.scheduleReconnect();
}; };
ws.onmessage = (ev) => { ws.onmessage = (ev) => {
let msg; let msg;
@ -196657,6 +196683,12 @@ var GatewaySocket = class {
const helloOk = msg.payload; const helloOk = msg.payload;
logStatus(`ws: hello-ok presence=${helloOk?.snapshot?.presence?.length ?? 0} healthOk=${helloOk?.snapshot?.health?.ok ?? "n/a"}`); logStatus(`ws: hello-ok presence=${helloOk?.snapshot?.presence?.length ?? 0} healthOk=${helloOk?.snapshot?.health?.ok ?? "n/a"}`);
this.handlers.set("snapshot", helloOk.snapshot); this.handlers.set("snapshot", helloOk.snapshot);
this.hello = helloOk;
this.retryMs = 500;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
pending.resolve(helloOk); pending.resolve(helloOk);
} else { } else {
pending.resolve(msg.payload); pending.resolve(msg.payload);
@ -196667,34 +196699,74 @@ var GatewaySocket = class {
} }
}; };
}); });
return this.connectPromise;
} }
on(event, handler) { on(event, handler) {
this.handlers.set(event, handler); this.handlers.set(event, handler);
} }
async request(method, params, { timeoutMs = 3e4 } = {}) { scheduleReconnect() {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { if (this.reconnectTimer) return;
const delay = this.retryMs;
this.retryMs = Math.min(this.retryMs * 2, this.maxRetryMs);
logStatus(`ws: reconnect in ${delay}ms`);
this.reconnectTimer = setTimeout(async () => {
this.reconnectTimer = null;
try {
await this.connect();
} catch {
this.scheduleReconnect();
}
}, delay);
}
async ensureConnected() {
if (this.ws && this.ws.readyState === WebSocket.OPEN) return;
try {
await this.connect();
} catch (err) {
logStatus(`ws: connect failed ${formatError(err)}`);
this.scheduleReconnect();
throw new Error("gateway not connected"); throw new Error("gateway not connected");
} }
const id = randomId(); if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
const frame = { this.scheduleReconnect();
type: "req", throw new Error("gateway not connected");
id, }
method, }
params async request(method, params, { timeoutMs = 3e4 } = {}) {
}; let lastErr = null;
this.ws.send(JSON.stringify(frame)); for (let attempt = 0; attempt < 2; attempt++) {
return new Promise((resolve, reject) => { try {
this.pending.set(id, { await this.ensureConnected();
resolve, const ws = this.ws;
reject if (!ws || ws.readyState !== WebSocket.OPEN) {
}); throw new Error("gateway not connected");
setTimeout(() => {
if (this.pending.has(id)) {
this.pending.delete(id);
reject(new Error(`${method} timed out`));
} }
}, timeoutMs); const id = randomId();
}); const frame = {
type: "req",
id,
method,
params
};
ws.send(JSON.stringify(frame));
return await new Promise((resolve, reject) => {
this.pending.set(id, {
resolve,
reject
});
setTimeout(() => {
if (this.pending.has(id)) {
this.pending.delete(id);
reject(new Error(`${method} timed out`));
}
}, timeoutMs);
});
} catch (err) {
lastErr = err;
this.scheduleReconnect();
}
}
throw lastErr instanceof Error ? lastErr : new Error("gateway not connected");
} }
}; };
var ChatTransport = class { var ChatTransport = class {