agent: deliver via rpc and voice forward

main
Peter Steinberger 2025-12-07 06:05:00 +01:00
parent 1d38f5a4d5
commit 67fa82cf14
11 changed files with 105 additions and 45 deletions

View File

@ -13,17 +13,25 @@ actor AgentRPC {
private struct RpcError: Error { let message: String } private struct RpcError: Error { let message: String }
func send(text: String, thinking: String?, session: String) async -> (ok: Bool, text: String?, error: String?) { func send(
text: String,
thinking: String?,
session: String,
deliver: Bool,
to: String?) async -> (ok: Bool, text: String?, error: String?)
{
guard process?.isRunning == true else { guard process?.isRunning == true else {
return (false, nil, "rpc worker not running") return (false, nil, "rpc worker not running")
} }
do { do {
let payload: [String: Any] = [ var payload: [String: Any] = [
"type": "send", "type": "send",
"text": text, "text": text,
"session": session, "session": session,
"thinking": thinking ?? "default", "thinking": thinking ?? "default",
"deliver": deliver,
] ]
if let to { payload["to"] = to }
let data = try JSONSerialization.data(withJSONObject: payload) let data = try JSONSerialization.data(withJSONObject: payload)
guard let stdinHandle else { throw RpcError(message: "stdin missing") } guard let stdinHandle else { throw RpcError(message: "stdin missing") }
stdinHandle.write(data) stdinHandle.write(data)

View File

@ -24,7 +24,7 @@ let modelCatalogPathKey = "clawdis.modelCatalogPath"
let modelCatalogReloadKey = "clawdis.modelCatalogReload" let modelCatalogReloadKey = "clawdis.modelCatalogReload"
let voiceWakeSupported: Bool = ProcessInfo.processInfo.operatingSystemVersion.majorVersion >= 26 let voiceWakeSupported: Bool = ProcessInfo.processInfo.operatingSystemVersion.majorVersion >= 26
let cliHelperSearchPaths = ["/usr/local/bin", "/opt/homebrew/bin"] let cliHelperSearchPaths = ["/usr/local/bin", "/opt/homebrew/bin"]
let defaultVoiceWakeForwardCommand = "clawdis-mac agent --message \"${text}\" --thinking low" let defaultVoiceWakeForwardCommand = "clawdis-mac agent --message \"${text}\" --thinking low --session main --deliver"
let defaultVoiceWakeForwardPort = 22 let defaultVoiceWakeForwardPort = 22
// Allow enough time for remote agent responses (LLM replies often take >10s). // Allow enough time for remote agent responses (LLM replies often take >10s).
let defaultVoiceWakeForwardTimeout: TimeInterval = 30 let defaultVoiceWakeForwardTimeout: TimeInterval = 30

View File

@ -72,14 +72,16 @@ final class ClawdisXPCService: NSObject, ClawdisXPCProtocol {
} }
return await ShellRunner.run(command: command, cwd: cwd, env: env, timeout: timeoutSec) return await ShellRunner.run(command: command, cwd: cwd, env: env, timeout: timeoutSec)
case let .agent(message, thinking, session): case let .agent(message, thinking, session, deliver, to):
let trimmed = message.trimmingCharacters(in: .whitespacesAndNewlines) let trimmed = message.trimmingCharacters(in: .whitespacesAndNewlines)
guard !trimmed.isEmpty else { return Response(ok: false, message: "message empty") } guard !trimmed.isEmpty else { return Response(ok: false, message: "message empty") }
let sessionKey = session ?? "main" let sessionKey = session ?? "main"
let rpcResult = await AgentRPC.shared.send( let rpcResult = await AgentRPC.shared.send(
text: trimmed, text: trimmed,
thinking: thinking, thinking: thinking,
session: sessionKey) session: sessionKey,
deliver: deliver,
to: to)
return rpcResult.ok return rpcResult.ok
? Response(ok: true, message: rpcResult.text ?? "sent") ? Response(ok: true, message: rpcResult.text ?? "sent")
: Response(ok: false, message: rpcResult.error ?? "failed to send") : Response(ok: false, message: rpcResult.error ?? "failed to send")
@ -89,12 +91,16 @@ final class ClawdisXPCService: NSObject, ClawdisXPCProtocol {
private static func runAgentCLI( private static func runAgentCLI(
message: String, message: String,
thinking: String?, thinking: String?,
session: String) async -> (ok: Bool, text: String?, error: String?) session: String,
deliver: Bool,
to: String?) async -> (ok: Bool, text: String?, error: String?)
{ {
let projectRoot = CommandResolver.projectRootPath() let projectRoot = CommandResolver.projectRootPath()
var command = CommandResolver.clawdisCommand(subcommand: "agent") var command = CommandResolver.clawdisCommand(subcommand: "agent")
command += ["--message", message, "--json"] command += ["--message", message, "--json"]
if !session.isEmpty { command += ["--to", session] } if let to { command += ["--to", to] }
if deliver { command += ["--deliver"] }
if !session.isEmpty { command += ["--session-id", session] }
if let thinking { command += ["--thinking", thinking] } if let thinking { command += ["--thinking", thinking] }
let process = Process() let process = Process()

View File

@ -140,6 +140,8 @@ struct ClawdisCLI {
var message: String? var message: String?
var thinking: String? var thinking: String?
var session: String? var session: String?
var deliver = false
var to: String?
while !args.isEmpty { while !args.isEmpty {
let arg = args.removeFirst() let arg = args.removeFirst()
@ -147,6 +149,8 @@ struct ClawdisCLI {
case "--message": message = args.popFirst() case "--message": message = args.popFirst()
case "--thinking": thinking = args.popFirst() case "--thinking": thinking = args.popFirst()
case "--session": session = args.popFirst() case "--session": session = args.popFirst()
case "--deliver": deliver = true
case "--to": to = args.popFirst()
default: default:
// Support bare message as last argument // Support bare message as last argument
if message == nil { if message == nil {
@ -156,7 +160,7 @@ struct ClawdisCLI {
} }
guard let message else { throw CLIError.help } guard let message else { throw CLIError.help }
return .agent(message: message, thinking: thinking, session: session) return .agent(message: message, thinking: thinking, session: session, deliver: deliver, to: to)
default: default:
throw CLIError.help throw CLIError.help
@ -178,7 +182,7 @@ struct ClawdisCLI {
clawdis-mac run [--cwd <path>] [--env KEY=VAL] [--timeout <sec>] [--needs-screen-recording] <command ...> clawdis-mac run [--cwd <path>] [--env KEY=VAL] [--timeout <sec>] [--needs-screen-recording] <command ...>
clawdis-mac status clawdis-mac status
clawdis-mac rpc-status clawdis-mac rpc-status
clawdis-mac agent --message <text> [--thinking <low|default|high>] [--session <key>] clawdis-mac agent --message <text> [--thinking <low|default|high>] [--session <key>] [--deliver] [--to <E.164>]
clawdis-mac --help clawdis-mac --help
Returns JSON to stdout: Returns JSON to stdout:

View File

@ -25,7 +25,7 @@ public enum Request: Sendable {
timeoutSec: Double?, timeoutSec: Double?,
needsScreenRecording: Bool) needsScreenRecording: Bool)
case status case status
case agent(message: String, thinking: String?, session: String?) case agent(message: String, thinking: String?, session: String?, deliver: Bool, to: String?)
case rpcStatus case rpcStatus
} }
@ -53,7 +53,7 @@ extension Request: Codable {
case caps, interactive case caps, interactive
case displayID, windowID, format case displayID, windowID, format
case command, cwd, env, timeoutSec, needsScreenRecording case command, cwd, env, timeoutSec, needsScreenRecording
case message, thinking, session case message, thinking, session, deliver, to
case rpcStatus case rpcStatus
} }
@ -98,11 +98,13 @@ extension Request: Codable {
case .status: case .status:
try container.encode(Kind.status, forKey: .type) try container.encode(Kind.status, forKey: .type)
case let .agent(message, thinking, session): case let .agent(message, thinking, session, deliver, to):
try container.encode(Kind.agent, forKey: .type) try container.encode(Kind.agent, forKey: .type)
try container.encode(message, forKey: .message) try container.encode(message, forKey: .message)
try container.encodeIfPresent(thinking, forKey: .thinking) try container.encodeIfPresent(thinking, forKey: .thinking)
try container.encodeIfPresent(session, forKey: .session) try container.encodeIfPresent(session, forKey: .session)
try container.encode(deliver, forKey: .deliver)
try container.encodeIfPresent(to, forKey: .to)
case .rpcStatus: case .rpcStatus:
try container.encode(Kind.rpcStatus, forKey: .type) try container.encode(Kind.rpcStatus, forKey: .type)
@ -145,7 +147,9 @@ extension Request: Codable {
let message = try container.decode(String.self, forKey: .message) let message = try container.decode(String.self, forKey: .message)
let thinking = try container.decodeIfPresent(String.self, forKey: .thinking) let thinking = try container.decodeIfPresent(String.self, forKey: .thinking)
let session = try container.decodeIfPresent(String.self, forKey: .session) let session = try container.decodeIfPresent(String.self, forKey: .session)
self = .agent(message: message, thinking: thinking, session: session) let deliver = try container.decode(Bool.self, forKey: .deliver)
let to = try container.decodeIfPresent(String.self, forKey: .to)
self = .agent(message: message, thinking: thinking, session: session, deliver: deliver, to: to)
case .rpcStatus: case .rpcStatus:
self = .rpcStatus self = .rpcStatus

View File

@ -11,7 +11,7 @@ import Testing
} }
@Test func rejectEmptyMessage() async { @Test func rejectEmptyMessage() async {
let result = await AgentRPC.shared.send(text: "", thinking: nil, session: "main") let result = await AgentRPC.shared.send(text: "", thinking: nil, session: "main", deliver: false, to: nil)
#expect(result.ok == false) #expect(result.ok == false)
} }
} }

View File

@ -1,9 +1,9 @@
import chalk from "chalk"; import chalk from "chalk";
import { Command } from "commander"; import { Command } from "commander";
import { agentCommand } from "../commands/agent.js"; import { agentCommand } from "../commands/agent.js";
import { healthCommand } from "../commands/health.js";
import { sendCommand } from "../commands/send.js"; import { sendCommand } from "../commands/send.js";
import { sessionsCommand } from "../commands/sessions.js"; import { sessionsCommand } from "../commands/sessions.js";
import { healthCommand } from "../commands/health.js";
import { statusCommand } from "../commands/status.js"; import { statusCommand } from "../commands/status.js";
import { loadConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js";
import { danger, info, setVerbose } from "../globals.js"; import { danger, info, setVerbose } from "../globals.js";
@ -236,6 +236,10 @@ Examples:
if (!line.trim()) return; if (!line.trim()) return;
try { try {
const cmd = JSON.parse(line); const cmd = JSON.parse(line);
if (cmd.type === "status") {
respond({ type: "result", ok: true });
return;
}
if (cmd.type !== "send" || !cmd.text) { if (cmd.type !== "send" || !cmd.text) {
respond({ type: "error", error: "unsupported command" }); respond({ type: "error", error: "unsupported command" });
return; return;
@ -253,12 +257,14 @@ Examples:
to?: string; to?: string;
sessionId?: string; sessionId?: string;
thinking?: string; thinking?: string;
deliver?: boolean;
json: boolean; json: boolean;
} = { } = {
message: String(cmd.text), message: String(cmd.text),
to: cmd.to ? String(cmd.to) : undefined, to: cmd.to ? String(cmd.to) : undefined,
sessionId: cmd.session ? String(cmd.session) : undefined, sessionId: cmd.session ? String(cmd.session) : undefined,
thinking: cmd.thinking ? String(cmd.thinking) : undefined, thinking: cmd.thinking ? String(cmd.thinking) : undefined,
deliver: Boolean(cmd.deliver),
json: true, json: true,
}; };
@ -572,20 +578,29 @@ Examples:
program program
.command("health") .command("health")
.description("Probe WhatsApp Web health (creds + Baileys connect) and session store") .description(
"Probe WhatsApp Web health (creds + Baileys connect) and session store",
)
.option("--json", "Output JSON instead of text", false) .option("--json", "Output JSON instead of text", false)
.option("--timeout <ms>", "Connection timeout in milliseconds", "10000") .option("--timeout <ms>", "Connection timeout in milliseconds", "10000")
.option("--verbose", "Verbose logging", false) .option("--verbose", "Verbose logging", false)
.action(async (opts) => { .action(async (opts) => {
setVerbose(Boolean(opts.verbose)); setVerbose(Boolean(opts.verbose));
const timeout = opts.timeout ? Number.parseInt(String(opts.timeout), 10) : undefined; const timeout = opts.timeout
? Number.parseInt(String(opts.timeout), 10)
: undefined;
if (timeout !== undefined && (Number.isNaN(timeout) || timeout <= 0)) { if (timeout !== undefined && (Number.isNaN(timeout) || timeout <= 0)) {
defaultRuntime.error("--timeout must be a positive integer (milliseconds)"); defaultRuntime.error(
"--timeout must be a positive integer (milliseconds)",
);
defaultRuntime.exit(1); defaultRuntime.exit(1);
return; return;
} }
try { try {
await healthCommand({ json: Boolean(opts.json), timeoutMs: timeout }, defaultRuntime); await healthCommand(
{ json: Boolean(opts.json), timeoutMs: timeout },
defaultRuntime,
);
} catch (err) { } catch (err) {
defaultRuntime.error(String(err)); defaultRuntime.error(String(err));
defaultRuntime.exit(1); defaultRuntime.exit(1);

View File

@ -23,6 +23,7 @@ import {
} from "../config/sessions.js"; } from "../config/sessions.js";
import { runCommandWithTimeout } from "../process/exec.js"; import { runCommandWithTimeout } from "../process/exec.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { sendViaIpc } from "../web/ipc.js"; import { sendViaIpc } from "../web/ipc.js";
type AgentCommandOpts = { type AgentCommandOpts = {
@ -162,13 +163,13 @@ export async function agentCommand(
if (!opts.to && !opts.sessionId) { if (!opts.to && !opts.sessionId) {
throw new Error("Pass --to <E.164> or --session-id to choose a session"); throw new Error("Pass --to <E.164> or --session-id to choose a session");
} }
if (opts.deliver && !opts.to) {
throw new Error("Delivering to WhatsApp requires --to <E.164>");
}
const cfg = loadConfig(); const cfg = loadConfig();
const replyCfg = assertCommandConfig(cfg); const replyCfg = assertCommandConfig(cfg);
const sessionCfg = replyCfg.session; const sessionCfg = replyCfg.session;
const allowFrom = (cfg.inbound?.allowFrom ?? [])
.map((val) => normalizeE164(val))
.filter((val) => val.length > 1);
const thinkOverride = normalizeThinkLevel(opts.thinking); const thinkOverride = normalizeThinkLevel(opts.thinking);
if (opts.thinking && !thinkOverride) { if (opts.thinking && !thinkOverride) {
@ -340,6 +341,12 @@ export async function agentCommand(
} }
const deliver = opts.deliver === true; const deliver = opts.deliver === true;
const targetTo = opts.to ? normalizeE164(opts.to) : allowFrom[0];
if (deliver && !targetTo) {
throw new Error(
"Delivering to WhatsApp requires --to <E.164> or inbound.allowFrom[0]",
);
}
for (const payload of payloads) { for (const payload of payloads) {
const lines: string[] = []; const lines: string[] = [];
@ -351,29 +358,29 @@ export async function agentCommand(
} }
runtime.log(lines.join("\n")); runtime.log(lines.join("\n"));
if (deliver && opts.to) { if (deliver && targetTo) {
const text = payload.text ?? ""; const text = payload.text ?? "";
const media = mediaList; const media = mediaList;
// Prefer IPC to reuse the running relay; fall back to direct web send. // Prefer IPC to reuse the running relay; fall back to direct web send.
let sentViaIpc = false; let sentViaIpc = false;
const ipcResult = await sendViaIpc(opts.to, text, media[0]); const ipcResult = await sendViaIpc(targetTo, text, media[0]);
if (ipcResult) { if (ipcResult) {
sentViaIpc = ipcResult.success; sentViaIpc = ipcResult.success;
if (ipcResult.success && media.length > 1) { if (ipcResult.success && media.length > 1) {
for (const extra of media.slice(1)) { for (const extra of media.slice(1)) {
await sendViaIpc(opts.to, "", extra); await sendViaIpc(targetTo, "", extra);
} }
} }
} }
if (!sentViaIpc) { if (!sentViaIpc) {
if (text || media.length === 0) { if (text || media.length === 0) {
await deps.sendMessageWeb(opts.to, text, { await deps.sendMessageWeb(targetTo, text, {
verbose: false, verbose: false,
mediaUrl: media[0], mediaUrl: media[0],
}); });
} }
for (const extra of media.slice(1)) { for (const extra of media.slice(1)) {
await deps.sendMessageWeb(opts.to, "", { await deps.sendMessageWeb(targetTo, "", {
verbose: false, verbose: false,
mediaUrl: extra, mediaUrl: extra,
}); });

View File

@ -1,4 +1,4 @@
import { describe, expect, it, vi, beforeEach } from "vitest"; import { beforeEach, describe, expect, it, vi } from "vitest";
import { healthCommand } from "./health.js"; import { healthCommand } from "./health.js";
@ -23,7 +23,10 @@ const waitForWaConnection = vi.fn();
const webAuthExists = vi.fn(); const webAuthExists = vi.fn();
vi.mock("../web/session.js", () => ({ vi.mock("../web/session.js", () => ({
createWaSocket: vi.fn(async () => ({ ws: { close: vi.fn() }, ev: { on: vi.fn() } })), createWaSocket: vi.fn(async () => ({
ws: { close: vi.fn() },
ev: { on: vi.fn() },
})),
waitForWaConnection: (...args: unknown[]) => waitForWaConnection(...args), waitForWaConnection: (...args: unknown[]) => waitForWaConnection(...args),
webAuthExists: (...args: unknown[]) => webAuthExists(...args), webAuthExists: (...args: unknown[]) => webAuthExists(...args),
getStatusCode: vi.fn(() => 440), getStatusCode: vi.fn(() => 440),

View File

@ -2,10 +2,7 @@ import fs from "node:fs";
import path from "node:path"; import path from "node:path";
import { loadConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js";
import { import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
loadSessionStore,
resolveStorePath,
} from "../config/sessions.js";
import { info } from "../globals.js"; import { info } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js"; import type { RuntimeEnv } from "../runtime.js";
import { resolveHeartbeatSeconds } from "../web/reconnect.js"; import { resolveHeartbeatSeconds } from "../web/reconnect.js";
@ -37,7 +34,11 @@ type HealthSummary = {
sessions: { sessions: {
path: string; path: string;
count: number; count: number;
recent: Array<{ key: string; updatedAt: number | null; age: number | null }>; recent: Array<{
key: string;
updatedAt: number | null;
age: number | null;
}>;
}; };
ipc: { path: string; exists: boolean }; ipc: { path: string; exists: boolean };
}; };
@ -54,7 +55,12 @@ async function probeWebConnect(timeoutMs: number): Promise<HealthConnect> {
setTimeout(() => reject(new Error("timeout")), timeoutMs), setTimeout(() => reject(new Error("timeout")), timeoutMs),
), ),
]); ]);
return { ok: true, status: null, error: null, elapsedMs: Date.now() - started }; return {
ok: true,
status: null,
error: null,
elapsedMs: Date.now() - started,
};
} catch (err) { } catch (err) {
return { return {
ok: false, ok: false,
@ -126,18 +132,25 @@ export async function healthCommand(
} }
if (connect) { if (connect) {
const base = connect.ok const base = connect.ok
? info(`Connect: ok (${connect.elapsedMs}ms)`) : `Connect: failed (${connect.status ?? "unknown"})`; ? info(`Connect: ok (${connect.elapsedMs}ms)`)
: `Connect: failed (${connect.status ?? "unknown"})`;
runtime.log(base + (connect.error ? ` - ${connect.error}` : "")); runtime.log(base + (connect.error ? ` - ${connect.error}` : ""));
} }
runtime.log(info(`Heartbeat interval: ${heartbeatSeconds}s`)); runtime.log(info(`Heartbeat interval: ${heartbeatSeconds}s`));
runtime.log(info(`Session store: ${storePath} (${sessions.length} entries)`)); runtime.log(
info(`Session store: ${storePath} (${sessions.length} entries)`),
);
if (recent.length > 0) { if (recent.length > 0) {
runtime.log("Recent sessions:"); runtime.log("Recent sessions:");
for (const r of recent) { for (const r of recent) {
runtime.log(`- ${r.key} (${r.updatedAt ? `${Math.round((Date.now() - r.updatedAt) / 60000)}m ago` : "no activity"})`); runtime.log(
`- ${r.key} (${r.updatedAt ? `${Math.round((Date.now() - r.updatedAt) / 60000)}m ago` : "no activity"})`,
);
} }
} }
runtime.log(info(`IPC socket: ${ipcExists ? "present" : "missing"} (${ipcPath})`)); runtime.log(
info(`IPC socket: ${ipcExists ? "present" : "missing"} (${ipcPath})`),
);
} }
if (fatal) { if (fatal) {

View File

@ -55,9 +55,9 @@ describe("loginWeb coverage", () => {
output: { statusCode: DisconnectReason.loggedOut }, output: { statusCode: DisconnectReason.loggedOut },
}); });
await expect(loginWeb(false, "web", waitForWaConnection as never)).rejects.toThrow( await expect(
/cache cleared/i, loginWeb(false, "web", waitForWaConnection as never),
); ).rejects.toThrow(/cache cleared/i);
expect(rmMock).toHaveBeenCalledWith("/tmp/wa-creds", { expect(rmMock).toHaveBeenCalledWith("/tmp/wa-creds", {
recursive: true, recursive: true,
force: true, force: true,
@ -66,9 +66,9 @@ describe("loginWeb coverage", () => {
it("formats and rethrows generic errors", async () => { it("formats and rethrows generic errors", async () => {
waitForWaConnection.mockRejectedValueOnce(new Error("boom")); waitForWaConnection.mockRejectedValueOnce(new Error("boom"));
await expect(loginWeb(false, "web", waitForWaConnection as never)).rejects.toThrow( await expect(
"formatted:Error: boom", loginWeb(false, "web", waitForWaConnection as never),
); ).rejects.toThrow("formatted:Error: boom");
expect(formatError).toHaveBeenCalled(); expect(formatError).toHaveBeenCalled();
}); });
}); });