Skip to content

Commit aa58a71

Browse files
committed
use 1 mapping only
1 parent 816aca4 commit aa58a71

File tree

1 file changed

+15
-17
lines changed

1 file changed

+15
-17
lines changed

parsl/executors/taskvine/executor.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -178,14 +178,12 @@ def __init__(self,
178178
# Path to directory that holds all tasks' data and results.
179179
self._function_data_dir = ""
180180

181-
# Mapping of function names to function objects
181+
# Mapping of function names to function details
182+
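# Each value is a dict that may hold the function object ('func_obj'), the path to the serialized function ('function_file'), the path to the serialized function context ('function_context_file'), and whether the function has already been serialized ('is_serialized')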
182183
# Helpful to detect inconsistencies in serverless functions
183-
self._map_func_names_to_func_objs = {}
184-
185-
# Mapping of function names to file containing functions' serialization
186184
# Helpful to deduplicate the same function
187-
self._map_func_names_to_serialized_func_file = {}
188-
185+
self._map_func_names_to_func_details = {}
186+
189187
# Helper scripts to prepare package tarballs for Parsl apps
190188
self._package_analyze_script = shutil.which("poncho_package_analyze")
191189
self._package_create_script = shutil.which("poncho_package_create")
@@ -348,10 +346,10 @@ def submit(self, func, resource_specification, *args, **kwargs):
348346
exec_mode = resource_specification.get('exec_mode', self.function_exec_mode)
349347

350348
if exec_mode == 'serverless':
351-
if func.__name__ not in self._map_func_names_to_func_objs:
352-
self._map_func_names_to_func_objs[func.__name__] = func
349+
if func.__name__ not in self._map_func_names_to_func_details or 'func_obj' not in self._map_func_names_to_func_details[func.__name__]:
350+
self._map_func_names_to_func_details[func.__name__] = {'func_obj': func}
353351
else:
354-
if id(func) != id(self._map_func_names_to_func_objs[func.__name__]):
352+
if id(func) != id(self._map_func_names_to_func_details[func.__name__]['func_obj']):
355353
logger.warning('Inconsistency in a serverless function call detected. A function name cannot point to two different function objects. Falling back to executing it as a regular task.')
356354
exec_mode = 'regular'
357355

@@ -427,12 +425,12 @@ def submit(self, func, resource_specification, *args, **kwargs):
427425
# Get the paths to the files that will contain the pickled function,
428426
# arguments, result, and map of input and output files
429427
if exec_mode == 'serverless':
430-
if func.__name__ not in self._map_func_names_to_serialized_func_file:
428+
if func.__name__ not in self._map_func_names_to_func_details or 'function_file' not in self._map_func_names_to_func_details[func.__name__]:
431429
function_file = os.path.join(self._function_data_dir.name, func.__name__, 'function')
432-
self._map_func_names_to_serialized_func_file[func.__name__] = {'function_file': function_file, 'is_serialized': False}
430+
self._map_func_names_to_func_details[func.__name__] = {'function_file': function_file, 'is_serialized': False}
433431
os.makedirs(os.path.join(self._function_data_dir.name, func.__name__))
434432
else:
435-
function_file = self._map_func_names_to_serialized_func_file[func.__name__]['function_file']
433+
function_file = self._map_func_names_to_func_details[func.__name__]['function_file']
436434
else:
437435
function_file = self._path_in_task(executor_task_id, "function")
438436
argument_file = self._path_in_task(executor_task_id, "argument")
@@ -441,25 +439,25 @@ def submit(self, func, resource_specification, *args, **kwargs):
441439

442440
if exec_mode == 'serverless':
443441
if 'function_context' in resource_specification:
444-
if 'function_context_file' not in self._map_func_names_to_serialized_func_file[func.__name__]:
442+
if 'function_context_file' not in self._map_func_names_to_func_details[func.__name__]:
445443
function_context = resource_specification.get('function_context')
446444
function_context_args = resource_specification.get('function_context_args', [])
447445
function_context_kwargs = resource_specification.get('function_context_kwargs', {})
448446
function_context_file = os.path.join(self._function_data_dir.name, func.__name__, 'function_context')
449447
self._serialize_object_to_file(function_context_file, [function_context, function_context_args, function_context_kwargs])
450-
self._map_func_names_to_serialized_func_file[func.__name__]['function_context_file'] = function_context_file
448+
self._map_func_names_to_func_details[func.__name__]['function_context_file'] = function_context_file
451449
else:
452-
function_context_file = self._map_func_names_to_serialized_func_file[func.__name__]['function_context_file']
450+
function_context_file = self._map_func_names_to_func_details[func.__name__]['function_context_file']
453451

454452

455453
logger.debug("Creating executor task {} with function at: {}, argument at: {}, \
456454
and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file))
457455

458456
# Serialize function object and arguments, separately
459-
if exec_mode == 'regular' or not self._map_func_names_to_serialized_func_file[func.__name__]['is_serialized']:
457+
if exec_mode == 'regular' or not self._map_func_names_to_func_details[func.__name__]['is_serialized']:
460458
self._serialize_object_to_file(function_file, func)
461459
if exec_mode == 'serverless':
462-
self._map_func_names_to_serialized_func_file[func.__name__]['is_serialized'] = True
460+
self._map_func_names_to_func_details[func.__name__]['is_serialized'] = True
463461
args_dict = {'args': args, 'kwargs': kwargs}
464462
self._serialize_object_to_file(argument_file, args_dict)
465463

0 commit comments

Comments
 (0)