Skip to content

Commit 13ddb9b

Browse files
Run worker in a background callr process; this will eventually allow us to listen to the message board for interrupts etc.
1 parent 5a2a379 commit 13ddb9b

File tree

3 files changed

+56
-5
lines changed

3 files changed

+56
-5
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Package: future.p2p
22
Title: A Peer-to-Peer Compute Cluster via Futureverse
3-
Version: 0.3.0-9008
3+
Version: 0.3.0-9009
44
Description: Implementation of the 'Future' API <doi:10.32614/RJ-2021-048> that resolves futures on a peer-to-peer ('P2P') compute environment. By using this future backend, you and your friends can share your spare compute resources with each other.
55
Imports:
66
future (>= 1.67.0),

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ importFrom(future,resolved)
3535
importFrom(future,result)
3636
importFrom(future,run)
3737
importFrom(future,sequential)
38+
importFrom(processx,poll)
3839
importFrom(processx,process)
3940
importFrom(utils,capture.output)
4041
importFrom(utils,download.file)

R/worker.R

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#' to with a single CPU core to prevent nested parallelization.
1616
#'
1717
#' @importFrom future resolve plan sequential
18+
#' @importFrom processx poll
1819
#' @export
1920
worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args = NULL, duration = 60*60) {
2021
parts <- strsplit(cluster, split = "/", fixed = TRUE)[[1]]
@@ -34,12 +35,61 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
3435
info("install wormhole-william, if missing")
3536
bin <- find_wormhole()
3637

37-
run_worker(cluster = cluster, worker_id = p2p_worker_id(), host = host, ssh_args = ssh_args, duration = duration)
38+
channel_prefix <- sprintf("%s_%s", .packageName, session_uuid())
39+
channels <- c(
40+
tx = tempfile(pattern = channel_prefix, fileext = ".tx"),
41+
rx = tempfile(pattern = channel_prefix, fileext = ".rx")
42+
)
43+
lapply(channels, FUN = file.create, showWarnings = FALSE)
44+
on.exit({
45+
lapply(channels, FUN = file.remove, showWarnings = FALSE)
46+
})
47+
48+
args <- list(
49+
cluster = cluster,
50+
worker_id = p2p_worker_id(),
51+
host = host,
52+
ssh_args = ssh_args,
53+
duration = duration,
54+
channels = channels
55+
)
56+
57+
info("launching worker")
58+
rx <- r_bg(run_worker, args = args, supervise = TRUE, package = TRUE)
59+
attr(rx, "channels") <- args[["channels"]]
60+
61+
## Relay output from the worker process
62+
while (rx$is_alive()) {
63+
res <- poll(list(rx), ms = -1)[[1]]
64+
65+
## Relay stdout?
66+
if ("ready" %in% res[["output"]]) {
67+
out <- rx$read_output_lines()
68+
writeLines(out, con = stdout())
69+
}
70+
71+
## Relay stderr?
72+
if ("ready" %in% res[["error"]]) {
73+
err <- rx$read_error_lines()
74+
writeLines(err, con = stderr())
75+
}
76+
}
77+
78+
info("wait for worker process to terminate")
79+
rx$wait()
80+
81+
info("get worker process result")
82+
results <- rx$get_result()
83+
84+
info("finalize worker process")
85+
rx$finalize()
86+
87+
invisible(result)
3888
} ## worker()
3989

4090

4191
#' @importFrom future plan
42-
run_worker <- function(cluster, worker_id, host, ssh_args, duration) {
92+
run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
4393
old_opts <- options(parallelly.availableCores.fallback = 1L)
4494
on.exit(options(old_opts))
4595
with(plan(sequential), local = TRUE)
@@ -74,7 +124,7 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration) {
74124

75125
info("hello")
76126
m <- pico_p2p_hello(p, type = "worker", expires = expires)
77-
127+
78128
info("wait for request")
79129
m <- pico_p2p_wait_for(p, type = "request", expires = expires)
80130
if (m[["type"]] == "expired") {
@@ -93,7 +143,7 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration) {
93143
info("future request expired")
94144
next
95145
}
96-
146+
97147
uri <- parse_transfer_uri(m[["via"]])
98148
if (!uri[["protocol"]] %in% supported_transfer_protocols()) {
99149
info("non-supported protocol")

0 commit comments

Comments
 (0)