@@ -22,6 +22,14 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
2222 stop(sprintf(" Argument 'cluster' must be of format '{owner}/{name}': %s" , sQuote(cluster )))
2323 }
2424
25+ debug <- isTRUE(getOption(" future.p2p.debug" ))
26+ if (debug ) {
27+ mdebug_push(" future.p2p::worker() ..." )
28+ on.exit({
29+ mdebugf_pop()
30+ })
31+ }
32+
2533 if (inherits(duration , " ssh_args" )) {
2634 # # e.g. ssh_args = "-J dt1"
2735 ssh_args <- strsplit(ssh_args , split = " " , fixed = TRUE )[[1 ]]
@@ -52,11 +60,24 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
5260
5361 rx_worker <- function (channel = channels [[" rx" ]], clear = TRUE ) {
5462 if (file.size(channel ) == 0L ) return (character (0L ))
63+ if (debug ) {
64+ mdebugf_push(" rx_worker(clear = %s) ..." , clear )
65+ mdebug_pop()
66+ }
5567 # # Read everything available
5668 bfr <- readLines(channel , n = 1e6 , warn = FALSE )
69+ if (debug ) {
70+ mdebugf(" messages from worker process: [n=%d] %s" , length(bfr ), commaq(bfr ))
71+ }
72+
5773 # # Consume communication channel 'rx'?
5874 if (clear ) file.create(channel )
59- info(" state = %s, rx_worker() = %s" , sQuote(state ), commaq(bfr ))
75+
76+ # # Was the worker interrupted?
77+ if (" interrupted" %in% bfr ) {
78+ signalCondition(worker_interrupt())
79+ }
80+
6081 bfr
6182 } # # rx_worker()
6283
@@ -200,12 +221,7 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
200221 uri <- parse_transfer_uri(via )
201222 if (! uri [[" protocol" ]] %in% supported_transfer_protocols()) {
202223 info(" non-supported protocol" )
203- # # FIXME: Decline work offer (although we can just ignore it
204- # # because the client did not respect what we support)
205- state <- " waiting"
206- future <- NULL
207- client <- NULL
208- next
224+ signalCondition(future_withdraw(sprintf(" non-supported file-transfer protocol: %s" , uri [[" protocol" ]])))
209225 }
210226
211227 state <- " working"
@@ -235,11 +251,7 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
235251 }
236252 }
237253 } else if (m [[" type" ]] == " withdraw" ) {
238- info(" client %s withdrew future %s" , sQuote(client ), sQuote(future ))
239- state <- " waiting"
240- future <- NULL
241- client <- NULL
242- # # FIXME: Acknowledge withdrawal of future
254+ signalCondition(future_withdraw())
243255 }
244256 } else if (state == " working" ) {
245257 # # Withdrawal of future?
@@ -271,6 +283,23 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
271283 info(" Future %s has been resolved and results have been sent to client %s" , sQuote(future ), sQuote(client ))
272284 }
273285 }
286+ }, future_withdraw = function (c ) {
287+ # # Client withdrew future
288+ if (state == " waiting" ) {
289+ info(" client %s withdrew future %s" , sQuote(client ), sQuote(future ))
290+ state <<- " waiting"
291+ } else if (state == " offer" ) {
292+ # # FIXME: Decline work offer (although we can just ignore it
293+ # # because the client did not respect what we support)
294+ state <<- " waiting"
295+ } else if (state == " working" ) {
296+ info(" Interrupting future %s, because client %s withdrew it" , sQuote(client ), sQuote(future ))
297+ state <<- " interrupt"
298+ rx $ interrupt()
299+ }
300+ future <<- NULL
301+ client <<- NULL
302+ # # FIXME: Acknowledge withdrawal of future
274303 }, interrupt = function (c ) {
275304 info(" interrupted" )
276305 # # Interrupt worker
@@ -387,3 +416,18 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
387416
388417# # Expose function on the CLI
389418cli_fcn(worker ) <- c(" --(cluster)=(.*)" , " --(host)=(.*)" , " --(ssh_args)=(.*)" , " --(duration)=([[:digit:]]+)" )
419+
420+
421+ future_withdraw <- function (message = " future withdrawn by client" , call = NULL ) {
422+ cond <- simpleCondition(message = message , call = call )
423+ class(cond ) <- c(" future_withdraw" , class(cond ))
424+ cond
425+ }
426+
427+
428+ worker_interrupt <- function (message = " worker process interrupted" , call = NULL ) {
429+ cond <- simpleCondition(message = message , call = call )
430+ class(cond ) <- c(" worker_interrupt" , class(cond ))
431+ cond
432+ }
433+
0 commit comments