chore: remove legacy rpc command
parent
fce9ded30a
commit
6129924eb2
41
docs/rpc.md
41
docs/rpc.md
|
|
@ -1,41 +0,0 @@
|
||||||
---
|
|
||||||
summary: "JSON RPC contract used by the mac app to talk to the gateway"
|
|
||||||
read_when:
|
|
||||||
- Changing mac app RPC or agent toggles
|
|
||||||
---
|
|
||||||
# Clawdis Agent RPC
|
|
||||||
|
|
||||||
Live, stdin/stdout JSON RPC used by the mac app (XPC) to avoid spawning `clawdis agent --json` for every send and to toggle runtime features (e.g., heartbeats) without restarting the gateway.
|
|
||||||
|
|
||||||
## How it is launched
|
|
||||||
- The mac app starts `clawdis rpc` in the configured project root (`CommandResolver.projectRoot()`, defaults to `~/Projects/clawdis`).
|
|
||||||
- Environment PATH is augmented with repo `node_modules/.bin`, pnpm home, /opt/homebrew/bin, /usr/local/bin.
|
|
||||||
- Process is kept alive; crashes are handled by the app’s RPC helper restarting it.
|
|
||||||
|
|
||||||
## Request/response protocol (newline-delimited JSON)
|
|
||||||
### Requests (stdin)
|
|
||||||
- `{"type":"status"}` → health ping.
|
|
||||||
- `{"type":"send","text":"hi","session":"main","thinking":"low","deliver":false,"to":"+1555..."}` → invokes existing agent send path.
|
|
||||||
- `{"type":"set-heartbeats","enabled":true|false}` → enables/disables web heartbeat timers in the running gateway process.
|
|
||||||
|
|
||||||
### Responses (stdout)
|
|
||||||
- `{"type":"result","ok":true,"payload":{...}}` on success.
|
|
||||||
- `{"type":"error","error":"..."}` on failures or unsupported commands.
|
|
||||||
|
|
||||||
Notes:
|
|
||||||
- `send` reuses the agent JSON payload extraction; `payload.payloads[0].text` carries the text reply when present.
|
|
||||||
- Unknown `type` returns `error`.
|
|
||||||
|
|
||||||
## Heartbeat control (new)
|
|
||||||
- The mac menu exposes “Send heartbeats” toggle (persisted in UserDefaults).
|
|
||||||
- On change, mac sends `set-heartbeats` RPC; the gateway updates an in-memory flag and short-circuits its heartbeat timers (`web-heartbeat` logging + reply heartbeats).
|
|
||||||
- No gateway restart required.
|
|
||||||
|
|
||||||
## Fallbacks / safety
|
|
||||||
- If the RPC process is not running, mac-side RPC calls fail fast and the app logs/clears state; callers may fall back to one-shot CLI where appropriate.
|
|
||||||
- PATH resolution prefers a real `clawdis` binary, otherwise node + repo `bin/clawdis.js`, otherwise pnpm `clawdis`.
|
|
||||||
|
|
||||||
## Future extensions
|
|
||||||
- Add `abort` to cancel in-flight sends.
|
|
||||||
- Add `compact` / `status --verbose` to return gateway internals (queue depth, session info).
|
|
||||||
- Add a JSON schema test for the RPC contract.
|
|
||||||
|
|
@ -10,7 +10,6 @@ import { startGatewayServer } from "../gateway/server.js";
|
||||||
import { danger, info, setVerbose } from "../globals.js";
|
import { danger, info, setVerbose } from "../globals.js";
|
||||||
import { GatewayLockError } from "../infra/gateway-lock.js";
|
import { GatewayLockError } from "../infra/gateway-lock.js";
|
||||||
import { loginWeb, logoutWeb } from "../provider-web.js";
|
import { loginWeb, logoutWeb } from "../provider-web.js";
|
||||||
import { runRpcLoop } from "../rpc/loop.js";
|
|
||||||
import { defaultRuntime } from "../runtime.js";
|
import { defaultRuntime } from "../runtime.js";
|
||||||
import { VERSION } from "../version.js";
|
import { VERSION } from "../version.js";
|
||||||
import { startWebChatServer } from "../webchat/server.js";
|
import { startWebChatServer } from "../webchat/server.js";
|
||||||
|
|
@ -219,22 +218,6 @@ Examples:
|
||||||
});
|
});
|
||||||
|
|
||||||
program
|
program
|
||||||
.command("rpc")
|
|
||||||
.description("Run stdin/stdout JSON RPC loop for agent sends")
|
|
||||||
.action(async () => {
|
|
||||||
// stdout must stay JSON-only for the macOS app's RPC bridge.
|
|
||||||
// Forward all console output to stderr so stray logs (e.g., WhatsApp sender)
|
|
||||||
// don't corrupt the stream the app parses.
|
|
||||||
const forwardToStderr = (...args: unknown[]) => console.error(...args);
|
|
||||||
console.log = forwardToStderr;
|
|
||||||
console.info = forwardToStderr;
|
|
||||||
console.warn = forwardToStderr;
|
|
||||||
console.debug = forwardToStderr;
|
|
||||||
console.trace = forwardToStderr;
|
|
||||||
|
|
||||||
await runRpcLoop({ input: process.stdin, output: process.stdout });
|
|
||||||
await new Promise<never>(() => {});
|
|
||||||
});
|
|
||||||
const gateway = program
|
const gateway = program
|
||||||
.command("gateway")
|
.command("gateway")
|
||||||
.description("Run the WebSocket Gateway")
|
.description("Run the WebSocket Gateway")
|
||||||
|
|
|
||||||
|
|
@ -1,111 +0,0 @@
|
||||||
import { PassThrough } from "node:stream";
|
|
||||||
|
|
||||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
|
||||||
|
|
||||||
import { runRpcLoop } from "./loop.js";
|
|
||||||
|
|
||||||
vi.mock("../commands/health.js", () => ({
|
|
||||||
getHealthSnapshot: vi.fn(async () => ({ heartbeatSeconds: 42 })),
|
|
||||||
}));
|
|
||||||
|
|
||||||
vi.mock("../commands/status.js", () => ({
|
|
||||||
getStatusSummary: vi.fn(async () => ({
|
|
||||||
web: { linked: true, authAgeMs: 0 },
|
|
||||||
heartbeatSeconds: 60,
|
|
||||||
providerSummary: ["ok"],
|
|
||||||
queuedSystemEvents: [],
|
|
||||||
sessions: {
|
|
||||||
path: "/tmp/sessions.json",
|
|
||||||
count: 0,
|
|
||||||
defaults: { model: "claude-opus-4-5", contextTokens: 200_000 },
|
|
||||||
recent: [],
|
|
||||||
},
|
|
||||||
})),
|
|
||||||
}));
|
|
||||||
|
|
||||||
vi.mock("../infra/heartbeat-events.js", () => ({
|
|
||||||
getLastHeartbeatEvent: vi.fn(() => ({ ts: 1, status: "sent" })),
|
|
||||||
onHeartbeatEvent: vi.fn((cb: (p: unknown) => void) => {
|
|
||||||
// return stopper
|
|
||||||
return () => void cb({});
|
|
||||||
}),
|
|
||||||
}));
|
|
||||||
|
|
||||||
vi.mock("../infra/agent-events.js", () => ({
|
|
||||||
onAgentEvent: vi.fn((_cb: (p: unknown) => void) => () => {}),
|
|
||||||
}));
|
|
||||||
|
|
||||||
vi.mock("../infra/system-presence.js", () => ({
|
|
||||||
enqueueSystemEvent: vi.fn(),
|
|
||||||
updateSystemPresence: vi.fn(),
|
|
||||||
listSystemPresence: vi.fn(() => [{ text: "hi" }]),
|
|
||||||
}));
|
|
||||||
|
|
||||||
vi.mock("../commands/agent.js", () => ({
|
|
||||||
agentCommand: vi.fn(
|
|
||||||
async (_opts, runtime: { log: (msg: string) => void }) => {
|
|
||||||
// Emit a fake payload log entry the loop will pick up
|
|
||||||
runtime.log(JSON.stringify({ payloads: [{ text: "ok" }] }));
|
|
||||||
},
|
|
||||||
),
|
|
||||||
}));
|
|
||||||
|
|
||||||
vi.mock("../cli/deps.js", () => ({
|
|
||||||
createDefaultDeps: vi.fn(() => ({})),
|
|
||||||
}));
|
|
||||||
|
|
||||||
describe("runRpcLoop", () => {
|
|
||||||
let input: PassThrough;
|
|
||||||
let output: PassThrough;
|
|
||||||
let lines: unknown[];
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
input = new PassThrough();
|
|
||||||
output = new PassThrough();
|
|
||||||
lines = [];
|
|
||||||
output.on("data", (chunk) => {
|
|
||||||
const str = chunk.toString();
|
|
||||||
for (const line of str.split("\n").filter(Boolean)) {
|
|
||||||
lines.push(JSON.parse(line));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it("responds to control-request health", async () => {
|
|
||||||
const loop = await runRpcLoop({ input, output });
|
|
||||||
input.write('{"type":"control-request","id":"1","method":"health"}\n');
|
|
||||||
await new Promise((r) => setTimeout(r, 50));
|
|
||||||
loop.close();
|
|
||||||
expect(
|
|
||||||
lines.find((l) => l.type === "control-response" && l.id === "1"),
|
|
||||||
).toMatchObject({
|
|
||||||
ok: true,
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it("forwards initial heartbeat event", async () => {
|
|
||||||
const loop = await runRpcLoop({ input, output });
|
|
||||||
await new Promise((r) => setTimeout(r, 20));
|
|
||||||
loop.close();
|
|
||||||
expect(lines[0]).toMatchObject({ type: "event", event: "heartbeat" });
|
|
||||||
});
|
|
||||||
|
|
||||||
it("handles send via agentCommand", async () => {
|
|
||||||
const loop = await runRpcLoop({ input, output });
|
|
||||||
input.write('{"type":"send","text":"hi"}\n');
|
|
||||||
await new Promise((r) => setTimeout(r, 50));
|
|
||||||
loop.close();
|
|
||||||
expect(lines.find((l) => l.type === "result" && l.ok)).toBeTruthy();
|
|
||||||
});
|
|
||||||
|
|
||||||
it("routes system-event", async () => {
|
|
||||||
const loop = await runRpcLoop({ input, output });
|
|
||||||
input.write(
|
|
||||||
'{"type":"control-request","id":"sys","method":"system-event","params":{"text":"ping"}}\n',
|
|
||||||
);
|
|
||||||
await new Promise((r) => setTimeout(r, 50));
|
|
||||||
loop.close();
|
|
||||||
const resp = lines.find((l) => l.id === "sys");
|
|
||||||
expect(resp).toMatchObject({ ok: true, type: "control-response" });
|
|
||||||
});
|
|
||||||
});
|
|
||||||
187
src/rpc/loop.ts
187
src/rpc/loop.ts
|
|
@ -1,187 +0,0 @@
|
||||||
import { createInterface } from "node:readline";
|
|
||||||
import type { Readable, Writable } from "node:stream";
|
|
||||||
|
|
||||||
import { createDefaultDeps } from "../cli/deps.js";
|
|
||||||
import { agentCommand } from "../commands/agent.js";
|
|
||||||
import { getHealthSnapshot, type HealthSummary } from "../commands/health.js";
|
|
||||||
import { getStatusSummary, type StatusSummary } from "../commands/status.js";
|
|
||||||
import { onAgentEvent } from "../infra/agent-events.js";
|
|
||||||
import {
|
|
||||||
getLastHeartbeatEvent,
|
|
||||||
onHeartbeatEvent,
|
|
||||||
} from "../infra/heartbeat-events.js";
|
|
||||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
|
||||||
import {
|
|
||||||
listSystemPresence,
|
|
||||||
updateSystemPresence,
|
|
||||||
} from "../infra/system-presence.js";
|
|
||||||
import { routeLogsToStderr } from "../logging.js";
|
|
||||||
import { setHeartbeatsEnabled } from "../provider-web.js";
|
|
||||||
import type { RuntimeEnv } from "../runtime.js";
|
|
||||||
|
|
||||||
export type RpcLoopHandles = { close: () => void };
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Run the stdin/stdout RPC loop used by `clawdis rpc`.
|
|
||||||
* Exposed for testing and reuse.
|
|
||||||
*/
|
|
||||||
export async function runRpcLoop(io: {
|
|
||||||
input: Readable;
|
|
||||||
output: Writable;
|
|
||||||
}): Promise<RpcLoopHandles> {
|
|
||||||
// Keep stdout reserved for RPC JSON replies; send all other logs to stderr.
|
|
||||||
routeLogsToStderr();
|
|
||||||
|
|
||||||
const rl = createInterface({ input: io.input, crlfDelay: Infinity });
|
|
||||||
|
|
||||||
const respond = (obj: unknown) => {
|
|
||||||
try {
|
|
||||||
io.output.write(`${JSON.stringify(obj)}\n`);
|
|
||||||
} catch (err) {
|
|
||||||
io.output.write(
|
|
||||||
`${JSON.stringify({ type: "error", error: String(err) })}\n`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
const forwardHeartbeat = (payload: unknown) => {
|
|
||||||
respond({ type: "event", event: "heartbeat", payload });
|
|
||||||
};
|
|
||||||
const forwardAgent = (payload: unknown) => {
|
|
||||||
respond({ type: "event", event: "agent", payload });
|
|
||||||
};
|
|
||||||
|
|
||||||
const latest = getLastHeartbeatEvent();
|
|
||||||
if (latest) forwardHeartbeat(latest);
|
|
||||||
const stopHeartbeat = onHeartbeatEvent(forwardHeartbeat);
|
|
||||||
const stopAgent = onAgentEvent(forwardAgent);
|
|
||||||
|
|
||||||
rl.on("line", async (line: string) => {
|
|
||||||
if (!line.trim()) return;
|
|
||||||
try {
|
|
||||||
const cmd = JSON.parse(line);
|
|
||||||
if (cmd.type === "status") {
|
|
||||||
respond({ type: "result", ok: true });
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (cmd.type === "set-heartbeats") {
|
|
||||||
setHeartbeatsEnabled(Boolean(cmd.enabled));
|
|
||||||
respond({ type: "result", ok: true });
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (cmd.type === "control-request" && cmd.id && cmd.method) {
|
|
||||||
const id = String(cmd.id);
|
|
||||||
const method = String(cmd.method);
|
|
||||||
const params = (cmd.params ?? {}) as Record<string, unknown>;
|
|
||||||
const controlRespond = (
|
|
||||||
ok: boolean,
|
|
||||||
payload?: unknown,
|
|
||||||
error?: string,
|
|
||||||
) => respond({ type: "control-response", id, ok, payload, error });
|
|
||||||
try {
|
|
||||||
if (method === "health") {
|
|
||||||
const timeoutMs =
|
|
||||||
typeof params.timeoutMs === "number"
|
|
||||||
? params.timeoutMs
|
|
||||||
: undefined;
|
|
||||||
const payload = await getHealthSnapshot(timeoutMs);
|
|
||||||
controlRespond(true, payload satisfies HealthSummary);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (method === "status") {
|
|
||||||
const payload = await getStatusSummary();
|
|
||||||
controlRespond(true, payload satisfies StatusSummary);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (method === "last-heartbeat") {
|
|
||||||
controlRespond(true, getLastHeartbeatEvent());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (method === "set-heartbeats") {
|
|
||||||
setHeartbeatsEnabled(Boolean(params.enabled));
|
|
||||||
controlRespond(true, { ok: true });
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (method === "system-event") {
|
|
||||||
const text = String(params.text ?? "").trim();
|
|
||||||
if (text) {
|
|
||||||
enqueueSystemEvent(text);
|
|
||||||
updateSystemPresence(text);
|
|
||||||
}
|
|
||||||
controlRespond(true, { ok: true });
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (method === "system-presence") {
|
|
||||||
controlRespond(true, listSystemPresence());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
controlRespond(false, undefined, `unknown control method: ${method}`);
|
|
||||||
} catch (err) {
|
|
||||||
controlRespond(false, undefined, String(err));
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (cmd.type !== "send" || !cmd.text) {
|
|
||||||
respond({ type: "error", error: "unsupported command" });
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const logs: string[] = [];
|
|
||||||
const runtime: RuntimeEnv = {
|
|
||||||
log: (msg: string) => logs.push(String(msg)),
|
|
||||||
error: (msg: string) => logs.push(String(msg)),
|
|
||||||
exit: (_code: number): never => {
|
|
||||||
throw new Error("agentCommand requested exit");
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
const opts: {
|
|
||||||
message: string;
|
|
||||||
to?: string;
|
|
||||||
sessionId?: string;
|
|
||||||
thinking?: string;
|
|
||||||
deliver?: boolean;
|
|
||||||
json: boolean;
|
|
||||||
} = {
|
|
||||||
message: String(cmd.text),
|
|
||||||
to: cmd.to ? String(cmd.to) : undefined,
|
|
||||||
sessionId: cmd.session ? String(cmd.session) : undefined,
|
|
||||||
thinking: cmd.thinking ? String(cmd.thinking) : undefined,
|
|
||||||
deliver: Boolean(cmd.deliver),
|
|
||||||
json: true,
|
|
||||||
};
|
|
||||||
|
|
||||||
try {
|
|
||||||
await agentCommand(opts, runtime, createDefaultDeps());
|
|
||||||
const payload = extractPayload(logs);
|
|
||||||
respond({ type: "result", ok: true, payload });
|
|
||||||
} catch (err) {
|
|
||||||
respond({ type: "error", error: String(err) });
|
|
||||||
}
|
|
||||||
} catch (err) {
|
|
||||||
respond({ type: "error", error: `parse error: ${String(err)}` });
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
const extractPayload = (logs: string[]) => {
|
|
||||||
for (const entry of logs.slice().reverse()) {
|
|
||||||
try {
|
|
||||||
const parsed = JSON.parse(entry);
|
|
||||||
if (parsed && typeof parsed === "object" && "payloads" in parsed) {
|
|
||||||
return parsed;
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
// non-JSON log, ignore
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
};
|
|
||||||
|
|
||||||
const close = () => {
|
|
||||||
stopHeartbeat();
|
|
||||||
stopAgent();
|
|
||||||
rl.close();
|
|
||||||
};
|
|
||||||
|
|
||||||
return { close };
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue