From 62a7a071275bf821424c38cfea49e1f910c8c0dd Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 12 Dec 2025 18:00:25 +0000 Subject: [PATCH] fix(gateway): ack agent requests immediately --- src/gateway/server.test.ts | 18 +++--- src/gateway/server.ts | 113 ++++++++++++++++++------------------- 2 files changed, 67 insertions(+), 64 deletions(-) diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 5985af980..d76dfc478 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -481,7 +481,7 @@ describe("gateway server", () => { await server.close(); }); - test("agent ack event then final response", { timeout: 8000 }, async () => { + test("agent ack response then final response", { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); ws.send( JSON.stringify({ @@ -502,11 +502,13 @@ describe("gateway server", () => { const ackP = onceMessage( ws, (o) => - o.type === "event" && - o.event === "agent" && - o.payload?.status === "accepted", + o.type === "res" && o.id === "ag1" && o.payload?.status === "accepted", + ); + const finalP = onceMessage( + ws, + (o) => + o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted", ); - const finalP = onceMessage(ws, (o) => o.type === "res" && o.id === "ag1"); ws.send( JSON.stringify({ type: "req", @@ -732,7 +734,8 @@ describe("gateway server", () => { const ws1 = await dial(); const final1P = onceMessage( ws1, - (o) => o.type === "res" && o.id === "ag1", + (o) => + o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted", 6000, ); ws1.send( @@ -749,7 +752,8 @@ describe("gateway server", () => { const ws2 = await dial(); const final2P = onceMessage( ws2, - (o) => o.type === "res" && o.id === "ag2", + (o) => + o.type === "res" && o.id === "ag2" && o.payload?.status !== "accepted", 6000, ); ws2.send( diff --git a/src/gateway/server.ts b/src/gateway/server.ts index d2c9c1c62..bd645e30f 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -1208,65 +1208,64 @@ export async function startGatewayServer( const deliver = params.deliver === true && resolvedChannel !== "webchat"; - // Acknowledge via event to avoid double res frames - const ackEvent = { - type: "event", - event: "agent", - payload: { runId, status: "accepted" as const }, - seq: ++seq, - }; - socket.send(JSON.stringify(ackEvent)); - logWs("out", "event", { - connId, - event: "agent", - runId, - status: "accepted", + + const accepted = { runId, status: "accepted" as const }; + // Store an in-flight ack so retries do not spawn a second run. + dedupe.set(`agent:${idem}`, { + ts: Date.now(), + ok: true, + payload: accepted, }); - try { - await agentCommand( - { - message, - to: resolvedTo, - sessionId: resolvedSessionId, - thinking: params.thinking, - deliver, - provider: resolvedChannel, - timeout: params.timeout?.toString(), - bestEffortDeliver, - surface: "VoiceWake", - }, - defaultRuntime, - deps, - ); - const payload = { - runId, - status: "ok" as const, - summary: "completed", - }; - dedupe.set(`agent:${idem}`, { - ts: Date.now(), - ok: true, - payload, + respond(true, accepted, undefined, { runId }); + + void agentCommand( + { + message, + to: resolvedTo, + sessionId: resolvedSessionId, + thinking: params.thinking, + deliver, + provider: resolvedChannel, + timeout: params.timeout?.toString(), + bestEffortDeliver, + surface: "VoiceWake", + }, + defaultRuntime, + deps, + ) + .then(() => { + const payload = { + runId, + status: "ok" as const, + summary: "completed", + }; + dedupe.set(`agent:${idem}`, { + ts: Date.now(), + ok: true, + payload, + }); + // Send a second res frame (same id) so TS clients with expectFinal can wait. + // Swift clients will typically treat the first res as the result and ignore this. + respond(true, payload, undefined, { runId }); + }) + .catch((err) => { + const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); + const payload = { + runId, + status: "error" as const, + summary: String(err), + }; + dedupe.set(`agent:${idem}`, { + ts: Date.now(), + ok: false, + payload, + error, + }); + respond(false, payload, error, { + runId, + error: formatForLog(err), + }); }); - respond(true, payload, undefined, { runId }); - } catch (err) { - const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); - const payload = { - runId, - status: "error" as const, - summary: String(err), - }; - dedupe.set(`agent:${idem}`, { - ts: Date.now(), - ok: false, - payload, - error, - }); - respond(false, payload, error, { - runId, - error: formatForLog(err), - }); - } break; } default: {