@@ -48,8 +48,27 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
4848 lapply(channels , FUN = file.create , showWarnings = FALSE )
4949 on.exit({
5050 lapply(channels , FUN = file.remove , showWarnings = FALSE )
51- })
51+ }, add = TRUE )
5252
53+ rx_worker <- function (channel = channels [[" rx" ]], clear = TRUE ) {
54+ if (file.size(channel ) == 0L ) return (character (0L ))
55+ # # Read everything available
56+ bfr <- readLines(channel , n = 1e6 , warn = FALSE )
57+ # # Consume communication channel 'rx'?
58+ if (clear ) file.create(channel )
59+ info(" state = %s, rx_worker() = %s" , sQuote(state ), commaq(bfr ))
60+ bfr
61+ } # # rx_worker()
62+
63+ tx_worker <- function (msg , channel = channels [[" tx" ]]) {
64+ writeLines(msg , con = channel )
65+ } # # tx_worker()
66+
67+
68+ now <- pico_p2p_time()
69+
70+ expires <- pico_p2p_time(delta = duration )
71+
5372 args <- list (
5473 cluster = cluster ,
5574 worker_id = worker_id ,
@@ -67,63 +86,208 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
6786 topic <- sprintf(" %s/future.p2p" , cluster )
6887 }
6988 p <- pico_pipe(topic , user = worker_id , host = host , ssh_args = ssh_args )
89+ on.exit({
90+ # # FIXME: Update the P2P message board that we're disconnecting
91+ }, add = TRUE )
7092
7193 info(" launching background worker process" )
7294 rx <- r_bg(run_worker , args = args , supervise = TRUE , package = TRUE )
7395 attr(rx , " channels" ) <- args [[" channels" ]]
7496
75- future_uuid <- NULL
76-
77- # # Relay output from the worker process
78- while (rx $ is_alive()) {
79- bfr <- readLines(channels [[" rx" ]], n = 1e6 , warn = FALSE )
80- if (any(grepl(" ^future=" , bfr ))) {
81- future_uuid <- grep(" ^future=" , bfr , value = TRUE )
82- if (length(future_uuid ) > 0 ) {
83- stopifnot(length(future_uuid ) == 1L )
84- future_uuid <- sub(" ^future=" , " " , future_uuid )
85- info(sprintf(" Processing future %s" , future_uuid ))
86-
87- # # Consume communication channel 'rx'
88- file.create(channels [[" rx" ]])
89- }
90- }
97+ # # Wait for worker to launch
98+ info(" waiting for worker process to start" )
99+ while (" started" %in% rx_worker()) {
100+ Sys.sleep(0.1 )
101+ }
102+ info(" worker process started" )
103+
104+ # # Announce we're available
105+ info(" announcing to p2p message board we are joining as a worker" )
106+ m <- pico_p2p_hello(p , type = " worker" , expires = expires )
107+
108+ # # The ID of the future we're offering to work on or that is being processed
109+ future <- NULL
110+ client <- NULL
91111
92- m <- pico_p2p_next_message(p )
93- if (! is.null(m )) {
94- to <- m [[" to" ]]
95- if (identical(to , worker_id )) {
96- type <- m [[" type" ]]
97- future_id <- m [[" future" ]]
98- if (" withdraw" %in% type && future_id %in% future_uuid ) {
99- info(" interrupting worker process for future %s" , sQuote(future_id ))
100- rx $ interrupt()
101- future_uuid <- NULL
102- }
103- }
112+ # # Main loop monitoring the P2P message board and the background worker
113+ state <- " waiting"
114+ repeat tryCatch({
115+ # info("worker state=%s", sQuote(state))
116+
117+ # # Is the worker process still alive
118+ if (! rx $ is_alive()) {
119+ info(" terminated" )
120+ # # FIXME: Update the P2P message board
121+ break
104122 }
105-
123+
124+ # # Any updates from worker, e.g. output to be relayed?
106125 res <- poll(list (rx ), ms = 100 )[[1 ]]
107126
108127 # # Relay stdout?
109128 if (" ready" %in% res [[" output" ]]) {
110129 out <- rx $ read_output_lines()
130+ out <- sprintf(" [worker process] %s" , out )
111131 writeLines(out , con = stdout())
112132 }
113133
114134 # # Relay stderr?
115135 if (" ready" %in% res [[" error" ]]) {
116136 err <- rx $ read_error_lines()
137+ err <- sprintf(" [worker process] %s" , err )
117138 writeLines(err , con = stderr())
118139 }
119- } # # while (rx$is_alive())
120140
121- info(" wait for worker process to terminate" )
122- rx $ wait()
141+ if (state == " exit" ) {
142+ info(" Terminating worker" )
143+ break
144+ }
145+
146+ # # Expired?
147+ if (Sys.time() > expires ) {
148+ info(" time is out" )
149+ # # FIXME: Update the P2P message board
150+ rx $ interrupt()
151+ future <- NULL
152+ client <- NULL
153+ break
154+ }
155+
156+ # # Was worker process interrupted?
157+ info <- rx_worker()
158+ if (" interrupted" %in% info ) {
159+ state <- " waiting"
160+ future <- NULL
161+ client <- NULL
162+ # # FIXME: Update client via P2P message board
163+ next
164+ }
165+
166+ # # Any messages from the P2P message board?
167+ # res <- poll(list(p), ms = 100)[[1]]
168+ # if (!"ready" %in% res[["output"]]) next
169+
170+ # # Read next message?
171+ m <- pico_p2p_next_message(p )
172+
173+ # # Expired?
174+ if (Sys.time() > expires ) {
175+ info(" expired" )
176+ rx $ interrupt()
177+ future <- NULL
178+ client <- NULL
179+ # # FIXME: Update the P2P message board
180+ break
181+ }
123182
124- info(" get worker process result" )
125- results <- rx $ get_result()
183+ # # Process request?
184+ if (length(m ) > 0 ) {
185+ # # Are we read to offer to do work?
186+ if (state == " waiting" && m [[" type" ]] == " request" ) {
187+ stop_if_not(is.null(future ), is.null(client ))
188+ future <- m [[" future" ]]
189+ client <- m [[" from" ]]
190+ info(" offer to process future %s for client %s" , sQuote(future ), sQuote(client ))
191+ state <- " offer"
192+ pico_p2p_take_on_future(p , to = client , future = future )
193+ } else if (state == " offer" && future %in% m [[" future" ]]) {
194+ info(" waiting for acceptance of our work offer" )
195+ if (m [[" type" ]] == " accept" && m [[" to" ]] == worker_id ) {
196+ info(" client %s accepted our offer to process future %s" , sQuote(client ), sQuote(future ))
197+
198+ # # Do we support the file transfer protocol?
199+ via <- m [[" via" ]]
200+ uri <- parse_transfer_uri(via )
201+ if (! uri [[" protocol" ]] %in% supported_transfer_protocols()) {
202+ info(" non-supported protocol" )
203+ # # FIXME: Decline work offer (although we can just ignore it
204+ # # because the client did not respect what we support)
205+ state <- " waiting"
206+ future <- NULL
207+ client <- NULL
208+ next
209+ }
210+
211+ state <- " working"
212+
213+ # # Tell worker to receive future from client
214+ tx_worker(sprintf(" download=%s,via=%s" , future , via ))
215+
216+ # # Wait for worker to *start* download future
217+ repeat {
218+ info <- rx_worker()
219+ if (length(info ) == 0 ) {
220+ Sys.sleep(0.1 )
221+ next
222+ }
126223
224+ if (" interrupted" %in% info ) {
225+ state <- " waiting"
226+ future <- NULL
227+ client <- NULL
228+ # # FIXME: Update client via P2P message board
229+ break
230+ }
231+
232+ if (" downloading" %in% info ) {
233+ # # FIXME: Acknowledge to work on future
234+ break
235+ }
236+ }
237+ } else if (m [[" type" ]] == " withdraw" ) {
238+ info(" client %s withdrew future %s" , sQuote(client ), sQuote(future ))
239+ state <- " waiting"
240+ future <- NULL
241+ client <- NULL
242+ # # FIXME: Acknowledge withdrawal of future
243+ }
244+ } else if (state == " working" ) {
245+ # # Withdrawal of future?
246+ if (m [[" type" ]] == " withdraw" && future %in% m [[" future" ]]) {
247+ info(" Interrupting future %s, because client %s withdrew it" , sQuote(client ), sQuote(future ))
248+ state <- " interrupt"
249+ rx $ interrupt()
250+ future <- NULL
251+ client <- NULL
252+ # # FIXME: Acknowledge withdrawal of future
253+ next
254+ }
255+ }
256+ } # # if (length(m) > 0)
257+
258+ if (state == " working" ) {
259+ # # Check if worker is done
260+ if (" resolved" %in% info ) {
261+ state <- " resolved"
262+ info(" Future %s has been resolved and results will be sent to client %s" , sQuote(future ), sQuote(client ))
263+ # # FIXME: Inform client that future has been resolved
264+ }
265+ } else if (state == " resolved" ) {
266+ # # Check if future results have been transferred
267+ if (" uploaded" %in% info ) {
268+ state <- " waiting"
269+ future <- NULL
270+ client <- NULL
271+ info(" Future %s has been resolved and results have been sent to client %s" , sQuote(future ), sQuote(client ))
272+ }
273+ }
274+ }, interrupt = function (c ) {
275+ info(" interrupted" )
276+ # # Interrupt worker
277+ rx $ interrupt()
278+ future <- NULL
279+ client <- NULL
280+ # # FIXME: Update the P2P message board
281+ state <<- " exit"
282+ }) # # repeat tryCatch({ ... })
283+
284+ info(" Waiting 5 seconds before kill worker process and its children ..." )
285+ Sys.sleep(5.0 )
286+ rx $ kill_tree()
287+
288+ info(" wait for the worker process to terminate" )
289+ rx $ wait()
290+
127291 info(" finalize worker process" )
128292 rx $ finalize()
129293
@@ -161,69 +325,60 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
161325
162326 rx <- channels [[" rx" ]]
163327 tx <- channels [[" tx" ]]
164-
165- repeat tryCatch({
166- # # Erase communication channels
167- file.create(channels )
168-
169- if (Sys.time() > expires ) {
170- info(" time is out" )
171- break
172- }
173-
174- info(" hello" )
175- m <- pico_p2p_hello(p , type = " worker" , expires = expires )
176-
177- info(" wait for request" )
178- m <- pico_p2p_wait_for(p , type = " request" , expires = expires )
179- if (m [[" type" ]] == " expired" ) {
180- info(" time is out" )
181- break
182- }
183-
184- client <- m $ from
185328
186- info(" offer to work for %s" , sQuote(client ))
187- pico_p2p_take_on_future(p , to = client , future = m $ future )
329+ rx_parent <- function (channel = channels [[" tx" ]], clear = TRUE ) {
330+ # # Read everything available
331+ bfr <- readLines(channel , n = 1e6 , warn = FALSE )
332+ # # Consume communication channel 'rx'?
333+ if (clear ) file.create(channel )
334+ bfr
335+ } # # rx_parent()
188336
189- info(" wait for accept" )
190- m <- pico_p2p_wait_for(p , type = c(" accept" , " withdraw" ), futures = m $ future )
191- if (m [[" type" ]] == " expired" ) {
192- info(" future request expired" )
193- next
194- }
337+ tx_parent <- function (msg , channel = channels [[" rx" ]]) {
338+ writeLines(msg , con = channel )
339+ } # # tx_parent()
195340
196- if (m [[" type" ]] == " withdraw" ) {
197- info(" future request withdraws" )
198- next
199- }
341+ # # Tell parent we're alive
342+ tx_parent(" started" )
200343
201- uri <- parse_transfer_uri(m [[" via" ]])
202- if (! uri [[" protocol" ]] %in% supported_transfer_protocols()) {
203- info(" non-supported protocol" )
344+ repeat tryCatch({
345+ # # Wait for instructions from parent
346+ action <- rx_parent()
347+ if (length(action ) == 0 ) {
348+ Sys.sleep(0.1 )
204349 next
205350 }
206351
207- if (m [[" to" ]] == worker_id ) {
208- info(" receive future from %s" , sQuote(client ))
209- res <- pico_p2p_receive_future(p , via = m [[" via" ]])
352+ # # Download and process future?
353+ pattern <- " ^download=([^,]+),via=(.*)$"
354+ if (grepl(pattern , action )) {
355+ future <- sub(pattern , " \\ 1" , action )
356+ via <- sub(pattern , " \\ 2" , action )
357+ info(" download future %s via %s" , sQuote(future ), sQuote(via ))
358+ stop_if_not(
359+ nzchar(future ), ! grepl(" [,=]" , future ),
360+ nzchar(via ), ! grepl(" [,=]" , via )
361+ )
362+ tx_parent(" downloading" )
363+ res <- pico_p2p_receive_future(p , via = via )
210364 f <- res [[" future" ]]
211- cat(sprintf( " future=%s " , f [[" uuid" ]]), file = rx , append = FALSE )
365+ stop_if_not(paste( f [[" uuid" ]], collapse = " - " ) == future )
212366
213- info(" process future %s:%s " , sQuote( client ) , sQuoteLabel(f ))
367+ info(" process future %s" , sQuoteLabel(f ))
214368 dt <- system.time({
215369 r <- tryCatch(result(f ), error = identity )
216370 })
217371 dt <- difftime(dt [3 ], 0 )
372+ info(" Future %s resolved after %s" , sQuote(future ), format(dt ))
373+ tx_parent(" resolved" )
218374
219- # # Erase communication channels
220- file.create(channels )
221-
222- info(" send future result to %s after %s processing" , sQuote(client ), format(dt ))
223- res <- pico_p2p_send_result(p , future = f , via = m [[" via" ]])
375+ info(" sending future %s via %s" , sQuote(future ), sQuote(via ))
376+ res <- pico_p2p_send_result(p , future = f , via = via )
377+ tx_parent(" uploaded" )
224378 }
225379 }, interrupt = function (c ) {
226380 info(" interrupted" )
381+ tx_parent(" interrupted" )
227382 }) # # repeat tryCatch({ ... })
228383
229384 info(" bye" )
0 commit comments