Skip to content

Commit 8abd0fc

Browse files
Merge branch 'release/0.1.0'
2 parents 2e619ec + 6cb9a14 commit 8abd0fc

23 files changed

+450
-252
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Package: future.p2p
22
Title: A Peer-to-Peer Compute Cluster via Futureverse
3-
Version: 0.0.0.9027
3+
Version: 0.1.0
44
Description: Implementation of the 'Future' API <doi:10.32614/RJ-2021-048> that resolves futures on a peer-to-peer ('P2P') compute environment. By using this future backend, you and your friends can share your spare compute resources with each other.
55
Depends:
66
future (>= 1.58.0)

NAMESPACE

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ S3method(resolved,PicoP2PFuture)
1111
S3method(result,PicoP2PFuture)
1212
export(PicoP2PFutureBackend)
1313
export(availablePicoP2PWorkers)
14+
export(cluster)
1415
export(find_wormhole)
1516
export(future_id)
17+
export(host_cluster)
1618
export(install_wormhole)
1719
export(p2p_cluster)
1820
export(p2p_name)
1921
export(pico_hosted_channels)
20-
export(pico_p2p)
21-
export(pico_p2p_cluster)
2222
export(pico_p2p_have_future)
2323
export(pico_p2p_hello)
2424
export(pico_p2p_hosted_clusters)
@@ -28,13 +28,13 @@ export(pico_p2p_send_future)
2828
export(pico_p2p_send_result)
2929
export(pico_p2p_take_on_future)
3030
export(pico_p2p_wait_for)
31-
export(pico_p2p_worker)
3231
export(pico_pipe)
3332
export(pico_receive_message)
3433
export(pico_receive_message_dataframe)
3534
export(pico_send_message)
3635
export(pico_terminate)
3736
export(pico_username)
37+
export(worker)
3838
export(wormhole_filename)
3939
export(wormhole_receive)
4040
export(wormhole_send)

NEWS.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Version 0.1.0 [2025-08-10]
2+
3+
This is the first public version of the **future.p2p** package.
4+

R/PicoP2PFutureBackend-class.R

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
#' pico_p2p futures
1+
#' p2p futures
22
#'
33
#' _WARNING: This function must never be called.
44
#' It may only be used with [future::plan()]_
55
#'
6-
#' A 'pico_p2p' future is an asynchronous multiprocess
6+
#' A 'p2p' future is an asynchronous multiprocess
77
#' future that will be evaluated in a background R session.
88
#'
99
#' @inheritParams pico_pipe
@@ -23,15 +23,14 @@
2323
#' being invited to a shared folder.
2424
#'
2525
#' Users who wish to contribute their compute power to the P2P cluster
26-
#' should call [pico_p2p_worker()].
26+
#' should call [worker()].
2727
#'
2828
#' Users who wish to take advantage of the compute power of the
29-
#' P2P cluster should use `plan(pico_p2p)`.
29+
#' P2P cluster should use `plan(future.p2p::cluster)`.
3030
#'
3131
#' @examplesIf interactive()
32-
#' ## Futures are pushed to the Pico P2P cluster and
33-
#' ## results are collected from there
34-
#' plan(future.p2p::pico_p2p, .init = FALSE)
32+
#' # Connect to personal P2P cluster, which is automatically launched
33+
#' plan(future.p2p::cluster)
3534
#'
3635
#' ## Create future
3736
#' a <- 42
@@ -47,16 +46,16 @@
4746
#'
4847
#' @importFrom future future
4948
#' @export
50-
pico_p2p <- function(cluster = p2p_cluster(), name = p2p_name(), host = "pipe.pico.sh", ssh_args = NULL, ...) {
51-
stop("INTERNAL ERROR: The future.p2p::pico_p2p() must never be called directly")
49+
cluster <- function(cluster = p2p_cluster(), name = p2p_name(), host = "pipe.pico.sh", ssh_args = NULL, ...) {
50+
stop("INTERNAL ERROR: The future.p2p::cluster() function must never be called directly")
5251
}
53-
class(pico_p2p) <- c("pico_p2p", "multiprocess", "future", "function")
54-
attr(pico_p2p, "init") <- TRUE
52+
class(cluster) <- c("pico_p2p", "multiprocess", "future", "function")
53+
attr(cluster, "init") <- TRUE
5554

5655

5756
#' A Pico P2P future is resolved through a Peer-to-Peer (P2P) workers communicating via pico.sh and Wormhole
5857
#'
59-
#' @inheritParams pico_p2p
58+
#' @inheritParams cluster
6059
#'
6160
#' @param \ldots Additional arguments passed to [future::FutureBackend()].
6261
#'
@@ -107,7 +106,7 @@ PicoP2PFutureBackend <- function(cluster = p2p_cluster(), name = p2p_name(), hos
107106

108107
core
109108
}
110-
attr(pico_p2p, "factory") <- PicoP2PFutureBackend
109+
attr(cluster, "factory") <- PicoP2PFutureBackend
111110

112111

113112

@@ -177,7 +176,7 @@ nbrOfFreeWorkers.PicoP2PFutureBackend <- function(evaluator = NULL, background =
177176
#' It will always return at least one worker, which is yourself.
178177
#' _WARNING: This is currently hardcoded to 10 workers, regardless of the number._
179178
#'
180-
#' @rdname pico_p2p
179+
#' @rdname cluster
181180
#' @export
182181
availablePicoP2PWorkers <- function() {
183182
nworkers <- 10L
@@ -302,7 +301,10 @@ print.PicoP2PFutureBackend <- function(x, ...) {
302301
cat(sprintf("P2P clusters you are hosting: [n=%d]\n", nrow(clusters)))
303302
for (kk in seq_len(nrow(clusters))) {
304303
cluster <- clusters[kk, ]
305-
cat(sprintf(" %2d. %s (%s)\n", kk, cluster$name, cluster$users))
304+
users <- strsplit(cluster$users, split = ",", fixed = TRUE)[[1]]
305+
users <- unique(c(users, pico_username()))
306+
users <- paste(users, collapse = ", ")
307+
cat(sprintf(" %2d. %s (users: %s)\n", kk, sQuote(cluster$name), users))
306308
}
307309

308310
cat("Message board:\n")
@@ -321,7 +323,7 @@ print.PicoP2PFutureBackend <- function(x, ...) {
321323
}
322324

323325

324-
p2p_can_connect <- function(cluster, name = name, host = "pipe.pico.sh", ssh_args = NULL, timeout = 10.0) {
326+
p2p_can_connect <- function(cluster, name = p2p_name(), host = "pipe.pico.sh", ssh_args = NULL, timeout = 10.0) {
325327
cluster_owner <- dirname(cluster)
326328
if (cluster_owner == pico_username()) {
327329
topic <- sprintf("%s/future.p2p", basename(cluster))

R/p2p_name.R

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
#' Gets the default name of the P2P cluster
22
#'
3+
#' @param users Users to have access to the cluster. This controls whether
4+
#' the default cluster names should be "personal" or "friends".
5+
#'
36
#' @return
4-
#' `p2p_cluster()` return R option `future.p2p.cluster`, if set,
5-
#' otherwise `{pico_name}/personal`.
7+
#' `p2p_cluster()` returns R option `future.p2p.cluster`, if set.
8+
#' If not set, it returns `{pico_name}/personal` if `length(users) == 0`,
9+
#' otherwise `{pico_name}/friends`.
610
#'
711
#' @export
8-
p2p_cluster <- function() {
12+
p2p_cluster <- function(users = character(0)) {
13+
users <- unique(users)
914
name <- getOption("future.p2p.cluster")
1015
if (is.null(name)) {
11-
name <- sprintf("%s/%s", pico_username(), "personal")
16+
users <- setdiff(users, pico_username())
17+
name <- if (length(users) == 0) "personal" else "friends"
18+
name <- sprintf("%s/%s", pico_username(), name)
1219
}
1320
name
1421
}

R/pico_p2p_cluster.R

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#' Launches a P2P cluster
1+
#' Hosts a P2P cluster
22
#'
33
#' @inheritParams pico_pipe
44
#'
@@ -13,11 +13,12 @@
1313
#' @param duration Duration (in seconds) to offer this cluster.
1414
#'
1515
#' @examplesIf interactive()
16-
#' pico_p2p_cluster()
16+
#' # Connect to personal P2P cluster, which is automatically launched
17+
#' host_cluster(users = c("bob", "carol"))
1718
#'
1819
#' @importFrom future resolve plan sequential
1920
#' @export
20-
pico_p2p_cluster <- function(cluster = p2p_cluster(), users = character(0L), name = p2p_name(), host = "pipe.pico.sh", ssh_args = NULL, duration = 14*24*60*60) {
21+
host_cluster <- function(cluster = p2p_cluster(users), users = character(0L), name = p2p_name(), host = "pipe.pico.sh", ssh_args = NULL, duration = 14*24*60*60) {
2122
stopifnot(length(cluster) == 1L, is.character(cluster), !is.na(cluster), nzchar(cluster))
2223

2324
parts <- strsplit(cluster, split = "/", fixed = TRUE)[[1]]
@@ -100,8 +101,8 @@ pico_p2p_cluster <- function(cluster = p2p_cluster(), users = character(0L), nam
100101
info("shutting down cluster ...")
101102
pico_terminate(p)
102103
info("bye")
103-
} ## pico_p2p_cluster()
104+
} ## host_cluster()
104105

105106

106107
## Expose function on the CLI
107-
cli_fcn(pico_p2p_cluster) <- c("--(cluster)=(.*)", "--(name)=(.*)", "--(users)=(.*)", "--(host)=(.*)", "--(ssh_args)=(.*)", "--(duration)=([[:digit:]]+)")
108+
cli_fcn(host_cluster) <- c("--(cluster)=(.*)", "--(name)=(.*)", "--(users)=(.*)", "--(host)=(.*)", "--(ssh_args)=(.*)", "--(duration)=([[:digit:]]+)")

R/pico_p2p_worker.R

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,16 @@
99
#' @param duration Duration (in seconds) to offer working on futures.
1010
#'
1111
#' @examplesIf interactive()
12-
#' pico_p2p_worker()
12+
#' ## Start a P2P cluster worker
13+
#' future.p2p::worker()
1314
#'
1415
#' @section Sequential, single-core processing by default:
1516
#' A P2P worker runs sequentially (`plan(sequential)`) and is configured
1617
#' to with a single CPU core to prevent nested parallelization.
1718
#'
1819
#' @importFrom future resolve plan sequential
1920
#' @export
20-
pico_p2p_worker <- function(cluster = p2p_cluster(), name = p2p_name(), host = "pipe.pico.sh", ssh_args = NULL, duration = 60*60) {
21+
worker <- function(cluster = p2p_cluster(), name = p2p_name(), host = "pipe.pico.sh", ssh_args = NULL, duration = 60*60) {
2122
parts <- strsplit(cluster, split = "/", fixed = TRUE)[[1]]
2223
if (length(parts) != 2L) {
2324
stop(sprintf("Argument 'cluster' must be of format '{owner}/{name}': %s", sQuote(cluster)))
@@ -108,8 +109,8 @@ pico_p2p_worker <- function(cluster = p2p_cluster(), name = p2p_name(), host = "
108109
}
109110
} ## repeat()
110111
info("bye")
111-
} ## pico_p2p_worker()
112+
} ## worker()
112113

113114

114115
## Expose function on the CLI
115-
cli_fcn(pico_p2p_worker) <- c("--(cluster)=(.*)", "--(name)=(.*)", "--(host)=(.*)", "--(ssh_args)=(.*)", "--(duration)=([[:digit:]]+)")
116+
cli_fcn(worker) <- c("--(cluster)=(.*)", "--(name)=(.*)", "--(host)=(.*)", "--(ssh_args)=(.*)", "--(duration)=([[:digit:]]+)")

R/utils-options.R

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@
1414
#' If you find yourself having to tweak one of the options, make sure to
1515
#' undo your changes immediately afterward.
1616
#'
17+
#' @section Options:
18+
#' \describe{
19+
#' \item{\option{future.p2p.wormhole}:}{(character)
20+
#' Specifies the absolute path to the `wormhole` executable. If not
21+
#' specified, a default one will be installed.
22+
# (Default: not set)
23+
#' }
24+
#' }
25+
#'
1726
#' @section Options for debugging:
1827
#' \describe{
1928
#' \item{\option{future.p2p.debug}:}{(logical)
@@ -40,8 +49,10 @@
4049
#' future.p2p.options
4150
#'
4251
#' future.p2p.debug
52+
#' future.p2p.wormhole
4353
#'
44-
#' R_FUTURE_DEBUG
54+
#' R_FUTURE_P2P_DEBUG
55+
#' R_FUTURE_P2P_WORMHOLE
4556
#'
4657
#' @name zzz-future.p2p.options
4758
NULL
@@ -138,4 +149,5 @@ update_package_option <- function(name, mode = "character", default = NULL, spli
138149
## Set options based on environment variables
139150
update_package_options <- function(debug = FALSE) {
140151
update_package_option("future.p2p.debug", mode = "logical")
152+
update_package_option("future.p2p.wormhole", mode = "character")
141153
}

R/vignette_engine.R

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ register_vignette_engine_during_build_only <- function(pkgname) {
1818

1919
html <- commonmark::markdown_html(md,
2020
smart = FALSE,
21+
footnotes = TRUE,
2122
extensions = "table",
2223
normalize = FALSE)
2324

R/wormhole.R

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,22 @@ wormhole_receive <- function(code, path = tempdir(), ..., rsh = NULL) {
6363
files
6464
}
6565

66-
#' Find the Wormhole Executable
66+
#' Find a Wormhole Executable
6767
#'
6868
#' @return
6969
#' The absolute path to the `wormhole` executable as a character string.
7070
#' Attribute `version-string` comprise the `wormhole --version` output,
7171
#' and attributes `name` and `version` the parsed version string.
7272
#' If no executable exists, an error is produced.
7373
#'
74+
#' @details
75+
#' Unless R option `future.p2p.wormhole` specifies an executable, the default
76+
#' is to download and install `wormhole-william` locally and use that one.
77+
#'
7478
#' @export
7579
find_wormhole <- local({
7680
bin <- NULL
81+
7782
function() {
7883
if (is.null(bin)) {
7984
debug <- isTRUE(getOption("future.p2p.debug"))
@@ -85,14 +90,24 @@ find_wormhole <- local({
8590
})
8691
}
8792

88-
res <- wormhole_pathname()
89-
90-
## Install wormhole?
91-
if (!file_test("-x", res)) res <- install_wormhole()
92-
93+
## User specified binary?
94+
res <- getOption("future.p2p.wormhole")
95+
if (!is.null(res)) {
96+
if (!file_test("-f", res)) {
97+
stop("R option 'future.p2p.wormhole' specifies a non-existing file: ", sQuote(res))
98+
} else if (!file_test("-x", res)) {
99+
stop("R option 'future.p2p.wormhole' specifies a file that is non-executable: ", sQuote(res))
100+
}
101+
} else {
102+
## If not, install automatically, if missing
103+
res <- wormhole_pathname()
104+
105+
## Install wormhole?
106+
if (!file_test("-x", res)) res <- install_wormhole()
107+
}
108+
93109
## Legacy: fall back to pre-existing 'wormhole' executable
94110
if (!file_test("-x", res)) res <- Sys.which("wormhole")
95-
96111
if (debug) mdebugf("Wormhole executable: %s", sQuote(res))
97112

98113
if (nzchar(res)) {
@@ -201,7 +216,7 @@ install_wormhole <- function(pathname = wormhole_pathname(), version = "1.0.8")
201216
filename <- basename(pathname)
202217

203218
url <- sprintf("https://github.com/psanford/wormhole-william/releases/download/v%s/%s", version, filename)
204-
tf <- file.path(tempdir(), filename)
219+
tf <- sprintf("%s.%s", pathname, basename(tempdir()))
205220
res <- download.file(url, destfile = tf, mode = "wb")
206221
stopifnot(file_test("-f", tf))
207222
Sys.chmod(tf, mode = "0755")

0 commit comments

Comments
 (0)