Skip to content

Commit ce404bd

Browse files
Add rudimentary support for canceling and interrupting future.p2p futures
1 parent 12c57d3 commit ce404bd

File tree

5 files changed

+127
-19
lines changed

5 files changed

+127
-19
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Package: future.p2p
22
Title: A Peer-to-Peer Compute Cluster via Futureverse
3-
Version: 0.3.0-9004
3+
Version: 0.3.0-9005
44
Description: Implementation of the 'Future' API <doi:10.32614/RJ-2021-048> that resolves futures on a peer-to-peer ('P2P') compute environment. By using this future backend, you and your friends can share your spare compute resources with each other.
55
Imports:
66
future (>= 1.67.0),

NAMESPACE

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Generated by roxygen2: do not edit by hand
22

3+
S3method(interruptFuture,PicoP2PFutureBackend)
34
S3method(launchFuture,PicoP2PFutureBackend)
45
S3method(nbrOfFreeWorkers,PicoP2PFutureBackend)
56
S3method(nbrOfWorkers,PicoP2PFutureBackend)
@@ -19,8 +20,11 @@ importFrom(callr,r_bg)
1920
importFrom(future,Future)
2021
importFrom(future,FutureBackend)
2122
importFrom(future,FutureError)
23+
importFrom(future,FutureInterruptError)
24+
importFrom(future,FutureResult)
2225
importFrom(future,UnexpectedFutureResultError)
2326
importFrom(future,future)
27+
importFrom(future,interruptFuture)
2428
importFrom(future,launchFuture)
2529
importFrom(future,nbrOfFreeWorkers)
2630
importFrom(future,nbrOfWorkers)

R/PicoP2PFuture-class.R

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ result.PicoP2PFuture <- function(future, ...) {
8888
rx$wait()
8989

9090
## Get the results
91-
file <- rx$get_result()
91+
response <- rx$get_result()
9292

9393
## Finalize the 'callr' process, which includes removing any temporary
9494
## files that it created
@@ -102,21 +102,42 @@ result.PicoP2PFuture <- function(future, ...) {
102102
}
103103

104104
future[["rx"]] <- NULL
105-
if (debug) mdebugf("FutureResult file: %s [%g bytes]", sQuote(file), file.size(file))
106-
if (!file_test("-f", file)) {
107-
stop(FutureError(sprintf("FutureResult file not found: ", sQuote(file))), future = future)
108-
}
109-
110-
result <- local({
111-
if (debug) {
112-
mdebug_push("Reading FutureResult from file")
113-
mdebugf("FutureResult file: %s [%g bytes]", sQuote(file), file.size(file))
114-
on.exit(mdebug_pop())
105+
106+
stop_if_not(is.list(response))
107+
type <- response[["type"]]
108+
if (type == "event") {
109+
event <- response[["value"]]
110+
if (event == "interrupted") {
111+
result <- FutureResult(
112+
conditions = list(list(
113+
condition = structure(list(), class = c("interrupt", "condition")),
114+
signaled = 0L
115+
)),
116+
uuid = future[["uuid"]]
117+
)
118+
} else {
119+
stop(FutureError(sprintf("Unknown event from future dispatcher: event = %s", sQuote(event))))
115120
}
116-
result <- readRDS(file)
117-
file.remove(file)
118-
result
119-
})
121+
} else if (type == "file") {
122+
file <- response[["value"]]
123+
if (debug) mdebugf("FutureResult file: %s [%g bytes]", sQuote(file), file.size(file))
124+
if (!file_test("-f", file)) {
125+
stop(FutureError(sprintf("FutureResult file not found: ", sQuote(file))), future = future)
126+
}
127+
128+
result <- local({
129+
if (debug) {
130+
mdebug_push("Reading FutureResult from file")
131+
mdebugf("FutureResult file: %s [%g bytes]", sQuote(file), file.size(file))
132+
on.exit(mdebug_pop())
133+
}
134+
result <- readRDS(file)
135+
file.remove(file)
136+
result
137+
})
138+
} else {
139+
stop(FutureError(sprintf("Unknown response from future dispatcher: type = %s", sQuote(type))))
140+
}
120141

121142
future[["result"]] <- result
122143

R/PicoP2PFutureBackend-class.R

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,43 @@ launchFuture.PicoP2PFutureBackend <- function(backend, future, ...) {
144144
} ## launchFuture()
145145

146146

147+
#' @importFrom future interruptFuture
148+
#' @export
149+
interruptFuture.PicoP2PFutureBackend <- function(backend, future, ...) {
150+
debug <- isTRUE(getOption("future.debug"))
151+
if (debug) {
152+
mdebugf_push("interruptFuture(<%s>, future = <%s>, ...) ...", class(backend)[1], class(future)[1])
153+
on.exit(mdebugf_pop())
154+
}
155+
156+
## Has interrupts been disabled by user?
157+
if (!backend[["interrupts"]]) {
158+
if (debug) mdebug("Skipping, because interrupts are disabled for this backend")
159+
return(future)
160+
}
161+
162+
rx <- future[["rx"]]
163+
if (is.null(rx)) {
164+
if (debug) mdebug("Skipping, because there is no future dispatcher processes")
165+
return(future)
166+
}
167+
168+
channels <- attr(rx, "channels", exact = TRUE)
169+
tx <- channels[["tx"]]
170+
if (is.null(tx)) {
171+
if (debug) mdebug("Skipping, because there is no communication channel to future dispatcher processes")
172+
return(future)
173+
}
174+
175+
cat("interrupt", file = tx, append = TRUE)
176+
if (debug) mdebug("Sent 'interrupt' to future dispatcher processes")
177+
178+
future[["state"]] <- "interrupted"
179+
180+
future
181+
}
182+
183+
147184
#' @importFrom future nbrOfWorkers
148185
#' @export
149186
nbrOfWorkers.PicoP2PFutureBackend <- function(evaluator) {

R/pico_p2p.R

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ pico_p2p_hosted_clusters <- function(host = "pipe.pico.sh", ssh_args = NULL, tim
294294

295295

296296

297+
#' @importFrom future FutureInterruptError FutureResult
297298
#' @importFrom callr r_bg
298299
#' @importFrom utils file_test
299300
pico_p2p_dispatch_future <- function(future) {
@@ -312,21 +313,59 @@ pico_p2p_dispatch_future <- function(future) {
312313
pico_p2p_receive_result <- import_future.p2p("pico_p2p_receive_result")
313314

314315
update_parent <- function(msg, ...) {
316+
rx <- channels[["rx"]]
317+
if (is.null(rx)) return()
315318
msg <- sprintf("%s\n", msg)
316-
cat(msg, file = channels[["rx"]], append = TRUE)
319+
cat(msg, file = rx, append = TRUE)
317320
}
318321

322+
listen_parent <- function(...) {
323+
tx <- channels[["tx"]]
324+
if (is.null(tx)) return(character(0L))
325+
readLines(tx, n = 1e6, warn = FALSE)
326+
}
327+
328+
## Check for interrupts
329+
if ("interrupt" %in% listen_parent()) {
330+
return(list(type = "event", value = "interrupted"))
331+
}
332+
319333
update_parent("connecting")
320334
pico <- pico_pipe(topic, user = name, host = host, ssh_args = ssh_args)
335+
336+
## Check for interrupts
337+
if ("interrupt" %in% listen_parent()) {
338+
return(list(type = "event", value = "interrupted"))
339+
}
340+
321341
update_parent("announcement")
322342
m <- pico_p2p_hello(pico, type = "client")
343+
## Check for interrupts
344+
if ("interrupt" %in% listen_parent()) {
345+
return(list(type = "event", value = "interrupted"))
346+
}
347+
323348
update_parent("connected")
324349

325350
## 2. Announce future
326351
repeat {
327352
update_parent("request")
328353
m1 <- pico_p2p_have_future(pico, future = file, duration = duration)
354+
355+
## Check for interrupts
356+
if ("interrupt" %in% listen_parent()) {
357+
## TODO: Withdraw future
358+
return(list(type = "event", value = "interrupted"))
359+
}
360+
329361
m2 <- pico_p2p_wait_for(pico, type = "offer", futures = m1[["future"]], expires = m1[["expires"]])
362+
363+
## Check for interrupts
364+
if ("interrupt" %in% listen_parent()) {
365+
## TODO: Withdraw future
366+
return(list(type = "event", value = "interrupted"))
367+
}
368+
330369
if (m2[["type"]] != "expired") break
331370
}
332371

@@ -339,18 +378,25 @@ pico_p2p_dispatch_future <- function(future) {
339378
## 4. Remove temporary file
340379
file.remove(file)
341380

381+
## Check for interrupts
382+
if ("interrupt" %in% listen_parent()) {
383+
## TODO: Withdraw future
384+
return(list(type = "event", value = "interrupted"))
385+
}
386+
342387
## 5. Wait for and receive FutureResult file
343388
update_parent("wait")
344389
path <- file.path(dirname(dirname(file)), "results")
345390
tryCatch({
346391
file <- pico_p2p_receive_result(pico, via = via, path = path)
347392
}, interrupt = function(int) {
348-
cat(file = "foo.log", "interrupted\n")
393+
## TODO: Withdraw future
394+
return(list(type = "event", value = "interrupted"))
349395
})
350396

351397
update_parent("result")
352398

353-
invisible(file)
399+
list(type = "file", value = file)
354400
} ## send_future()
355401

356402
debug <- isTRUE(getOption("future.p2p.debug"))

0 commit comments

Comments
 (0)