Skip to content

Commit ba1c013

Browse files
Move getFutureBackendConfigs() methods to the corresponding FutureBackend files
1 parent 6afe546 commit ba1c013

6 files changed

+106
-104
lines changed

R/backend_api-ClusterFutureBackend-class.R

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,3 +1040,44 @@ assertValidConnection <- function(future) {
10401040
stop(FutureError(msg, future = future))
10411041
}
10421042
}
1043+
1044+
1045+
1046+
#' @export
1047+
getFutureBackendConfigs.ClusterFuture <- function(future, ..., debug = isTRUE(getOption("future.debug"))) {
1048+
resignalImmediateConditions <- getOption("future.psock.relay.immediate", TRUE)
1049+
if (!resignalImmediateConditions) return(list())
1050+
1051+
conditionClasses <- future[["conditions"]]
1052+
if (is.null(conditionClasses)) return(list())
1053+
1054+
immediateConditionClasses <- attr(conditionClasses, "immediateConditionClasses", exact = TRUE)
1055+
if (is.null(immediateConditionClasses)) {
1056+
immediateConditionClasses <- "immediateCondition"
1057+
} else if (length(immediateConditionClasses) == 0L) {
1058+
return(list())
1059+
}
1060+
1061+
## Does the cluster node communicate with a connection?
1062+
## (if not, it's via MPI)
1063+
workers <- future[["workers"]]
1064+
stop_if_not(inherits(workers, "cluster"))
1065+
## AD HOC/FIXME: Here 'future[["node"]]' is yet not assigned, so we look at
1066+
## the first worker and assume the others are the same. /HB 2019-10-23
1067+
cl <- workers[1L]
1068+
stop_if_not(inherits(cl, "cluster"))
1069+
node <- cl[[1L]]
1070+
stop_if_not(inherits(node, c("SOCK0node", "SOCKnode")))
1071+
con <- node[["con"]]
1072+
if (is.null(con)) return(list())
1073+
1074+
capture <- list(
1075+
immediateConditionHandlers = list(
1076+
immediateCondition = psockImmediateConditionHandler
1077+
)
1078+
)
1079+
1080+
list(
1081+
capture = capture
1082+
)
1083+
}

R/backend_api-Future-class.R

Lines changed: 0 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -847,109 +847,6 @@ getFutureBackendConfigs <- function(future, ...) {
847847
UseMethod("getFutureBackendConfigs")
848848
}
849849

850-
#' @export
851-
getFutureBackendConfigs.Future <- function(future, ...) {
852-
list()
853-
}
854-
855-
#' @export
856-
getFutureBackendConfigs.UniprocessFuture <- function(future, ...) {
857-
conditionClasses <- future[["conditions"]]
858-
if (is.null(conditionClasses)) return(list())
859-
860-
capture <- list(
861-
immediateConditionHandlers = list(
862-
immediateCondition = function(cond) {
863-
signalCondition(cond)
864-
}
865-
)
866-
)
867-
868-
list(
869-
capture = capture
870-
)
871-
}
872-
873-
874-
#' @export
875-
getFutureBackendConfigs.MulticoreFuture <- function(future, ..., debug = isTRUE(getOption("future.debug"))) {
876-
conditionClasses <- future[["conditions"]]
877-
if (is.null(conditionClasses)) {
878-
capture <- list()
879-
} else {
880-
path <- immediateConditionsPath(rootPath = tempdir())
881-
capture <- list(
882-
immediateConditionHandlers = list(
883-
immediateCondition = function(cond) {
884-
fileImmediateConditionHandler(cond, path = path)
885-
}
886-
)
887-
)
888-
}
889-
890-
## Disable multi-threading in futures?
891-
threads <- NA_integer_
892-
multithreading <- getOption("future.fork.multithreading.enable", TRUE)
893-
if (isFALSE(multithreading)) {
894-
if (supports_omp_threads(assert = TRUE, debug = debug)) {
895-
threads <- 1L
896-
if (debug) mdebugf("- Force single-threaded (OpenMP and RcppParallel) processing in %s", class(future)[1])
897-
} else {
898-
warning(FutureWarning("It is not possible to disable OpenMP multi-threading on this systems", future = future))
899-
}
900-
}
901-
902-
context <- list(
903-
threads = threads
904-
)
905-
906-
list(
907-
capture = capture,
908-
context = context
909-
)
910-
}
911-
912-
913-
914-
#' @export
915-
getFutureBackendConfigs.ClusterFuture <- function(future, ..., debug = isTRUE(getOption("future.debug"))) {
916-
resignalImmediateConditions <- getOption("future.psock.relay.immediate", TRUE)
917-
if (!resignalImmediateConditions) return(list())
918-
919-
conditionClasses <- future[["conditions"]]
920-
if (is.null(conditionClasses)) return(list())
921-
922-
immediateConditionClasses <- attr(conditionClasses, "immediateConditionClasses", exact = TRUE)
923-
if (is.null(immediateConditionClasses)) {
924-
immediateConditionClasses <- "immediateCondition"
925-
} else if (length(immediateConditionClasses) == 0L) {
926-
return(list())
927-
}
928-
929-
## Does the cluster node communicate with a connection?
930-
## (if not, it's via MPI)
931-
workers <- future[["workers"]]
932-
stop_if_not(inherits(workers, "cluster"))
933-
## AD HOC/FIXME: Here 'future[["node"]]' is yet not assigned, so we look at
934-
## the first worker and assume the others are the same. /HB 2019-10-23
935-
cl <- workers[1L]
936-
stop_if_not(inherits(cl, "cluster"))
937-
node <- cl[[1L]]
938-
stop_if_not(inherits(node, c("SOCK0node", "SOCKnode")))
939-
con <- node[["con"]]
940-
if (is.null(con)) return(list())
941-
942-
capture <- list(
943-
immediateConditionHandlers = list(
944-
immediateCondition = psockImmediateConditionHandler
945-
)
946-
)
947-
948-
list(
949-
capture = capture
950-
)
951-
}
952-
953850

954851
getFutureData <- function(future, ..., debug = isTRUE(getOption("future.debug"))) {
955852
if (debug) {

R/backend_api-FutureBackend-class.R

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,9 @@ makeFutureBackend <- function(evaluator, ...) {
7070

7171
backend
7272
}
73+
74+
75+
#' @export
76+
getFutureBackendConfigs.Future <- function(future, ...) {
77+
list()
78+
}

R/backend_api-MulticoreFutureBackend-class.R

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,3 +540,42 @@ result.MulticoreFuture <- local({
540540
result
541541
}
542542
})
543+
544+
545+
#' @export
546+
getFutureBackendConfigs.MulticoreFuture <- function(future, ..., debug = isTRUE(getOption("future.debug"))) {
547+
conditionClasses <- future[["conditions"]]
548+
if (is.null(conditionClasses)) {
549+
capture <- list()
550+
} else {
551+
path <- immediateConditionsPath(rootPath = tempdir())
552+
capture <- list(
553+
immediateConditionHandlers = list(
554+
immediateCondition = function(cond) {
555+
fileImmediateConditionHandler(cond, path = path)
556+
}
557+
)
558+
)
559+
}
560+
561+
## Disable multi-threading in futures?
562+
threads <- NA_integer_
563+
multithreading <- getOption("future.fork.multithreading.enable", TRUE)
564+
if (isFALSE(multithreading)) {
565+
if (supports_omp_threads(assert = TRUE, debug = debug)) {
566+
threads <- 1L
567+
if (debug) mdebugf("- Force single-threaded (OpenMP and RcppParallel) processing in %s", class(future)[1])
568+
} else {
569+
warning(FutureWarning("It is not possible to disable OpenMP multi-threading on this systems", future = future))
570+
}
571+
}
572+
573+
context <- list(
574+
threads = threads
575+
)
576+
577+
list(
578+
capture = capture,
579+
context = context
580+
)
581+
}

R/backend_api-SequentialFutureBackend-class.R

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,22 @@ nbrOfFreeWorkers.SequentialFutureBackend <- function(evaluator, background = FAL
100100
assert_no_positional_args_but_first()
101101
if (isTRUE(background)) 0L else 1L
102102
}
103+
104+
105+
#' @export
106+
getFutureBackendConfigs.UniprocessFuture <- function(future, ...) {
107+
conditionClasses <- future[["conditions"]]
108+
if (is.null(conditionClasses)) return(list())
109+
110+
capture <- list(
111+
immediateConditionHandlers = list(
112+
immediateCondition = function(cond) {
113+
signalCondition(cond)
114+
}
115+
)
116+
)
117+
118+
list(
119+
capture = capture
120+
)
121+
}

man/MultiprocessFuture-class.Rd

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)