Skip to content

Commit 9f87a5a

Browse files
PERFORMANCE: Pre-validation of cluster futures had a thinko, causing it to take 0.1-0.2 seconds for each future being launched
1 parent ec2446b commit 9f87a5a

File tree

3 files changed

+91
-84
lines changed

3 files changed

+91
-84
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: future
2-
Version: 1.58.0-9003
2+
Version: 1.58.0-9004
33
Title: Unified Parallel and Distributed Processing in R for Everyone
44
Depends:
55
R (>= 3.2.0)

NEWS.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## Performance
44

5+
* The pre-validation of the cluster worker allotted to a future when
6+
launched was unnecessarily expensive due to a thinko, e.g. it would
7+
take ~0.1-0.2 seconds for a multisession future, whereas after the
8+
fix it is effectly 0.0 seconds.
9+
510
* Calling `resolved()` on a lazy `ClusterFuture` would collect the
611
result for the first _resolved_ future in order to free up one
712
worker slot. Now this is only done if all slots are occupied. The

R/backend_api-11.ClusterFutureBackend-class.R

Lines changed: 85 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,7 +1025,7 @@ receiveMessageFromWorker <- local({
10251025
## to backend[["workers"]], before returning.
10261026
#' @importFrom parallelly isConnectionValid isNodeAlive cloneNode
10271027
#' @importFrom parallel clusterCall
1028-
requestNode <- function(await, backend, timeout, delta, alpha) {
1028+
requestNode <- function(await, backend, timeout, delta, alpha, validateWorker = TRUE) {
10291029
debug <- isTRUE(getOption("future.debug"))
10301030
if (debug) {
10311031
mdebug_push("requestNode() ...")
@@ -1113,96 +1113,99 @@ requestNode <- function(await, backend, timeout, delta, alpha) {
11131113
stop_if_not(is.numeric(node_idx), is.finite(node_idx), node_idx >= 1, node_idx <= length(workers))
11141114
if (debug) mdebugf("Index of first available worker: %d", node_idx)
11151115

1116-
## Validate that the cluster node is working
1117-
if (debug) mdebug_push("Validate that the worker is functional ...")
1118-
cl <- workers[node_idx]
1119-
stop_if_not(length(cl) == 1L, inherits(cl, "cluster"))
1120-
1121-
maxTries <- 3L
1122-
for (kk in maxTries:1) {
1123-
okay <- TRUE
1116+
## Validate that the cluster node is working - if not, relaunch it
1117+
if (validateWorker) {
1118+
if (debug) mdebug_push("Validate that the worker is functional ...")
1119+
cl <- workers[node_idx]
1120+
stop_if_not(length(cl) == 1L, inherits(cl, "cluster"))
1121+
11241122
truth <- "future:::requestNode() validation call"
1125-
res <- tryCatch({
1126-
suppressWarnings({
1127-
clusterCall(cl = cl, function(x) x, truth)[[1]]
1128-
})
1129-
}, error = identity)
11301123

1131-
## If not working, investigate why, and relaunch a new worker
1132-
if (inherits(res, "error") || !identical(res, truth)) {
1133-
if (debug) {
1134-
mdebug("Worker is non-functional")
1135-
if (inherits(res, "error")) {
1136-
mdebug("Error received: ", conditionMessage(res))
1137-
} else {
1138-
mdebug("Result received: ", sQuote(res))
1124+
maxTries <- 3L
1125+
for (kk in maxTries:1) {
1126+
okay <- TRUE
1127+
res <- tryCatch({
1128+
suppressWarnings({
1129+
clusterCall(cl = cl, identity, truth)[[1]]
1130+
})
1131+
}, error = identity)
1132+
1133+
## If not working, investigate why, and relaunch a new worker
1134+
if (inherits(res, "error") || !identical(res, truth)) {
1135+
if (debug) {
1136+
mdebug("Worker is non-functional")
1137+
if (inherits(res, "error")) {
1138+
mdebug("Error received: ", conditionMessage(res))
1139+
} else {
1140+
mdebug("Result received: ", sQuote(res))
1141+
}
11391142
}
1140-
}
1141-
okay <- FALSE
1142-
1143-
## Is the connection working?
1144-
node <- cl[[1]]
1145-
con <- node[["con"]]
1146-
connectionOkay <- NA
1147-
if (inherits(con, "connection")) {
1148-
connectionOkay <- isConnectionValid(con)
1149-
if (debug) mdebug("Connection is valid: ", connectionOkay)
1150-
}
1151-
1152-
if (is.na(connectionOkay) || connectionOkay) {
1153-
## If the node does not use a connection, or the connection is working,
1154-
## we can only assume the worker is also alive. If so, we should try to
1155-
## kill the worker.
1156-
res <- suppressWarnings(killNode(node))
1157-
if (debug) mdebugf("Killed %s: %s", class(node)[1], res)
1158-
} else {
1159-
## If connection is not working, we could assume the worker is no longer
1160-
## alive, but it could also be a network issues. In either case, we
1161-
## should try to kill it, just in case.
1162-
res <- suppressWarnings(killNode(node))
1163-
if (debug) mdebugf("Killed %s: %s", class(node)[1], res)
1164-
}
1165-
if (kk == 1L) {
1166-
stop(FutureError(sprintf("Failed to find a functional cluster worker, after attempting to relaunch the parallel worker %d times", maxTries)))
1167-
}
1168-
} else {
1169-
if (debug) mdebug("Worker is functional")
1170-
break
1171-
}
1143+
okay <- FALSE
11721144

1173-
## Relaunch worker?
1174-
if (!okay) {
1175-
if (debug) mdebugf_push("Restarting non-alive cluster node %d ...", node_idx)
1176-
node2 <- tryCatch({
1177-
cloneNode(node)
1178-
}, error = identity)
1179-
if (inherits(node2, "error")) {
1180-
msg <- sprintf("One of the future workers of class %s, part of a cluster of class %s, was interrupted and attempts to relaunch it failed", sQuote(class(node)[1]), sQuote(class(cl)[1]))
1181-
if (inherits(node, c("SOCKnode", "SOCK0node")) &&
1182-
!inherits(node, c("RichSOCKnode"))) {
1183-
msg <- sprintf("%s. If you created your cluster with parallel::makeCluster(), try with parallelly::makeClusterPSOCK() instead", msg)
1145+
## Is the connection working?
1146+
node <- cl[[1]]
1147+
con <- node[["con"]]
1148+
connectionOkay <- NA
1149+
if (inherits(con, "connection")) {
1150+
connectionOkay <- isConnectionValid(con)
1151+
if (debug) mdebug("Connection is valid: ", connectionOkay)
1152+
}
1153+
1154+
if (is.na(connectionOkay) || connectionOkay) {
1155+
## If the node does not use a connection, or the connection is working,
1156+
## we can only assume the worker is also alive. If so, we should try to
1157+
## kill the worker.
1158+
res <- suppressWarnings(killNode(node))
1159+
if (debug) mdebugf("Killed %s: %s", class(node)[1], res)
1160+
} else {
1161+
## If connection is not working, we could assume the worker is no longer
1162+
## alive, but it could also be a network issues. In either case, we
1163+
## should try to kill it, just in case.
1164+
res <- suppressWarnings(killNode(node))
1165+
if (debug) mdebugf("Killed %s: %s", class(node)[1], res)
1166+
}
1167+
if (kk == 1L) {
1168+
stop(FutureError(sprintf("Failed to find a functional cluster worker, after attempting to relaunch the parallel worker %d times", maxTries)))
11841169
}
1185-
msg <- sprintf("%s. The reported reason was: %s", msg, conditionMessage(node2))
1186-
stop(FutureError(msg))
11871170
} else {
1188-
node <- node2
1171+
if (debug) mdebug("Worker is functional")
1172+
break
11891173
}
11901174

1191-
cl[[1]] <- node
1192-
1193-
workers[[node_idx]] <- node
1194-
backend[["workers"]] <- workers
1195-
1196-
if (debug) {
1197-
mdebug("Re-launched cluster node:")
1198-
mprint(node)
1199-
mdebugf_pop()
1175+
## Relaunch worker?
1176+
if (!okay) {
1177+
if (debug) mdebugf_push("Restarting non-alive cluster node %d ...", node_idx)
1178+
node2 <- tryCatch({
1179+
cloneNode(node)
1180+
}, error = identity)
1181+
if (inherits(node2, "error")) {
1182+
msg <- sprintf("One of the future workers of class %s, part of a cluster of class %s, was interrupted and attempts to relaunch it failed", sQuote(class(node)[1]), sQuote(class(cl)[1]))
1183+
if (inherits(node, c("SOCKnode", "SOCK0node")) &&
1184+
!inherits(node, c("RichSOCKnode"))) {
1185+
msg <- sprintf("%s. If you created your cluster with parallel::makeCluster(), try with parallelly::makeClusterPSOCK() instead", msg)
1186+
}
1187+
msg <- sprintf("%s. The reported reason was: %s", msg, conditionMessage(node2))
1188+
stop(FutureError(msg))
1189+
} else {
1190+
node <- node2
1191+
}
1192+
1193+
cl[[1]] <- node
1194+
1195+
workers[[node_idx]] <- node
1196+
backend[["workers"]] <- workers
1197+
1198+
if (debug) {
1199+
mdebug("Re-launched cluster node:")
1200+
mprint(node)
1201+
mdebugf_pop()
1202+
}
12001203
}
1201-
}
1202-
1203-
## Try again
1204-
Sys.sleep(0.1)
1205-
} ## for (kk in maxTries:1)
1204+
1205+
## Try again
1206+
Sys.sleep(0.1)
1207+
} ## for (kk in maxTries:1)
1208+
} ## if (validateWorker)
12061209

12071210
## Assert that there is no other registered future that is using
12081211
## the found node
@@ -1214,7 +1217,6 @@ requestNode <- function(await, backend, timeout, delta, alpha) {
12141217
}
12151218
}
12161219

1217-
12181220
if (debug) mdebug_pop()
12191221

12201222
node_idx

0 commit comments

Comments
 (0)