Skip to content

Commit f7686cc

Browse files
Migrating towards worker to update worker coordinator via stdout (instead of via file)
1 parent ac1b643 commit f7686cc

File tree

2 files changed

+63
-23
lines changed

2 files changed

+63
-23
lines changed

R/pico_p2p.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ pico_p2p_withdraw_future <- function(p, from = p$user, to, future_id, ...) {
159159
}
160160

161161

162-
pico_p2p_take_on_future <- function(p, to, future, duration = 60, from = p$user, ...) {
162+
pico_p2p_take_on_future <- function(p, to, future, duration = 15, from = p$user, ...) {
163163
debug <- isTRUE(getOption("future.p2p.debug"))
164164
if (debug) {
165165
mdebug_push("pico_p2p_take_on_future() ...")

R/worker.R

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
9191
now <- pico_p2p_time()
9292

9393
expires <- pico_p2p_time(delta = duration)
94-
94+
9595
args <- list(
9696
cluster = cluster,
9797
worker_id = worker_id,
@@ -116,24 +116,18 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
116116
info("launching background worker process")
117117
rx <- r_bg(run_worker, args = args, supervise = TRUE, package = TRUE)
118118
attr(rx, "channels") <- args[["channels"]]
119-
120-
## Wait for worker to launch
121-
info("waiting for worker process to start")
122-
while ("started" %in% rx_worker()) {
123-
Sys.sleep(0.1)
124-
}
125119
info("worker process started")
126120

127121
## Announce we're available
128122
info("announcing to p2p message board we are joining as a worker")
129123
m <- pico_p2p_hello(p, type = "worker", expires = expires)
130124

131-
## The ID of the future we're offering to work on or that is being processed
132-
future <- NULL
133-
client <- NULL
134-
135125
## Main loop monitoring the P2P message board and the background worker
136126
state <- "waiting"
127+
future <- NULL
128+
client <- NULL
129+
offer_expires <- Inf
130+
137131
info("waiting for request")
138132

139133
repeat tryCatch({
@@ -147,18 +141,38 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
147141
## Any updates from worker, e.g. output to be relayed?
148142
res <- poll(list(rx), ms = 100)[[1]]
149143

144+
worker_status <- NULL
145+
150146
## Relay stdout?
151147
if ("ready" %in% res[["output"]]) {
152148
out <- rx$read_output_lines()
149+
is_special <- grepl("^worker_status=", out)
150+
worker_status <- out[is_special]
151+
out <- out[!is_special]
152+
out <- sprintf(" %s", out)
153153
writeLines(out, con = stdout())
154154
}
155155

156156
## Relay stderr?
157157
if ("ready" %in% res[["error"]]) {
158158
err <- rx$read_error_lines()
159+
err <- sprintf(" %s", err)
159160
writeLines(err, con = stderr())
160161
}
161162

163+
## Handle worker status updates
164+
if (length(worker_status) > 0) {
165+
worker_status <- sub("^worker_status=", "", worker_status)
166+
info("Status update received from worker: [n=%d] %s", length(worker_status), commaq(worker_status))
167+
168+
if ("ready" %in% worker_status) {
169+
info("worker process is ready")
170+
}
171+
if ("interrupted" %in% worker_status) {
172+
signalCondition(worker_interrupt())
173+
}
174+
}
175+
162176
if (state == "exit") {
163177
info("Terminating worker")
164178
break
@@ -197,16 +211,29 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
197211
signalCondition(future_withdraw("worker expired; terminating"))
198212
}
199213

214+
if (Sys.time() > offer_expires) {
215+
info("work offer expired")
216+
state <- "waiting"
217+
offer_expires <- Inf
218+
future <- NULL
219+
client <- NULL
220+
## FIXME: Update client via P2P message board
221+
next
222+
}
223+
200224
## Process request?
201225
if (length(m) > 0) {
202226
## Are we ready to offer to do work?
203227
if (state == "waiting" && m[["type"]] == "request") {
204228
stop_if_not(is.null(future), is.null(client))
205229
future <- m[["future"]]
206230
client <- m[["from"]]
207-
info("offer to process future %s for client %s", sQuote(future), sQuote(client))
231+
duration <- 5.0
232+
info("offer to process future %s for client %s (valid for %g seconds)", sQuote(future), sQuote(client), duration)
208233
state <- "offer"
209-
pico_p2p_take_on_future(p, to = client, future = future)
234+
## Make a work offer for 5 seconds
235+
m0 <- pico_p2p_take_on_future(p, to = client, future = future, duration = duration)
236+
offer_expires <- m0[["expires"]]
210237
} else if (state == "offer" && future %in% m[["future"]]) {
211238
info("waiting for acceptance of our work offer")
212239
if (m[["type"]] == "accept" && m[["to"]] == worker_id) {
@@ -258,8 +285,9 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
258285
}
259286
} else if (state == "resolved") {
260287
## Check if future results have been transferred
261-
if ("uploaded" %in% info) {
288+
if ("ready" %in% info) {
262289
state <- "waiting"
290+
offer_expires <- Inf
263291
future <- NULL
264292
client <- NULL
265293
info("Future %s has been resolved and results have been sent to client %s", sQuote(future), sQuote(client))
@@ -294,6 +322,7 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
294322
}, worker_interrupt = function(c) {
295323
info("Worker process was interrupted")
296324
state <- "waiting"
325+
offer_expires <- Inf
297326
future <- NULL
298327
client <- NULL
299328
info("waiting for request")
@@ -302,14 +331,15 @@ worker <- function(cluster = p2p_cluster_name(), host = "pipe.pico.sh", ssh_args
302331
info("interrupted")
303332
## Interrupt worker
304333
rx$interrupt()
334+
state <<- "exit"
335+
offer_expires <- Inf
305336
future <- NULL
306337
client <- NULL
307338
## FIXME: Update the P2P message board
308-
state <<- "exit"
309339
info("exiting")
310340
}) ## repeat tryCatch({ ... })
311341

312-
info("Waiting 5 seconds before kill worker process and its children ...")
342+
info("Waiting 5 seconds before killing the worker process and its children ...")
313343
Sys.sleep(5.0)
314344
rx$kill_tree()
315345

@@ -366,11 +396,13 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
366396
} ## rx_parent()
367397

368398
tx_parent <- function(msg, channel = channels[["rx"]]) {
399+
cat(sprintf("worker_status=%s\n", msg), file = stdout())
400+
flush(stdout())
369401
writeLines(msg, con = channel)
370402
} ## tx_parent()
371403

372-
## Tell parent we're alive
373-
tx_parent("started")
404+
## Tell parent that worker is ready
405+
tx_parent("ready")
374406

375407
repeat tryCatch({
376408
## Wait for instructions from parent
@@ -391,7 +423,12 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
391423
nzchar(via), !grepl("[,=]", via)
392424
)
393425
tx_parent("downloading")
394-
res <- pico_p2p_receive_future(p, via = via)
426+
dt <- system.time({
427+
res <- pico_p2p_receive_future(p, via = via)
428+
})
429+
dt <- difftime(dt[3], 0)
430+
info("Future %s received in %s", sQuote(future), format(dt))
431+
395432
f <- res[["future"]]
396433
stop_if_not(paste(f[["uuid"]], collapse = "-") == future)
397434

@@ -404,9 +441,12 @@ run_worker <- function(cluster, worker_id, host, ssh_args, duration, channels) {
404441
tx_parent("resolved")
405442

406443
info("sending future result %s via %s", sQuote(future), sQuote(via))
407-
res <- pico_p2p_send_result(p, future = f, via = via)
408-
tx_parent("uploaded")
409-
info("future result %s sent", sQuote(future), sQuote(via))
444+
dt <- system.time({
445+
res <- pico_p2p_send_result(p, future = f, via = via)
446+
})
447+
dt <- difftime(dt[3], 0)
448+
info("future result %s sent in %s", sQuote(future), format(dt))
449+
tx_parent("ready")
410450
}
411451
}, interrupt = function(c) {
412452
info("interrupted")

0 commit comments

Comments
 (0)