|
| 1 | +#' @tags MiraiFutureBackend |
| 2 | +#' @tags detritus-files |
| 3 | +#' @tags mirai_cluster mirai_multisession |
| 4 | + |
| 5 | +library(future) |
| 6 | +library(future.mirai) |
| 7 | + |
| 8 | +options(future.mirai.debug = TRUE) |
| 9 | +options(future.debug = TRUE) |
| 10 | + |
| 11 | +message("*** MiraiFutureBackend ...") |
| 12 | + |
| 13 | + |
| 14 | +## --------------------------------------------------------- |
| 15 | +## mirai_version() |
| 16 | +## --------------------------------------------------------- |
| 17 | +message("- mirai_version() ...") |
| 18 | + |
| 19 | +v <- mirai_version() |
| 20 | +stopifnot(inherits(v, "package_version")) |
| 21 | +message(" mirai version: ", v) |
| 22 | + |
| 23 | +## Second call should use cached value |
| 24 | +v2 <- mirai_version() |
| 25 | +stopifnot(identical(v, v2)) |
| 26 | + |
| 27 | +message("- mirai_version() ... DONE") |
| 28 | + |
| 29 | + |
| 30 | +## --------------------------------------------------------- |
| 31 | +## get_mirai_daemons() and mirai_daemons_nworkers() |
| 32 | +## --------------------------------------------------------- |
| 33 | +message("- get_mirai_daemons() and mirai_daemons_nworkers() ...") |
| 34 | + |
| 35 | +## Setup daemons |
| 36 | +mirai::daemons(2) |
| 37 | + |
| 38 | +daemons <- get_mirai_daemons() |
| 39 | +message(" daemons class: ", class(daemons)[1]) |
| 40 | +stopifnot(is.data.frame(daemons) || is.numeric(daemons)) |
| 41 | + |
| 42 | +nworkers <- mirai_daemons_nworkers() |
| 43 | +message(" nworkers: ", nworkers) |
| 44 | +stopifnot(is.numeric(nworkers), nworkers >= 1) |
| 45 | + |
| 46 | +mirai::daemons(0) |
| 47 | + |
| 48 | +message("- get_mirai_daemons() and mirai_daemons_nworkers() ... DONE") |
| 49 | + |
| 50 | + |
| 51 | +## --------------------------------------------------------- |
| 52 | +## tweak.mirai_cluster() |
| 53 | +## --------------------------------------------------------- |
| 54 | +message("- tweak.mirai_cluster() ...") |
| 55 | + |
| 56 | +plan(future.mirai::mirai_multisession, workers = 2) |
| 57 | +stopifnot(nbrOfWorkers() == 2L) |
| 58 | + |
| 59 | +## Create a future to verify backend works after tweak |
| 60 | +f <- future(42) |
| 61 | +v <- value(f) |
| 62 | +stopifnot(v == 42) |
| 63 | + |
| 64 | +r <- resolved(f) |
| 65 | +message("Resolved: ", r) |
| 66 | + |
| 67 | +r <- result(f) |
| 68 | +print(r) |
| 69 | + |
| 70 | +plan(sequential) |
| 71 | + |
| 72 | +message("- tweak.mirai_cluster() ... DONE") |
| 73 | + |
| 74 | + |
| 75 | +## --------------------------------------------------------- |
| 76 | +## Global variables that clash with mirai::mirai() formals |
| 77 | +## --------------------------------------------------------- |
| 78 | +message("- globals that clash with mirai::mirai() formals ...") |
| 79 | + |
| 80 | +plan(future.mirai::mirai_multisession, workers = 2) |
| 81 | + |
| 82 | +## '.args' is a formal argument of mirai::mirai() |
| 83 | +## This should produce an error |
| 84 | +res <- tryCatch({ |
| 85 | + .args <- 42 |
| 86 | + f <- future(.args) |
| 87 | + value(f) |
| 88 | +}, error = identity) |
| 89 | +print(res) |
| 90 | +stopifnot(inherits(res, "FutureError")) |
| 91 | +stopifnot(grepl("clash with argument names of mirai::mirai", conditionMessage(res))) |
| 92 | + |
| 93 | +plan(sequential) |
| 94 | + |
| 95 | +message("- globals that clash with mirai::mirai() formals ... DONE") |
| 96 | + |
| 97 | + |
| 98 | +## --------------------------------------------------------- |
| 99 | +## interruptFuture.MiraiFutureBackend() via cancel() |
| 100 | +## --------------------------------------------------------- |
| 101 | +message("- interruptFuture.MiraiFutureBackend() via cancel() ...") |
| 102 | + |
| 103 | +plan(future.mirai::mirai_multisession, workers = 2) |
| 104 | + |
| 105 | +## Create a long-running future |
| 106 | +f <- future({ |
| 107 | + Sys.sleep(60) |
| 108 | + 42 |
| 109 | +}) |
| 110 | + |
| 111 | +r <- resolved(f) |
| 112 | +message("Resolved: ", r) |
| 113 | +stopifnot(!isTRUE(r)) |
| 114 | + |
| 115 | +## Cancel/interrupt the future |
| 116 | +f <- cancel(f, interrupt = TRUE) |
| 117 | +stopifnot(f[["state"]] == "canceled") |
| 118 | + |
| 119 | +r <- resolved(f) |
| 120 | +message("Resolved: ", r) |
| 121 | + |
| 122 | +## Trying to get the value should produce a FutureInterruptError |
| 123 | +res <- tryCatch(value(f), error = identity) |
| 124 | +print(res) |
| 125 | +stopifnot(inherits(res, "FutureInterruptError")) |
| 126 | + |
| 127 | +plan(sequential) |
| 128 | + |
| 129 | +message("- interruptFuture.MiraiFutureBackend() via cancel() ... DONE") |
| 130 | + |
| 131 | + |
| 132 | +message("- stopWorkers.MiraiFutureBackend() with active futures ...") |
| 133 | + |
| 134 | +plan(future.mirai::mirai_multisession, workers = 2, interrupts = FALSE) |
| 135 | + |
| 136 | +message("nbrOfWorkers(): ", nbrOfWorkers()) |
| 137 | +message("nbrOfFreeWorkers(): ", nbrOfFreeWorkers()) |
| 138 | + |
| 139 | +## Create a long-running future |
| 140 | +f <- future({ |
| 141 | + Sys.sleep(60) |
| 142 | + 42 |
| 143 | +}) |
| 144 | + |
| 145 | +r <- resolved(f) |
| 146 | +message("Resolved: ", r) |
| 147 | +stopifnot(!isTRUE(r)) |
| 148 | + |
| 149 | +plan(sequential) |
| 150 | + |
| 151 | +message("- stopWorkers.MiraiFutureBackend() with active futures ... DONE") |
| 152 | + |
| 153 | + |
| 154 | +## --------------------------------------------------------- |
| 155 | +## Error when no mirai daemons are available |
| 156 | +## --------------------------------------------------------- |
| 157 | +message("- Error when no mirai daemons ...") |
| 158 | + |
| 159 | +## Make sure no daemons are running |
| 160 | +mirai::daemons(0) |
| 161 | + |
| 162 | +## Try to create a MiraiFutureBackend without daemons |
| 163 | +MiraiFutureBackend <- future.mirai:::MiraiFutureBackend |
| 164 | +res <- tryCatch({ |
| 165 | + MiraiFutureBackend() |
| 166 | +}, error = identity) |
| 167 | +print(res) |
| 168 | +stopifnot(inherits(res, "FutureError")) |
| 169 | +stopifnot(grepl("at least one mirai daemon", conditionMessage(res))) |
| 170 | + |
| 171 | +message("- Error when no mirai daemons ... DONE") |
| 172 | + |
| 173 | + |
| 174 | +## --------------------------------------------------------- |
| 175 | +## mirai_cluster() function should always error when called directly |
| 176 | +## --------------------------------------------------------- |
| 177 | +message("- mirai_cluster() direct call ...") |
| 178 | + |
| 179 | +res <- tryCatch({ |
| 180 | + future.mirai::mirai_cluster() |
| 181 | +}, error = identity) |
| 182 | +print(res) |
| 183 | +stopifnot(inherits(res, "error")) |
| 184 | +stopifnot(grepl("must never be called directly", conditionMessage(res))) |
| 185 | + |
| 186 | +message("- mirai_cluster() direct call ... DONE") |
| 187 | + |
| 188 | + |
| 189 | +## --------------------------------------------------------- |
| 190 | +## resolved.MiraiFuture() with lazy future (state = "created") |
| 191 | +## --------------------------------------------------------- |
| 192 | +message("- resolved.MiraiFuture() for lazy future ...") |
| 193 | + |
| 194 | +plan(future.mirai::mirai_multisession, workers = 2) |
| 195 | + |
| 196 | +## Create a lazy future - it stays in "created" state until resolved |
| 197 | +f <- future({ 42 }, lazy = TRUE) |
| 198 | +stopifnot(f[["state"]] == "created") |
| 199 | + |
| 200 | +## resolved() should launch the future |
| 201 | +r <- resolved(f) |
| 202 | +message(" resolved: ", r) |
| 203 | +## After resolved() is called on a lazy future, it should be submitted |
| 204 | +stopifnot(f[["state"]] %in% c("running", "finished")) |
| 205 | + |
| 206 | +## Get the value |
| 207 | +v <- value(f) |
| 208 | +stopifnot(v == 42) |
| 209 | + |
| 210 | +plan(sequential) |
| 211 | + |
| 212 | +message("- resolved.MiraiFuture() for lazy future ... DONE") |
| 213 | + |
| 214 | + |
| 215 | +## --------------------------------------------------------- |
| 216 | +## nbrOfFreeWorkers.MiraiFutureBackend() |
| 217 | +## --------------------------------------------------------- |
| 218 | +message("- nbrOfFreeWorkers.MiraiFutureBackend() ...") |
| 219 | + |
| 220 | +plan(future.mirai::mirai_multisession, workers = 2) |
| 221 | + |
| 222 | +free <- nbrOfFreeWorkers() |
| 223 | +message(" free workers: ", free) |
| 224 | +stopifnot(is.numeric(free), free >= 0, free <= nbrOfWorkers()) |
| 225 | + |
| 226 | +## Create a future that takes some time |
| 227 | +f <- future({ Sys.sleep(2); 42 }) |
| 228 | + |
| 229 | +## Check free workers while future is running |
| 230 | +Sys.sleep(0.5) |
| 231 | +free_during <- nbrOfFreeWorkers() |
| 232 | +message(" free workers during execution: ", free_during) |
| 233 | + |
| 234 | +## Wait for future to complete |
| 235 | +v <- value(f) |
| 236 | +stopifnot(v == 42) |
| 237 | + |
| 238 | +## Check free workers after completion |
| 239 | +free_after <- nbrOfFreeWorkers() |
| 240 | +message(" free workers after: ", free_after) |
| 241 | +stopifnot(free_after == nbrOfWorkers()) |
| 242 | + |
| 243 | +plan(sequential) |
| 244 | + |
| 245 | +message("- nbrOfFreeWorkers.MiraiFutureBackend() ... DONE") |
| 246 | + |
| 247 | + |
| 248 | +message("*** MiraiFutureBackend ... DONE") |
0 commit comments