Skip to content

Commit 4be9d23

Browse files
committed
Support for GlobusComputeExecutor to submit functions to globus_compute_endpoints
1 parent 6683f5b commit 4be9d23

File tree

3 files changed

+149
-1
lines changed

3 files changed

+149
-1
lines changed

parsl/executors/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from parsl.executors.flux.executor import FluxExecutor
2+
from parsl.executors.globus_compute import GlobusComputeExecutor
23
from parsl.executors.high_throughput.executor import HighThroughputExecutor
34
from parsl.executors.high_throughput.mpi_executor import MPIExecutor
45
from parsl.executors.threads import ThreadPoolExecutor
@@ -8,4 +9,5 @@
89
'HighThroughputExecutor',
910
'MPIExecutor',
1011
'WorkQueueExecutor',
11-
'FluxExecutor']
12+
'FluxExecutor',
13+
'GlobusComputeExecutor']

parsl/executors/globus_compute.py

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import uuid
2+
from concurrent.futures import Future
3+
from typing import Any, Callable, Dict, Optional, Union
4+
5+
import typeguard
6+
7+
from parsl.errors import OptionalModuleMissing
8+
from parsl.executors.base import ParslExecutor
9+
from parsl.utils import RepresentationMixin
10+
11+
UUID_LIKE_T = Union[uuid.UUID, str]
12+
13+
14+
15+
class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):
16+
""" GlobusComputeExecutor enables remote execution on Globus Compute endpoints
17+
18+
GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor
19+
Refer to `globus-compute user documentation <https://globus-compute.readthedocs.io/en/latest/executor.html>`_
20+
and `reference documentation <https://globus-compute.readthedocs.io/en/latest/reference/executor.html>`_
21+
for more details.
22+
"""
23+
24+
def __init__(
25+
self,
26+
endpoint_id: Optional[UUID_LIKE_T] = None,
27+
task_group_id: Optional[UUID_LIKE_T] = None,
28+
resource_specification: Optional[dict[str, Any]] = None,
29+
user_endpoint_config: Optional[dict[str, Any]] = None,
30+
label: str = "GlobusComputeExecutor",
31+
batch_size: int = 128,
32+
amqp_port: Optional[int] = None,
33+
**kwargs,
34+
):
35+
"""
36+
37+
Parameters
38+
----------
39+
40+
endpoint_id:
41+
id of the endpoint to which to submit tasks
42+
43+
task_group_id:
44+
The Task Group to which to associate tasks. If not set,
45+
one will be instantiated.
46+
47+
resource_specification:
48+
Specify resource requirements for individual task execution.
49+
50+
user_endpoint_config:
51+
User endpoint configuration values as described
52+
and allowed by endpoint administrators. Must be a JSON-serializable dict
53+
or None.
54+
55+
label:
56+
a label to name the executor; mainly utilized for
57+
logging and advanced needs with multiple executors.
58+
59+
batch_size:
60+
the maximum number of tasks to coalesce before
61+
sending upstream [min: 1, default: 128]
62+
63+
amqp_port:
64+
Port to use when connecting to results queue. Note that the
65+
Compute web services only support 5671, 5672, and 443.
66+
67+
kwargs:
68+
Other kwargs listed will be passed through to globus_compute_sdk.Executor
69+
as is
70+
"""
71+
super().__init__()
72+
self.endpoint_id = endpoint_id
73+
self.task_group_id = task_group_id
74+
self.resource_specification = resource_specification
75+
self.user_endpoint_config = user_endpoint_config
76+
self.label = label
77+
self.batch_size = batch_size
78+
self.amqp_port = amqp_port
79+
80+
try:
81+
from globus_compute_sdk import Executor
82+
except ImportError:
83+
raise OptionalModuleMissing(
84+
['globus-compute-sdk'],
85+
"GlobusComputeExecutor requires globus-compute-sdk installed"
86+
)
87+
self._executor: Executor = Executor(
88+
endpoint_id=endpoint_id,
89+
task_group_id=task_group_id,
90+
resource_specification=resource_specification,
91+
user_endpoint_config=user_endpoint_config,
92+
label=label,
93+
batch_size=batch_size,
94+
amqp_port=amqp_port,
95+
**kwargs
96+
)
97+
98+
def start(self) -> None:
99+
"""Empty function
100+
"""
101+
pass
102+
103+
def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
104+
""" Submit fn to globus-compute
105+
106+
107+
Parameters
108+
----------
109+
110+
func: Callable
111+
Python function to execute remotely
112+
resource_specification: Dict[str, Any]
113+
Resource specification used to run MPI applications on Endpoints configured
114+
to use globus compute's MPIEngine
115+
args:
116+
Args to pass to the function
117+
kwargs:
118+
kwargs to pass to the function
119+
120+
Returns
121+
-------
122+
123+
Future
124+
"""
125+
self._executor.resource_specification = resource_specification or self.resource_specification
126+
return self._executor.submit(func, *args, **kwargs)
127+
128+
def shutdown(self, wait=True, *, cancel_futures=False):
129+
"""Clean-up the resources associated with the Executor.
130+
131+
It is safe to call this method several times. Otherwise, no other methods
132+
can be called after this one.
133+
134+
Parameters
135+
----------
136+
137+
wait: If True, then this method will not return until all pending
138+
futures have received results.
139+
cancel_futures: If True, then this method will cancel all futures
140+
that have not yet registered their tasks with the Compute web services.
141+
Tasks cannot be cancelled once they are registered.
142+
"""
143+
return self._executor.shutdown()
144+
145+

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
'flux': ['pyyaml', 'cffi', 'jsonschema'],
3535
'proxystore': ['proxystore'],
3636
'radical-pilot': ['radical.pilot==1.60', 'radical.utils==1.60'],
37+
'globus_compute': ['globus_compute_sdk>=2.27.1'],
3738
# Disabling psi-j since github direct links are not allowed by pypi
3839
# 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl']
3940
}

0 commit comments

Comments
 (0)