@@ -2,13 +2,26 @@ import Foundation
22import GRPC
33import NIO
44import os
5+ import Semaphore
56import Subprocess
7+ import SwiftUI
68
79@MainActor
810public protocol FileSyncDaemon : ObservableObject {
911 var state : DaemonState { get }
10- func start( ) async
12+ func start( ) async throws ( DaemonError )
1113 func stop( ) async
14+ func listSessions( ) async throws -> [ FileSyncSession ]
15+ func createSession( with: FileSyncSession ) async throws
16+ }
17+
18+ public struct FileSyncSession {
19+ public let id : String
20+ public let name : String
21+ public let localPath : URL
22+ public let workspace : String
23+ public let agent : String
24+ public let remotePath : URL
1225}
1326
1427@MainActor
@@ -17,7 +30,14 @@ public class MutagenDaemon: FileSyncDaemon {
1730
1831 @Published public var state : DaemonState = . stopped {
1932 didSet {
20- logger. info ( " daemon state changed: \( self . state. description, privacy: . public) " )
33+ logger. info ( " daemon state set: \( self . state. description, privacy: . public) " )
34+ if case . failed = state {
35+ Task {
36+ try ? await cleanupGRPC ( )
37+ }
38+ mutagenProcess? . kill ( )
39+ mutagenProcess = nil
40+ }
2141 }
2242 }
2343
@@ -26,46 +46,61 @@ public class MutagenDaemon: FileSyncDaemon {
2646 private let mutagenDataDirectory : URL
2747 private let mutagenDaemonSocket : URL
2848
49+ // Non-nil when the daemon is running
2950 private var group : MultiThreadedEventLoopGroup ?
3051 private var channel : GRPCChannel ?
31- private var client : Daemon_DaemonAsyncClient ?
32-
33- public init ( ) {
34- #if arch(arm64)
35- mutagenPath = Bundle . main. url ( forResource: " mutagen-darwin-arm64 " , withExtension: nil )
36- #elseif arch(x86_64)
37- mutagenPath = Bundle . main. url ( forResource: " mutagen-darwin-amd64 " , withExtension: nil )
38- #else
39- fatalError ( " unknown architecture " )
40- #endif
41- mutagenDataDirectory = FileManager . default. urls (
42- for: . applicationSupportDirectory,
43- in: . userDomainMask
44- ) . first!. appending ( path: " Coder Desktop " ) . appending ( path: " Mutagen " )
52+ private var client : DaemonClient ?
53+
54+ // Protect start & stop transitions against re-entrancy
55+ private let transition = AsyncSemaphore ( value: 1 )
56+
57+ public init ( mutagenPath: URL ? = nil ,
58+ mutagenDataDirectory: URL = FileManager . default. urls (
59+ for: . applicationSupportDirectory,
60+ in: . userDomainMask
61+ ) . first!. appending ( path: " Coder Desktop " ) . appending ( path: " Mutagen " ) )
62+ {
63+ self . mutagenPath = mutagenPath
64+ self . mutagenDataDirectory = mutagenDataDirectory
4565 mutagenDaemonSocket = mutagenDataDirectory. appending ( path: " daemon " ) . appending ( path: " daemon.sock " )
4666 // It shouldn't be fatal if the app was built without Mutagen embedded,
4767 // but file sync will be unavailable.
4868 if mutagenPath == nil {
4969 logger. warning ( " Mutagen not embedded in app, file sync will be unavailable " )
5070 state = . unavailable
71+ return
72+ }
73+
74+ // If there are sync sessions, the daemon should be running
75+ Task {
76+ do throws ( DaemonError) {
77+ try await start ( )
78+ } catch {
79+ state = . failed( error)
80+ return
81+ }
82+ await stopIfNoSessions ( )
5183 }
5284 }
5385
54- public func start( ) async {
86+ public func start( ) async throws ( DaemonError ) {
5587 if case . unavailable = state { return }
5688
5789 // Stop an orphaned daemon, if there is one
5890 try ? await connect ( )
5991 await stop ( )
6092
93+ await transition. wait ( )
94+ defer { transition. signal ( ) }
95+ logger. info ( " starting mutagen daemon " )
96+
6197 mutagenProcess = createMutagenProcess ( )
6298 // swiftlint:disable:next large_tuple
6399 let ( standardOutput, standardError, waitForExit) : ( Pipe . AsyncBytes , Pipe . AsyncBytes , @Sendable ( ) async -> Void )
64100 do {
65101 ( standardOutput, standardError, waitForExit) = try mutagenProcess!. run ( )
66102 } catch {
67- state = . failed( DaemonError . daemonStartFailure ( error) )
68- return
103+ throw . daemonStartFailure( error)
69104 }
70105
71106 Task {
@@ -85,10 +120,11 @@ public class MutagenDaemon: FileSyncDaemon {
85120 do {
86121 try await connect ( )
87122 } catch {
88- state = . failed( DaemonError . daemonStartFailure ( error) )
89- return
123+ throw . daemonStartFailure( error)
90124 }
91125
126+ try await waitForDaemonStart ( )
127+
92128 state = . running
93129 logger. info (
94130 """
@@ -98,6 +134,34 @@ public class MutagenDaemon: FileSyncDaemon {
98134 )
99135 }
100136
137+ // The daemon takes a moment to open the socket, and we don't want to hog the main actor
138+ // so poll for it on a background thread
139+ private func waitForDaemonStart(
140+ maxAttempts: Int = 5 ,
141+ attemptInterval: Duration = . milliseconds( 100 )
142+ ) async throws ( DaemonError) {
143+ do {
144+ try await Task . detached ( priority: . background) {
145+ for attempt in 0 ... maxAttempts {
146+ do {
147+ _ = try await self . client!. mgmt. version (
148+ Daemon_VersionRequest ( ) ,
149+ callOptions: . init( timeLimit: . timeout( . milliseconds( 500 ) ) )
150+ )
151+ return
152+ } catch {
153+ if attempt == maxAttempts {
154+ throw error
155+ }
156+ try ? await Task . sleep ( for: attemptInterval)
157+ }
158+ }
159+ } . value
160+ } catch {
161+ throw . daemonStartFailure( error)
162+ }
163+ }
164+
101165 private func connect( ) async throws ( DaemonError) {
102166 guard client == nil else {
103167 // Already connected
@@ -110,14 +174,17 @@ public class MutagenDaemon: FileSyncDaemon {
110174 transportSecurity: . plaintext,
111175 eventLoopGroup: group!
112176 )
113- client = Daemon_DaemonAsyncClient ( channel: channel!)
177+ client = DaemonClient (
178+ mgmt: Daemon_DaemonAsyncClient ( channel: channel!) ,
179+ sync: Synchronization_SynchronizationAsyncClient ( channel: channel!)
180+ )
114181 logger. info (
115182 " Successfully connected to mutagen daemon, socket: \( self . mutagenDaemonSocket. path, privacy: . public) "
116183 )
117184 } catch {
118185 logger. error ( " Failed to connect to gRPC: \( error) " )
119186 try ? await cleanupGRPC ( )
120- throw DaemonError . connectionFailure ( error)
187+ throw . connectionFailure( error)
121188 }
122189 }
123190
@@ -132,6 +199,10 @@ public class MutagenDaemon: FileSyncDaemon {
132199
133200 public func stop( ) async {
134201 if case . unavailable = state { return }
202+ await transition. wait ( )
203+ defer { transition. signal ( ) }
204+ logger. info ( " stopping mutagen daemon " )
205+
135206 state = . stopped
136207 guard FileManager . default. fileExists ( atPath: mutagenDaemonSocket. path) else {
137208 // Already stopped
@@ -140,7 +211,7 @@ public class MutagenDaemon: FileSyncDaemon {
140211
141212 // "We don't check the response or error, because the daemon
142213 // may terminate before it has a chance to send the response."
143- _ = try ? await client? . terminate (
214+ _ = try ? await client? . mgmt . terminate (
144215 Daemon_TerminateRequest ( ) ,
145216 callOptions: . init( timeLimit: . timeout( . milliseconds( 500 ) ) )
146217 )
@@ -175,6 +246,7 @@ public class MutagenDaemon: FileSyncDaemon {
175246 """
176247 )
177248 state = . failed( . terminatedUnexpectedly)
249+ return
178250 }
179251 }
180252
@@ -183,6 +255,55 @@ public class MutagenDaemon: FileSyncDaemon {
183255 logger. info ( " \( line, privacy: . public) " )
184256 }
185257 }
258+
259+ public func listSessions( ) async throws -> [ FileSyncSession ] {
260+ guard case . running = state else {
261+ return [ ]
262+ }
263+ // TODO: Implement
264+ return [ ]
265+ }
266+
267+ public func createSession( with _: FileSyncSession ) async throws {
268+ if case . stopped = state {
269+ do throws ( DaemonError) {
270+ try await start ( )
271+ } catch {
272+ state = . failed( error)
273+ return
274+ }
275+ }
276+ // TODO: Add Session
277+ }
278+
279+ public func deleteSession( ) async throws {
280+ // TODO: Delete session
281+ await stopIfNoSessions ( )
282+ }
283+
284+ private func stopIfNoSessions( ) async {
285+ let sessions : Synchronization_ListResponse
286+ do {
287+ sessions = try await client!. sync. list ( Synchronization_ListRequest . with { req in
288+ req. selection = . with { selection in
289+ selection. all = true
290+ }
291+ } )
292+ } catch {
293+ state = . failed( . daemonStartFailure( error) )
294+ return
295+ }
296+ // If there's no configured sessions, the daemon doesn't need to be running
297+ if sessions. sessionStates. isEmpty {
298+ logger. info ( " No sync sessions found " )
299+ await stop ( )
300+ }
301+ }
302+ }
303+
304+ struct DaemonClient {
305+ let mgmt : Daemon_DaemonAsyncClient
306+ let sync : Synchronization_SynchronizationAsyncClient
186307}
187308
188309public enum DaemonState {
@@ -191,7 +312,7 @@ public enum DaemonState {
191312 case failed( DaemonError )
192313 case unavailable
193314
194- var description : String {
315+ public var description : String {
195316 switch self {
196317 case . running:
197318 " Running "
@@ -203,12 +324,27 @@ public enum DaemonState {
203324 " Unavailable "
204325 }
205326 }
327+
328+ public var color : Color {
329+ switch self {
330+ case . running:
331+ . green
332+ case . stopped:
333+ . gray
334+ case . failed:
335+ . red
336+ case . unavailable:
337+ . gray
338+ }
339+ }
206340}
207341
208342public enum DaemonError : Error {
343+ case daemonNotRunning
209344 case daemonStartFailure( Error )
210345 case connectionFailure( Error )
211346 case terminatedUnexpectedly
347+ case grpcFailure( Error )
212348
213349 var description : String {
214350 switch self {
@@ -218,6 +354,10 @@ public enum DaemonError: Error {
218354 " Connection failure: \( error) "
219355 case . terminatedUnexpectedly:
220356 " Daemon terminated unexpectedly "
357+ case . daemonNotRunning:
358+ " The daemon must be started first "
359+ case let . grpcFailure( error) :
360+ " Failed to communicate with daemon: \( error) "
221361 }
222362 }
223363
0 commit comments