Skip to content

Commit 37ff63c

Browse files
Merge branch 'release/0.4.0'
2 parents 3270d15 + 274d6fb commit 37ff63c

22 files changed

+1018
-141
lines changed

DESCRIPTION

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
Package: future.p2p
22
Title: A Peer-to-Peer Compute Cluster via Futureverse
3-
Version: 0.3.0
3+
Version: 0.4.0
44
Description: Implementation of the 'Future' API <doi:10.32614/RJ-2021-048> that resolves futures on a peer-to-peer ('P2P') compute environment. By using this future backend, you and your friends can share your spare compute resources with each other.
55
Imports:
66
future (>= 1.67.0),
77
processx,
8-
callr
8+
callr,
9+
utils
910
Suggests:
1011
commonmark,
1112
base64enc
@@ -18,4 +19,4 @@ URL: https://future.p2p.futureverse.org/
1819
BugReports: https://github.com/futureverse/future.p2p/issues
1920
Encoding: UTF-8
2021
Roxygen: list(markdown = TRUE)
21-
RoxygenNote: 7.3.2
22+
RoxygenNote: 7.3.3

NAMESPACE

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Generated by roxygen2: do not edit by hand
22

3+
S3method(interruptFuture,PicoP2PFutureBackend)
34
S3method(launchFuture,PicoP2PFutureBackend)
45
S3method(nbrOfFreeWorkers,PicoP2PFutureBackend)
56
S3method(nbrOfWorkers,PicoP2PFutureBackend)
@@ -19,8 +20,11 @@ importFrom(callr,r_bg)
1920
importFrom(future,Future)
2021
importFrom(future,FutureBackend)
2122
importFrom(future,FutureError)
23+
importFrom(future,FutureInterruptError)
24+
importFrom(future,FutureResult)
2225
importFrom(future,UnexpectedFutureResultError)
2326
importFrom(future,future)
27+
importFrom(future,interruptFuture)
2428
importFrom(future,launchFuture)
2529
importFrom(future,nbrOfFreeWorkers)
2630
importFrom(future,nbrOfWorkers)
@@ -31,9 +35,11 @@ importFrom(future,resolved)
3135
importFrom(future,result)
3236
importFrom(future,run)
3337
importFrom(future,sequential)
38+
importFrom(processx,poll)
3439
importFrom(processx,process)
3540
importFrom(utils,capture.output)
3641
importFrom(utils,download.file)
3742
importFrom(utils,file_test)
43+
importFrom(utils,head)
3844
importFrom(utils,packageVersion)
3945
importFrom(utils,str)

NEWS.md

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,28 @@
1+
# Version 0.4.0 [2025-11-20]
2+
3+
## New Features
4+
5+
* Add support for canceling and interrupting 'future.p2p' futures.
6+
7+
## Bug Fixes
8+
9+
* The `future.p2p` backend was holding on to temporary **callr**
10+
files longer than necessary. Such files were only removed when the
11+
future object itself was removed. This would result in a large
12+
number of temporary files accumulating where there were many
13+
futures processed. Now the backend finalizes the **callr** process
14+
as soon as the future results have been collected, which results in
15+
removing temporary files created by callr sooner. Previously, the
16+
finalizer was only run when the future object was removed and
17+
garbage collected.
18+
19+
* Package gave errors on "Error in as.POSIXct.numeric(time1) :
20+
'origin' must be supplied" when using R (< 4.3.0).
21+
22+
* Package failed to install 'wormhole-williams' automatically on ARM7
23+
machines like Raspberry Pi.
24+
25+
126
# Version 0.3.0 [2025-08-26]
227

328
## Significant Changes
@@ -24,4 +49,3 @@
2449
# Version 0.1.0 [2025-08-10]
2550

2651
This is the first public version of the **future.p2p** package.
27-

R/001.import_future_functions.R

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,13 @@ import_future_functions <- function() {
1515

1616
.debug <<- import_future(".debug", mode = "environment", default = new.env(parent = emptyenv()))
1717
}
18+
19+
20+
#' @importFrom utils packageVersion
21+
future_supports_state_submitted <- local({
22+
.value <- NA
23+
function() {
24+
if (is.na(.value)) .value <<- (packageVersion("future") > "1.67.0")
25+
.value
26+
}
27+
})

R/PicoP2PFuture-class.R

Lines changed: 87 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,41 @@ resolved.PicoP2PFuture <- function(x, .signalEarly = TRUE, ...) {
2424

2525
## Still running?
2626
rx <- future[["rx"]]
27-
resolved <- !rx$is_alive()
27+
if (is.null(rx)) {
28+
resolved <- TRUE
29+
} else {
30+
resolved <- !rx$is_alive()
31+
dispatcher_status <- process_dispatcher_messages(rx, debug = debug)
32+
}
33+
34+
state <- future[["state"]]
35+
36+
## Update state?
37+
if (state == "submitted" || state == "running") {
38+
if (debug) mdebugf("Future state before: %s", commaq(state))
39+
if (debug) mdebugf("Child process updates: [n=%d] %s", length(dispatcher_status), commaq(dispatcher_status))
40+
if ("wait" %in% dispatcher_status) state <- "running"
41+
future[["state"]] <- state
42+
if (debug) mdebugf("Future state after: %s", commaq(state))
43+
}
44+
45+
## Remove communication channels?
46+
if (resolved) {
47+
future[["state"]] <- "finished"
48+
channels <- attr(rx, "channels", exact = TRUE)
49+
if (length(channels) > 0) {
50+
if (debug) mdebugf("Removing communication channel files: [n=%d] %s", length(channels), commaq(channels))
51+
stop_if_not(is.character(channels))
52+
file.remove(channels)
53+
attr(rx, "channels") <- NULL
54+
future[["rx"]] <- rx
55+
}
56+
}
2857

2958
resolved
3059
}
3160

61+
3262
#' @importFrom future result UnexpectedFutureResultError
3363
#' @keywords internal
3464
#' @export
@@ -51,25 +81,66 @@ result.PicoP2PFuture <- function(future, ...) {
5181
}
5282

5383
rx <- future[["rx"]]
84+
5485
if (debug) mdebug("Waiting for dispatch process to finish")
5586
rx$wait()
56-
file <- rx$get_result()
57-
future[["rx"]] <- NULL
58-
if (debug) mdebugf("FutureResult file: %s [%g bytes]", sQuote(file), file.size(file))
59-
if (!file_test("-f", file)) {
60-
stop(FutureError(sprintf("FutureResult file not found: ", sQuote(file))), future = future)
87+
88+
## Get the results
89+
response <- rx$get_result()
90+
91+
## Relay output, handle messages
92+
dispatcher_status <- process_dispatcher_messages(rx, debug = debug)
93+
94+
## Finalize the 'callr' process, which includes removing any temporary
95+
## files that it created
96+
rx$finalize()
97+
98+
## Remove communication channels
99+
channels <- attr(rx, "channels", exact = TRUE)
100+
if (length(channels) > 0) {
101+
if (debug) mdebugf("Removing communication channel files: [n=%d] %s", length(channels), commaq(channels))
102+
stop_if_not(is.character(channels))
103+
file.remove(channels)
61104
}
62-
63-
result <- local({
64-
if (debug) {
65-
mdebug_push("Reading FutureResult from file")
66-
mdebugf("FutureResult file: %s [%g bytes]", sQuote(file), file.size(file))
67-
on.exit(mdebug_pop())
105+
106+
future[["rx"]] <- NULL
107+
108+
stop_if_not(is.list(response))
109+
type <- response[["type"]]
110+
if (type == "event") {
111+
event <- response[["value"]]
112+
if (event == "interrupted") {
113+
result <- FutureResult(
114+
conditions = list(list(
115+
condition = structure(list(), class = c("interrupt", "condition")),
116+
signaled = 0L
117+
)),
118+
uuid = future[["uuid"]]
119+
)
120+
} else {
121+
stop(FutureError(sprintf("Unknown event from future dispatcher: event = %s", sQuote(event))))
122+
}
123+
} else if (type == "file") {
124+
file <- response[["value"]]
125+
if (debug) mdebugf("FutureResult file: %s [%g bytes]", sQuote(file), file.size(file))
126+
if (!file_test("-f", file)) {
127+
stop(FutureError(sprintf("FutureResult file not found: ", sQuote(file))), future = future)
68128
}
69-
result <- readRDS(file)
70-
file.remove(file)
71-
result
72-
})
129+
130+
result <- local({
131+
if (debug) {
132+
mdebug_push("Reading FutureResult from file")
133+
mdebugf("FutureResult file: %s [%g bytes]", sQuote(file), file.size(file))
134+
on.exit(mdebug_pop())
135+
}
136+
result <- readRDS(file)
137+
stop_if_not(is.character(file))
138+
file.remove(file)
139+
result
140+
})
141+
} else {
142+
stop(FutureError(sprintf("Unknown response from future dispatcher: type = %s", sQuote(type))))
143+
}
73144

74145
future[["result"]] <- result
75146

R/PicoP2PFutureBackend-class.R

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ PicoP2PFutureBackend <- function(cluster = p2p_cluster_name(), host = "pipe.pico
100100
core <- structure(core, class = c("PicoP2PFutureBackend", "MultiprocessFutureBackend", "FutureBackend", class(core)))
101101

102102
if (!p2p_can_connect(cluster, name = name, host = host, ssh_args = ssh_args)) {
103-
stop(sprintf("Cannot connect to P2P cluster %s - make sure they have given you (%s) access", sQuote(cluster), sQuote(pico_username())))
103+
stop(sprintf("Cannot connect to P2P cluster %s - make sure they have given you (%s) access", sQuote(cluster), sQuote(pico_username(host = host, ssh_args = ssh_args))))
104104
}
105105

106106
core
@@ -144,6 +144,48 @@ launchFuture.PicoP2PFutureBackend <- function(backend, future, ...) {
144144
} ## launchFuture()
145145

146146

147+
#' @importFrom future interruptFuture
148+
#' @export
149+
interruptFuture.PicoP2PFutureBackend <- function(backend, future, ...) {
150+
debug <- isTRUE(getOption("future.debug"))
151+
if (debug) {
152+
mdebugf_push("interruptFuture(<%s>, future = <%s>, ...) ...", class(backend)[1], class(future)[1])
153+
on.exit(mdebugf_pop())
154+
}
155+
156+
## Has interrupts been disabled by user?
157+
if (!backend[["interrupts"]]) {
158+
if (debug) mdebug("Skipping, because interrupts are disabled for this backend")
159+
return(future)
160+
}
161+
162+
rx <- future[["rx"]]
163+
if (is.null(rx)) {
164+
if (debug) mdebug("Skipping, because there is no future dispatcher processes")
165+
return(future)
166+
}
167+
168+
channels <- attr(rx, "channels", exact = TRUE)
169+
tx <- channels[["tx"]]
170+
if (is.null(tx)) {
171+
if (debug) mdebug("Skipping, because there is no communication channel to future dispatcher processes")
172+
return(future)
173+
}
174+
175+
local({
176+
con <- file(tx, open = "w")
177+
on.exit(close(con))
178+
cat("interrupt\n", file = con)
179+
flush(con)
180+
})
181+
if (debug) mdebug("Sent 'interrupt' to future dispatcher processes")
182+
183+
future[["state"]] <- "interrupted"
184+
185+
future
186+
}
187+
188+
147189
#' @importFrom future nbrOfWorkers
148190
#' @export
149191
nbrOfWorkers.PicoP2PFutureBackend <- function(evaluator) {
@@ -208,19 +250,21 @@ print.PicoP2PFutureBackend <- function(x, ...) {
208250
cat(sprintf("P2P cluster: %s\n", sQuote(backend[["cluster"]])))
209251
cat(sprintf("P2P client ID: %s\n", sQuote(backend[["name"]])))
210252

211-
clusters <- pico_p2p_hosted_clusters(backend[["host"]], backend[["ssh_args"]])
253+
host <- backend[["host"]]
254+
ssh_args <- backend[["ssh_args"]]
255+
clusters <- pico_p2p_hosted_clusters(host = host, ssh_args = ssh_args)
212256
cat(sprintf("P2P clusters you are hosting: [n=%d]\n", nrow(clusters)))
213257
for (kk in seq_len(nrow(clusters))) {
214258
cluster <- clusters[kk, ]
215259
users <- strsplit(cluster$users, split = ",", fixed = TRUE)[[1]]
216-
users <- unique(c(users, pico_username()))
260+
users <- unique(c(users, pico_username(host = host, ssh_args = ssh_args)))
217261
users <- paste(users, collapse = ", ")
218262
cat(sprintf(" %2d. %s (users: %s)\n", kk, sQuote(cluster$name), users))
219263
}
220264

221265
cat("Message board:\n")
222266
cat(sprintf(" - Server: %s\n", backend[["host"]]))
223-
username <- pico_username(backend[["host"]], backend[["ssh_args"]])
267+
username <- pico_username(host = host, ssh_args = ssh_args)
224268
cat(sprintf(" - Username: %s\n", sQuote(username)))
225269

226270
cat("Data transfer tools:\n")
@@ -236,7 +280,7 @@ print.PicoP2PFutureBackend <- function(x, ...) {
236280

237281
p2p_can_connect <- function(cluster, name, host = "pipe.pico.sh", ssh_args = NULL, timeout = 10.0) {
238282
cluster_owner <- dirname(cluster)
239-
if (cluster_owner == pico_username()) {
283+
if (cluster_owner == pico_username(host = host, ssh_args = ssh_args)) {
240284
topic <- sprintf("%s/future.p2p", basename(cluster))
241285
} else {
242286
topic <- sprintf("%s/future.p2p", cluster)

R/host_cluster.R

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,25 @@
1616
#'
1717
#' @importFrom future resolve plan sequential
1818
#' @export
19-
host_cluster <- function(cluster = p2p_cluster_name(users), users = character(0L), host = "pipe.pico.sh", ssh_args = NULL, duration = 14*24*60*60) {
19+
host_cluster <- function(cluster = p2p_cluster_name(users, host = host, ssh_args = ssh_args), users = character(0L), host = "pipe.pico.sh", ssh_args = NULL, duration = 14*24*60*60) {
2020
stopifnot(length(cluster) == 1L, is.character(cluster), !is.na(cluster), nzchar(cluster))
2121

2222
parts <- strsplit(cluster, split = "/", fixed = TRUE)[[1]]
2323
okay <- FALSE
2424
if (length(parts) == 1L) {
2525
okay <- TRUE
2626
cluster_name <- cluster
27-
cluster_owner <- pico_username()
27+
cluster_owner <- pico_username(host = host, ssh_args = ssh_args)
2828
cluster <- sprintf("%s/%s", cluster_owner, cluster_name)
2929
} else if (length(parts) == 2L) {
3030
cluster_owner <- parts[1]
31-
if (cluster_owner == pico_username()) {
31+
if (cluster_owner == pico_username(host = host, ssh_args = ssh_args)) {
3232
okay <- TRUE
3333
cluster_name <- parts[2]
3434
}
3535
}
3636
if (!okay) {
37-
stop(sprintf("Argument 'cluster' must be of format '{owner}/{name}' or '{name}' where '{owner}' is your Pico username (%s): %s", sQuote(cluster), sQuote(pico_username())))
37+
stop(sprintf("Argument 'cluster' must be of format '{owner}/{name}' or '{name}' where '{owner}' is your Pico username (%s): %s", sQuote(cluster), sQuote(pico_username(host = host, ssh_args = ssh_args))))
3838
}
3939

4040
stopifnot(
@@ -56,7 +56,7 @@ host_cluster <- function(cluster = p2p_cluster_name(users), users = character(0L
5656
now <- pico_p2p_time()
5757

5858
expires <- pico_p2p_time(delta = duration)
59-
duration <- difftime(duration, 0)
59+
duration <- difftime2(duration, 0)
6060

6161
info("Launch p2p cluster %s for %d users (%s) until %s (%s)", sQuote(cluster), length(users), commaq(users), format(Sys.time() + duration), format(duration))
6262
topic <- sprintf("%s/future.p2p", cluster_name)

R/p2p_client_id.R

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,21 @@
33
#' @param users Users to have access to the cluster. This controls whether
44
#' the default cluster names should be "personal" or "friends".
55
#'
6+
#' @param \ldots Passed as-is to [pico_username()].
7+
#'
68
#' @return
79
#' `p2p_cluster_name()` returns R option `future.p2p.cluster`, if set.
810
#' If not set, it returns `{pico_name}/personal` if `length(users) == 0`,
911
#' otherwise `{pico_name}/friends`.
1012
#'
1113
#' @export
12-
p2p_cluster_name <- function(users = character(0)) {
14+
p2p_cluster_name <- function(users = character(0), ...) {
1315
users <- unique(users)
1416
name <- getOption("future.p2p.cluster")
1517
if (is.null(name)) {
16-
users <- setdiff(users, pico_username())
18+
users <- setdiff(users, pico_username(...))
1719
name <- if (length(users) == 0) "personal" else "friends"
18-
name <- sprintf("%s/%s", pico_username(), name)
20+
name <- sprintf("%s/%s", pico_username(...), name)
1921
}
2022
name
2123
}
@@ -30,9 +32,9 @@ p2p_cluster_name <- function(users = character(0)) {
3032
#' @keywords internal
3133
p2p_client_id <- local({
3234
name <- NULL
33-
function() {
35+
function(...) {
3436
if (is.null(name)) {
35-
user <- pico_username()
37+
user <- pico_username(...)
3638
hostname <- Sys.info()[["nodename"]]
3739
pid <- Sys.getpid()
3840
name <<- sprintf("%s@%s:%d", user, hostname, pid)

0 commit comments

Comments
 (0)