@@ -94,111 +94,6 @@ as_ClusterFuture <- function(future, workers = NULL, ...) {
9494}
9595
9696
97- # ' @export
98- run.ClusterFuture <- function (future , ... ) {
99- if (getOption(" future.backend.version" , 2L ) == 2L ) {
100- return (NextMethod())
101- }
102-
103- debug <- getOption(" future.debug" , FALSE )
104-
105- if (future [[" state" ]] != ' created' ) {
106- label <- future [[" label" ]]
107- if (is.null(label )) label <- " <none>"
108- stop(FutureError(sprintf(" A future ('%s') can only be launched once" , label ), future = future ))
109- }
110-
111- # # Assert that the process that created the future is
112- # # also the one that evaluates/resolves/queries it.
113- assertOwner(future )
114-
115- backend <- future [[" backend" ]]
116- workers <- backend [[" workers" ]]
117- reg <- backend [[" reg" ]]
118-
119- data <- getFutureData(future )
120- persistent <- isTRUE(future [[" persistent" ]])
121-
122- # # Next available cluster node
123- t_start <- Sys.time()
124- node_idx <- requestNode(await = function () {
125- FutureRegistry(reg , action = " collect-first" , earlySignal = TRUE )
126- }, workers = workers )
127- future [[" node" ]] <- node_idx
128-
129- # # Cluster node to use
130- cl <- workers [node_idx ]
131-
132- if (inherits(future [[" .journal" ]], " FutureJournal" )) {
133- appendToFutureJournal(future ,
134- event = " getWorker" ,
135- category = " overhead" ,
136- parent = " launch" ,
137- start = t_start ,
138- stop = Sys.time()
139- )
140- }
141-
142-
143- # # (i) Reset global environment of cluster node such that
144- # # previous futures are not affecting this one, which
145- # # may happen even if the future is evaluated inside a
146- # # local, e.g. local({ a <<- 1 }).
147- if (! persistent ) {
148- t_start <- Sys.time()
149- cluster_call_blocking(cl , fun = grmall , future = future , when = " call grmall() on" )
150- if (inherits(future [[" .journal" ]], " FutureJournal" )) {
151- appendToFutureJournal(future ,
152- event = " eraseWorker" ,
153- category = " overhead" ,
154- parent = " launch" ,
155- start = t_start ,
156- stop = Sys.time()
157- )
158- }
159- }
160-
161-
162- # # (ii) Attach packages that needs to be attached
163- # # NOTE: Already take care of by evalFuture().
164- # # However, if we need to get an early error about missing packages,
165- # # we can get the error here before launching the future.
166- t_start <- Sys.time()
167- packages <- future [[" packages" ]]
168- if (future [[" earlySignal" ]] && length(packages ) > 0 ) {
169- if (debug ) mdebugf(" Attaching %d packages (%s) on cluster node #%d ..." ,
170- length(packages ), hpaste(sQuote(packages )), node_idx )
171-
172- cluster_call_blocking(cl , fun = requirePackages , packages , future = future , when = " call requirePackages() on" )
173-
174- if (debug ) mdebugf(" Attaching %d packages (%s) on cluster node #%d ... DONE" ,
175- length(packages ), hpaste(sQuote(packages )), node_idx )
176- }
177-
178- if (inherits(future [[" .journal" ]], " FutureJournal" )) {
179- appendToFutureJournal(future ,
180- event = " attachPackages" ,
181- category = " overhead" ,
182- parent = " launch" ,
183- start = t_start ,
184- stop = Sys.time()
185- )
186- }
187-
188- # # Add to registry
189- FutureRegistry(reg , action = " add" , future = future , earlySignal = FALSE )
190-
191- # # (iv) Launch future
192- node_call_nonblocking(cl [[1L ]], fun = evalFuture , args = list (data ), when = " launch future on" )
193-
194- future [[" state" ]] <- ' running'
195-
196- if (debug ) mdebugf(" %s started" , class(future )[1 ])
197-
198- invisible (future )
199- }
200-
201-
20297getSocketSelectTimeout <- function (future , timeout = NULL ) {
20398 if (! is.null(timeout )) return (timeout )
20499
0 commit comments