@@ -104,7 +104,7 @@ def revive_processes(processes: list[ProcessNode], *, wait: bool = False) -> Non
104104
105105
106106def play_processes (
107- processes : list [ProcessNode ] | None = None , * , all_entries : bool = False , timeout : float = 5.0 , wait : bool = False
107+ processes : list [ProcessNode ] | None = None , * , all_entries : bool = False , timeout : float = 5.0
108108) -> None :
109109 """Play (unpause) paused processes.
110110
@@ -113,7 +113,6 @@ def play_processes(
113113 :param processes: List of processes to play.
114114 :param all_entries: Play all paused processes.
115115 :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
116- :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
117116 :raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
118117 """
119118 if not get_daemon_client ().is_daemon_running :
@@ -130,7 +129,7 @@ def play_processes(
130129 return
131130
132131 controller = get_manager ().get_process_controller ()
133- _perform_actions (processes , controller .play_process , 'play' , 'playing' , timeout , wait )
132+ _perform_actions (processes , controller .play_process , 'play' , 'playing' , timeout )
134133
135134
136135def pause_processes (
@@ -139,7 +138,6 @@ def pause_processes(
139138 msg_text : str = 'Paused through `aiida.engine.processes.control.pause_processes`' ,
140139 all_entries : bool = False ,
141140 timeout : float = 5.0 ,
142- wait : bool = False ,
143141) -> None :
144142 """Pause running processes.
145143
@@ -148,7 +146,6 @@ def pause_processes(
148146 :param processes: List of processes to play.
149147 :param all_entries: Pause all playing processes.
150148 :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
151- :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
152149 :raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
153150 """
154151 if not get_daemon_client ().is_daemon_running :
@@ -166,7 +163,7 @@ def pause_processes(
166163
167164 controller = get_manager ().get_process_controller ()
168165 action = functools .partial (controller .pause_process , msg_text = msg_text )
169- _perform_actions (processes , action , 'pause' , 'pausing' , timeout , wait )
166+ _perform_actions (processes , action , 'pause' , 'pausing' , timeout )
170167
171168
172169def kill_processes (
@@ -176,7 +173,6 @@ def kill_processes(
176173 force_kill : bool = False ,
177174 all_entries : bool = False ,
178175 timeout : float = 5.0 ,
179- wait : bool = False ,
180176) -> None :
181177 """Kill running processes.
182178
@@ -185,7 +181,6 @@ def kill_processes(
185181 :param processes: List of processes to play.
186182 :param all_entries: Kill all active processes.
187183 :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
188- :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
189184 :raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
190185 """
191186 if not get_daemon_client ().is_daemon_running :
@@ -203,7 +198,7 @@ def kill_processes(
203198
204199 controller = get_manager ().get_process_controller ()
205200 action = functools .partial (controller .kill_process , msg_text = msg_text , force_kill = force_kill )
206- _perform_actions (processes , action , 'kill' , 'killing' , timeout , wait )
201+ _perform_actions (processes , action , 'kill' , 'killing' , timeout )
207202
208203
209204def _perform_actions (
@@ -212,7 +207,6 @@ def _perform_actions(
212207 infinitive : str ,
213208 present : str ,
214209 timeout : t .Optional [float ] = None ,
215- wait : bool = False ,
216210 ** kwargs : t .Any ,
217211) -> None :
218212 """Perform an action on a list of processes.
@@ -223,7 +217,6 @@ def _perform_actions(
223217 :param present: The present tense of the verb that represents the action.
224218 :param past: The past tense of the verb that represents the action.
225219 :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
226- :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
227220 :param kwargs: Keyword arguments that will be passed to the method ``action``.
228221 :raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
229222 """
@@ -241,77 +234,52 @@ def _perform_actions(
241234 else :
242235 futures [future ] = process
243236
244- _resolve_futures (futures , infinitive , present , wait , timeout )
237+ _resolve_futures (futures , infinitive , present , timeout )
245238
246239
247240def _resolve_futures (
248241 futures : dict [concurrent .futures .Future , ProcessNode ],
249242 infinitive : str ,
250243 present : str ,
251- wait : bool = False ,
252244 timeout : t .Optional [float ] = None ,
253245) -> None :
254246 """Process a mapping of futures representing an action on an active process.
255247
256248 This function will echo the correct information strings based on the outcomes of the futures and the given verb
257249 conjugations. You can optionally wait for any pending actions to be completed before the functions returns and use a
258- timeout to put a maximum wait time on the actions.
250+ timeout to put a maximum wait time on the actions. TODO fix docstring
259251
260252 :param futures: The map of action futures and the corresponding processes.
261253 :param infinitive: The infinitive form of the action verb.
262254 :param present: The present tense form of the action verb.
263- :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
264255 :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
265256 """
266- scheduled = {}
267-
268- def handle_result (result ):
269- if result is True :
270- LOGGER .report (f'request to { infinitive } Process<{ process .pk } > sent' )
271- elif result is False :
272- LOGGER .error (f'problem { present } Process<{ process .pk } >' )
273- elif isinstance (result , kiwipy .Future ):
274- LOGGER .report (f'scheduled { infinitive } Process<{ process .pk } >' )
275- scheduled [result ] = process
276- else :
277- LOGGER .error (f'got unexpected response when { present } Process<{ process .pk } >: { result } ' )
257+ if not timeout :
258+ return
259+
260+ LOGGER .report (f"waiting for process(es) { ',' .join ([str (proc .pk ) for proc in futures .values ()])} " )
278261
279262 try :
280263 for future , process in futures .items ():
281- # unwrap is need here since LoopCommunicator will also wrap a future
264+ # we unwrap to the end
282265 unwrapped = unwrap_kiwi_future (future )
283266 try :
284267 result = unwrapped .result (timeout = timeout )
285268 except communications .TimeoutError :
286- cancelled = unwrapped .cancel ()
269+ cancelled = future .cancel ()
287270 if cancelled :
288271 LOGGER .error (f'call to { infinitive } Process<{ process .pk } > timed out and was cancelled.' )
289272 else :
290273 LOGGER .error (f'call to { infinitive } Process<{ process .pk } > timed out but could not be cancelled.' )
291274 except Exception as exception :
292275 LOGGER .error (f'failed to { infinitive } Process<{ process .pk } >: { exception } ' )
293276 else :
294- if isinstance (result , kiwipy .Future ):
295- LOGGER .report (f'scheduled { infinitive } Process<{ process .pk } >' )
296- scheduled [result ] = process
277+ if result is True :
278+ LOGGER .report (f'request to { infinitive } Process<{ process .pk } > sent' )
279+ elif result is False :
280+ LOGGER .error (f'problem { present } Process<{ process .pk } >' )
297281 else :
298- handle_result (result )
299-
300- if not wait or not scheduled :
301- return
302-
303- LOGGER .report (f"waiting for process(es) { ',' .join ([str (proc .pk ) for proc in scheduled .values ()])} " )
304-
305- for future in concurrent .futures .as_completed (scheduled .keys (), timeout = timeout ):
306- process = scheduled [future ]
307-
308- try :
309- result = future .result ()
310- except Exception as exception :
311- LOGGER .error (f'failed to { infinitive } Process<{ process .pk } >: { exception } ' )
312- else :
313- handle_result (result )
314-
282+ LOGGER .error (f'got unexpected response when { present } Process<{ process .pk } >: { result } ' )
315283 except concurrent .futures .TimeoutError :
316284 raise ProcessTimeoutException (
317285 f'timed out trying to { infinitive } processes { futures .values ()} \n '
0 commit comments