Relay: enforce single instance lock
parent
3dff09424d
commit
2cd27d0d4a
|
|
@ -8,6 +8,7 @@ import { statusCommand } from "../commands/status.js";
|
||||||
import { loadConfig } from "../config/config.js";
|
import { loadConfig } from "../config/config.js";
|
||||||
import { danger, info, setVerbose } from "../globals.js";
|
import { danger, info, setVerbose } from "../globals.js";
|
||||||
import { startControlChannel } from "../infra/control-channel.js";
|
import { startControlChannel } from "../infra/control-channel.js";
|
||||||
|
import { acquireRelayLock, RelayLockError } from "../infra/relay-lock.js";
|
||||||
import { getResolvedLoggerSettings } from "../logging.js";
|
import { getResolvedLoggerSettings } from "../logging.js";
|
||||||
import {
|
import {
|
||||||
loginWeb,
|
loginWeb,
|
||||||
|
|
@ -383,6 +384,18 @@ Examples:
|
||||||
const { file: logFile, level: logLevel } = getResolvedLoggerSettings();
|
const { file: logFile, level: logLevel } = getResolvedLoggerSettings();
|
||||||
defaultRuntime.log(info(`logs: ${logFile} (level ${logLevel})`));
|
defaultRuntime.log(info(`logs: ${logFile} (level ${logLevel})`));
|
||||||
|
|
||||||
|
let releaseRelayLock: (() => Promise<void>) | null = null;
|
||||||
|
try {
|
||||||
|
releaseRelayLock = await acquireRelayLock();
|
||||||
|
} catch (err) {
|
||||||
|
if (err instanceof RelayLockError) {
|
||||||
|
defaultRuntime.error(danger(`Relay already running: ${err.message}`));
|
||||||
|
defaultRuntime.exit(1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
|
||||||
const providerOpt = (opts.provider ?? "auto").toLowerCase();
|
const providerOpt = (opts.provider ?? "auto").toLowerCase();
|
||||||
const cfg = loadConfig();
|
const cfg = loadConfig();
|
||||||
const telegramToken =
|
const telegramToken =
|
||||||
|
|
@ -589,6 +602,7 @@ Examples:
|
||||||
defaultRuntime.error(danger(`Relay failed: ${String(err)}`));
|
defaultRuntime.error(danger(`Relay failed: ${String(err)}`));
|
||||||
defaultRuntime.exit(1);
|
defaultRuntime.exit(1);
|
||||||
} finally {
|
} finally {
|
||||||
|
if (releaseRelayLock) await releaseRelayLock();
|
||||||
if (control) await control.close();
|
if (control) await control.close();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,29 @@
|
||||||
|
import fs from "node:fs";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
|
||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
|
||||||
|
import { acquireRelayLock, RelayLockError } from "./relay-lock.js";
|
||||||
|
|
||||||
|
const newLockPath = () =>
|
||||||
|
path.join(os.tmpdir(), `clawdis-relay-lock-test-${process.pid}-${Math.random().toString(16).slice(2)}.sock`);
|
||||||
|
|
||||||
|
describe("relay-lock", () => {
|
||||||
|
it("prevents concurrent relay instances and releases cleanly", async () => {
|
||||||
|
const lockPath = newLockPath();
|
||||||
|
|
||||||
|
const release1 = await acquireRelayLock(lockPath);
|
||||||
|
expect(fs.existsSync(lockPath)).toBe(true);
|
||||||
|
|
||||||
|
await expect(acquireRelayLock(lockPath)).rejects.toBeInstanceOf(RelayLockError);
|
||||||
|
|
||||||
|
await release1();
|
||||||
|
expect(fs.existsSync(lockPath)).toBe(false);
|
||||||
|
|
||||||
|
// After release, lock can be reacquired.
|
||||||
|
const release2 = await acquireRelayLock(lockPath);
|
||||||
|
await release2();
|
||||||
|
expect(fs.existsSync(lockPath)).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
@ -0,0 +1,94 @@
|
||||||
|
import fs from "node:fs";
|
||||||
|
import net from "node:net";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
|
||||||
|
const DEFAULT_LOCK_PATH = path.join(os.tmpdir(), "clawdis-relay.lock");
|
||||||
|
|
||||||
|
export class RelayLockError extends Error {}
|
||||||
|
|
||||||
|
type ReleaseFn = () => Promise<void>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquire an exclusive single-instance lock for the relay using a Unix domain socket.
|
||||||
|
*
|
||||||
|
* Why a socket? If the process crashes or is SIGKILLed, the socket file remains but
|
||||||
|
* the next start will detect ECONNREFUSED when connecting and clean the stale path
|
||||||
|
* before retrying. This keeps the lock self-healing without manual pidfile cleanup.
|
||||||
|
*/
|
||||||
|
export async function acquireRelayLock(lockPath = DEFAULT_LOCK_PATH): Promise<ReleaseFn> {
|
||||||
|
// Fast path: try to listen on the lock path.
|
||||||
|
const attemptListen = (): Promise<net.Server> =>
|
||||||
|
new Promise((resolve, reject) => {
|
||||||
|
const server = net.createServer();
|
||||||
|
|
||||||
|
server.once("error", async (err: NodeJS.ErrnoException) => {
|
||||||
|
if (err.code !== "EADDRINUSE") {
|
||||||
|
reject(new RelayLockError(`lock listen failed: ${err.message}`));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Something is already bound. Try to connect to see if it is alive.
|
||||||
|
const client = net.connect({ path: lockPath });
|
||||||
|
|
||||||
|
client.once("connect", () => {
|
||||||
|
client.destroy();
|
||||||
|
reject(new RelayLockError("another relay instance is already running"));
|
||||||
|
});
|
||||||
|
|
||||||
|
client.once("error", (connErr: NodeJS.ErrnoException) => {
|
||||||
|
// Nothing is listening -> stale socket file. Remove and retry once.
|
||||||
|
if (connErr.code === "ECONNREFUSED" || connErr.code === "ENOENT") {
|
||||||
|
try {
|
||||||
|
fs.rmSync(lockPath, { force: true });
|
||||||
|
} catch (rmErr) {
|
||||||
|
reject(
|
||||||
|
new RelayLockError(
|
||||||
|
`failed to clean stale lock at ${lockPath}: ${String(rmErr)}`,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
attemptListen().then(resolve, reject);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
reject(
|
||||||
|
new RelayLockError(
|
||||||
|
`failed to connect to existing lock (${lockPath}): ${connErr.message}`,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(lockPath, () => resolve(server));
|
||||||
|
});
|
||||||
|
|
||||||
|
const server = await attemptListen();
|
||||||
|
|
||||||
|
let released = false;
|
||||||
|
const release = async (): Promise<void> => {
|
||||||
|
if (released) return;
|
||||||
|
released = true;
|
||||||
|
await new Promise<void>((resolve) => server.close(() => resolve()));
|
||||||
|
try {
|
||||||
|
fs.rmSync(lockPath, { force: true });
|
||||||
|
} catch {
|
||||||
|
/* ignore */
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const cleanupSignals: NodeJS.Signals[] = ["SIGINT", "SIGTERM", "SIGHUP"];
|
||||||
|
const handleSignal = async () => {
|
||||||
|
await release();
|
||||||
|
process.exit(0);
|
||||||
|
};
|
||||||
|
|
||||||
|
cleanupSignals.forEach((sig) => process.once(sig, handleSignal));
|
||||||
|
process.once("exit", () => {
|
||||||
|
// Exit handler must be sync-safe; release is async but close+rm are fast.
|
||||||
|
void release();
|
||||||
|
});
|
||||||
|
|
||||||
|
return release;
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue