chore(instances): harden presence refresh and fix lint
parent
658e0c6b03
commit
f0860ec145
|
|
@ -9,6 +9,9 @@ struct InstancesSettings: View {
|
||||||
if let err = store.lastError {
|
if let err = store.lastError {
|
||||||
Text("Error: \(err)")
|
Text("Error: \(err)")
|
||||||
.foregroundStyle(.red)
|
.foregroundStyle(.red)
|
||||||
|
} else if let info = store.statusMessage {
|
||||||
|
Text(info)
|
||||||
|
.foregroundStyle(.secondary)
|
||||||
}
|
}
|
||||||
if self.store.instances.isEmpty {
|
if self.store.instances.isEmpty {
|
||||||
Text("No instances reported yet.")
|
Text("No instances reported yet.")
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ final class InstancesStore: ObservableObject {
|
||||||
|
|
||||||
@Published var instances: [InstanceInfo] = []
|
@Published var instances: [InstanceInfo] = []
|
||||||
@Published var lastError: String?
|
@Published var lastError: String?
|
||||||
|
@Published var statusMessage: String?
|
||||||
@Published var isLoading = false
|
@Published var isLoading = false
|
||||||
|
|
||||||
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "instances")
|
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "instances")
|
||||||
|
|
@ -55,6 +56,7 @@ final class InstancesStore: ObservableObject {
|
||||||
|
|
||||||
func refresh() async {
|
func refresh() async {
|
||||||
if self.isLoading { return }
|
if self.isLoading { return }
|
||||||
|
self.statusMessage = nil
|
||||||
self.isLoading = true
|
self.isLoading = true
|
||||||
defer { self.isLoading = false }
|
defer { self.isLoading = false }
|
||||||
do {
|
do {
|
||||||
|
|
@ -65,7 +67,8 @@ final class InstancesStore: ObservableObject {
|
||||||
self.logger.error("instances fetch returned empty payload")
|
self.logger.error("instances fetch returned empty payload")
|
||||||
self.instances = [self.localFallbackInstance(reason: "no presence payload")]
|
self.instances = [self.localFallbackInstance(reason: "no presence payload")]
|
||||||
self.lastError = nil
|
self.lastError = nil
|
||||||
await self.probeHealthIfNeeded()
|
self.statusMessage = "No presence payload from relay; showing local fallback + health probe."
|
||||||
|
await self.probeHealthIfNeeded(reason: "no payload")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
let decoded = try JSONDecoder().decode([InstanceInfo].self, from: data)
|
let decoded = try JSONDecoder().decode([InstanceInfo].self, from: data)
|
||||||
|
|
@ -85,10 +88,12 @@ final class InstancesStore: ObservableObject {
|
||||||
if withIDs.isEmpty {
|
if withIDs.isEmpty {
|
||||||
self.instances = [self.localFallbackInstance(reason: "no presence entries")]
|
self.instances = [self.localFallbackInstance(reason: "no presence entries")]
|
||||||
self.lastError = nil
|
self.lastError = nil
|
||||||
await self.probeHealthIfNeeded()
|
self.statusMessage = "Presence list was empty; showing local fallback + health probe."
|
||||||
|
await self.probeHealthIfNeeded(reason: "empty list")
|
||||||
} else {
|
} else {
|
||||||
self.instances = withIDs
|
self.instances = withIDs
|
||||||
self.lastError = nil
|
self.lastError = nil
|
||||||
|
self.statusMessage = nil
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
|
|
@ -99,7 +104,8 @@ final class InstancesStore: ObservableObject {
|
||||||
""")
|
""")
|
||||||
self.instances = [self.localFallbackInstance(reason: "presence decode failed")]
|
self.instances = [self.localFallbackInstance(reason: "presence decode failed")]
|
||||||
self.lastError = nil
|
self.lastError = nil
|
||||||
await self.probeHealthIfNeeded()
|
self.statusMessage = "Presence data invalid; showing local fallback + health probe."
|
||||||
|
await self.probeHealthIfNeeded(reason: "decode failed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -181,7 +187,7 @@ final class InstancesStore: ObservableObject {
|
||||||
return "<\(data.count) bytes non-utf8>"
|
return "<\(data.count) bytes non-utf8>"
|
||||||
}
|
}
|
||||||
|
|
||||||
private func probeHealthIfNeeded() async {
|
private func probeHealthIfNeeded(reason: String? = nil) async {
|
||||||
do {
|
do {
|
||||||
let data = try await ControlChannel.shared.health(timeout: 8)
|
let data = try await ControlChannel.shared.health(timeout: 8)
|
||||||
guard let snap = decodeHealthSnapshot(from: data) else { return }
|
guard let snap = decodeHealthSnapshot(from: data) else { return }
|
||||||
|
|
@ -199,8 +205,12 @@ final class InstancesStore: ObservableObject {
|
||||||
self.instances.insert(entry, at: 0)
|
self.instances.insert(entry, at: 0)
|
||||||
}
|
}
|
||||||
self.lastError = nil
|
self.lastError = nil
|
||||||
|
self.statusMessage = "Presence unavailable (\(reason ?? "refresh")); showing health probe + local fallback."
|
||||||
} catch {
|
} catch {
|
||||||
self.logger.error("instances health probe failed: \(error.localizedDescription, privacy: .public)")
|
self.logger.error("instances health probe failed: \(error.localizedDescription, privacy: .public)")
|
||||||
|
if let reason {
|
||||||
|
self.statusMessage = "Presence unavailable (\(reason)), health probe failed: \(error.localizedDescription)"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -67,9 +67,9 @@ describe("statusCommand", () => {
|
||||||
expect(logs.some((l) => l.includes("Active sessions"))).toBe(true);
|
expect(logs.some((l) => l.includes("Active sessions"))).toBe(true);
|
||||||
expect(logs.some((l) => l.includes("Default model"))).toBe(true);
|
expect(logs.some((l) => l.includes("Default model"))).toBe(true);
|
||||||
expect(logs.some((l) => l.includes("tokens:"))).toBe(true);
|
expect(logs.some((l) => l.includes("tokens:"))).toBe(true);
|
||||||
expect(logs.some((l) => l.includes("flags:") && l.includes("verbose:on"))).toBe(
|
expect(
|
||||||
true,
|
logs.some((l) => l.includes("flags:") && l.includes("verbose:on")),
|
||||||
);
|
).toBe(true);
|
||||||
expect(mocks.logWebSelfId).toHaveBeenCalled();
|
expect(mocks.logWebSelfId).toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -218,7 +218,9 @@ export async function statusCommand(
|
||||||
const defaultCtx = defaults.contextTokens
|
const defaultCtx = defaults.contextTokens
|
||||||
? ` (${formatKTokens(defaults.contextTokens)} ctx)`
|
? ` (${formatKTokens(defaults.contextTokens)} ctx)`
|
||||||
: "";
|
: "";
|
||||||
runtime.log(info(`Default model: ${defaults.model ?? "unknown"}${defaultCtx}`));
|
runtime.log(
|
||||||
|
info(`Default model: ${defaults.model ?? "unknown"}${defaultCtx}`),
|
||||||
|
);
|
||||||
runtime.log(info(`Active sessions: ${summary.sessions.count}`));
|
runtime.log(info(`Active sessions: ${summary.sessions.count}`));
|
||||||
if (summary.sessions.recent.length > 0) {
|
if (summary.sessions.recent.length > 0) {
|
||||||
runtime.log("Recent sessions:");
|
runtime.log("Recent sessions:");
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,10 @@ import { describe, expect, it } from "vitest";
|
||||||
import { acquireRelayLock, RelayLockError } from "./relay-lock.js";
|
import { acquireRelayLock, RelayLockError } from "./relay-lock.js";
|
||||||
|
|
||||||
const newLockPath = () =>
|
const newLockPath = () =>
|
||||||
path.join(os.tmpdir(), `clawdis-relay-lock-test-${process.pid}-${Math.random().toString(16).slice(2)}.sock`);
|
path.join(
|
||||||
|
os.tmpdir(),
|
||||||
|
`clawdis-relay-lock-test-${process.pid}-${Math.random().toString(16).slice(2)}.sock`,
|
||||||
|
);
|
||||||
|
|
||||||
describe("relay-lock", () => {
|
describe("relay-lock", () => {
|
||||||
it("prevents concurrent relay instances and releases cleanly", async () => {
|
it("prevents concurrent relay instances and releases cleanly", async () => {
|
||||||
|
|
@ -16,7 +19,9 @@ describe("relay-lock", () => {
|
||||||
const release1 = await acquireRelayLock(lockPath);
|
const release1 = await acquireRelayLock(lockPath);
|
||||||
expect(fs.existsSync(lockPath)).toBe(true);
|
expect(fs.existsSync(lockPath)).toBe(true);
|
||||||
|
|
||||||
await expect(acquireRelayLock(lockPath)).rejects.toBeInstanceOf(RelayLockError);
|
await expect(acquireRelayLock(lockPath)).rejects.toBeInstanceOf(
|
||||||
|
RelayLockError,
|
||||||
|
);
|
||||||
|
|
||||||
await release1();
|
await release1();
|
||||||
expect(fs.existsSync(lockPath)).toBe(false);
|
expect(fs.existsSync(lockPath)).toBe(false);
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,9 @@ type ReleaseFn = () => Promise<void>;
|
||||||
* the next start will detect ECONNREFUSED when connecting and clean the stale path
|
* the next start will detect ECONNREFUSED when connecting and clean the stale path
|
||||||
* before retrying. This keeps the lock self-healing without manual pidfile cleanup.
|
* before retrying. This keeps the lock self-healing without manual pidfile cleanup.
|
||||||
*/
|
*/
|
||||||
export async function acquireRelayLock(lockPath = DEFAULT_LOCK_PATH): Promise<ReleaseFn> {
|
export async function acquireRelayLock(
|
||||||
|
lockPath = DEFAULT_LOCK_PATH,
|
||||||
|
): Promise<ReleaseFn> {
|
||||||
// Fast path: try to listen on the lock path.
|
// Fast path: try to listen on the lock path.
|
||||||
const attemptListen = (): Promise<net.Server> =>
|
const attemptListen = (): Promise<net.Server> =>
|
||||||
new Promise((resolve, reject) => {
|
new Promise((resolve, reject) => {
|
||||||
|
|
@ -33,7 +35,9 @@ export async function acquireRelayLock(lockPath = DEFAULT_LOCK_PATH): Promise<Re
|
||||||
|
|
||||||
client.once("connect", () => {
|
client.once("connect", () => {
|
||||||
client.destroy();
|
client.destroy();
|
||||||
reject(new RelayLockError("another relay instance is already running"));
|
reject(
|
||||||
|
new RelayLockError("another relay instance is already running"),
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
client.once("error", (connErr: NodeJS.ErrnoException) => {
|
client.once("error", (connErr: NodeJS.ErrnoException) => {
|
||||||
|
|
@ -84,7 +88,11 @@ export async function acquireRelayLock(lockPath = DEFAULT_LOCK_PATH): Promise<Re
|
||||||
process.exit(0);
|
process.exit(0);
|
||||||
};
|
};
|
||||||
|
|
||||||
cleanupSignals.forEach((sig) => process.once(sig, handleSignal));
|
for (const sig of cleanupSignals) {
|
||||||
|
process.once(sig, () => {
|
||||||
|
void handleSignal();
|
||||||
|
});
|
||||||
|
}
|
||||||
process.once("exit", () => {
|
process.once("exit", () => {
|
||||||
// Exit handler must be sync-safe; release is async but close+rm are fast.
|
// Exit handler must be sync-safe; release is async but close+rm are fast.
|
||||||
void release();
|
void release();
|
||||||
|
|
|
||||||
|
|
@ -59,6 +59,17 @@ function ensureSelfPresence() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function touchSelfPresence() {
|
||||||
|
const host = os.hostname();
|
||||||
|
const key = host.toLowerCase();
|
||||||
|
const existing = entries.get(key);
|
||||||
|
if (existing) {
|
||||||
|
entries.set(key, { ...existing, ts: Date.now() });
|
||||||
|
} else {
|
||||||
|
initSelfPresence();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
initSelfPresence();
|
initSelfPresence();
|
||||||
|
|
||||||
function parsePresence(text: string): SystemPresence {
|
function parsePresence(text: string): SystemPresence {
|
||||||
|
|
@ -96,5 +107,6 @@ export function updateSystemPresence(text: string) {
|
||||||
|
|
||||||
export function listSystemPresence(): SystemPresence[] {
|
export function listSystemPresence(): SystemPresence[] {
|
||||||
ensureSelfPresence();
|
ensureSelfPresence();
|
||||||
|
touchSelfPresence();
|
||||||
return [...entries.values()].sort((a, b) => b.ts - a.ts);
|
return [...entries.values()].sort((a, b) => b.ts - a.ts);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue