| 
 | 1 | +from __future__ import annotations  | 
 | 2 | + | 
 | 3 | +import copy  | 
 | 4 | +from concurrent.futures import Future  | 
 | 5 | +from typing import Any, Callable, Dict  | 
 | 6 | + | 
 | 7 | +import typeguard  | 
 | 8 | + | 
 | 9 | +from parsl.errors import OptionalModuleMissing  | 
 | 10 | +from parsl.executors.base import ParslExecutor  | 
 | 11 | +from parsl.utils import RepresentationMixin  | 
 | 12 | + | 
 | 13 | +try:  | 
 | 14 | +    from globus_compute_sdk import Executor  | 
 | 15 | +    _globus_compute_enabled = True  | 
 | 16 | +except ImportError:  | 
 | 17 | +    _globus_compute_enabled = False  | 
 | 18 | + | 
 | 19 | + | 
 | 20 | +class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):  | 
 | 21 | +    """ GlobusComputeExecutor enables remote execution on Globus Compute endpoints  | 
 | 22 | +
  | 
 | 23 | +    GlobusComputeExecutor is a thin wrapper over globus_compute_sdk.Executor  | 
 | 24 | +    Refer to `globus-compute user documentation <https://globus-compute.readthedocs.io/en/latest/executor.html>`_  | 
 | 25 | +    and `reference documentation <https://globus-compute.readthedocs.io/en/latest/reference/executor.html>`_  | 
 | 26 | +    for more details.  | 
 | 27 | +
  | 
 | 28 | +    .. note::  | 
 | 29 | +       As a remote execution system, Globus Compute relies on serialization to ship  | 
 | 30 | +       tasks and results between the Parsl client side and the remote Globus Compute  | 
 | 31 | +       Endpoint side. Serialization is unreliable across python versions, and  | 
 | 32 | +       wrappers used by Parsl assume identical Parsl versions across on both sides.  | 
 | 33 | +       We recommend using matching Python, Parsl and Globus Compute version on both  | 
 | 34 | +       the client side and the endpoint side for stable behavior.  | 
 | 35 | +
  | 
 | 36 | +    """  | 
 | 37 | + | 
 | 38 | +    @typeguard.typechecked  | 
 | 39 | +    def __init__(  | 
 | 40 | +        self,  | 
 | 41 | +        executor: Executor,  | 
 | 42 | +        label: str = 'GlobusComputeExecutor',  | 
 | 43 | +    ):  | 
 | 44 | +        """  | 
 | 45 | +        Parameters  | 
 | 46 | +        ----------  | 
 | 47 | +
  | 
 | 48 | +        executor: globus_compute_sdk.Executor  | 
 | 49 | +            Pass a globus_compute_sdk Executor that will be used to execute  | 
 | 50 | +            tasks on a globus_compute endpoint. Refer to `globus-compute docs  | 
 | 51 | +            <https://globus-compute.readthedocs.io/en/latest/reference/executor.html#globus-compute-executor>`_  | 
 | 52 | +
  | 
 | 53 | +        label:  | 
 | 54 | +            a label to name the executor  | 
 | 55 | +        """  | 
 | 56 | +        if not _globus_compute_enabled:  | 
 | 57 | +            raise OptionalModuleMissing(  | 
 | 58 | +                ['globus-compute-sdk'],  | 
 | 59 | +                "GlobusComputeExecutor requires globus-compute-sdk installed"  | 
 | 60 | +            )  | 
 | 61 | + | 
 | 62 | +        super().__init__()  | 
 | 63 | +        self.executor: Executor = executor  | 
 | 64 | +        self.resource_specification = self.executor.resource_specification  | 
 | 65 | +        self.user_endpoint_config = self.executor.user_endpoint_config  | 
 | 66 | +        self.label = label  | 
 | 67 | + | 
 | 68 | +    def start(self) -> None:  | 
 | 69 | +        """ Start the Globus Compute Executor """  | 
 | 70 | +        pass  | 
 | 71 | + | 
 | 72 | +    def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:  | 
 | 73 | +        """ Submit func to globus-compute  | 
 | 74 | +
  | 
 | 75 | +
  | 
 | 76 | +        Parameters  | 
 | 77 | +        ----------  | 
 | 78 | +
  | 
 | 79 | +        func: Callable  | 
 | 80 | +            Python function to execute remotely  | 
 | 81 | +
  | 
 | 82 | +        resource_specification: Dict[str, Any]  | 
 | 83 | +            Resource specification can be used specify MPI resources required by MPI applications on  | 
 | 84 | +            Endpoints configured to use globus compute's MPIEngine. GCE also accepts *user_endpoint_config*  | 
 | 85 | +            to configure endpoints when the endpoint is a `Multi-User Endpoint  | 
 | 86 | +            <https://globus-compute.readthedocs.io/en/latest/endpoints/endpoints.html#templating-endpoint-configuration>`_  | 
 | 87 | +
  | 
 | 88 | +        args:  | 
 | 89 | +            Args to pass to the function  | 
 | 90 | +
  | 
 | 91 | +        kwargs:  | 
 | 92 | +            kwargs to pass to the function  | 
 | 93 | +
  | 
 | 94 | +        Returns  | 
 | 95 | +        -------  | 
 | 96 | +
  | 
 | 97 | +        Future  | 
 | 98 | +        """  | 
 | 99 | +        res_spec = copy.deepcopy(resource_specification or self.resource_specification)  | 
 | 100 | +        # Pop user_endpoint_config since it is illegal in resource_spec for globus_compute  | 
 | 101 | +        if res_spec:  | 
 | 102 | +            user_endpoint_config = res_spec.pop('user_endpoint_config', self.user_endpoint_config)  | 
 | 103 | +        else:  | 
 | 104 | +            user_endpoint_config = self.user_endpoint_config  | 
 | 105 | + | 
 | 106 | +        try:  | 
 | 107 | +            self.executor.resource_specification = res_spec  | 
 | 108 | +            self.executor.user_endpoint_config = user_endpoint_config  | 
 | 109 | +            return self.executor.submit(func, *args, **kwargs)  | 
 | 110 | +        finally:  | 
 | 111 | +            # Reset executor state to defaults set at configuration time  | 
 | 112 | +            self.executor.resource_specification = self.resource_specification  | 
 | 113 | +            self.executor.user_endpoint_config = self.user_endpoint_config  | 
 | 114 | + | 
 | 115 | +    def shutdown(self):  | 
 | 116 | +        """Clean-up the resources associated with the Executor.  | 
 | 117 | +
  | 
 | 118 | +        GCE.shutdown will cancel all futures that have not yet registered with  | 
 | 119 | +        Globus Compute and will not wait for the launched futures to complete.  | 
 | 120 | +        This method explicitly shutsdown the result_watcher thread to avoid  | 
 | 121 | +        it waiting for outstanding futures at thread exit.  | 
 | 122 | +        """  | 
 | 123 | +        self.executor.shutdown(wait=False, cancel_futures=True)  | 
 | 124 | +        result_watcher = self.executor._get_result_watcher()  | 
 | 125 | +        result_watcher.shutdown(wait=False, cancel_futures=True)  | 
0 commit comments