Skip to content

Commit a809f07

Browse files
Forward all 'host' and 'ssh_args' arguments to pico_username(), etc.
1 parent 9d946ac commit a809f07

File tree

8 files changed

+32
-23
lines changed

8 files changed

+32
-23
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Package: future.p2p
22
Title: A Peer-to-Peer Compute Cluster via Futureverse
3-
Version: 0.3.0-9017
3+
Version: 0.3.0-9018
44
Description: Implementation of the 'Future' API <doi:10.32614/RJ-2021-048> that resolves futures on a peer-to-peer ('P2P') compute environment. By using this future backend, you and your friends can share your spare compute resources with each other.
55
Imports:
66
future (>= 1.67.0),

R/PicoP2PFutureBackend-class.R

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ PicoP2PFutureBackend <- function(cluster = p2p_cluster_name(), host = "pipe.pico
100100
core <- structure(core, class = c("PicoP2PFutureBackend", "MultiprocessFutureBackend", "FutureBackend", class(core)))
101101

102102
if (!p2p_can_connect(cluster, name = name, host = host, ssh_args = ssh_args)) {
103-
stop(sprintf("Cannot connect to P2P cluster %s - make sure they have given you (%s) access", sQuote(cluster), sQuote(pico_username())))
103+
stop(sprintf("Cannot connect to P2P cluster %s - make sure they have given you (%s) access", sQuote(cluster), sQuote(pico_username(host = host, ssh_args = ssh_args))))
104104
}
105105

106106
core
@@ -245,19 +245,21 @@ print.PicoP2PFutureBackend <- function(x, ...) {
245245
cat(sprintf("P2P cluster: %s\n", sQuote(backend[["cluster"]])))
246246
cat(sprintf("P2P client ID: %s\n", sQuote(backend[["name"]])))
247247

248-
clusters <- pico_p2p_hosted_clusters(backend[["host"]], backend[["ssh_args"]])
248+
host <- backend[["host"]]
249+
ssh_args <- backend[["ssh_args"]]
250+
clusters <- pico_p2p_hosted_clusters(host = host, ssh_args = ssh_args)
249251
cat(sprintf("P2P clusters you are hosting: [n=%d]\n", nrow(clusters)))
250252
for (kk in seq_len(nrow(clusters))) {
251253
cluster <- clusters[kk, ]
252254
users <- strsplit(cluster$users, split = ",", fixed = TRUE)[[1]]
253-
users <- unique(c(users, pico_username()))
255+
users <- unique(c(users, pico_username(host = host, ssh_args = ssh_args)))
254256
users <- paste(users, collapse = ", ")
255257
cat(sprintf(" %2d. %s (users: %s)\n", kk, sQuote(cluster$name), users))
256258
}
257259

258260
cat("Message board:\n")
259261
cat(sprintf(" - Server: %s\n", backend[["host"]]))
260-
username <- pico_username(backend[["host"]], backend[["ssh_args"]])
262+
username <- pico_username(host = host, ssh_args = ssh_args)
261263
cat(sprintf(" - Username: %s\n", sQuote(username)))
262264

263265
cat("Data transfer tools:\n")
@@ -273,7 +275,7 @@ print.PicoP2PFutureBackend <- function(x, ...) {
273275

274276
p2p_can_connect <- function(cluster, name, host = "pipe.pico.sh", ssh_args = NULL, timeout = 10.0) {
275277
cluster_owner <- dirname(cluster)
276-
if (cluster_owner == pico_username()) {
278+
if (cluster_owner == pico_username(host = host, ssh_args = ssh_args)) {
277279
topic <- sprintf("%s/future.p2p", basename(cluster))
278280
} else {
279281
topic <- sprintf("%s/future.p2p", cluster)

R/host_cluster.R

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,25 @@
1616
#'
1717
#' @importFrom future resolve plan sequential
1818
#' @export
19-
host_cluster <- function(cluster = p2p_cluster_name(users), users = character(0L), host = "pipe.pico.sh", ssh_args = NULL, duration = 14*24*60*60) {
19+
host_cluster <- function(cluster = p2p_cluster_name(users, host = host, ssh_args = ssh_args), users = character(0L), host = "pipe.pico.sh", ssh_args = NULL, duration = 14*24*60*60) {
2020
stopifnot(length(cluster) == 1L, is.character(cluster), !is.na(cluster), nzchar(cluster))
2121

2222
parts <- strsplit(cluster, split = "/", fixed = TRUE)[[1]]
2323
okay <- FALSE
2424
if (length(parts) == 1L) {
2525
okay <- TRUE
2626
cluster_name <- cluster
27-
cluster_owner <- pico_username()
27+
cluster_owner <- pico_username(host = host, ssh_args = ssh_args)
2828
cluster <- sprintf("%s/%s", cluster_owner, cluster_name)
2929
} else if (length(parts) == 2L) {
3030
cluster_owner <- parts[1]
31-
if (cluster_owner == pico_username()) {
31+
if (cluster_owner == pico_username(host = host, ssh_args = ssh_args)) {
3232
okay <- TRUE
3333
cluster_name <- parts[2]
3434
}
3535
}
3636
if (!okay) {
37-
stop(sprintf("Argument 'cluster' must be of format '{owner}/{name}' or '{name}' where '{owner}' is your Pico username (%s): %s", sQuote(cluster), sQuote(pico_username())))
37+
stop(sprintf("Argument 'cluster' must be of format '{owner}/{name}' or '{name}' where '{owner}' is your Pico username (%s): %s", sQuote(cluster), sQuote(pico_username(host = host, ssh_args = ssh_args))))
3838
}
3939

4040
stopifnot(

R/p2p_client_id.R

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@
99
#' otherwise `{pico_name}/friends`.
1010
#'
1111
#' @export
12-
p2p_cluster_name <- function(users = character(0)) {
12+
p2p_cluster_name <- function(users = character(0), ...) {
1313
users <- unique(users)
1414
name <- getOption("future.p2p.cluster")
1515
if (is.null(name)) {
16-
users <- setdiff(users, pico_username())
16+
users <- setdiff(users, pico_username(...))
1717
name <- if (length(users) == 0) "personal" else "friends"
18-
name <- sprintf("%s/%s", pico_username(), name)
18+
name <- sprintf("%s/%s", pico_username(...), name)
1919
}
2020
name
2121
}
@@ -30,9 +30,9 @@ p2p_cluster_name <- function(users = character(0)) {
3030
#' @keywords internal
3131
p2p_client_id <- local({
3232
name <- NULL
33-
function() {
33+
function(...) {
3434
if (is.null(name)) {
35-
user <- pico_username()
35+
user <- pico_username(...)
3636
hostname <- Sys.info()[["nodename"]]
3737
pid <- Sys.getpid()
3838
name <<- sprintf("%s@%s:%d", user, hostname, pid)

R/pico.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ pico_receive_message_dataframe <- function(p, ..., pattern = NULL) {
156156
#'
157157
#' @rdname pico_pipe
158158
pico_hosted_channels <- function(host = "pipe.pico.sh", ssh_args = NULL, timeout = 10.0) {
159-
username <- pico_username()
159+
username <- pico_username(host = host, ssh_args = ssh_args)
160160
t_max <- proc.time()[3] + timeout
161161
pattern_1 <- sprintf(".*[[:blank:]]%s/([^:]+):", username)
162162
pattern_2 <- sprintf(".*[[:blank:]]%s/([^:]+):[[:blank:]]+[(]Access List:[[:blank:]]+(.*)[)]", username)

R/pico_p2p.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ pico_p2p_dispatch_future <- function(future) {
465465

466466
## 1. Connect to pico and say hello
467467
cluster_owner <- dirname(cluster)
468-
if (cluster_owner == pico_username()) {
468+
if (cluster_owner == pico_username(host = host, ssh_args = ssh_args)) {
469469
topic <- sprintf("%s/future.p2p", basename(cluster))
470470
} else {
471471
topic <- sprintf("%s/future.p2p", cluster)

R/pico_username.R

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ pico_username <- local({
2525
})
2626
}
2727

28+
## Special case
29+
if (grepl("^pipe[.]", host)) {
30+
host <- sub("^pipe[.]", "", host)
31+
}
32+
2833
ssh_config <- list(options = ssh_args, host = host)
2934
args <- c(ssh_config[["options"]], ssh_config[["host"]], "user")
3035
if (debug) {
@@ -41,7 +46,9 @@ pico_username <- local({
4146
if (!is.null(status)) {
4247
stop(sprintf("Failed to infer pico.sh username. Exit code %s", status))
4348
}
44-
stopifnot(length(out) >= 3)
49+
if (length(out) < 3) {
50+
stop(sprintf("pico_username(): Received unexpected results: [n=%s]\n%s", length(out), paste(out, collapse = "\n")))
51+
}
4552
username <<- structure(out[1], id = out[2], created_on = as.POSIXct(sub("T", " ", out[3])), class = "pico_username")
4653
}
4754

R/worker.R

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
#'
1717
#' @importFrom processx poll
1818
#' @export
19-
worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args = NULL, duration = 60*60) {
19+
worker <- function(cluster = p2p_cluster_name(host = host, ssh_args = ssh_args), host = "pipe.pico.sh", ssh_args = NULL, duration = 60*60) {
2020
parts <- strsplit(cluster, split = "/", fixed = TRUE)[[1]]
2121
if (length(parts) != 2L) {
2222
stop(sprintf("Argument 'cluster' must be of format '{owner}/{name}': %s", sQuote(cluster)))
@@ -47,7 +47,7 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
4747
info("assert connection to p2p cluster %s", sQuote(cluster))
4848
worker_id <- p2p_worker_id()
4949
if (!p2p_can_connect(cluster, name = worker_id, host = host, ssh_args = ssh_args)) {
50-
stop(sprintf("Cannot connect to P2P cluster %s - make sure they have given you (%s) access", sQuote(cluster), sQuote(pico_username())))
50+
stop(sprintf("Cannot connect to P2P cluster %s - make sure they have given you (%s) access", sQuote(cluster), sQuote(pico_username(host = host, ssh_args = ssh_args))))
5151
}
5252

5353
channel_prefix <- sprintf("%s_%s", .packageName, session_uuid())
@@ -103,7 +103,7 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
103103

104104
info("connect worker %s to p2p cluster %s", sQuote(worker_id), sQuote(cluster))
105105
cluster_owner <- dirname(cluster)
106-
if (cluster_owner == pico_username()) {
106+
if (cluster_owner == pico_username(host = host, ssh_args = ssh_args)) {
107107
topic <- sprintf("%s/future.p2p", basename(cluster))
108108
} else {
109109
topic <- sprintf("%s/future.p2p", cluster)
@@ -379,12 +379,12 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
379379

380380
info("assert connection to p2p cluster %s", sQuote(cluster))
381381
if (!p2p_can_connect(cluster, name = worker_id, host = host, ssh_args = ssh_args)) {
382-
stop(sprintf("Cannot connect to P2P cluster %s - make sure they have given you (%s) access", sQuote(cluster), sQuote(pico_username())))
382+
stop(sprintf("Cannot connect to P2P cluster %s - make sure they have given you (%s) access", sQuote(cluster), sQuote(pico_username(host = host, ssh_args = ssh_args))))
383383
}
384384

385385
info("connect background worker process %s to p2p cluster %s for %s until %s", sQuote(worker_id), sQuote(cluster), format(duration), expires)
386386
cluster_owner <- dirname(cluster)
387-
if (cluster_owner == pico_username()) {
387+
if (cluster_owner == pico_username(host = host, ssh_args = ssh_args)) {
388388
topic <- sprintf("%s/future.p2p", basename(cluster))
389389
} else {
390390
topic <- sprintf("%s/future.p2p", cluster)

0 commit comments

Comments
 (0)