@@ -348,7 +348,6 @@ pico_p2p_dispatch_future <- function(future) {
348348 tx <- channels [[" tx" ]]
349349 if (is.null(tx )) return (character (0L ))
350350 bfr <- readLines(tx , n = 1e6 , warn = FALSE )
351- if (debug ) mstr(list (" listen_parent" , bfr = bfr ))
352351 bfr
353352 }
354353
@@ -378,62 +377,94 @@ pico_p2p_dispatch_future <- function(future) {
378377 update_parent(" connected" )
379378
380379 worker <- " <none>"
381-
382- # # 2. Announce future
383- requested <- FALSE
380+
381+ # # 2. Announce future and wait for work offer
382+ state <- " started"
383+ request_expires <- - 1
384384 repeat {
385385 # # Check for interrupts
386386 if (" interrupt" %in% listen_parent()) {
387- if (requested ) {
388- if (debug ) mdebug (" interrupt after request " )
389- m0 <- pico_p2p_withdraw_future(pico , future_id = future_id , to = worker )
390- if (debug ) mstr(list (m0 = m0 ))
387+ if (state == " request " ) {
388+ if (debug ) mdebugf (" interrupt (state = %s) " , sQuote( state ) )
389+ m <- pico_p2p_withdraw_future(pico , future_id = future_id , to = worker )
390+ if (debug ) mstr(list (m = m ))
391391 } else {
392- if (debug ) mdebug (" interrupt before request " )
392+ if (debug ) mdebugf (" interrupt ignored (state = %s) " , sQuote( state ) )
393393 }
394394 return (list (type = " event" , value = " interrupted" ))
395395 }
396+
397+ if (state == " started" ) {
398+ update_parent(" request" )
399+ m <- pico_p2p_have_future(pico , future = file , duration = duration )
400+ state <- " request"
401+ request_expires <- as.POSIXct(as.numeric(m [[" expires" ]]))
402+ next
403+ }
404+
405+ # # Has request expired
406+ if (state == " request" ) {
407+ # # Ignore, if requet already expired
408+ if (Sys.time() > request_expires ) {
409+ if (debug ) mdebugf(" withdraw request (state = %s)" , sQuote(state ))
410+ m <- pico_p2p_withdraw_future(pico , future_id = future_id , to = worker )
411+ if (debug ) mstr(list (m = m ))
412+ return (list (type = " event" , value = " expired" ))
413+ }
414+ }
415+
416+ # # New message from message board?
417+ m <- pico_p2p_next_message(pico ) # # This is non-block; may return NULL
418+
419+ # # Skip if message is related to another future than ours
420+ if (is.null(m [[" future" ]]) || m [[" future" ]] != future_id ) next
396421
397- update_parent(" request" )
398- m1 <- pico_p2p_have_future(pico , future = file , duration = duration )
399- requested <- TRUE
422+ if (debug ) mstr(list (m = m ))
400423
401- # # Check for interrupts
402- if (" interrupt" %in% listen_parent()) {
403- if (debug ) mdebug(" interrupt after request" )
404- m0 <- pico_p2p_withdraw_future(pico , future_id = future_id , to = worker )
405- if (debug ) mstr(list (m0 = m0 ))
406- return (list (type = " event" , value = " interrupted" ))
424+ if (state == " request" && m [[" type" ]] == " offer" ) {
425+ # # Ignore, if offer already expired
426+ if (Sys.time() > as.POSIXct(as.numeric(m [[" expires" ]]))) {
427+ if (debug ) mdebug(" Received expired work offer" )
428+ next
429+ }
430+ if (debug ) mdebug(" Received work offer" )
431+ state <- " offer"
432+ break
407433 }
408434
409- m2 <- pico_p2p_wait_for(pico , type = " offer" , futures = m1 [[" future" ]], expires = m1 [[" expires" ]])
410-
411- if (m2 [[" type" ]] != " expired" ) break
435+ Sys.sleep(0.1 )
436+ } # # repeat()
437+
438+ if (debug ) {
439+ mdebug(" Work offer:" )
440+ mstr(list (state = state , m = m ))
412441 }
413442
414- # # 3. Send future to workers
415- worker <- m2 [[" from" ]]
443+ # # 3. Send future to worker
444+ worker <- m [[" from" ]]
416445 stopifnot(is.character(worker ), nzchar(worker ))
417446
418447 # # Check for interrupts
419448 if (" interrupt" %in% listen_parent()) {
420- if (debug ) mdebug (" interrupt after offer " )
421- m0 <- pico_p2p_withdraw_future(pico , future_id = future_id , to = worker )
422- if (debug ) mstr(list (m0 = m0 ))
449+ if (debug ) mdebugf (" interrupt (state = %s) " , sQuote( state ) )
450+ m <- pico_p2p_withdraw_future(pico , future_id = future_id , to = worker )
451+ if (debug ) mstr(list (m = m ))
423452 return (list (type = " event" , value = " interrupted" ))
424453 }
425454
426455 update_parent(" send" )
427- m3 <- pico_p2p_send_future(pico , future = file , to = worker , via = via )
428-
456+ m <- pico_p2p_send_future(pico , future = file , to = worker , via = via )
457+ if (debug ) mstr(list (m = m ))
458+ state <- " processing"
459+
429460 # # 4. Remove temporary file
430461 file.remove(file )
431462
432463 # # Check for interrupts
433464 if (" interrupt" %in% listen_parent()) {
434- if (debug ) mdebug (" interrupt after future sent " )
435- m0 <- pico_p2p_withdraw_future(pico , future_id = future_id , to = worker )
436- if (debug ) mstr(list (m0 = m0 ))
465+ if (debug ) mdebugf (" interrupt (state = %s) " , sQuote( state ) )
466+ m <- pico_p2p_withdraw_future(pico , future_id = future_id , to = worker )
467+ if (debug ) mstr(list (m = m ))
437468 return (list (type = " event" , value = " interrupted" ))
438469 }
439470
@@ -443,9 +474,9 @@ pico_p2p_dispatch_future <- function(future) {
443474 tryCatch({
444475 file <- pico_p2p_receive_result(pico , via = via , path = path )
445476 }, interrupt = function (int ) {
446- if (debug ) mdebug (" interrupt while receiving future result " )
447- m0 <- pico_p2p_withdraw_future(pico , future_id = future_id , to = worker )
448- if (debug ) mstr(list (m0 = m0 ))
477+ if (debug ) mdebugf (" interrupt (state = %s) " , sQuote( state ) )
478+ m <- pico_p2p_withdraw_future(pico , future_id = future_id , to = worker )
479+ if (debug ) mstr(list (m = m ))
449480 return (list (type = " event" , value = " interrupted" ))
450481 })
451482
0 commit comments