refactor(macos): centralize gateway endpoint resolution
parent
6354dddff2
commit
14e3b34a8e
|
|
@ -34,7 +34,7 @@ final class ConnectionModeCoordinator {
|
||||||
WebChatManager.shared.resetTunnels()
|
WebChatManager.shared.resetTunnels()
|
||||||
|
|
||||||
do {
|
do {
|
||||||
_ = try await RemoteTunnelManager.shared.ensureControlTunnel()
|
_ = try await GatewayEndpointStore.shared.ensureRemoteControlTunnel()
|
||||||
let settings = CommandResolver.connectionSettings()
|
let settings = CommandResolver.connectionSettings()
|
||||||
try await ControlChannel.shared.configure(mode: .remote(
|
try await ControlChannel.shared.configure(mode: .remote(
|
||||||
target: settings.target,
|
target: settings.target,
|
||||||
|
|
|
||||||
|
|
@ -79,10 +79,9 @@ final class ControlChannel: ObservableObject {
|
||||||
case .local:
|
case .local:
|
||||||
await self.configure()
|
await self.configure()
|
||||||
case let .remote(target, identity):
|
case let .remote(target, identity):
|
||||||
// Create/ensure SSH tunnel, then talk to the forwarded local port.
|
|
||||||
_ = (target, identity)
|
|
||||||
do {
|
do {
|
||||||
_ = try await RemoteTunnelManager.shared.ensureControlTunnel()
|
_ = (target, identity)
|
||||||
|
_ = try await GatewayEndpointStore.shared.ensureRemoteControlTunnel()
|
||||||
await self.configure()
|
await self.configure()
|
||||||
} catch {
|
} catch {
|
||||||
self.state = .degraded(error.localizedDescription)
|
self.state = .degraded(error.localizedDescription)
|
||||||
|
|
@ -215,8 +214,7 @@ final class ControlChannel: ObservableObject {
|
||||||
switch push {
|
switch push {
|
||||||
case let .event(evt) where evt.event == "agent":
|
case let .event(evt) where evt.event == "agent":
|
||||||
if let payload = evt.payload,
|
if let payload = evt.payload,
|
||||||
let payloadData = try? JSONEncoder().encode(payload),
|
let agent = try? GatewayPayloadDecoding.decode(payload, as: ControlAgentEvent.self)
|
||||||
let agent = try? JSONDecoder().decode(ControlAgentEvent.self, from: payloadData)
|
|
||||||
{
|
{
|
||||||
AgentEventStore.shared.append(agent)
|
AgentEventStore.shared.append(agent)
|
||||||
self.routeWorkActivity(from: agent)
|
self.routeWorkActivity(from: agent)
|
||||||
|
|
|
||||||
|
|
@ -109,20 +109,6 @@ actor GatewayConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static func defaultConfigProvider() async throws -> Config {
|
private static func defaultConfigProvider() async throws -> Config {
|
||||||
let mode = await MainActor.run { AppStateStore.shared.connectionMode }
|
try await GatewayEndpointStore.shared.requireConfig()
|
||||||
let token = ProcessInfo.processInfo.environment["CLAWDIS_GATEWAY_TOKEN"]
|
|
||||||
switch mode {
|
|
||||||
case .local:
|
|
||||||
let port = GatewayEnvironment.gatewayPort()
|
|
||||||
return (URL(string: "ws://127.0.0.1:\(port)")!, token)
|
|
||||||
case .remote:
|
|
||||||
if let forwarded = await RemoteTunnelManager.shared.controlTunnelPortIfRunning() {
|
|
||||||
return (URL(string: "ws://127.0.0.1:\(Int(forwarded))")!, token)
|
|
||||||
}
|
|
||||||
throw NSError(
|
|
||||||
domain: "RemoteTunnel",
|
|
||||||
code: 2,
|
|
||||||
userInfo: [NSLocalizedDescriptionKey: "Remote mode is enabled, but the control tunnel is not active"])
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,120 @@
|
||||||
|
import Foundation
|
||||||
|
import OSLog
|
||||||
|
|
||||||
|
enum GatewayEndpointState: Sendable, Equatable {
|
||||||
|
case ready(mode: AppState.ConnectionMode, url: URL, token: String?)
|
||||||
|
case unavailable(mode: AppState.ConnectionMode, reason: String)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Single place to resolve (and publish) the effective gateway control endpoint.
|
||||||
|
///
|
||||||
|
/// This is intentionally separate from `GatewayConnection`:
|
||||||
|
/// - `GatewayConnection` consumes the resolved endpoint (no tunnel side-effects).
|
||||||
|
/// - The endpoint store owns observation + explicit "ensure tunnel" actions.
|
||||||
|
actor GatewayEndpointStore {
|
||||||
|
static let shared = GatewayEndpointStore()
|
||||||
|
|
||||||
|
struct Deps: Sendable {
|
||||||
|
let mode: @Sendable () async -> AppState.ConnectionMode
|
||||||
|
let token: @Sendable () -> String?
|
||||||
|
let localPort: @Sendable () -> Int
|
||||||
|
let remotePortIfRunning: @Sendable () async -> UInt16?
|
||||||
|
let ensureRemoteTunnel: @Sendable () async throws -> UInt16
|
||||||
|
|
||||||
|
static let live = Deps(
|
||||||
|
mode: { await MainActor.run { AppStateStore.shared.connectionMode } },
|
||||||
|
token: { ProcessInfo.processInfo.environment["CLAWDIS_GATEWAY_TOKEN"] },
|
||||||
|
localPort: { GatewayEnvironment.gatewayPort() },
|
||||||
|
remotePortIfRunning: { await RemoteTunnelManager.shared.controlTunnelPortIfRunning() },
|
||||||
|
ensureRemoteTunnel: { try await RemoteTunnelManager.shared.ensureControlTunnel() })
|
||||||
|
}
|
||||||
|
|
||||||
|
private let deps: Deps
|
||||||
|
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "gateway-endpoint")
|
||||||
|
|
||||||
|
private var state: GatewayEndpointState
|
||||||
|
private var subscribers: [UUID: AsyncStream<GatewayEndpointState>.Continuation] = [:]
|
||||||
|
|
||||||
|
init(deps: Deps = .live) {
|
||||||
|
self.deps = deps
|
||||||
|
let port = deps.localPort()
|
||||||
|
self.state = .ready(mode: .local, url: URL(string: "ws://127.0.0.1:\(port)")!, token: deps.token())
|
||||||
|
}
|
||||||
|
|
||||||
|
func subscribe(bufferingNewest: Int = 1) -> AsyncStream<GatewayEndpointState> {
|
||||||
|
let id = UUID()
|
||||||
|
let initial = self.state
|
||||||
|
let store = self
|
||||||
|
return AsyncStream(bufferingPolicy: .bufferingNewest(bufferingNewest)) { continuation in
|
||||||
|
continuation.yield(initial)
|
||||||
|
self.subscribers[id] = continuation
|
||||||
|
continuation.onTermination = { @Sendable _ in
|
||||||
|
Task { await store.removeSubscriber(id) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func refresh() async {
|
||||||
|
let mode = await self.deps.mode()
|
||||||
|
await self.setMode(mode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func setMode(_ mode: AppState.ConnectionMode) async {
|
||||||
|
let token = self.deps.token()
|
||||||
|
switch mode {
|
||||||
|
case .local:
|
||||||
|
let port = self.deps.localPort()
|
||||||
|
self.setState(.ready(mode: .local, url: URL(string: "ws://127.0.0.1:\(port)")!, token: token))
|
||||||
|
case .remote:
|
||||||
|
let port = await self.deps.remotePortIfRunning()
|
||||||
|
guard let port else {
|
||||||
|
self.setState(.unavailable(mode: .remote, reason: "Remote mode enabled but no active control tunnel"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
self.setState(.ready(mode: .remote, url: URL(string: "ws://127.0.0.1:\(Int(port))")!, token: token))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Explicit action: ensure the remote control tunnel is established and publish the resolved endpoint.
|
||||||
|
func ensureRemoteControlTunnel() async throws -> UInt16 {
|
||||||
|
let mode = await self.deps.mode()
|
||||||
|
guard mode == .remote else {
|
||||||
|
throw NSError(
|
||||||
|
domain: "RemoteTunnel",
|
||||||
|
code: 1,
|
||||||
|
userInfo: [NSLocalizedDescriptionKey: "Remote mode is not enabled"])
|
||||||
|
}
|
||||||
|
let port = try await self.deps.ensureRemoteTunnel()
|
||||||
|
await self.setMode(.remote)
|
||||||
|
return port
|
||||||
|
}
|
||||||
|
|
||||||
|
func requireConfig() async throws -> GatewayConnection.Config {
|
||||||
|
await self.refresh()
|
||||||
|
switch self.state {
|
||||||
|
case let .ready(_, url, token):
|
||||||
|
return (url, token)
|
||||||
|
case let .unavailable(_, reason):
|
||||||
|
throw NSError(domain: "GatewayEndpoint", code: 1, userInfo: [NSLocalizedDescriptionKey: reason])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func removeSubscriber(_ id: UUID) {
|
||||||
|
self.subscribers[id] = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
private func setState(_ next: GatewayEndpointState) {
|
||||||
|
guard next != self.state else { return }
|
||||||
|
self.state = next
|
||||||
|
for (_, continuation) in self.subscribers {
|
||||||
|
continuation.yield(next)
|
||||||
|
}
|
||||||
|
switch next {
|
||||||
|
case let .ready(mode, url, _):
|
||||||
|
self.logger.debug("resolved endpoint mode=\(String(describing: mode), privacy: .public) url=\(url.absoluteString, privacy: .public)")
|
||||||
|
case let .unavailable(mode, reason):
|
||||||
|
self.logger.debug("endpoint unavailable mode=\(String(describing: mode), privacy: .public) reason=\(reason, privacy: .public)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -220,7 +220,7 @@ final class WebChatWindowController: NSWindowController, WKNavigationDelegate, N
|
||||||
|
|
||||||
private func prepareGatewayPort() async throws -> Int {
|
private func prepareGatewayPort() async throws -> Int {
|
||||||
if CommandResolver.connectionModeIsRemote() {
|
if CommandResolver.connectionModeIsRemote() {
|
||||||
let forwarded = try await RemoteTunnelManager.shared.ensureControlTunnel()
|
let forwarded = try await GatewayEndpointStore.shared.ensureRemoteControlTunnel()
|
||||||
return Int(forwarded)
|
return Int(forwarded)
|
||||||
}
|
}
|
||||||
return GatewayEnvironment.gatewayPort()
|
return GatewayEnvironment.gatewayPort()
|
||||||
|
|
@ -684,7 +684,7 @@ final class WebChatManager {
|
||||||
let gatewayPort: Int
|
let gatewayPort: Int
|
||||||
if CommandResolver.connectionModeIsRemote() {
|
if CommandResolver.connectionModeIsRemote() {
|
||||||
do {
|
do {
|
||||||
let forwarded = try await RemoteTunnelManager.shared.ensureControlTunnel()
|
let forwarded = try await GatewayEndpointStore.shared.ensureRemoteControlTunnel()
|
||||||
gatewayPort = Int(forwarded)
|
gatewayPort = Int(forwarded)
|
||||||
|
|
||||||
let root = try WebChatWindowController.webChatAssetsRootURL()
|
let root = try WebChatWindowController.webChatAssetsRootURL()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue