fix(auto-reply): parse agent_end and avoid rpc JSON leaks

main
Peter Steinberger 2025-12-16 10:28:57 +01:00
parent 7948d071e0
commit e7713a28ae
2 changed files with 95 additions and 15 deletions

View File

@ -378,6 +378,74 @@ describe("runCommandReply (pi)", () => {
expect(payloads?.[0]?.text).toBe("Acknowledged."); expect(payloads?.[0]?.text).toBe("Acknowledged.");
}); });
it("parses assistant text from agent_end messages", async () => {
mockPiRpc({
stdout: JSON.stringify({
type: "agent_end",
messages: [
{
role: "assistant",
content: [{ type: "text", text: "from agent_end" }],
model: "pi-1",
provider: "inflection",
usage: { input: 1, output: 1, cacheRead: 0, cacheWrite: 0, total: 2 },
stopReason: "stop",
},
],
}),
stderr: "",
code: 0,
});
const { payloads } = await runCommandReply({
reply: {
mode: "command",
command: ["pi", "{{Body}}"],
agent: { kind: "pi" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,
systemSent: false,
timeoutMs: 1000,
timeoutSeconds: 1,
enqueue: enqueueImmediate,
});
expect(payloads?.[0]?.text).toBe("from agent_end");
});
it("does not leak JSON protocol frames when assistant emits no text", async () => {
mockPiRpc({
stdout: [
'{"type":"message_end","message":{"role":"assistant","content":[{"type":"thinking","thinking":"hmm"}],"usage":{"input":10,"output":5}}}',
].join("\n"),
stderr: "",
code: 0,
});
const { payloads } = await runCommandReply({
reply: {
mode: "command",
command: ["pi", "{{Body}}"],
agent: { kind: "pi" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,
systemSent: false,
timeoutMs: 1000,
timeoutSeconds: 1,
enqueue: enqueueImmediate,
});
expect(payloads?.[0]?.text).toMatch(/produced no output/i);
expect(payloads?.[0]?.text).not.toContain("message_end");
expect(payloads?.[0]?.text).not.toContain("\"type\"");
});
it("does not stream tool results when verbose is off", async () => { it("does not stream tool results when verbose is off", async () => {
const onPartial = vi.fn(); const onPartial = vi.fn();
mockPiRpc({ mockPiRpc({

View File

@ -54,9 +54,15 @@ function stripRpcNoise(raw: string): string {
if (type === "message_update") continue; if (type === "message_update") continue;
// Ignore toolcall delta chatter and input buffer append events. // Ignore toolcall delta chatter and input buffer append events.
if (type === "message_update" && msgType === "toolcall_delta") continue; if (msgType === "toolcall_delta") continue;
if (type === "input_audio_buffer.append") continue; if (type === "input_audio_buffer.append") continue;
// Preserve agent_end so piSpec.parseOutput can extract the final message set.
if (type === "agent_end") {
kept.push(line);
continue;
}
// Keep only assistant/tool messages; drop agent_start/turn_start/user/etc. // Keep only assistant/tool messages; drop agent_start/turn_start/user/etc.
const isAssistant = role === "assistant"; const isAssistant = role === "assistant";
const isToolRole = const isToolRole =
@ -140,12 +146,21 @@ function extractRpcAssistantText(raw: string): string | undefined {
return lastAssistant?.trim() || undefined; return lastAssistant?.trim() || undefined;
} }
function extractAssistantTextLoosely(raw: string): string | undefined { function extractNonJsonText(raw: string): string | undefined {
// Fallback: grab the last "text":"..." occurrence from a JSON-ish blob. const kept: string[] = [];
const matches = [...raw.matchAll(/"text"\s*:\s*"([^"]+?)"/g)]; for (const line of raw.split(/\n+/)) {
if (!matches.length) return undefined; const trimmed = line.trim();
const last = matches.at(-1)?.[1]; if (!trimmed) continue;
return last ? last.replace(/\\n/g, "\n").trim() : undefined; try {
JSON.parse(trimmed);
// JSON protocol frame → never surface directly.
continue;
} catch {
kept.push(line);
}
}
const text = kept.join("\n").trim();
return text ? text : undefined;
} }
type CommandReplyConfig = NonNullable<ClawdisConfig["inbound"]>["reply"] & { type CommandReplyConfig = NonNullable<ClawdisConfig["inbound"]>["reply"] & {
@ -859,12 +874,10 @@ export async function runCommandReply(
}); });
} }
// If parser gave nothing, fall back to best-effort assistant text (prefers RPC deltas). // If parser gave nothing, fall back to best-effort assistant text (from RPC deltas),
const fallbackText = // or any non-JSON stdout the child may have emitted (e.g. MEDIA tokens).
rpcAssistantText ?? // Never fall back to raw stdout JSON protocol frames.
extractRpcAssistantText(trimmed) ?? const fallbackText = rpcAssistantText ?? extractNonJsonText(rawStdout);
extractAssistantTextLoosely(trimmed) ??
trimmed;
const normalize = (s?: string) => const normalize = (s?: string) =>
stripStructuralPrefixes((s ?? "").trim()).toLowerCase(); stripStructuralPrefixes((s ?? "").trim()).toLowerCase();
const bodyNorm = normalize( const bodyNorm = normalize(
@ -1030,8 +1043,7 @@ export async function runCommandReply(
`${timeoutSeconds}s${resolvedCwd ? ` (cwd: ${resolvedCwd})` : ""}. Try a shorter prompt or split the request.`; `${timeoutSeconds}s${resolvedCwd ? ` (cwd: ${resolvedCwd})` : ""}. Try a shorter prompt or split the request.`;
const partial = const partial =
extractRpcAssistantText(errorObj.stdout ?? "") || extractRpcAssistantText(errorObj.stdout ?? "") ||
extractAssistantTextLoosely(errorObj.stdout ?? "") || extractNonJsonText(errorObj.stdout ?? "");
stripRpcNoise(errorObj.stdout ?? "");
const partialSnippet = const partialSnippet =
partial && partial.length > 800 partial && partial.length > 800
? `${partial.slice(0, 800)}...` ? `${partial.slice(0, 800)}...`