114 changes: 103 additions & 11 deletions parsl/executors/taskvine/executor.py
@@ -20,7 +20,7 @@
import uuid
from concurrent.futures import Future
from datetime import datetime
from typing import List, Literal, Optional, Union
from typing import Dict, List, Literal, Optional, Union

# Import other libraries
import typeguard
@@ -84,8 +84,12 @@ class TaskVineExecutor(BlockProviderExecutor, putils.RepresentationMixin):
pre-warmed forked python process.
Default is 'regular'.

use_tmp_dir_for_staging: bool
Whether to use tmp dir for staging functions, arguments, and results.
Default is False.

manager_config: TaskVineManagerConfig
Configuration for the TaskVine manager. Default
Configuration for the TaskVine manager.

factory_config: TaskVineFactoryConfig
Configuration for the TaskVine factory.
@@ -105,6 +109,7 @@ def __init__(self,
label: str = "TaskVineExecutor",
worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']] = 'factory',
function_exec_mode: Union[Literal['regular'], Literal['serverless']] = 'regular',
use_tmp_dir_for_staging: bool = False,
manager_config: TaskVineManagerConfig = TaskVineManagerConfig(),
factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(),
provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1),
@@ -131,6 +136,7 @@ def __init__(self,
self.label = label
self.worker_launch_method = worker_launch_method
self.function_exec_mode = function_exec_mode
self.use_tmp_dir_for_staging = use_tmp_dir_for_staging
self.manager_config = manager_config
self.factory_config = factory_config
self.storage_access = storage_access
@@ -179,6 +185,13 @@ def __init__(self,
# Path to directory that holds all tasks' data and results.
self._function_data_dir = ""

# Mapping of function names to function details.
# Currently the values include function objects, path to serialized functions,
# path to serialized function contexts, and whether functions are serialized.
# Helpful to detect inconsistencies in serverless functions.
# Helpful to deduplicate the same function.
self._map_func_names_to_func_details: Dict[str, Dict] = {}

# Helper scripts to prepare package tarballs for Parsl apps
self._package_analyze_script = shutil.which("poncho_package_analyze")
self._package_create_script = shutil.which("poncho_package_create")
@@ -225,8 +238,13 @@ def __create_data_and_logging_dirs(self):
# Create directories for data and results
log_dir = os.path.join(run_dir, self.label)
os.makedirs(log_dir)
tmp_prefix = f'{self.label}-{getpass.getuser()}-{datetime.now().strftime("%Y%m%d%H%M%S%f")}-'
self._function_data_dir = tempfile.TemporaryDirectory(prefix=tmp_prefix)

if self.use_tmp_dir_for_staging:
tmp_prefix = f'{self.label}-{getpass.getuser()}-{datetime.now().strftime("%Y%m%d%H%M%S%f")}-'
self._function_data_dir = tempfile.TemporaryDirectory(prefix=tmp_prefix).name
else:
self._function_data_dir = os.path.join(log_dir, 'function')
logging.debug(f'Function data dir is {self._function_data_dir}')

# put TaskVine logs outside of a Parsl run as TaskVine caches between runs while
# Parsl does not.
@@ -236,7 +254,7 @@ def __create_data_and_logging_dirs(self):

# factory logs go with manager logs regardless
self.factory_config.scratch_dir = self.manager_config.vine_log_dir
logger.debug(f"Function data directory: {self._function_data_dir.name}, log directory: {log_dir}")
logger.debug(f"Function data directory: {self._function_data_dir}, log directory: {log_dir}")
logger.debug(
f"TaskVine manager log directory: {self.manager_config.vine_log_dir}, "
f"factory log directory: {self.factory_config.scratch_dir}")
Expand Down Expand Up @@ -303,9 +321,10 @@ def _path_in_task(self, executor_task_id, *path_components):
'map': Pickled file with a dict between local parsl names, and remote taskvine names.
"""
task_dir = "{:04d}".format(executor_task_id)
return os.path.join(self._function_data_dir.name, task_dir, *path_components)
return os.path.join(self._function_data_dir, task_dir, *path_components)

def submit(self, func, resource_specification, *args, **kwargs):
import cloudpickle
"""Processes the Parsl app by its arguments and submits the function
information to the task queue, to be executed using the TaskVine
system. The args and kwargs are processed for input and output files to
@@ -326,11 +345,35 @@ def submit(self, func, resource_specification, *args, **kwargs):
Keyword arguments to the Parsl app
"""

# a Parsl function must have a name
if func.__name__ is None:
raise ValueError('A Parsl function must have a name')

logger.debug(f'Got resource specification: {resource_specification}')

# If `_parsl_monitoring_task_id` is in kwargs, Parsl monitoring code is enabled.
is_monitoring_enabled = '_parsl_monitoring_task_id' in kwargs

# Default execution mode of apps is regular
exec_mode = resource_specification.get('exec_mode', self.function_exec_mode)

# Fall back to regular execution if a function is Parsl-monitored as a monitored function is invocation-specific.
# Note that it is possible to get the wrapped function by calling the `__wrapped__` attribute when monitoring is enabled.
# It will disable the monitoring wrapper code however.
if exec_mode == 'serverless' and is_monitoring_enabled:
Collaborator

this is a bit suspicious to me -- what is it about monitoring that is special here?

is it that there is a new function definition for every submission? If so, there are other parsl modes like some file staging scenarios, where that can happen too.

Although I think that is also what the block below, with function equality checking and fallback to regular is trying to do too?

Contributor Author

You are right that for the serverless execution model to work, a function func must be a generic function, not an invocation-specific one. The monitoring code replaces the original function with an invocation-specific version carrying task_id and so on. For file staging, I have been poking around and find that as long as the storage_access attribute is None, which it is for TaskVineExecutor, the DFK won't replace the original function with a file staging wrapper (I might be wrong here); file staging is instead delegated to TaskVine's specialized file staging API.

The code block with the function id check is, as you said, a detection mechanism to fall back to the regular execution method if functions do change.

Thanks for reviewing it early! I was gonna fill this PR with more descriptions, tests, and documentation before pinging you.
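
(Editorial aside - a minimal sketch, not code from the PR, of the point about invocation-specific wrappers: each submission produces a fresh function object, so the identity comparison that submit() performs below differs between calls. All names here are illustrative.)

# Illustrative sketch only; make_monitored and analyze are made-up names.
def make_monitored(func, task_id):
    # Each submission wraps the original function in a fresh object.
    def wrapper(*args, **kwargs):
        # ... record task_id, then call the real function ...
        return func(*args, **kwargs)
    return wrapper

def analyze(x):
    return x * 2

first = make_monitored(analyze, task_id=1)
second = make_monitored(analyze, task_id=2)

# The plain function keeps a single identity across submissions; the
# wrapped versions do not, which is what the id() comparison in submit()
# detects before falling back to regular execution.
assert id(analyze) == id(analyze)
assert id(first) != id(second)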

Collaborator

People don't use file staging much, but storage_access might not be None - for example, I know one user who uses Work Queue plus zip file staging in Parsl, and that should translate directly to TaskVine + zip file staging.

Does that monitoring test detect anything that the line below it doesn't already detect, or is it for getting a nicer error message?

Contributor Author

It is for getting a nicer message so users may know what's wrong specifically with each case. The monitoring detection can be removed without affecting functionality.

Contributor Author

Actually, the first monitoring check rules out the case where the first function call runs serverlessly and subsequent calls run as regular tasks.

logger.warning("A serverless task cannot run with Parsl monitoring enabled. Falling back to execute this task as a regular task.")
exec_mode = 'regular'

if exec_mode == 'serverless':
if func.__name__ not in self._map_func_names_to_func_details:
self._map_func_names_to_func_details[func.__name__] = {'func_obj': func}
else:
if id(func) != id(self._map_func_names_to_func_details[func.__name__]['func_obj']):
logger.warning('Inconsistency in a serverless function call detected.\
A function name cannot point to two different function objects.\
Falling back to executing it as a regular task.')
exec_mode = 'regular'

# Detect resources and features of a submitted Parsl app
cores = None
memory = None
@@ -361,7 +404,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
self._executor_task_counter += 1

# Create a per task directory for the function, argument, map, and result files
os.mkdir(self._path_in_task(executor_task_id))
os.makedirs(self._path_in_task(executor_task_id), exist_ok=True)

input_files = []
output_files = []
@@ -394,21 +437,62 @@ def submit(self, func, resource_specification, *args, **kwargs):
argument_file = None
result_file = None
map_file = None
function_context_file = None

# Get path to files that will contain the pickled function,
# arguments, result, and map of input and output files
function_file = self._path_in_task(executor_task_id, "function")
if exec_mode == 'serverless':
if 'function_file' not in self._map_func_names_to_func_details[func.__name__]:
function_file = os.path.join(self._function_data_dir, func.__name__, 'function')
os.makedirs(os.path.join(self._function_data_dir, func.__name__))
self._map_func_names_to_func_details[func.__name__].update({'function_file': function_file, 'is_serialized': False})
else:
function_file = self._map_func_names_to_func_details[func.__name__]['function_file']
else:
function_file = self._path_in_task(executor_task_id, "function")
argument_file = self._path_in_task(executor_task_id, "argument")
result_file = self._path_in_task(executor_task_id, "result")
map_file = self._path_in_task(executor_task_id, "map")

logger.debug("Creating executor task {} with function at: {}, argument at: {}, \
and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file))
if exec_mode == 'serverless':
if 'function_context' in resource_specification:
if 'function_context_file' not in self._map_func_names_to_func_details[func.__name__]:
function_context = resource_specification.get('function_context')
function_context_args = resource_specification.get('function_context_args', [])
function_context_kwargs = resource_specification.get('function_context_kwargs', {})
function_context_file = os.path.join(self._function_data_dir, func.__name__, 'function_context')

self._cloudpickle_serialize_object_to_file(function_context_file, [function_context, function_context_args, function_context_kwargs])
self._map_func_names_to_func_details[func.__name__].update({'function_context_file': function_context_file})
else:
function_context_file = self._map_func_names_to_func_details[func.__name__]['function_context_file']

logger.debug("Creating executor task {} with function at: {}, argument at: {}, and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file))

# Serialize function object and arguments, separately
self._serialize_object_to_file(function_file, func)
if exec_mode == 'regular' or not self._map_func_names_to_func_details[func.__name__]['is_serialized']:
self._serialize_object_to_file(function_file, func)
if exec_mode == 'serverless':
self._map_func_names_to_func_details[func.__name__]['is_serialized'] = True

# Delete references of function context information from resource_specification
# as they are not needed to be transferred to remote nodes.
# They are restored when the kwargs serialization is done.
if exec_mode == 'serverless':
function_context = kwargs['parsl_resource_specification'].pop('function_context', None)
function_context_args = kwargs['parsl_resource_specification'].pop('function_context_args', [])
function_context_kwargs = kwargs['parsl_resource_specification'].pop('function_context_kwargs', {})

args_dict = {'args': args, 'kwargs': kwargs}
self._serialize_object_to_file(argument_file, args_dict)

if exec_mode == 'serverless':
if function_context:
kwargs['parsl_resource_specification']['function_context'] = function_context
if function_context_args:
kwargs['parsl_resource_specification']['function_context_args'] = function_context_args
if function_context_kwargs:
kwargs['parsl_resource_specification']['function_context_kwargs'] = function_context_kwargs

# Construct the map file of local filenames at worker
self._construct_map_file(map_file, input_files, output_files)
@@ -427,6 +511,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
category = func.__name__ if self.manager_config.autocategory else 'parsl-default'

task_info = ParslTaskToVine(executor_id=executor_task_id,
func_name=func.__name__,
exec_mode=exec_mode,
category=category,
input_files=input_files,
@@ -435,6 +520,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
function_file=function_file,
argument_file=argument_file,
result_file=result_file,
function_context_file=function_context_file,
cores=cores,
memory=memory,
disk=disk,
@@ -489,6 +575,12 @@ def _serialize_object_to_file(self, path, obj):
while written < len(serialized_obj):
written += f_out.write(serialized_obj[written:])

def _cloudpickle_serialize_object_to_file(self, path, obj):
"""Takes any object and serializes it to the file path."""
import cloudpickle
with open(path, 'wb') as f:
cloudpickle.dump(obj, f)

def _construct_map_file(self, map_file, input_files, output_files):
""" Map local filepath of parsl files to the filenames at the execution worker.
If using a shared filesystem, the filepath is mapped to its absolute filename.
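
(Editorial aside - for context, a hedged sketch of how an app might opt into the serverless path using the resource-specification keys this diff reads: exec_mode, function_context, function_context_args, and function_context_kwargs. The TaskVineExecutor options are parameters defined in this file; the load_table context builder and the app body are illustrative assumptions, not part of the PR, and actually running this requires a TaskVine installation with connected workers.)

import parsl
from parsl import python_app
from parsl.config import Config
from parsl.executors.taskvine import TaskVineExecutor

# Executor options come from this file; everything else is illustrative.
config = Config(executors=[TaskVineExecutor(function_exec_mode='serverless',
                                            use_tmp_dir_for_staging=False)])

def load_table(path):
    # Hypothetical context builder passed alongside the function; its exact
    # execution semantics belong to the TaskVine serverless machinery.
    return {'path': path}

@python_app
def lookup(key, parsl_resource_specification=None):
    return key

parsl.load(config)

spec = {'exec_mode': 'serverless',
        'function_context': load_table,
        'function_context_args': ['table.dat']}

future = lookup('alpha', parsl_resource_specification=spec)
print(future.result())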