feat: stream tool/job events over control channel

main
Peter Steinberger 2025-12-09 00:31:39 +01:00
parent 40dd23337c
commit 371a30f08b
3 changed files with 96 additions and 98 deletions

View File

@ -186,6 +186,7 @@ final class ControlChannel: ObservableObject {
private var mode: Mode = .local private var mode: Mode = .local
private var localPort: UInt16 = 18789 private var localPort: UInt16 = 18789
private var pingTask: Task<Void, Never>? private var pingTask: Task<Void, Never>?
private var activeJobs: Int = 0
@Published private(set) var state: ConnectionState = .disconnected @Published private(set) var state: ConnectionState = .disconnected
@Published private(set) var lastPingMs: Double? @Published private(set) var lastPingMs: Double?
@ -430,7 +431,15 @@ final class ControlChannel: ObservableObject {
private func handleAgentEvent(_ event: ControlAgentEvent) { private func handleAgentEvent(_ event: ControlAgentEvent) {
if event.stream == "job" { if event.stream == "job" {
if let state = event.data["state"]?.value as? String { if let state = event.data["state"]?.value as? String {
let working = state.lowercased() == "started" || state.lowercased() == "streaming" switch state.lowercased() {
case "started", "streaming":
self.activeJobs &+= 1
case "done", "error":
self.activeJobs = max(0, self.activeJobs - 1)
default:
break
}
let working = self.activeJobs > 0
Task { @MainActor in Task { @MainActor in
AppStateStore.shared.setWorking(working) AppStateStore.shared.setWorking(working)
} }

View File

@ -13,6 +13,7 @@ import { enqueueCommand } from "../process/command-queue.js";
import type { runCommandWithTimeout } from "../process/exec.js"; import type { runCommandWithTimeout } from "../process/exec.js";
import { runPiRpc } from "../process/tau-rpc.js"; import { runPiRpc } from "../process/tau-rpc.js";
import { applyTemplate, type TemplateContext } from "./templating.js"; import { applyTemplate, type TemplateContext } from "./templating.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { import {
formatToolAggregate, formatToolAggregate,
shortenMeta, shortenMeta,
@ -159,6 +160,7 @@ type CommandReplyParams = {
thinkLevel?: ThinkLevel; thinkLevel?: ThinkLevel;
verboseLevel?: "off" | "on"; verboseLevel?: "off" | "on";
onPartialReply?: (payload: ReplyPayload) => Promise<void> | void; onPartialReply?: (payload: ReplyPayload) => Promise<void> | void;
runId?: string;
}; };
export type CommandReplyMeta = { export type CommandReplyMeta = {
@ -552,7 +554,8 @@ export async function runCommandReply(
streamedAny = true; streamedAny = true;
}; };
const run = async () => { const run = async () => {
const runId = params.runId ?? crypto.randomUUID();
const rpcPromptIndex = const rpcPromptIndex =
promptIndex >= 0 ? promptIndex : finalArgv.length - 1; promptIndex >= 0 ? promptIndex : finalArgv.length - 1;
const body = promptArg ?? ""; const body = promptArg ?? "";
@ -573,103 +576,88 @@ export async function runCommandReply(
cwd: reply.cwd, cwd: reply.cwd,
prompt: body, prompt: body,
timeoutMs, timeoutMs,
onEvent: onPartialReply onEvent: (line: string) => {
? (line: string) => { let ev: any;
try { try {
const ev = JSON.parse(line) as { ev = JSON.parse(line);
type?: string; } catch {
message?: { return;
role?: string; }
content?: unknown[];
details?: Record<string, unknown>; // Forward tool lifecycle events to the agent bus.
arguments?: Record<string, unknown>; if (enableToolStreaming && ev.type === "tool_execution_start") {
toolCallId?: string; emitAgentEvent({
tool_call_id?: string; runId,
toolName?: string; stream: "tool",
name?: string; data: {
}; phase: "start",
toolCallId?: string; name: ev.toolName,
toolName?: string; toolCallId: ev.toolCallId,
args?: Record<string, unknown>; args: ev.args,
}; },
if (!enableToolStreaming) return; });
// Capture metadata as soon as the tool starts (from args). }
if (ev.type === "tool_execution_start") {
const toolName = ev.toolName; if (
const meta = inferToolMeta({ enableToolStreaming &&
toolName, (ev.type === "message" || ev.type === "message_end") &&
name: ev.toolName, ev.message?.role === "tool_result" &&
arguments: ev.args, Array.isArray(ev.message.content)
}); ) {
if (ev.toolCallId) { const toolName = inferToolName(ev.message);
toolMetaById.set(ev.toolCallId, meta); const toolCallId = ev.message.toolCallId ?? ev.message.tool_call_id;
} const meta =
if (meta) { inferToolMeta(ev.message) ??
if ( (toolCallId ? toolMetaById.get(toolCallId) : undefined);
pendingToolName &&
toolName && emitAgentEvent({
toolName !== pendingToolName runId,
) { stream: "tool",
flushPendingTool(); data: {
} phase: "result",
if (!pendingToolName) pendingToolName = toolName; name: toolName,
pendingMetas.push(meta); toolCallId,
if ( meta,
TOOL_RESULT_FLUSH_COUNT > 0 && },
pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT });
) {
flushPendingTool(); if (
} else { pendingToolName &&
if (pendingTimer) clearTimeout(pendingTimer); toolName &&
pendingTimer = setTimeout( toolName !== pendingToolName
flushPendingTool, ) {
TOOL_RESULT_DEBOUNCE_MS, flushPendingTool();
);
}
}
}
if (
enableToolStreaming &&
(ev.type === "message" || ev.type === "message_end") &&
ev.message?.role === "tool_result" &&
Array.isArray(ev.message.content)
) {
const toolName = inferToolName(ev.message);
const toolCallId =
ev.message.toolCallId ?? ev.message.tool_call_id;
const meta =
inferToolMeta(ev.message) ??
(toolCallId ? toolMetaById.get(toolCallId) : undefined);
if (
pendingToolName &&
toolName &&
toolName !== pendingToolName
) {
flushPendingTool();
}
if (!pendingToolName) pendingToolName = toolName;
if (meta) pendingMetas.push(meta);
if (
TOOL_RESULT_FLUSH_COUNT > 0 &&
pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT
) {
flushPendingTool();
return;
}
if (pendingTimer) clearTimeout(pendingTimer);
pendingTimer = setTimeout(
flushPendingTool,
TOOL_RESULT_DEBOUNCE_MS,
);
}
if (ev.type === "message_end") {
streamAssistantFinal(ev.message);
}
} catch {
// ignore malformed lines
}
} }
: undefined, if (!pendingToolName) pendingToolName = toolName;
if (meta) pendingMetas.push(meta);
if (
TOOL_RESULT_FLUSH_COUNT > 0 &&
pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT
) {
flushPendingTool();
return;
}
if (pendingTimer) clearTimeout(pendingTimer);
pendingTimer = setTimeout(
flushPendingTool,
TOOL_RESULT_DEBOUNCE_MS,
);
}
if (ev.type === "message_end") {
streamAssistantFinal(ev.message);
}
// Preserve existing partial reply hook when provided.
if (onPartialReply && ev.message?.role === "assistant") {
// Let the existing logic reuse the already-parsed message.
try {
streamAssistantFinal(ev.message);
} catch {
/* ignore */
}
}
},
}); });
flushPendingTool(); flushPendingTool();
return rpcResult; return rpcResult;

View File

@ -320,6 +320,7 @@ export async function agentCommand(
commandRunner: runCommandWithTimeout, commandRunner: runCommandWithTimeout,
thinkLevel: resolvedThinkLevel, thinkLevel: resolvedThinkLevel,
verboseLevel: resolvedVerboseLevel, verboseLevel: resolvedVerboseLevel,
runId: sessionId,
}); });
emitAgentEvent({ emitAgentEvent({
runId: sessionId, runId: sessionId,