@@ -137,7 +137,6 @@ makeClusterFuture <- function(specs = nbrOfWorkers(), ...) {
137137}
138138
139139
140- # ' @importFrom utils str
141140# ' @rawNamespace if (getRversion() >= "4.4") S3method(print,FutureCluster)
142141print.FutureCluster <- function (x , ... ) {
143142 cat(sprintf(" A %s cluster with %d node\n " , sQuote(class(x )[1 ]), length(x )))
@@ -186,7 +185,6 @@ print.FutureCluster <- function(x, ...) {
186185}
187186
188187
189- # ' @importFrom utils capture.output str
190188# ' @importFrom future future
191189# ' @rawNamespace if (getRversion() >= "4.4") importFrom(parallel,sendData)
192190# ' @rawNamespace if (getRversion() >= "4.4") S3method(sendData,FutureNode)
@@ -201,12 +199,12 @@ sendData.FutureNode <- function(node, data) {
201199
202200 debug <- isTRUE(getOption(" future.debug" ))
203201 if (debug ) {
204- message(sprintf( " sendData() for %s #%d ..." , class(node )[1 ], index ) )
205- on.exit(message(sprintf( " sendData() for %s %d ... done " , class( node )[ 1 ], index ) ))
202+ mdebugf_push( " sendData() for %s #%d ..." , class(node )[1 ], index )
203+ on.exit(mdebug_pop( ))
206204 }
207205
208206 type <- data [[" type" ]]
209- if (debug ) message(sprintf( " | type: %s" , sQuote(type ) ))
207+ if (debug ) mdebugf( " | type: %s" , sQuote(type ))
210208
211209 # # Assert that future backend has not changed
212210 backend <- node [[" cluster_env" ]][[" backend" ]]
@@ -222,11 +220,11 @@ sendData.FutureNode <- function(node, data) {
222220
223221 # # SPECIAL CASE #1: Called via clusterSetRNGStream()?
224222 if (called_via_clusterSetRNGStream()) {
225- if (debug ) message (" Detected: clusterSetRNGStream()" )
223+ if (debug ) mdebug (" Detected: clusterSetRNGStream()" )
226224 args <- data [[" args" ]]
227225 call <- args [[1 ]]
228226 seed <- call [[3 ]]
229- if (debug ) message(sprintf( " Seed recorded: (%s)" , paste (seed , collapse = " , " ) ))
227+ if (debug ) mdebugf( " Seed recorded: (%s)" , comma (seed ))
230228 ns <- getNamespace(" future" )
231229 is_lecyer_cmrg_seed <- get(" is_lecyer_cmrg_seed" , mode = " function" , envir = ns , inherits = FALSE )
232230 stopifnot(is_lecyer_cmrg_seed(seed ))
@@ -239,12 +237,12 @@ sendData.FutureNode <- function(node, data) {
239237
240238 # # SPECIAL CASE #2: Called via clusterExport()?
241239 if (called_via_clusterExport()) {
242- if (debug ) message (" Detected: clusterExport()" )
240+ if (debug ) mdebug (" Detected: clusterExport()" )
243241
244242 # # Only need to be handled once per cluster - not once per node
245243 if (index == 1L ) {
246244 args <- data [[" args" ]]
247- if (debug ) message(sprintf( " Exports: [n=%d] %s" , length(args ), commaq(names(args ) )))
245+ if (debug ) mdebugf( " Exports: [n=%d] %s" , length(args ), commaq(names(args )))
248246 cluster_env <- node [[" cluster_env" ]]
249247 exports <- cluster_env [[" exports" ]]
250248 if (is.null(exports )) exports <- list ()
@@ -264,15 +262,15 @@ sendData.FutureNode <- function(node, data) {
264262
265263 # # SPECIAL CASE #3: Called via clusterEvalQ()?
266264 if (called_via_clusterEvalQ()) {
267- if (debug ) message (" Detected: clusterEvalQ()" )
265+ if (debug ) mdebug (" Detected: clusterEvalQ()" )
268266
269267 # # Only need to be handled once per cluster - not once per node
270268 if (index == 1L ) {
271269 args <- data [[" args" ]]
272270 expr <- args [[1 ]]
273271 calls <- sys.calls()
274272 if (debug ) {
275- message (" Expression:" )
273+ mdebug (" Expression:" )
276274 mprint(expr )
277275 }
278276
@@ -327,18 +325,10 @@ sendData.FutureNode <- function(node, data) {
327325
328326 node [[" future" ]] <- local({
329327 if (debug ) {
330- message(" | Create future ..." )
331- on.exit(message(" | Create future ... done" ))
332-
333- out <- capture.output(str(list (data = data )))
334- out <- sprintf(" | : %s" , out )
335- out <- paste(out , collapse = " \n " )
336- message(out )
337-
338- out <- capture.output(str(list (options = options )))
339- out <- sprintf(" | : %s" , out )
340- out <- paste(out , collapse = " \n " )
341- message(out )
328+ mdebug_push(" Create future ..." )
329+ on.exit(mdebug_pop())
330+ mstr(list (data = data ))
331+ mstr(list (options = options ))
342332 }
343333 fun <- data [[" fun" ]]
344334 args <- data [[" args" ]]
@@ -347,21 +337,16 @@ sendData.FutureNode <- function(node, data) {
347337 future_args <- list (expr = quote(expr ), substitute = FALSE )
348338 future_args <- c(future_args , options )
349339
350- if (debug ) {
351- out <- capture.output(str(list (args = future_args )))
352- out <- sprintf(" | : %s" , out )
353- out <- paste(out , collapse = " \n " )
354- message(out )
355- }
340+ if (debug ) mstr(list (future_args = future_args ))
356341 do.call(future , args = future_args )
357342 })
358343 } else if (type == " DONE" ) {
359344 future <- node [[" future" ]]
360345 if (inherits(future , " Future" )) {
361346 node [[" future" ]] <- local({
362347 if (debug ) {
363- message( " | Canceling future ..." )
364- on.exit(message( " | Canceling future ... done " ))
348+ mdebug_push( " Canceling future ..." )
349+ on.exit(mdebug_pop( ))
365350 }
366351 future <- cancel(future )
367352 tryCatch(resolve(future ), error = identity )
@@ -383,19 +368,16 @@ sendData.FutureNode <- function(node, data) {
383368recvData.FutureNode <- function (node ) {
384369 debug <- isTRUE(getOption(" future.debug" ))
385370 if (debug ) {
386- message(sprintf( " recvData() for %s ..." , class(node )[1 ]) )
387- on.exit(message(sprintf( " recvData() for %s ... done " , class( node )[ 1 ]) ))
371+ mdebugf_push( " recvData() for %s ..." , class(node )[1 ])
372+ on.exit(mdebug_pop( ))
388373 }
389374
390375 future <- node [[" future" ]]
391376 if (! inherits(future , " Future" )) {
392377 stop(sprintf(" %s does not have a future associated with it" , class(node )[1 ]))
393378 }
394379 result <- result(future )
395- if (debug ) {
396- print(result )
397- print(utils :: ls.str(result ))
398- }
380+ if (debug ) mprint(result )
399381
400382 if (" seed" %in% names(node ) && ! is.null(result [[" seed" ]])) {
401383 if (debug ) mdebug(" Updating the node's RNG state" )
0 commit comments