@@ -206,64 +206,67 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
206206 m <- pico_p2p_next_message(p )
207207
208208 # # Expired?
209- if (Sys.time() > expires ) {
209+ now <- Sys.time()
210+ if (now > expires ) {
210211 info(" expired" )
211212 signalCondition(future_withdraw(" worker expired; terminating" ))
212- }
213-
214- if (Sys.time() > offer_expires ) {
213+ next
214+ } else if (state == " offer" && now > offer_expires ) {
215215 info(" work offer expired" )
216- state <- " waiting"
217- offer_expires <- Inf
218- future <- NULL
219- client <- NULL
220- # # FIXME: Update client via P2P message board
221- next
216+ signalCondition(future_withdraw(" worker offer expired" ))
217+ next
222218 }
223-
219+
224220 # # Process request?
225221 if (length(m ) > 0 ) {
226222 # # Are we read to offer to do work?
227223 if (state == " waiting" && m [[" type" ]] == " request" ) {
228224 stop_if_not(is.null(future ), is.null(client ))
229225 future <- m [[" future" ]]
230226 client <- m [[" from" ]]
231- duration <- 5.0
227+
228+ # # Make a work offer for 15 seconds
229+ duration <- 15.0
232230 info(" offer to process future %s for client %s (valid for %g seconds)" , sQuote(future ), sQuote(client ), duration )
231+
233232 state <- " offer"
234- # # Make a work offer for 5 seconds
235233 m0 <- pico_p2p_take_on_future(p , to = client , future = future , duration = duration )
236234 offer_expires <- m0 [[" expires" ]]
237235 } else if (state == " offer" && future %in% m [[" future" ]]) {
238236 info(" waiting for acceptance of our work offer" )
239- if (m [[" type" ]] == " accept" && m [[" to" ]] == worker_id ) {
240- info(" client %s accepted our offer to process future %s" , sQuote(client ), sQuote(future ))
237+ if (m [[" type" ]] == " accept" ) {
238+ if (m [[" to" ]] == worker_id ) {
239+ info(" client %s accepted our offer to process future %s" , sQuote(client ), sQuote(future ))
241240
242- # # Do we support the file transfer protocol?
243- via <- m [[" via" ]]
244- uri <- parse_transfer_uri(via )
245- if (! uri [[" protocol" ]] %in% supported_transfer_protocols()) {
246- info(" non-supported protocol" )
247- signalCondition(future_withdraw(sprintf(" non-supported file-transfer protocol: %s" , uri [[" protocol" ]])))
248- }
249-
250- state <- " working"
251-
252- # # Tell worker to receive future from client
253- tx_worker(sprintf(" download=%s,via=%s" , future , via ))
254-
255- # # Wait for worker to *start* download future
256- repeat {
257- info <- rx_worker()
258- if (length(info ) == 0 ) {
259- Sys.sleep(0.1 )
260- next
241+ # # Do we support the file transfer protocol?
242+ via <- m [[" via" ]]
243+ uri <- parse_transfer_uri(via )
244+ if (! uri [[" protocol" ]] %in% supported_transfer_protocols()) {
245+ info(" non-supported protocol" )
246+ signalCondition(future_withdraw(sprintf(" non-supported file-transfer protocol: %s" , uri [[" protocol" ]])))
261247 }
262248
263- if (" downloading" %in% info ) {
264- # # FIXME: Acknowledge to work on future
265- break
249+ state <- " working"
250+
251+ # # Tell worker to receive future from client
252+ tx_worker(sprintf(" download=%s,via=%s" , future , via ))
253+
254+ # # Wait for worker to *start* download future
255+ repeat {
256+ info <- rx_worker()
257+ if (length(info ) == 0 ) {
258+ Sys.sleep(0.1 )
259+ next
260+ }
261+
262+ if (" downloading" %in% info ) {
263+ # # FIXME: Acknowledge to work on future
264+ break
265+ }
266266 }
267+ } else {
268+ info(" withdraw offer for future %s, because client %s accepted another worker's offer" , sQuote(future ), sQuote(client ))
269+ signalCondition(future_withdraw(" another worker took on the future" ))
267270 }
268271 } else if (m [[" type" ]] == " withdraw" ) {
269272 signalCondition(future_withdraw())
@@ -315,26 +318,27 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
315318 rx $ interrupt()
316319 }
317320
321+ offer_expires <<- Inf
318322 future <<- NULL
319323 client <<- NULL
320324 info(" waiting for request" )
321325 # # FIXME: Acknowledge withdrawal of future
322326 }, worker_interrupt = function (c ) {
323327 info(" Worker process was interrupted" )
324- state <- " waiting"
325- offer_expires <- Inf
326- future <- NULL
327- client <- NULL
328+ state << - " waiting"
329+ offer_expires << - Inf
330+ future << - NULL
331+ client << - NULL
328332 info(" waiting for request" )
329333 # # FIXME: Acknowledge withdrawal of future
330334 }, interrupt = function (c ) {
331335 info(" interrupted" )
332336 # # Interrupt worker
333337 rx $ interrupt()
334338 state <<- " exit"
335- offer_expires <- Inf
336- future <- NULL
337- client <- NULL
339+ offer_expires << - Inf
340+ future << - NULL
341+ client << - NULL
338342 # # FIXME: Update the P2P message board
339343 info(" exiting" )
340344 }) # # repeat tryCatch({ ... })
0 commit comments