feat(cron): add scheduler status endpoint

main
Peter Steinberger 2025-12-13 03:43:40 +00:00
parent a641250da6
commit 415cb857d9
6 changed files with 1075 additions and 0 deletions

View File

@ -386,6 +386,234 @@ public struct AgentParams: Codable {
}
}
public struct WakeParams: Codable {
public let mode: AnyCodable
public let text: String
public init(
mode: AnyCodable,
text: String
) {
self.mode = mode
self.text = text
}
private enum CodingKeys: String, CodingKey {
case mode
case text
}
}
public struct CronJob: Codable {
public let id: String
public let name: String?
public let enabled: Bool
public let createdatms: Int
public let updatedatms: Int
public let schedule: AnyCodable
public let sessiontarget: AnyCodable
public let wakemode: AnyCodable
public let payload: AnyCodable
public let isolation: [String: AnyCodable]?
public let state: [String: AnyCodable]
public init(
id: String,
name: String?,
enabled: Bool,
createdatms: Int,
updatedatms: Int,
schedule: AnyCodable,
sessiontarget: AnyCodable,
wakemode: AnyCodable,
payload: AnyCodable,
isolation: [String: AnyCodable]?,
state: [String: AnyCodable]
) {
self.id = id
self.name = name
self.enabled = enabled
self.createdatms = createdatms
self.updatedatms = updatedatms
self.schedule = schedule
self.sessiontarget = sessiontarget
self.wakemode = wakemode
self.payload = payload
self.isolation = isolation
self.state = state
}
private enum CodingKeys: String, CodingKey {
case id
case name
case enabled
case createdatms = "createdAtMs"
case updatedatms = "updatedAtMs"
case schedule
case sessiontarget = "sessionTarget"
case wakemode = "wakeMode"
case payload
case isolation
case state
}
}
public struct CronListParams: Codable {
public let includedisabled: Bool?
public init(
includedisabled: Bool?
) {
self.includedisabled = includedisabled
}
private enum CodingKeys: String, CodingKey {
case includedisabled = "includeDisabled"
}
}
public struct CronStatusParams: Codable {
}
public struct CronAddParams: Codable {
public let name: String?
public let enabled: Bool?
public let schedule: AnyCodable
public let sessiontarget: AnyCodable
public let wakemode: AnyCodable
public let payload: AnyCodable
public let isolation: [String: AnyCodable]?
public init(
name: String?,
enabled: Bool?,
schedule: AnyCodable,
sessiontarget: AnyCodable,
wakemode: AnyCodable,
payload: AnyCodable,
isolation: [String: AnyCodable]?
) {
self.name = name
self.enabled = enabled
self.schedule = schedule
self.sessiontarget = sessiontarget
self.wakemode = wakemode
self.payload = payload
self.isolation = isolation
}
private enum CodingKeys: String, CodingKey {
case name
case enabled
case schedule
case sessiontarget = "sessionTarget"
case wakemode = "wakeMode"
case payload
case isolation
}
}
public struct CronUpdateParams: Codable {
public let id: String
public let patch: [String: AnyCodable]
public init(
id: String,
patch: [String: AnyCodable]
) {
self.id = id
self.patch = patch
}
private enum CodingKeys: String, CodingKey {
case id
case patch
}
}
public struct CronRemoveParams: Codable {
public let id: String
public init(
id: String
) {
self.id = id
}
private enum CodingKeys: String, CodingKey {
case id
}
}
public struct CronRunParams: Codable {
public let id: String
public let mode: AnyCodable?
public init(
id: String,
mode: AnyCodable?
) {
self.id = id
self.mode = mode
}
private enum CodingKeys: String, CodingKey {
case id
case mode
}
}
public struct CronRunsParams: Codable {
public let id: String?
public let limit: Int?
public init(
id: String?,
limit: Int?
) {
self.id = id
self.limit = limit
}
private enum CodingKeys: String, CodingKey {
case id
case limit
}
}
public struct CronRunLogEntry: Codable {
public let ts: Int
public let jobid: String
public let action: String
public let status: AnyCodable?
public let error: String?
public let runatms: Int?
public let durationms: Int?
public let nextrunatms: Int?
public init(
ts: Int,
jobid: String,
action: String,
status: AnyCodable?,
error: String?,
runatms: Int?,
durationms: Int?,
nextrunatms: Int?
) {
self.ts = ts
self.jobid = jobid
self.action = action
self.status = status
self.error = error
self.runatms = runatms
self.durationms = durationms
self.nextrunatms = nextrunatms
}
private enum CodingKeys: String, CodingKey {
case ts
case jobid = "jobId"
case action
case status
case error
case runatms = "runAtMs"
case durationms = "durationMs"
case nextrunatms = "nextRunAtMs"
}
}
public struct ChatHistoryParams: Codable {
public let sessionkey: String

View File

@ -802,6 +802,792 @@
"idempotencyKey"
]
},
"WakeParams": {
"additionalProperties": false,
"type": "object",
"properties": {
"mode": {
"anyOf": [
{
"const": "now",
"type": "string"
},
{
"const": "next-heartbeat",
"type": "string"
}
]
},
"text": {
"minLength": 1,
"type": "string"
}
},
"required": [
"mode",
"text"
]
},
"CronJob": {
"additionalProperties": false,
"type": "object",
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"name": {
"type": "string"
},
"enabled": {
"type": "boolean"
},
"createdAtMs": {
"minimum": 0,
"type": "integer"
},
"updatedAtMs": {
"minimum": 0,
"type": "integer"
},
"schedule": {
"anyOf": [
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "at",
"type": "string"
},
"atMs": {
"minimum": 0,
"type": "integer"
}
},
"required": [
"kind",
"atMs"
]
},
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "every",
"type": "string"
},
"everyMs": {
"minimum": 1,
"type": "integer"
},
"anchorMs": {
"minimum": 0,
"type": "integer"
}
},
"required": [
"kind",
"everyMs"
]
},
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "cron",
"type": "string"
},
"expr": {
"minLength": 1,
"type": "string"
},
"tz": {
"type": "string"
}
},
"required": [
"kind",
"expr"
]
}
]
},
"sessionTarget": {
"anyOf": [
{
"const": "main",
"type": "string"
},
{
"const": "isolated",
"type": "string"
}
]
},
"wakeMode": {
"anyOf": [
{
"const": "next-heartbeat",
"type": "string"
},
{
"const": "now",
"type": "string"
}
]
},
"payload": {
"anyOf": [
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "systemEvent",
"type": "string"
},
"text": {
"minLength": 1,
"type": "string"
}
},
"required": [
"kind",
"text"
]
},
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "agentTurn",
"type": "string"
},
"message": {
"minLength": 1,
"type": "string"
},
"thinking": {
"type": "string"
},
"timeoutSeconds": {
"minimum": 1,
"type": "integer"
},
"deliver": {
"type": "boolean"
},
"channel": {
"anyOf": [
{
"const": "last",
"type": "string"
},
{
"const": "whatsapp",
"type": "string"
},
{
"const": "telegram",
"type": "string"
}
]
},
"to": {
"type": "string"
},
"bestEffortDeliver": {
"type": "boolean"
}
},
"required": [
"kind",
"message"
]
}
]
},
"isolation": {
"additionalProperties": false,
"type": "object",
"properties": {
"postToMain": {
"type": "boolean"
},
"postToMainPrefix": {
"type": "string"
}
}
},
"state": {
"additionalProperties": false,
"type": "object",
"properties": {
"nextRunAtMs": {
"minimum": 0,
"type": "integer"
},
"runningAtMs": {
"minimum": 0,
"type": "integer"
},
"lastRunAtMs": {
"minimum": 0,
"type": "integer"
},
"lastStatus": {
"anyOf": [
{
"const": "ok",
"type": "string"
},
{
"const": "error",
"type": "string"
},
{
"const": "skipped",
"type": "string"
}
]
},
"lastError": {
"type": "string"
},
"lastDurationMs": {
"minimum": 0,
"type": "integer"
}
}
}
},
"required": [
"id",
"enabled",
"createdAtMs",
"updatedAtMs",
"schedule",
"sessionTarget",
"wakeMode",
"payload",
"state"
]
},
"CronListParams": {
"additionalProperties": false,
"type": "object",
"properties": {
"includeDisabled": {
"type": "boolean"
}
}
},
"CronStatusParams": {
"additionalProperties": false,
"type": "object",
"properties": {}
},
"CronAddParams": {
"additionalProperties": false,
"type": "object",
"properties": {
"name": {
"type": "string"
},
"enabled": {
"type": "boolean"
},
"schedule": {
"anyOf": [
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "at",
"type": "string"
},
"atMs": {
"minimum": 0,
"type": "integer"
}
},
"required": [
"kind",
"atMs"
]
},
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "every",
"type": "string"
},
"everyMs": {
"minimum": 1,
"type": "integer"
},
"anchorMs": {
"minimum": 0,
"type": "integer"
}
},
"required": [
"kind",
"everyMs"
]
},
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "cron",
"type": "string"
},
"expr": {
"minLength": 1,
"type": "string"
},
"tz": {
"type": "string"
}
},
"required": [
"kind",
"expr"
]
}
]
},
"sessionTarget": {
"anyOf": [
{
"const": "main",
"type": "string"
},
{
"const": "isolated",
"type": "string"
}
]
},
"wakeMode": {
"anyOf": [
{
"const": "next-heartbeat",
"type": "string"
},
{
"const": "now",
"type": "string"
}
]
},
"payload": {
"anyOf": [
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "systemEvent",
"type": "string"
},
"text": {
"minLength": 1,
"type": "string"
}
},
"required": [
"kind",
"text"
]
},
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "agentTurn",
"type": "string"
},
"message": {
"minLength": 1,
"type": "string"
},
"thinking": {
"type": "string"
},
"timeoutSeconds": {
"minimum": 1,
"type": "integer"
},
"deliver": {
"type": "boolean"
},
"channel": {
"anyOf": [
{
"const": "last",
"type": "string"
},
{
"const": "whatsapp",
"type": "string"
},
{
"const": "telegram",
"type": "string"
}
]
},
"to": {
"type": "string"
},
"bestEffortDeliver": {
"type": "boolean"
}
},
"required": [
"kind",
"message"
]
}
]
},
"isolation": {
"additionalProperties": false,
"type": "object",
"properties": {
"postToMain": {
"type": "boolean"
},
"postToMainPrefix": {
"type": "string"
}
}
}
},
"required": [
"schedule",
"sessionTarget",
"wakeMode",
"payload"
]
},
"CronUpdateParams": {
"additionalProperties": false,
"type": "object",
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"patch": {
"additionalProperties": false,
"type": "object",
"properties": {
"name": {
"type": "string"
},
"enabled": {
"type": "boolean"
},
"schedule": {
"anyOf": [
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "at",
"type": "string"
},
"atMs": {
"minimum": 0,
"type": "integer"
}
},
"required": [
"kind",
"atMs"
]
},
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "every",
"type": "string"
},
"everyMs": {
"minimum": 1,
"type": "integer"
},
"anchorMs": {
"minimum": 0,
"type": "integer"
}
},
"required": [
"kind",
"everyMs"
]
},
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "cron",
"type": "string"
},
"expr": {
"minLength": 1,
"type": "string"
},
"tz": {
"type": "string"
}
},
"required": [
"kind",
"expr"
]
}
]
},
"sessionTarget": {
"anyOf": [
{
"const": "main",
"type": "string"
},
{
"const": "isolated",
"type": "string"
}
]
},
"wakeMode": {
"anyOf": [
{
"const": "next-heartbeat",
"type": "string"
},
{
"const": "now",
"type": "string"
}
]
},
"payload": {
"anyOf": [
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "systemEvent",
"type": "string"
},
"text": {
"minLength": 1,
"type": "string"
}
},
"required": [
"kind",
"text"
]
},
{
"additionalProperties": false,
"type": "object",
"properties": {
"kind": {
"const": "agentTurn",
"type": "string"
},
"message": {
"minLength": 1,
"type": "string"
},
"thinking": {
"type": "string"
},
"timeoutSeconds": {
"minimum": 1,
"type": "integer"
},
"deliver": {
"type": "boolean"
},
"channel": {
"anyOf": [
{
"const": "last",
"type": "string"
},
{
"const": "whatsapp",
"type": "string"
},
{
"const": "telegram",
"type": "string"
}
]
},
"to": {
"type": "string"
},
"bestEffortDeliver": {
"type": "boolean"
}
},
"required": [
"kind",
"message"
]
}
]
},
"isolation": {
"additionalProperties": false,
"type": "object",
"properties": {
"postToMain": {
"type": "boolean"
},
"postToMainPrefix": {
"type": "string"
}
}
}
}
}
},
"required": [
"id",
"patch"
]
},
"CronRemoveParams": {
"additionalProperties": false,
"type": "object",
"properties": {
"id": {
"minLength": 1,
"type": "string"
}
},
"required": [
"id"
]
},
"CronRunParams": {
"additionalProperties": false,
"type": "object",
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"mode": {
"anyOf": [
{
"const": "due",
"type": "string"
},
{
"const": "force",
"type": "string"
}
]
}
},
"required": [
"id"
]
},
"CronRunsParams": {
"additionalProperties": false,
"type": "object",
"properties": {
"id": {
"minLength": 1,
"type": "string"
},
"limit": {
"minimum": 1,
"maximum": 5000,
"type": "integer"
}
}
},
"CronRunLogEntry": {
"additionalProperties": false,
"type": "object",
"properties": {
"ts": {
"minimum": 0,
"type": "integer"
},
"jobId": {
"minLength": 1,
"type": "string"
},
"action": {
"const": "finished",
"type": "string"
},
"status": {
"anyOf": [
{
"const": "ok",
"type": "string"
},
{
"const": "error",
"type": "string"
},
{
"const": "skipped",
"type": "string"
}
]
},
"error": {
"type": "string"
},
"runAtMs": {
"minimum": 0,
"type": "integer"
},
"durationMs": {
"minimum": 0,
"type": "integer"
},
"nextRunAtMs": {
"minimum": 0,
"type": "integer"
}
},
"required": [
"ts",
"jobId",
"action"
]
},
"ChatHistoryParams": {
"additionalProperties": false,
"type": "object",

View File

@ -59,6 +59,7 @@ export class CronService {
private timer: NodeJS.Timeout | null = null;
private running = false;
private op: Promise<unknown> = Promise.resolve();
private warnedDisabled = false;
constructor(deps: CronServiceDeps) {
this.deps = {
@ -94,6 +95,19 @@ export class CronService {
this.timer = null;
}
async status() {
return await this.locked(async () => {
await this.ensureLoaded();
return {
enabled: this.deps.cronEnabled,
storePath: this.deps.storePath,
jobs: this.store?.jobs.length ?? 0,
nextWakeAtMs:
this.deps.cronEnabled === true ? (this.nextWakeAtMs() ?? null) : null,
};
});
}
async list(opts?: { includeDisabled?: boolean }) {
return await this.locked(async () => {
await this.ensureLoaded();
@ -109,6 +123,7 @@ export class CronService {
async add(input: CronJobCreate) {
return await this.locked(async () => {
this.warnIfDisabled("add");
await this.ensureLoaded();
const now = this.deps.nowMs();
const id = crypto.randomUUID();
@ -142,6 +157,7 @@ export class CronService {
async update(id: string, patch: CronJobPatch) {
return await this.locked(async () => {
this.warnIfDisabled("update");
await this.ensureLoaded();
const job = this.findJobOrThrow(id);
const now = this.deps.nowMs();
@ -176,6 +192,7 @@ export class CronService {
async remove(id: string) {
return await this.locked(async () => {
this.warnIfDisabled("remove");
await this.ensureLoaded();
const before = this.store?.jobs.length ?? 0;
if (!this.store) return { ok: false, removed: false };
@ -190,6 +207,7 @@ export class CronService {
async run(id: string, mode?: "due" | "force") {
return await this.locked(async () => {
this.warnIfDisabled("run");
await this.ensureLoaded();
const job = this.findJobOrThrow(id);
const now = this.deps.nowMs();
@ -232,6 +250,16 @@ export class CronService {
this.store = { version: 1, jobs: loaded.jobs ?? [] };
}
private warnIfDisabled(action: string) {
if (this.deps.cronEnabled) return;
if (this.warnedDisabled) return;
this.warnedDisabled = true;
this.deps.log.warn(
{ enabled: false, action, storePath: this.deps.storePath },
"cron: scheduler disabled; jobs will not run automatically",
);
}
private async persist() {
if (!this.store) return;
await saveCronStore(this.deps.storePath, this.store);

View File

@ -22,6 +22,8 @@ import {
CronRunParamsSchema,
type CronRunsParams,
CronRunsParamsSchema,
type CronStatusParams,
CronStatusParamsSchema,
type CronUpdateParams,
CronUpdateParamsSchema,
ErrorCodes,
@ -74,6 +76,9 @@ export const validateAgentParams = ajv.compile(AgentParamsSchema);
export const validateWakeParams = ajv.compile<WakeParams>(WakeParamsSchema);
export const validateCronListParams =
ajv.compile<CronListParams>(CronListParamsSchema);
export const validateCronStatusParams = ajv.compile<CronStatusParams>(
CronStatusParamsSchema,
);
export const validateCronAddParams =
ajv.compile<CronAddParams>(CronAddParamsSchema);
export const validateCronUpdateParams = ajv.compile<CronUpdateParams>(
@ -115,6 +120,7 @@ export {
WakeParamsSchema,
CronJobSchema,
CronListParamsSchema,
CronStatusParamsSchema,
CronAddParamsSchema,
CronUpdateParamsSchema,
CronRemoveParamsSchema,
@ -148,6 +154,7 @@ export type {
WakeParams,
CronJob,
CronListParams,
CronStatusParams,
CronAddParams,
CronUpdateParams,
CronRemoveParams,

View File

@ -316,6 +316,11 @@ export const CronListParamsSchema = Type.Object(
{ additionalProperties: false },
);
export const CronStatusParamsSchema = Type.Object(
{},
{ additionalProperties: false },
);
export const CronAddParamsSchema = Type.Object(
{
name: Type.Optional(Type.String()),
@ -438,6 +443,7 @@ export const ProtocolSchemas: Record<string, TSchema> = {
WakeParams: WakeParamsSchema,
CronJob: CronJobSchema,
CronListParams: CronListParamsSchema,
CronStatusParams: CronStatusParamsSchema,
CronAddParams: CronAddParamsSchema,
CronUpdateParams: CronUpdateParamsSchema,
CronRemoveParams: CronRemoveParamsSchema,
@ -467,6 +473,7 @@ export type AgentEvent = Static<typeof AgentEventSchema>;
export type WakeParams = Static<typeof WakeParamsSchema>;
export type CronJob = Static<typeof CronJobSchema>;
export type CronListParams = Static<typeof CronListParamsSchema>;
export type CronStatusParams = Static<typeof CronStatusParamsSchema>;
export type CronAddParams = Static<typeof CronAddParamsSchema>;
export type CronUpdateParams = Static<typeof CronUpdateParamsSchema>;
export type CronRemoveParams = Static<typeof CronRemoveParamsSchema>;

View File

@ -76,6 +76,7 @@ import {
validateCronRemoveParams,
validateCronRunParams,
validateCronRunsParams,
validateCronStatusParams,
validateCronUpdateParams,
validateRequestFrame,
validateSendParams,
@ -96,6 +97,7 @@ const METHODS = [
"set-heartbeats",
"wake",
"cron.list",
"cron.status",
"cron.add",
"cron.update",
"cron.remove",
@ -1116,6 +1118,23 @@ export async function startGatewayServer(
respond(true, { jobs }, undefined);
break;
}
case "cron.status": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateCronStatusParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid cron.status params: ${formatValidationErrors(validateCronStatusParams.errors)}`,
),
);
break;
}
const status = await cron.status();
respond(true, status, undefined);
break;
}
case "cron.add": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateCronAddParams(params)) {