gateway: force ws-only clients

main
Peter Steinberger 2025-12-10 16:27:54 +00:00
parent c2adda1cfe
commit 55772eec5a
9 changed files with 80 additions and 172 deletions

View File

@ -138,29 +138,26 @@ Bot-mode support (grammY only) shares the same `main` session as WhatsApp/WebCha
| Command | Description | | Command | Description |
|---------|-------------| |---------|-------------|
| `clawdis login` | Link WhatsApp Web via QR | | `clawdis login` | Link WhatsApp Web via QR |
| `clawdis send` | Send a message (WhatsApp default; `--provider telegram` for bot mode). Always uses the Gateway WS; `--spawn-gateway` may start it if the port is free. | | `clawdis send` | Send a message (WhatsApp default; `--provider telegram` for bot mode). Always uses the Gateway WS; requires a running gateway. |
| `clawdis agent` | Talk directly to the agent (no WhatsApp send) | | `clawdis agent` | Talk directly to the agent (no WhatsApp send) |
| `clawdis gateway` | Start the Gateway server (WS control plane). Params: `--port`, `--token`, `--force`, `--verbose`. | | `clawdis gateway` | Start the Gateway server (WS control plane). Params: `--port`, `--token`, `--force`, `--verbose`. |
| `clawdis gateway health|status|send|agent|call` | Gateway WS clients; never start the server unless you pass `--spawn-gateway` (and only if no listener exists). | | `clawdis gateway health|status|send|agent|call` | Gateway WS clients; assume a running gateway. |
| `clawdis status` | Web session health + session store summary | | `clawdis status` | Web session health + session store summary |
| `clawdis health` | Reports cached provider state; add `--probe` to force a fresh Baileys connect (may conflict if already connected). | | `clawdis health` | Reports cached provider state from the running gateway. |
| `clawdis heartbeat` | Trigger a heartbeat | | `clawdis heartbeat` | Trigger a heartbeat |
#### Gateway client params (WS only) #### Gateway client params (WS only)
- `--url` (default `ws://127.0.0.1:18789`) - `--url` (default `ws://127.0.0.1:18789`)
- `--token` (shared secret if set on the gateway) - `--token` (shared secret if set on the gateway)
- `--timeout <ms>` (WS call timeout) - `--timeout <ms>` (WS call timeout)
- `--spawn-gateway` (only if no listener is present; explicit opt-in)
#### Send #### Send
- `--provider whatsapp|telegram` (default whatsapp) - `--provider whatsapp|telegram` (default whatsapp)
- `--media <path-or-url>` - `--media <path-or-url>`
- `--json` for machine-readable output - `--json` for machine-readable output
- `--spawn-gateway` to start a local gateway if the port is free and nothing is listening.
#### Health #### Health
- Default: reads gateway/provider state (no extra Baileys socket). - Reads gateway/provider state (no direct Baileys socket from the CLI).
- `--probe` will open a transient Baileys connection; use only when diagnosing login issues (can be rejected if the main session is already connected).
In chat, send `/status` to see if the agent is reachable, how much context the session has used, and the current thinking/verbose toggles—no agent call required. In chat, send `/status` to see if the agent is reachable, how much context the session has used, and the current thinking/verbose toggles—no agent call required.
`/status` also shows whether your WhatsApp web session is linked and how long ago the creds were refreshed so you know when to re-scan the QR. `/status` also shows whether your WhatsApp web session is linked and how long ago the creds were refreshed so you know when to re-scan the QR.

View File

@ -131,7 +131,7 @@ Enable with `systemctl enable --now clawdis-gateway.service`.
- `clawdis gateway send --to <num> --message "hi" [--media-url ...]` — send via Gateway (idempotent). - `clawdis gateway send --to <num> --message "hi" [--media-url ...]` — send via Gateway (idempotent).
- `clawdis gateway agent --message "hi" [--to ...]` — run an agent turn (waits for final by default). - `clawdis gateway agent --message "hi" [--to ...]` — run an agent turn (waits for final by default).
- `clawdis gateway call <method> --params '{"k":"v"}'` — raw method invoker for debugging. - `clawdis gateway call <method> --params '{"k":"v"}'` — raw method invoker for debugging.
- All gateway helpers accept `--spawn-gateway` to start a local gateway if none is listening on `--url`. - Gateway helper subcommands assume a running gateway on `--url`; they no longer auto-spawn one.
## Migration guidance ## Migration guidance
- Retire uses of `clawdis gateway` and the legacy TCP control port. - Retire uses of `clawdis gateway` and the legacy TCP control port.

View File

@ -8,7 +8,7 @@ read_when:
Short guide to verify the WhatsApp Web / Baileys stack without guessing. Short guide to verify the WhatsApp Web / Baileys stack without guessing.
## Quick checks ## Quick checks
- `pnpm clawdis status --json` — confirms creds exist (`web.linked`), shows auth age (`authAgeMs`), heartbeat interval, and where the session store lives. - `pnpm clawdis status --json`via the gateway; confirms creds exist (`web.linked`), shows auth age (`authAgeMs`), heartbeat interval, and where the session store lives.
- Send `/status` in WhatsApp/WebChat to see agent readiness, session context usage, current thinking/verbose options, and when the web creds were last refreshed (relink if it looks stale) without invoking the agent. - Send `/status` in WhatsApp/WebChat to see agent readiness, session context usage, current thinking/verbose options, and when the web creds were last refreshed (relink if it looks stale) without invoking the agent.
- `pnpm clawdis heartbeat --verbose --dry-run` — runs the heartbeat path end-to-end (session resolution, message creation) without sending anything. Drop `--dry-run` or add `--message "Ping"` to actually send. - `pnpm clawdis heartbeat --verbose --dry-run` — runs the heartbeat path end-to-end (session resolution, message creation) without sending anything. Drop `--dry-run` or add `--message "Ping"` to actually send.
- `pnpm clawdis gateway --verbose --heartbeat-now` — spins the full monitor loop, fires a heartbeat immediately, and will reconnect per `web.reconnect` settings. Good for soak testing. - `pnpm clawdis gateway --verbose --heartbeat-now` — spins the full monitor loop, fires a heartbeat immediately, and will reconnect per `web.reconnect` settings. Good for soak testing.
@ -25,4 +25,4 @@ Short guide to verify the WhatsApp Web / Baileys stack without guessing.
- No inbound messages → confirm linked phone is online and sender is allowed; use `pnpm clawdis heartbeat --all --verbose` to test each known recipient. - No inbound messages → confirm linked phone is online and sender is allowed; use `pnpm clawdis heartbeat --all --verbose` to test each known recipient.
## Dedicated "health" command ## Dedicated "health" command
`pnpm clawdis health --json` runs a connect-only probe (no sends) and reports: linked creds, auth age, Baileys connect result/status code, session-store summary, and a probe duration. It exits non-zero if not linked or if the connect fails/timeouts. Use `--timeout <ms>` to override the 10s default. `pnpm clawdis health --json` asks the running gateway for its health snapshot (no direct Baileys socket from the CLI). It reports linked creds, auth age, Baileys connect result/status code, session-store summary, and a probe duration. It exits non-zero if not linked or if the gateway probe fails/timeouts. Use `--timeout <ms>` to override the 10s default.

View File

@ -14,9 +14,9 @@ import { defaultRuntime } from "../runtime.js";
import { VERSION } from "../version.js"; import { VERSION } from "../version.js";
import { startWebChatServer } from "../webchat/server.js"; import { startWebChatServer } from "../webchat/server.js";
import { createDefaultDeps } from "./deps.js"; import { createDefaultDeps } from "./deps.js";
import { forceFreePort, listPortListeners, parseLsofOutput } from "./ports.js"; import { forceFreePort } from "./ports.js";
export { forceFreePort, listPortListeners, parseLsofOutput }; export { forceFreePort };
export function buildProgram() { export function buildProgram() {
const program = new Command(); const program = new Command();
@ -136,11 +136,6 @@ export function buildProgram() {
"--provider <provider>", "--provider <provider>",
"Delivery provider: whatsapp|telegram (default: whatsapp)", "Delivery provider: whatsapp|telegram (default: whatsapp)",
) )
.option(
"--spawn-gateway",
"Start a local gateway on 127.0.0.1:18789 if none is running",
false,
)
.option("--dry-run", "Print payload and skip sending", false) .option("--dry-run", "Print payload and skip sending", false)
.option("--json", "Output result as JSON", false) .option("--json", "Output result as JSON", false)
.option("--verbose", "Verbose logging", false) .option("--verbose", "Verbose logging", false)
@ -280,50 +275,23 @@ Examples:
.option("--url <url>", "Gateway WebSocket URL", "ws://127.0.0.1:18789") .option("--url <url>", "Gateway WebSocket URL", "ws://127.0.0.1:18789")
.option("--token <token>", "Gateway token (if required)") .option("--token <token>", "Gateway token (if required)")
.option("--timeout <ms>", "Timeout in ms", "10000") .option("--timeout <ms>", "Timeout in ms", "10000")
.option("--expect-final", "Wait for final response (agent)", false) .option("--expect-final", "Wait for final response (agent)", false);
.option(
"--spawn-gateway",
"Start a local gateway if none is listening on --url",
false,
);
const callWithSpawn = async ( const callGatewayCli = async (
method: string, method: string,
opts: { opts: { url?: string; token?: string; timeout?: string; expectFinal?: boolean },
url?: string;
token?: string;
timeout?: string;
expectFinal?: boolean;
spawnGateway?: boolean;
},
params?: unknown, params?: unknown,
) => { ) =>
const timeoutMs = Number(opts.timeout ?? 10_000); callGateway({
const attempt = async () => url: opts.url,
callGateway({ token: opts.token,
url: opts.url, method,
token: opts.token, params,
method, expectFinal: Boolean(opts.expectFinal),
params, timeoutMs: Number(opts.timeout ?? 10_000),
expectFinal: Boolean(opts.expectFinal), clientName: "cli",
timeoutMs, mode: "cli",
clientName: "cli", });
mode: "cli",
});
try {
return await attempt();
} catch (err) {
if (!opts.spawnGateway) throw err;
// Only spawn if there is clearly no listener.
const url = new URL(opts.url ?? "ws://127.0.0.1:18789");
const port = Number(url.port || 18789);
const listeners = listPortListeners(port);
if (listeners.length > 0) throw err;
await startGatewayServer(port);
return await attempt();
}
};
gatewayCallOpts( gatewayCallOpts(
gateway gateway
@ -337,7 +305,7 @@ Examples:
.action(async (method, opts) => { .action(async (method, opts) => {
try { try {
const params = JSON.parse(String(opts.params ?? "{}")); const params = JSON.parse(String(opts.params ?? "{}"));
const result = await callWithSpawn(method, opts, params); const result = await callGatewayCli(method, opts, params);
defaultRuntime.log(JSON.stringify(result, null, 2)); defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) { } catch (err) {
defaultRuntime.error(`Gateway call failed: ${String(err)}`); defaultRuntime.error(`Gateway call failed: ${String(err)}`);
@ -352,7 +320,7 @@ Examples:
.description("Fetch Gateway health") .description("Fetch Gateway health")
.action(async (opts) => { .action(async (opts) => {
try { try {
const result = await callWithSpawn("health", opts); const result = await callGatewayCli("health", opts);
defaultRuntime.log(JSON.stringify(result, null, 2)); defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) { } catch (err) {
defaultRuntime.error(String(err)); defaultRuntime.error(String(err));
@ -367,7 +335,7 @@ Examples:
.description("Fetch Gateway status") .description("Fetch Gateway status")
.action(async (opts) => { .action(async (opts) => {
try { try {
const result = await callWithSpawn("status", opts); const result = await callGatewayCli("status", opts);
defaultRuntime.log(JSON.stringify(result, null, 2)); defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) { } catch (err) {
defaultRuntime.error(String(err)); defaultRuntime.error(String(err));
@ -387,7 +355,7 @@ Examples:
.action(async (opts) => { .action(async (opts) => {
try { try {
const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey(); const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey();
const result = await callWithSpawn("send", opts, { const result = await callGatewayCli("send", opts, {
to: opts.to, to: opts.to,
message: opts.message, message: opts.message,
mediaUrl: opts.mediaUrl, mediaUrl: opts.mediaUrl,
@ -415,7 +383,7 @@ Examples:
.action(async (opts) => { .action(async (opts) => {
try { try {
const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey(); const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey();
const result = await callWithSpawn( const result = await callGatewayCli(
"agent", "agent",
{ ...opts, expectFinal: true }, { ...opts, expectFinal: true },
{ {
@ -482,17 +450,10 @@ Examples:
program program
.command("health") .command("health")
.description( .description("Fetch health from the running gateway")
"Probe WhatsApp Web health (creds + Baileys connect) and session store",
)
.option("--json", "Output JSON instead of text", false) .option("--json", "Output JSON instead of text", false)
.option("--timeout <ms>", "Connection timeout in milliseconds", "10000") .option("--timeout <ms>", "Connection timeout in milliseconds", "10000")
.option("--verbose", "Verbose logging", false) .option("--verbose", "Verbose logging", false)
.option(
"--probe",
"Also attempt a live Baileys connect (can conflict if gateway is already connected)",
true,
)
.action(async (opts) => { .action(async (opts) => {
setVerbose(Boolean(opts.verbose)); setVerbose(Boolean(opts.verbose));
const timeout = opts.timeout const timeout = opts.timeout
@ -510,7 +471,6 @@ Examples:
{ {
json: Boolean(opts.json), json: Boolean(opts.json),
timeoutMs: timeout, timeoutMs: timeout,
probe: opts.probe ?? true,
}, },
defaultRuntime, defaultRuntime,
); );

View File

@ -12,6 +12,7 @@ import {
waitForWaConnection, waitForWaConnection,
webAuthExists, webAuthExists,
} from "../web/session.js"; } from "../web/session.js";
import { callGateway } from "../gateway/call.js";
type HealthConnect = { type HealthConnect = {
ok: boolean; ok: boolean;
@ -236,12 +237,13 @@ export async function getHealthSnapshot(
} }
export async function healthCommand( export async function healthCommand(
opts: { json?: boolean; timeoutMs?: number; probe?: boolean }, opts: { json?: boolean; timeoutMs?: number },
runtime: RuntimeEnv, runtime: RuntimeEnv,
) { ) {
const probe = opts.probe ?? true; // Always query the running gateway; do not open a direct Baileys socket here.
const summary = await getHealthSnapshot(opts.timeoutMs, { const summary = await callGateway<HealthSummary>({
probe, method: "health",
timeoutMs: opts.timeoutMs,
}); });
const fatal = const fatal =
!summary.web.linked || !summary.web.linked ||

View File

@ -1,7 +1,5 @@
import type { CliDeps } from "../cli/deps.js"; import type { CliDeps } from "../cli/deps.js";
import { listPortListeners } from "../cli/ports.js";
import { callGateway, randomIdempotencyKey } from "../gateway/call.js"; import { callGateway, randomIdempotencyKey } from "../gateway/call.js";
import { startGatewayServer } from "../gateway/server.js";
import { success } from "../globals.js"; import { success } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js"; import type { RuntimeEnv } from "../runtime.js";
@ -13,7 +11,6 @@ export async function sendCommand(
json?: boolean; json?: boolean;
dryRun?: boolean; dryRun?: boolean;
media?: string; media?: string;
spawnGateway?: boolean;
}, },
deps: CliDeps, deps: CliDeps,
runtime: RuntimeEnv, runtime: RuntimeEnv,
@ -74,21 +71,7 @@ export async function sendCommand(
mode: "cli", mode: "cli",
}); });
let result: { messageId: string } | undefined; const result = await sendViaGateway();
try {
result = await sendViaGateway();
} catch (err) {
if (!opts.spawnGateway) throw err;
// Only spawn when nothing is listening.
try {
const listeners = listPortListeners(18789);
if (listeners.length > 0) throw err;
await startGatewayServer(18789);
result = await sendViaGateway();
} catch {
throw err;
}
}
runtime.log( runtime.log(
success( success(

View File

@ -10,13 +10,14 @@ import { info } from "../globals.js";
import { buildProviderSummary } from "../infra/provider-summary.js"; import { buildProviderSummary } from "../infra/provider-summary.js";
import { peekSystemEvents } from "../infra/system-events.js"; import { peekSystemEvents } from "../infra/system-events.js";
import type { RuntimeEnv } from "../runtime.js"; import type { RuntimeEnv } from "../runtime.js";
import { callGateway } from "../gateway/call.js";
import { resolveHeartbeatSeconds } from "../web/reconnect.js"; import { resolveHeartbeatSeconds } from "../web/reconnect.js";
import { import {
getWebAuthAgeMs, getWebAuthAgeMs,
logWebSelfId, logWebSelfId,
webAuthExists, webAuthExists,
} from "../web/session.js"; } from "../web/session.js";
import { getHealthSnapshot, type HealthSummary } from "./health.js"; import type { HealthSummary } from "./health.js";
export type SessionStatus = { export type SessionStatus = {
key: string; key: string;
@ -193,7 +194,10 @@ export async function statusCommand(
) { ) {
const summary = await getStatusSummary(); const summary = await getStatusSummary();
const health: HealthSummary | undefined = opts.deep const health: HealthSummary | undefined = opts.deep
? await getHealthSnapshot(opts.timeoutMs) ? await callGateway<HealthSummary>({
method: "health",
timeoutMs: opts.timeoutMs,
})
: undefined; : undefined;
if (opts.json) { if (opts.json) {

View File

@ -3,6 +3,7 @@ import os from "node:os";
import path from "node:path"; import path from "node:path";
import { flockSync } from "fs-ext"; import { flockSync } from "fs-ext";
import { getLogger } from "../logging.js";
const defaultLockPath = () => const defaultLockPath = () =>
process.env.CLAWDIS_GATEWAY_LOCK_PATH ?? process.env.CLAWDIS_GATEWAY_LOCK_PATH ??
@ -43,6 +44,7 @@ export async function acquireGatewayLock(
fs.ftruncateSync(fd, 0); fs.ftruncateSync(fd, 0);
fs.writeSync(fd, `${process.pid}\n`, 0, "utf8"); fs.writeSync(fd, `${process.pid}\n`, 0, "utf8");
fs.fsyncSync(fd); fs.fsyncSync(fd);
getLogger().info({ pid: process.pid, lockPath }, "gateway lock acquired");
let released = false; let released = false;
const release = async (): Promise<void> => { const release = async (): Promise<void> => {

View File

@ -1,26 +1,24 @@
import { randomUUID } from "node:crypto"; import { randomUUID } from "node:crypto";
import type { AnyMessageContent } from "@whiskeysockets/baileys";
import { logVerbose } from "../globals.js";
import { logInfo } from "../logger.js"; import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js"; import { getChildLogger } from "../logging.js";
import { toWhatsappJid } from "../utils.js"; import { toWhatsappJid } from "../utils.js";
import { getActiveWebListener } from "./active-listener.js"; import { getActiveWebListener } from "./active-listener.js";
import { loadWebMedia } from "./media.js"; import { loadWebMedia } from "./media.js";
import { createWaSocket, waitForWaConnection } from "./session.js";
export async function sendMessageWhatsApp( export async function sendMessageWhatsApp(
to: string, to: string,
body: string, body: string,
options: { verbose: boolean; mediaUrl?: string }, options: { verbose: boolean; mediaUrl?: string },
): Promise<{ messageId: string; toJid: string }> { ): Promise<{ messageId: string; toJid: string }> {
let text = body;
const correlationId = randomUUID(); const correlationId = randomUUID();
const active = getActiveWebListener(); const active = getActiveWebListener();
const usingActive = Boolean(active); if (!active) {
const sock = usingActive throw new Error(
? null "No active gateway listener. Start the gateway before sending WhatsApp messages.",
: await createWaSocket(false, options.verbose); );
}
const logger = getChildLogger({ const logger = getChildLogger({
module: "web-outbound", module: "web-outbound",
correlationId, correlationId,
@ -28,54 +26,25 @@ export async function sendMessageWhatsApp(
}); });
try { try {
const jid = toWhatsappJid(to); const jid = toWhatsappJid(to);
if (!usingActive) { let mediaBuffer: Buffer | undefined;
logInfo("🔌 Connecting to WhatsApp Web…"); let mediaType: string | undefined;
logger.info("connecting to whatsapp web");
if (!sock) {
throw new Error("WhatsApp socket unavailable");
}
await waitForWaConnection(sock);
try {
await sock.sendPresenceUpdate("composing", jid);
} catch (err) {
logVerbose(`Presence update skipped: ${String(err)}`);
}
}
let payload: AnyMessageContent = { text: body };
if (options.mediaUrl) { if (options.mediaUrl) {
const media = await loadWebMedia(options.mediaUrl); const media = await loadWebMedia(options.mediaUrl);
const caption = body || undefined; const caption = text || undefined;
mediaBuffer = media.buffer;
mediaType = media.contentType;
if (media.kind === "audio") { if (media.kind === "audio") {
// WhatsApp expects explicit opus codec for PTT voice notes. // WhatsApp expects explicit opus codec for PTT voice notes.
const mimetype = mediaType =
media.contentType === "audio/ogg" media.contentType === "audio/ogg"
? "audio/ogg; codecs=opus" ? "audio/ogg; codecs=opus"
: (media.contentType ?? "application/octet-stream"); : media.contentType ?? "application/octet-stream";
payload = { audio: media.buffer, ptt: true, mimetype };
} else if (media.kind === "video") { } else if (media.kind === "video") {
const mimetype = media.contentType ?? "application/octet-stream"; text = caption ?? "";
payload = {
video: media.buffer,
caption,
mimetype,
};
} else if (media.kind === "image") { } else if (media.kind === "image") {
const mimetype = media.contentType ?? "application/octet-stream"; text = caption ?? "";
payload = {
image: media.buffer,
caption,
mimetype,
};
} else { } else {
// Fallback to document for anything else (pdf, etc.). text = caption ?? "";
const fileName = media.fileName ?? "file";
const mimetype = media.contentType ?? "application/octet-stream";
payload = {
document: media.buffer,
fileName,
caption,
mimetype,
};
} }
} }
logInfo( logInfo(
@ -85,39 +54,30 @@ export async function sendMessageWhatsApp(
{ jid, hasMedia: Boolean(options.mediaUrl) }, { jid, hasMedia: Boolean(options.mediaUrl) },
"sending message", "sending message",
); );
const result = usingActive const result = await (async () => {
? await (async () => { if (!active) throw new Error("Active web listener missing");
if (!active) throw new Error("Active web listener missing"); let mediaBuffer: Buffer | undefined;
let mediaBuffer: Buffer | undefined; let mediaType: string | undefined;
let mediaType: string | undefined; if (options.mediaUrl) {
if (options.mediaUrl) { const media = await loadWebMedia(options.mediaUrl);
const media = await loadWebMedia(options.mediaUrl); mediaBuffer = media.buffer;
mediaBuffer = media.buffer; mediaType = media.contentType;
mediaType = media.contentType; }
} await active.sendComposingTo(to);
await active.sendComposingTo(to); return active.sendMessage(to, text, mediaBuffer, mediaType);
return active.sendMessage(to, body, mediaBuffer, mediaType); })();
})() const messageId =
: await (async () => { (result as { messageId?: string })?.messageId ?? "unknown";
if (!sock) throw new Error("WhatsApp socket unavailable");
return sock.sendMessage(jid, payload);
})();
const messageId = usingActive
? ((result as { messageId?: string })?.messageId ?? "unknown")
: ((result as { key?: { id?: string } } | undefined)?.key?.id ??
"unknown");
logInfo( logInfo(
`✅ Sent via web session. Message ID: ${messageId} -> ${jid}${options.mediaUrl ? " (media)" : ""}`, `✅ Sent via web session. Message ID: ${messageId} -> ${jid}${options.mediaUrl ? " (media)" : ""}`,
); );
logger.info({ jid, messageId }, "sent message"); logger.info({ jid, messageId }, "sent message");
return { messageId, toJid: jid }; return { messageId, toJid: jid };
} finally { } catch (err) {
if (!usingActive) { logger.error(
try { { err: String(err), to, hasMedia: Boolean(options.mediaUrl) },
sock?.ws?.close(); "failed to send via web session",
} catch (err) { );
logVerbose(`Socket close failed: ${String(err)}`); throw err;
}
}
} }
} }