fix(cron): prevent recomputeNextRuns from skipping due jobs in onTimer (#9823)
* fix(cron): prevent recomputeNextRuns from skipping due jobs in onTimer ensureLoaded(forceReload) called recomputeNextRuns before runDueJobs, which recalculated nextRunAtMs to a strictly future time. Since setTimeout always fires a few ms late, the due check (now >= nextRunAtMs) always failed and every/cron jobs never executed. Fixes #9788. * docs: add changelog entry for cron timer race fix (#9823) (thanks @pycckuu) --------- Co-authored-by: Tyler Yust <TYTYYUST@YAHOO.COM>main
parent
68393bfa36
commit
313e2f2e85
|
|
@ -54,6 +54,7 @@ Docs: https://docs.openclaw.ai
|
|||
- Voice call: add regression coverage for anonymous inbound caller IDs with allowlist policy. (#8104) Thanks @victormier.
|
||||
- Cron: accept epoch timestamps and 0ms durations in CLI `--at` parsing.
|
||||
- Cron: reload store data when the store file is recreated or mtime changes.
|
||||
- Cron: prevent `recomputeNextRuns` from skipping due jobs when timer fires late by reordering `onTimer` flow. (#9823, fixes #9788) Thanks @pycckuu.
|
||||
- Cron: deliver announce runs directly, honor delivery mode, and respect wakeMode for summaries. (#8540) Thanks @tyler6204.
|
||||
- Cron: correct announce delivery inference for thread session keys and null delivery inputs. (#9733) Thanks @tyler6204.
|
||||
- Telegram: include forward_from_chat metadata in forwarded messages and harden cron delivery target checks. (#8392) Thanks @Glucksberg.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,127 @@
|
|||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { CronService } from "./service.js";
|
||||
|
||||
const noopLogger = {
|
||||
debug: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
};
|
||||
|
||||
async function makeStorePath() {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-"));
|
||||
return {
|
||||
storePath: path.join(dir, "cron", "jobs.json"),
|
||||
cleanup: async () => {
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
describe("CronService interval/cron jobs fire on time", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z"));
|
||||
noopLogger.debug.mockClear();
|
||||
noopLogger.info.mockClear();
|
||||
noopLogger.warn.mockClear();
|
||||
noopLogger.error.mockClear();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("fires an every-type main job when the timer fires a few ms late", async () => {
|
||||
const store = await makeStorePath();
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const job = await cron.add({
|
||||
name: "every 10s check",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 10_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
});
|
||||
|
||||
const firstDueAt = job.state.nextRunAtMs!;
|
||||
expect(firstDueAt).toBe(Date.parse("2025-12-13T00:00:00.000Z") + 10_000);
|
||||
|
||||
// Simulate setTimeout firing 5ms late (the race condition).
|
||||
vi.setSystemTime(new Date(firstDueAt + 5));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
|
||||
// Wait for the async onTimer to complete via the lock queue.
|
||||
const jobs = await cron.list();
|
||||
const updated = jobs.find((j) => j.id === job.id);
|
||||
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("tick", { agentId: undefined });
|
||||
expect(updated?.state.lastStatus).toBe("ok");
|
||||
// nextRunAtMs must advance by at least one full interval past the due time.
|
||||
expect(updated?.state.nextRunAtMs).toBeGreaterThanOrEqual(firstDueAt + 10_000);
|
||||
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("fires a cron-expression job when the timer fires a few ms late", async () => {
|
||||
const store = await makeStorePath();
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
|
||||
// Set time to just before a minute boundary.
|
||||
vi.setSystemTime(new Date("2025-12-13T00:00:59.000Z"));
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const job = await cron.add({
|
||||
name: "every minute check",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "* * * * *" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "cron-tick" },
|
||||
});
|
||||
|
||||
const firstDueAt = job.state.nextRunAtMs!;
|
||||
|
||||
// Simulate setTimeout firing 5ms late.
|
||||
vi.setSystemTime(new Date(firstDueAt + 5));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
|
||||
// Wait for the async onTimer to complete via the lock queue.
|
||||
const jobs = await cron.list();
|
||||
const updated = jobs.find((j) => j.id === job.id);
|
||||
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("cron-tick", { agentId: undefined });
|
||||
expect(updated?.state.lastStatus).toBe("ok");
|
||||
// nextRunAtMs should be the next whole-minute boundary (60s later).
|
||||
expect(updated?.state.nextRunAtMs).toBe(firstDueAt + 60_000);
|
||||
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
});
|
||||
|
|
@ -126,7 +126,15 @@ async function getFileMtimeMs(path: string): Promise<number | null> {
|
|||
}
|
||||
}
|
||||
|
||||
export async function ensureLoaded(state: CronServiceState, opts?: { forceReload?: boolean }) {
|
||||
export async function ensureLoaded(
|
||||
state: CronServiceState,
|
||||
opts?: {
|
||||
forceReload?: boolean;
|
||||
/** Skip recomputing nextRunAtMs after load so the caller can run due
|
||||
* jobs against the persisted values first (see onTimer). */
|
||||
skipRecompute?: boolean;
|
||||
},
|
||||
) {
|
||||
// Fast path: store is already in memory. Other callers (add, list, run, …)
|
||||
// trust the in-memory copy to avoid a stat syscall on every operation.
|
||||
if (state.store && !opts?.forceReload) {
|
||||
|
|
@ -255,8 +263,9 @@ export async function ensureLoaded(state: CronServiceState, opts?: { forceReload
|
|||
state.storeLoadedAtMs = state.deps.nowMs();
|
||||
state.storeFileMtimeMs = fileMtimeMs;
|
||||
|
||||
// Recompute next runs after loading to ensure accuracy
|
||||
recomputeNextRuns(state);
|
||||
if (!opts?.skipRecompute) {
|
||||
recomputeNextRuns(state);
|
||||
}
|
||||
|
||||
if (mutated) {
|
||||
await persist(state);
|
||||
|
|
|
|||
|
|
@ -1,7 +1,12 @@
|
|||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import type { CronEvent, CronServiceState } from "./state.js";
|
||||
import { computeJobNextRunAtMs, nextWakeAtMs, resolveJobPayloadTextForMain } from "./jobs.js";
|
||||
import {
|
||||
computeJobNextRunAtMs,
|
||||
nextWakeAtMs,
|
||||
recomputeNextRuns,
|
||||
resolveJobPayloadTextForMain,
|
||||
} from "./jobs.js";
|
||||
import { locked } from "./locked.js";
|
||||
import { ensureLoaded, persist } from "./store.js";
|
||||
|
||||
|
|
@ -36,8 +41,12 @@ export async function onTimer(state: CronServiceState) {
|
|||
state.running = true;
|
||||
try {
|
||||
await locked(state, async () => {
|
||||
await ensureLoaded(state, { forceReload: true });
|
||||
// Reload persisted due-times without recomputing so runDueJobs sees
|
||||
// the original nextRunAtMs values. Recomputing first would advance
|
||||
// every/cron slots past the current tick when the timer fires late (#9788).
|
||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||
await runDueJobs(state);
|
||||
recomputeNextRuns(state);
|
||||
await persist(state);
|
||||
armTimer(state);
|
||||
});
|
||||
|
|
|
|||
Loading…
Reference in New Issue