fix(mac): make Canvas file watcher reliable
parent
086f98471e
commit
03c84d0f11
|
|
@ -1,11 +1,10 @@
|
||||||
import Foundation
|
import Foundation
|
||||||
import Darwin
|
import CoreServices
|
||||||
|
|
||||||
final class CanvasFileWatcher: @unchecked Sendable {
|
final class CanvasFileWatcher: @unchecked Sendable {
|
||||||
private let url: URL
|
private let url: URL
|
||||||
private let queue: DispatchQueue
|
private let queue: DispatchQueue
|
||||||
private var source: DispatchSourceFileSystemObject?
|
private var stream: FSEventStreamRef?
|
||||||
private var fd: Int32 = -1
|
|
||||||
private var pending = false
|
private var pending = false
|
||||||
private let onChange: () -> Void
|
private let onChange: () -> Void
|
||||||
|
|
||||||
|
|
@ -20,42 +19,76 @@ final class CanvasFileWatcher: @unchecked Sendable {
|
||||||
}
|
}
|
||||||
|
|
||||||
func start() {
|
func start() {
|
||||||
guard self.source == nil else { return }
|
guard self.stream == nil else { return }
|
||||||
let path = (self.url as NSURL).fileSystemRepresentation
|
|
||||||
let fd = open(path, O_EVTONLY)
|
|
||||||
guard fd >= 0 else { return }
|
|
||||||
self.fd = fd
|
|
||||||
|
|
||||||
let source = DispatchSource.makeFileSystemObjectSource(
|
let retainedSelf = Unmanaged.passRetained(self)
|
||||||
fileDescriptor: fd,
|
var context = FSEventStreamContext(
|
||||||
eventMask: [.write, .delete, .rename, .attrib, .extend, .link, .revoke],
|
version: 0,
|
||||||
queue: self.queue)
|
info: retainedSelf.toOpaque(),
|
||||||
|
retain: nil,
|
||||||
|
release: { pointer in
|
||||||
|
guard let pointer else { return }
|
||||||
|
Unmanaged<CanvasFileWatcher>.fromOpaque(pointer).release()
|
||||||
|
},
|
||||||
|
copyDescription: nil)
|
||||||
|
|
||||||
source.setEventHandler { [weak self] in
|
let paths = [self.url.path] as CFArray
|
||||||
guard let self else { return }
|
let flags = FSEventStreamCreateFlags(
|
||||||
if self.pending { return }
|
kFSEventStreamCreateFlagFileEvents |
|
||||||
self.pending = true
|
kFSEventStreamCreateFlagUseCFTypes |
|
||||||
self.queue.asyncAfter(deadline: .now() + 0.12) { [weak self] in
|
kFSEventStreamCreateFlagNoDefer)
|
||||||
guard let self else { return }
|
|
||||||
self.pending = false
|
guard let stream = FSEventStreamCreate(
|
||||||
self.onChange()
|
kCFAllocatorDefault,
|
||||||
}
|
Self.callback,
|
||||||
|
&context,
|
||||||
|
paths,
|
||||||
|
FSEventStreamEventId(kFSEventStreamEventIdSinceNow),
|
||||||
|
0.05,
|
||||||
|
flags)
|
||||||
|
else {
|
||||||
|
retainedSelf.release()
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
source.setCancelHandler { [weak self] in
|
self.stream = stream
|
||||||
guard let self else { return }
|
FSEventStreamSetDispatchQueue(stream, self.queue)
|
||||||
if self.fd >= 0 {
|
if FSEventStreamStart(stream) == false {
|
||||||
close(self.fd)
|
self.stream = nil
|
||||||
self.fd = -1
|
FSEventStreamSetDispatchQueue(stream, nil)
|
||||||
}
|
FSEventStreamInvalidate(stream)
|
||||||
|
FSEventStreamRelease(stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
self.source = source
|
|
||||||
source.resume()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func stop() {
|
func stop() {
|
||||||
self.source?.cancel()
|
guard let stream = self.stream else { return }
|
||||||
self.source = nil
|
self.stream = nil
|
||||||
|
FSEventStreamStop(stream)
|
||||||
|
FSEventStreamSetDispatchQueue(stream, nil)
|
||||||
|
FSEventStreamInvalidate(stream)
|
||||||
|
FSEventStreamRelease(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension CanvasFileWatcher {
|
||||||
|
private static let callback: FSEventStreamCallback = { _, info, numEvents, _, eventFlags, _ in
|
||||||
|
guard let info else { return }
|
||||||
|
let watcher = Unmanaged<CanvasFileWatcher>.fromOpaque(info).takeUnretainedValue()
|
||||||
|
watcher.handleEvents(numEvents: numEvents, eventFlags: eventFlags)
|
||||||
|
}
|
||||||
|
|
||||||
|
private func handleEvents(numEvents: Int, eventFlags: UnsafePointer<FSEventStreamEventFlags>?) {
|
||||||
|
guard numEvents > 0 else { return }
|
||||||
|
guard eventFlags != nil else { return }
|
||||||
|
|
||||||
|
// Coalesce rapid changes (common during builds/atomic saves).
|
||||||
|
if self.pending { return }
|
||||||
|
self.pending = true
|
||||||
|
self.queue.asyncAfter(deadline: .now() + 0.12) { [weak self] in
|
||||||
|
guard let self else { return }
|
||||||
|
self.pending = false
|
||||||
|
self.onChange()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,78 @@
|
||||||
|
import Foundation
|
||||||
|
import os
|
||||||
|
import Testing
|
||||||
|
@testable import Clawdis
|
||||||
|
|
||||||
|
@Suite(.serialized) struct CanvasFileWatcherTests {
|
||||||
|
private func makeTempDir() throws -> URL {
|
||||||
|
let base = URL(fileURLWithPath: NSTemporaryDirectory(), isDirectory: true)
|
||||||
|
let dir = base.appendingPathComponent("clawdis-canvaswatch-\(UUID().uuidString)", isDirectory: true)
|
||||||
|
try FileManager.default.createDirectory(at: dir, withIntermediateDirectories: true)
|
||||||
|
return dir
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test func detectsInPlaceFileWrites() async throws {
|
||||||
|
let dir = try self.makeTempDir()
|
||||||
|
defer { try? FileManager.default.removeItem(at: dir) }
|
||||||
|
|
||||||
|
let file = dir.appendingPathComponent("index.html")
|
||||||
|
try "hello".write(to: file, atomically: false, encoding: .utf8)
|
||||||
|
|
||||||
|
let fired = OSAllocatedUnfairLock(initialState: false)
|
||||||
|
let waitState = OSAllocatedUnfairLock<(fired: Bool, cont: CheckedContinuation<Void, Never>?)>(
|
||||||
|
initialState: (false, nil))
|
||||||
|
|
||||||
|
func waitForFire(timeoutNs: UInt64) async -> Bool {
|
||||||
|
await withTaskGroup(of: Bool.self) { group in
|
||||||
|
group.addTask {
|
||||||
|
await withCheckedContinuation { cont in
|
||||||
|
let resumeImmediately = waitState.withLock { state in
|
||||||
|
if state.fired { return true }
|
||||||
|
state.cont = cont
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if resumeImmediately {
|
||||||
|
cont.resume()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
group.addTask {
|
||||||
|
try? await Task.sleep(nanoseconds: timeoutNs)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
let result = await group.next() ?? false
|
||||||
|
group.cancelAll()
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let watcher = CanvasFileWatcher(url: dir) {
|
||||||
|
fired.withLock { $0 = true }
|
||||||
|
let cont = waitState.withLock { state in
|
||||||
|
state.fired = true
|
||||||
|
let cont = state.cont
|
||||||
|
state.cont = nil
|
||||||
|
return cont
|
||||||
|
}
|
||||||
|
cont?.resume()
|
||||||
|
}
|
||||||
|
watcher.start()
|
||||||
|
defer { watcher.stop() }
|
||||||
|
|
||||||
|
// Give the stream a moment to start.
|
||||||
|
try await Task.sleep(nanoseconds: 150 * 1_000_000)
|
||||||
|
|
||||||
|
// Modify the file in-place (no rename). This used to be missed when only watching the directory vnode.
|
||||||
|
let handle = try FileHandle(forUpdating: file)
|
||||||
|
try handle.seekToEnd()
|
||||||
|
try handle.write(contentsOf: Data(" world".utf8))
|
||||||
|
try handle.close()
|
||||||
|
|
||||||
|
let ok = await waitForFire(timeoutNs: 2_000_000_000)
|
||||||
|
#expect(ok == true)
|
||||||
|
#expect(fired.withLock { $0 } == true)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue