@@ -29,6 +29,8 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
2929 mdebugf_pop()
3030 })
3131 }
32+
33+ options(future.p2p.info.from = " worker coordinator" )
3234
3335 if (inherits(duration , " ssh_args" )) {
3436 # # e.g. ssh_args = "-J dt1"
@@ -132,9 +134,9 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
132134
133135 # # Main loop monitoring the P2P message board and the background worker
134136 state <- " waiting"
137+ info(" waiting for request" )
138+
135139 repeat tryCatch({
136- # info("worker state=%s", sQuote(state))
137-
138140 # # Is the worker process still alive
139141 if (! rx $ is_alive()) {
140142 info(" terminated" )
@@ -148,14 +150,12 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
148150 # # Relay stdout?
149151 if (" ready" %in% res [[" output" ]]) {
150152 out <- rx $ read_output_lines()
151- out <- sprintf(" [worker process] %s" , out )
152153 writeLines(out , con = stdout())
153154 }
154155
155156 # # Relay stderr?
156157 if (" ready" %in% res [[" error" ]]) {
157158 err <- rx $ read_error_lines()
158- err <- sprintf(" [worker process] %s" , err )
159159 writeLines(err , con = stderr())
160160 }
161161
@@ -194,11 +194,7 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
194194 # # Expired?
195195 if (Sys.time() > expires ) {
196196 info(" expired" )
197- rx $ interrupt()
198- future <- NULL
199- client <- NULL
200- # # FIXME: Update the P2P message board
201- break
197+ signalCondition(future_withdraw(" worker expired; terminating" ))
202198 }
203199
204200 # # Process request?
@@ -267,30 +263,40 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
267263 future <- NULL
268264 client <- NULL
269265 info(" Future %s has been resolved and results have been sent to client %s" , sQuote(future ), sQuote(client ))
266+ info(" waiting for request" )
270267 }
271268 }
272269 }, future_withdraw = function (c ) {
270+ msg <- conditionMessage(c )
271+ info <- sprintf(" state %s" , sQuote(state ))
272+ if (! is.null(client )) info <- c(info , sprintf(" client %s" , sQuote(client )))
273+ if (! is.null(future )) info <- c(info , sprintf(" future %s" , sQuote(future )))
274+ info <- paste(info , collapse = " , " )
275+ msg <- sprintf(" %s [%s]" , msg , info )
276+ info(msg )
277+
273278 # # Client withdrew future
274279 if (state == " waiting" ) {
275- info(" client %s withdrew future %s" , sQuote(client ), sQuote(future ))
276- state <<- " waiting"
280+ state <<- " waiting"
277281 } else if (state == " offer" ) {
278- # # FIXME: Decline work offer (although we can just ignore it
279- # # because the client did not respect what we support)
280- state <<- " waiting"
282+ # # FIXME: Decline work offer (although we can just ignore it
283+ # # because the client did not respect what we support)
284+ state <<- " waiting"
281285 } else if (state == " working" ) {
282- info(" Interrupting future %s, because client %s withdrew it" , sQuote(client ), sQuote(future ))
283286 state <<- " interrupt"
284287 rx $ interrupt()
285288 }
289+
286290 future <<- NULL
287291 client <<- NULL
292+ info(" waiting for request" )
288293 # # FIXME: Acknowledge withdrawal of future
289294 }, worker_interrupt = function (c ) {
290295 info(" Worker process was interrupted" )
291296 state <- " waiting"
292297 future <- NULL
293298 client <- NULL
299+ info(" waiting for request" )
294300 # # FIXME: Acknowledge withdrawal of future
295301 }, interrupt = function (c ) {
296302 info(" interrupted" )
@@ -300,6 +306,7 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
300306 client <- NULL
301307 # # FIXME: Update the P2P message board
302308 state <<- " exit"
309+ info(" exiting" )
303310 }) # # repeat tryCatch({ ... })
304311
305312 info(" Waiting 5 seconds before kill worker process and its children ..." )
@@ -318,7 +325,10 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
318325
319326# ' @importFrom future resolve plan sequential
320327run_worker <- function (cluster , worker_id , host , ssh_args , duration , channels ) {
321- old_opts <- options(parallelly.availableCores.fallback = 1L )
328+ old_opts <- options(
329+ parallelly.availableCores.fallback = 1L ,
330+ future.p2p.info.from = " worker"
331+ )
322332 on.exit(options(old_opts ))
323333 with(plan(sequential ), local = TRUE )
324334
@@ -393,9 +403,10 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
393403 info(" Future %s resolved after %s" , sQuote(future ), format(dt ))
394404 tx_parent(" resolved" )
395405
396- info(" sending future %s via %s" , sQuote(future ), sQuote(via ))
406+ info(" sending future result %s via %s" , sQuote(future ), sQuote(via ))
397407 res <- pico_p2p_send_result(p , future = f , via = via )
398408 tx_parent(" uploaded" )
409+ info(" future result %s sent" , sQuote(future ), sQuote(via ))
399410 }
400411 }, interrupt = function (c ) {
401412 info(" interrupted" )
0 commit comments