Skip to content

Commit 89bc64d

Browse files
authored
Launch local daemons from host not dispatcher (#403)
* Launch local daemons from host not dispatcher * Simplify a bit
1 parent 56bd97f commit 89bc64d

File tree

6 files changed

+41
-52
lines changed

6 files changed

+41
-52
lines changed

NEWS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
If the serialization hook function errors or otherwise fails to return a raw vector, this will error out rather than be silently ignored (thanks @dipterix, #378).
1919
* `require_daemons()` arguments are swapped so that `.compute` comes before `call` for ease of use.
2020
Previous usage will work for the time being, although is deprecated and will be defunct in a future version.
21+
* Fixes `daemons(n)` failing to launch local daemons if mirai was installed in a custom user library set by an explicit `.libPaths()` call in '.Rprofile' (thanks @erydit and @dpastoor, #390).
2122
* Enhancements to `everywhere()`:
2223
+ Consecutive `everywhere()` calls are permissible again when using dispatcher (behaviour update in v2.4.1) (#354).
2324
+ No longer has any effect on the RNG stream when using a reproducible `seed` value at `daemons()` (#356).

R/daemons.R

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -621,23 +621,13 @@ args_daemon_disp <- function(url, dots, rs = NULL, tls = NULL) {
621621
shQuote(sprintf("mirai::daemon(\"%s\"%s%s)", url, dots, parse_tls(tls)))
622622
}
623623

624-
args_dispatcher_local <- function(urld, n, dots) {
624+
args_dispatcher <- function(urld, url, n) {
625625
shQuote(sprintf(
626-
".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d%s)",
627-
libp(),
628-
urld,
629-
n,
630-
dots
631-
))
632-
}
633-
634-
args_dispatcher_remote <- function(urld, url, dots) {
635-
shQuote(sprintf(
636-
".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",url=\"%s\"%s)",
626+
".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",url=\"%s\",n=%d)",
637627
libp(),
638628
urld,
639629
url,
640-
dots
630+
n
641631
))
642632
}
643633

@@ -649,15 +639,17 @@ query_dispatcher <- function(sock, command, send_mode = 2L, recv_mode = 5L, bloc
649639
recv(sock, mode = recv_mode, block = block)
650640
}
651641

652-
launch_dispatcher <- function(arg, dots, envir, serial, tls = NULL, pass = NULL) {
642+
launch_dispatcher <- function(url, dots, envir, serial, tls = NULL, pass = NULL) {
653643
cv <- cv()
654644
urld <- local_url()
655645
sock <- req_socket(urld)
656646
pipe_notify(sock, cv, add = TRUE)
657-
write_args <- if (is.character(arg)) args_dispatcher_remote else args_dispatcher_local
647+
local <- is.numeric(url)
648+
n <- if (local) url else 0L
649+
if (local) url <- local_url()
658650
system2(
659651
.command,
660-
args = c("--default-packages=NULL", "--vanilla", "-e", write_args(urld, arg, dots)),
652+
args = c("--default-packages=NULL", "--vanilla", "-e", args_dispatcher(urld, url, n)),
661653
wait = FALSE
662654
)
663655
if (is.null(serial)) serial <- .[["serial"]]
@@ -672,8 +664,14 @@ launch_dispatcher <- function(arg, dots, envir, serial, tls = NULL, pass = NULL)
672664
message(sprintf(._[["sync_dispatcher"]], sync <- sync + .limit_long_secs))
673665

674666
pipe_notify(sock, NULL, add = TRUE)
675-
req <- request(.context(sock), data, send_mode = 1L, recv_mode = 2L, cv = cv)
676-
667+
send(sock, data, mode = 1L, block = TRUE)
668+
if (local) {
669+
launch_args <- args_daemon_disp(url, dots)
670+
for (i in seq_len(n)) {
671+
launch_daemon(launch_args)
672+
}
673+
}
674+
req <- recv_aio(sock, mode = 2L, cv = cv)
677675
while(!until(cv, .limit_long))
678676
message(sprintf(._[["sync_dispatcher"]], sync <- sync + .limit_long_secs))
679677

R/dispatcher.R

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,28 @@
1616
#' @inheritParams daemons
1717
#' @param host the character URL dispatcher should dial in to, typically an IPC
1818
#' address.
19-
#' @param url (optional) the character URL dispatcher should listen at (and
20-
#' daemons should dial in to), including the port to connect to e.g.
21-
#' 'tcp://hostname:5555' or 'tcp://10.75.32.70:5555'. Specify 'tls+tcp://' to
22-
#' use secure TLS connections.
23-
#' @param n (optional) if specified, the integer number of daemons to launch. In
24-
#' this case, a local url is automatically generated.
19+
#' @param url the character URL dispatcher should listen at (and daemons should
20+
#' dial in to), including the port to connect to e.g. tcp://hostname:5555' or
21+
#' 'tcp://10.75.32.70:5555'. Specify 'tls+tcp://' to use secure TLS
22+
#' connections.
23+
#' @param n \[default 0L\] if specified, the integer number of daemons to be
24+
#' launched locally by the host process.
2525
#'
2626
#' @return Invisible NULL.
2727
#'
2828
#' @export
2929
#'
30-
dispatcher <- function(host, url = NULL, n = NULL, ...) {
31-
n <- if (is.numeric(n)) as.integer(n) else length(url)
32-
n > 0L || stop(._[["missing_url"]])
33-
30+
dispatcher <- function(host, url = NULL, n = 0L, ...) {
3431
cv <- cv()
3532
sock <- socket("rep")
3633
on.exit(reap(sock))
3734
pipe_notify(sock, cv, remove = TRUE, flag = flag_value())
35+
36+
psock <- socket("poly")
37+
on.exit(reap(psock), add = TRUE, after = TRUE)
38+
m <- monitor(psock, cv)
39+
n && listen(psock, url = url, fail = 2L)
40+
3841
dial_sync_socket(sock, host)
3942

4043
raio <- recv_aio(sock, mode = 1L, cv = cv)
@@ -47,10 +50,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ...) {
4750
}
4851

4952
tls <- NULL
50-
auto <- is.null(url)
51-
if (auto) {
52-
url <- local_url()
53-
} else {
53+
if (!n) {
5454
if (is.character(res[[2L]])) {
5555
tls <- res[[2L]]
5656
pass <- res[[3L]]
@@ -61,21 +61,13 @@ dispatcher <- function(host, url = NULL, n = NULL, ...) {
6161
serial <- res[[4L]]
6262
res <- res[[5L]]
6363

64-
psock <- socket("poly")
65-
on.exit(reap(psock), add = TRUE, after = TRUE)
66-
m <- monitor(psock, cv)
67-
listen(psock, url = url, tls = tls, fail = 2L)
68-
6964
inq <- outq <- list()
7065
events <- integer()
7166
count <- 0L
7267
envir <- new.env(hash = FALSE, parent = emptyenv())
7368
`[[<-`(envir, "stream", res)
74-
if (auto) {
75-
dots <- parse_dots(...)
76-
for (i in seq_len(n)) {
77-
launch_daemon(args_daemon_disp(url, dots))
78-
}
69+
70+
if (n) {
7971
for (i in seq_len(n))
8072
while(!until(cv, .limit_long))
8173
cv_signal(cv) || wait(cv) || return()
@@ -88,13 +80,13 @@ dispatcher <- function(host, url = NULL, n = NULL, ...) {
8880
}
8981
}
9082
} else {
83+
listen(psock, url = url, tls = tls, fail = 2L)
9184
listener <- attr(psock, "listener")[[1L]]
9285
url <- opt(listener, "url")
9386
if (parse_url(url)[["port"]] == "0") {
9487
url <- sub_real_port(opt(listener, "tcp-bound-port"), url)
9588
}
9689
}
97-
9890
send(sock, c(Sys.getpid(), url), mode = 2L, block = TRUE)
9991

10092
ctx <- .context(sock)

R/mirai-package.R

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@
7474
function_required = "`.f` must be of type function, not %s",
7575
localhost = "SSH tunnelling requires daemons `url` hostname to be `127.0.0.1`",
7676
missing_expression = "missing expression, perhaps wrap in {}?",
77-
missing_url = "`n` must be 1 or greater, or else `url` must be supplied",
7877
named_args = "all items in `.args` must be named, unless supplying an environment",
7978
named_dots = "all `...` arguments must be named, unless supplying an environment",
8079
n_one = "`n` must be 1 or greater",

man/dispatcher.Rd

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/tests.R

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ test_error(daemons(-1), "zero or greater")
2828
test_error(daemons(raw(0L)), "must be numeric")
2929
test_error(daemons(1, dispatcher = ""))
3030
test_error(daemons(url = local_url(), dispatcher = NA))
31-
test_error(dispatcher(client = "URL"), "must be 1 or greater")
3231
test_error(daemon("URL"), "Invalid argument")
3332
test_error(launch_local(1L), "daemons must be set")
3433
test_true(!daemons_set())

0 commit comments

Comments
 (0)