@@ -105,7 +105,15 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
105105 future <- NULL
106106 client <- NULL
107107 offer_expires <- Inf
108-
108+
109+ # # List of known requests
110+ requests <- data.frame (
111+ future = character (0L ),
112+ expires = as.POSIXct(NA_real_ )[FALSE ],
113+ client = character (0L )
114+ )
115+ stop_if_not(is.data.frame(requests ))
116+
109117 info(" waiting for request" )
110118
111119 repeat tryCatch({
@@ -130,6 +138,7 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
130138
131139 if (" ready" %in% worker_status ) {
132140 info(" worker process is ready" )
141+ if (state == " interrupt" ) state <- " waiting"
133142 }
134143 if (" interrupted" %in% worker_status ) {
135144 signalCondition(worker_interrupt())
@@ -148,88 +157,87 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
148157
149158 # # Any messages from the P2P message board?
150159# res <- poll(list(p), ms = 100)[[1]]
151- # if (! "ready" %in% res[["output"]]) next
160+ # if ("ready" %in% res[["output"]]) {
152161
153- # # Read next message?
154- m <- pico_p2p_next_message(p )
162+ # # New message message?
163+ m <- pico_p2p_next_message(p ) # # This is non-block; may return NULL
155164
156165 # # Expired?
157166 now <- Sys.time()
158167 if (now > expires ) {
159168 info(" expired" )
160- signalCondition(future_withdraw(" worker expired; terminating" ))
169+ signalCondition(future_withdraw(" worker expired; terminating" , future = future ))
161170 next
162171 } else if (state == " offer" && now > offer_expires ) {
163172 info(" work offer expired" )
164- signalCondition(future_withdraw(" worker offer expired" ))
173+ signalCondition(future_withdraw(" worker offer expired" , future = future ))
165174 next
166175 }
167176
168- # # Process request ?
177+ # # Process messages from the message board ?
169178 if (length(m ) > 0 ) {
170- # # Are we read to offer to do work?
171- if (state == " waiting" && m [[" type" ]] == " request" ) {
172- stop_if_not(is.null(future ), is.null(client ))
173- future <- m [[" future" ]]
174- client <- m [[" from" ]]
175-
176- # # Make a work offer for 15 seconds
177- duration <- 15.0
178- info(" offer to process future %s for client %s (valid for %g seconds)" , sQuote(future ), sQuote(client ), duration )
179-
180- state <- " offer"
181- m0 <- pico_p2p_take_on_future(p , to = client , future = future , duration = duration )
182- offer_expires <- m0 [[" expires" ]]
183- } else if (state == " offer" && future %in% m [[" future" ]]) {
184- info(" waiting for acceptance of our work offer" )
185- if (m [[" type" ]] == " accept" ) {
186- if (m [[" to" ]] == worker_id ) {
187- info(" client %s accepted our offer to process future %s" , sQuote(client ), sQuote(future ))
188-
189- # # Do we support the file transfer protocol?
190- via <- m [[" via" ]]
191- uri <- parse_transfer_uri(via )
192- if (! uri [[" protocol" ]] %in% supported_transfer_protocols()) {
193- info(" non-supported protocol" )
194- signalCondition(future_withdraw(sprintf(" non-supported file-transfer protocol: %s" , uri [[" protocol" ]])))
195- }
196-
197- state <- " working"
198-
199- # # Tell worker to receive future from client
200- tx_worker(sprintf(" download=%s,via=%s" , future , via ))
201-
202- # # Wait for worker to *start* download future
203- repeat {
204- worker_status <- process_worker_messages(rx , debug = debug )
205- if (" downloading" %in% worker_status ) {
206- # # FIXME: Acknowledge to work on future
207- break
208- }
209- Sys.sleep(0.1 )
210- }
211- } else {
212- info(" withdraw offer for future %s, because client %s accepted another worker's offer" , sQuote(future ), sQuote(client ))
213- signalCondition(future_withdraw(" another worker took on the future" ))
214- }
215- } else if (m [[" type" ]] == " withdraw" ) {
216- signalCondition(future_withdraw())
217- }
218- } else if (state == " working" ) {
219- # # Withdrawal of future?
220- if (m [[" type" ]] == " withdraw" && future %in% m [[" future" ]]) {
221- signalCondition(future_withdraw())
179+ type <- m [[" type" ]]
180+
181+ # # A request?
182+ if (type == " request" ) {
183+ # # A new request?
184+ if (! m [[" future" ]] %in% requests [[" future" ]]) {
185+ request <- data.frame (
186+ future = m [[" future" ]],
187+ expires = as.POSIXct(as.numeric(m [[" expires" ]])),
188+ client = m [[" from" ]]
189+ )
190+ requests <- rbind(requests , request )
191+ stop_if_not(is.data.frame(requests ))
222192 }
223193 }
224- } # # if (length(m) > 0)
194+
195+ # # Request accepted by another worker
196+ if (m [[" type" ]] == " accept" && m [[" to" ]] != worker_id ) {
197+ info(" withdraw offer for future %s, because client %s accepted another worker's offer" , sQuote(m [[" future" ]]), sQuote(m [[" from" ]]))
198+ signalCondition(future_withdraw(" another worker took on the future" , future = m [[" future" ]]))
199+ }
200+
201+ # # Withdrawal of future?
202+ if (type == " withdraw" ) {
203+ signalCondition(future_withdraw(future = m [[" future" ]]))
204+ }
205+ }
206+
207+ # # Drop expired requests
208+ requests <- subset(requests , expires > = now )
209+ stop_if_not(is.data.frame(requests ))
225210
226- if (state == " working" ) {
211+ if (nrow(requests ) > 0 ) {
212+ if (debug ) {
213+ mdebugf(" Known requests: [n=%d]" , nrow(requests ))
214+ mprint(requests )
215+ }
216+ }
217+
218+ # # Are we read to offer to do work?
219+ if (state == " waiting" && nrow(requests ) > 0 ) {
220+ stop_if_not(is.null(future ), is.null(client ))
221+ request <- head(requests , 1L )
222+ future <- request [[" future" ]]
223+ client <- request [[" client" ]]
224+
225+ # # Make a work offer for 15 seconds
226+ duration <- 15.0
227+ info(" offer to process future %s for client %s (valid for %g seconds)" , sQuote(future ), sQuote(client ), duration )
228+
229+ state <- " offer"
230+ m0 <- pico_p2p_take_on_future(p , to = client , future = future , duration = duration )
231+ offer_expires <- m0 [[" expires" ]]
232+ next
233+ } else if (state == " working" ) {
227234 # # Check if worker is done
228235 if (" resolved" %in% worker_status ) {
229236 state <- " resolved"
230237 info(" Future %s has been resolved and results will be sent to client %s" , sQuote(future ), sQuote(client ))
231238 # # FIXME: Inform client that future has been resolved
232239 }
240+ next
233241 } else if (state == " resolved" ) {
234242 # # Check if future results have been transferred
235243 if (" ready" %in% worker_status ) {
@@ -240,7 +248,48 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
240248 info(" Future %s has been resolved and results have been sent to client %s" , sQuote(future ), sQuote(client ))
241249 info(" waiting for request" )
242250 }
251+ next
243252 }
253+
254+ if (length(m ) > 0 ) {
255+ if (state == " offer" && future %in% m [[" future" ]]) {
256+ info(" waiting for acceptance of our work offer" )
257+ if (m [[" type" ]] == " accept" && m [[" to" ]] == worker_id ) {
258+ info(" client %s accepted our offer to process future %s" , sQuote(client ), sQuote(future ))
259+
260+ # # Do we support the file transfer protocol?
261+ via <- m [[" via" ]]
262+ uri <- parse_transfer_uri(via )
263+ if (! uri [[" protocol" ]] %in% supported_transfer_protocols()) {
264+ info(" non-supported protocol" )
265+ signalCondition(future_withdraw(sprintf(" non-supported file-transfer protocol: %s" , uri [[" protocol" ]]), future = m [[" future" ]]))
266+ state <- " waiting"
267+ next
268+ }
269+
270+ # # Drop future from list of requests
271+ if (! is.null(m [[" future" ]])) {
272+ requests <- subset(requests , future != m [[" future" ]])
273+ stop_if_not(is.data.frame(requests ))
274+ }
275+
276+ state <- " working"
277+
278+ # # Tell worker to receive future from client
279+ tx_worker(sprintf(" download=%s,via=%s" , future , via ))
280+
281+ # # Wait for worker to *start* download future
282+ repeat {
283+ worker_status <- process_worker_messages(rx , debug = debug )
284+ if (" downloading" %in% worker_status ) {
285+ # # FIXME: Acknowledge to work on future
286+ break
287+ }
288+ Sys.sleep(0.1 )
289+ }
290+ }
291+ }
292+ } # # if (length(m) > 0)
244293 }, future_withdraw = function (c ) {
245294 msg <- conditionMessage(c )
246295 info <- sprintf(" state %s" , sQuote(state ))
@@ -249,22 +298,30 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
249298 info <- paste(info , collapse = " , " )
250299 msg <- sprintf(" %s [%s]" , msg , info )
251300 info(msg )
252-
253- # # Client withdrew future
254- if (state == " waiting" ) {
255- state <<- " waiting"
256- } else if (state == " offer" ) {
257- # # FIXME: Decline work offer (although we can just ignore it
258- # # because the client did not respect what we support)
259- state <<- " waiting"
260- } else if (state == " working" ) {
261- state <<- " interrupt"
262- rx $ interrupt()
301+
302+ # # Client withdrew a future we're either work on or offered to work on
303+ if (! is.null(future ) && (future %in% c [[" future" ]])) {
304+ if (state == " offer" ) {
305+ # # FIXME: Decline work offer (although we can just ignore it
306+ # # because the client did not respect what we support)
307+ state <<- " waiting"
308+ } else if (state == " working" ) {
309+ state <<- " interrupt"
310+ rx $ interrupt()
311+ } else {
312+ stop(FutureError(sprintf(" Internal error: state %s" , sQuote(state ))))
313+ }
314+ offer_expires <<- Inf
315+ future <<- NULL
316+ client <<- NULL
263317 }
264-
265- offer_expires <<- Inf
266- future <<- NULL
267- client <<- NULL
318+
319+ # # Drop future from list of requests, in case it's there
320+ if (! is.null(c [[" future" ]])) {
321+ requests <<- subset(requests , future != c [[" future" ]])
322+ stop_if_not(is.data.frame(requests ))
323+ }
324+
268325 info(" waiting for request" )
269326 # # FIXME: Acknowledge withdrawal of future
270327 }, worker_interrupt = function (c ) {
@@ -412,8 +469,9 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
412469cli_fcn(worker ) <- c(" --(cluster)=(.*)" , " --(host)=(.*)" , " --(ssh_args)=(.*)" , " --(duration)=([[:digit:]]+)" )
413470
414471
415- future_withdraw <- function (message = " future withdrawn by client" , call = NULL ) {
472+ future_withdraw <- function (message = " future withdrawn by client" , call = NULL , future = NULL ) {
416473 cond <- simpleCondition(message = message , call = call )
474+ cond [[" future" ]] <- future
417475 class(cond ) <- c(" future_withdraw" , class(cond ))
418476 cond
419477}
@@ -427,7 +485,7 @@ worker_interrupt <- function(message = "worker process interrupted", call = NULL
427485
428486
429487process_worker_messages <- function (rx , debug = FALSE ) {
430- if (debug ) {
488+ if (debug && isTRUE(getOption( " future.debug " )) ) {
431489 mdebug_push(" process_worker_messages() ..." )
432490 on.exit({
433491 mdebugf(" worker_status: [n=%d] %s" , length(worker_status ), commaq((worker_status )))
0 commit comments