@@ -60,29 +60,6 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
6060 lapply(channels , FUN = file.remove , showWarnings = FALSE )
6161 }, add = TRUE )
6262
63- rx_worker <- function (channel = channels [[" rx" ]], clear = TRUE ) {
64- if (file.size(channel ) == 0L ) return (character (0L ))
65- if (debug ) {
66- mdebugf_push(" rx_worker(clear = %s) ..." , clear )
67- mdebug_pop()
68- }
69- # # Read everything available
70- bfr <- readLines(channel , n = 1e6 , warn = FALSE )
71- if (debug ) {
72- mdebugf(" messages from worker process: [n=%d] %s" , length(bfr ), commaq(bfr ))
73- }
74-
75- # # Consume communication channel 'rx'?
76- if (clear ) file.create(channel )
77-
78- # # Was the worker interrupted?
79- if (" interrupted" %in% bfr ) {
80- signalCondition(worker_interrupt())
81- }
82-
83- bfr
84- } # # rx_worker()
85-
8663 tx_worker <- function (msg , channel = channels [[" tx" ]]) {
8764 writeLines(msg , con = channel )
8865 } # # tx_worker()
@@ -138,28 +115,8 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
138115 break
139116 }
140117
141- # # Any updates from worker, e.g. output to be relayed?
142- res <- poll(list (rx ), ms = 100 )[[1 ]]
143-
144- worker_status <- NULL
118+ worker_status <- process_worker_messages(rx , debug = debug )
145119
146- # # Relay stdout?
147- if (" ready" %in% res [[" output" ]]) {
148- out <- rx $ read_output_lines()
149- is_special <- grepl(" ^worker_status=" , out )
150- worker_status <- out [is_special ]
151- out <- out [! is_special ]
152- out <- sprintf(" %s" , out )
153- writeLines(out , con = stdout())
154- }
155-
156- # # Relay stderr?
157- if (" ready" %in% res [[" error" ]]) {
158- err <- rx $ read_error_lines()
159- err <- sprintf(" %s" , err )
160- writeLines(err , con = stderr())
161- }
162-
163120 if (state == " exit" ) {
164121 info(" Terminating worker" )
165122 break
@@ -188,16 +145,6 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
188145 break
189146 }
190147
191- # # Was worker process interrupted?
192- info <- rx_worker()
193- if (" interrupted" %in% info ) {
194- state <- " waiting"
195- future <- NULL
196- client <- NULL
197- # # FIXME: Update client via P2P message board
198- next
199- }
200-
201148 # # Any messages from the P2P message board?
202149# res <- poll(list(p), ms = 100)[[1]]
203150# if (!"ready" %in% res[["output"]]) next
@@ -253,16 +200,12 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
253200
254201 # # Wait for worker to *start* download future
255202 repeat {
256- info <- rx_worker()
257- if (length(info ) == 0 ) {
258- Sys.sleep(0.1 )
259- next
260- }
261-
262- if (" downloading" %in% info ) {
203+ worker_status <- process_worker_messages(rx , debug = debug )
204+ if (" downloading" %in% worker_status ) {
263205 # # FIXME: Acknowledge to work on future
264206 break
265207 }
208+ Sys.sleep(0.1 )
266209 }
267210 } else {
268211 info(" withdraw offer for future %s, because client %s accepted another worker's offer" , sQuote(future ), sQuote(client ))
@@ -281,14 +224,14 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
281224
282225 if (state == " working" ) {
283226 # # Check if worker is done
284- if (" resolved" %in% info ) {
227+ if (" resolved" %in% worker_status ) {
285228 state <- " resolved"
286229 info(" Future %s has been resolved and results will be sent to client %s" , sQuote(future ), sQuote(client ))
287230 # # FIXME: Inform client that future has been resolved
288231 }
289232 } else if (state == " resolved" ) {
290233 # # Check if future results have been transferred
291- if (" ready" %in% info ) {
234+ if (" ready" %in% worker_status ) {
292235 state <- " waiting"
293236 offer_expires <- Inf
294237 future <- NULL
@@ -481,3 +424,42 @@ worker_interrupt <- function(message = "worker process interrupted", call = NULL
481424 cond
482425}
483426
427+
428+ process_worker_messages <- function (rx , debug = FALSE ) {
429+ if (debug ) {
430+ mdebug_push(" process_worker_messages() ..." )
431+ on.exit({
432+ mdebugf(" worker_status: [n=%d] %s" , length(worker_status ), commaq((worker_status )))
433+ mdebug_pop()
434+ })
435+ }
436+
437+ # # Any updates from worker, e.g. output to be relayed?
438+ res <- poll(list (rx ), ms = 100 )[[1 ]]
439+
440+ worker_status <- NULL
441+
442+ # # Relay stdout?
443+ if (" ready" %in% res [[" output" ]]) {
444+ out <- rx $ read_output_lines()
445+
446+ # # Parse special messages
447+ pattern <- " ^worker_status="
448+ is_special <- grepl(pattern , out )
449+ worker_status <- sub(pattern , " " , out [is_special ])
450+ out <- out [! is_special ]
451+
452+ out <- sprintf(" %s" , out )
453+ writeLines(out , con = stdout())
454+ }
455+
456+ # # Relay stderr?
457+ if (" ready" %in% res [[" error" ]]) {
458+ err <- rx $ read_error_lines()
459+ err <- sprintf(" %s" , err )
460+ writeLines(err , con = stderr())
461+ }
462+
463+ # # Return new worker status, if received
464+ worker_status
465+ } # # process_worker_messages()
0 commit comments