@@ -114,7 +114,7 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
114114 )
115115 stop_if_not(is.data.frame(requests ))
116116
117- info(" waiting for request" )
117+ info(sprintf( " waiting for request [state=%s] " , state ) )
118118
119119 repeat tryCatch({
120120 # # Is the worker process still alive
@@ -133,51 +133,57 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
133133
134134 # # Handle worker status updates
135135 if (length(worker_status ) > 0 ) {
136- worker_status <- sub(" ^worker_status=" , " " , worker_status )
137136 info(" Status update received from worker: [n=%d] %s" , length(worker_status ), commaq(worker_status ))
138137
138+ if (! is.null(future )) {
139+ if (" resolved" %in% worker_status ) {
140+ state <- " resolved"
141+ info(" Future %s has been resolved" , sQuote(future ))
142+ }
143+
144+ if (" uploading" %in% worker_status ) {
145+ state <- " uploading"
146+ info(" Future results %s are being sent to client %s" , sQuote(future ), sQuote(client ))
147+ }
148+ }
149+
139150 if (" ready" %in% worker_status ) {
140- info(" worker process is ready" )
141- if (state == " interrupt" ) state <- " waiting"
151+ info(" worker process is ready" )
152+ # # Drop future from list of requests
153+ if (! is.null(future )) {
154+ drop <- future
155+ requests <- subset(requests , future != drop )
156+ stop_if_not(is.data.frame(requests ))
157+ }
158+ state <- " waiting"
159+ offer_expires <- Inf
160+ future <- NULL
161+ client <- NULL
162+ info(" Future %s has been resolved and results have been sent to client %s" , sQuote(future ), sQuote(client ))
163+ info(sprintf(" waiting for request [state=%s]" , state ))
142164 }
165+
143166 if (" interrupted" %in% worker_status ) {
144167 signalCondition(worker_interrupt())
145168 }
146- }
169+ } # # if (length(worker_status) > 0)
147170
148- # # Expired?
149- if (Sys.time() > expires ) {
150- info(" time is out" )
151- # # FIXME: Update the P2P message board
152- rx $ interrupt()
153- future <- NULL
154- client <- NULL
155- break
156- }
157-
158- # # Any messages from the P2P message board?
159- # res <- poll(list(p), ms = 100)[[1]]
160- # if ("ready" %in% res[["output"]]) {
161-
162- # # New message message?
163- m <- pico_p2p_next_message(p ) # # This is non-block; may return NULL
164-
165171 # # Expired?
166172 now <- Sys.time()
167173 if (now > expires ) {
168174 info(" expired" )
169175 signalCondition(future_withdraw(" worker expired; terminating" , future = future ))
170- next
171176 } else if (state == " offer" && now > offer_expires ) {
172177 info(" work offer expired" )
173178 signalCondition(future_withdraw(" worker offer expired" , future = future ))
174- next
175179 }
176-
177- # # Process messages from the message board?
180+
181+ # # New message message?
182+ m <- pico_p2p_next_message(p ) # # This is non-block; may return NULL
183+
178184 if (length(m ) > 0 ) {
179185 type <- m [[" type" ]]
180-
186+
181187 # # A request?
182188 if (type == " request" ) {
183189 # # A new request?
@@ -191,26 +197,32 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
191197 stop_if_not(is.data.frame(requests ))
192198 }
193199 }
194-
200+
195201 # # Request accepted by another worker
196202 if (m [[" type" ]] == " accept" && m [[" to" ]] != worker_id ) {
197- info(" withdraw offer for future %s, because client %s accepted another worker's offer" , sQuote(m [[" future" ]]), sQuote(m [[" from" ]]))
198- signalCondition(future_withdraw(" another worker took on the future" , future = m [[" future" ]]))
203+ if (state %in% " offer" ) {
204+ info(" withdraw offer for future %s, because client %s accepted another worker's offer" , sQuote(m [[" future" ]]), sQuote(m [[" from" ]]))
205+ signalCondition(future_withdraw(sprintf(" another worker took on the future (state %s)" , sQuote(state )), future = m [[" future" ]]))
206+ } else {
207+ info(" drop request for future %s, because accepted by another worker" , sQuote(m [[" future" ]]))
208+ signalCondition(future_withdraw(sprintf(" drop request for future (state %s)" , sQuote(state )), future = m [[" future" ]]))
209+ }
199210 }
200211
201212 # # Withdrawal of future?
202213 if (type == " withdraw" ) {
203214 signalCondition(future_withdraw(future = m [[" future" ]]))
215+ next
204216 }
205- }
217+ } # # if (length(m) > 0)
206218
207219 # # Drop expired requests
208220 requests <- subset(requests , expires > = now )
209221 stop_if_not(is.data.frame(requests ))
210222
211223 if (nrow(requests ) > 0 ) {
212224 if (debug ) {
213- mdebugf(" Known requests: [n=%d]" , nrow(requests ))
225+ mdebugf(" Known requests: [n=%d] (state %s) " , nrow(requests ), sQuote( state ))
214226 mprint(requests )
215227 }
216228 }
@@ -229,25 +241,9 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
229241 state <- " offer"
230242 m0 <- pico_p2p_take_on_future(p , to = client , future = future , duration = duration )
231243 offer_expires <- m0 [[" expires" ]]
232- next
233- } else if (state == " working" ) {
234- # # Check if worker is done
235- if (" resolved" %in% worker_status ) {
236- state <- " resolved"
237- info(" Future %s has been resolved and results will be sent to client %s" , sQuote(future ), sQuote(client ))
238- m0 <- pico_p2p_take_on_future(p , to = client , future = future , duration = duration )
239- }
240- next
241- } else if (state == " resolved" ) {
242- # # Check if future results have been transferred
243- if (" ready" %in% worker_status ) {
244- state <- " waiting"
245- offer_expires <- Inf
246- future <- NULL
247- client <- NULL
248- info(" Future %s has been resolved and results have been sent to client %s" , sQuote(future ), sQuote(client ))
249- info(" waiting for request" )
250- }
244+
245+ # # Drop from known requests
246+ requests <- subset(requests , future != request [[" future" ]])
251247 next
252248 }
253249
@@ -274,10 +270,11 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
274270 }
275271
276272 state <- " working"
273+ info(" downloading" )
277274
278275 # # Tell worker to receive future from client
279276 tx_worker(sprintf(" download=%s,via=%s,from=%s" , future , via , client ))
280-
277+
281278 # # Wait for worker to *start* download future
282279 repeat {
283280 worker_status <- process_worker_messages(rx , debug = debug )
@@ -304,34 +301,33 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
304301 if (state == " offer" ) {
305302 # # FIXME: Decline work offer (although we can just ignore it
306303 # # because the client did not respect what we support)
307- state <<- " waiting"
308- } else if (state == " working" ) {
309- info(" Interrupting worker" )
310- state <<- " interrupt"
304+ } else if (state %in% c(" working" , " resolved" )) {
305+ info(sprintf(" Interrupting worker [state %s]" , state ))
311306 rx $ interrupt()
312307 } else {
313308 stop(FutureError(sprintf(" Internal error: state %s" , sQuote(state ))))
314309 }
310+ state <<- " waiting"
315311 offer_expires <<- Inf
316312 future <<- NULL
317313 client <<- NULL
314+
315+ info(sprintf(" waiting for request [state=%s]" , state ))
318316 }
319317
320318 # # Drop future from list of requests, in case it's there
321319 if (! is.null(c [[" future" ]])) {
322320 requests <<- subset(requests , future != c [[" future" ]])
323321 stop_if_not(is.data.frame(requests ))
324322 }
325-
326- info(" waiting for request" )
327323 # # FIXME: Acknowledge withdrawal of future
328324 }, worker_interrupt = function (c ) {
329325 info(" Worker process was interrupted" )
330326 state <<- " waiting"
331327 offer_expires <<- Inf
332328 future <<- NULL
333329 client <<- NULL
334- info(" waiting for request" )
330+ info(sprintf( " waiting for request [state=%s] " , state ) )
335331 # # FIXME: Acknowledge withdrawal of future
336332 }, interrupt = function (c ) {
337333 info(" interrupted" )
@@ -411,6 +407,7 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
411407 # # Tell parent that worker is ready
412408 tx_parent(" ready" )
413409
410+ state <- " ready"
414411 repeat tryCatch({
415412 # # Wait for instructions from parent
416413 action <- rx_parent()
@@ -431,6 +428,7 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
431428 nzchar(via ), ! grepl(" [,=]" , via )
432429 )
433430 tx_parent(" downloading" )
431+ state <- " downloading"
434432 dt <- system.time({
435433 res <- pico_p2p_receive_future(p , via = via )
436434 })
@@ -441,6 +439,7 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
441439 stop_if_not(paste(f [[" uuid" ]], collapse = " -" ) == future )
442440
443441 info(" process future %s" , sQuoteLabel(f ))
442+ state <- " processing"
444443 dt <- system.time({
445444 r <- tryCatch({ result(f ) }, error = identity ) # # Note, result() handles 'interrupt':s
446445 })
@@ -449,16 +448,21 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
449448 tx_parent(" resolved" )
450449
451450 info(" sending future result %s via %s" , sQuote(future ), sQuote(via ))
451+ state <- " uploading"
452+ # # NOTE, this may be interrupted
452453 dt <- system.time({
453- res <- pico_p2p_send_result(p , future = f , to = client , via = via )
454+ pico_p2p_send_result(p , future = f , to = client , via = via )
454455 })
455456 dt <- difftime(dt [3 ], 0 )
456457 info(" future result %s sent in %s" , sQuote(future ), format(dt ))
457458 tx_parent(" ready" )
459+ state <- " ready"
458460 }
459461 }, interrupt = function (c ) {
460- info(" interrupted" )
462+ info(sprintf( " interrupted [state %s] " , sQuote( state )) )
461463 tx_parent(" interrupted" )
464+ state <<- " ready"
465+ tx_parent(" ready" )
462466 }) # # repeat tryCatch({ ... })
463467
464468 info(" bye" )
0 commit comments