Skip to content

Commit c71927f

Browse files
BUG FIX: future() argument 'conditions' was not used when loading packages [#789]
1 parent 062c6fd commit c71927f

File tree

3 files changed

+139
-135
lines changed

3 files changed

+139
-135
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: future
2-
Version: 1.49.0-9025
2+
Version: 1.49.0-9027
33
Title: Unified Parallel and Distributed Processing in R for Everyone
44
Depends:
55
R (>= 3.2.0)

NEWS.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@
2222
worker causes warnings to be escalated immediately to errors on the
2323
worker, which therefore also terminates the future.
2424

25+
* `future()` argument `conditions` was not used when loading and
26+
attaching packages specified via argument `packages`, which
27+
prevented us from excluding, for instance,
28+
`packageStartupMessage`:s, causing them to be displayed in
29+
sequential and multicore processing.
30+
2531
* Now the ClusterFutureBackend tries even harder to shut down
2632
parallel cluster workers when shutting down the backend. If it
2733
fails to communicate with one or more of the parallel workers, it

R/backend_api-evalFuture.R

Lines changed: 132 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,123 @@ evalFuture <- function(
411411

412412

413413
evalFutureInternal <- function(data) {
414+
onEvalCondition <- function(cond) {
415+
is_error <- inherits(cond, "error")
416+
if (is_error) {
417+
## Disable timeouts as soon as possible, in case there is a
418+
## timeout set by the future expression, which triggered
419+
## this error
420+
setTimeLimit(cpu = Inf, elapsed = Inf, transient = FALSE)
421+
}
422+
423+
## Handle immediately?
424+
if (length(immediateConditionHandlers) > 0) {
425+
## Handle immediateCondition:s?
426+
idxs <- inherits(cond, names(immediateConditionHandlers), which = TRUE)
427+
428+
if (length(idxs) > 0 && !identical(idxs, 0L)) {
429+
class <- class(cond)[idxs[[1]]]
430+
431+
handler <- immediateConditionHandlers[[class]]
432+
record <- handler(cond)
433+
434+
## Record condition?
435+
if (isTRUE(record)) {
436+
...future.conditions[[length(...future.conditions) + 1L]] <<- list(
437+
condition = cond,
438+
signaled = 1L
439+
)
440+
}
441+
442+
## Avoid condition from being signaled more than once
443+
muffleCondition(cond)
444+
445+
return()
446+
}
447+
}
448+
449+
## Ignore condition?
450+
ignore <- !is_error &&
451+
!is.null(conditionClassesExclude) &&
452+
inherits(cond, conditionClassesExclude)
453+
454+
## Handle error:s specially
455+
if (is_error) {
456+
sessionInformation <- function() {
457+
list(
458+
r = R.Version(),
459+
locale = Sys.getlocale(),
460+
rngkind = RNGkind(),
461+
namespaces = loadedNamespaces(),
462+
search = search(),
463+
system = Sys.info()
464+
)
465+
}
466+
467+
sysCalls <- getSysCalls()
468+
469+
## Record condition
470+
...future.conditions[[length(...future.conditions) + 1L]] <<- list(
471+
condition = cond,
472+
calls = c(sysCalls(from = ...future.frame), cond[["call"]]),
473+
session = sessionInformation(),
474+
timestamp = Sys.time(),
475+
signaled = 0L
476+
)
477+
478+
signalCondition(cond)
479+
} else if (!ignore &&
480+
!is.null(conditionClasses) &&
481+
inherits(cond, conditionClasses)
482+
) {
483+
484+
## SPECIAL CASE: If a warnings and option 'warn' is >= 2 on the
485+
## worker, then let it escalate to an error here on the worker
486+
if (inherits(cond, "warning") && getOption("warn") >= 2L) {
487+
return()
488+
}
489+
490+
## Relay 'immediateCondition' conditions immediately?
491+
## If so, then do not muffle it and flag it as signaled
492+
## already here.
493+
signal <- inherits(cond, immediateConditionClasses)
494+
## Record condition
495+
...future.conditions[[length(...future.conditions) + 1L]] <<- list(
496+
condition = cond,
497+
signaled = as.integer(signal)
498+
)
499+
if (length(immediateConditionClasses) > 0 && !split && !signal) {
500+
muffleCondition(cond, pattern = muffleInclude)
501+
}
502+
} else {
503+
if (!split && !is.null(conditionClasses)) {
504+
## SPECIAL CASE: If a warnings and option 'warn' is >= 2 on the
505+
## worker, then let it escalate to an error here on the worker
506+
if (inherits(cond, "warning") && getOption("warn") >= 2L) {
507+
return()
508+
}
509+
510+
## Muffle all non-captured conditions
511+
muffleCondition(cond, pattern = muffleInclude)
512+
}
513+
}
514+
} ## onEvalCondition()
515+
516+
onEvalErrorOrInterrupt <- function(ex) {
517+
FutureResult(
518+
conditions = ...future.conditions,
519+
rng = !identical(globalenv()[[".Random.seed"]], ...future.rng),
520+
uuid = uuid,
521+
misuseGlobalEnv = if (checkGlobalenv) list(added = diff_globalenv(...future.globalenv.names)) else NULL,
522+
misuseConnections = diff_connections(get_connections(details = isTRUE(attr(checkConnections, "details", exact = TRUE))), ...future.connections),
523+
misuseDevices = if (checkDevices) diff_devices(base::.Devices, ...future.devices) else NULL,
524+
misuseDefaultDevice = ...future.option.defaultDevice,
525+
started = ...future.startTime
526+
)
527+
} ## onEvalErrorOrInterrupt()
528+
529+
530+
414531
debug <- FALSE
415532

416533
core <- data[["core"]]
@@ -500,11 +617,20 @@ evalFutureInternal <- function(data) {
500617
## Start time for future evaluation
501618
...future.startTime <- Sys.time()
502619

620+
conditionClassesExclude <- attr(conditionClasses, "exclude", exact = TRUE)
621+
muffleInclude <- attr(conditionClasses, "muffleInclude", exact = TRUE)
622+
if (is.null(muffleInclude)) muffleInclude <- "^muffle"
623+
624+
...future.frame <- sys.nframe()
625+
...future.conditions <- list()
626+
503627

504628
## -----------------------------------------------------------------
505629
## Load and attached backend packages
506630
## -----------------------------------------------------------------
507-
attachPackages(backendPackages)
631+
withCallingHandlers({
632+
attachPackages(backendPackages)
633+
}, condition = onEvalCondition)
508634

509635

510636
## -----------------------------------------------------------------
@@ -517,7 +643,9 @@ evalFutureInternal <- function(data) {
517643
...future.mc.cores.old <- getOption("mc.cores")
518644

519645
## Load and attached packages
520-
attachPackages(packages)
646+
withCallingHandlers({
647+
attachPackages(packages)
648+
}, condition = onEvalCondition)
521649

522650
## Note, we record R options and environment variables _after_
523651
## loading and attaching packages, in case they set options/env vars
@@ -950,13 +1078,6 @@ evalFutureInternal <- function(data) {
9501078
}
9511079

9521080

953-
conditionClassesExclude <- attr(conditionClasses, "exclude", exact = TRUE)
954-
muffleInclude <- attr(conditionClasses, "muffleInclude", exact = TRUE)
955-
if (is.null(muffleInclude)) muffleInclude <- "^muffle"
956-
957-
...future.frame <- sys.nframe()
958-
...future.conditions <- list()
959-
9601081
## NOTE: We don't want to use local(body) w/ on.exit() because
9611082
## evaluation in a local is optional, cf. argument 'local'.
9621083
## If this was mandatory, we could. Instead we use
@@ -979,136 +1100,13 @@ evalFutureInternal <- function(data) {
9791100
misuseDefaultDevice = ...future.option.defaultDevice,
9801101
started = ...future.startTime
9811102
)
982-
}, condition = function(cond) {
983-
is_error <- inherits(cond, "error")
984-
if (is_error) {
985-
## Disable timeouts as soon as possible, in case there is a
986-
## timeout set by the future expression, which triggered
987-
## this error
988-
setTimeLimit(cpu = Inf, elapsed = Inf, transient = FALSE)
989-
}
990-
991-
## Handle immediately?
992-
if (length(immediateConditionHandlers) > 0) {
993-
## Handle immediateCondition:s?
994-
idxs <- inherits(cond, names(immediateConditionHandlers), which = TRUE)
995-
996-
if (length(idxs) > 0 && !identical(idxs, 0L)) {
997-
class <- class(cond)[idxs[[1]]]
998-
999-
handler <- immediateConditionHandlers[[class]]
1000-
record <- handler(cond)
1001-
1002-
## Record condition?
1003-
if (isTRUE(record)) {
1004-
...future.conditions[[length(...future.conditions) + 1L]] <<- list(
1005-
condition = cond,
1006-
signaled = 1L
1007-
)
1008-
}
1009-
1010-
## Avoid condition from being signaled more than once
1011-
muffleCondition(cond)
1012-
1013-
return()
1014-
}
1015-
}
1016-
1017-
## Ignore condition?
1018-
ignore <- !is_error &&
1019-
!is.null(conditionClassesExclude) &&
1020-
inherits(cond, conditionClassesExclude)
1021-
1022-
## Handle error:s specially
1023-
if (is_error) {
1024-
sessionInformation <- function() {
1025-
list(
1026-
r = R.Version(),
1027-
locale = Sys.getlocale(),
1028-
rngkind = RNGkind(),
1029-
namespaces = loadedNamespaces(),
1030-
search = search(),
1031-
system = Sys.info()
1032-
)
1033-
}
1034-
1035-
sysCalls <- getSysCalls()
1036-
1037-
## Record condition
1038-
...future.conditions[[length(...future.conditions) + 1L]] <<- list(
1039-
condition = cond,
1040-
calls = c(sysCalls(from = ...future.frame), cond[["call"]]),
1041-
session = sessionInformation(),
1042-
timestamp = Sys.time(),
1043-
signaled = 0L
1044-
)
1045-
1046-
signalCondition(cond)
1047-
} else if (!ignore &&
1048-
!is.null(conditionClasses) &&
1049-
inherits(cond, conditionClasses)
1050-
) {
1051-
1052-
## SPECIAL CASE: If a warnings and option 'warn' is >= 2 on the
1053-
## worker, then let it escalate to an error here on the worker
1054-
if (inherits(cond, "warning") && getOption("warn") >= 2L) {
1055-
return()
1056-
}
1057-
1058-
## Relay 'immediateCondition' conditions immediately?
1059-
## If so, then do not muffle it and flag it as signaled
1060-
## already here.
1061-
signal <- inherits(cond, immediateConditionClasses)
1062-
## Record condition
1063-
...future.conditions[[length(...future.conditions) + 1L]] <<- list(
1064-
condition = cond,
1065-
signaled = as.integer(signal)
1066-
)
1067-
if (length(immediateConditionClasses) > 0 && !split && !signal) {
1068-
muffleCondition(cond, pattern = muffleInclude)
1069-
}
1070-
} else {
1071-
if (!split && !is.null(conditionClasses)) {
1072-
## SPECIAL CASE: If a warnings and option 'warn' is >= 2 on the
1073-
## worker, then let it escalate to an error here on the worker
1074-
if (inherits(cond, "warning") && getOption("warn") >= 2L) {
1075-
return()
1076-
}
1077-
1078-
## Muffle all non-captured conditions
1079-
muffleCondition(cond, pattern = muffleInclude)
1080-
}
1081-
}
1082-
} ## function(cond)
1083-
) ## withCallingHandlers()
1103+
}, condition = onEvalCondition) ## withCallingHandlers()
10841104
}, finally = {
10851105
## Disable timeouts as soon as possible, in case there is a
10861106
## timeout set by the future expression
10871107
setTimeLimit(cpu = Inf, elapsed = Inf, transient = FALSE)
10881108
}) ## tryCatch() for future evaluation
1089-
}, interrupt = function(ex) {
1090-
FutureResult(
1091-
conditions = ...future.conditions,
1092-
rng = !identical(globalenv()[[".Random.seed"]], ...future.rng),
1093-
uuid = uuid,
1094-
misuseGlobalEnv = if (checkGlobalenv) list(added = diff_globalenv(...future.globalenv.names)) else NULL,
1095-
misuseConnections = diff_connections(get_connections(details = isTRUE(attr(checkConnections, "details", exact = TRUE))), ...future.connections),
1096-
misuseDevices = if (checkDevices) diff_devices(base::.Devices, ...future.devices) else NULL,
1097-
misuseDefaultDevice = ...future.option.defaultDevice,
1098-
started = ...future.startTime
1099-
)
1100-
}, error = function(ex) {
1101-
FutureResult(
1102-
conditions = ...future.conditions,
1103-
rng = !identical(globalenv()[[".Random.seed"]], ...future.rng),
1104-
uuid = uuid,
1105-
misuseGlobalEnv = if (checkGlobalenv) list(added = diff_globalenv(...future.globalenv.names)) else NULL,
1106-
misuseConnections = diff_connections(get_connections(details = isTRUE(attr(checkConnections, "details", exact = TRUE))), ...future.connections),
1107-
misuseDevices = if (checkDevices) diff_devices(base::.Devices, ...future.devices) else NULL,
1108-
misuseDefaultDevice = ...future.option.defaultDevice,
1109-
started = ...future.startTime
1110-
)
1111-
}) ## output tryCatch()
1109+
}, interrupt = onEvalErrorOrInterrupt, error = onEvalErrorOrInterrupt) ## output tryCatch()
11121110

11131111

11141112
## -----------------------------------------------------------------

0 commit comments

Comments
 (0)