Skip to content

Commit 4c0d1cf

Browse files
committed
fixes to update head
1 parent 194632b commit 4c0d1cf

File tree

2 files changed

+54
-2
lines changed

2 files changed

+54
-2
lines changed

parsl/executors/taskvine/executor.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ def _path_in_task(self, executor_task_id, *path_components):
323323
return os.path.join(self._function_data_dir.name, task_dir, *path_components)
324324

325325
def submit(self, func, resource_specification, *args, **kwargs):
326+
import cloudpickle
326327
"""Processes the Parsl app by its arguments and submits the function
327328
information to the task queue, to be executed using the TaskVine
328329
system. The args and kwargs are processed for input and output files to
@@ -459,8 +460,15 @@ def submit(self, func, resource_specification, *args, **kwargs):
459460
function_context_args = resource_specification.get('function_context_args', [])
460461
function_context_kwargs = resource_specification.get('function_context_kwargs', {})
461462
function_context_file = os.path.join(self._function_data_dir.name, func.__name__, 'function_context')
462-
self._serialize_object_to_file(function_context_file, [function_context, function_context_args, function_context_kwargs])
463+
464+
# DEBUG
465+
with open('/tmp/tmp.context.executor.function.module.file', 'w') as f:
466+
f.write(str(function_context.__module__))
467+
468+
self._cloudpickle_serialize_object_to_file(function_context_file, [function_context, function_context_args, function_context_kwargs])
463469
self._map_func_names_to_func_details[func.__name__].update({'function_context_file': function_context_file})
470+
# DEBUG
471+
shutil.copyfile(function_context_file, '/tmp/tmp.context.file')
464472
else:
465473
function_context_file = self._map_func_names_to_func_details[func.__name__]['function_context_file']
466474

@@ -472,6 +480,21 @@ def submit(self, func, resource_specification, *args, **kwargs):
472480
self._serialize_object_to_file(function_file, func)
473481
if exec_mode == 'serverless':
474482
self._map_func_names_to_func_details[func.__name__]['is_serialized'] = True
483+
484+
# DEBUG
485+
if exec_mode == 'serverless':
486+
import copy
487+
kwargs = copy.deepcopy(kwargs)
488+
logger.info(f'ThanhDBG before trimming kwargs: {kwargs}')
489+
# pop function context stuff, that should not propagate with the invocation function
490+
if 'function_context' in kwargs['parsl_resource_specification']:
491+
del kwargs['parsl_resource_specification']['function_context']
492+
if 'function_context_args' in kwargs['parsl_resource_specification']:
493+
del kwargs['parsl_resource_specification']['function_context_args']
494+
if 'function_context_kwargs' in kwargs['parsl_resource_specification']:
495+
del kwargs['parsl_resource_specification']['function_context_kwargs']
496+
logger.info(f'ThanhDBG after trimming kwargs: {kwargs}')
497+
475498
args_dict = {'args': args, 'kwargs': kwargs}
476499
self._serialize_object_to_file(argument_file, args_dict)
477500

@@ -556,6 +579,12 @@ def _serialize_object_to_file(self, path, obj):
556579
while written < len(serialized_obj):
557580
written += f_out.write(serialized_obj[written:])
558581

582+
def _cloudpickle_serialize_object_to_file(self, path, obj):
583+
"""Takes any object and serializes it to the file path."""
584+
import cloudpickle
585+
with open(path, 'wb') as f:
586+
cloudpickle.dump(obj, f)
587+
559588
def _construct_map_file(self, map_file, input_files, output_files):
560589
""" Map local filepath of parsl files to the filenames at the execution worker.
561590
If using a shared filesystem, the filepath is mapped to its absolute filename.

parsl/executors/taskvine/manager.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ def _set_manager_attributes(m, config):
5757
for k, v in config.tune_parameters.items():
5858
m.tune(k, v)
5959

60+
# DEBUG
61+
m.tune("watch-library-logfiles", 1)
62+
6063

6164
def _prepare_environment_serverless(manager_config, env_cache_dir, poncho_create_script):
6265
# Return path to a packaged poncho environment
@@ -276,8 +279,22 @@ def _taskvine_submit_wait(ready_task_queue=None,
276279
# Deserialize the function context to add it to the library if available
277280
# This cost is paid only once per function/app.
278281
function_context_list = None
282+
279283
if task.function_context_file:
280-
function_context_list = _deserialize_object_from_file(task.function_context_file)
284+
#function_context_list = _deserialize_object_from_file(task.function_context_file)
285+
import cloudpickle
286+
with open(task.function_context_file, 'rb') as f:
287+
function_context_list = cloudpickle.load(f)
288+
289+
# DEBUG
290+
with open('/tmp/tmp.context.manager.print.context.file.file', 'w') as f:
291+
f.write(str(task.function_context_file))
292+
with open('/tmp/tmp.context.manager.file', 'w') as f:
293+
f.write(str(function_context_list))
294+
logger.info(f'ThanhDBG {function_context_list}')
295+
with open('/tmp/tmp.context.manager.function.module.file', 'w') as f:
296+
f.write(str(function_context_list[0].__module__))
297+
#logger.info('ThanhDBG' + function_context_list[0].__module__)
281298

282299
# Don't automatically add environment so manager can declare and cache the vine file associated with the environment file
283300
add_env = False
@@ -290,6 +307,7 @@ def _taskvine_submit_wait(ready_task_queue=None,
290307
hoisting_modules=[parsl.serialize, run_parsl_function],
291308
exec_mode='direct',
292309
library_context_info=function_context_list)
310+
293311

294312
# Configure the library if provided
295313
if manager_config.library_config:
@@ -324,6 +342,11 @@ def _taskvine_submit_wait(ready_task_queue=None,
324342
args = all_args['args']
325343
kwargs = all_args['kwargs']
326344

345+
# DEBUG
346+
with open('/tmp/manager.check.args.kwargs.function.call.file', 'w') as f:
347+
f.write(str(args))
348+
f.write(str(kwargs))
349+
327350
t = FunctionCall(libs_installed[task.func_name], task.func_name, *args, **kwargs)
328351
except Exception as e:
329352
logger.error("Unable to create executor task (mode:serverless): {}".format(e))

0 commit comments

Comments
 (0)