Cron: add scheduler, wakeups, and run history

main
Peter Steinberger 2025-12-13 02:34:11 +00:00
parent 572d17f46b
commit f9409cbe43
26 changed files with 3401 additions and 342 deletions

View File

@ -112,6 +112,30 @@ Array of E.164 phone numbers allowed to trigger the AI. Use `["*"]` to allow eve
> Quick start: If you omit `inbound.reply`, CLAWDIS falls back to the bundled `@mariozechner/pi-coding-agent` with `--mode rpc`, per-sender sessions, and a 200k-token window. No extra install or config needed to get a reply. > Quick start: If you omit `inbound.reply`, CLAWDIS falls back to the bundled `@mariozechner/pi-coding-agent` with `--mode rpc`, per-sender sessions, and a 200k-token window. No extra install or config needed to get a reply.
### `cron`
Cron is a Gateway-owned scheduler for wakeups and scheduled jobs. See `docs/cron.md` for the full RFC and CLI examples.
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `enabled` | boolean | `false` | Enable the cron scheduler inside the Gateway |
| `store` | string | *(auto)* | Override the cron job store path (defaults to `~/.clawdis/cron/jobs.json` if present, otherwise `~/.clawdis/cron.json`) |
| `maxConcurrentRuns` | number | `1` | Max concurrent isolated cron runs (command-queue lane `"cron"`) |
Run history:
- The Gateway appends a JSONL run ledger on each job completion (see `docs/cron.md`). Location is derived from `cron.store` / the resolved store path.
Example:
```json5
{
cron: {
enabled: true,
maxConcurrentRuns: 2
}
}
```
### Template Variables ### Template Variables
Use these in your command: Use these in your command:

364
docs/cron.md Normal file
View File

@ -0,0 +1,364 @@
---
summary: "RFC: Cron jobs + wakeups for Clawd/Clawdis (main vs isolated sessions)"
read_when:
- Designing scheduled jobs, alarms, or wakeups
- Adding Gateway methods or CLI commands for automation
- Adjusting heartbeat behavior or session routing
---
# RFC: Cron jobs + wakeups for Clawd
Status: Draft
Last updated: 2025-12-13
## Context
Clawdis already has:
- A **periodic reply heartbeat** that runs the agent with `HEARTBEAT /think:high` and suppresses `HEARTBEAT_OK` (`src/web/auto-reply.ts`).
- A lightweight, in-memory **system event queue** (`enqueueSystemEvent`) that is injected into the next **main session** turn (`drainSystemEvents` in `src/auto-reply/reply.ts`).
- A WebSocket **Gateway** daemon that is intended to be always-on (`docs/gateway.md`).
This RFC adds a small “cron job system” so Clawd can schedule future work and reliably wake itself up:
- **Delayed**: run on the *next* normal heartbeat tick
- **Immediate**: run *now* (trigger a heartbeat immediately)
- **Isolated jobs**: optionally run in their own session that does not pollute the main session and can run concurrently (within configured limits).
## Goals
- Provide a **persistent job store** and an **in-process scheduler** owned by the Gateway.
- Allow each job to target either:
- `sessionTarget: "main"`: inject as `System:` lines and rely on the main heartbeat (or trigger it immediately).
- `sessionTarget: "isolated"`: run an agent turn in a dedicated session key (job session), optionally delivering a message and/or posting a summary back to main.
- Expose a stable control surface:
- **Gateway methods** (`cron.*`, `wake`) for programmatic usage (mac app, CLI, agents).
- **CLI commands** (`clawdis cron ...`) to add/remove/edit/list and to debug `run`.
- Produce clear, structured **logs** for job lifecycle and execution outcomes.
## Non-goals (v1)
- Multi-host distributed scheduling.
- Exactly-once semantics across crashes (we aim for “at-least-once with idempotency hooks”).
- A full Unix-cron parser as the only schedule format (we can support it, but v1 should not require complex cron features to be useful).
## Terminology
- **Wake**: a request to ensure the agent gets a turn soon (either right now or next heartbeat).
- **Main session**: the canonical session bucket (default key `"main"`) that receives `System:` events.
- **Isolated session**: a per-job session key (e.g. `cron:<jobId>`) with its own session id / session file.
## User stories
- “Remind me in 20 minutes” → add a one-shot job that triggers an immediate heartbeat at T+20m.
- “Every weekday at 7:30, wake me up and start music” → recurring job, isolated session, deliver to WhatsApp.
- “Every hour, check battery; only interrupt me if < 20% isolated job that decides whether to deliver; may also post a brief status to main.
- “Next heartbeat, please check calendar” → delayed wake targeting main session.
## Job model
### Storage schema (v1)
Each job is a JSON object with stable keys (unknown keys ignored for forward compatibility):
- `id: string` (UUID)
- `name?: string`
- `enabled: boolean`
- `createdAtMs: number`
- `updatedAtMs: number`
- `schedule` (one of)
- `{"kind":"at","atMs":number}` (one-shot)
- `{"kind":"every","everyMs":number,"anchorMs"?:number}` (simple interval)
- `{"kind":"cron","expr":string,"tz"?:string}` (optional; see “Schedule parsing”)
- `sessionTarget: "main" | "isolated"`
- `wakeMode: "next-heartbeat" | "now"`
- For `sessionTarget:"isolated"`, `wakeMode:"now"` means “run immediately when due”.
- For `sessionTarget:"main"`, `wakeMode` controls whether we trigger the heartbeat immediately or just enqueue and wait.
- `payload` (one of)
- `{"kind":"systemEvent","text":string}` (enqueue as `System:`)
- `{"kind":"agentTurn","message":string,"deliver"?:boolean,"channel"?: "last"|"whatsapp"|"telegram","to"?:string,"timeoutSeconds"?:number}`
- `isolation` (optional; only meaningful for isolated jobs)
- `{"postToMain": boolean, "postToMainPrefix"?: string}`
- `runtime` (optional)
- `{"maxAttempts"?:number,"retryBackoffMs"?:number}` (best-effort retries; defaults off)
- `state` (runtime-maintained)
- `{"nextRunAtMs":number,"lastRunAtMs"?:number,"lastStatus"?: "ok"|"error"|"skipped","lastError"?:string,"lastDurationMs"?:number}`
### Key behavior
- `sessionTarget:"main"` jobs always enqueue `payload.kind:"systemEvent"` (directly or derived from `agentTurn` results; see below).
- `sessionTarget:"isolated"` jobs create/use a stable session key: `cron:<jobId>`.
## Storage location
We can store this directly under `~/.clawdis` without a subfolder, but a folder gives us room for future artifacts (per-job state, migration backups, run history).
Current behavior (v1):
- Default store: `~/.clawdis/cron.json`
- If `~/.clawdis/cron/jobs.json` exists, it is preferred (and is a good location for future per-cron artifacts).
- Any path can be forced via `cron.store` in config.
The scheduler should never require additional configuration for the base directory (Clawdis already treats `~/.clawdis` as fixed).
## Enabling
Cron execution should be opt-in via config:
```json5
{
cron: {
enabled: true,
// optional:
store: "~/.clawdis/cron.json",
maxConcurrentRuns: 1
}
}
```
## Scheduler design
### Ownership
The Gateway owns:
- the scheduler timer,
- job store reads/writes,
- job execution (enqueue system events and/or agent turns).
This keeps scheduling unified with the always-on process and prevents “two schedulers” when multiple CLIs run.
### Timer strategy
- Maintain an in-memory heap/array of enabled jobs keyed by `state.nextRunAtMs`.
- Use a **single `setTimeout`** to wake at the earliest next run.
- On wake:
- compute all due jobs (now >= nextRunAtMs),
- mark them “in flight” (in memory),
- persist updated `state` (at least bump `nextRunAtMs` / `lastRunAtMs`) before starting execution to minimize duplicate runs on crash,
- execute jobs (with concurrency limits),
- persist final `lastStatus/lastError/lastDurationMs`,
- re-arm timer for the next earliest run.
### Schedule parsing
V1 can ship with `at` + `every` without extra deps.
If we add `"kind":"cron"`:
- Use a well-maintained parser (we use `croner`) and support:
- 5-field cron (`min hour dom mon dow`) at minimum
- optional `tz`
- Store `nextRunAtMs` computed by the parser; re-compute after each run.
## Execution semantics
### Main session jobs
Main session jobs do not run the agent directly by default.
When due:
1) `enqueueSystemEvent(job.payload.text)` (or a derived message)
2) If `wakeMode:"now"`, trigger an immediate heartbeat run (see “Heartbeat wake hook”).
3) Otherwise do nothing else (the next scheduled heartbeat will pick up the system event).
Why: This keeps the main sessions “proactive” behavior centralized in the heartbeat rules and avoids ad-hoc agent turns that might fight with inbound message processing.
### Isolated session jobs
Isolated jobs run an agent turn in a dedicated session key, intended to be separate from main.
When due:
- Build a message body that includes schedule metadata, e.g.:
- `"[cron:<jobId>] <job.name>: <payload.message>"`
- Execute via the same agent runner path as other command-mode runs, but pinned to:
- `sessionKey = cron:<jobId>`
- `sessionId = store[sessionKey].sessionId` (create if missing)
- Optionally deliver output (`payload.deliver === true`) to the configured channel/to.
- If `isolation.postToMain` is true, enqueue a summary system event to main, e.g.:
- `System: Cron "<name>" completed: <1-line summary>`
### “Run in parallel to main”
Clawdis currently serializes command execution through a global in-process queue (`src/process/command-queue.ts`) to avoid collisions.
To support isolated cron jobs running “in parallel”, we should introduce **lanes** (keyed queues) plus a global concurrency cap:
- Lane `"main"`: inbound auto-replies + main heartbeat.
- Lane `"cron"` (or `cron:<jobId>`): isolated jobs.
- Configurable `cron.maxConcurrentRuns` (default 1 or 2).
This yields:
- isolated jobs can overlap with the main lane (up to cap),
- each lane still preserves ordering for its own work (optional),
- we retain safety knobs to prevent runaway resource contention.
## Heartbeat wake hook (immediate vs next heartbeat)
We need a way for the Gateway (or the scheduler) to request an immediate heartbeat without duplicating heartbeat logic.
Design:
- `monitorWebProvider` owns the real `runReplyHeartbeat()` function (it already has all the local state needed).
- Add a small global hook module:
- `setReplyHeartbeatWakeHandler(fn | null)` installed by `monitorWebProvider`
- `requestReplyHeartbeatNow({ reason, coalesceMs? })`
- If the handler is absent (provider not connected), the request is stored as “pending”; the next time the handler is installed, it runs once.
- Coalesce rapid calls and respect the existing “skip when queue busy” behavior (prefer retrying soon vs dropping).
## Run history log (JSONL)
In addition to normal structured logs, the Gateway writes an append-only run history “ledger” (JSONL) whenever a job finishes. This is intended for quick debugging (“did the job run, when, and what happened?”).
Path rules:
- If the cron store path basename is `jobs.json` (e.g. `~/.clawdis/cron/jobs.json`), logs go to `.../runs/<jobId>.jsonl` (e.g. `~/.clawdis/cron/runs/<jobId>.jsonl`).
- Otherwise logs go to `<storeBase>.runs.jsonl` in the same directory (e.g. `~/.clawdis/cron.json``~/.clawdis/cron.runs.jsonl`).
Retention:
- Best-effort pruning when the file grows beyond ~2MB; keep the newest ~2000 lines.
## Gateway API
New methods (names can be bikeshed; `cron.*` is suggested):
- `wake`
- params: `{ mode: "now" | "next-heartbeat", text: string }`
- effect: `enqueueSystemEvent(text)`, plus optional immediate heartbeat trigger
- `cron.list`
- params: optional `{ includeDisabled?: boolean }`
- returns: `{ jobs: CronJob[] }`
- `cron.add`
- params: job payload without `id/state` (server generates and returns created job)
- `cron.update`
- params: `{ id: string, patch: Partial<CronJobWritableFields> }`
- `cron.remove`
- params: `{ id: string }`
- `cron.run`
- params: `{ id: string, mode?: "due" | "force" }` (debugging; does not change schedule unless `force` requires it)
- `cron.runs`
- params: `{ id?: string, limit?: number }`
- returns: `{ entries: CronRunLogEntry[] }`
- note: if the store layout is `.../jobs.json`, `id` is required (runs are stored per-job).
The Gateway should broadcast a `cron` event for UI/debug:
- event: `cron`
- payload: `{ jobId, action: "added"|"updated"|"removed"|"started"|"finished", status?, error?, nextRunAtMs? }`
## CLI surface
Add a `cron` command group (all commands should also support `--json` where sensible):
- `clawdis cron list [--json] [--all]`
- `clawdis cron add ...`
- schedule flags:
- `--at <iso8601|ms|relative>` (one-shot)
- `--every <duration>` (e.g. `10m`, `1h`)
- `--cron "<expr>" [--tz "<tz>"]`
- target flags:
- `--session main|isolated`
- `--wake now|next`
- payload flags (choose one):
- `--system-event "<text>"`
- `--message "<agent message>" [--deliver] [--channel last|whatsapp|telegram] [--to <dest>]`
- `clawdis cron edit <id> ...` (patch-by-flags, non-interactive)
- `clawdis cron rm <id>`
- `clawdis cron enable <id>` / `clawdis cron disable <id>`
- `clawdis cron run <id> [--force]` (debug)
Additionally:
- `clawdis wake --mode now|next --text "<text>"` as a thin wrapper around `wake` for agents to call.
## Examples
### Run once at a specific time
One-shot reminder that targets the main session and triggers a heartbeat immediately at the scheduled time:
```bash
clawdis cron add \
--at "2025-12-14T07:00:00-08:00" \
--session main \
--wake now \
--system-event "Alarm: wake up (meeting in 30 minutes)."
```
### Run daily (calendar-accurate)
Daily at 07:00 in a specific timezone (preferred over “every 24h” to avoid DST drift):
```bash
clawdis cron add \
--cron "0 7 * * *" \
--tz "America/Los_Angeles" \
--session isolated \
--wake now \
--message "Daily check: scan calendar + inbox; deliver only if urgent." \
--deliver \
--channel last
```
### Run weekly (every Wednesday)
Every Wednesday at 09:00:
```bash
clawdis cron add \
--cron "0 9 * * 3" \
--tz "America/Los_Angeles" \
--session isolated \
--wake now \
--message "Weekly: summarize status and remind me of goals." \
--deliver \
--channel last
```
### “Next heartbeat”
Enqueue a note for the main session but let the existing heartbeat cadence pick it up:
```bash
clawdis wake --mode next --text "Next heartbeat: check battery + upcoming meetings."
```
## Logging & observability
Logging requirements:
- Use `getChildLogger({ module: "cron", jobId, runId, name })` for every run.
- Log lifecycle:
- store load/save (debug; include job count)
- schedule recompute (debug; include nextRunAt)
- job start/end (info)
- job skipped (info; include reason)
- job error (warn; include error + stack where available)
- Emit a concise user-facing line to stdout when running in CLI mode (similar to heartbeat logs).
Suggested log events:
- `cron: scheduler started` (jobCount, nextWakeAt)
- `cron: job started` (jobId, scheduleKind, sessionTarget, wakeMode)
- `cron: job finished` (status, durationMs, nextRunAtMs)
## Safety & security
- Respect existing allowlists/routing rules: delivery defaults should not send to arbitrary destinations unless explicitly configured.
- Provide a global “kill switch”:
- `cron.enabled: boolean` config default true (or false until enabled).
- `gateway method set-heartbeats` already exists; cron should have similar.
- Avoid persistence of sensitive payloads unless requested; job text may contain private content.
## Testing plan (v1)
- Unit tests:
- schedule computation for `at` and `every`
- job store read/write + migration behavior
- lane concurrency: main vs cron overlap is bounded
- “wake now” coalescing and pending behavior when provider not ready
- Integration tests:
- start Gateway with `CLAWDIS_SKIP_PROVIDERS=1`, add jobs, list/edit/remove
- simulate due jobs and assert `enqueueSystemEvent` called + cron events broadcast
## Rollout plan
1) Add the `wake` primitive + heartbeat wake hook (no persistent jobs yet).
2) Add `cron.*` API and CLI wrappers with `at` + `every`.
3) Add optional cron expression parsing (`kind:"cron"`) if needed.
4) Add UI surfacing in WebChat/macOS app (optional).

View File

@ -19,7 +19,8 @@ We now serialize all command-based auto-replies (WhatsApp Web listener) through
## Scope and guarantees ## Scope and guarantees
- Applies only to config-driven command replies; plain text replies are unaffected. - Applies only to config-driven command replies; plain text replies are unaffected.
- Queue is process-wide, so the web inbox listener (and any future entrypoints) all respect the same lock. - Default lane (`main`) is process-wide for inbound + main heartbeats to keep the primary workflow serialized.
- Additional lanes may exist (e.g. `cron`) so background jobs can run in parallel without blocking inbound replies.
- No external dependencies or background worker threads; pure TypeScript + promises. - No external dependencies or background worker threads; pure TypeScript + promises.
## Troubleshooting ## Troubleshooting

View File

@ -43,6 +43,7 @@
"body-parser": "^2.2.1", "body-parser": "^2.2.1",
"chalk": "^5.6.2", "chalk": "^5.6.2",
"commander": "^14.0.2", "commander": "^14.0.2",
"croner": "^9.1.0",
"detect-libc": "^2.1.2", "detect-libc": "^2.1.2",
"dotenv": "^17.2.3", "dotenv": "^17.2.3",
"express": "^5.2.1", "express": "^5.2.1",

414
src/cli/cron-cli.ts Normal file
View File

@ -0,0 +1,414 @@
import type { Command } from "commander";
import { danger } from "../globals.js";
import { defaultRuntime } from "../runtime.js";
import { addGatewayClientOptions, callGatewayFromCli } from "./gateway-rpc.js";
function parseDurationMs(input: string): number | null {
const raw = input.trim();
if (!raw) return null;
const match = raw.match(/^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/i);
if (!match) return null;
const n = Number.parseFloat(match[1] ?? "");
if (!Number.isFinite(n) || n <= 0) return null;
const unit = (match[2] ?? "").toLowerCase();
const factor =
unit === "ms"
? 1
: unit === "s"
? 1000
: unit === "m"
? 60_000
: unit === "h"
? 3_600_000
: 86_400_000;
return Math.floor(n * factor);
}
function parseAtMs(input: string): number | null {
const raw = input.trim();
if (!raw) return null;
const asNum = Number(raw);
if (Number.isFinite(asNum) && asNum > 0) return Math.floor(asNum);
const parsed = Date.parse(raw);
if (Number.isFinite(parsed)) return parsed;
const dur = parseDurationMs(raw);
if (dur) return Date.now() + dur;
return null;
}
export function registerCronCli(program: Command) {
addGatewayClientOptions(
program
.command("wake")
.description(
"Enqueue a system event and optionally trigger an immediate heartbeat",
)
.requiredOption("--text <text>", "System event text")
.option(
"--mode <mode>",
"Wake mode (now|next-heartbeat)",
"next-heartbeat",
)
.option("--json", "Output JSON", false),
).action(async (opts) => {
try {
const result = await callGatewayFromCli(
"wake",
opts,
{ mode: opts.mode, text: opts.text },
{ expectFinal: false },
);
if (opts.json) defaultRuntime.log(JSON.stringify(result, null, 2));
else defaultRuntime.log("ok");
} catch (err) {
defaultRuntime.error(danger(String(err)));
defaultRuntime.exit(1);
}
});
const cron = program
.command("cron")
.description("Manage cron jobs (via Gateway)");
addGatewayClientOptions(
cron
.command("list")
.description("List cron jobs")
.option("--all", "Include disabled jobs", false)
.option("--json", "Output JSON", false)
.action(async (opts) => {
try {
const res = await callGatewayFromCli("cron.list", opts, {
includeDisabled: Boolean(opts.all),
});
defaultRuntime.log(JSON.stringify(res, null, 2));
} catch (err) {
defaultRuntime.error(danger(String(err)));
defaultRuntime.exit(1);
}
}),
);
addGatewayClientOptions(
cron
.command("add")
.description("Add a cron job")
.option("--name <name>", "Optional name")
.option("--disabled", "Create job disabled", false)
.option("--session <target>", "Session target (main|isolated)", "main")
.option(
"--wake <mode>",
"Wake mode (now|next-heartbeat)",
"next-heartbeat",
)
.option("--at <when>", "Run once at time (ISO) or +duration (e.g. 20m)")
.option("--every <duration>", "Run every duration (e.g. 10m, 1h)")
.option("--cron <expr>", "Cron expression (5-field)")
.option("--tz <iana>", "Timezone for cron expressions (IANA)", "")
.option("--system-event <text>", "System event payload (main session)")
.option("--message <text>", "Agent message payload")
.option(
"--thinking <level>",
"Thinking level for agent jobs (off|minimal|low|medium|high)",
)
.option("--timeout-seconds <n>", "Timeout seconds for agent jobs")
.option("--deliver", "Deliver agent output", false)
.option(
"--channel <channel>",
"Delivery channel (last|whatsapp|telegram)",
"last",
)
.option("--to <dest>", "Delivery destination (E.164 or Telegram chatId)")
.option(
"--best-effort-deliver",
"Do not fail the job if delivery fails",
false,
)
.option("--post-to-main", "Post a 1-line summary to main session", false)
.option(
"--post-prefix <prefix>",
"Prefix for summary system event",
"Cron",
)
.option("--json", "Output JSON", false)
.action(async (opts) => {
try {
const schedule = (() => {
const at = typeof opts.at === "string" ? opts.at : "";
const every = typeof opts.every === "string" ? opts.every : "";
const cronExpr = typeof opts.cron === "string" ? opts.cron : "";
const chosen = [
Boolean(at),
Boolean(every),
Boolean(cronExpr),
].filter(Boolean).length;
if (chosen !== 1) {
throw new Error(
"Choose exactly one schedule: --at, --every, or --cron",
);
}
if (at) {
const atMs = parseAtMs(at);
if (!atMs)
throw new Error(
"Invalid --at; use ISO time or duration like 20m",
);
return { kind: "at" as const, atMs };
}
if (every) {
const everyMs = parseDurationMs(every);
if (!everyMs)
throw new Error("Invalid --every; use e.g. 10m, 1h, 1d");
return { kind: "every" as const, everyMs };
}
return {
kind: "cron" as const,
expr: cronExpr,
tz:
typeof opts.tz === "string" && opts.tz.trim()
? opts.tz.trim()
: undefined,
};
})();
const sessionTarget = String(opts.session ?? "main");
if (sessionTarget !== "main" && sessionTarget !== "isolated") {
throw new Error("--session must be main or isolated");
}
const wakeMode = String(opts.wake ?? "next-heartbeat");
if (wakeMode !== "now" && wakeMode !== "next-heartbeat") {
throw new Error("--wake must be now or next-heartbeat");
}
const payload = (() => {
const systemEvent =
typeof opts.systemEvent === "string"
? opts.systemEvent.trim()
: "";
const message =
typeof opts.message === "string" ? opts.message.trim() : "";
const chosen = [Boolean(systemEvent), Boolean(message)].filter(
Boolean,
).length;
if (chosen !== 1) {
throw new Error(
"Choose exactly one payload: --system-event or --message",
);
}
if (systemEvent)
return { kind: "systemEvent" as const, text: systemEvent };
const timeoutSeconds = opts.timeoutSeconds
? Number.parseInt(String(opts.timeoutSeconds), 10)
: undefined;
return {
kind: "agentTurn" as const,
message,
thinking:
typeof opts.thinking === "string" && opts.thinking.trim()
? opts.thinking.trim()
: undefined,
timeoutSeconds:
timeoutSeconds && Number.isFinite(timeoutSeconds)
? timeoutSeconds
: undefined,
deliver: Boolean(opts.deliver),
channel: typeof opts.channel === "string" ? opts.channel : "last",
to:
typeof opts.to === "string" && opts.to.trim()
? opts.to.trim()
: undefined,
bestEffortDeliver: Boolean(opts.bestEffortDeliver),
};
})();
if (sessionTarget === "isolated" && payload.kind !== "agentTurn") {
throw new Error(
"Isolated jobs require --message (agentTurn payload).",
);
}
const isolation = opts.postToMain
? {
postToMain: true,
postToMainPrefix: String(opts.postPrefix ?? "Cron"),
}
: undefined;
const params = {
name:
typeof opts.name === "string" && opts.name.trim()
? opts.name.trim()
: undefined,
enabled: !opts.disabled,
schedule,
sessionTarget,
wakeMode,
payload,
isolation,
};
const res = await callGatewayFromCli("cron.add", opts, params);
defaultRuntime.log(JSON.stringify(res, null, 2));
} catch (err) {
defaultRuntime.error(danger(String(err)));
defaultRuntime.exit(1);
}
}),
);
addGatewayClientOptions(
cron
.command("rm")
.description("Remove a cron job")
.argument("<id>", "Job id")
.option("--json", "Output JSON", false)
.action(async (id, opts) => {
try {
const res = await callGatewayFromCli("cron.remove", opts, { id });
defaultRuntime.log(JSON.stringify(res, null, 2));
} catch (err) {
defaultRuntime.error(danger(String(err)));
defaultRuntime.exit(1);
}
}),
);
addGatewayClientOptions(
cron
.command("edit")
.description("Edit a cron job (patch fields)")
.argument("<id>", "Job id")
.option("--name <name>", "Set name")
.option("--enable", "Enable job", false)
.option("--disable", "Disable job", false)
.option("--session <target>", "Session target (main|isolated)")
.option("--wake <mode>", "Wake mode (now|next-heartbeat)")
.option("--at <when>", "Set one-shot time (ISO) or duration like 20m")
.option("--every <duration>", "Set interval duration like 10m")
.option("--cron <expr>", "Set cron expression")
.option("--tz <iana>", "Timezone for cron expressions (IANA)")
.option("--system-event <text>", "Set systemEvent payload")
.option("--message <text>", "Set agentTurn payload message")
.option("--thinking <level>", "Thinking level for agent jobs")
.option("--timeout-seconds <n>", "Timeout seconds for agent jobs")
.option("--deliver", "Deliver agent output", false)
.option(
"--channel <channel>",
"Delivery channel (last|whatsapp|telegram)",
)
.option("--to <dest>", "Delivery destination")
.option(
"--best-effort-deliver",
"Do not fail job if delivery fails",
false,
)
.option("--post-to-main", "Post a 1-line summary to main session", false)
.option("--post-prefix <prefix>", "Prefix for summary system event")
.action(async (id, opts) => {
try {
const patch: Record<string, unknown> = {};
if (typeof opts.name === "string") patch.name = opts.name;
if (opts.enable && opts.disable)
throw new Error("Choose --enable or --disable, not both");
if (opts.enable) patch.enabled = true;
if (opts.disable) patch.enabled = false;
if (typeof opts.session === "string")
patch.sessionTarget = opts.session;
if (typeof opts.wake === "string") patch.wakeMode = opts.wake;
const scheduleChosen = [opts.at, opts.every, opts.cron].filter(
Boolean,
).length;
if (scheduleChosen > 1)
throw new Error("Choose at most one schedule change");
if (opts.at) {
const atMs = parseAtMs(String(opts.at));
if (!atMs) throw new Error("Invalid --at");
patch.schedule = { kind: "at", atMs };
} else if (opts.every) {
const everyMs = parseDurationMs(String(opts.every));
if (!everyMs) throw new Error("Invalid --every");
patch.schedule = { kind: "every", everyMs };
} else if (opts.cron) {
patch.schedule = {
kind: "cron",
expr: String(opts.cron),
tz:
typeof opts.tz === "string" && opts.tz.trim()
? opts.tz.trim()
: undefined,
};
}
const payloadChosen = [opts.systemEvent, opts.message].filter(
Boolean,
).length;
if (payloadChosen > 1)
throw new Error("Choose at most one payload change");
if (opts.systemEvent) {
patch.payload = {
kind: "systemEvent",
text: String(opts.systemEvent),
};
} else if (opts.message) {
const timeoutSeconds = opts.timeoutSeconds
? Number.parseInt(String(opts.timeoutSeconds), 10)
: undefined;
patch.payload = {
kind: "agentTurn",
message: String(opts.message),
thinking:
typeof opts.thinking === "string" ? opts.thinking : undefined,
timeoutSeconds:
timeoutSeconds && Number.isFinite(timeoutSeconds)
? timeoutSeconds
: undefined,
deliver: Boolean(opts.deliver),
channel:
typeof opts.channel === "string" ? opts.channel : undefined,
to: typeof opts.to === "string" ? opts.to : undefined,
bestEffortDeliver: Boolean(opts.bestEffortDeliver),
};
}
if (opts.postToMain) {
patch.isolation = {
postToMain: true,
postToMainPrefix:
typeof opts.postPrefix === "string" ? opts.postPrefix : "Cron",
};
}
const res = await callGatewayFromCli("cron.update", opts, {
id,
patch,
});
defaultRuntime.log(JSON.stringify(res, null, 2));
} catch (err) {
defaultRuntime.error(danger(String(err)));
defaultRuntime.exit(1);
}
}),
);
addGatewayClientOptions(
cron
.command("run")
.description("Run a cron job now (debug)")
.argument("<id>", "Job id")
.option("--force", "Run even if not due", false)
.action(async (id, opts) => {
try {
const res = await callGatewayFromCli("cron.run", opts, {
id,
mode: opts.force ? "force" : "due",
});
defaultRuntime.log(JSON.stringify(res, null, 2));
} catch (err) {
defaultRuntime.error(danger(String(err)));
defaultRuntime.exit(1);
}
}),
);
}

275
src/cli/gateway-cli.ts Normal file
View File

@ -0,0 +1,275 @@
import type { Command } from "commander";
import { callGateway, randomIdempotencyKey } from "../gateway/call.js";
import { startGatewayServer } from "../gateway/server.js";
import { info, setVerbose } from "../globals.js";
import { GatewayLockError } from "../infra/gateway-lock.js";
import { defaultRuntime } from "../runtime.js";
import { createDefaultDeps } from "./deps.js";
import { forceFreePort } from "./ports.js";
type GatewayRpcOpts = {
url?: string;
token?: string;
timeout?: string;
expectFinal?: boolean;
};
const gatewayCallOpts = (cmd: Command) =>
cmd
.option("--url <url>", "Gateway WebSocket URL", "ws://127.0.0.1:18789")
.option("--token <token>", "Gateway token (if required)")
.option("--timeout <ms>", "Timeout in ms", "10000")
.option("--expect-final", "Wait for final response (agent)", false);
const callGatewayCli = async (
method: string,
opts: GatewayRpcOpts,
params?: unknown,
) =>
callGateway({
url: opts.url,
token: opts.token,
method,
params,
expectFinal: Boolean(opts.expectFinal),
timeoutMs: Number(opts.timeout ?? 10_000),
clientName: "cli",
mode: "cli",
});
export function registerGatewayCli(program: Command) {
const gateway = program
.command("gateway")
.description("Run the WebSocket Gateway")
.option("--port <port>", "Port for the gateway WebSocket", "18789")
.option(
"--webchat-port <port>",
"Port for the loopback WebChat HTTP server (default 18788)",
)
.option(
"--token <token>",
"Shared token required in connect.params.auth.token (default: CLAWDIS_GATEWAY_TOKEN env if set)",
)
.option(
"--force",
"Kill any existing listener on the target port before starting",
false,
)
.option("--verbose", "Verbose logging to stdout/stderr", false)
.action(async (opts) => {
setVerbose(Boolean(opts.verbose));
const port = Number.parseInt(String(opts.port ?? "18789"), 10);
if (Number.isNaN(port) || port <= 0) {
defaultRuntime.error("Invalid port");
defaultRuntime.exit(1);
}
const webchatPort = opts.webchatPort
? Number.parseInt(String(opts.webchatPort), 10)
: undefined;
if (
webchatPort !== undefined &&
(Number.isNaN(webchatPort) || webchatPort <= 0)
) {
defaultRuntime.error("Invalid webchat port");
defaultRuntime.exit(1);
}
if (opts.force) {
try {
const killed = forceFreePort(port);
if (killed.length === 0) {
defaultRuntime.log(info(`Force: no listeners on port ${port}`));
} else {
for (const proc of killed) {
defaultRuntime.log(
info(
`Force: killed pid ${proc.pid}${proc.command ? ` (${proc.command})` : ""} on port ${port}`,
),
);
}
await new Promise((resolve) => setTimeout(resolve, 200));
}
} catch (err) {
defaultRuntime.error(`Force: ${String(err)}`);
defaultRuntime.exit(1);
return;
}
}
if (opts.token) {
process.env.CLAWDIS_GATEWAY_TOKEN = String(opts.token);
}
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
let shuttingDown = false;
let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
const onSigterm = () => shutdown("SIGTERM");
const onSigint = () => shutdown("SIGINT");
const shutdown = (signal: string) => {
// Ensure we don't leak listeners across restarts/tests.
process.removeListener("SIGTERM", onSigterm);
process.removeListener("SIGINT", onSigint);
if (shuttingDown) {
defaultRuntime.log(
info(`gateway: received ${signal} during shutdown; exiting now`),
);
defaultRuntime.exit(0);
}
shuttingDown = true;
defaultRuntime.log(info(`gateway: received ${signal}; shutting down`));
// Avoid hanging forever if a provider task ignores abort.
forceExitTimer = setTimeout(() => {
defaultRuntime.error(
"gateway: shutdown timed out; exiting without full cleanup",
);
defaultRuntime.exit(0);
}, 5000);
void (async () => {
try {
await server?.close();
} catch (err) {
defaultRuntime.error(`gateway: shutdown error: ${String(err)}`);
} finally {
if (forceExitTimer) clearTimeout(forceExitTimer);
defaultRuntime.exit(0);
}
})();
};
process.once("SIGTERM", onSigterm);
process.once("SIGINT", onSigint);
try {
server = await startGatewayServer(port, { webchatPort });
} catch (err) {
if (err instanceof GatewayLockError) {
defaultRuntime.error(`Gateway failed to start: ${err.message}`);
defaultRuntime.exit(1);
return;
}
defaultRuntime.error(`Gateway failed to start: ${String(err)}`);
defaultRuntime.exit(1);
}
// Keep process alive
await new Promise<never>(() => {});
});
gatewayCallOpts(
gateway
.command("call")
.description("Call a Gateway method and print JSON")
.argument(
"<method>",
"Method name (health/status/system-presence/send/agent/cron.*)",
)
.option("--params <json>", "JSON object string for params", "{}")
.action(async (method, opts) => {
try {
const params = JSON.parse(String(opts.params ?? "{}"));
const result = await callGatewayCli(method, opts, params);
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(`Gateway call failed: ${String(err)}`);
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
gateway
.command("health")
.description("Fetch Gateway health")
.action(async (opts) => {
try {
const result = await callGatewayCli("health", opts);
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
gateway
.command("status")
.description("Fetch Gateway status")
.action(async (opts) => {
try {
const result = await callGatewayCli("status", opts);
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
gateway
.command("send")
.description("Send a message via the Gateway")
.requiredOption("--to <jidOrPhone>", "Destination (E.164 or jid)")
.requiredOption("--message <text>", "Message text")
.option("--media-url <url>", "Optional media URL")
.option("--idempotency-key <key>", "Idempotency key")
.action(async (opts) => {
try {
const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey();
const result = await callGatewayCli("send", opts, {
to: opts.to,
message: opts.message,
mediaUrl: opts.mediaUrl,
idempotencyKey,
});
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
gateway
.command("agent")
.description("Run an agent turn via the Gateway (waits for final)")
.requiredOption("--message <text>", "User message")
.option("--to <jidOrPhone>", "Destination")
.option("--session-id <id>", "Session id")
.option("--thinking <level>", "Thinking level")
.option("--deliver", "Deliver response", false)
.option("--timeout-seconds <n>", "Agent timeout seconds")
.option("--idempotency-key <key>", "Idempotency key")
.action(async (opts) => {
try {
const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey();
const result = await callGatewayCli(
"agent",
{ ...opts, expectFinal: true },
{
message: opts.message,
to: opts.to,
sessionId: opts.sessionId,
thinking: opts.thinking,
deliver: Boolean(opts.deliver),
timeout: opts.timeoutSeconds
? Number.parseInt(String(opts.timeoutSeconds), 10)
: undefined,
idempotencyKey,
},
);
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
// Build default deps (keeps parity with other commands; future-proofing).
void createDefaultDeps();
}

35
src/cli/gateway-rpc.ts Normal file
View File

@ -0,0 +1,35 @@
import type { Command } from "commander";
import { callGateway } from "../gateway/call.js";
export type GatewayRpcOpts = {
url?: string;
token?: string;
timeout?: string;
expectFinal?: boolean;
};
export function addGatewayClientOptions(cmd: Command) {
return cmd
.option("--url <url>", "Gateway WebSocket URL", "ws://127.0.0.1:18789")
.option("--token <token>", "Gateway token (if required)")
.option("--timeout <ms>", "Timeout in ms", "10000")
.option("--expect-final", "Wait for final response (agent)", false);
}
export async function callGatewayFromCli(
method: string,
opts: GatewayRpcOpts,
params?: unknown,
extra?: { expectFinal?: boolean },
) {
return await callGateway({
url: opts.url,
token: opts.token,
method,
params,
expectFinal: extra?.expectFinal ?? Boolean(opts.expectFinal),
timeoutMs: Number(opts.timeout ?? 10_000),
clientName: "cli",
mode: "cli",
});
}

View File

@ -5,15 +5,14 @@ import { healthCommand } from "../commands/health.js";
import { sendCommand } from "../commands/send.js"; import { sendCommand } from "../commands/send.js";
import { sessionsCommand } from "../commands/sessions.js"; import { sessionsCommand } from "../commands/sessions.js";
import { statusCommand } from "../commands/status.js"; import { statusCommand } from "../commands/status.js";
import { callGateway, randomIdempotencyKey } from "../gateway/call.js";
import { startGatewayServer } from "../gateway/server.js";
import { danger, info, setVerbose } from "../globals.js"; import { danger, info, setVerbose } from "../globals.js";
import { GatewayLockError } from "../infra/gateway-lock.js";
import { loginWeb, logoutWeb } from "../provider-web.js"; import { loginWeb, logoutWeb } from "../provider-web.js";
import { defaultRuntime } from "../runtime.js"; import { defaultRuntime } from "../runtime.js";
import { VERSION } from "../version.js"; import { VERSION } from "../version.js";
import { startWebChatServer } from "../webchat/server.js"; import { startWebChatServer } from "../webchat/server.js";
import { registerCronCli } from "./cron-cli.js";
import { createDefaultDeps } from "./deps.js"; import { createDefaultDeps } from "./deps.js";
import { registerGatewayCli } from "./gateway-cli.js";
import { forceFreePort } from "./ports.js"; import { forceFreePort } from "./ports.js";
export { forceFreePort }; export { forceFreePort };
@ -209,266 +208,8 @@ Examples:
} }
}); });
program; registerGatewayCli(program);
const gateway = program registerCronCli(program);
.command("gateway")
.description("Run the WebSocket Gateway")
.option("--port <port>", "Port for the gateway WebSocket", "18789")
.option(
"--webchat-port <port>",
"Port for the loopback WebChat HTTP server (default 18788)",
)
.option(
"--token <token>",
"Shared token required in connect.params.auth.token (default: CLAWDIS_GATEWAY_TOKEN env if set)",
)
.option(
"--force",
"Kill any existing listener on the target port before starting",
false,
)
.option("--verbose", "Verbose logging to stdout/stderr", false)
.action(async (opts) => {
setVerbose(Boolean(opts.verbose));
const port = Number.parseInt(String(opts.port ?? "18789"), 10);
if (Number.isNaN(port) || port <= 0) {
defaultRuntime.error("Invalid port");
defaultRuntime.exit(1);
}
const webchatPort = opts.webchatPort
? Number.parseInt(String(opts.webchatPort), 10)
: undefined;
if (
webchatPort !== undefined &&
(Number.isNaN(webchatPort) || webchatPort <= 0)
) {
defaultRuntime.error("Invalid webchat port");
defaultRuntime.exit(1);
}
if (opts.force) {
try {
const killed = forceFreePort(port);
if (killed.length === 0) {
defaultRuntime.log(info(`Force: no listeners on port ${port}`));
} else {
for (const proc of killed) {
defaultRuntime.log(
info(
`Force: killed pid ${proc.pid}${proc.command ? ` (${proc.command})` : ""} on port ${port}`,
),
);
}
await new Promise((resolve) => setTimeout(resolve, 200));
}
} catch (err) {
defaultRuntime.error(`Force: ${String(err)}`);
defaultRuntime.exit(1);
return;
}
}
if (opts.token) {
process.env.CLAWDIS_GATEWAY_TOKEN = String(opts.token);
}
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
let shuttingDown = false;
let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
const onSigterm = () => shutdown("SIGTERM");
const onSigint = () => shutdown("SIGINT");
const shutdown = (signal: string) => {
// Ensure we don't leak listeners across restarts/tests.
process.removeListener("SIGTERM", onSigterm);
process.removeListener("SIGINT", onSigint);
if (shuttingDown) {
defaultRuntime.log(
info(`gateway: received ${signal} during shutdown; exiting now`),
);
defaultRuntime.exit(0);
}
shuttingDown = true;
defaultRuntime.log(info(`gateway: received ${signal}; shutting down`));
// Avoid hanging forever if a provider task ignores abort.
forceExitTimer = setTimeout(() => {
defaultRuntime.error(
"gateway: shutdown timed out; exiting without full cleanup",
);
defaultRuntime.exit(0);
}, 5000);
void (async () => {
try {
await server?.close();
} catch (err) {
defaultRuntime.error(`gateway: shutdown error: ${String(err)}`);
} finally {
if (forceExitTimer) clearTimeout(forceExitTimer);
defaultRuntime.exit(0);
}
})();
};
process.once("SIGTERM", onSigterm);
process.once("SIGINT", onSigint);
try {
server = await startGatewayServer(port, { webchatPort });
} catch (err) {
if (err instanceof GatewayLockError) {
defaultRuntime.error(`Gateway failed to start: ${err.message}`);
defaultRuntime.exit(1);
return;
}
defaultRuntime.error(`Gateway failed to start: ${String(err)}`);
defaultRuntime.exit(1);
}
// Keep process alive
await new Promise<never>(() => {});
});
const gatewayCallOpts = (cmd: Command) =>
cmd
.option("--url <url>", "Gateway WebSocket URL", "ws://127.0.0.1:18789")
.option("--token <token>", "Gateway token (if required)")
.option("--timeout <ms>", "Timeout in ms", "10000")
.option("--expect-final", "Wait for final response (agent)", false);
const callGatewayCli = async (
method: string,
opts: {
url?: string;
token?: string;
timeout?: string;
expectFinal?: boolean;
},
params?: unknown,
) =>
callGateway({
url: opts.url,
token: opts.token,
method,
params,
expectFinal: Boolean(opts.expectFinal),
timeoutMs: Number(opts.timeout ?? 10_000),
clientName: "cli",
mode: "cli",
});
gatewayCallOpts(
gateway
.command("call")
.description("Call a Gateway method and print JSON")
.argument(
"<method>",
"Method name (health/status/system-presence/send/agent)",
)
.option("--params <json>", "JSON object string for params", "{}")
.action(async (method, opts) => {
try {
const params = JSON.parse(String(opts.params ?? "{}"));
const result = await callGatewayCli(method, opts, params);
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(`Gateway call failed: ${String(err)}`);
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
gateway
.command("health")
.description("Fetch Gateway health")
.action(async (opts) => {
try {
const result = await callGatewayCli("health", opts);
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
gateway
.command("status")
.description("Fetch Gateway status")
.action(async (opts) => {
try {
const result = await callGatewayCli("status", opts);
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
gateway
.command("send")
.description("Send a message via the Gateway")
.requiredOption("--to <jidOrPhone>", "Destination (E.164 or jid)")
.requiredOption("--message <text>", "Message text")
.option("--media-url <url>", "Optional media URL")
.option("--idempotency-key <key>", "Idempotency key")
.action(async (opts) => {
try {
const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey();
const result = await callGatewayCli("send", opts, {
to: opts.to,
message: opts.message,
mediaUrl: opts.mediaUrl,
idempotencyKey,
});
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
gatewayCallOpts(
gateway
.command("agent")
.description("Run an agent turn via the Gateway (waits for final)")
.requiredOption("--message <text>", "User message")
.option("--to <jidOrPhone>", "Destination")
.option("--session-id <id>", "Session id")
.option("--thinking <level>", "Thinking level")
.option("--deliver", "Deliver response", false)
.option("--timeout-seconds <n>", "Agent timeout seconds")
.option("--idempotency-key <key>", "Idempotency key")
.action(async (opts) => {
try {
const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey();
const result = await callGatewayCli(
"agent",
{ ...opts, expectFinal: true },
{
message: opts.message,
to: opts.to,
sessionId: opts.sessionId,
thinking: opts.thinking,
deliver: Boolean(opts.deliver),
timeout: opts.timeoutSeconds
? Number.parseInt(String(opts.timeoutSeconds), 10)
: undefined,
idempotencyKey,
},
);
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(String(err));
defaultRuntime.exit(1);
}
}),
);
program program
.command("status") .command("status")
.description("Show web session health and recent session recipients") .description("Show web session health and recent session recipients")

View File

@ -49,6 +49,12 @@ export type WebChatConfig = {
port?: number; port?: number;
}; };
export type CronConfig = {
enabled?: boolean;
store?: string;
maxConcurrentRuns?: number;
};
export type TelegramConfig = { export type TelegramConfig = {
botToken?: string; botToken?: string;
requireMention?: boolean; requireMention?: boolean;
@ -107,6 +113,7 @@ export type ClawdisConfig = {
web?: WebConfig; web?: WebConfig;
telegram?: TelegramConfig; telegram?: TelegramConfig;
webchat?: WebChatConfig; webchat?: WebChatConfig;
cron?: CronConfig;
}; };
// New branding path (preferred) // New branding path (preferred)
@ -218,6 +225,13 @@ const ClawdisSchema = z.object({
reply: ReplySchema.optional(), reply: ReplySchema.optional(),
}) })
.optional(), .optional(),
cron: z
.object({
enabled: z.boolean().optional(),
store: z.string().optional(),
maxConcurrentRuns: z.number().int().positive().optional(),
})
.optional(),
web: z web: z
.object({ .object({
heartbeatSeconds: z.number().int().positive().optional(), heartbeatSeconds: z.number().int().positive().optional(),

341
src/cron/isolated-agent.ts Normal file
View File

@ -0,0 +1,341 @@
import crypto from "node:crypto";
import { chunkText } from "../auto-reply/chunk.js";
import { runCommandReply } from "../auto-reply/command-reply.js";
import {
applyTemplate,
type TemplateContext,
} from "../auto-reply/templating.js";
import { normalizeThinkLevel } from "../auto-reply/thinking.js";
import type { CliDeps } from "../cli/deps.js";
import type { ClawdisConfig } from "../config/config.js";
import {
DEFAULT_IDLE_MINUTES,
loadSessionStore,
resolveStorePath,
type SessionEntry,
saveSessionStore,
} from "../config/sessions.js";
import { enqueueCommandInLane } from "../process/command-queue.js";
import { normalizeE164 } from "../utils.js";
import type { CronJob } from "./types.js";
export type RunCronAgentTurnResult = {
status: "ok" | "error" | "skipped";
summary?: string;
};
function assertCommandReplyConfig(cfg: ClawdisConfig) {
const reply = cfg.inbound?.reply;
if (!reply || reply.mode !== "command" || !reply.command?.length) {
throw new Error(
"Configure inbound.reply.mode=command with reply.command before using cron agent jobs.",
);
}
return reply as NonNullable<
NonNullable<ClawdisConfig["inbound"]>["reply"]
> & {
mode: "command";
command: string[];
};
}
function pickSummaryFromOutput(text: string | undefined) {
const clean = (text ?? "").trim();
if (!clean) return undefined;
const oneLine = clean.replace(/\s+/g, " ");
return oneLine.length > 200 ? `${oneLine.slice(0, 200)}` : oneLine;
}
function resolveDeliveryTarget(
cfg: ClawdisConfig,
jobPayload: {
channel?: "last" | "whatsapp" | "telegram";
to?: string;
},
) {
const requestedChannel =
typeof jobPayload.channel === "string" ? jobPayload.channel : "last";
const explicitTo =
typeof jobPayload.to === "string" && jobPayload.to.trim()
? jobPayload.to.trim()
: undefined;
const sessionCfg = cfg.inbound?.reply?.session;
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
const storePath = resolveStorePath(sessionCfg?.store);
const store = loadSessionStore(storePath);
const main = store[mainKey];
const lastChannel =
main?.lastChannel && main.lastChannel !== "webchat"
? main.lastChannel
: undefined;
const lastTo = typeof main?.lastTo === "string" ? main.lastTo.trim() : "";
const channel = (() => {
if (requestedChannel === "whatsapp" || requestedChannel === "telegram") {
return requestedChannel;
}
return lastChannel ?? "whatsapp";
})();
const to = (() => {
if (explicitTo) return explicitTo;
return lastTo || undefined;
})();
const sanitizedWhatsappTo = (() => {
if (channel !== "whatsapp") return to;
const rawAllow = cfg.inbound?.allowFrom ?? [];
if (rawAllow.includes("*")) return to;
const allowFrom = rawAllow
.map((val) => normalizeE164(val))
.filter((val) => val.length > 1);
if (allowFrom.length === 0) return to;
if (!to) return allowFrom[0];
const normalized = normalizeE164(to);
if (allowFrom.includes(normalized)) return normalized;
return allowFrom[0];
})();
return {
channel,
to: channel === "whatsapp" ? sanitizedWhatsappTo : to,
};
}
function resolveCronSession(params: {
cfg: ClawdisConfig;
sessionKey: string;
nowMs: number;
}) {
const sessionCfg = params.cfg.inbound?.reply?.session;
const idleMinutes = Math.max(
sessionCfg?.idleMinutes ?? DEFAULT_IDLE_MINUTES,
1,
);
const idleMs = idleMinutes * 60_000;
const storePath = resolveStorePath(sessionCfg?.store);
const store = loadSessionStore(storePath);
const entry = store[params.sessionKey];
const fresh = entry && params.nowMs - entry.updatedAt <= idleMs;
const sessionId = fresh ? entry.sessionId : crypto.randomUUID();
const systemSent = fresh ? Boolean(entry.systemSent) : false;
const sessionEntry: SessionEntry = {
sessionId,
updatedAt: params.nowMs,
systemSent,
thinkingLevel: entry?.thinkingLevel,
verboseLevel: entry?.verboseLevel,
model: entry?.model,
contextTokens: entry?.contextTokens,
lastChannel: entry?.lastChannel,
lastTo: entry?.lastTo,
syncing: entry?.syncing,
};
return { storePath, store, sessionEntry, systemSent, isNewSession: !fresh };
}
export async function runCronIsolatedAgentTurn(params: {
cfg: ClawdisConfig;
deps: CliDeps;
job: CronJob;
message: string;
sessionKey: string;
lane?: string;
}): Promise<RunCronAgentTurnResult> {
const replyCfg = assertCommandReplyConfig(params.cfg);
const now = Date.now();
const cronSession = resolveCronSession({
cfg: params.cfg,
sessionKey: params.sessionKey,
nowMs: now,
});
const sendSystemOnce = replyCfg.session?.sendSystemOnce === true;
const isFirstTurnInSession =
cronSession.isNewSession || !cronSession.systemSent;
const sessionIntro = replyCfg.session?.sessionIntro
? applyTemplate(replyCfg.session.sessionIntro, {
SessionId: cronSession.sessionEntry.sessionId,
})
: "";
const bodyPrefix = replyCfg.bodyPrefix
? applyTemplate(replyCfg.bodyPrefix, {
SessionId: cronSession.sessionEntry.sessionId,
})
: "";
const thinkOverride = normalizeThinkLevel(replyCfg.thinkingDefault);
const jobThink = normalizeThinkLevel(
(params.job.payload.kind === "agentTurn"
? params.job.payload.thinking
: undefined) ?? undefined,
);
const thinkLevel = jobThink ?? thinkOverride;
const timeoutSecondsRaw =
params.job.payload.kind === "agentTurn" && params.job.payload.timeoutSeconds
? params.job.payload.timeoutSeconds
: (replyCfg.timeoutSeconds ?? 600);
const timeoutSeconds = Math.max(Math.floor(timeoutSecondsRaw), 1);
const timeoutMs = timeoutSeconds * 1000;
const delivery =
params.job.payload.kind === "agentTurn" &&
params.job.payload.deliver === true;
const bestEffortDeliver =
params.job.payload.kind === "agentTurn" &&
params.job.payload.bestEffortDeliver === true;
const resolvedDelivery = resolveDeliveryTarget(params.cfg, {
channel:
params.job.payload.kind === "agentTurn"
? params.job.payload.channel
: "last",
to:
params.job.payload.kind === "agentTurn"
? params.job.payload.to
: undefined,
});
const base =
`[cron:${params.job.id}${params.job.name ? ` ${params.job.name}` : ""}] ${params.message}`.trim();
let commandBody = base;
if (!sendSystemOnce || isFirstTurnInSession) {
commandBody = bodyPrefix ? `${bodyPrefix}${commandBody}` : commandBody;
}
if (sessionIntro) {
commandBody = `${sessionIntro}\n\n${commandBody}`;
}
const templatingCtx: TemplateContext = {
Body: commandBody,
BodyStripped: commandBody,
SessionId: cronSession.sessionEntry.sessionId,
From: resolvedDelivery.to ?? "",
To: resolvedDelivery.to ?? "",
Surface: "Cron",
IsNewSession: cronSession.isNewSession ? "true" : "false",
};
// Persist systemSent before the run, mirroring the inbound auto-reply behavior.
if (sendSystemOnce && isFirstTurnInSession) {
cronSession.sessionEntry.systemSent = true;
cronSession.store[params.sessionKey] = cronSession.sessionEntry;
await saveSessionStore(cronSession.storePath, cronSession.store);
} else {
cronSession.store[params.sessionKey] = cronSession.sessionEntry;
await saveSessionStore(cronSession.storePath, cronSession.store);
}
const lane = params.lane?.trim() || "cron";
const runResult = await runCommandReply({
reply: { ...replyCfg, mode: "command" },
templatingCtx,
sendSystemOnce,
isNewSession: cronSession.isNewSession,
isFirstTurnInSession,
systemSent: cronSession.sessionEntry.systemSent ?? false,
timeoutMs,
timeoutSeconds,
thinkLevel,
enqueue: (task, opts) => enqueueCommandInLane(lane, task, opts),
runId: cronSession.sessionEntry.sessionId,
});
const payloads = runResult.payloads ?? [];
const firstText = payloads[0]?.text ?? "";
const summary = pickSummaryFromOutput(firstText);
if (delivery) {
if (resolvedDelivery.channel === "whatsapp") {
if (!resolvedDelivery.to) {
if (!bestEffortDeliver) {
return {
status: "error",
summary: "Cron delivery to WhatsApp requires a recipient.",
};
}
return {
status: "skipped",
summary: "Delivery skipped (no WhatsApp recipient).",
};
}
const to = normalizeE164(resolvedDelivery.to);
try {
for (const payload of payloads) {
const mediaList =
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
const primaryMedia = mediaList[0];
await params.deps.sendMessageWhatsApp(to, payload.text ?? "", {
verbose: false,
mediaUrl: primaryMedia,
});
for (const extra of mediaList.slice(1)) {
await params.deps.sendMessageWhatsApp(to, "", {
verbose: false,
mediaUrl: extra,
});
}
}
} catch (err) {
if (!bestEffortDeliver) throw err;
return {
status: "ok",
summary: summary
? `${summary} (delivery failed)`
: "completed (delivery failed)",
};
}
} else if (resolvedDelivery.channel === "telegram") {
if (!resolvedDelivery.to) {
if (!bestEffortDeliver) {
return {
status: "error",
summary: "Cron delivery to Telegram requires a chatId.",
};
}
return {
status: "skipped",
summary: "Delivery skipped (no Telegram chatId).",
};
}
const chatId = resolvedDelivery.to;
try {
for (const payload of payloads) {
const mediaList =
payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
if (mediaList.length === 0) {
for (const chunk of chunkText(payload.text ?? "", 4000)) {
await params.deps.sendMessageTelegram(chatId, chunk, {
verbose: false,
});
}
} else {
let first = true;
for (const url of mediaList) {
const caption = first ? (payload.text ?? "") : "";
first = false;
await params.deps.sendMessageTelegram(chatId, caption, {
verbose: false,
mediaUrl: url,
});
}
}
}
} catch (err) {
if (!bestEffortDeliver) throw err;
return {
status: "ok",
summary: summary
? `${summary} (delivery failed)`
: "completed (delivery failed)",
};
}
}
}
return { status: "ok", summary };
}

98
src/cron/run-log.test.ts Normal file
View File

@ -0,0 +1,98 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import {
appendCronRunLog,
readCronRunLogEntries,
resolveCronRunLogPath,
} from "./run-log.js";
describe("cron run log", () => {
it("resolves a flat store path to cron.runs.jsonl", () => {
const storePath = path.join(os.tmpdir(), "cron.json");
const p = resolveCronRunLogPath({ storePath, jobId: "job-1" });
expect(p.endsWith(path.join(os.tmpdir(), "cron.runs.jsonl"))).toBe(true);
});
it("resolves jobs.json to per-job runs/<jobId>.jsonl", () => {
const storePath = path.join(os.tmpdir(), "cron", "jobs.json");
const p = resolveCronRunLogPath({ storePath, jobId: "job-1" });
expect(
p.endsWith(path.join(os.tmpdir(), "cron", "runs", "job-1.jsonl")),
).toBe(true);
});
it("appends JSONL and prunes by line count", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-cron-log-"));
const logPath = path.join(dir, "cron.runs.jsonl");
for (let i = 0; i < 10; i++) {
await appendCronRunLog(
logPath,
{
ts: 1000 + i,
jobId: "job-1",
action: "finished",
status: "ok",
durationMs: i,
},
{ maxBytes: 1, keepLines: 3 },
);
}
const raw = await fs.readFile(logPath, "utf-8");
const lines = raw
.split("\n")
.map((l) => l.trim())
.filter(Boolean);
expect(lines.length).toBe(3);
const last = JSON.parse(lines[2] ?? "{}") as { ts?: number };
expect(last.ts).toBe(1009);
await fs.rm(dir, { recursive: true, force: true });
});
it("reads newest entries and filters by jobId", async () => {
const dir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-cron-log-read-"),
);
const logPath = path.join(dir, "cron.runs.jsonl");
await appendCronRunLog(logPath, {
ts: 1,
jobId: "a",
action: "finished",
status: "ok",
});
await appendCronRunLog(logPath, {
ts: 2,
jobId: "b",
action: "finished",
status: "error",
error: "nope",
});
await appendCronRunLog(logPath, {
ts: 3,
jobId: "a",
action: "finished",
status: "skipped",
});
const all = await readCronRunLogEntries(logPath, { limit: 10 });
expect(all.map((e) => e.jobId)).toEqual(["a", "b", "a"]);
const onlyA = await readCronRunLogEntries(logPath, {
limit: 10,
jobId: "a",
});
expect(onlyA.map((e) => e.ts)).toEqual([1, 3]);
const lastOne = await readCronRunLogEntries(logPath, { limit: 1 });
expect(lastOne.map((e) => e.ts)).toEqual([3]);
await fs.rm(dir, { recursive: true, force: true });
});
});

101
src/cron/run-log.ts Normal file
View File

@ -0,0 +1,101 @@
import fs from "node:fs/promises";
import path from "node:path";
export type CronRunLogEntry = {
ts: number;
jobId: string;
action: "finished";
status?: "ok" | "error" | "skipped";
error?: string;
runAtMs?: number;
durationMs?: number;
nextRunAtMs?: number;
};
export function resolveCronRunLogPath(params: {
storePath: string;
jobId: string;
}) {
const storePath = path.resolve(params.storePath);
const dir = path.dirname(storePath);
const base = path.basename(storePath);
if (base === "jobs.json") {
return path.join(dir, "runs", `${params.jobId}.jsonl`);
}
const ext = path.extname(base);
const baseNoExt = ext ? base.slice(0, -ext.length) : base;
return path.join(dir, `${baseNoExt}.runs.jsonl`);
}
const writesByPath = new Map<string, Promise<void>>();
async function pruneIfNeeded(
filePath: string,
opts: { maxBytes: number; keepLines: number },
) {
const stat = await fs.stat(filePath).catch(() => null);
if (!stat || stat.size <= opts.maxBytes) return;
const raw = await fs.readFile(filePath, "utf-8").catch(() => "");
const lines = raw
.split("\n")
.map((l) => l.trim())
.filter(Boolean);
const kept = lines.slice(Math.max(0, lines.length - opts.keepLines));
const tmp = `${filePath}.${process.pid}.${Math.random().toString(16).slice(2)}.tmp`;
await fs.writeFile(tmp, `${kept.join("\n")}\n`, "utf-8");
await fs.rename(tmp, filePath);
}
export async function appendCronRunLog(
filePath: string,
entry: CronRunLogEntry,
opts?: { maxBytes?: number; keepLines?: number },
) {
const resolved = path.resolve(filePath);
const prev = writesByPath.get(resolved) ?? Promise.resolve();
const next = prev
.catch(() => undefined)
.then(async () => {
await fs.mkdir(path.dirname(resolved), { recursive: true });
await fs.appendFile(resolved, `${JSON.stringify(entry)}\n`, "utf-8");
await pruneIfNeeded(resolved, {
maxBytes: opts?.maxBytes ?? 2_000_000,
keepLines: opts?.keepLines ?? 2_000,
});
});
writesByPath.set(resolved, next);
await next;
}
export async function readCronRunLogEntries(
filePath: string,
opts?: { limit?: number; jobId?: string },
): Promise<CronRunLogEntry[]> {
const limit = Math.max(1, Math.min(5000, Math.floor(opts?.limit ?? 200)));
const jobId = opts?.jobId?.trim() || undefined;
const raw = await fs
.readFile(path.resolve(filePath), "utf-8")
.catch(() => "");
if (!raw.trim()) return [];
const parsed: CronRunLogEntry[] = [];
const lines = raw.split("\n");
for (let i = lines.length - 1; i >= 0 && parsed.length < limit; i--) {
const line = lines[i]?.trim();
if (!line) continue;
try {
const obj = JSON.parse(line) as Partial<CronRunLogEntry> | null;
if (!obj || typeof obj !== "object") continue;
if (obj.action !== "finished") continue;
if (typeof obj.jobId !== "string" || obj.jobId.trim().length === 0)
continue;
if (typeof obj.ts !== "number" || !Number.isFinite(obj.ts)) continue;
if (jobId && obj.jobId !== jobId) continue;
parsed.push(obj as CronRunLogEntry);
} catch {
// ignore invalid lines
}
}
return parsed.reverse();
}

26
src/cron/schedule.test.ts Normal file
View File

@ -0,0 +1,26 @@
import { describe, expect, it } from "vitest";
import { computeNextRunAtMs } from "./schedule.js";
describe("cron schedule", () => {
it("computes next run for cron expression with timezone", () => {
// Saturday, Dec 13 2025 00:00:00Z
const nowMs = Date.parse("2025-12-13T00:00:00.000Z");
const next = computeNextRunAtMs(
{ kind: "cron", expr: "0 9 * * 3", tz: "America/Los_Angeles" },
nowMs,
);
// Next Wednesday at 09:00 PST -> 17:00Z
expect(next).toBe(Date.parse("2025-12-17T17:00:00.000Z"));
});
it("computes next run for every schedule", () => {
const anchor = Date.parse("2025-12-13T00:00:00.000Z");
const now = anchor + 10_000;
const next = computeNextRunAtMs(
{ kind: "every", everyMs: 30_000, anchorMs: anchor },
now,
);
expect(next).toBe(anchor + 30_000);
});
});

29
src/cron/schedule.ts Normal file
View File

@ -0,0 +1,29 @@
import { Cron } from "croner";
import type { CronSchedule } from "./types.js";
export function computeNextRunAtMs(
schedule: CronSchedule,
nowMs: number,
): number | undefined {
if (schedule.kind === "at") {
return schedule.atMs > nowMs ? schedule.atMs : undefined;
}
if (schedule.kind === "every") {
const everyMs = Math.max(1, Math.floor(schedule.everyMs));
const anchor = Math.max(0, Math.floor(schedule.anchorMs ?? nowMs));
if (nowMs <= anchor) return anchor;
const elapsed = nowMs - anchor;
const steps = Math.floor((elapsed + everyMs - 1) / everyMs);
return anchor + steps * everyMs;
}
const expr = schedule.expr.trim();
if (!expr) return undefined;
const cron = new Cron(expr, {
timezone: schedule.tz?.trim() || undefined,
catch: false,
});
const next = cron.nextRun(new Date(nowMs));
return next ? next.getTime() : undefined;
}

120
src/cron/service.test.ts Normal file
View File

@ -0,0 +1,120 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { CronService } from "./service.js";
const noopLogger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
async function makeStorePath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-cron-"));
return {
storePath: path.join(dir, "cron.json"),
cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true });
},
};
}
describe("CronService", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z"));
noopLogger.debug.mockClear();
noopLogger.info.mockClear();
noopLogger.warn.mockClear();
noopLogger.error.mockClear();
});
afterEach(() => {
vi.useRealTimers();
});
it("runs a one-shot main job and disables it after success", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestReplyHeartbeatNow = vi.fn();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestReplyHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
const job = await cron.add({
enabled: true,
schedule: { kind: "at", atMs },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "hello" },
});
expect(job.state.nextRunAtMs).toBe(atMs);
vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z"));
await vi.runOnlyPendingTimersAsync();
const jobs = await cron.list({ includeDisabled: true });
const updated = jobs.find((j) => j.id === job.id);
expect(updated?.enabled).toBe(false);
expect(enqueueSystemEvent).toHaveBeenCalledWith("hello");
expect(requestReplyHeartbeatNow).toHaveBeenCalled();
await cron.list({ includeDisabled: true });
cron.stop();
await store.cleanup();
});
it("runs an isolated job and posts summary to main", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestReplyHeartbeatNow = vi.fn();
const runIsolatedAgentJob = vi.fn(async () => ({
status: "ok" as const,
summary: "done",
}));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestReplyHeartbeatNow,
runIsolatedAgentJob,
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
await cron.add({
enabled: true,
name: "weekly",
schedule: { kind: "at", atMs },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "do it", deliver: false },
isolation: { postToMain: true, postToMainPrefix: "Cron" },
});
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
await vi.runOnlyPendingTimersAsync();
await cron.list({ includeDisabled: true });
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done");
expect(requestReplyHeartbeatNow).toHaveBeenCalled();
cron.stop();
await store.cleanup();
});
});

431
src/cron/service.ts Normal file
View File

@ -0,0 +1,431 @@
import crypto from "node:crypto";
import { computeNextRunAtMs } from "./schedule.js";
import { loadCronStore, saveCronStore } from "./store.js";
import type {
CronJob,
CronJobCreate,
CronJobPatch,
CronPayload,
CronStoreFile,
} from "./types.js";
export type CronEvent = {
jobId: string;
action: "added" | "updated" | "removed" | "started" | "finished";
runAtMs?: number;
durationMs?: number;
status?: "ok" | "error" | "skipped";
error?: string;
nextRunAtMs?: number;
};
type Logger = {
debug: (obj: unknown, msg?: string) => void;
info: (obj: unknown, msg?: string) => void;
warn: (obj: unknown, msg?: string) => void;
error: (obj: unknown, msg?: string) => void;
};
export type CronServiceDeps = {
nowMs?: () => number;
log: Logger;
storePath: string;
cronEnabled: boolean;
enqueueSystemEvent: (text: string) => void;
requestReplyHeartbeatNow: (opts?: { reason?: string }) => void;
runIsolatedAgentJob: (params: {
job: CronJob;
message: string;
}) => Promise<{ status: "ok" | "error" | "skipped"; summary?: string }>;
onEvent?: (evt: CronEvent) => void;
};
const STUCK_RUN_MS = 2 * 60 * 60 * 1000;
function isNonEmptyString(value: unknown): value is string {
return typeof value === "string" && value.trim().length > 0;
}
function normalizePayloadToSystemText(payload: CronPayload) {
if (payload.kind === "systemEvent") return payload.text.trim();
return payload.message.trim();
}
export class CronService {
private readonly deps: Required<Omit<CronServiceDeps, "onEvent">> &
Pick<CronServiceDeps, "onEvent">;
private store: CronStoreFile | null = null;
private timer: NodeJS.Timeout | null = null;
private running = false;
private op: Promise<unknown> = Promise.resolve();
constructor(deps: CronServiceDeps) {
this.deps = {
...deps,
nowMs: deps.nowMs ?? (() => Date.now()),
onEvent: deps.onEvent,
};
}
async start() {
await this.locked(async () => {
if (!this.deps.cronEnabled) {
this.deps.log.info({ enabled: false }, "cron: disabled");
return;
}
await this.ensureLoaded();
this.recomputeNextRuns();
await this.persist();
this.armTimer();
this.deps.log.info(
{
enabled: true,
jobs: this.store?.jobs.length ?? 0,
nextWakeAtMs: this.nextWakeAtMs() ?? null,
},
"cron: started",
);
});
}
stop() {
if (this.timer) clearTimeout(this.timer);
this.timer = null;
}
async list(opts?: { includeDisabled?: boolean }) {
return await this.locked(async () => {
await this.ensureLoaded();
const includeDisabled = opts?.includeDisabled === true;
const jobs = (this.store?.jobs ?? []).filter(
(j) => includeDisabled || j.enabled,
);
return jobs.sort(
(a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0),
);
});
}
async add(input: CronJobCreate) {
return await this.locked(async () => {
await this.ensureLoaded();
const now = this.deps.nowMs();
const id = crypto.randomUUID();
const job: CronJob = {
id,
name: input.name?.trim() || undefined,
enabled: input.enabled !== false,
createdAtMs: now,
updatedAtMs: now,
schedule: input.schedule,
sessionTarget: input.sessionTarget,
wakeMode: input.wakeMode,
payload: input.payload,
isolation: input.isolation,
state: {
...input.state,
},
};
job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, now);
this.store?.jobs.push(job);
await this.persist();
this.armTimer();
this.emit({
jobId: id,
action: "added",
nextRunAtMs: job.state.nextRunAtMs,
});
return job;
});
}
async update(id: string, patch: CronJobPatch) {
return await this.locked(async () => {
await this.ensureLoaded();
const job = this.findJobOrThrow(id);
const now = this.deps.nowMs();
if (isNonEmptyString(patch.name)) job.name = patch.name.trim();
if (patch.name === null || patch.name === "") job.name = undefined;
if (typeof patch.enabled === "boolean") job.enabled = patch.enabled;
if (patch.schedule) job.schedule = patch.schedule;
if (patch.sessionTarget) job.sessionTarget = patch.sessionTarget;
if (patch.wakeMode) job.wakeMode = patch.wakeMode;
if (patch.payload) job.payload = patch.payload;
if (patch.isolation) job.isolation = patch.isolation;
if (patch.state) job.state = { ...job.state, ...patch.state };
job.updatedAtMs = now;
if (job.enabled) {
job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, now);
} else {
job.state.nextRunAtMs = undefined;
job.state.runningAtMs = undefined;
}
await this.persist();
this.armTimer();
this.emit({
jobId: id,
action: "updated",
nextRunAtMs: job.state.nextRunAtMs,
});
return job;
});
}
async remove(id: string) {
return await this.locked(async () => {
await this.ensureLoaded();
const before = this.store?.jobs.length ?? 0;
if (!this.store) return { ok: false, removed: false };
this.store.jobs = this.store.jobs.filter((j) => j.id !== id);
const removed = (this.store.jobs.length ?? 0) !== before;
await this.persist();
this.armTimer();
if (removed) this.emit({ jobId: id, action: "removed" });
return { ok: true, removed };
});
}
async run(id: string, mode?: "due" | "force") {
return await this.locked(async () => {
await this.ensureLoaded();
const job = this.findJobOrThrow(id);
const now = this.deps.nowMs();
const due =
mode === "force" ||
(job.enabled &&
typeof job.state.nextRunAtMs === "number" &&
now >= job.state.nextRunAtMs);
if (!due) return { ok: true, ran: false, reason: "not-due" as const };
await this.executeJob(job, now, { forced: mode === "force" });
await this.persist();
this.armTimer();
return { ok: true, ran: true };
});
}
wake(opts: { mode: "now" | "next-heartbeat"; text: string }) {
const text = opts.text.trim();
if (!text) return { ok: false };
this.deps.enqueueSystemEvent(text);
if (opts.mode === "now") {
this.deps.requestReplyHeartbeatNow({ reason: "wake" });
}
return { ok: true };
}
private async locked<T>(fn: () => Promise<T>): Promise<T> {
const next = this.op.then(fn, fn);
// Keep the chain alive even when the operation fails.
this.op = next.then(
() => undefined,
() => undefined,
);
return (await next) as T;
}
private async ensureLoaded() {
if (this.store) return;
const loaded = await loadCronStore(this.deps.storePath);
this.store = { version: 1, jobs: loaded.jobs ?? [] };
}
private async persist() {
if (!this.store) return;
await saveCronStore(this.deps.storePath, this.store);
}
private findJobOrThrow(id: string) {
const job = this.store?.jobs.find((j) => j.id === id);
if (!job) throw new Error(`unknown cron job id: ${id}`);
return job;
}
private computeJobNextRunAtMs(job: CronJob, nowMs: number) {
if (!job.enabled) return undefined;
if (job.schedule.kind === "at") {
// One-shot jobs stay due until they successfully finish.
if (job.state.lastStatus === "ok" && job.state.lastRunAtMs)
return undefined;
return job.schedule.atMs;
}
return computeNextRunAtMs(job.schedule, nowMs);
}
private recomputeNextRuns() {
if (!this.store) return;
const now = this.deps.nowMs();
for (const job of this.store.jobs) {
if (!job.state) job.state = {};
if (!job.enabled) {
job.state.nextRunAtMs = undefined;
job.state.runningAtMs = undefined;
continue;
}
const runningAt = job.state.runningAtMs;
if (typeof runningAt === "number" && now - runningAt > STUCK_RUN_MS) {
this.deps.log.warn(
{ jobId: job.id, runningAtMs: runningAt },
"cron: clearing stuck running marker",
);
job.state.runningAtMs = undefined;
}
job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, now);
}
}
private nextWakeAtMs() {
const jobs = this.store?.jobs ?? [];
const enabled = jobs.filter(
(j) => j.enabled && typeof j.state.nextRunAtMs === "number",
);
if (enabled.length === 0) return undefined;
return enabled.reduce(
(min, j) => Math.min(min, j.state.nextRunAtMs as number),
enabled[0].state.nextRunAtMs as number,
);
}
private armTimer() {
if (this.timer) clearTimeout(this.timer);
this.timer = null;
if (!this.deps.cronEnabled) return;
const nextAt = this.nextWakeAtMs();
if (!nextAt) return;
const delay = Math.max(nextAt - this.deps.nowMs(), 0);
this.timer = setTimeout(() => {
void this.onTimer().catch((err) => {
this.deps.log.error({ err: String(err) }, "cron: timer tick failed");
});
}, delay);
this.timer.unref?.();
}
private async onTimer() {
if (this.running) return;
this.running = true;
try {
await this.locked(async () => {
await this.ensureLoaded();
await this.runDueJobs();
await this.persist();
this.armTimer();
});
} finally {
this.running = false;
}
}
private async runDueJobs() {
if (!this.store) return;
const now = this.deps.nowMs();
const due = this.store.jobs.filter((j) => {
if (!j.enabled) return false;
if (typeof j.state.runningAtMs === "number") return false;
const next = j.state.nextRunAtMs;
return typeof next === "number" && now >= next;
});
for (const job of due) {
await this.executeJob(job, now, { forced: false });
}
}
private async executeJob(
job: CronJob,
nowMs: number,
opts: { forced: boolean },
) {
const startedAt = this.deps.nowMs();
job.state.runningAtMs = startedAt;
job.state.lastError = undefined;
this.emit({ jobId: job.id, action: "started", runAtMs: startedAt });
const finish = async (
status: "ok" | "error" | "skipped",
err?: string,
summary?: string,
) => {
const endedAt = this.deps.nowMs();
job.state.runningAtMs = undefined;
job.state.lastRunAtMs = startedAt;
job.state.lastStatus = status;
job.state.lastDurationMs = Math.max(0, endedAt - startedAt);
job.state.lastError = err;
if (job.schedule.kind === "at" && status === "ok") {
// One-shot job completed successfully; disable it.
job.enabled = false;
job.state.nextRunAtMs = undefined;
} else if (job.enabled) {
job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, endedAt);
} else {
job.state.nextRunAtMs = undefined;
}
this.emit({
jobId: job.id,
action: "finished",
status,
error: err,
runAtMs: startedAt,
durationMs: job.state.lastDurationMs,
nextRunAtMs: job.state.nextRunAtMs,
});
if (summary && job.isolation?.postToMain) {
const prefix = job.isolation.postToMainPrefix?.trim() || "Cron";
this.deps.enqueueSystemEvent(`${prefix}: ${summary}`);
if (job.wakeMode === "now") {
this.deps.requestReplyHeartbeatNow({ reason: `cron:${job.id}:post` });
}
}
};
try {
if (job.sessionTarget === "main") {
const text = normalizePayloadToSystemText(job.payload);
this.deps.enqueueSystemEvent(text);
if (job.wakeMode === "now") {
this.deps.requestReplyHeartbeatNow({ reason: `cron:${job.id}` });
}
await finish("ok");
return;
}
if (job.payload.kind !== "agentTurn") {
await finish("skipped", "isolated job requires payload.kind=agentTurn");
return;
}
const res = await this.deps.runIsolatedAgentJob({
job,
message: job.payload.message,
});
if (res.status === "ok") await finish("ok", undefined, res.summary);
else if (res.status === "skipped")
await finish("skipped", undefined, res.summary);
else await finish("error", res.summary ?? "cron job failed");
} catch (err) {
await finish("error", String(err));
} finally {
job.updatedAtMs = nowMs;
if (!opts.forced && job.enabled) {
// Keep nextRunAtMs in sync in case the schedule advanced during a long run.
job.state.nextRunAtMs = this.computeJobNextRunAtMs(
job,
this.deps.nowMs(),
);
}
}
}
private emit(evt: CronEvent) {
try {
this.deps.onEvent?.(evt);
} catch {
/* ignore */
}
}
}

52
src/cron/store.ts Normal file
View File

@ -0,0 +1,52 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import JSON5 from "json5";
import { CONFIG_DIR } from "../utils.js";
import type { CronStoreFile } from "./types.js";
export const LEGACY_CRON_STORE_PATH = path.join(
CONFIG_DIR,
"cron",
"jobs.json",
);
export const DEFAULT_CRON_STORE_PATH = path.join(CONFIG_DIR, "cron.json");
export function resolveCronStorePath(storePath?: string) {
if (storePath?.trim()) {
const raw = storePath.trim();
if (raw.startsWith("~"))
return path.resolve(raw.replace("~", os.homedir()));
return path.resolve(raw);
}
if (fs.existsSync(LEGACY_CRON_STORE_PATH)) return LEGACY_CRON_STORE_PATH;
return DEFAULT_CRON_STORE_PATH;
}
export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
try {
const raw = await fs.promises.readFile(storePath, "utf-8");
const parsed = JSON5.parse(raw) as Partial<CronStoreFile> | null;
const jobs = Array.isArray(parsed?.jobs) ? (parsed?.jobs as never[]) : [];
return {
version: 1,
jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"],
};
} catch {
return { version: 1, jobs: [] };
}
}
export async function saveCronStore(storePath: string, store: CronStoreFile) {
await fs.promises.mkdir(path.dirname(storePath), { recursive: true });
const tmp = `${storePath}.${process.pid}.${Math.random().toString(16).slice(2)}.tmp`;
const json = JSON.stringify(store, null, 2);
await fs.promises.writeFile(tmp, json, "utf-8");
await fs.promises.rename(tmp, storePath);
try {
await fs.promises.copyFile(storePath, `${storePath}.bak`);
} catch {
// best-effort
}
}

64
src/cron/types.ts Normal file
View File

@ -0,0 +1,64 @@
export type CronSchedule =
| { kind: "at"; atMs: number }
| { kind: "every"; everyMs: number; anchorMs?: number }
| { kind: "cron"; expr: string; tz?: string };
export type CronSessionTarget = "main" | "isolated";
export type CronWakeMode = "next-heartbeat" | "now";
export type CronPayload =
| { kind: "systemEvent"; text: string }
| {
kind: "agentTurn";
message: string;
thinking?: string;
timeoutSeconds?: number;
deliver?: boolean;
channel?: "last" | "whatsapp" | "telegram";
to?: string;
bestEffortDeliver?: boolean;
};
export type CronIsolation = {
postToMain?: boolean;
postToMainPrefix?: string;
};
export type CronJobState = {
nextRunAtMs?: number;
runningAtMs?: number;
lastRunAtMs?: number;
lastStatus?: "ok" | "error" | "skipped";
lastError?: string;
lastDurationMs?: number;
};
export type CronJob = {
id: string;
name?: string;
enabled: boolean;
createdAtMs: number;
updatedAtMs: number;
schedule: CronSchedule;
sessionTarget: CronSessionTarget;
wakeMode: CronWakeMode;
payload: CronPayload;
isolation?: CronIsolation;
state: CronJobState;
};
export type CronStoreFile = {
version: 1;
jobs: CronJob[];
};
export type CronJobCreate = Omit<
CronJob,
"id" | "createdAtMs" | "updatedAtMs" | "state"
> & {
state?: Partial<CronJobState>;
};
export type CronJobPatch = Partial<
Omit<CronJob, "id" | "createdAtMs" | "state"> & { state: CronJobState }
>;

View File

@ -9,6 +9,21 @@ import {
ChatSendParamsSchema, ChatSendParamsSchema,
type ConnectParams, type ConnectParams,
ConnectParamsSchema, ConnectParamsSchema,
type CronAddParams,
CronAddParamsSchema,
type CronJob,
CronJobSchema,
type CronListParams,
CronListParamsSchema,
type CronRemoveParams,
CronRemoveParamsSchema,
type CronRunLogEntry,
type CronRunParams,
CronRunParamsSchema,
type CronRunsParams,
CronRunsParamsSchema,
type CronUpdateParams,
CronUpdateParamsSchema,
ErrorCodes, ErrorCodes,
type ErrorShape, type ErrorShape,
ErrorShapeSchema, ErrorShapeSchema,
@ -36,6 +51,8 @@ import {
StateVersionSchema, StateVersionSchema,
type TickEvent, type TickEvent,
TickEventSchema, TickEventSchema,
type WakeParams,
WakeParamsSchema,
} from "./schema.js"; } from "./schema.js";
const ajv = new ( const ajv = new (
@ -54,6 +71,21 @@ export const validateRequestFrame =
ajv.compile<RequestFrame>(RequestFrameSchema); ajv.compile<RequestFrame>(RequestFrameSchema);
export const validateSendParams = ajv.compile(SendParamsSchema); export const validateSendParams = ajv.compile(SendParamsSchema);
export const validateAgentParams = ajv.compile(AgentParamsSchema); export const validateAgentParams = ajv.compile(AgentParamsSchema);
export const validateWakeParams = ajv.compile<WakeParams>(WakeParamsSchema);
export const validateCronListParams =
ajv.compile<CronListParams>(CronListParamsSchema);
export const validateCronAddParams =
ajv.compile<CronAddParams>(CronAddParamsSchema);
export const validateCronUpdateParams = ajv.compile<CronUpdateParams>(
CronUpdateParamsSchema,
);
export const validateCronRemoveParams = ajv.compile<CronRemoveParams>(
CronRemoveParamsSchema,
);
export const validateCronRunParams =
ajv.compile<CronRunParams>(CronRunParamsSchema);
export const validateCronRunsParams =
ajv.compile<CronRunsParams>(CronRunsParamsSchema);
export const validateChatHistoryParams = ajv.compile(ChatHistoryParamsSchema); export const validateChatHistoryParams = ajv.compile(ChatHistoryParamsSchema);
export const validateChatSendParams = ajv.compile(ChatSendParamsSchema); export const validateChatSendParams = ajv.compile(ChatSendParamsSchema);
export const validateChatEvent = ajv.compile(ChatEventSchema); export const validateChatEvent = ajv.compile(ChatEventSchema);
@ -80,6 +112,14 @@ export {
ChatEventSchema, ChatEventSchema,
SendParamsSchema, SendParamsSchema,
AgentParamsSchema, AgentParamsSchema,
WakeParamsSchema,
CronJobSchema,
CronListParamsSchema,
CronAddParamsSchema,
CronUpdateParamsSchema,
CronRemoveParamsSchema,
CronRunParamsSchema,
CronRunsParamsSchema,
ChatHistoryParamsSchema, ChatHistoryParamsSchema,
ChatSendParamsSchema, ChatSendParamsSchema,
TickEventSchema, TickEventSchema,
@ -105,4 +145,13 @@ export type {
ChatEvent, ChatEvent,
TickEvent, TickEvent,
ShutdownEvent, ShutdownEvent,
WakeParams,
CronJob,
CronListParams,
CronAddParams,
CronUpdateParams,
CronRemoveParams,
CronRunParams,
CronRunsParams,
CronRunLogEntry,
}; };

View File

@ -203,6 +203,185 @@ export const AgentParamsSchema = Type.Object(
{ additionalProperties: false }, { additionalProperties: false },
); );
export const WakeParamsSchema = Type.Object(
{
mode: Type.Union([Type.Literal("now"), Type.Literal("next-heartbeat")]),
text: NonEmptyString,
},
{ additionalProperties: false },
);
export const CronScheduleSchema = Type.Union([
Type.Object(
{
kind: Type.Literal("at"),
atMs: Type.Integer({ minimum: 0 }),
},
{ additionalProperties: false },
),
Type.Object(
{
kind: Type.Literal("every"),
everyMs: Type.Integer({ minimum: 1 }),
anchorMs: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
),
Type.Object(
{
kind: Type.Literal("cron"),
expr: NonEmptyString,
tz: Type.Optional(Type.String()),
},
{ additionalProperties: false },
),
]);
export const CronPayloadSchema = Type.Union([
Type.Object(
{
kind: Type.Literal("systemEvent"),
text: NonEmptyString,
},
{ additionalProperties: false },
),
Type.Object(
{
kind: Type.Literal("agentTurn"),
message: NonEmptyString,
thinking: Type.Optional(Type.String()),
timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })),
deliver: Type.Optional(Type.Boolean()),
channel: Type.Optional(
Type.Union([
Type.Literal("last"),
Type.Literal("whatsapp"),
Type.Literal("telegram"),
]),
),
to: Type.Optional(Type.String()),
bestEffortDeliver: Type.Optional(Type.Boolean()),
},
{ additionalProperties: false },
),
]);
export const CronIsolationSchema = Type.Object(
{
postToMain: Type.Optional(Type.Boolean()),
postToMainPrefix: Type.Optional(Type.String()),
},
{ additionalProperties: false },
);
export const CronJobStateSchema = Type.Object(
{
nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
runningAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
lastRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
lastStatus: Type.Optional(
Type.Union([
Type.Literal("ok"),
Type.Literal("error"),
Type.Literal("skipped"),
]),
),
lastError: Type.Optional(Type.String()),
lastDurationMs: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
);
export const CronJobSchema = Type.Object(
{
id: NonEmptyString,
name: Type.Optional(Type.String()),
enabled: Type.Boolean(),
createdAtMs: Type.Integer({ minimum: 0 }),
updatedAtMs: Type.Integer({ minimum: 0 }),
schedule: CronScheduleSchema,
sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]),
wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]),
payload: CronPayloadSchema,
isolation: Type.Optional(CronIsolationSchema),
state: CronJobStateSchema,
},
{ additionalProperties: false },
);
export const CronListParamsSchema = Type.Object(
{
includeDisabled: Type.Optional(Type.Boolean()),
},
{ additionalProperties: false },
);
export const CronAddParamsSchema = Type.Object(
{
name: Type.Optional(Type.String()),
enabled: Type.Optional(Type.Boolean()),
schedule: CronScheduleSchema,
sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]),
wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]),
payload: CronPayloadSchema,
isolation: Type.Optional(CronIsolationSchema),
},
{ additionalProperties: false },
);
export const CronUpdateParamsSchema = Type.Object(
{
id: NonEmptyString,
patch: Type.Partial(CronAddParamsSchema),
},
{ additionalProperties: false },
);
export const CronRemoveParamsSchema = Type.Object(
{
id: NonEmptyString,
},
{ additionalProperties: false },
);
export const CronRunParamsSchema = Type.Object(
{
id: NonEmptyString,
mode: Type.Optional(
Type.Union([Type.Literal("due"), Type.Literal("force")]),
),
},
{ additionalProperties: false },
);
export const CronRunsParamsSchema = Type.Object(
{
id: Type.Optional(NonEmptyString),
limit: Type.Optional(Type.Integer({ minimum: 1, maximum: 5000 })),
},
{ additionalProperties: false },
);
export const CronRunLogEntrySchema = Type.Object(
{
ts: Type.Integer({ minimum: 0 }),
jobId: NonEmptyString,
action: Type.Literal("finished"),
status: Type.Optional(
Type.Union([
Type.Literal("ok"),
Type.Literal("error"),
Type.Literal("skipped"),
]),
),
error: Type.Optional(Type.String()),
runAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
durationMs: Type.Optional(Type.Integer({ minimum: 0 })),
nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
);
// WebChat/WebSocket-native chat methods // WebChat/WebSocket-native chat methods
export const ChatHistoryParamsSchema = Type.Object( export const ChatHistoryParamsSchema = Type.Object(
{ {
@ -256,6 +435,15 @@ export const ProtocolSchemas: Record<string, TSchema> = {
AgentEvent: AgentEventSchema, AgentEvent: AgentEventSchema,
SendParams: SendParamsSchema, SendParams: SendParamsSchema,
AgentParams: AgentParamsSchema, AgentParams: AgentParamsSchema,
WakeParams: WakeParamsSchema,
CronJob: CronJobSchema,
CronListParams: CronListParamsSchema,
CronAddParams: CronAddParamsSchema,
CronUpdateParams: CronUpdateParamsSchema,
CronRemoveParams: CronRemoveParamsSchema,
CronRunParams: CronRunParamsSchema,
CronRunsParams: CronRunsParamsSchema,
CronRunLogEntry: CronRunLogEntrySchema,
ChatHistoryParams: ChatHistoryParamsSchema, ChatHistoryParams: ChatHistoryParamsSchema,
ChatSendParams: ChatSendParamsSchema, ChatSendParams: ChatSendParamsSchema,
ChatEvent: ChatEventSchema, ChatEvent: ChatEventSchema,
@ -276,6 +464,15 @@ export type PresenceEntry = Static<typeof PresenceEntrySchema>;
export type ErrorShape = Static<typeof ErrorShapeSchema>; export type ErrorShape = Static<typeof ErrorShapeSchema>;
export type StateVersion = Static<typeof StateVersionSchema>; export type StateVersion = Static<typeof StateVersionSchema>;
export type AgentEvent = Static<typeof AgentEventSchema>; export type AgentEvent = Static<typeof AgentEventSchema>;
export type WakeParams = Static<typeof WakeParamsSchema>;
export type CronJob = Static<typeof CronJobSchema>;
export type CronListParams = Static<typeof CronListParamsSchema>;
export type CronAddParams = Static<typeof CronAddParamsSchema>;
export type CronUpdateParams = Static<typeof CronUpdateParamsSchema>;
export type CronRemoveParams = Static<typeof CronRemoveParamsSchema>;
export type CronRunParams = Static<typeof CronRunParamsSchema>;
export type CronRunsParams = Static<typeof CronRunsParamsSchema>;
export type CronRunLogEntry = Static<typeof CronRunLogEntrySchema>;
export type ChatEvent = Static<typeof ChatEventSchema>; export type ChatEvent = Static<typeof ChatEventSchema>;
export type TickEvent = Static<typeof TickEventSchema>; export type TickEvent = Static<typeof TickEventSchema>;
export type ShutdownEvent = Static<typeof ShutdownEventSchema>; export type ShutdownEvent = Static<typeof ShutdownEventSchema>;

View File

@ -14,6 +14,7 @@ import { startGatewayServer } from "./server.js";
let testSessionStorePath: string | undefined; let testSessionStorePath: string | undefined;
let testAllowFrom: string[] | undefined; let testAllowFrom: string[] | undefined;
let testCronStorePath: string | undefined;
vi.mock("../config/config.js", () => ({ vi.mock("../config/config.js", () => ({
loadConfig: () => ({ loadConfig: () => ({
inbound: { inbound: {
@ -24,6 +25,7 @@ vi.mock("../config/config.js", () => ({
session: { mainKey: "main", store: testSessionStorePath }, session: { mainKey: "main", store: testSessionStorePath },
}, },
}, },
cron: { enabled: false, store: testCronStorePath },
}), }),
})); }));
@ -173,6 +175,273 @@ async function connectOk(
} }
describe("gateway server", () => { describe("gateway server", () => {
test("supports cron.add and cron.list", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-cron-"));
testCronStorePath = path.join(dir, "cron.json");
await fs.writeFile(
testCronStorePath,
JSON.stringify({ version: 1, jobs: [] }),
);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
ws.send(
JSON.stringify({
type: "req",
id: "cron-add-1",
method: "cron.add",
params: {
name: "daily",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "hello" },
},
}),
);
const addRes = await onceMessage<{
type: "res";
ok: boolean;
payload?: unknown;
}>(ws, (o) => o.type === "res" && o.id === "cron-add-1");
expect(addRes.ok).toBe(true);
expect(typeof (addRes.payload as { id?: unknown } | null)?.id).toBe(
"string",
);
ws.send(
JSON.stringify({
type: "req",
id: "cron-list-1",
method: "cron.list",
params: { includeDisabled: true },
}),
);
const listRes = await onceMessage<{
type: "res";
ok: boolean;
payload?: unknown;
}>(ws, (o) => o.type === "res" && o.id === "cron-list-1");
expect(listRes.ok).toBe(true);
const jobs = (listRes.payload as { jobs?: unknown } | null)?.jobs;
expect(Array.isArray(jobs)).toBe(true);
expect((jobs as unknown[]).length).toBe(1);
expect(((jobs as Array<{ name?: unknown }>)[0]?.name as string) ?? "").toBe(
"daily",
);
ws.close();
await server.close();
await fs.rm(dir, { recursive: true, force: true });
testCronStorePath = undefined;
});
test("writes cron run history for flat store paths", async () => {
const dir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-gw-cron-log-"),
);
testCronStorePath = path.join(dir, "cron.json");
await fs.writeFile(
testCronStorePath,
JSON.stringify({ version: 1, jobs: [] }),
);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const atMs = Date.now() - 1;
ws.send(
JSON.stringify({
type: "req",
id: "cron-add-log-1",
method: "cron.add",
params: {
enabled: true,
schedule: { kind: "at", atMs },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "hello" },
},
}),
);
const addRes = await onceMessage<{
type: "res";
ok: boolean;
payload?: unknown;
}>(ws, (o) => o.type === "res" && o.id === "cron-add-log-1");
expect(addRes.ok).toBe(true);
const jobId = String((addRes.payload as { id?: unknown } | null)?.id ?? "");
expect(jobId.length > 0).toBe(true);
ws.send(
JSON.stringify({
type: "req",
id: "cron-run-log-1",
method: "cron.run",
params: { id: jobId, mode: "force" },
}),
);
const runRes = await onceMessage<{ type: "res"; ok: boolean }>(
ws,
(o) => o.type === "res" && o.id === "cron-run-log-1",
8000,
);
expect(runRes.ok).toBe(true);
const logPath = path.join(dir, "cron.runs.jsonl");
const waitForLog = async () => {
for (let i = 0; i < 200; i++) {
const raw = await fs.readFile(logPath, "utf-8").catch(() => "");
if (raw.trim().length > 0) return raw;
await new Promise((r) => setTimeout(r, 10));
}
throw new Error("timeout waiting for cron run log");
};
const raw = await waitForLog();
const lines = raw
.split("\n")
.map((l) => l.trim())
.filter(Boolean);
expect(lines.length).toBeGreaterThan(0);
const last = JSON.parse(lines.at(-1) ?? "{}") as {
jobId?: unknown;
action?: unknown;
status?: unknown;
};
expect(last.action).toBe("finished");
expect(last.jobId).toBe(jobId);
expect(last.status).toBe("ok");
ws.send(
JSON.stringify({
type: "req",
id: "cron-runs-1",
method: "cron.runs",
params: { id: jobId, limit: 50 },
}),
);
const runsRes = await onceMessage<{
type: "res";
ok: boolean;
payload?: unknown;
}>(ws, (o) => o.type === "res" && o.id === "cron-runs-1", 8000);
expect(runsRes.ok).toBe(true);
const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
expect(Array.isArray(entries)).toBe(true);
expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId);
ws.close();
await server.close();
await fs.rm(dir, { recursive: true, force: true });
testCronStorePath = undefined;
});
test("writes cron run history to per-job runs/ when store is jobs.json", async () => {
const dir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-gw-cron-log-jobs-"),
);
const cronDir = path.join(dir, "cron");
testCronStorePath = path.join(cronDir, "jobs.json");
await fs.mkdir(cronDir, { recursive: true });
await fs.writeFile(
testCronStorePath,
JSON.stringify({ version: 1, jobs: [] }),
);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const atMs = Date.now() - 1;
ws.send(
JSON.stringify({
type: "req",
id: "cron-add-log-2",
method: "cron.add",
params: {
enabled: true,
schedule: { kind: "at", atMs },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "hello" },
},
}),
);
const addRes = await onceMessage<{
type: "res";
ok: boolean;
payload?: unknown;
}>(ws, (o) => o.type === "res" && o.id === "cron-add-log-2");
expect(addRes.ok).toBe(true);
const jobId = String((addRes.payload as { id?: unknown } | null)?.id ?? "");
expect(jobId.length > 0).toBe(true);
ws.send(
JSON.stringify({
type: "req",
id: "cron-run-log-2",
method: "cron.run",
params: { id: jobId, mode: "force" },
}),
);
const runRes = await onceMessage<{ type: "res"; ok: boolean }>(
ws,
(o) => o.type === "res" && o.id === "cron-run-log-2",
8000,
);
expect(runRes.ok).toBe(true);
const logPath = path.join(cronDir, "runs", `${jobId}.jsonl`);
const waitForLog = async () => {
for (let i = 0; i < 200; i++) {
const raw = await fs.readFile(logPath, "utf-8").catch(() => "");
if (raw.trim().length > 0) return raw;
await new Promise((r) => setTimeout(r, 10));
}
throw new Error("timeout waiting for per-job cron run log");
};
const raw = await waitForLog();
const line = raw
.split("\n")
.map((l) => l.trim())
.filter(Boolean)
.at(-1);
const last = JSON.parse(line ?? "{}") as {
jobId?: unknown;
action?: unknown;
};
expect(last.action).toBe("finished");
expect(last.jobId).toBe(jobId);
ws.send(
JSON.stringify({
type: "req",
id: "cron-runs-2",
method: "cron.runs",
params: { id: jobId, limit: 20 },
}),
);
const runsRes = await onceMessage<{
type: "res";
ok: boolean;
payload?: unknown;
}>(ws, (o) => o.type === "res" && o.id === "cron-runs-2", 8000);
expect(runsRes.ok).toBe(true);
const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
expect(Array.isArray(entries)).toBe(true);
expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId);
ws.close();
await server.close();
await fs.rm(dir, { recursive: true, force: true });
testCronStorePath = undefined;
});
test("broadcasts heartbeat events and serves last-heartbeat", async () => { test("broadcasts heartbeat events and serves last-heartbeat", async () => {
type HeartbeatPayload = { type HeartbeatPayload = {
ts: number; ts: number;
@ -196,16 +465,7 @@ describe("gateway server", () => {
}; };
const { server, ws } = await startServerWithClient(); const { server, ws } = await startServerWithClient();
ws.send( await connectOk(ws);
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
const waitHeartbeat = onceMessage<EventFrame>( const waitHeartbeat = onceMessage<EventFrame>(
ws, ws,
@ -631,13 +891,18 @@ describe("gateway server", () => {
await connectOk(ws); await connectOk(ws);
// Emit a fake agent event directly through the shared emitter. // Emit a fake agent event directly through the shared emitter.
const runId = randomUUID();
const evtPromise = onceMessage( const evtPromise = onceMessage(
ws, ws,
(o) => o.type === "event" && o.event === "agent", (o) =>
o.type === "event" &&
o.event === "agent" &&
o.payload?.runId === runId &&
o.payload?.stream === "job",
); );
emitAgentEvent({ runId: "run-1", stream: "job", data: { msg: "hi" } }); emitAgentEvent({ runId, stream: "job", data: { msg: "hi" } });
const evt = await evtPromise; const evt = await evtPromise;
expect(evt.payload.runId).toBe("run-1"); expect(evt.payload.runId).toBe(runId);
expect(typeof evt.seq).toBe("number"); expect(typeof evt.seq).toBe("number");
expect(evt.payload.data.msg).toBe("hi"); expect(evt.payload.data.msg).toBe("hi");

View File

@ -19,6 +19,15 @@ import {
type SessionEntry, type SessionEntry,
saveSessionStore, saveSessionStore,
} from "../config/sessions.js"; } from "../config/sessions.js";
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
import {
appendCronRunLog,
readCronRunLogEntries,
resolveCronRunLogPath,
} from "../cron/run-log.js";
import { CronService } from "../cron/service.js";
import { resolveCronStorePath } from "../cron/store.js";
import type { CronJobCreate, CronJobPatch } from "../cron/types.js";
import { isVerbose } from "../globals.js"; import { isVerbose } from "../globals.js";
import { onAgentEvent } from "../infra/agent-events.js"; import { onAgentEvent } from "../infra/agent-events.js";
import { GatewayLockError } from "../infra/gateway-lock.js"; import { GatewayLockError } from "../infra/gateway-lock.js";
@ -33,7 +42,12 @@ import {
upsertPresence, upsertPresence,
} from "../infra/system-presence.js"; } from "../infra/system-presence.js";
import { logError, logInfo, logWarn } from "../logger.js"; import { logError, logInfo, logWarn } from "../logger.js";
import { getLogger, getResolvedLoggerSettings } from "../logging.js"; import {
getChildLogger,
getLogger,
getResolvedLoggerSettings,
} from "../logging.js";
import { setCommandLaneConcurrency } from "../process/command-queue.js";
import { monitorWebProvider, webAuthExists } from "../providers/web/index.js"; import { monitorWebProvider, webAuthExists } from "../providers/web/index.js";
import { defaultRuntime } from "../runtime.js"; import { defaultRuntime } from "../runtime.js";
import { monitorTelegramProvider } from "../telegram/monitor.js"; import { monitorTelegramProvider } from "../telegram/monitor.js";
@ -41,6 +55,7 @@ import { sendMessageTelegram } from "../telegram/send.js";
import { normalizeE164 } from "../utils.js"; import { normalizeE164 } from "../utils.js";
import { setHeartbeatsEnabled } from "../web/auto-reply.js"; import { setHeartbeatsEnabled } from "../web/auto-reply.js";
import { sendMessageWhatsApp } from "../web/outbound.js"; import { sendMessageWhatsApp } from "../web/outbound.js";
import { requestReplyHeartbeatNow } from "../web/reply-heartbeat-wake.js";
import { ensureWebChatServerFromConfig } from "../webchat/server.js"; import { ensureWebChatServerFromConfig } from "../webchat/server.js";
import { buildMessageWithAttachments } from "./chat-attachments.js"; import { buildMessageWithAttachments } from "./chat-attachments.js";
import { import {
@ -56,8 +71,15 @@ import {
validateChatHistoryParams, validateChatHistoryParams,
validateChatSendParams, validateChatSendParams,
validateConnectParams, validateConnectParams,
validateCronAddParams,
validateCronListParams,
validateCronRemoveParams,
validateCronRunParams,
validateCronRunsParams,
validateCronUpdateParams,
validateRequestFrame, validateRequestFrame,
validateSendParams, validateSendParams,
validateWakeParams,
} from "./protocol/index.js"; } from "./protocol/index.js";
type Client = { type Client = {
@ -72,6 +94,13 @@ const METHODS = [
"status", "status",
"last-heartbeat", "last-heartbeat",
"set-heartbeats", "set-heartbeats",
"wake",
"cron.list",
"cron.add",
"cron.update",
"cron.remove",
"cron.run",
"cron.runs",
"system-presence", "system-presence",
"system-event", "system-event",
"send", "send",
@ -89,6 +118,7 @@ const EVENTS = [
"shutdown", "shutdown",
"health", "health",
"heartbeat", "heartbeat",
"cron",
]; ];
export type GatewayServer = { export type GatewayServer = {
@ -322,6 +352,59 @@ export async function startGatewayServer(
const providerAbort = new AbortController(); const providerAbort = new AbortController();
const providerTasks: Array<Promise<unknown>> = []; const providerTasks: Array<Promise<unknown>> = [];
const clients = new Set<Client>(); const clients = new Set<Client>();
const cfgAtStart = loadConfig();
setCommandLaneConcurrency("cron", cfgAtStart.cron?.maxConcurrentRuns ?? 1);
const cronStorePath = resolveCronStorePath(cfgAtStart.cron?.store);
const cronLogger = getChildLogger({
module: "cron",
storePath: cronStorePath,
});
const cronDeps = createDefaultDeps();
const cronEnabled =
process.env.CLAWDIS_SKIP_CRON !== "1" && cfgAtStart.cron?.enabled === true;
const cron = new CronService({
storePath: cronStorePath,
cronEnabled,
enqueueSystemEvent,
requestReplyHeartbeatNow,
runIsolatedAgentJob: async ({ job, message }) => {
const cfg = loadConfig();
return await runCronIsolatedAgentTurn({
cfg,
deps: cronDeps,
job,
message,
sessionKey: `cron:${job.id}`,
lane: "cron",
});
},
log: cronLogger,
onEvent: (evt) => {
broadcast("cron", evt, { dropIfSlow: true });
if (evt.action === "finished") {
const logPath = resolveCronRunLogPath({
storePath: cronStorePath,
jobId: evt.jobId,
});
void appendCronRunLog(logPath, {
ts: Date.now(),
jobId: evt.jobId,
action: "finished",
status: evt.status,
error: evt.error,
runAtMs: evt.runAtMs,
durationMs: evt.durationMs,
nextRunAtMs: evt.nextRunAtMs,
}).catch((err) => {
cronLogger.warn(
{ err: String(err), logPath },
"cron: run log append failed",
);
});
}
},
});
const startProviders = async () => { const startProviders = async () => {
const cfg = loadConfig(); const cfg = loadConfig();
@ -513,6 +596,10 @@ export async function startGatewayServer(
broadcast("heartbeat", evt, { dropIfSlow: true }); broadcast("heartbeat", evt, { dropIfSlow: true });
}); });
void cron
.start()
.catch((err) => logError(`cron failed to start: ${String(err)}`));
wss.on("connection", (socket) => { wss.on("connection", (socket) => {
let client: Client | null = null; let client: Client | null = null;
let closed = false; let closed = false;
@ -988,6 +1075,157 @@ export async function startGatewayServer(
} }
break; break;
} }
case "wake": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateWakeParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid wake params: ${formatValidationErrors(validateWakeParams.errors)}`,
),
);
break;
}
const p = params as {
mode: "now" | "next-heartbeat";
text: string;
};
const result = cron.wake({ mode: p.mode, text: p.text });
respond(true, result, undefined);
break;
}
case "cron.list": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateCronListParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.list params: ${formatValidationErrors(validateCronListParams.errors)}`,
),
);
break;
}
const p = params as { includeDisabled?: boolean };
const jobs = await cron.list({
includeDisabled: p.includeDisabled,
});
respond(true, { jobs }, undefined);
break;
}
case "cron.add": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateCronAddParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.add params: ${formatValidationErrors(validateCronAddParams.errors)}`,
),
);
break;
}
const job = await cron.add(params as unknown as CronJobCreate);
respond(true, job, undefined);
break;
}
case "cron.update": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateCronUpdateParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.update params: ${formatValidationErrors(validateCronUpdateParams.errors)}`,
),
);
break;
}
const p = params as { id: string; patch: Record<string, unknown> };
const job = await cron.update(
p.id,
p.patch as unknown as CronJobPatch,
);
respond(true, job, undefined);
break;
}
case "cron.remove": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateCronRemoveParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.remove params: ${formatValidationErrors(validateCronRemoveParams.errors)}`,
),
);
break;
}
const p = params as { id: string };
const result = await cron.remove(p.id);
respond(true, result, undefined);
break;
}
case "cron.run": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateCronRunParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.run params: ${formatValidationErrors(validateCronRunParams.errors)}`,
),
);
break;
}
const p = params as { id: string; mode?: "due" | "force" };
const result = await cron.run(p.id, p.mode);
respond(true, result, undefined);
break;
}
case "cron.runs": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateCronRunsParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.runs params: ${formatValidationErrors(validateCronRunsParams.errors)}`,
),
);
break;
}
const p = params as { id?: string; limit?: number };
if (!p.id && cronStorePath.endsWith(`${path.sep}jobs.json`)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"cron.runs requires id when using jobs.json store layout",
),
);
break;
}
const logPath = resolveCronRunLogPath({
storePath: cronStorePath,
jobId: p.id ?? "all",
});
const entries = await readCronRunLogEntries(logPath, {
limit: p.limit,
jobId: p.id,
});
respond(true, { entries }, undefined);
break;
}
case "status": { case "status": {
const status = await getStatusSummary(); const status = await getStatusSummary();
respond(true, status, undefined); respond(true, status, undefined);
@ -1426,6 +1664,7 @@ export async function startGatewayServer(
return { return {
close: async () => { close: async () => {
providerAbort.abort(); providerAbort.abort();
cron.stop();
broadcast("shutdown", { broadcast("shutdown", {
reason: "gateway stopping", reason: "gateway stopping",
restartExpectedMs: null, restartExpectedMs: null,

View File

@ -1,5 +1,7 @@
// Minimal in-process queue to serialize command executions. // Minimal in-process queue to serialize command executions.
// Ensures only one command runs at a time across webhook, poller, and web inbox flows. // Default lane ("main") preserves the existing behavior. Additional lanes allow
// low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for
// the main auto-reply workflow.
type QueueEntry = { type QueueEntry = {
task: () => Promise<unknown>; task: () => Promise<unknown>;
@ -10,26 +12,91 @@ type QueueEntry = {
onWait?: (waitMs: number, queuedAhead: number) => void; onWait?: (waitMs: number, queuedAhead: number) => void;
}; };
const queue: QueueEntry[] = []; type LaneState = {
let draining = false; lane: string;
queue: QueueEntry[];
active: number;
maxConcurrent: number;
draining: boolean;
};
async function drainQueue() { const lanes = new Map<string, LaneState>();
if (draining) return;
draining = true; function getLaneState(lane: string): LaneState {
while (queue.length) { const existing = lanes.get(lane);
const entry = queue.shift() as QueueEntry; if (existing) return existing;
const waitedMs = Date.now() - entry.enqueuedAt; const created: LaneState = {
if (waitedMs >= entry.warnAfterMs) { lane,
entry.onWait?.(waitedMs, queue.length); queue: [],
active: 0,
maxConcurrent: 1,
draining: false,
};
lanes.set(lane, created);
return created;
}
function drainLane(lane: string) {
const state = getLaneState(lane);
if (state.draining) return;
state.draining = true;
const pump = () => {
while (state.active < state.maxConcurrent && state.queue.length > 0) {
const entry = state.queue.shift() as QueueEntry;
const waitedMs = Date.now() - entry.enqueuedAt;
if (waitedMs >= entry.warnAfterMs) {
entry.onWait?.(waitedMs, state.queue.length);
}
state.active += 1;
void (async () => {
try {
const result = await entry.task();
state.active -= 1;
pump();
entry.resolve(result);
} catch (err) {
state.active -= 1;
pump();
entry.reject(err);
}
})();
} }
try { state.draining = false;
const result = await entry.task(); };
entry.resolve(result);
} catch (err) { pump();
entry.reject(err); }
}
} export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) {
draining = false; const cleaned = lane.trim() || "main";
const state = getLaneState(cleaned);
state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent));
drainLane(cleaned);
}
export function enqueueCommandInLane<T>(
lane: string,
task: () => Promise<T>,
opts?: {
warnAfterMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
},
): Promise<T> {
const cleaned = lane.trim() || "main";
const warnAfterMs = opts?.warnAfterMs ?? 2_000;
const state = getLaneState(cleaned);
return new Promise<T>((resolve, reject) => {
state.queue.push({
task: () => task(),
resolve: (value) => resolve(value as T),
reject,
enqueuedAt: Date.now(),
warnAfterMs,
onWait: opts?.onWait,
});
drainLane(cleaned);
});
} }
export function enqueueCommand<T>( export function enqueueCommand<T>(
@ -39,20 +106,19 @@ export function enqueueCommand<T>(
onWait?: (waitMs: number, queuedAhead: number) => void; onWait?: (waitMs: number, queuedAhead: number) => void;
}, },
): Promise<T> { ): Promise<T> {
const warnAfterMs = opts?.warnAfterMs ?? 2_000; return enqueueCommandInLane("main", task, opts);
return new Promise<T>((resolve, reject) => {
queue.push({
task: () => task(),
resolve: (value) => resolve(value as T),
reject,
enqueuedAt: Date.now(),
warnAfterMs,
onWait: opts?.onWait,
});
void drainQueue();
});
} }
export function getQueueSize() { export function getQueueSize(lane = "main") {
return queue.length + (draining ? 1 : 0); const state = lanes.get(lane);
if (!state) return 0;
return state.queue.length + state.active;
}
export function getTotalQueueSize() {
let total = 0;
for (const s of lanes.values()) {
total += s.queue.length + s.active;
}
return total;
} }

View File

@ -115,7 +115,9 @@ describe("heartbeat helpers", () => {
describe("resolveHeartbeatRecipients", () => { describe("resolveHeartbeatRecipients", () => {
it("returns the sole session recipient", async () => { it("returns the sole session recipient", async () => {
const now = Date.now(); const now = Date.now();
const store = await makeSessionStore({ "+1000": { updatedAt: now } }); const store = await makeSessionStore({
main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" },
});
const cfg: ClawdisConfig = { const cfg: ClawdisConfig = {
inbound: { inbound: {
allowFrom: ["+1999"], allowFrom: ["+1999"],
@ -131,8 +133,8 @@ describe("resolveHeartbeatRecipients", () => {
it("surfaces ambiguity when multiple sessions exist", async () => { it("surfaces ambiguity when multiple sessions exist", async () => {
const now = Date.now(); const now = Date.now();
const store = await makeSessionStore({ const store = await makeSessionStore({
"+1000": { updatedAt: now }, main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" },
"+2000": { updatedAt: now - 10 }, alt: { updatedAt: now - 10, lastChannel: "whatsapp", lastTo: "+2000" },
}); });
const cfg: ClawdisConfig = { const cfg: ClawdisConfig = {
inbound: { inbound: {
@ -162,7 +164,9 @@ describe("resolveHeartbeatRecipients", () => {
it("merges sessions and allowFrom when --all is set", async () => { it("merges sessions and allowFrom when --all is set", async () => {
const now = Date.now(); const now = Date.now();
const store = await makeSessionStore({ "+1000": { updatedAt: now } }); const store = await makeSessionStore({
main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" },
});
const cfg: ClawdisConfig = { const cfg: ClawdisConfig = {
inbound: { inbound: {
allowFrom: ["+1999"], allowFrom: ["+1999"],

View File

@ -32,6 +32,8 @@ import {
resolveReconnectPolicy, resolveReconnectPolicy,
sleepWithAbort, sleepWithAbort,
} from "./reconnect.js"; } from "./reconnect.js";
import type { ReplyHeartbeatWakeResult } from "./reply-heartbeat-wake.js";
import { setReplyHeartbeatWakeHandler } from "./reply-heartbeat-wake.js";
import { formatError, getWebAuthAgeMs, readWebSelfId } from "./session.js"; import { formatError, getWebAuthAgeMs, readWebSelfId } from "./session.js";
const WEB_TEXT_LIMIT = 4000; const WEB_TEXT_LIMIT = 4000;
@ -379,21 +381,24 @@ export async function runWebHeartbeatOnce(opts: {
} }
function getFallbackRecipient(cfg: ReturnType<typeof loadConfig>) { function getFallbackRecipient(cfg: ReturnType<typeof loadConfig>) {
const storePath = resolveStorePath(cfg.inbound?.reply?.session?.store); const sessionCfg = cfg.inbound?.reply?.session;
const storePath = resolveStorePath(sessionCfg?.store);
const store = loadSessionStore(storePath); const store = loadSessionStore(storePath);
const candidates = Object.entries(store).filter(([key]) => key !== "global"); const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
if (candidates.length === 0) { const main = store[mainKey];
const allowFrom = const lastTo = typeof main?.lastTo === "string" ? main.lastTo.trim() : "";
Array.isArray(cfg.inbound?.allowFrom) && cfg.inbound.allowFrom.length > 0 const lastChannel = main?.lastChannel;
? cfg.inbound.allowFrom.filter((v) => v !== "*")
: []; if (lastChannel === "whatsapp" && lastTo) {
if (allowFrom.length === 0) return null; return normalizeE164(lastTo);
return allowFrom[0] ? normalizeE164(allowFrom[0]) : null;
} }
const mostRecent = candidates.sort(
(a, b) => (b[1]?.updatedAt ?? 0) - (a[1]?.updatedAt ?? 0), const allowFrom =
)[0]; Array.isArray(cfg.inbound?.allowFrom) && cfg.inbound.allowFrom.length > 0
return mostRecent ? normalizeE164(mostRecent[0]) : null; ? cfg.inbound.allowFrom.filter((v) => v !== "*")
: [];
if (allowFrom.length === 0) return null;
return allowFrom[0] ? normalizeE164(allowFrom[0]) : null;
} }
function getSessionRecipients(cfg: ReturnType<typeof loadConfig>) { function getSessionRecipients(cfg: ReturnType<typeof loadConfig>) {
@ -402,14 +407,30 @@ function getSessionRecipients(cfg: ReturnType<typeof loadConfig>) {
if (scope === "global") return []; if (scope === "global") return [];
const storePath = resolveStorePath(cfg.inbound?.reply?.session?.store); const storePath = resolveStorePath(cfg.inbound?.reply?.session?.store);
const store = loadSessionStore(storePath); const store = loadSessionStore(storePath);
return Object.entries(store) const isGroupKey = (key: string) =>
key.startsWith("group:") || key.includes("@g.us");
const isCronKey = (key: string) => key.startsWith("cron:");
const recipients = Object.entries(store)
.filter(([key]) => key !== "global" && key !== "unknown") .filter(([key]) => key !== "global" && key !== "unknown")
.map(([key, entry]) => ({ .filter(([key]) => !isGroupKey(key) && !isCronKey(key))
to: normalizeE164(key), .map(([_, entry]) => ({
to:
entry?.lastChannel === "whatsapp" && entry?.lastTo
? normalizeE164(entry.lastTo)
: "",
updatedAt: entry?.updatedAt ?? 0, updatedAt: entry?.updatedAt ?? 0,
})) }))
.filter(({ to }) => Boolean(to)) .filter(({ to }) => to.length > 1)
.sort((a, b) => b.updatedAt - a.updatedAt); .sort((a, b) => b.updatedAt - a.updatedAt);
// Dedupe while preserving recency ordering.
const seen = new Set<string>();
return recipients.filter((r) => {
if (seen.has(r.to)) return false;
seen.add(r.to);
return true;
});
} }
export function resolveHeartbeatRecipients( export function resolveHeartbeatRecipients(
@ -1055,6 +1076,7 @@ export async function monitorWebProvider(
const closeListener = async () => { const closeListener = async () => {
setActiveWebListener(null); setActiveWebListener(null);
setReplyHeartbeatWakeHandler(null);
if (heartbeat) clearInterval(heartbeat); if (heartbeat) clearInterval(heartbeat);
if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer); if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer);
if (watchdogTimer) clearInterval(watchdogTimer); if (watchdogTimer) clearInterval(watchdogTimer);
@ -1126,8 +1148,11 @@ export async function monitorWebProvider(
}, WATCHDOG_CHECK_MS); }, WATCHDOG_CHECK_MS);
} }
const runReplyHeartbeat = async () => { const runReplyHeartbeat = async (): Promise<ReplyHeartbeatWakeResult> => {
if (!heartbeatsEnabled) return; const started = Date.now();
if (!heartbeatsEnabled) {
return { status: "skipped", reason: "disabled" };
}
const queued = getQueueSize(); const queued = getQueueSize();
if (queued > 0) { if (queued > 0) {
heartbeatLogger.info( heartbeatLogger.info(
@ -1135,16 +1160,18 @@ export async function monitorWebProvider(
"reply heartbeat skipped", "reply heartbeat skipped",
); );
console.log(success("heartbeat: skipped (requests in flight)")); console.log(success("heartbeat: skipped (requests in flight)"));
return; return { status: "skipped", reason: "requests-in-flight" };
}
if (!replyHeartbeatMinutes) {
return { status: "skipped", reason: "disabled" };
} }
if (!replyHeartbeatMinutes) return;
if (lastInboundMsg?.chatType === "group") { if (lastInboundMsg?.chatType === "group") {
heartbeatLogger.info( heartbeatLogger.info(
{ connectionId, reason: "last-inbound-group" }, { connectionId, reason: "last-inbound-group" },
"reply heartbeat skipped", "reply heartbeat skipped",
); );
console.log(success("heartbeat: skipped (group chat)")); console.log(success("heartbeat: skipped (group chat)"));
return; return { status: "skipped", reason: "group-chat" };
} }
const tickStart = Date.now(); const tickStart = Date.now();
if (!lastInboundMsg) { if (!lastInboundMsg) {
@ -1159,7 +1186,7 @@ export async function monitorWebProvider(
"reply heartbeat skipped", "reply heartbeat skipped",
); );
console.log(success("heartbeat: skipped (no recent inbound)")); console.log(success("heartbeat: skipped (no recent inbound)"));
return; return { status: "skipped", reason: "no-recent-inbound" };
} }
const snapshot = getSessionSnapshot(cfg, fallbackTo, true); const snapshot = getSessionSnapshot(cfg, fallbackTo, true);
if (!snapshot.entry) { if (!snapshot.entry) {
@ -1168,7 +1195,7 @@ export async function monitorWebProvider(
"reply heartbeat skipped", "reply heartbeat skipped",
); );
console.log(success("heartbeat: skipped (no session to resume)")); console.log(success("heartbeat: skipped (no session to resume)"));
return; return { status: "skipped", reason: "no-session-for-fallback" };
} }
if (isVerbose()) { if (isVerbose()) {
heartbeatLogger.info( heartbeatLogger.info(
@ -1199,7 +1226,7 @@ export async function monitorWebProvider(
}, },
"reply heartbeat sent (fallback session)", "reply heartbeat sent (fallback session)",
); );
return; return { status: "ran", durationMs: Date.now() - started };
} }
try { try {
@ -1252,7 +1279,7 @@ export async function monitorWebProvider(
"reply heartbeat skipped", "reply heartbeat skipped",
); );
console.log(success("heartbeat: ok (empty reply)")); console.log(success("heartbeat: ok (empty reply)"));
return; return { status: "ran", durationMs: Date.now() - started };
} }
const stripped = stripHeartbeatToken(replyPayload.text); const stripped = stripHeartbeatToken(replyPayload.text);
@ -1270,7 +1297,7 @@ export async function monitorWebProvider(
"reply heartbeat skipped", "reply heartbeat skipped",
); );
console.log(success("heartbeat: ok (HEARTBEAT_OK)")); console.log(success("heartbeat: ok (HEARTBEAT_OK)"));
return; return { status: "ran", durationMs: Date.now() - started };
} }
// Apply response prefix if configured (same as regular messages) // Apply response prefix if configured (same as regular messages)
@ -1310,6 +1337,7 @@ export async function monitorWebProvider(
}, },
"reply heartbeat sent", "reply heartbeat sent",
); );
return { status: "ran", durationMs: Date.now() - started };
} catch (err) { } catch (err) {
const durationMs = Date.now() - tickStart; const durationMs = Date.now() - tickStart;
heartbeatLogger.warn( heartbeatLogger.warn(
@ -1323,9 +1351,12 @@ export async function monitorWebProvider(
console.log( console.log(
danger(`heartbeat: failed (${formatDuration(durationMs)})`), danger(`heartbeat: failed (${formatDuration(durationMs)})`),
); );
return { status: "failed", reason: String(err) };
} }
}; };
setReplyHeartbeatWakeHandler(async () => runReplyHeartbeat());
if (replyHeartbeatMinutes && !replyHeartbeatTimer) { if (replyHeartbeatMinutes && !replyHeartbeatTimer) {
const intervalMs = replyHeartbeatMinutes * 60_000; const intervalMs = replyHeartbeatMinutes * 60_000;
replyHeartbeatTimer = setInterval(() => { replyHeartbeatTimer = setInterval(() => {

View File

@ -0,0 +1,77 @@
export type ReplyHeartbeatWakeResult =
| { status: "ran"; durationMs: number }
| { status: "skipped"; reason: string }
| { status: "failed"; reason: string };
export type ReplyHeartbeatWakeHandler = (opts: {
reason?: string;
}) => Promise<ReplyHeartbeatWakeResult>;
let handler: ReplyHeartbeatWakeHandler | null = null;
let pendingReason: string | null = null;
let scheduled = false;
let running = false;
let timer: NodeJS.Timeout | null = null;
const DEFAULT_COALESCE_MS = 250;
const DEFAULT_RETRY_MS = 1_000;
function schedule(coalesceMs: number) {
if (timer) return;
timer = setTimeout(async () => {
timer = null;
scheduled = false;
const active = handler;
if (!active) return;
if (running) {
scheduled = true;
schedule(coalesceMs);
return;
}
const reason = pendingReason;
pendingReason = null;
running = true;
try {
const res = await active({ reason: reason ?? undefined });
if (res.status === "skipped" && res.reason === "requests-in-flight") {
// The main lane is busy; retry soon.
pendingReason = reason ?? "retry";
schedule(DEFAULT_RETRY_MS);
}
} catch (err) {
pendingReason = reason ?? "retry";
schedule(DEFAULT_RETRY_MS);
throw err;
} finally {
running = false;
if (pendingReason || scheduled) schedule(coalesceMs);
}
}, coalesceMs);
timer.unref?.();
}
export function setReplyHeartbeatWakeHandler(
next: ReplyHeartbeatWakeHandler | null,
) {
handler = next;
if (handler && pendingReason) {
schedule(DEFAULT_COALESCE_MS);
}
}
export function requestReplyHeartbeatNow(opts?: {
reason?: string;
coalesceMs?: number;
}) {
pendingReason = opts?.reason ?? pendingReason ?? "requested";
schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS);
}
export function hasReplyHeartbeatWakeHandler() {
return handler !== null;
}
export function hasPendingReplyHeartbeatWake() {
return pendingReason !== null || Boolean(timer) || scheduled;
}