Skip to content

Commit 1188c19

Browse files
committed
Update executor to use multiprocess context (allows decorating more functions to be used as an application)
1 parent bbd5b0a commit 1188c19

File tree

2 files changed

+12
-22
lines changed

2 files changed

+12
-22
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ dependencies = [
1515
"networkx>=3.0,<4.0",
1616
"click>=8.0,<9.0",
1717
"daemons>=1.3.0,<2.0.0",
18-
"netqasm>=1.0.0,<2.0.0"
18+
"netqasm>=1.0.0,<2.0.0",
19+
"multiprocess>=0.70.18,<1.0"
1920
]
2021
requires-python = ">=3.8,<3.13"
2122
authors = [

simulaqron/run/run.py

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
import logging
22
import os
3-
from concurrent.futures import ProcessPoolExecutor as Pool
3+
from multiprocess.pool import Pool, ApplyResult
44
from importlib import reload
55
from os import PathLike
66
from pathlib import Path
7-
from time import sleep
8-
from typing import Callable, Optional, Any, Dict, List, Union
7+
from typing import Callable, Optional, Any, Dict, List, Union, Generator, Tuple
98

109
from netqasm.logging.glob import get_netqasm_logger
1110
from netqasm.logging.output import (reset_struct_loggers,
@@ -34,21 +33,11 @@
3433
}
3534

3635

37-
def as_completed(futures, names=None, sleep_time=0):
38-
futures = list(futures)
39-
if names is not None:
40-
names = list(names)
41-
while len(futures) > 0:
42-
for i, future in enumerate(futures):
43-
if future.done():
44-
futures.pop(i)
45-
if names is None:
46-
yield future
47-
else:
48-
name = names.pop(i)
49-
yield future, name
50-
if sleep_time > 0:
51-
sleep(sleep_time)
36+
def as_completed(futures: List[ApplyResult], names: List[str]) -> Generator[Tuple[ApplyResult, str], None, None]:
37+
if len(futures) is not len(names):
38+
raise RuntimeError("Not all registered applications have an associated name")
39+
for future, name in zip(futures, names):
40+
yield future, name
5241

5342

5443
def reset(save_loggers=False):
@@ -180,7 +169,7 @@ def run_applications(
180169
inputs=inputs,
181170
)
182171
inputs["app_config"] = app_cfg
183-
future = executor.submit(program.entry, **inputs)
172+
future: ApplyResult = executor.apply_async(program.entry, kwds=inputs)
184173
app_futures.append(future)
185174

186175
# for app_cfg in app_cfgs:
@@ -193,8 +182,8 @@ def run_applications(
193182
# Join the application processes and the backend
194183
names = [f'app_{app_name}' for app_name in app_names]
195184
result = {}
196-
for future, name in as_completed(app_futures, names=names):
197-
result[name] = future.result()
185+
for future, name in as_completed(app_futures, names):
186+
result[name] = future.get()
198187
# if results_file is not None:
199188
# save_results(results=results, results_file=results_file)
200189
if enable_logging:

0 commit comments

Comments
 (0)