@@ -121,7 +121,7 @@ getSocketSelectTimeout <- function(future, timeout = NULL) {
121121 # # in infinite waiting (PR17203). Fixed in R devel r73470 (2017-10-05)
122122 # # and R 3.4.3
123123 # # Source: https://github.com/HenrikBengtsson/Wishlist-for-R/issues/35
124- if (.Platform $ OS.type != " windows" && getRversion() < " 3.4.3" ) {
124+ if (.Platform [[ " OS.type" ]] != " windows" && getRversion() < " 3.4.3" ) {
125125 timeout <- round(timeout , digits = 0L )
126126 }
127127 attr(timeout , " validated" ) <- TRUE
@@ -143,8 +143,8 @@ resolved.ClusterFuture <- function(x, run = TRUE, timeout = NULL, ...) {
143143 future <- x
144144 backend <- future [[" backend" ]]
145145 stop_if_not(inherits(backend , " FutureBackend" ))
146- workers <- backend $ workers
147- reg <- backend $ reg
146+ workers <- backend [[ " workers" ]]
147+ reg <- backend [[ " reg" ]]
148148
149149 # # A lazy future not even launched?
150150 if (future [[" state" ]] == " created" ) {
@@ -157,7 +157,7 @@ resolved.ClusterFuture <- function(x, run = TRUE, timeout = NULL, ...) {
157157 # # Find which nodes are available
158158 avail <- rep(TRUE , times = length(workers ))
159159 futures <- FutureRegistry(reg , action = " list" , earlySignal = FALSE )
160- nodes <- unlist(lapply(futures , FUN = function (f ) f $ node ), use.names = FALSE )
160+ nodes <- unlist(lapply(futures , FUN = function (f ) f [[ " node" ]] ), use.names = FALSE )
161161 avail [nodes ] <- FALSE
162162 if (debug ) mdebug(" - avail: [n=%d] %s" , length(avail ), commaq(avail ))
163163
@@ -185,7 +185,7 @@ resolved.ClusterFuture <- function(x, run = TRUE, timeout = NULL, ...) {
185185 node <- cl [[1 ]]
186186
187187 # # Check if workers socket connection is available for reading
188- if (! is.null(con <- node $ con )) {
188+ if (! is.null(con <- node [[ " con" ]] )) {
189189 # # AD HOC/SPECIAL CASE: Skip if connection has been serialized and lacks
190190 # # internal representation. /HB 2018-10-27
191191 connId <- connectionId(con )
@@ -200,7 +200,7 @@ resolved.ClusterFuture <- function(x, run = TRUE, timeout = NULL, ...) {
200200 # # n infinite waiting (PR17203). Fixed in R devel r73470 (2017-10-05)
201201 # # and R 3.4.3
202202 # # Source: https://github.com/HenrikBengtsson/Wishlist-for-R/issues/35
203- if (! isTRUE(attr(timeout , " validated" , exact = TRUE )) && .Platform $ OS.type != " windows" && getRversion() < " 3.4.3" ) {
203+ if (! isTRUE(attr(timeout , " validated" , exact = TRUE )) && .Platform [[ " OS.type" ]] != " windows" && getRversion() < " 3.4.3" ) {
204204 timeout <- round(timeout , digits = 0L )
205205 }
206206 }
@@ -296,8 +296,8 @@ receiveMessageFromWorker <- function(future, ...) {
296296 if (! inherits(backend , " FutureBackend" ) && ! is.list(backend )) {
297297 stop(sprintf(" [INTERNAL ERROR] receiveMessageFromWorker(): the 'backend' element of the %s object is neither a FutureBackend object nor a list: %s" , class(future )[1 ], class(backend )[1 ]))
298298 }
299- workers <- backend $ workers
300- reg <- backend $ reg
299+ workers <- backend [[ " workers" ]]
300+ reg <- backend [[ " reg" ]]
301301
302302 node_idx <- future [[" node" ]]
303303 if (debug ) mdebugf(" - cluster node index: %d" , node_idx )
@@ -318,7 +318,7 @@ receiveMessageFromWorker <- function(future, ...) {
318318 if (inherits(ack , " error" )) {
319319 if (debug ) mdebugf(" - parallel:::recvResult() produced an error: %s" , conditionMessage(ack ))
320320 msg <- post_mortem_cluster_failure(ack , when = " receive message results from" , node = node , future = future )
321- ex <- FutureError(msg , call = ack $ call , future = future )
321+ ex <- FutureError(msg , call = ack [[ " call" ]] , future = future )
322322 future [[" result" ]] <- ex
323323 stop(ex )
324324 }
@@ -367,7 +367,7 @@ receiveMessageFromWorker <- function(future, ...) {
367367 # # they will be resignaled each time value() is called.
368368 signaled <- future [[" .signaledConditions" ]]
369369 if (length(signaled ) > 0 ) {
370- result $ conditions <- c(future [[" .signaledConditions" ]], result $ conditions )
370+ result [[ " conditions" ]] <- c(future [[" .signaledConditions" ]], result [[ " conditions" ]] )
371371 future [[" .signaledConditions" ]] <- NULL
372372 }
373373
@@ -430,9 +430,9 @@ receiveMessageFromWorker <- function(future, ...) {
430430 }
431431
432432 # # Increment signal count
433- signaled <- condition $ signaled
433+ signaled <- condition [[ " signaled" ]]
434434 if (is.null(signaled )) signaled <- 0L
435- condition $ signaled <- signaled + 1L
435+ condition [[ " signaled" ]] <- signaled + 1L
436436
437437 # # Record condition as signaled
438438 signaled <- future [[" .signaledConditions" ]]
@@ -562,13 +562,13 @@ post_mortem_cluster_failure <- function(ex, when, node, future) {
562562 # # (2) Information on the cluster node
563563
564564 # # (a) Process information on the worker, if available
565- pid <- node $ session_info $ process $ pid
565+ pid <- node [[ " session_info" ]][[ " process" ]][[ " pid" ]]
566566 pid_info <- if (is.numeric(pid )) sprintf(" PID %.0f" , pid ) else NULL
567567
568568 # # (b) Host information on the worker, if available
569569 # # AD HOC: This assumes that the worker has a hostname, which is not
570570 # # the case for MPI workers. /HB 2017-03-07
571- host <- node $ host
571+ host <- node [[ " host" ]]
572572 localhost <- isTRUE(attr(host , " localhost" , exact = TRUE ))
573573 host_info <- if (! is.null(host )) {
574574 sprintf(" on %s%s" , if (localhost ) " localhost " else " " , sQuote(host ))
@@ -589,7 +589,7 @@ post_mortem_cluster_failure <- function(ex, when, node, future) {
589589
590590 # # (a) Inspect the 'reason' for known clues
591591 if (grepl(" ignoring SIGPIPE signal" , reason )) {
592- postmortem $ sigpipe <- " The SIGPIPE error suggests that the R socket connection to the parallel worker broke, which can happen for different reasons, e.g. the parallel worker crashed"
592+ postmortem [[ " sigpipe" ]] <- " The SIGPIPE error suggests that the R socket connection to the parallel worker broke, which can happen for different reasons, e.g. the parallel worker crashed"
593593 }
594594
595595 # # (a) Did the worker process terminate?
@@ -615,20 +615,20 @@ post_mortem_cluster_failure <- function(ex, when, node, future) {
615615 msg2 <- " No process exists with this PID on the remote host, i.e. the remote worker is no longer alive"
616616 }
617617 }
618- postmortem $ alive <- msg2
618+ postmortem [[ " alive" ]] <- msg2
619619 }
620620
621621 # # (b) Did the worker use a connection that changed?
622- if (inherits(node $ con , " connection" )) {
623- postmortem $ connection <- check_connection_details(node , future = future )
622+ if (inherits(node [[ " con" ]] , " connection" )) {
623+ postmortem [[ " connection" ]] <- check_connection_details(node , future = future )
624624 }
625625
626626 # # (c) Any non-exportable globals?
627627 globals <- future [[" globals" ]]
628- postmortem $ non_exportable <- assert_no_references(globals , action = " string" )
628+ postmortem [[ " non_exportable" ]] <- assert_no_references(globals , action = " string" )
629629
630630 # # (d) Size of globals
631- postmortem $ global_sizes <- summarize_size_of_globals(globals )
631+ postmortem [[ " global_sizes" ]] <- summarize_size_of_globals(globals )
632632
633633 # # (5) The final error message
634634 msg <- sprintf(" %s (%s) failed to %s %s. The reason reported was %s" ,
@@ -701,11 +701,11 @@ assertValidConnection <- function(future) {
701701 node_idx <- future [[" node" ]]
702702 if (debug ) mdebugf(" - cluster node index: %d" , node_idx )
703703
704- cl <- backend $ workers [node_idx ]
704+ cl <- backend [[ " workers" ]] [node_idx ]
705705 node <- cl [[1 ]]
706706
707707 # # Nothing to do?
708- if (is.null(con <- node $ con )) return ()
708+ if (is.null(con <- node [[ " con" ]] )) return ()
709709
710710 # # AD HOC/SPECIAL CASE: Skip if connection has been serialized and lacks internal representation. /HB 2018-10-27
711711 connId <- connectionId(con )
0 commit comments