@@ -2,6 +2,7 @@ import Foundation
22import GRPC
33import NIO
44import os
5+ import Subprocess
56
67@MainActor
78public protocol FileSyncDaemon : ObservableObject {
@@ -20,8 +21,7 @@ public class MutagenDaemon: FileSyncDaemon {
2021 }
2122 }
2223
23- private var mutagenProcess : Process ?
24- private var mutagenPipe : Pipe ?
24+ private var mutagenProcess : Subprocess ?
2525 private let mutagenPath : URL !
2626 private let mutagenDataDirectory : URL
2727 private let mutagenDaemonSocket : URL
@@ -58,24 +58,42 @@ public class MutagenDaemon: FileSyncDaemon {
5858 try ? await connect ( )
5959 await stop ( )
6060
61- ( mutagenProcess, mutagenPipe) = createMutagenProcess ( )
61+ mutagenProcess = createMutagenProcess ( )
62+ // swiftlint:disable:next large_tuple
63+ let ( standardOutput, standardError, waitForExit) : ( Pipe . AsyncBytes , Pipe . AsyncBytes , @Sendable ( ) async -> Void )
6264 do {
63- try mutagenProcess? . run ( )
65+ ( standardOutput , standardError , waitForExit ) = try mutagenProcess! . run ( )
6466 } catch {
6567 state = . failed( DaemonError . daemonStartFailure ( error) )
68+ return
69+ }
70+
71+ Task {
72+ await streamHandler ( io: standardOutput)
73+ logger. info ( " standard output stream closed " )
74+ }
75+
76+ Task {
77+ await streamHandler ( io: standardError)
78+ logger. info ( " standard error stream closed " )
79+ }
80+
81+ Task {
82+ await terminationHandler ( waitForExit: waitForExit)
6683 }
6784
6885 do {
6986 try await connect ( )
7087 } catch {
7188 state = . failed( DaemonError . daemonStartFailure ( error) )
89+ return
7290 }
7391
7492 state = . running
7593 logger. info (
7694 """
7795 mutagen daemon started, pid:
78- \( self . mutagenProcess? . processIdentifier . description ?? " unknown " , privacy: . public)
96+ \( self . mutagenProcess? . pid . description ?? " unknown " , privacy: . public)
7997 """
8098 )
8199 }
@@ -129,46 +147,39 @@ public class MutagenDaemon: FileSyncDaemon {
129147
130148 try ? await cleanupGRPC ( )
131149
132- mutagenProcess? . terminate ( )
150+ mutagenProcess? . kill ( )
151+ mutagenProcess = nil
133152 logger. info ( " Daemon stopped and gRPC connection closed " )
134153 }
135154
136- private func createMutagenProcess( ) -> ( Process , Pipe ) {
137- let outputPipe = Pipe ( )
138- outputPipe. fileHandleForReading. readabilityHandler = logOutput
139- let process = Process ( )
140- process. executableURL = mutagenPath
141- process. arguments = [ " daemon " , " run " ]
142- logger. info ( " setting mutagen data directory: \( self . mutagenDataDirectory. path, privacy: . public) " )
155+ private func createMutagenProcess( ) -> Subprocess {
156+ let process = Subprocess ( [ mutagenPath. path, " daemon " , " run " ] )
143157 process. environment = [
144158 " MUTAGEN_DATA_DIRECTORY " : mutagenDataDirectory. path,
145159 ]
146- process. standardOutput = outputPipe
147- process. standardError = outputPipe
148- process. terminationHandler = terminationHandler
149- return ( process, outputPipe)
160+ logger. info ( " setting mutagen data directory: \( self . mutagenDataDirectory. path, privacy: . public) " )
161+ return process
150162 }
151163
152- private nonisolated func terminationHandler( process _: Process ) {
153- Task { @MainActor in
154- self . mutagenPipe? . fileHandleForReading. readabilityHandler = nil
155- mutagenProcess = nil
164+ private func terminationHandler( waitForExit: @Sendable ( ) async -> Void ) async {
165+ await waitForExit ( )
156166
157- try ? await cleanupGRPC ( )
158-
159- switch self . state {
160- case . stopped:
161- logger. info ( " mutagen daemon stopped " )
162- return
163- default :
164- logger. error ( " mutagen daemon exited unexpectedly " )
165- self . state = . failed( . terminatedUnexpectedly)
166- }
167+ switch state {
168+ case . stopped:
169+ logger. info ( " mutagen daemon stopped " )
170+ default :
171+ logger. error (
172+ """
173+ mutagen daemon exited unexpectedly with code:
174+ \( self . mutagenProcess? . exitCode. description ?? " unknown " )
175+ """
176+ )
177+ state = . failed( . terminatedUnexpectedly)
167178 }
168179 }
169180
170- private nonisolated func logOutput ( pipe : FileHandle ) {
171- if let line = String ( data : pipe . availableData , encoding : . utf8 ) , line != " " {
181+ private func streamHandler ( io : Pipe . AsyncBytes ) async {
182+ for await line in io . lines {
172183 logger. info ( " \( line, privacy: . public) " )
173184 }
174185 }
0 commit comments