@@ -98,28 +98,29 @@ def __init__(
9898 self .batch_size = batch_size
9999 self .amqp_port = amqp_port
100100 self .client = client
101+ self .executor_kwargs = kwargs
101102
102103 if not _globus_compute_enabled :
103104 raise OptionalModuleMissing (
104105 ['globus-compute-sdk' ],
105106 "GlobusComputeExecutor requires globus-compute-sdk installed"
106107 )
107108
109+ def start (self ) -> None :
110+ """ Start the Globus Compute Executor """
111+
108112 self ._executor : Executor = Executor (
109- endpoint_id = endpoint_id ,
110- task_group_id = task_group_id ,
111- resource_specification = resource_specification ,
112- user_endpoint_config = user_endpoint_config ,
113- label = label ,
114- batch_size = batch_size ,
115- amqp_port = amqp_port ,
113+ endpoint_id = self . endpoint_id ,
114+ task_group_id = self . task_group_id ,
115+ resource_specification = self . resource_specification ,
116+ user_endpoint_config = self . user_endpoint_config ,
117+ label = self . label ,
118+ batch_size = self . batch_size ,
119+ amqp_port = self . amqp_port ,
116120 client = self .client ,
117- ** kwargs
121+ ** self . executor_kwargs
118122 )
119123
120- def start (self ) -> None :
121- pass
122-
123124 def submit (self , func : Callable , resource_specification : Dict [str , Any ], * args : Any , ** kwargs : Any ) -> Future :
124125 """ Submit func to globus-compute
125126
@@ -164,4 +165,4 @@ def shutdown(self):
164165 GCE.shutdown will cancel all futures that have not yet registered with
165166 Globus Compute and will not wait for the launched futures to complete.
166167 """
167- return self ._executor .shutdown (wait = False , cancel_futures = True )
168+ self ._executor .shutdown (wait = False , cancel_futures = True )
0 commit comments