Skip to content

Commit 40e44df

Browse files
CLEANUP: Un-export all pico_p2p_nnn() functions
1 parent 891cf8f commit 40e44df

File tree

7 files changed

+119
-124
lines changed

7 files changed

+119
-124
lines changed

.github/workflows/R-CMD-check.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ jobs:
3838
_R_CHECK_SUGGESTS_ONLY_: true
3939
_R_CHECK_THINGS_IN_TEMP_DIR_: true
4040
_R_CHECK_TESTS_NLINES_: 300
41-
RCMDCHECK_ERROR_ON: error
41+
RCMDCHECK_ERROR_ON: warning
4242
## Specific to futures
4343
R_FUTURE_CONNECTIONS_ONMISUSE: "error[details=TRUE]"
4444
R_FUTURE_GLOBALENV_ONMISUSE: "error"

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Package: future.p2p
22
Title: A Peer-to-Peer Compute Cluster via Futureverse
3-
Version: 0.1.0-9010
3+
Version: 0.1.0-9011
44
Description: Implementation of the 'Future' API <doi:10.32614/RJ-2021-048> that resolves futures on a peer-to-peer ('P2P') compute environment. By using this future backend, you and your friends can share your spare compute resources with each other.
55
Depends:
66
future (>= 1.67.0)

NAMESPACE

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,8 @@ S3method(resolved,PicoP2PFuture)
1111
S3method(result,PicoP2PFuture)
1212
export(PicoP2PFutureBackend)
1313
export(cluster)
14-
export(future_id)
1514
export(host_cluster)
1615
export(p2p_cluster_name)
17-
export(pico_hosted_channels)
18-
export(pico_p2p_have_future)
19-
export(pico_p2p_hello)
20-
export(pico_p2p_hosted_clusters)
21-
export(pico_p2p_receive_future)
22-
export(pico_p2p_receive_result)
23-
export(pico_p2p_send_future)
24-
export(pico_p2p_send_result)
25-
export(pico_p2p_take_on_future)
26-
export(pico_p2p_wait_for)
27-
export(pico_pipe)
28-
export(pico_receive_message)
29-
export(pico_receive_message_dataframe)
30-
export(pico_send_message)
31-
export(pico_terminate)
3216
export(pico_username)
3317
export(worker)
3418
importFrom(callr,r_bg)

R/PicoP2PFutureBackend-class.R

Lines changed: 1 addition & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ launchFuture.PicoP2PFutureBackend <- function(backend, future, ...) {
138138
FutureRegistry(reg, action = "add", future = future, earlySignal = FALSE)
139139

140140
## 3. Launch future, i.e. submit it to the Pico P2P cluster dispatcher
141-
future <- dispatch_future(future)
141+
future <- pico_p2p_dispatch_future(future)
142142

143143
invisible(future)
144144
} ## launchFuture()
@@ -189,94 +189,6 @@ waitForWorker <- function(...) {
189189
}
190190

191191

192-
#' @importFrom callr r_bg
193-
#' @importFrom utils file_test
194-
dispatch_future <- function(future) {
195-
send_future <- function(topic, name, host = host, ssh_args = ssh_args, future_id, file, to, via, duration) {
196-
pico <- future.p2p::pico_pipe(topic, user = name, host = host, ssh_args = ssh_args)
197-
m <- future.p2p::pico_p2p_hello(pico, type = "client")
198-
199-
## 2. Announce future
200-
repeat {
201-
m1 <- future.p2p::pico_p2p_have_future(pico, future = file, duration = duration)
202-
m2 <- future.p2p::pico_p2p_wait_for(pico, type = "offer", futures = m1[["future"]], expires = m1[["expires"]])
203-
if (m2[["type"]] != "expired") break
204-
}
205-
206-
## 3. Send future to workers
207-
worker <- m2[["from"]]
208-
stopifnot(is.character(worker), nzchar(worker))
209-
m3 <- future.p2p::pico_p2p_send_future(pico, future = file, to = worker, via = via)
210-
211-
## 4. Remove temporary file
212-
file.remove(file)
213-
214-
## 5. Wait for and receive FutureResult file
215-
path <- file.path(dirname(dirname(file)), "results")
216-
tryCatch({
217-
file <- future.p2p::pico_p2p_receive_result(pico, via = via, path = path)
218-
}, interrupt = function(int) {
219-
cat(file = "foo.log", "interrupted\n")
220-
})
221-
222-
invisible(file)
223-
}
224-
225-
debug <- isTRUE(getOption("future.p2p.debug"))
226-
if (debug) {
227-
mdebug_push("dispatch_future()...")
228-
on.exit(mdebugf_pop())
229-
}
230-
231-
## Get backend
232-
backend <- future[["backend"]]
233-
stopifnot(inherits(backend, "FutureBackend"))
234-
235-
cluster <- backend[["cluster"]]
236-
name <- backend[["name"]]
237-
host <- backend[["host"]]
238-
ssh_args <- backend[["ssh_args"]]
239-
via <- via_transfer_uri()
240-
241-
## 1. Put future on the dispatcher queue
242-
void <- p2p_dir("results")
243-
future[["file"]] <- saveFuture(future, path = p2p_dir("queued"))
244-
245-
if (debug) mdebugf("File: %s", sQuote(future[["file"]]))
246-
247-
## 1. Connect to pico and say hello
248-
cluster_owner <- dirname(cluster)
249-
if (cluster_owner == pico_username()) {
250-
topic <- sprintf("%s/future.p2p", basename(cluster))
251-
} else {
252-
topic <- sprintf("%s/future.p2p", cluster)
253-
}
254-
255-
args <- list(
256-
topic = topic,
257-
name = name,
258-
host = host,
259-
ssh_args = ssh_args,
260-
future_id = future_id(future),
261-
file = future[["file"]],
262-
via = via,
263-
duration = getOption("future.p2p.duration.request", 10.0)
264-
)
265-
if (debug) {
266-
mstr(args)
267-
}
268-
269-
rx <- r_bg(send_future, args = args, supervise = TRUE)
270-
future[["rx"]] <- rx
271-
272-
future[["pico_via"]] <- via
273-
274-
## Update future state
275-
future[["state"]] <- "running"
276-
277-
invisible(future)
278-
}
279-
280192

281193
#' @importFrom utils file_test
282194
p2p_dir <- function(dir = c("queued", "running", "results")) {

R/pico.R

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
#' This function relies on the <https://pico.sh> services.
3232
#'
3333
#' @importFrom processx process
34-
#' @export
3534
pico_pipe <- function(topic = NULL, command = c("pipe", "pub", "sub", "ls", "help"), args = c(), host = "pipe.pico.sh", ssh_args = NULL, ...) {
3635
command <- match.arg(command)
3736
if (command %in% c("pipe", "pub", "sub")) {
@@ -91,14 +90,12 @@ print.pico <- function(x, ...) {
9190
}
9291

9392
#' @rdname pico_pipe
94-
#' @export
9593
pico_terminate <- function(p, ...) {
9694
stopifnot(inherits(p, "pico_pipe"))
9795
p$process$kill()
9896
}
9997

10098
#' @rdname pico_pipe
101-
#' @export
10299
pico_send_message <- function(p, message, newline = TRUE, ...) {
103100
stopifnot(inherits(p, "pico_pipe"))
104101
stopifnot(length(message) == 1L, is.character(message), !is.na(message))
@@ -107,6 +104,9 @@ pico_send_message <- function(p, message, newline = TRUE, ...) {
107104
}
108105

109106

107+
#' @param df (data.frame) Data frame to send as a message.
108+
#'
109+
#' @rdname pico_pipe
110110
pico_send_message_dataframe <- function(p, df) {
111111
msg <- unlist(df, use.names = TRUE)
112112
msg <- sprintf("%s=%s", names(msg), msg)
@@ -118,15 +118,16 @@ pico_send_message_dataframe <- function(p, df) {
118118

119119

120120
#' @rdname pico_pipe
121-
#' @export
122121
pico_receive_message <- function(p, n = 1L, ...) {
123122
stopifnot(inherits(p, "pico_pipe"))
124123
stopifnot(length(n) == 1L, is.numeric(n), !is.na(n), n > 0L)
125124
p$process$read_output_lines(n)
126125
}
127126

128127

129-
#' @export
128+
#' @param pattern (character string; optional) A regular expression so scan for.
129+
#'
130+
#' @rdname pico_pipe
130131
pico_receive_message_dataframe <- function(p, ..., pattern = NULL) {
131132
msg <- pico_receive_message(p, ...)
132133

@@ -150,8 +151,9 @@ pico_receive_message_dataframe <- function(p, ..., pattern = NULL) {
150151
}
151152

152153

153-
154-
#' @export
154+
#' @param timeout (numeric scalar) Timeout (in seconds).
155+
#'
156+
#' @rdname pico_pipe
155157
pico_hosted_channels <- function(host = "pipe.pico.sh", ssh_args = NULL, timeout = 10.0) {
156158
username <- pico_username()
157159
t_max <- proc.time()[3] + timeout

R/pico_p2p.R

Lines changed: 92 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ now_str <- function(when = pico_p2p_time()) {
1212
format(when, format = "%FT%T")
1313
}
1414

15-
#' @export
1615
future_id <- function(future, ...) {
1716
if (inherits(future, "Future")) {
1817
id <- paste(future[["uuid"]], collapse = "-")
@@ -23,7 +22,7 @@ future_id <- function(future, ...) {
2322
id
2423
}
2524

26-
#' @export
25+
2726
pico_p2p_hello <- function(p, from = p$user, type = c("worker", "client", "cluster"), expires = NULL, duration = 60*60, ...) {
2827
type <- match.arg(type)
2928

@@ -58,7 +57,7 @@ pico_p2p_expired <- function() {
5857
)
5958
}
6059

61-
#' @export
60+
6261
pico_p2p_wait_for <- function(p, type, futures = NULL, expires = NULL, duration = 60, delay = 0.1, ...) {
6362
if (is.null(expires)) {
6463
expires <- Sys.time() + duration
@@ -95,7 +94,6 @@ pico_p2p_wait_for <- function(p, type, futures = NULL, expires = NULL, duration
9594

9695

9796
#' @importFrom utils file_test
98-
#' @export
9997
pico_p2p_have_future <- function(p, future, duration = getOption("future.p2p.duration.request", 60), from = p$user, ...) {
10098
debug <- isTRUE(getOption("future.p2p.debug"))
10199
if (debug) {
@@ -136,7 +134,6 @@ pico_p2p_have_future <- function(p, future, duration = getOption("future.p2p.dur
136134
pico_send_message_dataframe(p, m)
137135
}
138136

139-
#' @export
140137
pico_p2p_take_on_future <- function(p, to, future, duration = 60, from = p$user, ...) {
141138
debug <- isTRUE(getOption("future.p2p.debug"))
142139
if (debug) {
@@ -165,7 +162,6 @@ pico_p2p_take_on_future <- function(p, to, future, duration = 60, from = p$user,
165162
pico_send_message_dataframe(p, m)
166163
}
167164

168-
#' @export
169165
pico_p2p_send_future <- function(p, future, to, via = via_transfer_uri(), duration = 60, from = p$user, ...) {
170166
debug <- isTRUE(getOption("future.p2p.debug"))
171167
if (debug) {
@@ -215,7 +211,6 @@ pico_p2p_send_future <- function(p, future, to, via = via_transfer_uri(), durati
215211
}
216212

217213

218-
#' @export
219214
pico_p2p_receive_future <- function(p, via, duration = 60) {
220215
debug <- isTRUE(getOption("future.p2p.debug"))
221216
if (debug) {
@@ -244,7 +239,6 @@ pico_p2p_receive_future <- function(p, via, duration = 60) {
244239

245240

246241
#' @importFrom future result
247-
#' @export
248242
pico_p2p_send_result <- function(p, future, via, duration = 60) {
249243
debug <- isTRUE(getOption("future.p2p.debug"))
250244
if (debug) {
@@ -269,7 +263,6 @@ pico_p2p_send_result <- function(p, future, via, duration = 60) {
269263
}
270264

271265

272-
#' @export
273266
pico_p2p_receive_result <- function(p, via, duration = 60, path = tempdir()) {
274267
debug <- isTRUE(getOption("future.p2p.debug"))
275268
if (debug) {
@@ -291,11 +284,100 @@ pico_p2p_receive_result <- function(p, via, duration = 60, path = tempdir()) {
291284
}
292285

293286

294-
#' @export
295287
pico_p2p_hosted_clusters <- function(host = "pipe.pico.sh", ssh_args = NULL, timeout = 10.0) {
296288
clusters <- pico_hosted_channels(host, ssh_args = ssh_args, timeout = timeout)
297289
keep <- grep("/future.p2p$", clusters$name)
298290
clusters <- clusters[keep, ]
299291
clusters$name <- sub("/future.p2p$", "", clusters$name)
300292
clusters
301293
}
294+
295+
296+
297+
#' @importFrom callr r_bg
298+
#' @importFrom utils file_test
299+
pico_p2p_dispatch_future <- function(future) {
300+
send_future <- function(topic, name, host = host, ssh_args = ssh_args, future_id, file, to, via, duration) {
301+
pico <- future.p2p:::pico_pipe(topic, user = name, host = host, ssh_args = ssh_args)
302+
m <- future.p2p:::pico_p2p_hello(pico, type = "client")
303+
304+
## 2. Announce future
305+
repeat {
306+
m1 <- future.p2p:::pico_p2p_have_future(pico, future = file, duration = duration)
307+
m2 <- future.p2p:::pico_p2p_wait_for(pico, type = "offer", futures = m1[["future"]], expires = m1[["expires"]])
308+
if (m2[["type"]] != "expired") break
309+
}
310+
311+
## 3. Send future to workers
312+
worker <- m2[["from"]]
313+
stopifnot(is.character(worker), nzchar(worker))
314+
m3 <- future.p2p:::pico_p2p_send_future(pico, future = file, to = worker, via = via)
315+
316+
## 4. Remove temporary file
317+
file.remove(file)
318+
319+
## 5. Wait for and receive FutureResult file
320+
path <- file.path(dirname(dirname(file)), "results")
321+
tryCatch({
322+
file <- future.p2p:::pico_p2p_receive_result(pico, via = via, path = path)
323+
}, interrupt = function(int) {
324+
cat(file = "foo.log", "interrupted\n")
325+
})
326+
327+
invisible(file)
328+
}
329+
330+
debug <- isTRUE(getOption("future.p2p.debug"))
331+
if (debug) {
332+
mdebug_push("dispatch_future()...")
333+
on.exit(mdebugf_pop())
334+
}
335+
336+
## Get backend
337+
backend <- future[["backend"]]
338+
stopifnot(inherits(backend, "FutureBackend"))
339+
340+
cluster <- backend[["cluster"]]
341+
name <- backend[["name"]]
342+
host <- backend[["host"]]
343+
ssh_args <- backend[["ssh_args"]]
344+
via <- via_transfer_uri()
345+
346+
## 1. Put future on the dispatcher queue
347+
void <- p2p_dir("results")
348+
future[["file"]] <- saveFuture(future, path = p2p_dir("queued"))
349+
350+
if (debug) mdebugf("File: %s", sQuote(future[["file"]]))
351+
352+
## 1. Connect to pico and say hello
353+
cluster_owner <- dirname(cluster)
354+
if (cluster_owner == pico_username()) {
355+
topic <- sprintf("%s/future.p2p", basename(cluster))
356+
} else {
357+
topic <- sprintf("%s/future.p2p", cluster)
358+
}
359+
360+
args <- list(
361+
topic = topic,
362+
name = name,
363+
host = host,
364+
ssh_args = ssh_args,
365+
future_id = future_id(future),
366+
file = future[["file"]],
367+
via = via,
368+
duration = getOption("future.p2p.duration.request", 10.0)
369+
)
370+
if (debug) {
371+
mstr(args)
372+
}
373+
374+
rx <- r_bg(send_future, args = args, supervise = TRUE)
375+
future[["rx"]] <- rx
376+
377+
future[["pico_via"]] <- via
378+
379+
## Update future state
380+
future[["state"]] <- "running"
381+
382+
invisible(future)
383+
}

0 commit comments

Comments
 (0)