fix: cron announce delivery path (#8540) (thanks @tyler6204)

main
Peter Steinberger 2026-02-04 01:01:29 -08:00
parent c396877dd9
commit 6341819d74
7 changed files with 82 additions and 97 deletions

View File

@ -137,10 +137,13 @@ Key behaviors:
- Prompt is prefixed with `[cron:<jobId> <job name>]` for traceability. - Prompt is prefixed with `[cron:<jobId> <job name>]` for traceability.
- Each run starts a **fresh session id** (no prior conversation carry-over). - Each run starts a **fresh session id** (no prior conversation carry-over).
- Default behavior: if `delivery` is omitted, isolated jobs announce a summary immediately (`delivery.mode = "announce"`). - Default behavior: if `delivery` is omitted, isolated jobs announce a summary (`delivery.mode = "announce"`).
- `delivery.mode` (isolated-only) chooses what happens: - `delivery.mode` (isolated-only) chooses what happens:
- `announce`: subagent-style summary delivered immediately to a chat. - `announce`: deliver a summary to the target channel and post a brief summary to the main session.
- `none`: internal only (no delivery). - `none`: internal only (no delivery, no main-session summary).
- `wakeMode` controls when the main-session summary posts:
- `now`: immediate heartbeat.
- `next-heartbeat`: waits for the next scheduled heartbeat.
Use isolated jobs for noisy, frequent, or "background chores" that shouldn't spam Use isolated jobs for noisy, frequent, or "background chores" that shouldn't spam
your main chat history. your main chat history.
@ -166,10 +169,27 @@ Delivery config (isolated jobs only):
- `delivery.bestEffort`: avoid failing the job if announce delivery fails. - `delivery.bestEffort`: avoid failing the job if announce delivery fails.
Announce delivery suppresses messaging tool sends for the run; use `delivery.channel`/`delivery.to` Announce delivery suppresses messaging tool sends for the run; use `delivery.channel`/`delivery.to`
to target the chat instead. to target the chat instead. When `delivery.mode = "none"`, no summary is posted to the main session.
If `delivery` is omitted for isolated jobs, OpenClaw defaults to `announce`. If `delivery` is omitted for isolated jobs, OpenClaw defaults to `announce`.
#### Announce delivery flow
When `delivery.mode = "announce"`, cron delivers directly via the outbound channel adapters.
The main agent is not spun up to craft or forward the message.
Behavior details:
- Content: delivery uses the isolated run's outbound payloads (text/media) with normal chunking and
channel formatting.
- Heartbeat-only responses (`HEARTBEAT_OK` with no real content) are not delivered.
- If the isolated run already sent a message to the same target via the message tool, delivery is
skipped to avoid duplicates.
- Missing or invalid delivery targets fail the job unless `delivery.bestEffort = true`.
- A short summary is posted to the main session only when `delivery.mode = "announce"`.
- The main-session summary respects `wakeMode`: `now` triggers an immediate heartbeat and
`next-heartbeat` waits for the next scheduled heartbeat.
### Model and thinking overrides ### Model and thinking overrides
Isolated jobs (`agentTurn`) can override the model and thinking level: Isolated jobs (`agentTurn`) can override the model and thinking level:
@ -191,7 +211,7 @@ Resolution priority:
Isolated jobs can deliver output to a channel via the top-level `delivery` config: Isolated jobs can deliver output to a channel via the top-level `delivery` config:
- `delivery.mode`: `announce` (subagent-style summary) or `none`. - `delivery.mode`: `announce` (deliver a summary) or `none`.
- `delivery.channel`: `whatsapp` / `telegram` / `discord` / `slack` / `mattermost` (plugin) / `signal` / `imessage` / `last`. - `delivery.channel`: `whatsapp` / `telegram` / `discord` / `slack` / `mattermost` (plugin) / `signal` / `imessage` / `last`.
- `delivery.to`: channel-specific recipient target. - `delivery.to`: channel-specific recipient target.

View File

@ -5,6 +5,9 @@ import type { CliDeps } from "../cli/deps.js";
import type { OpenClawConfig } from "../config/config.js"; import type { OpenClawConfig } from "../config/config.js";
import type { CronJob } from "./types.js"; import type { CronJob } from "./types.js";
import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js"; import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js";
import { telegramOutbound } from "../channels/plugins/outbound/telegram.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
vi.mock("../agents/pi-embedded.js", () => ({ vi.mock("../agents/pi-embedded.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false), abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
@ -14,13 +17,9 @@ vi.mock("../agents/pi-embedded.js", () => ({
vi.mock("../agents/model-catalog.js", () => ({ vi.mock("../agents/model-catalog.js", () => ({
loadModelCatalog: vi.fn(), loadModelCatalog: vi.fn(),
})); }));
vi.mock("../agents/subagent-announce.js", () => ({
runSubagentAnnounceFlow: vi.fn(),
}));
import { loadModelCatalog } from "../agents/model-catalog.js"; import { loadModelCatalog } from "../agents/model-catalog.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js"; import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> { async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
@ -87,7 +86,15 @@ describe("runCronIsolatedAgentTurn", () => {
beforeEach(() => { beforeEach(() => {
vi.mocked(runEmbeddedPiAgent).mockReset(); vi.mocked(runEmbeddedPiAgent).mockReset();
vi.mocked(loadModelCatalog).mockResolvedValue([]); vi.mocked(loadModelCatalog).mockResolvedValue([]);
vi.mocked(runSubagentAnnounceFlow).mockReset().mockResolvedValue(true); setActivePluginRegistry(
createTestRegistry([
{
pluginId: "telegram",
plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }),
source: "test",
},
]),
);
}); });
it("delivers when response has HEARTBEAT_OK but includes media", async () => { it("delivers when response has HEARTBEAT_OK but includes media", async () => {
@ -128,7 +135,7 @@ describe("runCronIsolatedAgentTurn", () => {
}); });
expect(res.status).toBe("ok"); expect(res.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); expect(deps.sendMessageTelegram).toHaveBeenCalled();
}); });
}); });
@ -178,7 +185,7 @@ describe("runCronIsolatedAgentTurn", () => {
}); });
expect(res.status).toBe("ok"); expect(res.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); expect(deps.sendMessageTelegram).toHaveBeenCalled();
}); });
}); });
}); });

View File

@ -4,16 +4,10 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
import type { CliDeps } from "../cli/deps.js"; import type { CliDeps } from "../cli/deps.js";
import type { OpenClawConfig } from "../config/config.js"; import type { OpenClawConfig } from "../config/config.js";
import type { CronJob } from "./types.js"; import type { CronJob } from "./types.js";
import { discordPlugin } from "../../extensions/discord/src/channel.js";
import { setDiscordRuntime } from "../../extensions/discord/src/runtime.js";
import { telegramPlugin } from "../../extensions/telegram/src/channel.js";
import { setTelegramRuntime } from "../../extensions/telegram/src/runtime.js";
import { whatsappPlugin } from "../../extensions/whatsapp/src/channel.js";
import { setWhatsAppRuntime } from "../../extensions/whatsapp/src/runtime.js";
import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js"; import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js";
import { telegramOutbound } from "../channels/plugins/outbound/telegram.js";
import { setActivePluginRegistry } from "../plugins/runtime.js"; import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createPluginRuntime } from "../plugins/runtime/index.js"; import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
import { createTestRegistry } from "../test-utils/channel-plugins.js";
vi.mock("../agents/pi-embedded.js", () => ({ vi.mock("../agents/pi-embedded.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false), abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
@ -23,13 +17,9 @@ vi.mock("../agents/pi-embedded.js", () => ({
vi.mock("../agents/model-catalog.js", () => ({ vi.mock("../agents/model-catalog.js", () => ({
loadModelCatalog: vi.fn(), loadModelCatalog: vi.fn(),
})); }));
vi.mock("../agents/subagent-announce.js", () => ({
runSubagentAnnounceFlow: vi.fn(),
}));
import { loadModelCatalog } from "../agents/model-catalog.js"; import { loadModelCatalog } from "../agents/model-catalog.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js"; import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> { async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
@ -96,16 +86,13 @@ describe("runCronIsolatedAgentTurn", () => {
beforeEach(() => { beforeEach(() => {
vi.mocked(runEmbeddedPiAgent).mockReset(); vi.mocked(runEmbeddedPiAgent).mockReset();
vi.mocked(loadModelCatalog).mockResolvedValue([]); vi.mocked(loadModelCatalog).mockResolvedValue([]);
vi.mocked(runSubagentAnnounceFlow).mockReset().mockResolvedValue(true);
const runtime = createPluginRuntime();
setDiscordRuntime(runtime);
setTelegramRuntime(runtime);
setWhatsAppRuntime(runtime);
setActivePluginRegistry( setActivePluginRegistry(
createTestRegistry([ createTestRegistry([
{ pluginId: "whatsapp", plugin: whatsappPlugin, source: "test" }, {
{ pluginId: "telegram", plugin: telegramPlugin, source: "test" }, pluginId: "telegram",
{ pluginId: "discord", plugin: discordPlugin, source: "test" }, plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }),
source: "test",
},
]), ]),
); );
}); });
@ -143,9 +130,7 @@ describe("runCronIsolatedAgentTurn", () => {
}); });
expect(res.status).toBe("ok"); expect(res.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); expect(deps.sendMessageTelegram).toHaveBeenCalled();
const call = vi.mocked(runSubagentAnnounceFlow).mock.calls[0]?.[0];
expect(call?.label).toBe("Cron: job-1");
}); });
}); });
@ -184,7 +169,7 @@ describe("runCronIsolatedAgentTurn", () => {
}); });
expect(res.status).toBe("ok"); expect(res.status).toBe("ok");
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
}); });
}); });
@ -221,7 +206,7 @@ describe("runCronIsolatedAgentTurn", () => {
}); });
expect(res.status).toBe("ok"); expect(res.status).toBe("ok");
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
}); });
}); });
@ -230,7 +215,7 @@ describe("runCronIsolatedAgentTurn", () => {
const storePath = await writeSessionStore(home); const storePath = await writeSessionStore(home);
const deps: CliDeps = { const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(), sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(), sendMessageTelegram: vi.fn().mockRejectedValue(new Error("boom")),
sendMessageDiscord: vi.fn(), sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(), sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(), sendMessageIMessage: vi.fn(),
@ -242,8 +227,6 @@ describe("runCronIsolatedAgentTurn", () => {
agentMeta: { sessionId: "s", provider: "p", model: "m" }, agentMeta: { sessionId: "s", provider: "p", model: "m" },
}, },
}); });
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false);
const res = await runCronIsolatedAgentTurn({ const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, { cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } }, channels: { telegram: { botToken: "t-1" } },
@ -259,7 +242,7 @@ describe("runCronIsolatedAgentTurn", () => {
}); });
expect(res.status).toBe("error"); expect(res.status).toBe("error");
expect(res.error).toBe("cron announce delivery failed"); expect(res.error).toBe("Error: boom");
}); });
}); });
@ -268,7 +251,7 @@ describe("runCronIsolatedAgentTurn", () => {
const storePath = await writeSessionStore(home); const storePath = await writeSessionStore(home);
const deps: CliDeps = { const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(), sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(), sendMessageTelegram: vi.fn().mockRejectedValue(new Error("boom")),
sendMessageDiscord: vi.fn(), sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(), sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(), sendMessageIMessage: vi.fn(),
@ -280,8 +263,6 @@ describe("runCronIsolatedAgentTurn", () => {
agentMeta: { sessionId: "s", provider: "p", model: "m" }, agentMeta: { sessionId: "s", provider: "p", model: "m" },
}, },
}); });
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false);
const res = await runCronIsolatedAgentTurn({ const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, { cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } }, channels: { telegram: { botToken: "t-1" } },
@ -302,7 +283,7 @@ describe("runCronIsolatedAgentTurn", () => {
}); });
expect(res.status).toBe("ok"); expect(res.status).toBe("ok");
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); expect(deps.sendMessageTelegram).toHaveBeenCalled();
}); });
}); });
}); });

View File

@ -31,10 +31,6 @@ import {
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js"; import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js";
import { getSkillsSnapshotVersion } from "../../agents/skills/refresh.js"; import { getSkillsSnapshotVersion } from "../../agents/skills/refresh.js";
import {
runSubagentAnnounceFlow,
type SubagentRunOutcome,
} from "../../agents/subagent-announce.js";
import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
import { hasNonzeroUsage } from "../../agents/usage.js"; import { hasNonzeroUsage } from "../../agents/usage.js";
import { ensureAgentWorkspace } from "../../agents/workspace.js"; import { ensureAgentWorkspace } from "../../agents/workspace.js";
@ -44,13 +40,10 @@ import {
normalizeVerboseLevel, normalizeVerboseLevel,
supportsXHighThinking, supportsXHighThinking,
} from "../../auto-reply/thinking.js"; } from "../../auto-reply/thinking.js";
import { type CliDeps } from "../../cli/outbound-send-deps.js"; import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js";
import { import { resolveSessionTranscriptPath, updateSessionStore } from "../../config/sessions.js";
resolveAgentMainSessionKey,
resolveSessionTranscriptPath,
updateSessionStore,
} from "../../config/sessions.js";
import { registerAgentRunContext } from "../../infra/agent-events.js"; import { registerAgentRunContext } from "../../infra/agent-events.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
import { getRemoteSkillEligibility } from "../../infra/skills-remote.js"; import { getRemoteSkillEligibility } from "../../infra/skills-remote.js";
import { logWarn } from "../../logger.js"; import { logWarn } from "../../logger.js";
import { buildAgentMainSessionKey, normalizeAgentId } from "../../routing/session-key.js"; import { buildAgentMainSessionKey, normalizeAgentId } from "../../routing/session-key.js";
@ -314,7 +307,7 @@ export async function runCronIsolatedAgentTurn(params: {
} }
if (deliveryRequested) { if (deliveryRequested) {
commandBody = commandBody =
`${commandBody}\n\nReturn your summary as plain text; it will be delivered by the main agent. If the task explicitly calls for messaging a specific external recipient, note who/where it should go instead of sending it yourself.`.trim(); `${commandBody}\n\nReturn your summary as plain text; it will be delivered automatically. If the task explicitly calls for messaging a specific external recipient, note who/where it should go instead of sending it yourself.`.trim();
} }
const existingSnapshot = cronSession.sessionEntry.skillsSnapshot; const existingSnapshot = cronSession.sessionEntry.skillsSnapshot;
@ -480,42 +473,21 @@ export async function runCronIsolatedAgentTurn(params: {
logWarn(`[cron:${params.job.id}] ${deliveryFailure.message}`); logWarn(`[cron:${params.job.id}] ${deliveryFailure.message}`);
return { status: "ok", summary, outputText }; return { status: "ok", summary, outputText };
} }
const requesterSessionKey = resolveAgentMainSessionKey({ try {
cfg: cfgWithAgentDefaults, await deliverOutboundPayloads({
agentId, cfg: cfgWithAgentDefaults,
}); channel: resolvedDelivery.channel,
const useExplicitOrigin = deliveryPlan.channel !== "last" || Boolean(deliveryPlan.to?.trim()); to: resolvedDelivery.to,
const requesterOrigin = useExplicitOrigin accountId: resolvedDelivery.accountId,
? { threadId: resolvedDelivery.threadId,
channel: resolvedDelivery.channel, payloads,
to: resolvedDelivery.to, bestEffort: deliveryBestEffort,
accountId: resolvedDelivery.accountId, deps: createOutboundSendDeps(params.deps),
threadId: resolvedDelivery.threadId, });
} } catch (err) {
: undefined; if (!deliveryBestEffort) {
const outcome: SubagentRunOutcome = { status: "ok" }; return { status: "error", summary, outputText, error: String(err) };
const taskLabel = params.job.name?.trim() || "cron job"; }
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: agentSessionKey,
childRunId: cronSession.sessionEntry.sessionId,
requesterSessionKey,
requesterOrigin,
requesterDisplayKey: requesterSessionKey,
task: taskLabel,
timeoutMs: 30_000,
cleanup: "keep",
roundOneReply: outputText ?? summary,
waitForCompletion: false,
label: `Cron: ${taskLabel}`,
outcome,
});
if (!didAnnounce && !deliveryBestEffort) {
return {
status: "error",
error: "cron announce delivery failed",
summary,
outputText,
};
} }
} }

View File

@ -211,7 +211,8 @@ describe("CronService", () => {
schedule: { kind: "at", at: new Date(atMs).toISOString() }, schedule: { kind: "at", at: new Date(atMs).toISOString() },
sessionTarget: "isolated", sessionTarget: "isolated",
wakeMode: "now", wakeMode: "now",
payload: { kind: "agentTurn", message: "do it", deliver: false }, payload: { kind: "agentTurn", message: "do it" },
delivery: { mode: "announce" },
}); });
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
@ -359,7 +360,8 @@ describe("CronService", () => {
schedule: { kind: "at", at: new Date(atMs).toISOString() }, schedule: { kind: "at", at: new Date(atMs).toISOString() },
sessionTarget: "isolated", sessionTarget: "isolated",
wakeMode: "now", wakeMode: "now",
payload: { kind: "agentTurn", message: "do it", deliver: false }, payload: { kind: "agentTurn", message: "do it" },
delivery: { mode: "announce" },
}); });
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));

View File

@ -188,12 +188,15 @@ export async function executeJob(
// Post a short summary back to the main session so the user sees // Post a short summary back to the main session so the user sees
// the cron result without opening the isolated session. // the cron result without opening the isolated session.
const summaryText = res.summary?.trim(); const summaryText = res.summary?.trim();
if (summaryText) { const deliveryMode = job.delivery?.mode ?? "announce";
if (summaryText && deliveryMode !== "none") {
const prefix = "Cron"; const prefix = "Cron";
const label = const label =
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`; res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
state.deps.enqueueSystemEvent(label, { agentId: job.agentId }); state.deps.enqueueSystemEvent(label, { agentId: job.agentId });
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); if (job.wakeMode === "now") {
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
}
} }
if (res.status === "ok") { if (res.status === "ok") {

View File

@ -1,4 +1,4 @@
import { describe, expect, it, vi } from "vitest"; import { describe, expect, it } from "vitest";
import { formatAgo, stripThinkingTags } from "./format.ts"; import { formatAgo, stripThinkingTags } from "./format.ts";
describe("formatAgo", () => { describe("formatAgo", () => {