Skip to content

Commit adfa227

Browse files
Make it possible to also interrupt a future currently being processed
1 parent 2eaf17f commit adfa227

File tree

3 files changed

+66
-9
lines changed

3 files changed

+66
-9
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Package: future.p2p
22
Title: A Peer-to-Peer Compute Cluster via Futureverse
3-
Version: 0.3.0-9025
3+
Version: 0.3.0-9026
44
Description: Implementation of the 'Future' API <doi:10.32614/RJ-2021-048> that resolves futures on a peer-to-peer ('P2P') compute environment. By using this future backend, you and your friends can share your spare compute resources with each other.
55
Imports:
66
future (>= 1.67.0),

R/pico_p2p.R

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,18 +279,32 @@ pico_p2p_receive_future <- function(p, via, duration = 60) {
279279

280280

281281
#' @importFrom future result
282-
pico_p2p_send_result <- function(p, future, via, duration = 60) {
282+
pico_p2p_send_result <- function(p, future, via, to, duration = 60, from = p$user) {
283283
debug <- isTRUE(getOption("future.p2p.debug"))
284284
if (debug) {
285285
mdebug_push("pico_p2p_send_result() ...")
286286
mdebugf("Future: %s", future_id(future))
287+
mdebugf("From: %s", from)
288+
mdebugf("To: %s", to)
287289
mdebugf("Via: %s", via)
288290
mdebugf("Duration: %g seconds", duration)
289291
on.exit({
290292
mdebug_pop()
291293
})
292294
}
293295

296+
## Update client about incoming result on the message board
297+
m <- data.frame(
298+
when = now_str(),
299+
expires = pico_p2p_time(delta = duration),
300+
type = "result",
301+
from = from,
302+
to = to,
303+
future = future_id(future),
304+
via = via
305+
)
306+
m_res <- pico_send_message_dataframe(p, m)
307+
294308
uri <- parse_transfer_uri(via)
295309
if (uri$protocol == "wormhole") {
296310
file <- file.path(tempdir(), sprintf("%s-FutureResult.rds", future_id(future)))
@@ -417,7 +431,10 @@ pico_p2p_dispatch_future <- function(future) {
417431
m <- pico_p2p_next_message(pico) ## This is non-block; may return NULL
418432

419433
## Skip if message is related to another future than ours
420-
if (is.null(m[["future"]]) || m[["future"]] != future_id) next
434+
if (is.null(m[["future"]]) || m[["future"]] != future_id) {
435+
Sys.sleep(0.1)
436+
next
437+
}
421438

422439
if (debug) mstr(list(m = m))
423440

@@ -470,6 +487,44 @@ pico_p2p_dispatch_future <- function(future) {
470487

471488
## 5. Wait for and receive FutureResult file
472489
update_parent("wait")
490+
repeat {
491+
## Check for interrupts
492+
if ("interrupt" %in% listen_parent()) {
493+
if (debug) mdebugf("interrupt (state = %s)", sQuote(state))
494+
m <- pico_p2p_withdraw_future(pico, future_id = future_id, to = worker)
495+
if (debug) mstr(list(m = m))
496+
}
497+
498+
## New message from message board?
499+
m <- pico_p2p_next_message(pico) ## This is non-block; may return NULL
500+
501+
## Skip if message is related to another future than ours
502+
if (is.null(m[["future"]]) || m[["future"]] != future_id) {
503+
Sys.sleep(0.1)
504+
next
505+
}
506+
507+
if (debug) mstr(list(m = m))
508+
509+
if (m[["type"]] == "result") {
510+
## Ignore, if result already expired
511+
if (Sys.time() > as.POSIXct(as.numeric(m[["expires"]]))) {
512+
if (debug) mdebug("Receiving expired results")
513+
## FIXME: We cannot just ignore the file transfer, because then the worker will
514+
## stall forever. To do that, we need the worker coordinator to interrupt the
515+
## transfer when expired.
516+
## return(list(type = "event", value = "expired"))
517+
}
518+
if (debug) mdebug("Receiving results")
519+
state <- "receive"
520+
break
521+
}
522+
523+
Sys.sleep(0.1)
524+
} ## repeat()
525+
526+
state <- "results"
527+
update_parent("results")
473528
path <- file.path(dirname(dirname(file)), "results")
474529
tryCatch({
475530
file <- pico_p2p_receive_result(pico, via = via, path = path)

R/worker.R

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
235235
if ("resolved" %in% worker_status) {
236236
state <- "resolved"
237237
info("Future %s has been resolved and results will be sent to client %s", sQuote(future), sQuote(client))
238-
## FIXME: Inform client that future has been resolved
238+
m0 <- pico_p2p_take_on_future(p, to = client, future = future, duration = duration)
239239
}
240240
next
241241
} else if (state == "resolved") {
@@ -276,7 +276,7 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
276276
state <- "working"
277277

278278
## Tell worker to receive future from client
279-
tx_worker(sprintf("download=%s,via=%s", future, via))
279+
tx_worker(sprintf("download=%s,via=%s,from=%s", future, via, client))
280280

281281
## Wait for worker to *start* download future
282282
repeat {
@@ -306,6 +306,7 @@ worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args),
306306
## because the client did not respect what we support)
307307
state <<- "waiting"
308308
} else if (state == "working") {
309+
info("Interrupting worker")
309310
state <<- "interrupt"
310311
rx$interrupt()
311312
} else {
@@ -419,11 +420,12 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
419420
}
420421

421422
## Download and process future?
422-
pattern <- "^download=([^,]+),via=(.*)$"
423+
pattern <- "^download=([^,]+),via=([^,]+),from=([^,]+)$"
423424
if (grepl(pattern, action)) {
424425
future <- sub(pattern, "\\1", action)
425426
via <- sub(pattern, "\\2", action)
426-
info("download future %s via %s", sQuote(future), sQuote(via))
427+
client <- sub(pattern, "\\3", action)
428+
info("download future %s via %s from %s", sQuote(future), sQuote(via), sQuote(client))
427429
stop_if_not(
428430
nzchar(future), !grepl("[,=]", future),
429431
nzchar(via), !grepl("[,=]", via)
@@ -440,15 +442,15 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
440442

441443
info("process future %s", sQuoteLabel(f))
442444
dt <- system.time({
443-
r <- tryCatch(result(f), error = identity)
445+
r <- tryCatch({ result(f) }, error = identity) ## Note, result() handles 'interrupt':s
444446
})
445447
dt <- difftime(dt[3], 0)
446448
info("Future %s resolved after %s", sQuote(future), format(dt))
447449
tx_parent("resolved")
448450

449451
info("sending future result %s via %s", sQuote(future), sQuote(via))
450452
dt <- system.time({
451-
res <- pico_p2p_send_result(p, future = f, via = via)
453+
res <- pico_p2p_send_result(p, future = f, to = client, via = via)
452454
})
453455
dt <- difftime(dt[3], 0)
454456
info("future result %s sent in %s", sQuote(future), format(dt))

0 commit comments

Comments
 (0)