diff --git a/.gitignore b/.gitignore
index 8811016b83..d361be3ab4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -121,3 +121,17 @@ ENV/
 
 # emacs buffers
 \#*
+
+runinfo*
+parsl/tests/.pytest*
+
+# documentation generation
+docs/stubs/*
+docs/1-parsl-introduction.ipynb
+
+/tmp
+parsl/data_provider/dyn.new.py
+
+examples/
+
+.jupyter/
diff --git a/docs/images/mon_env_detail.png b/docs/images/mon_env_detail.png
new file mode 100644
index 0000000000..2eab8b94a4
Binary files /dev/null and b/docs/images/mon_env_detail.png differ
diff --git a/docs/images/mon_file_detail.png b/docs/images/mon_file_detail.png
new file mode 100644
index 0000000000..6fbf638439
Binary files /dev/null and b/docs/images/mon_file_detail.png differ
diff --git a/docs/images/mon_file_provenance.png b/docs/images/mon_file_provenance.png
new file mode 100644
index 0000000000..5274510f38
Binary files /dev/null and b/docs/images/mon_file_provenance.png differ
diff --git a/docs/images/mon_task_detail.png b/docs/images/mon_task_detail.png
new file mode 100644
index 0000000000..b8acb327f5
Binary files /dev/null and b/docs/images/mon_task_detail.png differ
diff --git a/docs/images/mon_workflow_files.png b/docs/images/mon_workflow_files.png
new file mode 100644
index 0000000000..c5378d2260
Binary files /dev/null and b/docs/images/mon_workflow_files.png differ
diff --git a/docs/images/mon_workflows_page.png b/docs/images/mon_workflows_page.png
index 3b9be2edc7..62f5f55d30 100644
Binary files a/docs/images/mon_workflows_page.png and b/docs/images/mon_workflows_page.png differ
diff --git a/docs/reference.rst b/docs/reference.rst
index 153d9fedb0..d585a320a4 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -54,6 +54,7 @@ Data management
     parsl.data_provider.data_manager.DataManager
     parsl.data_provider.staging.Staging
     parsl.data_provider.files.File
+    parsl.data_provider.dynamic_files.DynamicFileList
     parsl.data_provider.ftp.FTPSeparateTaskStaging
     parsl.data_provider.ftp.FTPInTaskStaging
     parsl.data_provider.file_noop.NoOpFileStaging
diff --git a/docs/userguide/advanced/monitoring.rst b/docs/userguide/advanced/monitoring.rst
index c30db877ed..0cd9834817 100644
--- a/docs/userguide/advanced/monitoring.rst
+++ b/docs/userguide/advanced/monitoring.rst
@@ -15,6 +15,8 @@ SQLite tools.
 Monitoring configuration
 ------------------------
 
+Parsl monitoring is only supported with the `parsl.executors.HighThroughputExecutor`.
+
 The following example shows how to enable monitoring in the Parsl
 configuration. Here the `parsl.monitoring.MonitoringHub` is specified to use port 55055 to receive monitoring
 messages from workers every 10 seconds.
@@ -47,6 +49,116 @@ configuration. Here the `parsl.monitoring.MonitoringHub` is specified to use por
     )
 
+
+File Provenance
+---------------
+
+The monitoring system can also be used to track file provenance. File provenance is defined as the
+history of a file, including:
+
+* When the file was created
+* File size in bytes
+* File md5sum
+* What task created the file
+* What task(s) used the file
+* What inputs were given to the task that created the file
+* What environment was used (e.g. the 'worker_init' entry from a :py:class:`~parsl.providers.ExecutionProvider`;
+  not available with every provider)
+
+The purpose of file provenance tracking is to provide a mechanism by which the user can see exactly
+how a file was created and used in a workflow. This can be useful for debugging, understanding the
+workflow, ensuring that the workflow is reproducible, and reviewing past work. The file
+provenance information is stored in the monitoring database and can be accessed using the
+``parsl-visualize`` tool. To enable file provenance tracking, set the ``file_provenance`` flag to
+``True`` in the `parsl.monitoring.MonitoringHub` configuration.
+
+This functionality also enables you to log informational messages from your scripts, to capture
+anything not automatically gathered. The main change to your code is to assign the return value of
+``parsl.load`` to a variable, and then use that variable's ``log_info`` function to log the messages
+in the database. Note that this feature is only available in the main script, not inside Apps.
+Passing this variable (``my_cfg`` in the example below) to an App will have undefined behavior.
+The following example shows how to use this feature.
+
+.. code-block:: python
+
+    import parsl
+    from parsl.monitoring.monitoring import MonitoringHub
+    from parsl.config import Config
+    from parsl.executors import HighThroughputExecutor
+    from parsl.addresses import address_by_hostname
+
+    import logging
+
+    config = Config(
+        executors=[
+            HighThroughputExecutor(
+                label="local_htex",
+                cores_per_worker=1,
+                max_workers_per_node=4,
+                address=address_by_hostname(),
+            )
+        ],
+        monitoring=MonitoringHub(
+            hub_address=address_by_hostname(),
+            hub_port=55055,
+            monitoring_debug=False,
+            resource_monitoring_interval=10,
+            file_provenance=True,
+        ),
+        strategy='none'
+    )
+
+    my_cfg = parsl.load(config)
+
+    my_cfg.log_info("This is an informational message")
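+
+As an illustrative sketch (the app, file names, and file contents here are hypothetical), any task that
+declares its files through the ``inputs``/``outputs`` keywords will have those files recorded once file
+provenance is enabled as above:
+
+.. code-block:: python
+
+    from parsl.data_provider.files import File
+
+    @parsl.python_app
+    def to_upper(inputs=(), outputs=()):
+        # Both files are declared via inputs/outputs, so their size, md5sum and
+        # the tasks that created or used them end up in the monitoring database.
+        with open(inputs[0].filepath) as src, open(outputs[0].filepath, 'w') as dst:
+            dst.write(src.read().upper())
+
+    to_upper(inputs=[File('raw.txt')], outputs=[File('upper.txt')]).result()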
+
+The file provenance framework also works with the :ref:`label-dynamic-file-list` feature. When a
+:py:class:`parsl.data_provider.dynamic_files.DynamicFileList` is used, the framework will wait until the app completes
+and any files contained in the :py:class:`parsl.data_provider.dynamic_files.DynamicFileList` are marked as done before
+completing its processing.
+
+.. note::
+   Known limitations: The file provenance feature will capture the creation of files and the use of files in an app,
+   but it does not capture modifications to files it already knows about.
+
 
 Visualization
 -------------
 
@@ -72,7 +184,7 @@ By default, the visualization web server listens on ``127.0.0.1:8080``.
 If the w
 
     $ ssh -L 50000:127.0.0.1:8080 username@cluster_address
 
 This command will bind your local machine's port 50000 to the remote cluster's port 8080.
-The dashboard can then be accessed via the local machine's browser at ``127.0.0.1:50000``. 
+The dashboard can then be accessed via the local machine's browser at ``127.0.0.1:50000``.
 
 .. warning:: Alternatively you can deploy the visualization server on a public interface. However, first check that this is allowed by the cluster's security policy. The following example shows how to deploy the web server on a public port (i.e., open to Internet via ``public_IP:55555``)::
 
@@ -96,12 +208,12 @@ Workflow Summary
 
 The workflow summary page captures the run level details of a workflow, including start and end
 times as well as task summary statistics. The workflow summary section is followed by the *App Summary* that lists
-the various apps and invocation count for each. 
+the various apps and invocation count for each.
 
 .. image:: ../../images/mon_workflow_summary.png
 
-The workflow summary also presents three different views of the workflow:
+The workflow summary also presents several different views of the workflow:
 
 * Workflow DAG - with apps differentiated by colors: This visualization is useful to visually inspect the dependency structure of the workflow. Hovering over the nodes in the DAG shows a tooltip for the app represented by the node and it's task ID.
 
@@ -117,3 +229,35 @@ The workflow summary also presents three different views of the workflow:
 
 .. image:: ../../images/mon_resource_summary.png
 
+* Workflow file provenance (only if enabled and files were used in the workflow): This visualization gives a tabular listing of each task that created (output) or used (input) a file. Each listed file has a link to a page detailing the file's information.
+
+.. image:: ../../images/mon_workflow_files.png
+
+File Provenance
+^^^^^^^^^^^^^^^
+
+The file provenance page provides an interface for searching for files and viewing their provenance. The % wildcard can be used in the search bar to match any number of characters. Any results are listed in a table below the search bar. Clicking on a file in the table will take you to the file's detail page.
+
+.. image:: ../../images/mon_file_provenance.png
+
+File Details
+^^^^^^^^^^^^
+
+The file details page provides information about a specific file, including the file's name, size, md5sum, and the tasks that created and used the file. Clicking on any of the tasks will take you to their respective details pages. If the file was created by a task, there will be an entry for the Environment used by that task. Clicking that link will take you to the Environment Details page.
+
+.. image:: ../../images/mon_file_detail.png
+
+
+Task Details
+^^^^^^^^^^^^
+
+The task details page provides information about a specific instantiation of a task. This information includes task dependencies, executor (environment), input and output files, and task arguments.
+
+.. image:: ../../images/mon_task_detail.png
+
+Environment Details
+^^^^^^^^^^^^^^^^^^^
+
+The environment details page provides information on the compute environment in which a task was run, including the provider and launcher used and the worker_init that was used.
+
+.. image:: ../../images/mon_env_detail.png
diff --git a/docs/userguide/configuration/data.rst b/docs/userguide/configuration/data.rst
index 0c4c4b334d..fed0a9f48c 100644
--- a/docs/userguide/configuration/data.rst
+++ b/docs/userguide/configuration/data.rst
@@ -4,9 +4,9 @@ Staging data files
 ==================
 
 Parsl apps can take and return data files. A file may be passed as an input
-argument to an app, or returned from an app after execution. Parsl 
-provides support to automatically transfer (stage) files between 
-the main Parsl program, worker nodes, and external data storage systems. 
+argument to an app, or returned from an app after execution. Parsl
+provides support to automatically transfer (stage) files between
+the main Parsl program, worker nodes, and external data storage systems.
 
 Input files can be passed as regular arguments, or a list of them may be
 specified in the special ``inputs`` keyword argument to an app invocation.
@@ -39,13 +39,13 @@ interface.
 Parsl files
 -----------
 
-Parsl uses a custom :py:class:`~parsl.data_provider.files.File` to provide a 
-location-independent way of referencing and accessing files. 
-Parsl files are defined by specifying the URL *scheme* and a path to the file. 
+Parsl uses a custom :py:class:`~parsl.data_provider.files.File` to provide a
+location-independent way of referencing and accessing files.
+Parsl files are defined by specifying the URL *scheme* and a path to the file.
 Thus a file may represent an absolute path on the submit-side file system
 or a URL to an external file.
 
-The scheme defines the protocol via which the file may be accessed. 
+The scheme defines the protocol via which the file may be accessed.
 Parsl supports the following schemes: file, ftp, http, https, and globus.
 If no scheme is specified Parsl will default to the file scheme.
 
@@ -59,8 +59,8 @@ README file.
 
     File('https://github.com/Parsl/parsl/blob/master/README.rst')
 
-Parsl automatically translates the file's location relative to the 
-environment in which it is accessed (e.g., the Parsl program or an app). 
+Parsl automatically translates the file's location relative to the
+environment in which it is accessed (e.g., the Parsl program or an app).
 The following example shows how a file can be accessed in the app
 irrespective of where that app executes.
 
@@ -83,22 +83,148 @@ As described below, the method by which this files are transferred depends on
 the scheme and the staging providers specified in the Parsl configuration.
 
+
+.. _label-dynamic-file-list:
+
+Dynamic File List
+^^^^^^^^^^^^^^^^^
+
+In certain cases, you do not know the number of files that will be produced by an app. When this happens, your script
+will likely fail with an error or do something unexpected, as Parsl needs to know about all files (input and output)
+before an app runs. For these circumstances the :py:class:`parsl.data_provider.dynamic_files.DynamicFileList` was
+developed. The purpose of the :py:class:`parsl.data_provider.dynamic_files.DynamicFileList` is to allow an app to add
+files to the outputs without the outputs being pre-specified. The
+:py:class:`parsl.data_provider.dynamic_files.DynamicFileList` behaves like a list, but is also a Future. It can be used
+anywhere a list of :py:class:`~parsl.data_provider.files.File` objects is expected.
+
+Take this example:
+
+.. code-block:: python
+
+    import parsl
+    from parsl.config import Config
+    from parsl.executors import ThreadPoolExecutor
+    from parsl.data_provider.dynamic_files import DynamicFileList
+
+    config = Config(executors=[ThreadPoolExecutor(label="local_htex")])
+    parsl.load(config)
+
+    @parsl.python_app
+    def produce(outputs=[]):
+        import random
+        import string
+        from parsl.data_provider.files import File
+        count = random.randint(3, 9)
+        for i in range(count):
+            fl = File(f'data_{i}.log')
+            with open(fl, 'w') as fh:
+                fh.write(''.join(random.choices(string.ascii_letters, k=50)))
+            outputs.append(fl)
+        print(f"\n\nProduced {len(outputs)} files")
+
+    @parsl.python_app
+    def consume(inputs=[]):
+        print(f"Consuming {len(inputs)} files")
+        for inp in inputs:
+            print(f"  Reading {inp.filepath}")
+            with open(inp.filepath, 'r') as fh:
+                content = fh.read()
+
+The app ``produce`` produces a random number of output files (these could be log files, data files, etc.). The
+``consume`` app takes those files and reads them (it could really do anything with them). If we use the following
+code, we will not get the expected result:
+
+.. code-block:: python
+
+    outp = []
+    prc = produce(outputs=outp)
+    cons = consume(inputs=outp)
+    cons.result()
+
+The code will output something like
+
+.. code-block:: bash
+
+    Consuming 0 files
+    Produced 3 files
+
+which is both out of order (the ``Produced`` line should be first) and incorrect (the ``Consuming`` line should have the
+same number as the ``Produced`` line). This is because when Parsl generates the DAG, it sees an empty list for the
+``inputs`` to ``consume`` and believes that there is no connection between the ``outputs`` of ``produce`` and these
+``inputs``. Parsl therefore generates a DAG that allows ``consume`` to run whenever there are processing resources
+available, even in parallel with ``produce``. If we make a single line change (changing ``outp`` to be a
+:py:class:`parsl.data_provider.dynamic_files.DynamicFileList`), this can be fixed:
+
+.. code-block:: python
+
+    outp = DynamicFileList()
+    prc = produce(outputs=outp)
+    cons = consume(inputs=outp)
+    cons.result()
+
+The code will now work properly, reporting the correct number of files produced and consumed:
+
+.. code-block:: bash
+
+    Produced 7 files
+    Consuming 7 files
+      Reading data_0.log
+      Reading data_1.log
+      Reading data_2.log
+      Reading data_3.log
+      Reading data_4.log
+      Reading data_5.log
+      Reading data_6.log
+
+This works because, as a Future, the :py:class:`parsl.data_provider.dynamic_files.DynamicFileList` causes Parsl to make
+a connection between the ``outputs`` of ``produce`` and the ``inputs`` of ``consume``. This causes the DAG to wait until
+``produce`` has completed before running ``consume``.
+
+The :py:class:`parsl.data_provider.dynamic_files.DynamicFileList` can also be used in more complex ways, such as slicing,
+and will behave as expected. Let's take the previous example, where ``produce`` generates an unknown number of files. You
+know that the first one produced is always a log file, which you don't really care about, but the remaining files are
+data that you are interested in. Traditionally you would do something like
+
+.. code-block:: python
+
+    outp = []
+    prc = produce(outputs=outp)
+    cons = consume(inputs=outp[1:])
+    cons.result()
+
+but this will either throw an exception or fail (depending on your Python version) as the first example above did with 0
+consumed files. But using a :py:class:`parsl.data_provider.dynamic_files.DynamicFileList` will work as expected:
+
+.. code-block:: python
+
+    outp = DynamicFileList()
+    prc = produce(outputs=outp)
+    cons = consume(inputs=outp[1:])
+    cons.result()
+
+None of the above examples will necessarily work as expected if ``produce`` were a :py:func:`parsl.app.app.bash_app`. This
+is because the command line returned by the :py:func:`parsl.app.app.bash_app` may produce files that neither Python
+nor Parsl are aware of; there is no direct way to know which files to track without additional work.
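+
+As a purely illustrative sketch of this caveat (the app name and shell command below are hypothetical),
+a ``bash_app`` can create its files entirely inside the shell command, leaving nothing in ``outputs`` for
+Parsl to track:
+
+.. code-block:: python
+
+    @parsl.bash_app
+    def produce_bash(outputs=()):
+        # The files are created by the shell, not appended to ``outputs``,
+        # so the dependency and provenance tracking described above never sees them.
+        return 'for i in 0 1 2; do echo "data" > data_$i.log; done'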
+
 Staging providers
 -----------------
 
-Parsl is able to transparently stage files between at-rest locations and 
+Parsl is able to transparently stage files between at-rest locations and
 execution locations by specifying a list of
-:py:class:`~parsl.data_provider.staging.Staging` instances for an executor. 
+:py:class:`~parsl.data_provider.staging.Staging` instances for an executor.
 These staging instances define how to transfer files in and out of an
 execution location. This list should be supplied as the ``storage_access``
-parameter to an executor when it is constructed. 
+parameter to an executor when it is constructed.
 
-Parsl includes several staging providers for moving files using the 
+Parsl includes several staging providers for moving files using the
 schemes defined above. By default, Parsl executors are created with
-three common staging providers: 
+three common staging providers:
 the NoOpFileStaging provider for local and shared file systems
 and the HTTP(S) and FTP staging providers for transferring
-files to and from remote storage locations. The following 
+files to and from remote storage locations. The following
 example shows how to explicitly set the default staging providers.
 
 .. code-block:: python
 
@@ -116,12 +242,12 @@ example shows how to explicitly set the default staging providers.
             )
         ]
     )
- 
- 
-Parsl further differentiates when staging occurs relative to 
-the app invocation that requires or produces files. 
+
+
+Parsl further differentiates when staging occurs relative to
+the app invocation that requires or produces files.
 Staging either occurs with the executing task (*in-task staging*)
-or as a separate task (*separate task staging*) before app execution. 
+or as a separate task (*separate task staging*) before app execution.
 
 In-task staging uses a wrapper that is executed around the Parsl
 task and thus occurs on the resource on which the task is executed. Separate
@@ -137,9 +263,9 @@ NoOpFileStaging for Local/Shared File Systems
 
 The NoOpFileStaging provider assumes that files specified either
 with a path or with the ``file`` URL scheme are available both
 on the submit and execution side. This occurs, for example, when there is a
-shared file system. In this case, files will not moved, and the 
+shared file system. In this case, files will not be moved, and the
 File object simply presents the same file path to the Parsl program
-and any executing tasks. 
+and any executing tasks.
 
 Files defined as follows will be handled by the NoOpFileStaging provider.
 
@@ -177,14 +303,14 @@ will be executed as a separate Parsl task that will complete before
 the corresponding app executes.
These providers cannot be used to stage out output files. -The following example defines a file accessible on a remote FTP server. +The following example defines a file accessible on a remote FTP server. .. code-block:: python File('ftp://www.iana.org/pub/mirror/rirstats/arin/ARIN-STATS-FORMAT-CHANGE.txt') When such a file object is passed as an input to an app, Parsl will download the file to whatever location is selected for the app to execute. -The following example illustrates how the remote file is implicitly downloaded from an FTP server and then converted. Note that the app does not need to know the location of the downloaded file on the remote computer, as Parsl abstracts this translation. +The following example illustrates how the remote file is implicitly downloaded from an FTP server and then converted. Note that the app does not need to know the location of the downloaded file on the remote computer, as Parsl abstracts this translation. .. code-block:: python @@ -204,8 +330,8 @@ The following example illustrates how the remote file is implicitly downloaded f # call the convert app with the Parsl file f = convert(inputs=[inp], outputs=[out]) f.result() - -HTTP and FTP separate task staging providers can be configured as follows. + +HTTP and FTP separate task staging providers can be configured as follows. .. code-block:: python @@ -213,8 +339,8 @@ HTTP and FTP separate task staging providers can be configured as follows. from parsl.executors import HighThroughputExecutor from parsl.data_provider.http import HTTPSeparateTaskStaging from parsl.data_provider.ftp import FTPSeparateTaskStaging - - config = Config( + + config = Config( executors=[ HighThroughputExecutor( storage_access=[HTTPSeparateTaskStaging(), FTPSeparateTaskStaging()] @@ -233,10 +359,10 @@ task staging providers described above, but will do so in a wrapper around individual app invocations, which guarantees that they will stage files to a file system visible to the app. -A downside of this staging approach is that the staging tasks are less visible +A downside of this staging approach is that the staging tasks are less visible to Parsl, as they are not performed as separate Parsl tasks. -In-task staging providers can be configured as follows. +In-task staging providers can be configured as follows. .. code-block:: python @@ -315,16 +441,16 @@ In some cases, for example when using a Globus `shared endpoint None: + def __init__(self, fut: Future, file_obj: File, dfk: Union["DataFlowKernel", None], tid: int, app_fut: Optional[Future] = None) -> None: """Construct the DataFuture object. If the file_obj is a string convert to a File. 
@@ -56,7 +72,11 @@ def __init__(self, fut: Future, file_obj: File, tid: int) -> None: self._tid = tid self.file_obj = file_obj self.parent = fut - + if app_fut: + self.app_fut = app_fut + else: + self.app_fut = fut + self.data_flow_kernel = dfk self.parent.add_done_callback(self.parent_callback) logger.debug("Creating DataFuture with parent: %s and file: %s", self.parent, repr(self.file_obj)) @@ -76,6 +96,30 @@ def filename(self): """Filepath of the File object this datafuture represents.""" return self.filepath + @property + def uuid(self): + """UUID of the File object this datafuture represents.""" + return self.file_obj.uuid + + @property + def timestamp(self): + """Timestamp when the future was marked done.""" + return self.file_obj.timestamp + + @timestamp.setter + def timestamp(self, value: Optional[datetime]) -> None: + self.file_obj.timestamp = value + + @property + def size(self): + """Size of the file.""" + return self.file_obj.size + + @property + def md5sum(self): + """MD5 sum of the file.""" + return self.file_obj.md5sum + def cancel(self): raise NotImplementedError("Cancel not implemented") @@ -97,3 +141,17 @@ def __repr__(self) -> str: else: done = "not done" return f"<{module}.{qualname} object at {hex(id(self))} representing {repr(self.file_obj)} {done}>" + + def __getstate__(self): + state = self.__dict__.copy() + del state['data_flow_kernel'] + return state + + def __setstate__(self, state): + self.__dict__.update(state) + + def __reduce__(self): + return self.__reduce_ex__(None) + + def __reduce_ex__(self, proto): + return (self.__class__, (self.parent, self.file_obj, None, self.tid, self.app_fut)) diff --git a/parsl/data_provider/data_manager.py b/parsl/data_provider/data_manager.py index db5d242d36..5448b63aa2 100644 --- a/parsl/data_provider/data_manager.py +++ b/parsl/data_provider/data_manager.py @@ -3,6 +3,7 @@ from typing import TYPE_CHECKING, Any, Callable, List, Optional from parsl.app.futures import DataFuture +from parsl.data_provider.dynamic_files import DynamicFileList from parsl.data_provider.file_noop import NoOpFileStaging from parsl.data_provider.files import File from parsl.data_provider.ftp import FTPSeparateTaskStaging @@ -58,12 +59,22 @@ def replace_task_stage_out(self, file: File, func: Callable, executor: str) -> C raise ValueError("Executor {} cannot stage file {}".format(executor, repr(file))) def optionally_stage_in(self, input, func, executor): - if isinstance(input, DataFuture): + if isinstance(input, DynamicFileList.DynamicFile): + if input.empty: + file = DynamicFileList.DynamicFile + else: + file = input.cleancopy() + # replace the input DataFuture with a new DataFuture which will complete at + # the same time as the original one, but will contain the newly + # copied file + input = DataFuture(input, file, dfk=self.dfk, tid=input.tid) + return (input, func) + elif isinstance(input, DataFuture): file = input.file_obj.cleancopy() # replace the input DataFuture with a new DataFuture which will complete at # the same time as the original one, but will contain the newly # copied file - input = DataFuture(input, file, tid=input.tid) + input = DataFuture(input, file, dfk=self.dfk, tid=input.tid) elif isinstance(input, File): file = input.cleancopy() input = file @@ -113,7 +124,7 @@ def stage_in(self, file: File, input: Any, executor: str) -> Any: - executor (str) : an executor the file is going to be staged in to. 
""" - if isinstance(input, DataFuture): + if isinstance(input, (DataFuture, DynamicFileList, DynamicFileList.DynamicFile)): parent_fut = input # type: Optional[Future] elif isinstance(input, File): parent_fut = None diff --git a/parsl/data_provider/dynamic_files.py b/parsl/data_provider/dynamic_files.py new file mode 100644 index 0000000000..6177a07503 --- /dev/null +++ b/parsl/data_provider/dynamic_files.py @@ -0,0 +1,787 @@ +"""This module implements the DynamicFileList class and DynamicFile subclass. + +The DynamicFile class is a drop in replacement/wrapper for the File and DataFuture classes. + +The DynamicFileList class is intended to replace the list of Files for app `outputs`. It acts like a +traditional Python `list`, but is also a Future. This allows for Files to be appended to the output list +and have these Files properly treated by Parsl. +""" +from __future__ import annotations + +import logging +import sys +from concurrent.futures import Future +from copy import deepcopy +from datetime import datetime, timezone +from typing import Callable, Dict, List, Optional, Type, Union + +import typeguard + +from parsl.app.futures import DataFuture +from parsl.data_provider.files import File +from parsl.dataflow.futures import AppFuture +from parsl.dataflow.taskrecord import deepcopy as trcopy + +logger = logging.getLogger(__name__) + + +class DynamicFileList(Future): + """A list of files that is also a Future. + + The DynamicFileList class is intended to replace the list of Files for app `outputs`. It acts like a + traditional Python `list`, but is also a Future. This allows for Files to be appended to the output list + and have these Files properly treated by Parsl. + """ + + class DynamicFile(Future): + """A wrapper for a File or DataFuture + + Should not be instantiated from outside the DynamicFileList class. + """ + def parent_callback(self, parent_fu: Future) -> None: + """Callback from executor future to update the parent. + + Updates the future with the result (the File object) or the parent future's + exception. + + Args: + - parent_fu (Future): Future returned by the executor along with callback + + Returns: + - None + """ + e = parent_fu.exception() + if e: + self.set_exception(e) + else: + self.file_obj.timestamp = datetime.now(timezone.utc) + self.parent.dfk.register_as_output(self.file_obj, self.parent.task_record) + if self._is_df: + self.set_result(self.file_obj.file_obj) + else: + self.set_result(self.file_obj) + + def __init__(self, fut: DynamicFileList, + file_obj: Optional[Union[File, DataFuture]] = None): + """Construct a DynamicFile instance + + If the file_obj is None, create an empty instance, otherwise wrap file_obj. 
+ + Args: + - fut (AppFuture) : AppFuture that this DynamicFile will track + - file_obj (File/DataFuture obj) : Something representing file(s) + """ # TODO need to be able to link output and input dynamic file objects and update when the output changes + super().__init__() + self._is_df = isinstance(file_obj, DataFuture) + self.parent = fut + self.file_obj = file_obj + self.parent.add_done_callback(self.parent_callback) + self._empty = file_obj is None #: Tracks whether this wrapper is empty + self._staged_out = False + + @property + def staged(self) -> bool: + """Return whether this file has been staged out.""" + return self._staged_out + + @property + def scheme(self) -> Union[str, None]: + """Return the scheme for the wrapped file object.""" + if self.empty: + return None + if self._is_df: + return self.file_obj.file_obj.scheme + return self.file_obj.scheme + + @property + def empty(self) -> bool: + """Return whether this is an empty wrapper.""" + return self._empty + + @property + def uuid(self) -> Union[str, None]: + """Return the uuid of the file object this data-future represents.""" + if self._empty: + return None + return self.file_obj.uuid + + @property + def timestamp(self) -> Union[datetime, None]: + """Return the timestamp of the file object this data-future represents.""" + if self._empty: + return None + return self.file_obj.timestamp + + @timestamp.setter + def timestamp(self, timestamp: Optional[datetime]) -> None: + """Set the timestamp""" + self.file_obj.timestamp = timestamp + + @typeguard.typechecked + def set_file(self, file_obj: Union[File, DataFuture, 'DynamicFileList.DynamicFile']): + """Set the file_obj for this instance. + + Args: + - file_obj (File/DataFuture) : File or DataFuture to set + """ + if isinstance(file_obj, type(self)): + self.file_obj = file_obj.file_obj + else: + self.file_obj = file_obj + self._empty = self.file_obj is None + self._is_df = isinstance(self.file_obj, DataFuture) + if self.file_obj is not None: + self.parent.add_done_func(self.file_obj.filename, self.done) + + def cleancopy(self) -> File: + """Create a clean copy of the file_obj.""" + if self._is_df: + return self.file_obj.file_obj.cleancopy() + return self.file_obj.cleancopy() + + def convert_to_df(self, dfk) -> None: + """Convert the file_obj to a DataFuture. + + Parameters + ---------- + dfk : DataFlowKernel + The dataflow kernel that this instance is associated with + """ + if not self._is_df: + self.file_obj = DataFuture(self.parent, self.file_obj, tid=self.parent._output_task_id, + dfk=dfk) + self._is_df = True + + def done(self) -> bool: + """Return whether the file_obj state is `done`. 
+ + Returns: + - bool : True if the file_obj is `done`, False otherwise + """ + if self._is_df: + return self.file_obj.done() + if self._empty: + return False + return True # Files are always done + + @property + def is_df(self) -> bool: + """Return whether this instance wraps a DataFuture.""" + return self._is_df + + @property + def tid(self) -> Union[int, None]: + """Returns the task_id of the task that will resolve this DataFuture.""" + if self._is_df: + return self.file_obj.tid + + @property + def filepath(self) -> Union[str, None]: + """Filepath of the File object this data-future represents.""" + if self.file_obj is None: + return None + return self.file_obj.filepath + + @property + def filename(self) -> Union[str, None]: + """Filename of the File object this data-future represents.""" + if self.file_obj is None: + return None + return self.file_obj.filepath + + @property + def size(self) -> Union[int, None]: + """Size of the file.""" + if self._empty: + return None + if self._is_df: + return self.file_obj.file_obj.size + return self.file_obj.size + + @size.setter + def size(self, f_size: int): + """ Set the size of the file """ + if self.file_obj: + if self._is_df: + self.file_obj.file_obj.size = f_size + else: + self.file_obj.size = f_size + + @property + def md5sum(self) -> Union[str, None]: + """MD5 sum of the file.""" + if self._empty: + return None + if self._is_df: + return self.file_obj.file_obj.md5sum + return self.file_obj.md5sum + + @md5sum.setter + def md5sum(self, sum: str): + """ Set MD5 sum for file """ + if self.file_obj: + if self._is_df: + self.file_obj.file_obj.md5sum = sum + else: + self.file_obj.md5sum = sum + + def cancel(self): + """Not implemented""" + raise NotImplementedError("Cancel not implemented") + + def cancelled(self) -> bool: + """Return False""" + return False + + def running(self) -> bool: + """Return whether the parent future is running""" + if self.parent is not None: + return self.parent.running() + else: + return False + + def exception(self, timeout=None): + """Return None""" + return None + + def __repr__(self) -> str: + return self.file_obj.__repr__() + + def __eq__(self, other: DynamicFileList.DynamicFile) -> bool: + return self.uuid == other.uuid + + def __ne__(self, other: DynamicFileList.DynamicFile) -> bool: + return self.uuid != other.uuid + + def __gt__(self, other: DynamicFileList.DynamicFile) -> bool: + if self.file_obj is not None and other.file_obj is not None: + return self.filepath > other.filepath + if self.file_obj is None: + return False + return True + + def __lt__(self, other: DynamicFileList.DynamicFile) -> bool: + if self.file_obj is not None and other.file_obj is not None: + return self.filepath < other.filepath + if self.file_obj is None: + return True + return False + + def __ge__(self, other: DynamicFileList.DynamicFile) -> bool: + return self.__gt__(other) or self.__eq__(other) + + def __le__(self, other: DynamicFileList.DynamicFile) -> bool: + return self.__lt__(other) or self.__eq__(other) + + def parent_callback(self, parent_fu): + """Callback from executor future to update the parent. + + Updates the future with the result (the File object) or the parent future's + exception. 
+ + Args: + - parent_fu (Future): Future returned by the executor along with callback + + Returns: + - None + """ + e = parent_fu.exception() + if e: + self.set_exception(e) + else: + if self._watcher is not None: + self._watcher() + self._watcher = None + self._is_done = True + self.parent._outputs = self + self.set_result(self) + + @typeguard.typechecked + def __init__(self, files: Optional[List[Union[File, DataFuture, Type[DynamicFileList.DynamicFile]]]] = None): + """Construct a DynamicFileList instance + + Args: + - files (List[File/DataFuture]) : List of files to initialize the DynamicFileList with + - fut (Future) : Future to set as the parent + """ + super().__init__() + self.files_done: Dict[str, Callable] = {} #: dict mapping file names to their "done" status True/False + self._last_idx = -1 + self.executor: str = '' + self.parent: Union[AppFuture, None] = None + self.dfk = None + self._sub_callbacks: List[Callable] = [] + self._in_callback = False + self._staging_inhibited = False + self._output_task_id = None + self.task_record = None + self._files: List[DynamicFileList.DynamicFile] = [] + self._is_done = False + if files is not None: + self.extend(files) + self._watcher = None + + def add_watcher_callback(self, func: Callable): + """ Add a function to call when a bash_watch app completes, to collect the files. + + Args: + - func (Callable) : function to use + """ + self._watcher = func + + def add_done_func(self, name: str, func: Callable): + """ Add a function to the files_done dict, specifically for when an empty DynamicFile + is updated to contain a real File. + + Args: + - name (str) : Name of the file to add the function for + - func (Callable) : Function to add + """ + self.files_done[name] = func + + def stage_file(self, idx: int): + """ Stage a file at the given index, we do this now because so that the app and dataflow + can act accordingly when the app finishes. + + Args: + - idx (int) : Index of the file to stage + """ + if self.dfk is None: + return + out_file = self[idx] + if out_file.empty or out_file.staged: + return + if self.parent is None or not out_file.is_df: + return + if self._staging_inhibited: + logger.debug("Not performing output staging for: {}".format(repr(out_file.file_obj))) + else: + f_copy = out_file.file_obj.file_obj.cleancopy() + self[idx].file_obj.file_obj = f_copy + logger.debug("Submitting stage out for output file {}".format(repr(out_file.file_obj))) + stageout_fut = self.dfk.data_manager.stage_out(f_copy, self.executor, self.parent) + if stageout_fut: + logger.debug("Adding a dependency on stageout future for {}".format(repr(out_file))) + self[idx].file_obj.parent = stageout_fut + self[idx].file_obj._tid = self.parent.tid + else: + logger.debug("No stageout dependency for {}".format(repr(f_copy))) + # self.parent._outputs.append(DataFuture(self.parent, out_file.file_obj.file_obj, tid=self.parent.tid)) + func = self.dfk.tasks[self._output_task_id]['func'] + # this is a hook for post-task stage-out + # note that nothing depends on the output - which is maybe a bug + # in the not-very-tested stage-out system? 
+ func = self.dfk.data_manager.replace_task_stage_out(f_copy, func, self.executor) + self.dfk.tasks[self._output_task_id]['func'] = func + self.parent._outputs = self + self._call_callbacks() + # TODO dfk._gather_all_deps + + def count(self, item) -> int: + """ Return the count of the given item in the list""" + return self._files.count(item) + + def wrap(self, file_obj: Union[File, DataFuture, None]) -> DynamicFile: + """ Wrap a file object in a DynamicFile + + Args: + - file_obj (File/DataFuture) : File or DataFuture to wrap + """ + return self.DynamicFile(self, file_obj) + + def set_dataflow(self, dataflow, executor: str, st_inhibited: bool, task_id: int, task_record: dict): + """ Set the dataflow and executor for this instance + + Args: + - dataflow (DataFlowKernel) : Dataflow kernel that this instance is associated with + - executor (str) : Executor that this instance is associated with + - st_inhibited (bool) : Whether staging is inhibited + """ + self.executor = executor + self.dfk = dataflow + self._staging_inhibited = st_inhibited + self._output_task_id = task_id + self.task_record = task_record + for idx in range(self._last_idx + 1): + self.stage_file(idx) + + def set_parent(self, fut: AppFuture): + """ Set the parent future for this instance + + Args: + - fut (Future) : Future to set as the parent + """ + if self.parent is not None: + raise ValueError("Parent future already set") + self.parent = fut + self.parent.add_done_callback(self.parent_callback) + for idx in range(self._last_idx + 1): + self[idx].convert_to_df(self.dfk) + self.stage_file(idx) + self._call_callbacks() + + def cancel(self): + """ Not implemented """ + raise NotImplementedError("Cancel not implemented") + + def cancelled(self) -> bool: + """ Not implemented """ + return False + + def running(self) -> bool: + """ Returns True if the parent future is running """ + if self.parent is not None: + return self.parent.running() + else: + return False + + def exception(self, timeout=None) -> None: + """ No-op""" + return None + + def done(self, ) -> bool: + """ Return True if all files are done """ + if not self._is_done: + return False + for element in self.files_done.values(): + if not element(): + return False + return True + + def __len__(self) -> int: + return len(self._files) + + def __eq__(self, other: DynamicFileList) -> bool: + return self._files == other._files + + def __ne__(self, other: DynamicFileList) -> bool: + return self._files != other._files + + def __lt__(self, other: DynamicFileList) -> bool: + return self._files < other._files + + def __le__(self, other: DynamicFileList) -> bool: + return self._files <= other._files + + def __gt__(self, other: DynamicFileList) -> bool: + return self._files > other._files + + def __ge__(self, other: DynamicFileList) -> bool: + return self._files >= other._files + + @typeguard.typechecked + def append(self, __object: Union[File, DataFuture, 'DynamicFileList.DynamicFile']): + """ Append a file to the list and update the files_done dict + + Args: + - __object (File/DataFuture) : File or DataFuture to append + """ + if not isinstance(__object, DynamicFileList.DynamicFile): + if self.parent is not None and isinstance(__object, File): + __object = DataFuture(self.parent, __object, tid=self._output_task_id, + dfk=self.dfk) + __object = self.wrap(__object) + if self._last_idx == len(self) - 1: + self._files.append(__object) + else: + # must assume the object is empty, but exists + self._files[self._last_idx + 1].set_file(__object) + self.files_done[__object.filename] = 
self._files[self._last_idx + 1].done + self._last_idx += 1 + self.stage_file(self._last_idx) + self._call_callbacks() + + def extend(self, __iterable): + """ Extend the list with the contents of the iterable and update the files_done dict + + Args: + - __iterable (Iterable) : Iterable to extend the list with + """ + items = [] + for f in __iterable: + if not isinstance(f, (DynamicFileList.DynamicFile, File, DataFuture)): + raise ValueError("DynamicFileList can only contain Files or DataFutures") + if not isinstance(f, DynamicFileList.DynamicFile): + if self.parent is not None and isinstance(f, File): + f = DataFuture(self.parent, f, tid=self._output_task_id, + dfk=self.dfk) + f = self.wrap(f) + self.files_done[f.filename] = f.done + items.append(f) + if self._last_idx == len(self) - 1: + self._files.extend(items) + for i in range(len(items)): + self._last_idx += 1 + self.stage_file(self._last_idx) + self._call_callbacks() + return + diff = len(self) - 1 - self._last_idx - len(items) + if diff < 0: + self._files.extend([self.wrap(None)] * abs(diff)) + for item in items: + self._last_idx += 1 + self[self._last_idx].set_file(item) + self.files_done[item.filename] = self._files[self._last_idx].done + self.stage_file(self._last_idx) + self._call_callbacks() + + def index(self, + item: Union[File, DataFuture, DynamicFile], + start: Optional[int] = 0, + stop: Optional[int] = sys.maxsize) -> int: + """ Return the index of the first instance of the given item in the list. + Raises a ValueError if the item is not found. Note that this method looks + for the base File object which is wrapped by the DataFuture or DynamicFile. + + Args: + - item (File/DataFuture/DynamicFile) : File, DataFuture, or DynamicFile to find + - start (int) : Index to start searching from + - stop (int) : Index to stop searching at + """ + for i in range(start, min(stop, len(self))): + if not self[i].empty: + if self[i].uuid == item.uuid: + return i + raise ValueError("Item not found") + + def insert(self, __index: int, __object: Union[File, DataFuture, DynamicFile]): + """ Insert a file into the list at the given index and update the files_done dict + + Args: + - __index (int) : Index to insert the file at + - __object (File/DataFuture) : File or DataFuture to insert + """ + if __index > self._last_idx: + raise ValueError("Cannot insert at index greater than the last index") + if not isinstance(__object, self.DynamicFile): + if self.parent is not None and isinstance(__object, File): + __object = DataFuture(self.parent, __object, tid=self._output_task_id, + dfk=self.dfk) + __object = self.wrap(__object) + self.files_done[__object.filename] = __object.done + self._files.insert(__index, __object) + self.stage_file(__index) + self._last_idx += 1 + self._call_callbacks() + + def remove(self, __value): + """ Remove a file from the list and update the files_done dict + + Args: + - __value (File/DataFuture) : File or DataFuture to remove + """ + if __value.filename in self.files_done: + del self.files_done[__value.filename] + idx = self.index(__value) + del self._files[idx] + self._last_idx -= 1 + self._call_callbacks() + + def pop(self, __index: int = -1) -> DataFuture: + """ Pop a file from the list and update the files_done dict + + Args: + - __index (int) : Index to pop the file at + + Returns: + - File/DataFuture : File or DataFuture that was popped + """ + if __index == -1: + value = self._files.pop(self._last_idx) + elif __index <= self._last_idx: + value = self._files.pop(__index) + else: + raise IndexError("Index out of 
range") + del self.files_done[value.filename] + self._last_idx -= 1 + self._call_callbacks() + return value.file_obj + + def clear(self): + """ Clear the list and the files_done dict """ + self.files_done.clear() + self._last_idx = -1 + self._files.clear() + # detach all the callbacks so that sub-lists can still be used + self._sub_callbacks.clear() + + def resize(self, idx: int) -> None: + """ Resize the list to the given length + + Args: + - idx (int) : Length to resize the list to + """ + if idx == len(self): + return + if idx < len(self): + for i in range(len(self) - 1, idx - 1, -1): + del self[i] + else: + self._expand(idx) + + def _call_callbacks(self): + """ Call the callbacks for the sublists """ + if self._in_callback: + return + self._in_callback = True + for cb in self._sub_callbacks: + cb() + self._in_callback = False + + def _expand(self, idx): + for _ in range(idx - len(self) + 1): + self._files.append(self.wrap(None)) + + @typeguard.typechecked + def __setitem__(self, key: int, value: Union[File, DataFuture, 'DynamicFileList.DynamicFile']): + if self[key].filename in self.files_done: + del self.files_done[self[key].filename] + if self.__getitem__(key).empty: + if self.parent is not None and isinstance(value, File): + value = DataFuture(self.parent, value, tid=self._output_task_id, + dfk=self.dfk) + self._files[key].set_file(value) + self.files_done[self._files[key].filename] = self._files[key].done + self._last_idx = max(self._last_idx, key) + self._call_callbacks() + self.stage_file(key) + elif value.uuid == self._files[key].uuid: + if isinstance(value, DynamicFileList.DynamicFile): + self._files[key].set_file(value.file_obj) + else: + self._files[key].set_file(value) + else: + raise ValueError("Cannot set a value that is not empty") + # if not isinstance(value, self.DynamicFile): + # if isinstance(value, File): + # value = DataFuture(self.parent, value, tid=self._output_task_id) + # value = self.wrap(value) + # super().__setitem__(key, value) + # self.files_done[value.filename] = value.done + + def __getitem__(self, key): + # make sure the list will be long enough when it is filled, so we can return a future + if isinstance(key, slice): + if key.start is None: + pass + elif key.start >= len(self): + for i in range(len(self), key.start + 1): + self.append(self.wrap(None)) + if key.stop is not None and key.stop > len(self): + for i in range(len(self), key.stop): + self.append(self.wrap(None)) + ret = DynamicFileSubList(key, self._files[key], self) + self._sub_callbacks.append(ret.callback) + return ret + else: + if key >= len(self): + self._expand(key) + return self._files[key] + + def get_update(self, key: slice): + """Get an updated slice for the sublist. 
+ + Args: + - key (slice) : Slice to update + + Returns: + - List[DynamicFile] : Updated slice + """ + return self._files[key] + + def __contains__(self, item): + return item in self._files + + def __bool__(self): + return bool(self._files) + + def __iter__(self): + yield from self._files + + def __hash__(self): + return hash(self._files) + + def __delitem__(self, key): + raise Exception("Cannot delete from a DynamicFileList") + # del self.files_done[self[key].filename] + # super().__delitem__(key) + # self._call_callbacks() + + def __repr__(self): + type_ = type(self) + module = type_.__module__ + qualname = type_.__qualname__ + if self.done(): + done = "done" + else: + done = "not done" + return f"<{module}.{qualname} object at {hex(id(self))} containing {len(self)} objects {done}>" + + def __getstate__(self): + state = self.__dict__.copy() + del state['dfk'] + del state['_condition'] + if self.parent is not None: + if state['parent'].task_record is not None: + state['parent'].task_record = trcopy(self.parent.task_record) + if self.task_record is not None: + state['task_record'] = trcopy(self.task_record) + return state + + def __setstate__(self, state): + self.__dict__.update(state) + + def __reduce__(self): + return self.__reduce_ex__(None) + + def __reduce_ex__(self, proto): + if self.task_record is not None: + tr = trcopy(self.task_record) + if 'dfk' in tr: + del tr['dfk'] + else: + tr = None + if self.parent is not None: + par = deepcopy(self.parent) + if 'dfk' in par.task_record: + del par.task_record['dfk'] + else: + par = None + data = {} + data["files_done"] = self.files_done + data["_last_idx"] = self._last_idx + data["executor"] = self.executor + data["parent"] = par + data["_sub_callbacks"] = self._sub_callbacks + data["_in_callback"] = self._in_callback + data["_staging_inhibited"] = self._staging_inhibited + data["_output_task_id"] = self._output_task_id + data["task_record"] = tr + data["_is_done"] = self._is_done + return (self.__class__, (self._files,), data) + + ''' + return (self.__class__, (self._files, ), {"files_done": self.files_done, + "_last_idx": self._last_idx, + "executor": self.executor, + "parent": par, + "_sub_callbacks": self._sub_callbacks, + "_in_callback": self._in_callback, + "_staging_inhibited": self._staging_inhibited, + "_output_task_id": self._output_task_id, + "task_record": tr, + "_is_done": self._is_done})''' + + +class DynamicFileSubList(DynamicFileList): + @typeguard.typechecked + def __init__(self, key: slice, files: Optional[List[DynamicFileList.DynamicFile]], parent: DynamicFileList): + super().__init__(files=files) + self.parent = parent + self.slice = key + self.fixed_size = key.stop is not None and key.start is not None + + def callback(self): + """Callback for updating the sublist when the parent list is updated.""" + self.clear() + self.extend(self.parent.get_update(self.slice)) diff --git a/parsl/data_provider/files.py b/parsl/data_provider/files.py index 4263753dce..ad3557a806 100644 --- a/parsl/data_provider/files.py +++ b/parsl/data_provider/files.py @@ -5,8 +5,10 @@ on where (client-side, remote-side, intermediary-side) the File.filepath is being called from. """ +import datetime import logging import os +import uuid from typing import Optional, Union from urllib.parse import urlparse @@ -28,8 +30,9 @@ class File: """ @typeguard.typechecked - def __init__(self, url: Union[os.PathLike, str]): - """Construct a File object from a url string. 
+ def __init__(self, url: Union[os.PathLike, str], uu_id: Optional[uuid.UUID] = None, + timestamp: Optional[datetime.datetime] = None): + """Construct a File object from an url string. Args: - url (string or PathLike) : url of the file e.g. @@ -45,7 +48,16 @@ def __init__(self, url: Union[os.PathLike, str]): self.netloc = parsed_url.netloc self.path = parsed_url.path self.filename = os.path.basename(self.path) + # let the DFK set these values, if needed + self.size: Optional[int] = None + self.md5sum: Optional[str] = None + self.time_stamp = timestamp + self.local_path: Optional[str] = None + if uu_id is not None: + self.uuid = uu_id + else: + self.uuid = uuid.uuid4() def cleancopy(self) -> "File": """Returns a copy of the file containing only the global immutable state, @@ -53,7 +65,7 @@ def cleancopy(self) -> "File": object will be as the original object was when it was constructed. """ logger.debug("Making clean copy of File object {}".format(repr(self))) - return File(self.url) + return File(self.url, self.uuid, self.time_stamp) def __str__(self) -> str: return self.filepath @@ -67,6 +79,7 @@ def __repr__(self) -> str: f"netloc={self.netloc}", f"path={self.path}", f"filename={self.filename}", + f"uuid={self.uuid}", ] if self.local_path is not None: content.append(f"local_path={self.local_path}") @@ -96,3 +109,13 @@ def filepath(self) -> str: return self.path else: raise ValueError("No local_path set for {}".format(repr(self))) + + @property + def timestamp(self) -> Optional[str]: + """Get the timestamp""" + return self.time_stamp + + @timestamp.setter + def timestamp(self, timestamp: Optional[datetime.datetime]) -> None: + """Set the timestamp""" + self.time_stamp = timestamp diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 485e6c8343..d8ca1f689f 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -14,6 +14,7 @@ from concurrent.futures import Future from functools import partial from getpass import getuser +from hashlib import md5 from socket import gethostname from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union from uuid import uuid4 @@ -26,6 +27,7 @@ from parsl.app.futures import DataFuture from parsl.config import Config from parsl.data_provider.data_manager import DataManager +from parsl.data_provider.dynamic_files import DynamicFileList from parsl.data_provider.files import File from parsl.dataflow.dependency_resolvers import SHALLOW_DEPENDENCY_RESOLVER from parsl.dataflow.errors import DependencyError, JoinError @@ -84,7 +86,6 @@ def __init__(self, config: Config) -> None: A specification of all configuration options. For more details see the :class:~`parsl.config.Config` documentation. """ - # this will be used to check cleanup only happens once self.cleanup_called = False @@ -117,6 +118,9 @@ def __init__(self, config: Config) -> None: if self.monitoring: self.monitoring.start(self.run_dir, self.config.run_dir) self.monitoring_radio = MultiprocessingQueueRadioSender(self.monitoring.resource_msgs) + self.file_provenance = self.monitoring.file_provenance + else: + self.file_provenance = False self.time_began = datetime.datetime.now() self.time_completed: Optional[datetime.datetime] = None @@ -246,13 +250,15 @@ def _create_task_log_info(self, task_record: TaskRecord) -> Dict[str, Any]: Create the dictionary that will be included in the log. 
""" info_to_monitor = ['func_name', 'memoize', 'hashsum', 'fail_count', 'fail_cost', 'status', - 'id', 'time_invoked', 'try_time_launched', 'time_returned', 'try_time_returned', 'executor'] + 'id', 'time_invoked', 'try_time_launched', 'time_returned', 'try_time_returned', 'executor', + 'environment'] # mypy cannot verify that these task_record[k] references are valid: # They are valid if all entries in info_to_monitor are declared in the definition of TaskRecord # This type: ignore[literal-required] asserts that fact. task_log_info = {"task_" + k: task_record[k] for k in info_to_monitor} # type: ignore[literal-required] - + task_log_info['task_args'] = str(task_record['args']) + task_log_info['task_kwargs'] = str(task_record['kwargs']) task_log_info['run_id'] = self.run_id task_log_info['try_id'] = task_record['try_id'] task_log_info['timestamp'] = datetime.datetime.now() @@ -302,11 +308,132 @@ def std_spec_to_name(name, spec): return task_log_info - def _count_deps(self, depends: Sequence[Future]) -> int: + def _send_file_log_info(self, file: Union[File, DataFuture], + task_record: TaskRecord, is_output: bool) -> None: + """ Generate a message for the monitoring db about a file. """ + if self.monitoring_radio and self.file_provenance: + file_log_info = self._create_file_log_info(file, task_record) + # make sure the task_id is None for inputs + if not is_output: + file_log_info['task_id'] = None + self.monitoring_radio.send((MessageType.FILE_INFO, file_log_info)) + + def _create_file_log_info(self, file: Union[File, DataFuture], + task_record: TaskRecord) -> Dict[str, Any]: + """ + Create the dictionary that will be included in the monitoring db. + """ + # set file info if needed + if isinstance(file, DataFuture): + fo = file.file_obj + else: + fo = file + if fo.scheme == 'file' and os.path.isfile(fo.filepath): + if not fo.timestamp: + fo.timestamp = datetime.datetime.fromtimestamp(os.stat(fo.filepath).st_ctime, tz=datetime.timezone.utc) + if not fo.size: + fo.size = os.stat(fo.filepath).st_size + if not fo.md5sum: + fo.md5sum = md5(open(fo.filepath, 'rb').read()).hexdigest() + + file_log_info = {'file_name': file.filename, + 'file_id': str(file.uuid), + 'run_id': self.run_id, + 'task_id': task_record['id'], + 'try_id': task_record['try_id'], + 'timestamp': file.timestamp, + 'size': file.size, + 'md5sum': file.md5sum + } + return file_log_info + + def register_as_input(self, f: Union[File, DataFuture], + task_record: TaskRecord): + """ Register a file as an input to a task. """ + if self.monitoring_radio and self.file_provenance: + self._send_file_log_info(f, task_record, False) + file_input_info = self._create_file_io_info(f, task_record) + self.monitoring_radio.send((MessageType.INPUT_FILE, file_input_info)) + + def register_as_output(self, f: Union[File, DataFuture], + task_record: TaskRecord): + """ Register a file as an output of a task. 
""" + if self.monitoring_radio and self.file_provenance: + self._send_file_log_info(f, task_record, True) + file_output_info = self._create_file_io_info(f, task_record) + self.monitoring_radio.send((MessageType.OUTPUT_FILE, file_output_info)) + + def _create_file_io_info(self, file: Union[File, DataFuture], + task_record: TaskRecord) -> Dict[str, Any]: + """ + Create the dictionary that will be included in the monitoring db + """ + file_io_info = {'file_id': str(file.uuid), + 'run_id': self.run_id, + 'task_id': task_record['id'], + 'try_id': task_record['try_id'], + } + return file_io_info + + def _register_env(self, environ: ParslExecutor) -> None: + """ Capture the environment information for the monitoring db. """ + if self.monitoring_radio and self.file_provenance: + environ_info = self._create_env_log_info(environ) + self.monitoring_radio.send((MessageType.ENVIRONMENT_INFO, environ_info)) + + def _create_env_log_info(self, environ: ParslExecutor) -> Dict[str, Any]: + """ + Create the dictionary that will be included in the monitoring db + """ + env_log_info = {'run_id': environ.run_id, + 'environment_id': str(environ.uu_id), + 'label': environ.label + } + + env_log_info['address'] = getattr(environ, 'address', None) + provider = getattr(environ, 'provider', None) + if provider is not None: + env_log_info['provider'] = provider.label + env_log_info['launcher'] = str(type(getattr(provider, 'launcher', None))) + env_log_info['worker_init'] = getattr(provider, 'worker_init', None) + return env_log_info + + def log_info(self, msg: str) -> None: + """Log an info message to the monitoring db.""" + if self.monitoring_radio: + if self.file_provenance: + misc_msg = self._create_misc_log_info(msg) + if misc_msg is None: + logger.info("Could not turn message into a str, so not sending message to monitoring db") + else: + self.monitoring_radio.send((MessageType.MISC_INFO, misc_msg)) + else: + logger.info("File provenance is not enabled, so not sending message to monitoring db") + else: + logger.info("Monitoring is not enabled, so not sending message to monitoring db") + + def _create_misc_log_info(self, msg: Any) -> Union[None, Dict[str, Any]]: + """ + Create the dictionary that will be included in the monitoring db + """ + try: # exception should only be raised if msg cannot be cast to a str + return {'run_id': self.run_id, + 'timestamp': datetime.datetime.now(), + 'info': str(msg) + } + except Exception: + return None + + def _count_deps(self, task_record: TaskRecord) -> int: """Count the number of unresolved futures in the list depends. 
""" count = 0 - for dep in depends: + for dep in task_record['depends']: + # if dep is a DynamicFileList in the outputs, then is state is not relevant + if isinstance(dep, DynamicFileList) and 'outputs' in task_record['kwargs'] \ + and isinstance(task_record['kwargs']['outputs'], DynamicFileList) \ + and task_record['kwargs']['outputs'] == dep: + continue if not dep.done(): count += 1 @@ -651,7 +778,7 @@ def _launch_if_ready_async(self, task_record: TaskRecord) -> None: logger.debug(f"Task {task_id} is not pending, so launch_if_ready skipping") return - if self._count_deps(task_record['depends']) != 0: + if self._count_deps(task_record) != 0: logger.debug(f"Task {task_id} has outstanding dependencies, so launch_if_ready skipping") return @@ -772,8 +899,8 @@ def launch_task(self, task_record: TaskRecord) -> Future: return exec_fu - def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], func: Callable) -> Tuple[Sequence[Any], Dict[str, Any], - Callable]: + def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], func: Callable, + task_record: TaskRecord) -> Tuple[Sequence[Any], Dict[str, Any], Callable]: """Look for inputs of the app that are files. Give the data manager the opportunity to replace a file with a data future for that file, for example wrapping the result of a staging action. @@ -793,6 +920,7 @@ def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, inputs = kwargs.get('inputs', []) for idx, f in enumerate(inputs): (inputs[idx], func) = self.data_manager.optionally_stage_in(f, func, executor) + self.register_as_input(f, task_record) for kwarg, f in kwargs.items(): # stdout and stderr files should not be staging in (they will be staged *out* @@ -800,17 +928,31 @@ def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, if kwarg in ['stdout', 'stderr']: continue (kwargs[kwarg], func) = self.data_manager.optionally_stage_in(f, func, executor) + if isinstance(f, (DynamicFileList.DynamicFile, File, DataFuture)): + self.register_as_input(f, task_record) newargs = list(args) for idx, f in enumerate(newargs): (newargs[idx], func) = self.data_manager.optionally_stage_in(f, func, executor) + if isinstance(f, (DynamicFileList.DynamicFile, File, DataFuture)): + self.register_as_input(f, task_record) return tuple(newargs), kwargs, func - def _add_output_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], app_fut: AppFuture, func: Callable) -> Callable: + def _add_output_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], app_fut: AppFuture, + func: Callable, task_id: int, task_record: TaskRecord) -> Callable: logger.debug("Adding output dependencies") outputs = kwargs.get('outputs', []) - app_fut._outputs = [] + if isinstance(outputs, DynamicFileList): + outputs.set_dataflow(self, executor, self.check_staging_inhibited(kwargs), task_id, task_record) + outputs.set_parent(app_fut) + app_fut._outputs = outputs + return func + + if isinstance(outputs, DynamicFileList): + app_fut._outputs = DynamicFileList() + else: + app_fut._outputs = [] # Pass over all possible outputs: the outputs kwarg, stdout and stderr # and for each of those, perform possible stage-out. 
This can result in: @@ -834,10 +976,10 @@ def stageout_one_file(file: File, rewritable_func: Callable): stageout_fut = self.data_manager.stage_out(f_copy, executor, app_fut) if stageout_fut: logger.debug("Adding a dependency on stageout future for {}".format(repr(file))) - df = DataFuture(stageout_fut, file, tid=app_fut.tid) + df = DataFuture(stageout_fut, file, dfk=self, tid=app_fut.tid, app_fut=app_fut) else: logger.debug("No stageout dependency for {}".format(repr(file))) - df = DataFuture(app_fut, file, tid=app_fut.tid) + df = DataFuture(app_fut, file, dfk=self, tid=app_fut.tid, app_fut=app_fut) # this is a hook for post-task stageout # note that nothing depends on the output - which is maybe a bug @@ -846,7 +988,7 @@ def stageout_one_file(file: File, rewritable_func: Callable): return rewritable_func, f_copy, df else: logger.debug("Not performing output staging for: {}".format(repr(file))) - return rewritable_func, file, DataFuture(app_fut, file, tid=app_fut.tid) + return rewritable_func, file, DataFuture(app_fut, file, dfk=self, tid=app_fut.tid, app_fut=app_fut) for idx, file in enumerate(outputs): func, outputs[idx], o = stageout_one_file(file, func) @@ -892,8 +1034,12 @@ def check_dep(d: Any) -> None: check_dep(dep) # Check for futures in inputs=[...] - for dep in kwargs.get('inputs', []): - check_dep(dep) + inp = kwargs.get('inputs', []) + if isinstance(inp, DynamicFileList): + check_dep(inp) + else: + for dep in inp: + check_dep(dep) return depends @@ -935,19 +1081,24 @@ def append_failure(e: Exception, dep: Future) -> None: for key in kwargs: dep = kwargs[key] try: + if key == 'outputs' and isinstance(dep, DynamicFileList): + continue kwargs[key] = self.dependency_resolver.traverse_to_unwrap(dep) except Exception as e: append_failure(e, dep) # Check for futures in inputs=[...] 
if 'inputs' in kwargs: - new_inputs = [] - for dep in kwargs['inputs']: - try: - new_inputs.extend([self.dependency_resolver.traverse_to_unwrap(dep)]) - except Exception as e: - append_failure(e, dep) - kwargs['inputs'] = new_inputs + if isinstance(kwargs['inputs'], DynamicFileList): + new_inputs = kwargs['inputs'] + else: + new_inputs = [] + for dep in kwargs['inputs']: + try: + new_inputs.extend([self.dependency_resolver.traverse_to_unwrap(dep)]) + except Exception as e: + append_failure(e, dep) + kwargs['inputs'] = new_inputs return new_args, kwargs, dep_failures @@ -1029,8 +1180,9 @@ def submit(self, 'time_returned': None, 'try_time_launched': None, 'try_time_returned': None, - 'resource_specification': resource_specification} - + 'resource_specification': resource_specification, + 'environment': str(self.executors[executor].uu_id)} + self._register_env(self.executors[executor]) self.update_task_state(task_record, States.unsched) for kw in ['stdout', 'stderr']: @@ -1047,9 +1199,7 @@ def submit(self, task_record['app_fu'] = app_fu # Transform remote input files to data futures - app_args, app_kwargs, func = self._add_input_deps(executor, app_args, app_kwargs, func) - - func = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func) + app_args, app_kwargs, func = self._add_input_deps(executor, app_args, app_kwargs, func, task_record) logger.debug("Added output dependencies") @@ -1064,6 +1214,8 @@ def submit(self, self.tasks[task_id] = task_record + task_record['func'] = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func, task_id, task_record) + logger.debug("Gathering dependencies") # Get the list of dependencies for the task depends = self._gather_all_deps(app_args, app_kwargs) diff --git a/parsl/dataflow/futures.py b/parsl/dataflow/futures.py index 616a3d3bad..6a1c5dcab2 100644 --- a/parsl/dataflow/futures.py +++ b/parsl/dataflow/futures.py @@ -8,6 +8,7 @@ import parsl.app.app as app from parsl.app.futures import DataFuture from parsl.dataflow.taskrecord import TaskRecord +from parsl.dataflow.taskrecord import deepcopy as trcopy logger = logging.getLogger(__name__) @@ -154,6 +155,13 @@ def __getattr__(self, name: str) -> AppFuture: return deferred_getattr_app(self, name) + def __deepcopy__(self, memo): + trcp = trcopy(self.task_record) + instance = AppFuture(trcp) + instance._update_lock = None + instance._condition = None + return instance + def deferred_getitem(o: Any, k: Any) -> Any: return o[k] diff --git a/parsl/dataflow/taskrecord.py b/parsl/dataflow/taskrecord.py index 0621ab8f41..b61a3c7db2 100644 --- a/parsl/dataflow/taskrecord.py +++ b/parsl/dataflow/taskrecord.py @@ -3,6 +3,7 @@ import datetime import threading from concurrent.futures import Future +from copy import deepcopy as dcpy # only for type checking: from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union @@ -101,3 +102,34 @@ class TaskRecord(TypedDict, total=False): """Restricts access to end-of-join behavior to ensure that joins only complete once, even if several joining Futures complete close together in time.""" + + environment: str + """The environment in which the task is being executed.""" + + +def deepcopy(tr: TaskRecord) -> TaskRecord: + tr_out: TaskRecord + tr_out = { + 'executor': dcpy(tr['executor']), + 'func': dcpy(tr['func']), + 'func_name': dcpy(tr['func'].__name__), + 'memoize': dcpy(tr['memoize']), + 'hashsum': dcpy(tr['hashsum']), + 'exec_fu': None, + 'fail_count': dcpy(tr['fail_count']), + 'fail_cost': dcpy(tr['fail_cost']), + 
'fail_history': dcpy(tr['fail_history']), + 'from_memo': dcpy(tr['from_memo']), + 'ignore_for_cache': dcpy(tr['ignore_for_cache']), + 'join': dcpy(tr['join']), + 'joins': dcpy(tr['joins']), + 'try_id': dcpy(tr['try_id']), + 'id': dcpy(tr['id']), + 'time_invoked': dcpy(tr['time_invoked']), + 'time_returned': dcpy(tr['time_returned']), + 'try_time_launched': dcpy(tr['try_time_launched']), + 'try_time_returned': dcpy(tr['try_time_returned']), + 'resource_specification': dcpy(tr['resource_specification']), + 'environment': dcpy(tr['environment'])} + + return tr_out diff --git a/parsl/executors/base.py b/parsl/executors/base.py index 466944a195..37df811f55 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -2,6 +2,7 @@ import logging import os +import uuid from abc import ABCMeta, abstractmethod from concurrent.futures import Future from multiprocessing.queues import Queue @@ -72,6 +73,7 @@ def __init__( self.run_dir = os.path.abspath(run_dir) self.run_id = run_id + self.uu_id = uuid.uuid4() def __enter__(self) -> Self: return self diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 9c1f88c687..ec8bef2df9 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -6,7 +6,7 @@ import queue import threading import time -from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, cast +from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, Union, cast import typeguard @@ -49,6 +49,11 @@ RESOURCE = 'resource' # Resource table includes task resource utilization NODE = 'node' # Node table include node info BLOCK = 'block' # Block table include the status for block polling +FILE = 'file' # Files table include file info +INPUT_FILE = 'input_file' # Input files table include input file info +OUTPUT_FILE = 'output_file' # Output files table include output file info +ENVIRONMENT = 'environment' # Executor table include executor info +MISC_INFO = 'misc_info' # Misc info table include misc info class Database: @@ -165,9 +170,12 @@ class Task(Base): task_hashsum = Column('task_hashsum', Text, nullable=True, index=True) task_inputs = Column('task_inputs', Text, nullable=True) task_outputs = Column('task_outputs', Text, nullable=True) + task_args = Column('task_args', Text, nullable=True) + task_kwargs = Column('task_kwargs', Text, nullable=True) task_stdin = Column('task_stdin', Text, nullable=True) task_stdout = Column('task_stdout', Text, nullable=True) task_stderr = Column('task_stderr', Text, nullable=True) + task_environment = Column('task_environment', Text, nullable=True) task_time_invoked = Column( 'task_time_invoked', DateTime, nullable=True) @@ -237,6 +245,55 @@ class Block(Base): PrimaryKeyConstraint('run_id', 'block_id', 'executor_label', 'timestamp'), ) + class File(Base): + __tablename__ = FILE + file_name = Column('file_name', Text, index=True, nullable=False) + file_path = Column('file_path', Text, nullable=True) + full_path = Column('full_path', Text, index=True, nullable=False) + file_id = Column('file_id', Text, index=True, nullable=False) + run_id = Column('run_id', Text, index=True, nullable=False) + task_id = Column('task_id', Integer, index=True, nullable=True) + try_id = Column('try_id', Integer, index=True, nullable=True) + timestamp = Column('timestamp', DateTime, index=True, nullable=True) + size = Column('size', BigInteger, nullable=True) + md5sum = Column('md5sum', Text, nullable=True) + __table_args__ = (PrimaryKeyConstraint('file_id'),) + + class Environment(Base): + __tablename__ = 
ENVIRONMENT + environment_id = Column('environment_id', Text, index=True, nullable=False) + run_id = Column('run_id', Text, index=True, nullable=False) + label = Column('label', Text, nullable=False) + address = Column('address', Text, nullable=True) + provider = Column('provider', Text, nullable=True) + launcher = Column('launcher', Text, nullable=True) + worker_init = Column('worker_init', Text, nullable=True) + __table_args__ = (PrimaryKeyConstraint('environment_id'),) + + class InputFile(Base): + __tablename__ = INPUT_FILE + file_id = Column('file_id', Text, sa.ForeignKey(FILE + ".file_id"), nullable=False) + run_id = Column('run_id', Text, index=True, nullable=False) + task_id = Column('task_id', Integer, index=True, nullable=False) + try_id = Column('try_id', Integer, index=True, nullable=False) + __table_args__ = (PrimaryKeyConstraint('file_id'),) + + class OutputFile(Base): + __tablename__ = OUTPUT_FILE + file_id = Column('file_id', Text, sa.ForeignKey(FILE + ".file_id"), nullable=False) + run_id = Column('run_id', Text, index=True, nullable=False) + task_id = Column('task_id', Integer, index=True, nullable=False) + try_id = Column('try_id', Integer, index=True, nullable=False) + __table_args__ = (PrimaryKeyConstraint('file_id'),) + + class MiscInfo(Base): + __tablename__ = MISC_INFO + run_id = Column('run_id', Text, index=True, nullable=False) + timestamp = Column('timestamp', DateTime, index=True, nullable=False) + info = Column('info', Text, nullable=False) + __table_args__ = ( + PrimaryKeyConstraint('run_id', 'timestamp'),) + class Resource(Base): __tablename__ = RESOURCE try_id = Column('try_id', Integer, nullable=False) @@ -340,6 +397,14 @@ def start(self, """ inserted_tries: Set[Any] = set() + """ + like inserted_tasks but for Files + """ + inserted_files: Dict[str, Dict[str, Union[None, datetime.datetime, str, int]]] = dict() + input_inserted_files: Dict[str, List[str]] = dict() + output_inserted_files: Dict[str, List[str]] = dict() + inserted_envs: Set[object] = set() + # for any task ID, we can defer exactly one message, which is the # assumed-to-be-unique first message (with first message flag set). 
# The code prior to this patch will discard previous message in @@ -383,6 +448,11 @@ def start(self, "Got {} messages from priority queue".format(len(priority_messages))) task_info_update_messages, task_info_insert_messages, task_info_all_messages = [], [], [] try_update_messages, try_insert_messages, try_all_messages = [], [], [] + file_update_messages, file_insert_messages, file_all_messages = [], [], [] + input_file_insert_messages, input_file_all_messages = [], [] + output_file_insert_messages, output_file_all_messages = [], [] + environment_insert_messages = [] + misc_info_insert_messages = [] for msg_type, msg in priority_messages: if msg_type == MessageType.WORKFLOW_INFO: if "python_version" in msg: # workflow start message @@ -419,6 +489,86 @@ def start(self, if task_try_id in deferred_resource_messages: reprocessable_first_resource_messages.append( deferred_resource_messages.pop(task_try_id)) + elif msg_type == MessageType.FILE_INFO: + file_id = msg['file_id'] + file_all_messages.append(msg) + msg['full_path'] = msg['file_name'] + loc = msg['file_name'].rfind("/") + if loc >= 0: + msg['file_path'] = msg['file_name'][:loc] + msg['file_name'] = msg['file_name'][loc + 1:] + + if file_id in inserted_files: + new_item = False + # once certain items are set, they should not be changed + if inserted_files[file_id]['timestamp'] is None: + if msg['timestamp'] is not None: + inserted_files[file_id]['timestamp'] = msg['timestamp'] + new_item = True + else: + msg['timestamp'] = inserted_files[file_id]['timestamp'] + if inserted_files[file_id]['size'] is None: + if msg['size'] is not None: + inserted_files[file_id]['size'] = msg['size'] + new_item = True + else: + msg['size'] = inserted_files[file_id]['size'] + if inserted_files[file_id]['md5sum'] is None: + if msg['md5sum'] is not None: + inserted_files[file_id]['md5sum'] = msg['md5sum'] + new_item = True + else: + msg['md5sum'] = inserted_files[file_id]['md5sum'] + if inserted_files[file_id]['task_id'] is None: + if msg['task_id'] is not None: + inserted_files[file_id]['task_id'] = msg['task_id'] + inserted_files[file_id]['try_id'] = msg['try_id'] + new_item = True + else: + if msg['task_id'] == inserted_files[file_id]['task_id']: + if inserted_files[file_id]['try_id'] is None: + inserted_files[file_id]['try_id'] = msg['try_id'] + new_item = True + elif msg['try_id'] > inserted_files[file_id]['try_id']: + inserted_files[file_id]['try_id'] = msg['try_id'] + new_item = True + else: + msg['task_id'] = inserted_files[file_id]['task_id'] + msg['try_id'] = inserted_files[file_id]['try_id'] + if new_item: + file_update_messages.append(msg) + else: + inserted_files[file_id] = {'size': msg['size'], + 'md5sum': msg['md5sum'], + 'timestamp': msg['timestamp'], + 'task_id': msg['task_id'], + 'try_id': msg['try_id']} + file_insert_messages.append(msg) + elif msg_type == MessageType.ENVIRONMENT_INFO: + if msg['environment_id'] not in inserted_envs: + environment_insert_messages.append(msg) + inserted_envs.add(msg['environment_id']) + elif msg_type == MessageType.MISC_INFO: + # no filtering, just insert each message + misc_info_insert_messages.append(msg) + elif msg_type == MessageType.INPUT_FILE: + file_id = msg['file_id'] + input_file_all_messages.append(msg) + identifier = f"{msg['run_id']}.{msg['task_id']}.{msg['try_id']}" + if file_id not in input_inserted_files: + input_inserted_files[file_id] = [] + if identifier not in input_inserted_files[file_id]: + input_inserted_files[file_id].append(identifier) + input_file_insert_messages.append(msg) + elif 
msg_type == MessageType.OUTPUT_FILE: + file_id = msg['file_id'] + output_file_all_messages.append(msg) + identifier = f"{msg['run_id']}.{msg['task_id']}.{msg['try_id']}" + if file_id not in output_inserted_files: + output_inserted_files[file_id] = [] + if identifier not in output_inserted_files[file_id]: + output_inserted_files[file_id].append(identifier) + output_file_insert_messages.append(msg) else: raise RuntimeError("Unexpected message type {} received on priority queue".format(msg_type)) @@ -449,6 +599,39 @@ def start(self, self._insert(table=STATUS, messages=task_info_all_messages) + if file_insert_messages: + logger.debug("Inserting {} FILE_INFO to file table".format(len(file_insert_messages))) + self._insert(table=FILE, messages=file_insert_messages) + logger.debug( + "There are {} inserted file records".format(len(inserted_files))) + + if environment_insert_messages: + logger.debug("Inserting {} ENVIRONMENT_INFO to environment table".format(len(environment_insert_messages))) + self._insert(table=ENVIRONMENT, messages=environment_insert_messages) + logger.debug( + "There are {} inserted environment records".format(len(inserted_envs))) + + if file_update_messages: + logger.debug("Updating {} FILE_INFO into file table".format(len(file_update_messages))) + self._update(table=FILE, + columns=['timestamp', 'size', 'md5sum', 'file_id', 'task_id', 'try_id'], + messages=file_update_messages) + + if input_file_insert_messages: + logger.debug("Inserting {} INPUT_FILE to input_files table".format(len(input_file_insert_messages))) + self._insert(table=INPUT_FILE, messages=input_file_insert_messages) + logger.debug("There are {} inserted input file records".format(len(input_inserted_files))) + + if output_file_insert_messages: + logger.debug("Inserting {} OUTPUT_FILE to output_files table".format(len(output_file_insert_messages))) + self._insert(table=OUTPUT_FILE, messages=output_file_insert_messages) + logger.debug("There are {} inserted output file records".format(len(output_inserted_files))) + + if misc_info_insert_messages: + logger.debug("Inserting {} MISC_INFO to misc_info table".format(len(misc_info_insert_messages))) + self._insert(table=MISC_INFO, messages=misc_info_insert_messages) + logger.debug("There are {} inserted misc info records".format(len(misc_info_insert_messages))) + if try_insert_messages: logger.debug("Inserting {} TASK_INFO to try table".format(len(try_insert_messages))) self._insert(table=TRY, messages=try_insert_messages) @@ -575,7 +758,8 @@ def _dispatch_to_internal(self, x: Tuple) -> None: assert isinstance(x, tuple) assert len(x) == 2, "expected message tuple to have exactly two elements" - if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO]: + if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO, MessageType.FILE_INFO, MessageType.INPUT_FILE, + MessageType.OUTPUT_FILE, MessageType.ENVIRONMENT_INFO, MessageType.MISC_INFO]: self.pending_priority_queue.put(cast(Any, x)) elif x[0] == MessageType.RESOURCE_INFO: body = x[1] diff --git a/parsl/monitoring/message_type.py b/parsl/monitoring/message_type.py index 366b61bd42..03a81f9dbb 100644 --- a/parsl/monitoring/message_type.py +++ b/parsl/monitoring/message_type.py @@ -17,3 +17,10 @@ class MessageType(Enum): # Reports of the block info BLOCK_INFO = 4 + + # Reports file info + FILE_INFO = 5 + INPUT_FILE = 6 + OUTPUT_FILE = 7 + ENVIRONMENT_INFO = 8 + MISC_INFO = 9 diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index b4a005e21c..4e2b8fa52b 100644 --- 
a/parsl/monitoring/monitoring.py
+++ b/parsl/monitoring/monitoring.py
@@ -43,7 +43,8 @@ def __init__(self,
                 logging_endpoint: Optional[str] = None,
                 monitoring_debug: bool = False,
                 resource_monitoring_enabled: bool = True,
-                resource_monitoring_interval: float = 30):  # in seconds
+                resource_monitoring_interval: float = 30,  # in seconds
+                file_provenance: bool = False):
        """
        Parameters
        ----------
@@ -77,6 +78,9 @@ def __init__(self,
            If set to 0, only start and end information will be logged, and no
            periodic monitoring will be made.
            Default: 30 seconds
+        file_provenance : bool
+            Set this field to True to enable logging of file provenance information.
+            Default: False
        """

        if _db_manager_excepts:
@@ -114,6 +118,8 @@ def __init__(self,
        self.resource_monitoring_enabled = resource_monitoring_enabled
        self.resource_monitoring_interval = resource_monitoring_interval

+        self.file_provenance = file_provenance
+
    def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
        logger.debug("Starting MonitoringHub")

diff --git a/parsl/monitoring/queries/pandas.py b/parsl/monitoring/queries/pandas.py
index 9bda8422e7..50a72e3408 100644
--- a/parsl/monitoring/queries/pandas.py
+++ b/parsl/monitoring/queries/pandas.py
@@ -8,6 +8,38 @@
 DB = Any


+def input_files_for_task(workflow_id: Any, task_id: Any, db: DB) -> pd.DataFrame:
+    return pd.read_sql_query("""
+    SELECT *
+    FROM input_file, file
+    WHERE input_file.run_id='%s' AND input_file.task_id='%s'
+          AND input_file.file_id = file.file_id;
+    """ % (workflow_id, task_id), db)
+
+
+def output_files_for_task(workflow_id: Any, task_id: Any, db: DB) -> pd.DataFrame:
+    return pd.read_sql_query("""
+    SELECT *
+    FROM output_file, file
+    WHERE output_file.run_id='%s' AND output_file.task_id='%s'
+          AND output_file.file_id = file.file_id;
+    """ % (workflow_id, task_id), db)
+
+
+def full_task_info(workflow_id: Any, task_id: Any, db: DB) -> pd.DataFrame:
+    task_details = pd.read_sql_query("""
+    SELECT *
+    FROM task
+    WHERE run_id='%s' AND task_id='%s';
+    """ % (workflow_id, task_id), db)
+    if not task_details.empty:
+        task_details = task_details.iloc[0]
+        task_details['task_inputs'] = input_files_for_task(workflow_id, task_id, db)
+        task_details['task_outputs'] = output_files_for_task(workflow_id, task_id, db)
+    return task_details
+
+
 def app_counts_for_workflow(workflow_id: Any, db: DB) -> pd.DataFrame:
     return pd.read_sql_query("""
     SELECT task_func_name, count(*) as 'frequency'
diff --git a/parsl/monitoring/visualization/app.py b/parsl/monitoring/visualization/app.py
index e0c9510ee8..069eb2f598 100644
--- a/parsl/monitoring/visualization/app.py
+++ b/parsl/monitoring/visualization/app.py
@@ -23,6 +23,7 @@ def cli_run():
     app = Flask(__name__)
     app.config['SQLALCHEMY_DATABASE_URI'] = args.db_path
     app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
+    app.config['SECRET_KEY'] = os.urandom(24)

     db.init_app(app)
     with app.app_context():
diff --git a/parsl/monitoring/visualization/form_fields.py b/parsl/monitoring/visualization/form_fields.py
new file mode 100644
index 0000000000..d19123b079
--- /dev/null
+++ b/parsl/monitoring/visualization/form_fields.py
@@ -0,0 +1,8 @@
+from flask_wtf import FlaskForm
+from wtforms import StringField, SubmitField
+from wtforms.validators import DataRequired
+
+
+class FileForm(FlaskForm):
+    file_name = StringField('File Name', validators=[DataRequired()])
+    submit = SubmitField('Submit')
diff --git a/parsl/monitoring/visualization/models.py b/parsl/monitoring/visualization/models.py index
dd39c1d961..ca7c55bae1 100644 --- a/parsl/monitoring/visualization/models.py +++ b/parsl/monitoring/visualization/models.py @@ -5,6 +5,11 @@ STATUS = 'status' # Status table includes task status RESOURCE = 'resource' # Resource table includes task resource utilization NODE = 'node' # Node table include node info +FILE = 'file' # Files table include file info +INPUT_FILE = 'input_file' # Input files table include input file info +OUTPUT_FILE = 'output_file' # Output files table include output file info +ENVIRONMENT = 'environment' # Executor table include executor info +MISC_INFO = 'misc_info' # Misc info table include misc info db = SQLAlchemy() @@ -67,11 +72,65 @@ class Task(db.Model): task_stdin = db.Column('task_stdin', db.Text, nullable=True) task_stdout = db.Column('task_stdout', db.Text, nullable=True) task_stderr = db.Column('task_stderr', db.Text, nullable=True) + task_environment = db.Column('task_environment', db.Text, nullable=True) __table_args__ = ( db.PrimaryKeyConstraint('task_id', 'run_id'), ) +class File(db.Model): + __tablename__ = FILE + file_name = db.Column('file_name', db.Text, index=True, nullable=False) + file_path = db.Column('file_path', db.Text, nullable=True) + full_path = db.Column('full_path', db.Text, index=True, nullable=False) + file_id = db.Column('file_id', db.Text, index=True, nullable=False) + run_id = db.Column('run_id', db.Text, index=True, nullable=False) + task_id = db.Column('task_id', db.Integer, index=True, nullable=True) + try_id = db.Column('try_id', db.Integer, index=True, nullable=True) + timestamp = db.Column('timestamp', db.DateTime, index=True, nullable=True) + size = db.Column('size', db.BigInteger, nullable=True) + md5sum = db.Column('md5sum', db.Text, nullable=True) + __table_args__ = (db.PrimaryKeyConstraint('file_id'),) + + +class Environment(db.Model): + __tablename__ = ENVIRONMENT + environment_id = db.Column('environment_id', db.Text, index=True, nullable=False) + run_id = db.Column('run_id', db.Text, index=True, nullable=False) + label = db.Column('label', db.Text, nullable=False) + address = db.Column('address', db.Text, nullable=True) + provider = db.Column('provider', db.Text, nullable=True) + launcher = db.Column('launcher', db.Text, nullable=True) + worker_init = db.Column('worker_init', db.Text, nullable=True) + __table_args__ = (db.PrimaryKeyConstraint('environment_id'),) + + +class InputFile(db.Model): + __tablename__ = INPUT_FILE + file_id = db.Column('file_id', db.Text, nullable=False) + run_id = db.Column('run_id', db.Text, index=True, nullable=False) + task_id = db.Column('task_id', db.Integer, index=True, nullable=False) + try_id = db.Column('try_id', db.Integer, index=True, nullable=False) + __table_args__ = (db.PrimaryKeyConstraint('file_id'),) + + +class OutputFile(db.Model): + __tablename__ = OUTPUT_FILE + file_id = db.Column('file_id', db.Text, nullable=False) + run_id = db. 
Column('run_id', db.Text, index=True, nullable=False) + task_id = db.Column('task_id', db.Integer, index=True, nullable=False) + try_id = db.Column('try_id', db.Integer, index=True, nullable=False) + __table_args__ = (db.PrimaryKeyConstraint('file_id'),) + + +class MiscInfo(db.Model): + __tablename__ = MISC_INFO + run_id = db.Column('run_id', db.Text, index=True, nullable=False) + timestamp = db.Column('timestamp', db.DateTime, index=True, nullable=False) + info = db.Column('info', db.Text, nullable=False) + __table_args__ = (db.PrimaryKeyConstraint('run_id', 'timestamp'),) + + class Resource(db.Model): __tablename__ = RESOURCE task_id = db.Column('task_id', db.Integer, db.ForeignKey( diff --git a/parsl/monitoring/visualization/templates/env.html b/parsl/monitoring/visualization/templates/env.html new file mode 100644 index 0000000000..52292d252c --- /dev/null +++ b/parsl/monitoring/visualization/templates/env.html @@ -0,0 +1,48 @@ +{% extends "layout.html" %} + +{% block content %} + +
+

Executor ({{ environment_details['environment_id'] }})

+
+
+
+
+
    +
  • Environment label: {{ environment_details['label'] }}
  • +
  • Workflow: {{ workflow['workflow_name'] }}
  • +
  • Address: {{ environment_details['address'] }}
  • +
  • Provider: {{ environment_details['provider'] }}
  • +
  • Launcher: {{ environment_details['launcher'] }}
  • +
  • Worker init: {{ environment_details['worker_init'] }}
  • + {% if tasks %} +
  • Used by Tasks: + + + + + + + + + {% for name, ids in tasks.items() %} + + + + {% endfor %} + +
    IdName
    {% set first = True %} + {% for i in ids %} + {% if not first %}, {% endif %} + {{ i }} + {% set first = False %} + {% endfor %} + {{ name }}
    + +
  • + {% endif %} +
+
+
+
+{% endblock %} diff --git a/parsl/monitoring/visualization/templates/file.html b/parsl/monitoring/visualization/templates/file.html new file mode 100644 index 0000000000..1b3f547863 --- /dev/null +++ b/parsl/monitoring/visualization/templates/file.html @@ -0,0 +1,48 @@ +{% extends "layout.html" %} + +{% block content %} + +
+

File Provenance

+ + Files by workflow. +

+ Search for files by name or path (use % as a wildcard): +

+ {{ form.hidden_tag() }} + {{ form.file_name.label }} {{ form.file_name(size=32) }} + {{ form.submit() }} +
+ {% if form.file_name.errors %} +
    + {% for error in form.file_name.errors %} +
  • {{ error }}
  • + {% endfor %} +
+{% endif %} +{% if file_list is defined %} + {% if file_list %} + + + + + + + + + + {% for file in file_list %} + + + + + + {% endfor %} + +
File NameFile SizeCreated
{{ file['full_path'] }}{{ file['size'] }}{{ file['timestamp'] | timeformat }}
+ {% else %} +

No files found.

+ {% endif %} +{% endif %} + +{% endblock %} \ No newline at end of file diff --git a/parsl/monitoring/visualization/templates/file_detail.html b/parsl/monitoring/visualization/templates/file_detail.html new file mode 100644 index 0000000000..b79d54510b --- /dev/null +++ b/parsl/monitoring/visualization/templates/file_detail.html @@ -0,0 +1,35 @@ +{% extends "layout.html" %} + +{% block content %} + +
+

File Details

+ + + {% if file_details['file_path'] %} + + {% endif %} + + + + {% if file_details['timestamp'] %} + + {% endif %} + + {% if output_files %} + + {% endif %} + {% if input_files %} + + {% endif %} + {% if environment %} + + {% endif %} +
Name:{{ file_details['file_name'] }}
Path:{{ file_details['file_path'] }}
Id:{{ file_details['file_id'] }}
Size:{{ file_details['size'] }} bytes
md5sum:{{ file_details['md5sum'] }}
Creation date:{{ file_details['timestamp'] | timeformat }}
Workflow:{{ workflow.workflow_name }}
Created by:{{ tasks[output_files['task_id']]['task_func_name'] }}
Used by: + {% for input in input_files %} + {{ tasks[input['task_id']]['task_func_name'] }}
+ {% endfor %} +
Environment: +{{ environment['label'] }} +
+{% endblock %} \ No newline at end of file diff --git a/parsl/monitoring/visualization/templates/file_workflow.html b/parsl/monitoring/visualization/templates/file_workflow.html new file mode 100644 index 0000000000..32c7fe20ae --- /dev/null +++ b/parsl/monitoring/visualization/templates/file_workflow.html @@ -0,0 +1,47 @@ +{% extends "layout.html" %} + +{% block content %} + +
+ +

Files for Workflow {{ workflow['workflow_name'] }}

+ + + + + + + + + + + + {% for tid, task in tasks | dictsort %} + + + + + + + {% endfor %} + + +
TaskTask IdInput FilesOutput Files
{{ task['task_func_name'] }}{{ task['task_id'] }} + {% if task['task_inputs'] %} + {% for input in task_files[tid]['inputs'] %} + {{ file_map[input['file_id']] }}
+ {% endfor %} + {% else %} + None + {% endif %} +
+ {% if task['task_outputs'] %} + {% for output in task_files[tid]['outputs'] %} + {{ file_map[output['file_id']] }}
+ {% endfor %} + {% else %} + None + {% endif %} +
+ +{% endblock %} \ No newline at end of file diff --git a/parsl/monitoring/visualization/templates/layout.html b/parsl/monitoring/visualization/templates/layout.html index 0b25df42b5..3e337ad73c 100644 --- a/parsl/monitoring/visualization/templates/layout.html +++ b/parsl/monitoring/visualization/templates/layout.html @@ -33,9 +33,10 @@ diff --git a/parsl/monitoring/visualization/templates/task.html b/parsl/monitoring/visualization/templates/task.html index 3e763401f3..26835e20c4 100644 --- a/parsl/monitoring/visualization/templates/task.html +++ b/parsl/monitoring/visualization/templates/task.html @@ -9,13 +9,20 @@

{{ task_details['task_func_name'] }} ({{ task_details['task_id'] }})

    -
  • Workflow name: {{ workflow_details['workflow_name'] }}
  • +
  • Workflow name: {{ workflow_details['workflow_name'] }}
  • Started: {{ workflow_details['time_began'] | timeformat }}
  • Completed: {{ workflow_details['time_completed'] | timeformat }}
  • Workflow duration: {{ (workflow_details['time_began'], workflow_details['time_completed']) | durationformat }}
  • Owner: {{ workflow_details['user'] }}
  • task_func_name: {{ task_details['task_func_name'] }}
  • task_id: {{ task_details['task_id'] }}
  • +
  • Task executor: + {% if environments[task_details['task_environment']] %} + {{ task_details['task_environment'] }} + {% else %} + {{ task_details['task_environment'] }} + {% endif %} +
  • task_depends: {% if task_details['task_depends'] %} {% for id in task_details['task_depends'].split(",") %} @@ -27,8 +34,30 @@

    {{ task_details['task_func_name'] }} ({{ task_details['task_id'] }})

  • task_time_invoked: {{ task_details['task_time_invoked'] | timeformat }}
  • task_time_returned: {{ task_details['task_time_returned'] | timeformat }}
  • -
  • task_inputs: {{ task_details['task_inputs'] }}
  • -
  • task_outputs: {{ task_details['task_outputs'] }}
  • +
  • File inputs: + {% if not task_details['task_inputs'].empty %} + + {% else %} + None + {% endif %} +
  • +
  • File outputs: + {% if not task_details['task_outputs'].empty %} +
      + {% for ofile in task_details['task_outputs'].itertuples() %} +
    • {{ ofile.file_name }}
    • + {% endfor %} +
    + {% else %} + None + {% endif %} +
  • +
  • task_args: {{ task_details['task_args'] }}
  • +
  • task_kwargs: {{ task_details['task_kwargs'] }}
  • task_stdin: {{ task_details['task_stdin'] }}
  • task_stdout: {{ task_details['task_stdout'] }}
  • task_stderr: {{ task_details['task_stderr'] }}
  • diff --git a/parsl/monitoring/visualization/templates/workflow.html b/parsl/monitoring/visualization/templates/workflow.html index d8b78a1851..bafcb6888f 100644 --- a/parsl/monitoring/visualization/templates/workflow.html +++ b/parsl/monitoring/visualization/templates/workflow.html @@ -44,11 +44,31 @@
    App Summary
- +{% if have_misc %} +
Misc Info
+ + + + + + + + + {% for m_info in misc_info %} + + + + + {% endfor %} + +
DateInfo
{{ m_info['timestamp'] | timeformat }}{{ m_info['info'] }}
+{% endif %} View workflow DAG -- colored by apps
View workflow DAG -- colored by task states
View workflow resource usage - + {% if have_files %} +
View workflow file provenance + {% endif %} {{ task_gantt | safe }} {{ task_per_app |safe }} diff --git a/parsl/monitoring/visualization/templates/workflows_summary.html b/parsl/monitoring/visualization/templates/workflows_summary.html index 277e2fea70..2bee89adc8 100644 --- a/parsl/monitoring/visualization/templates/workflows_summary.html +++ b/parsl/monitoring/visualization/templates/workflows_summary.html @@ -14,6 +14,7 @@

Workflows

Status Runtime (s) Tasks + Files @@ -29,6 +30,11 @@

Workflows

{{ w['tasks_failed_count'] }} + {% if w['run_id'] in have_files %} + files + {% else %} + + {% endif %} {% endfor %} diff --git a/parsl/monitoring/visualization/views.py b/parsl/monitoring/visualization/views.py index 8e34119143..67aec121ad 100644 --- a/parsl/monitoring/visualization/views.py +++ b/parsl/monitoring/visualization/views.py @@ -1,9 +1,23 @@ +import datetime +import os.path as ospath + import pandas as pd from flask import current_app as app -from flask import render_template +from flask import render_template, request import parsl.monitoring.queries.pandas as queries -from parsl.monitoring.visualization.models import Status, Task, Workflow, db +from parsl.monitoring.visualization.form_fields import FileForm +from parsl.monitoring.visualization.models import ( + Environment, + File, + InputFile, + MiscInfo, + OutputFile, + Status, + Task, + Workflow, + db, +) from parsl.monitoring.visualization.plots.default.task_plots import ( time_series_memory_per_task_plot, ) @@ -20,8 +34,6 @@ dummy = True -import datetime - def format_time(value): if value is None: @@ -52,11 +64,76 @@ def format_duration(value): @app.route('/') def index(): workflows = Workflow.query.all() + have_files = [] for workflow in workflows: workflow.status = 'Running' if workflow.time_completed is not None: workflow.status = 'Completed' - return render_template('workflows_summary.html', workflows=workflows) + file_list = File.query.filter_by(run_id=workflow.run_id).first() + if file_list: + have_files.append(workflow.run_id) + return render_template('workflows_summary.html', workflows=workflows, have_files=have_files) + + +@app.route('/file//') +def file_id(file_id): + file_details = File.query.filter_by(file_id=file_id).first() + input_files = InputFile.query.filter_by(file_id=file_id).all() + output_files = OutputFile.query.filter_by(file_id=file_id).first() + task_ids = set() + environ = None + + for f in input_files: + task_ids.add(f.task_id) + if output_files: + task_ids.add(output_files.task_id) + tasks = {} + for tid in task_ids: + tasks[tid] = Task.query.filter_by(run_id=file_details.run_id, task_id=tid).first() + workflow_details = Workflow.query.filter_by(run_id=file_details.run_id).first() + if output_files: + environ = Environment.query.filter_by(environment_id=tasks[output_files.task_id].task_environment).first() + + return render_template('file_detail.html', file_details=file_details, + input_files=input_files, output_files=output_files, + tasks=tasks, workflow=workflow_details, environment=environ) + + +@app.route('/file', methods=['GET', 'POST']) +def file(): + form = FileForm() + if request.method == 'POST': + file_list = [] + if form.validate_on_submit(): + if not form.file_name.data.startswith('%'): + filename = '%' + form.file_name.data + else: + filename = form.file_name.data + file_list = File.query.filter(File.full_path.like(filename)).all() + return render_template('file.html', form=form, file_list=file_list) + return render_template('file.html', form=form) + + +@app.route('/file/workflow//') +def file_workflow(workflow_id): + workflow_files = File.query.filter_by(run_id=workflow_id).all() + file_map = {} + workflow_details = Workflow.query.filter_by(run_id=workflow_id).first() + task_ids = set() + files_by_task = {} + file_details = {} + for wf in workflow_files: + file_details[wf.file_id] = wf + task_ids.add(wf.task_id) + file_map[wf.file_id] = ospath.basename(wf.full_path) + tasks = {} + + for tid in task_ids: + tasks[tid] = Task.query.filter_by(run_id=workflow_id, task_id=tid).first() 
+ files_by_task[tid] = {'inputs': InputFile.query.filter_by(run_id=workflow_id, task_id=tid).all(), + 'outputs': OutputFile.query.filter_by(run_id=workflow_id, task_id=tid).all()} + return render_template('file_workflow.html', workflow=workflow_details, + task_files=files_by_task, tasks=tasks, file_map=file_map) @app.route('/workflow//') @@ -70,12 +147,37 @@ def workflow(workflow_id): df_task = queries.completion_times_for_workflow(workflow_id, db.engine) df_task_tries = queries.tries_for_workflow(workflow_id, db.engine) task_summary = queries.app_counts_for_workflow(workflow_id, db.engine) - + file_list = File.query.filter_by(run_id=workflow_id).first() + if file_list: + have_files = True + else: + have_files = False + misc_info = MiscInfo.query.filter_by(run_id=workflow_id).order_by(MiscInfo.timestamp.asc()).all() + if misc_info: + have_misc = True + else: + have_misc = False return render_template('workflow.html', workflow_details=workflow_details, task_summary=task_summary, task_gantt=task_gantt_plot(df_task, df_status, time_completed=workflow_details.time_completed), - task_per_app=task_per_app_plot(df_task_tries, df_status, time_completed=workflow_details.time_completed)) + task_per_app=task_per_app_plot(df_task_tries, df_status, time_completed=workflow_details.time_completed), + have_files=have_files, misc_info=misc_info, have_misc=have_misc) + + +@app.route('/workflow//environment/') +def environment(workflow_id, environment_id): + environment_details = Environment.query.filter_by(environment_id=environment_id).first() + workflow = Workflow.query.filter_by(run_id=workflow_id).first() + task_list = Task.query.filter_by(task_environment=environment_id).all() + tasks = {} + for task in task_list: + if task.task_func_name not in tasks: + tasks[task.task_func_name] = [] + tasks[task.task_func_name].append(task.task_id) + + return render_template('env.html', environment_details=environment_details, + workflow=workflow, tasks=tasks) @app.route('/workflow//app/') @@ -114,12 +216,15 @@ def task(workflow_id, task_id): if workflow_details is None: return render_template('error.html', message="Workflow %s could not be found" % workflow_id) - task_details = Task.query.filter_by( - run_id=workflow_id, task_id=task_id).first() + task_details = queries.full_task_info(workflow_id, task_id, db.engine) task_status = Status.query.filter_by( run_id=workflow_id, task_id=task_id).order_by(Status.timestamp) df_resources = queries.resources_for_task(workflow_id, task_id, db.engine) + environments = Environment.query.filter_by(run_id=workflow_id).all() + environs = {} + for env in environments: + environs[env.environment_id] = env.label return render_template('task.html', workflow_details=workflow_details, @@ -127,6 +232,7 @@ def task(workflow_id, task_id): task_status=task_status, time_series_memory_resident=time_series_memory_per_task_plot( df_resources, 'psutil_process_memory_resident', 'Memory Usage'), + environments=environs ) diff --git a/parsl/tests/test_monitoring/test_file_provenance.py b/parsl/tests/test_monitoring/test_file_provenance.py new file mode 100644 index 0000000000..165fdbe0f2 --- /dev/null +++ b/parsl/tests/test_monitoring/test_file_provenance.py @@ -0,0 +1,114 @@ +import os +import shutil +import time + +import pytest + +import parsl +from parsl.config import Config +from parsl.data_provider.files import File +from parsl.executors import ThreadPoolExecutor +from parsl.monitoring import MonitoringHub +from parsl.monitoring.radios.udp import UDPRadio + + +@parsl.bash_app +def 
initialize(outputs=[]): + import time + time.sleep(1) + return f"echo 'Initialized' > {outputs[0]}" + + +@parsl.python_app +def split_data(inputs=None, outputs=None): + with open(inputs[0], 'r') as fh: + data = fh.read() + for i, op in enumerate(outputs): + with open(op, 'w') as ofh: + ofh.write(f"{i + 1}\n") + ofh.write(data) + + +@parsl.python_app +def process(inputs=None, outputs=None): + with open(inputs[0], 'r') as fh: + data = fh.read() + with open(outputs[0], 'w') as ofh: + ofh.write(f"{data} processed") + + +@parsl.python_app +def combine(inputs=None, outputs=None): + with open(outputs[0], 'w') as ofh: + for fl in inputs: + with open(fl, 'r') as fh: + ofh.write(fh.read()) + ofh.write("\n") + + +def fresh_config(): + return Config( + executors=[ + ThreadPoolExecutor(remote_monitoring_radio=UDPRadio(address="localhost", atexit_timeout=0)) + ], + strategy='simple', + strategy_period=0.1, + monitoring=MonitoringHub(file_provenance=True) + ) + + +@pytest.mark.local +def test_provenance(tmpd_cwd): + # this is imported here rather than at module level because + # it isn't available in a plain parsl install, so this module + # would otherwise fail to import and break even a basic test + # run. + import sqlalchemy + from sqlalchemy import text + + cfg = fresh_config() + cfg.run_dir = tmpd_cwd + cfg.monitoring.logging_endpoint = f"sqlite:///{tmpd_cwd}/monitoring.db" + + with parsl.load(cfg) as my_dfk: + cwd = os.getcwd() + if os.path.exists(os.path.join(cwd, 'provenance')): + shutil.rmtree(os.path.join(cwd, 'provenance')) + os.mkdir(os.path.join(cwd, 'provenance')) + os.chdir(os.path.join(cwd, 'provenance')) + + my_dfk.log_info("Starting Run") + + init = initialize(outputs=[File(os.path.join(os.getcwd(), 'initialize.txt'))]) + + sd = split_data(inputs=[init.outputs[0]], + outputs=[File(os.path.join(os.getcwd(), f'split_data_{i}.txt')) for i in range(4)]) + + p = [process(inputs=[sdo], outputs=[File(os.path.join(os.getcwd(), f'processed_data_{i}.txt'))]) for i, sdo in + enumerate(sd.outputs)] + + c = combine(inputs=[pp.outputs[0] for pp in p], outputs=[File(os.path.join(os.getcwd(), 'combined_data.txt'))]) + + c.result() + os.chdir(cwd) + my_dfk.log_info("Ending run") + time.sleep(2) + + engine = sqlalchemy.create_engine(cfg.monitoring.logging_endpoint) + with engine.begin() as connection: + def count_rows(table: str): + result = connection.execute(text(f"SELECT COUNT(*) FROM {table}")) + (count,) = result.first() + return count + + assert count_rows("workflow") == 1 + + assert count_rows("task") == 7 + + assert count_rows("file") == 10 + + assert count_rows("input_file") == 9 + + assert count_rows("output_file") == 10 + + assert count_rows("misc_info") == 2 diff --git a/parsl/tests/unit/executors/high_throughput/test_process_worker_pool.py b/parsl/tests/unit/executors/high_throughput/test_process_worker_pool.py index 20b09554a6..2c8879832b 100644 --- a/parsl/tests/unit/executors/high_throughput/test_process_worker_pool.py +++ b/parsl/tests/unit/executors/high_throughput/test_process_worker_pool.py @@ -36,6 +36,20 @@ def test_arg_parser_known_required(): @pytest.mark.local @pytest.mark.parametrize("req", _known_required) +@pytest.mark.skipif(sys.version_info > (3, 12), reason="parse_args behavior changed in 3.13") +def test_arg_parser_required_pre_3_13(req): + p = process_worker_pool.get_arg_parser() + p.exit_on_error = False + with pytest.raises(SystemExit) as pyt_exc: + p.parse_args([]) + + e_msg = pyt_exc.value.args[1] + assert req in e_msg + + +@pytest.mark.local 
+@pytest.mark.parametrize("req", _known_required) +@pytest.mark.skipif(sys.version_info < (3, 13), reason="parse_args behavior changed in 3.13") def test_arg_parser_required(req): p = process_worker_pool.get_arg_parser() p.exit_on_error = False diff --git a/setup.py b/setup.py index 673e5c84fc..ab3481c366 100755 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ 'networkx>=3.2,<3.3', 'Flask>=1.0.2', 'flask_sqlalchemy', - + 'Flask-WTF', # pandas uses "loose semantic versioning" # https://pandas.pydata.org/docs/development/policies.html#version-policy 'pandas<3,>=2.2',