From 5492845659b2fc307893b9124dedc2572a959414 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 5 Dec 2025 21:13:17 +0000 Subject: [PATCH] feat: stream turn completions and tighten rpc timeout --- src/auto-reply/command-reply.test.ts | 99 +++++++ src/auto-reply/command-reply.ts | 385 +++++++++++++++------------ src/auto-reply/reply.ts | 56 ++++ src/process/tau-rpc.ts | 23 +- src/web/auto-reply.ts | 25 ++ 5 files changed, 412 insertions(+), 176 deletions(-) diff --git a/src/auto-reply/command-reply.test.ts b/src/auto-reply/command-reply.test.ts index 22f626a4e..ac0f8f955 100644 --- a/src/auto-reply/command-reply.test.ts +++ b/src/auto-reply/command-reply.test.ts @@ -173,6 +173,105 @@ describe("runCommandReply (pi)", () => { expect(meta.killed).toBe(true); }); + it("collapses rpc deltas instead of emitting raw JSON spam", async () => { + mockPiRpc({ + stdout: [ + '{"type":"message_update","assistantMessageEvent":{"type":"text_delta","delta":"Hello"}}', + '{"type":"message_update","assistantMessageEvent":{"type":"text_delta","delta":" world"}}', + ].join("\n"), + stderr: "", + code: 0, + }); + + const { payloads } = await runCommandReply({ + reply: { + mode: "command", + command: ["pi", "{{Body}}"], + agent: { kind: "pi" }, + }, + templatingCtx: noopTemplateCtx, + sendSystemOnce: false, + isNewSession: true, + isFirstTurnInSession: true, + systemSent: false, + timeoutMs: 1000, + timeoutSeconds: 1, + commandRunner: vi.fn(), + enqueue: enqueueImmediate, + }); + + expect(payloads?.[0]?.text).toBe("Hello world"); + }); + + it("falls back to assistant text when parseOutput yields nothing", async () => { + mockPiRpc({ + stdout: [ + '{"type":"agent_start"}', + '{"type":"turn_start"}', + '{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"Acknowledged."}]}}', + ].join("\n"), + stderr: "", + code: 0, + }); + // Force parser to return nothing so we exercise fallback. + const parseSpy = vi + .spyOn((await import("../agents/pi.js")).piSpec, "parseOutput") + .mockReturnValue({ texts: [], toolResults: [], meta: undefined }); + + const { payloads } = await runCommandReply({ + reply: { + mode: "command", + command: ["pi", "{{Body}}"], + agent: { kind: "pi" }, + }, + templatingCtx: noopTemplateCtx, + sendSystemOnce: false, + isNewSession: true, + isFirstTurnInSession: true, + systemSent: false, + timeoutMs: 1000, + timeoutSeconds: 1, + commandRunner: vi.fn(), + enqueue: enqueueImmediate, + }); + + parseSpy.mockRestore(); + expect(payloads?.[0]?.text).toBe("Acknowledged."); + }); + + it("does not stream tool results when verbose is off", async () => { + const onPartial = vi.fn(); + mockPiRpc({ + stdout: [ + '{"type":"tool_execution_start","toolName":"bash","args":{"command":"ls"}}', + '{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"done"}]}}', + ].join("\n"), + stderr: "", + code: 0, + }); + + await runCommandReply({ + reply: { + mode: "command", + command: ["pi", "{{Body}}"], + agent: { kind: "pi" }, + }, + templatingCtx: noopTemplateCtx, + sendSystemOnce: false, + isNewSession: true, + isFirstTurnInSession: true, + systemSent: false, + timeoutMs: 1000, + timeoutSeconds: 1, + commandRunner: vi.fn(), + enqueue: enqueueImmediate, + onPartialReply: onPartial, + verboseLevel: "off", + }); + + expect(onPartial).not.toHaveBeenCalled(); + }); + it("parses MEDIA tokens and respects mediaMaxMb for local files", async () => { const tmp = path.join(os.tmpdir(), `warelay-test-${Date.now()}.bin`); const bigBuffer = Buffer.alloc(2 * 1024 * 1024, 1); diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index ec8dddb1c..ffc5c6541 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -2,7 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { type AgentKind, getAgentSpec } from "../agents/index.js"; +import { piSpec } from "../agents/pi.js"; import type { AgentMeta, AgentToolResult } from "../agents/types.js"; import type { WarelayConfig } from "../config/config.js"; import { isVerbose, logVerbose } from "../globals.js"; @@ -33,6 +33,9 @@ function stripRpcNoise(raw: string): string { const msg = evt?.message ?? evt?.assistantMessageEvent; const msgType = msg?.type; + // RPC streaming emits one message_update per delta; skip them to avoid flooding fallbacks. + if (type === "message_update") continue; + // Ignore toolcall delta chatter and input buffer append events. if (type === "message_update" && msgType === "toolcall_delta") continue; if (type === "input_audio_buffer.append") continue; @@ -52,6 +55,66 @@ function stripRpcNoise(raw: string): string { return kept.join("\n"); } +function extractRpcAssistantText(raw: string): string | undefined { + if (!raw.trim()) return undefined; + let deltaBuffer = ""; + let lastAssistant: string | undefined; + for (const line of raw.split(/\n+/)) { + try { + const evt = JSON.parse(line) as { + type?: string; + message?: { role?: string; content?: Array<{ type?: string; text?: string }> }; + assistantMessageEvent?: { type?: string; delta?: string; content?: string }; + }; + if ( + evt.type === "message_end" && + evt.message?.role === "assistant" && + Array.isArray(evt.message.content) + ) { + const text = evt.message.content + .filter((c) => c?.type === "text" && typeof c.text === "string") + .map((c) => c.text as string) + .join("\n") + .trim(); + if (text) { + lastAssistant = text; + deltaBuffer = ""; + } + } + if (evt.type === "message_update" && evt.assistantMessageEvent) { + const evtType = evt.assistantMessageEvent.type; + if ( + evtType === "text_delta" || + evtType === "text_end" || + evtType === "text_start" + ) { + const chunk = + typeof evt.assistantMessageEvent.delta === "string" + ? evt.assistantMessageEvent.delta + : typeof evt.assistantMessageEvent.content === "string" + ? evt.assistantMessageEvent.content + : ""; + if (chunk) { + deltaBuffer += chunk; + lastAssistant = deltaBuffer; + } + } + } + } catch { + // ignore malformed/non-JSON lines + } + } + return lastAssistant?.trim() || undefined; +} + +function extractAssistantTextLoosely(raw: string): string | undefined { + // Fallback: grab the last "text":"..." occurrence from a JSON-ish blob. + const matches = [...raw.matchAll(/"text"\s*:\s*"([^"]+?)"/g)]; + if (!matches.length) return undefined; + const last = matches.at(-1)?.[1]; + return last ? last.replace(/\\n/g, "\n").trim() : undefined; +} + type CommandReplyConfig = NonNullable["reply"] & { mode: "command"; }; @@ -263,28 +326,13 @@ export async function runCommandReply( throw new Error("reply.command is required for mode=command"); } const agentCfg = reply.agent ?? { kind: "pi" }; - const agentKind: AgentKind = agentCfg.kind ?? "pi"; - const agent = getAgentSpec(agentKind); + const agent = piSpec; + const agentKind = "pi"; const rawCommand = reply.command; const hasBodyTemplate = rawCommand.some((part) => /\{\{Body(Stripped)?\}\}/.test(part), ); let argv = rawCommand.map((part) => applyTemplate(part, templatingCtx)); - // Pi is the only supported agent; treat commands as Pi when the binary path looks like pi/tau or the path contains pi. - const isAgentInvocation = - agentKind === "pi" && - (agent.isInvocation(argv) || - argv.some((part) => { - if (typeof part !== "string") return false; - const lower = part.toLowerCase(); - const base = path.basename(part).toLowerCase(); - return ( - base === "pi" || - base === "tau" || - lower.includes("pi-coding-agent") || - lower.includes("/pi/") - ); - })); const templatePrefix = reply.template && (!sendSystemOnce || isFirstTurnInSession || !systemSent) ? applyTemplate(reply.template, templatingCtx) @@ -349,12 +397,7 @@ export async function runCommandReply( // Tau (pi agent) needs --continue to reload prior messages when resuming. // Without it, pi starts from a blank state even though we pass the session file path. - if ( - agentKind === "pi" && - isAgentInvocation && - !isNewSession && - !sessionArgList.includes("--continue") - ) { + if (!isNewSession && !sessionArgList.includes("--continue")) { sessionArgList.push("--continue"); } @@ -372,9 +415,7 @@ export async function runCommandReply( argv = [...argv, ...sessionArgList]; } - const shouldApplyAgent = isAgentInvocation; - - if (shouldApplyAgent && thinkLevel && thinkLevel !== "off") { + if (thinkLevel && thinkLevel !== "off") { const hasThinkingFlag = argv.some( (p, i) => p === "--thinking" || @@ -386,18 +427,16 @@ export async function runCommandReply( bodyIndex += 2; } } - const builtArgv = shouldApplyAgent - ? agent.buildArgs({ - argv, - bodyIndex, - isNewSession, - sessionId: templatingCtx.SessionId, - sendSystemOnce, - systemSent, - identityPrefix: agentCfg.identityPrefix, - format: agentCfg.format, - }) - : argv; + const builtArgv = agent.buildArgs({ + argv, + bodyIndex, + isNewSession, + sessionId: templatingCtx.SessionId, + sendSystemOnce, + systemSent, + identityPrefix: agentCfg.identityPrefix, + format: agentCfg.format, + }); const promptIndex = builtArgv.findIndex( (arg) => typeof arg === "string" && arg.includes(bodyMarker), @@ -412,24 +451,22 @@ export async function runCommandReply( return typeof arg === "string" ? arg.replace(bodyMarker, "") : arg; }); - // For pi/tau agents: drive the agent via RPC stdin so auto-compaction and streaming run server-side. + // Drive pi via RPC stdin so auto-compaction and streaming run server-side. let rpcInput: string | undefined; let rpcArgv = finalArgv; - if (agentKind === "pi") { - rpcInput = `${JSON.stringify({ type: "prompt", message: promptArg })}\n`; - const bodyIdx = - promptIndex >= 0 ? promptIndex : Math.max(finalArgv.length - 1, 0); - rpcArgv = finalArgv.filter((_, idx) => idx !== bodyIdx); - const modeIdx = rpcArgv.indexOf("--mode"); - if (modeIdx >= 0 && rpcArgv[modeIdx + 1]) { - rpcArgv[modeIdx + 1] = "rpc"; - } else { - rpcArgv.push("--mode", "rpc"); - } + rpcInput = `${JSON.stringify({ type: "prompt", message: promptArg })}\n`; + const bodyIdx = + promptIndex >= 0 ? promptIndex : Math.max(finalArgv.length - 1, 0); + rpcArgv = finalArgv.filter((_, idx) => idx !== bodyIdx); + const modeIdx = rpcArgv.indexOf("--mode"); + if (modeIdx >= 0 && rpcArgv[modeIdx + 1]) { + rpcArgv[modeIdx + 1] = "rpc"; + } else { + rpcArgv.push("--mode", "rpc"); } logVerbose( - `Running command auto-reply: ${(agentKind === "pi" ? rpcArgv : finalArgv).join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`, + `Running command auto-reply: ${rpcArgv.join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`, ); logger.info( { @@ -437,7 +474,7 @@ export async function runCommandReply( sessionId: templatingCtx.SessionId, newSession: isNewSession, cwd: reply.cwd, - command: (agentKind === "pi" ? rpcArgv : finalArgv).slice(0, -1), // omit body to reduce noise + command: rpcArgv.slice(0, -1), // omit body to reduce noise }, "command auto-reply start", ); @@ -449,9 +486,11 @@ export async function runCommandReply( let pendingToolName: string | undefined; let pendingMetas: string[] = []; let pendingTimer: NodeJS.Timeout | null = null; + let streamedAny = false; + const enableToolStreaming = verboseLevel === "on"; const toolMetaById = new Map(); const flushPendingTool = () => { - if (!onPartialReply) return; + if (!onPartialReply || !enableToolStreaming) return; if (!pendingToolName && pendingMetas.length === 0) return; const text = formatToolAggregate(pendingToolName, pendingMetas); const { text: cleanedText, mediaUrls: mediaFound } = @@ -460,6 +499,7 @@ export async function runCommandReply( text: cleanedText, mediaUrls: mediaFound?.length ? mediaFound : undefined, } as ReplyPayload); + streamedAny = true; pendingToolName = undefined; pendingMetas = []; if (pendingTimer) { @@ -468,7 +508,7 @@ export async function runCommandReply( } }; let lastStreamedAssistant: string | undefined; - const streamAssistant = (msg?: { role?: string; content?: unknown[] }) => { + const streamAssistantFinal = (msg?: { role?: string; content?: unknown[] }) => { if (!onPartialReply || msg?.role !== "assistant") return; const textBlocks = Array.isArray(msg.content) ? (msg.content as Array<{ type?: string; text?: string }>) @@ -486,96 +526,62 @@ export async function runCommandReply( text: cleanedText, mediaUrls: mediaFound?.length ? mediaFound : undefined, } as ReplyPayload); + streamedAny = true; }; const run = async () => { - // Prefer long-lived tau RPC for pi agent to avoid cold starts. - if (agentKind === "pi" && shouldApplyAgent) { - const rpcPromptIndex = - promptIndex >= 0 ? promptIndex : finalArgv.length - 1; - const body = promptArg ?? ""; - // Build rpc args without the prompt body; force --mode rpc. - const rpcArgv = (() => { - const copy = [...finalArgv]; - copy.splice(rpcPromptIndex, 1); - const modeIdx = copy.indexOf("--mode"); - if (modeIdx >= 0 && copy[modeIdx + 1]) { - copy.splice(modeIdx, 2, "--mode", "rpc"); - } else if (!copy.includes("--mode")) { - copy.splice(copy.length - 1, 0, "--mode", "rpc"); - } - return copy; - })(); - const rpcResult = await runPiRpc({ - argv: rpcArgv, - cwd: reply.cwd, - prompt: body, - timeoutMs, - onEvent: onPartialReply - ? (line: string) => { - try { - const ev = JSON.parse(line) as { - type?: string; - message?: { - role?: string; - content?: unknown[]; - details?: Record; - arguments?: Record; - toolCallId?: string; - tool_call_id?: string; - toolName?: string; - name?: string; - }; + const rpcPromptIndex = + promptIndex >= 0 ? promptIndex : finalArgv.length - 1; + const body = promptArg ?? ""; + // Build rpc args without the prompt body; force --mode rpc. + const rpcArgvForRun = (() => { + const copy = [...finalArgv]; + copy.splice(rpcPromptIndex, 1); + const modeIdx = copy.indexOf("--mode"); + if (modeIdx >= 0 && copy[modeIdx + 1]) { + copy.splice(modeIdx, 2, "--mode", "rpc"); + } else if (!copy.includes("--mode")) { + copy.splice(copy.length - 1, 0, "--mode", "rpc"); + } + return copy; + })(); + const rpcResult = await runPiRpc({ + argv: rpcArgvForRun, + cwd: reply.cwd, + prompt: body, + timeoutMs, + onEvent: onPartialReply + ? (line: string) => { + try { + const ev = JSON.parse(line) as { + type?: string; + message?: { + role?: string; + content?: unknown[]; + details?: Record; + arguments?: Record; toolCallId?: string; + tool_call_id?: string; toolName?: string; - args?: Record; + name?: string; }; - // Capture metadata as soon as the tool starts (from args). - if (ev.type === "tool_execution_start") { - const toolName = ev.toolName; - const meta = inferToolMeta({ - toolName, - name: ev.toolName, - arguments: ev.args, - }); - if (ev.toolCallId) { - toolMetaById.set(ev.toolCallId, meta); - } - if (meta) { - if ( - pendingToolName && - toolName && - toolName !== pendingToolName - ) { - flushPendingTool(); - } - if (!pendingToolName) pendingToolName = toolName; - pendingMetas.push(meta); - if ( - TOOL_RESULT_FLUSH_COUNT > 0 && - pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT - ) { - flushPendingTool(); - } else { - if (pendingTimer) clearTimeout(pendingTimer); - pendingTimer = setTimeout( - flushPendingTool, - TOOL_RESULT_DEBOUNCE_MS, - ); - } - } + toolCallId?: string; + toolName?: string; + args?: Record; + }; + if (!enableToolStreaming) return; + // Capture metadata as soon as the tool starts (from args). + if (ev.type === "tool_execution_start") { + const toolName = ev.toolName; + const meta = inferToolMeta({ + toolName, + name: ev.toolName, + arguments: ev.args, + }); + if (ev.toolCallId) { + toolMetaById.set(ev.toolCallId, meta); } - if ( - (ev.type === "message" || ev.type === "message_end") && - ev.message?.role === "tool_result" && - Array.isArray(ev.message.content) - ) { - const toolName = inferToolName(ev.message); - const toolCallId = - ev.message.toolCallId ?? ev.message.tool_call_id; - const meta = - inferToolMeta(ev.message) ?? - (toolCallId ? toolMetaById.get(toolCallId) : undefined); + if (meta) { if ( pendingToolName && toolName && @@ -584,41 +590,66 @@ export async function runCommandReply( flushPendingTool(); } if (!pendingToolName) pendingToolName = toolName; - if (meta) pendingMetas.push(meta); + pendingMetas.push(meta); if ( TOOL_RESULT_FLUSH_COUNT > 0 && pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT ) { flushPendingTool(); - return; + } else { + if (pendingTimer) clearTimeout(pendingTimer); + pendingTimer = setTimeout( + flushPendingTool, + TOOL_RESULT_DEBOUNCE_MS, + ); } - if (pendingTimer) clearTimeout(pendingTimer); - pendingTimer = setTimeout( - flushPendingTool, - TOOL_RESULT_DEBOUNCE_MS, - ); } - if ( - ev.type === "message_end" || - ev.type === "message_update" || - ev.type === "message" - ) { - streamAssistant(ev.message); - } - } catch { - // ignore malformed lines } + if ( + enableToolStreaming && + (ev.type === "message" || ev.type === "message_end") && + ev.message?.role === "tool_result" && + Array.isArray(ev.message.content) + ) { + const toolName = inferToolName(ev.message); + const toolCallId = + ev.message.toolCallId ?? ev.message.tool_call_id; + const meta = + inferToolMeta(ev.message) ?? + (toolCallId ? toolMetaById.get(toolCallId) : undefined); + if ( + pendingToolName && + toolName && + toolName !== pendingToolName + ) { + flushPendingTool(); + } + if (!pendingToolName) pendingToolName = toolName; + if (meta) pendingMetas.push(meta); + if ( + TOOL_RESULT_FLUSH_COUNT > 0 && + pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT + ) { + flushPendingTool(); + return; + } + if (pendingTimer) clearTimeout(pendingTimer); + pendingTimer = setTimeout( + flushPendingTool, + TOOL_RESULT_DEBOUNCE_MS, + ); + } + if (ev.type === "message_end") { + streamAssistantFinal(ev.message); + } + } catch { + // ignore malformed lines } - : undefined, - }); - flushPendingTool(); - return rpcResult; - } - return await commandRunner(agentKind === "pi" ? rpcArgv : finalArgv, { - timeoutMs, - cwd: reply.cwd, - input: rpcInput, + } + : undefined, }); + flushPendingTool(); + return rpcResult; }; const { stdout, stderr, code, signal, killed } = await enqueue(run, { @@ -633,6 +664,7 @@ export async function runCommandReply( }, }); const rawStdout = stdout.trim(); + const rpcAssistantText = extractRpcAssistantText(stdout); let mediaFromCommand: string[] | undefined; const trimmed = stripRpcNoise(rawStdout); if (stderr?.trim()) { @@ -656,9 +688,7 @@ export async function runCommandReply( ); }; - const parsed = - shouldApplyAgent && trimmed ? agent.parseOutput(trimmed) : undefined; - const _parserProvided = shouldApplyAgent && !!parsed; + const parsed = trimmed ? agent.parseOutput(trimmed) : undefined; // Collect assistant texts and tool results from parseOutput (tau RPC can emit many). const parsedTexts = @@ -734,10 +764,15 @@ export async function runCommandReply( }); } - // If parser gave nothing, fall back to raw stdout as a single message. - if (replyItems.length === 0 && trimmed && !hasParsedContent) { + // If parser gave nothing, fall back to best-effort assistant text (prefers RPC deltas). + const fallbackText = + rpcAssistantText ?? + extractRpcAssistantText(trimmed) ?? + extractAssistantTextLoosely(trimmed) ?? + trimmed; + if (replyItems.length === 0 && fallbackText && !hasParsedContent) { const { text: cleanedText, mediaUrls: mediaFound } = - splitMediaFromOutput(trimmed); + splitMediaFromOutput(fallbackText); if (cleanedText || mediaFound?.length) { replyItems.push({ text: cleanedText, @@ -771,8 +806,9 @@ export async function runCommandReply( `Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`, ); // Include any partial output or stderr in error message - const partialOut = trimmed - ? `\n\nOutput: ${trimmed.slice(0, 500)}${trimmed.length > 500 ? "..." : ""}` + const summarySource = rpcAssistantText ?? trimmed; + const partialOut = summarySource + ? `\n\nOutput: ${summarySource.slice(0, 500)}${summarySource.length > 500 ? "..." : ""}` : ""; const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`; return { @@ -864,7 +900,7 @@ export async function runCommandReply( } verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`); - return { payloads, meta }; + return { payloads: streamedAny && onPartialReply ? [] : payloads, meta }; } catch (err) { const elapsed = Date.now() - started; logger.info( @@ -884,7 +920,10 @@ export async function runCommandReply( const baseMsg = "Command timed out after " + `${timeoutSeconds}s${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}. Try a shorter prompt or split the request.`; - const partial = errorObj.stdout?.trim(); + const partial = + extractRpcAssistantText(errorObj.stdout ?? "") || + extractAssistantTextLoosely(errorObj.stdout ?? "") || + stripRpcNoise(errorObj.stdout ?? ""); const partialSnippet = partial && partial.length > 800 ? `${partial.slice(0, 800)}...` diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 6e7de6bbe..c064cb172 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -402,6 +402,62 @@ export async function getReplyFromConfig( return { text: ack }; } + // If any directive (think/verbose) is present anywhere, acknowledge immediately and skip agent execution. + if (hasThinkDirective || hasVerboseDirective) { + if (sessionEntry && sessionStore && sessionKey) { + if (hasThinkDirective && inlineThink) { + if (inlineThink === "off") { + delete sessionEntry.thinkingLevel; + } else { + sessionEntry.thinkingLevel = inlineThink; + } + sessionEntry.updatedAt = Date.now(); + } + if (hasVerboseDirective && inlineVerbose) { + if (inlineVerbose === "off") { + delete sessionEntry.verboseLevel; + } else { + sessionEntry.verboseLevel = inlineVerbose; + } + sessionEntry.updatedAt = Date.now(); + } + if (sessionEntry.updatedAt) { + sessionStore[sessionKey] = sessionEntry; + await saveSessionStore(storePath, sessionStore); + } + } + const parts: string[] = []; + if (hasThinkDirective) { + if (!inlineThink) { + parts.push( + `Unrecognized thinking level "${rawThinkLevel ?? ""}". Valid levels: off, minimal, low, medium, high.`, + ); + } else { + parts.push( + inlineThink === "off" + ? "Thinking disabled." + : `Thinking level set to ${inlineThink}.`, + ); + } + } + if (hasVerboseDirective) { + if (!inlineVerbose) { + parts.push( + `Unrecognized verbose level "${rawVerboseLevel ?? ""}". Valid levels: off, on.`, + ); + } else { + parts.push( + inlineVerbose === "off" + ? "Verbose logging disabled." + : "Verbose logging enabled.", + ); + } + } + const ack = parts.join(" "); + cleanupTyping(); + return { text: ack }; + } + // Optional allowlist by origin number (E.164 without whatsapp: prefix) const allowFrom = cfg.inbound?.allowFrom; const from = (ctx.From ?? "").replace(/^whatsapp:/, ""); diff --git a/src/process/tau-rpc.ts b/src/process/tau-rpc.ts index 5b62f5b9d..ccfce7f10 100644 --- a/src/process/tau-rpc.ts +++ b/src/process/tau-rpc.ts @@ -28,6 +28,7 @@ class TauRpcClient { reject: (err: unknown) => void; timer: NodeJS.Timeout; onEvent?: (line: string) => void; + capMs: number; } | undefined; @@ -67,6 +68,10 @@ class TauRpcClient { } private handleLine(line: string) { + // Any line = activity; refresh timeout watchdog. + if (this.pending) { + this.resetTimeout(); + } if (!this.pending) return; this.buffer.push(line); this.pending?.onEvent?.(line); @@ -90,6 +95,18 @@ class TauRpcClient { } } + private resetTimeout() { + if (!this.pending) return; + const capMs = this.pending.capMs; + if (this.pending.timer) clearTimeout(this.pending.timer); + this.pending.timer = setTimeout(() => { + const pending = this.pending; + this.pending = undefined; + pending?.reject(new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`)); + this.child?.kill("SIGKILL"); + }, capMs); + } + async prompt( prompt: string, timeoutMs: number, @@ -112,14 +129,14 @@ class TauRpcClient { if (!ok) child.stdin.once("drain", () => resolve()); }); return await new Promise((resolve, reject) => { - // Hard cap to avoid stuck relays; agent_end or process exit should usually resolve first. + // Hard cap to avoid stuck relays; resets on every line received. const capMs = Math.min(timeoutMs, 5 * 60 * 1000); const timer = setTimeout(() => { this.pending = undefined; - reject(new Error(`tau rpc timed out after ${capMs}ms`)); + reject(new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`)); child.kill("SIGKILL"); }, capMs); - this.pending = { resolve, reject, timer, onEvent }; + this.pending = { resolve, reject, timer, onEvent, capMs }; }); } diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 14ca131e6..f11163140 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -793,6 +793,31 @@ export async function monitorWebProvider( }, { onReplyStart: latest.sendComposing, + onPartialReply: async (partial) => { + try { + await deliverWebReply({ + replyResult: partial, + msg: latest, + maxMediaBytes, + replyLogger, + runtime, + connectionId, + }); + if (partial.text) { + recentlySent.add(partial.text); + if (recentlySent.size > MAX_RECENT_MESSAGES) { + const firstKey = recentlySent.values().next().value; + if (firstKey) recentlySent.delete(firstKey); + } + } + } catch (err) { + console.error( + danger( + `Failed sending partial web auto-reply to ${latest.from ?? conversationId}: ${String(err)}`, + ), + ); + } + }, }, );