@@ -177,219 +177,7 @@ runDEoptim <- function(landscape,
177177 pkgsNeeded = neededPkgs , envir = environment()
178178 )
179179 cl <- control $ cluster # This is to test whether it is actually closed
180- # on.exit(parallel::stopCluster(control$cluster), add = TRUE)
181-
182- # control <- list(itermax = itermax, trace = trace, strategy = strategy)
183-
184- # if (!is.null(initialpop)) {
185- # control$initialpop <- initialpop
186- # }
187- #
188- # if (!is.null(NP)) {
189- # control$NP <- NP
190- # }
191- # # if (is.null(cores)) cores <- "localhost"
192- # if (!is.null(cores)) {
193- # logPath <- file.path(
194- # logPath,
195- # paste0(
196- # "fireSense_SpreadFit_", format(Sys.time(), "%Y-%m-%d_%H%M%S"),
197- # "_pid", Sys.getpid(), ".log"
198- # )
199- # )
200- # message(paste0(
201- # "Starting parallel model fitting for ",
202- # "fireSense_SpreadFit. Log: ", logPath
203- # ))
204- #
205- # # Make sure logPath can be written in the workers -- need to create the dir
206- #
207- # if (is.numeric(cores)) cores <- rep("localhost", cores)
208- #
209- # ## Make cluster with just one worker per machine --> don't need to do these steps
210- # # multiple times per machine, if not all 'localhost'
211- # revtunnel <- FALSE
212- # neededPkgs <- c("kSamples", "magrittr", "raster", "data.table",
213- # "SpaDES.tools", "fireSenseUtils", "sf")
214- #
215- # if (!identical("localhost", unique(cores))) {
216- # repos <- c("https://predictiveecology.r-universe.dev", getOption("repos"))
217- #
218- # aa <- Require::pkgDep(unique(c("dqrng", "PredictiveEcology/SpaDES.tools@development",
219- # "PredictiveEcology/fireSenseUtils@development", "qs",
220- # "RCurl", neededPkgs)), recursive = TRUE)
221- # pkgsNeeded <- unique(Require::extractPkgName(unname(unlist(aa))))
222- #
223- # revtunnel <- ifelse(all(cores == "localhost"), FALSE, TRUE)
224- #
225- # coresUnique <- setdiff(unique(cores), "localhost")
226- # message("copying packages to: ", paste(coresUnique, collapse = ", "))
227- #
228- # # RscriptPath = "/usr/local/bin/Rscript"
229- #
230- # # st <- system.time(
231- # # cl <- mirai::make_cluster(
232- # # n = length(coresUnique),
233- # # url = "tcp://localhost:5563",
234- # # remote = mirai::ssh_config(
235- # # remotes = paste0("ssh://", coresUnique),
236- # # tunnel = TRUE,
237- # # timeout = 1,
238- # # rscript = RscriptPath
239- # # )
240- # # )
241- # # )
242- # st <- system.time({
243- # cl <- parallelly::makeClusterPSOCK(coresUnique, revtunnel = revtunnel, rscript_libs = libPath
244- # # , rscript = c("nice", RscriptPath)
245- # )
246- # })
247- # clusterExport(cl, list("libPath", "logPath", "repos", "pkgsNeeded"),
248- # envir = environment())
249- #
250- # # Missing `dqrng` and `sitmo`
251- # Require::Install(pkgsNeeded, libPaths = libPath)
252- #
253- # parallel::clusterEvalQ(cl, {
254- # # If this is first time that packages need to be installed for this user on this machine
255- # # there won't be a folder present that is writable
256- # if (!dir.exists(libPath)) {
257- # dir.create(libPath, recursive = TRUE)
258- # }
259- # })
260- #
261- # message("Setting up packages on the cluster...")
262- # out <- lapply(setdiff(unique(cores), "localhost"), function(ip) {
263- # rsync <- Sys.which("rsync")
264- # if (!nzchar(rsync)) stop()
265- # system(paste0(rsync, " -aruv --update ", paste(file.path(libPath, pkgsNeeded), collapse = " "),
266- # " ", ip, ":", libPath))
267- # })
268- #
269- # parallel::clusterEvalQ(cl, {
270- # # If this is first time that packages need to be installed for this user on this machine
271- # # there won't be a folder present that is writable
272- # if (tryCatch(packageVersion("Require") < "1.0.1", error = function() TRUE))
273- # install.packages("Require", lib = libPath)
274- # library(Require, lib.loc = libPath)
275- # dir.create(dirname(logPath), recursive = TRUE)
276- # out <- Require::Install(pkgsNeeded, libPaths = libPath)
277- # })
278- #
279- # GDALversions <- parallel::clusterEvalQ(cl, {
280- # .libPaths(libPath)
281- # return(sf::sf_extSoftVersion()["GDAL"])
282- # })
283- #
284- # stopifnot(length(unique(sf::sf_extSoftVersion()["GDAL"], GDALversions)) == 1)
285- # parallel::stopCluster(cl)
286- #
287- # ## Now make full cluster with one worker per core listed in "cores"
288- # message("Starting ", paste(paste(names(table(cores))), "x", table(cores),
289- # collapse = ", "), " clusters")
290- # message("Starting main parallel cluster ...")
291- # # sshCores <- paste0("ssh//", grep('localhost', cores, invert = TRUE, value = TRUE))
292- # # nonsshCores <- grep('localhost', cores, value = TRUE)
293- # # coresForMirai <- c(nonsshCores, sshCores)
294- #
295- # st <- system.time({
296- # # cl <- mirai::make_cluster(
297- # # length(coresForMirai),
298- # # # url = "tcp://localhost:5555",
299- # # remote = ssh_config(
300- # # remotes = coresForMirai,
301- # # # tunnel = TRUE,
302- # # timeout = 1,
303- # # rscript = RscriptPath
304- # # )
305- # # )
306- #
307- # cl <- parallelly::makeClusterPSOCK(cores,
308- # revtunnel = revtunnel,
309- # outfile = logPath, rscript_libs = libPath
310- # # , rscript = c("nice", RscriptPath)
311- # )
312- # })
313- #
314- # on.exit(stopCluster(cl))
315- # message(
316- # "it took ", round(st[3], 2), "s to start ",
317- # paste(paste(names(table(cores))), "x", table(cores), collapse = ", "), " threads"
318- # )
319- # message("Moving objects to each node in cluster")
320- #
321- # stMoveObjects <- try({
322- # system.time({
323- # objsToCopy <- mget(unlist(objsNeeded))
324- # objsToCopy <- lapply(objsToCopy, FUN = function(x) {
325- # if (inherits(x, "SpatRaster")) {
326- # x <- terra::wrap(x)
327- # } else {
328- # x
329- # }
330- # x
331- # })
332- # filenameForTransfer <- normalizePath(tempfile(fileext = ".qs"), mustWork = FALSE, winslash = "/")
333- # dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE) # during development, this was deleted accidentally
334- # qs::qsave(objsToCopy, file = filenameForTransfer)
335- # stExport <- system.time({
336- # outExp <- clusterExport(cl, varlist = "filenameForTransfer", envir = environment())
337- # })
338- # out11 <- clusterEvalQ(cl, {
339- # dir.create(dirname(filenameForTransfer), recursive = TRUE, showWarnings = FALSE)
340- # })
341- # out <- lapply(setdiff(unique(cores), "localhost"), function(ip) {
342- # rsync <- Sys.which("rsync")
343- # st1 <- system.time(system(paste0(rsync, " -av ",
344- # filenameForTransfer, " ", ip, ":",
345- # filenameForTransfer)))
346- # })
347- # out <- clusterEvalQ(cl, {
348- # out <- qs::qread(file = filenameForTransfer)
349- # out <- lapply(out, FUN = function(x) {
350- # if (inherits(x, "PackedSpatRaster")) {
351- # x <- terra::unwrap(x)
352- # } else {
353- # x
354- # }
355- # x
356- # })
357- # list2env(out, envir = .GlobalEnv)
358- # })
359- # # Delete the file
360- # out <- clusterEvalQ(cl, {
361- # if (dir.exists(dirname(filenameForTransfer))) {
362- # try(unlink(dirname(filenameForTransfer), recursive = TRUE), silent = TRUE)
363- # }
364- # })
365- # })
366- # })
367- #
368- # if (is(stMoveObjects, "try-error")) {
369- # message("The attempt to move objects to cluster using rsync and qs failed; trying clusterExport")
370- # stMoveObjects <- system.time(clusterExport(cl, objsNeeded, envir = environment()))
371- # list2env(mget(unlist(objsNeeded), envir = environment()), envir = .GlobalEnv)
372- # }
373- # message("it took ", round(stMoveObjects[3], 2), "s to move objects to nodes")
374- # message("loading packages in cluster nodes")
375- #
376- # clusterExport(cl, "neededPkgs", envir = environment())
377- # stPackages <- system.time(parallel::clusterEvalQ(
378- # cl,
379- # {
380- # for (i in neededPkgs) {
381- # library(i, character.only = TRUE)
382- # }
383- # message("loading ", i, " at ", Sys.time())
384- # }
385- # ))
386- # message("it took ", round(stPackages[3], 2), "s to load packages")
387- #
388- # control$cluster <- cl
389- # } else {
390- # list2env(mget(unlist(objsNeeded), envir = environment()), envir = .GlobalEnv)
391- # }
392- # }
180+
393181 # ####################################################################
394182 # DEOptim call
395183 # ####################################################################
@@ -467,21 +255,8 @@ visualizeDE <- function(DE, cachePath, titles, lower, upper) {
467255 geom_histogram(bins = 15 ) + coord_cartesian(xlim = c(lower [p ],upper [p ])) +
468256 ggtitle(p ) + xlab(NULL ) +
469257 theme_minimal()
470- # geom_smooth(se = TRUE) +
471- # geom_ribbon(aes(ymin = lower95, ymax = upper95)) +
472- # facet_wrap(facets = "variable", scales = "free")
473258 })
474259 invisible (ggpubr :: ggarrange(plotlist = ff ))
475- #
476- #
477- #
478- # aa <- as.data.table(t(DE$member$pop))
479- # melt(aa, id.vars = )
480- # aa[, pars := paste0("par", 1:NROW(aa))]
481- # dim1 <- floor(sqrt(NROW(aa)))
482- # dim2 <- NROW(aa) / dim1
483- # par(mfrow = c(dim1, dim2))
484- # aa[, hist(t(.SD), main = as.character(.BY))[[2]], by = pars]
485260}
486261
487262# ' Iterative `DEoptim` Runner with Caching and Visualization
0 commit comments