Gateway: fix multi-agent sessions.usage discovery (#11523)
* Gateway: fix multi-agent sessions.usage discovery * Gateway: resolve sessions.usage keys via sessionIdmain
parent
b8f740fb14
commit
9271fcb3d4
|
|
@ -0,0 +1,146 @@
|
||||||
|
import fs from "node:fs";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
vi.mock("../../config/config.js", () => {
|
||||||
|
return {
|
||||||
|
loadConfig: vi.fn(() => ({
|
||||||
|
agents: {
|
||||||
|
list: [{ id: "main" }, { id: "opus" }],
|
||||||
|
},
|
||||||
|
session: {},
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.mock("../session-utils.js", async () => {
|
||||||
|
const actual = await vi.importActual<typeof import("../session-utils.js")>("../session-utils.js");
|
||||||
|
return {
|
||||||
|
...actual,
|
||||||
|
loadCombinedSessionStoreForGateway: vi.fn(() => ({ storePath: "(multiple)", store: {} })),
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.mock("../../infra/session-cost-usage.js", async () => {
|
||||||
|
const actual = await vi.importActual<typeof import("../../infra/session-cost-usage.js")>(
|
||||||
|
"../../infra/session-cost-usage.js",
|
||||||
|
);
|
||||||
|
return {
|
||||||
|
...actual,
|
||||||
|
discoverAllSessions: vi.fn(async (params?: { agentId?: string }) => {
|
||||||
|
if (params?.agentId === "main") {
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
sessionId: "s-main",
|
||||||
|
sessionFile: "/tmp/agents/main/sessions/s-main.jsonl",
|
||||||
|
mtime: 100,
|
||||||
|
firstUserMessage: "hello",
|
||||||
|
},
|
||||||
|
];
|
||||||
|
}
|
||||||
|
if (params?.agentId === "opus") {
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
sessionId: "s-opus",
|
||||||
|
sessionFile: "/tmp/agents/opus/sessions/s-opus.jsonl",
|
||||||
|
mtime: 200,
|
||||||
|
firstUserMessage: "hi",
|
||||||
|
},
|
||||||
|
];
|
||||||
|
}
|
||||||
|
return [];
|
||||||
|
}),
|
||||||
|
loadSessionCostSummary: vi.fn(async () => ({
|
||||||
|
input: 0,
|
||||||
|
output: 0,
|
||||||
|
cacheRead: 0,
|
||||||
|
cacheWrite: 0,
|
||||||
|
totalTokens: 0,
|
||||||
|
totalCost: 0,
|
||||||
|
inputCost: 0,
|
||||||
|
outputCost: 0,
|
||||||
|
cacheReadCost: 0,
|
||||||
|
cacheWriteCost: 0,
|
||||||
|
missingCostEntries: 0,
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
import { discoverAllSessions } from "../../infra/session-cost-usage.js";
|
||||||
|
import { loadCombinedSessionStoreForGateway } from "../session-utils.js";
|
||||||
|
import { usageHandlers } from "./usage.js";
|
||||||
|
|
||||||
|
describe("sessions.usage", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
vi.clearAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("discovers sessions across configured agents and keeps agentId in key", async () => {
|
||||||
|
const respond = vi.fn();
|
||||||
|
|
||||||
|
await usageHandlers["sessions.usage"]({
|
||||||
|
respond,
|
||||||
|
params: {
|
||||||
|
startDate: "2026-02-01",
|
||||||
|
endDate: "2026-02-02",
|
||||||
|
limit: 10,
|
||||||
|
},
|
||||||
|
} as unknown as Parameters<(typeof usageHandlers)["sessions.usage"]>[0]);
|
||||||
|
|
||||||
|
expect(vi.mocked(discoverAllSessions)).toHaveBeenCalledTimes(2);
|
||||||
|
expect(vi.mocked(discoverAllSessions).mock.calls[0]?.[0]?.agentId).toBe("main");
|
||||||
|
expect(vi.mocked(discoverAllSessions).mock.calls[1]?.[0]?.agentId).toBe("opus");
|
||||||
|
|
||||||
|
expect(respond).toHaveBeenCalledTimes(1);
|
||||||
|
expect(respond.mock.calls[0]?.[0]).toBe(true);
|
||||||
|
const result = respond.mock.calls[0]?.[1] as unknown as { sessions: Array<unknown> };
|
||||||
|
expect(result.sessions).toHaveLength(2);
|
||||||
|
|
||||||
|
// Sorted by most recent first (mtime=200 -> opus first).
|
||||||
|
expect(result.sessions[0].key).toBe("agent:opus:s-opus");
|
||||||
|
expect(result.sessions[0].agentId).toBe("opus");
|
||||||
|
expect(result.sessions[1].key).toBe("agent:main:s-main");
|
||||||
|
expect(result.sessions[1].agentId).toBe("main");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("resolves store entries by sessionId when queried via discovered agent-prefixed key", async () => {
|
||||||
|
const storeKey = "agent:opus:slack:dm:u123";
|
||||||
|
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-usage-test-"));
|
||||||
|
const sessionFile = path.join(tempDir, "s-opus.jsonl");
|
||||||
|
fs.writeFileSync(sessionFile, "", "utf-8");
|
||||||
|
const respond = vi.fn();
|
||||||
|
|
||||||
|
// Swap the store mock for this test: the canonical key differs from the discovered key
|
||||||
|
// but points at the same sessionId.
|
||||||
|
vi.mocked(loadCombinedSessionStoreForGateway).mockReturnValue({
|
||||||
|
storePath: "(multiple)",
|
||||||
|
store: {
|
||||||
|
[storeKey]: {
|
||||||
|
sessionId: "s-opus",
|
||||||
|
sessionFile,
|
||||||
|
label: "Named session",
|
||||||
|
updatedAt: 999,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Query via discovered key: agent:<id>:<sessionId>
|
||||||
|
await usageHandlers["sessions.usage"]({
|
||||||
|
respond,
|
||||||
|
params: {
|
||||||
|
startDate: "2026-02-01",
|
||||||
|
endDate: "2026-02-02",
|
||||||
|
key: "agent:opus:s-opus",
|
||||||
|
limit: 10,
|
||||||
|
},
|
||||||
|
} as unknown as Parameters<(typeof usageHandlers)["sessions.usage"]>[0]);
|
||||||
|
|
||||||
|
expect(respond).toHaveBeenCalledTimes(1);
|
||||||
|
expect(respond.mock.calls[0]?.[0]).toBe(true);
|
||||||
|
const result = respond.mock.calls[0]?.[1] as unknown as { sessions: Array<{ key: string }> };
|
||||||
|
expect(result.sessions).toHaveLength(1);
|
||||||
|
expect(result.sessions[0]?.key).toBe(storeKey);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
@ -19,6 +19,7 @@ import {
|
||||||
loadSessionCostSummary,
|
loadSessionCostSummary,
|
||||||
loadSessionUsageTimeSeries,
|
loadSessionUsageTimeSeries,
|
||||||
discoverAllSessions,
|
discoverAllSessions,
|
||||||
|
type DiscoveredSession,
|
||||||
} from "../../infra/session-cost-usage.js";
|
} from "../../infra/session-cost-usage.js";
|
||||||
import { parseAgentSessionKey } from "../../routing/session-key.js";
|
import { parseAgentSessionKey } from "../../routing/session-key.js";
|
||||||
import {
|
import {
|
||||||
|
|
@ -27,7 +28,11 @@ import {
|
||||||
formatValidationErrors,
|
formatValidationErrors,
|
||||||
validateSessionsUsageParams,
|
validateSessionsUsageParams,
|
||||||
} from "../protocol/index.js";
|
} from "../protocol/index.js";
|
||||||
import { loadCombinedSessionStoreForGateway, loadSessionEntry } from "../session-utils.js";
|
import {
|
||||||
|
listAgentsForGateway,
|
||||||
|
loadCombinedSessionStoreForGateway,
|
||||||
|
loadSessionEntry,
|
||||||
|
} from "../session-utils.js";
|
||||||
|
|
||||||
const COST_USAGE_CACHE_TTL_MS = 30_000;
|
const COST_USAGE_CACHE_TTL_MS = 30_000;
|
||||||
|
|
||||||
|
|
@ -109,6 +114,27 @@ const parseDateRange = (params: {
|
||||||
return { startMs: defaultStartMs, endMs: todayEndMs };
|
return { startMs: defaultStartMs, endMs: todayEndMs };
|
||||||
};
|
};
|
||||||
|
|
||||||
|
type DiscoveredSessionWithAgent = DiscoveredSession & { agentId: string };
|
||||||
|
|
||||||
|
async function discoverAllSessionsForUsage(params: {
|
||||||
|
config: ReturnType<typeof loadConfig>;
|
||||||
|
startMs: number;
|
||||||
|
endMs: number;
|
||||||
|
}): Promise<DiscoveredSessionWithAgent[]> {
|
||||||
|
const agents = listAgentsForGateway(params.config).agents;
|
||||||
|
const results = await Promise.all(
|
||||||
|
agents.map(async (agent) => {
|
||||||
|
const sessions = await discoverAllSessions({
|
||||||
|
agentId: agent.id,
|
||||||
|
startMs: params.startMs,
|
||||||
|
endMs: params.endMs,
|
||||||
|
});
|
||||||
|
return sessions.map((session) => ({ ...session, agentId: agent.id }));
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
return results.flat().toSorted((a, b) => b.mtime - a.mtime);
|
||||||
|
}
|
||||||
|
|
||||||
async function loadCostUsageSummaryCached(params: {
|
async function loadCostUsageSummaryCached(params: {
|
||||||
startMs: number;
|
startMs: number;
|
||||||
endMs: number;
|
endMs: number;
|
||||||
|
|
@ -166,6 +192,7 @@ export const __test = {
|
||||||
parseDateToMs,
|
parseDateToMs,
|
||||||
parseDays,
|
parseDays,
|
||||||
parseDateRange,
|
parseDateRange,
|
||||||
|
discoverAllSessionsForUsage,
|
||||||
loadCostUsageSummaryCached,
|
loadCostUsageSummaryCached,
|
||||||
costUsageCache,
|
costUsageCache,
|
||||||
};
|
};
|
||||||
|
|
@ -282,18 +309,37 @@ export const usageHandlers: GatewayRequestHandlers = {
|
||||||
|
|
||||||
// Optimization: If a specific key is requested, skip full directory scan
|
// Optimization: If a specific key is requested, skip full directory scan
|
||||||
if (specificKey) {
|
if (specificKey) {
|
||||||
// Check if it's a named session in the store
|
const parsed = parseAgentSessionKey(specificKey);
|
||||||
const storeEntry = store[specificKey];
|
const agentIdFromKey = parsed?.agentId;
|
||||||
let sessionId = storeEntry?.sessionId ?? specificKey;
|
const keyRest = parsed?.rest ?? specificKey;
|
||||||
|
|
||||||
|
// Prefer the store entry when available, even if the caller provides a discovered key
|
||||||
|
// (`agent:<id>:<sessionId>`) for a session that now has a canonical store key.
|
||||||
|
const storeBySessionId = new Map<string, { key: string; entry: SessionEntry }>();
|
||||||
|
for (const [key, entry] of Object.entries(store)) {
|
||||||
|
if (entry?.sessionId) {
|
||||||
|
storeBySessionId.set(entry.sessionId, { key, entry });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const storeMatch = store[specificKey]
|
||||||
|
? { key: specificKey, entry: store[specificKey] }
|
||||||
|
: null;
|
||||||
|
const storeByIdMatch = storeBySessionId.get(keyRest) ?? null;
|
||||||
|
const resolvedStoreKey = storeMatch?.key ?? storeByIdMatch?.key ?? specificKey;
|
||||||
|
const storeEntry = storeMatch?.entry ?? storeByIdMatch?.entry;
|
||||||
|
const sessionId = storeEntry?.sessionId ?? keyRest;
|
||||||
|
|
||||||
// Resolve the session file path
|
// Resolve the session file path
|
||||||
const sessionFile = resolveSessionFilePath(sessionId, storeEntry);
|
const sessionFile = resolveSessionFilePath(sessionId, storeEntry, {
|
||||||
|
agentId: agentIdFromKey,
|
||||||
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const stats = fs.statSync(sessionFile);
|
const stats = fs.statSync(sessionFile);
|
||||||
if (stats.isFile()) {
|
if (stats.isFile()) {
|
||||||
mergedEntries.push({
|
mergedEntries.push({
|
||||||
key: specificKey,
|
key: resolvedStoreKey,
|
||||||
sessionId,
|
sessionId,
|
||||||
sessionFile,
|
sessionFile,
|
||||||
label: storeEntry?.label,
|
label: storeEntry?.label,
|
||||||
|
|
@ -306,7 +352,8 @@ export const usageHandlers: GatewayRequestHandlers = {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Full discovery for list view
|
// Full discovery for list view
|
||||||
const discoveredSessions = await discoverAllSessions({
|
const discoveredSessions = await discoverAllSessionsForUsage({
|
||||||
|
config,
|
||||||
startMs,
|
startMs,
|
||||||
endMs,
|
endMs,
|
||||||
});
|
});
|
||||||
|
|
@ -334,7 +381,8 @@ export const usageHandlers: GatewayRequestHandlers = {
|
||||||
} else {
|
} else {
|
||||||
// Unnamed session - use session ID as key, no label
|
// Unnamed session - use session ID as key, no label
|
||||||
mergedEntries.push({
|
mergedEntries.push({
|
||||||
key: discovered.sessionId,
|
// Keep agentId in the key so the dashboard can attribute sessions and later fetch logs.
|
||||||
|
key: `agent:${discovered.agentId}:${discovered.sessionId}`,
|
||||||
sessionId: discovered.sessionId,
|
sessionId: discovered.sessionId,
|
||||||
sessionFile: discovered.sessionFile,
|
sessionFile: discovered.sessionFile,
|
||||||
label: undefined, // No label for unnamed sessions
|
label: undefined, // No label for unnamed sessions
|
||||||
|
|
@ -711,8 +759,12 @@ export const usageHandlers: GatewayRequestHandlers = {
|
||||||
const { entry } = loadSessionEntry(key);
|
const { entry } = loadSessionEntry(key);
|
||||||
|
|
||||||
// For discovered sessions (not in store), try using key as sessionId directly
|
// For discovered sessions (not in store), try using key as sessionId directly
|
||||||
const sessionId = entry?.sessionId ?? key;
|
const parsed = parseAgentSessionKey(key);
|
||||||
const sessionFile = entry?.sessionFile ?? resolveSessionFilePath(key);
|
const agentId = parsed?.agentId;
|
||||||
|
const rawSessionId = parsed?.rest ?? key;
|
||||||
|
const sessionId = entry?.sessionId ?? rawSessionId;
|
||||||
|
const sessionFile =
|
||||||
|
entry?.sessionFile ?? resolveSessionFilePath(rawSessionId, entry, { agentId });
|
||||||
|
|
||||||
const timeseries = await loadSessionUsageTimeSeries({
|
const timeseries = await loadSessionUsageTimeSeries({
|
||||||
sessionId,
|
sessionId,
|
||||||
|
|
@ -749,8 +801,12 @@ export const usageHandlers: GatewayRequestHandlers = {
|
||||||
const { entry } = loadSessionEntry(key);
|
const { entry } = loadSessionEntry(key);
|
||||||
|
|
||||||
// For discovered sessions (not in store), try using key as sessionId directly
|
// For discovered sessions (not in store), try using key as sessionId directly
|
||||||
const sessionId = entry?.sessionId ?? key;
|
const parsed = parseAgentSessionKey(key);
|
||||||
const sessionFile = entry?.sessionFile ?? resolveSessionFilePath(key);
|
const agentId = parsed?.agentId;
|
||||||
|
const rawSessionId = parsed?.rest ?? key;
|
||||||
|
const sessionId = entry?.sessionId ?? rawSessionId;
|
||||||
|
const sessionFile =
|
||||||
|
entry?.sessionFile ?? resolveSessionFilePath(rawSessionId, entry, { agentId });
|
||||||
|
|
||||||
const { loadSessionLogs } = await import("../../infra/session-cost-usage.js");
|
const { loadSessionLogs } = await import("../../infra/session-cost-usage.js");
|
||||||
const logs = await loadSessionLogs({
|
const logs = await loadSessionLogs({
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue