2 changes: 1 addition & 1 deletion .gitignore
@@ -120,4 +120,4 @@ ENV/
.mypy_cache/

# emacs buffers
\#*
\#*
Binary file added docs/images/mon_env_detail.png
Binary file added docs/images/mon_file_detail.png
Binary file added docs/images/mon_file_provenance.png
Binary file added docs/images/mon_task_detail.png
Binary file added docs/images/mon_workflow_files.png
Binary file modified docs/images/mon_workflows_page.png
52 changes: 51 additions & 1 deletion docs/userguide/advanced/monitoring.rst
@@ -47,6 +47,55 @@ configuration. Here the `parsl.monitoring.MonitoringHub` is specified to use por
)


The monitoring system can also be used to track file provenance. File provenance is defined as the history of a file, including:

* When the file was created
* File size in bytes
* File md5sum
* What task created the file
* What task(s) used the file
* What inputs were given to the task that created the file
* What environment was used (e.g. the ``worker_init`` entry from a :py:class:`~parsl.providers.ExecutionProvider`); this is not available with every provider.

The purpose of file provenance tracking is to give the user a mechanism to see exactly how a file was created and used in a workflow. This can be useful for debugging, understanding the workflow, ensuring that the workflow is reproducible, and reviewing past work. The file provenance information is stored in the monitoring database and can be accessed using the ``parsl-visualize`` tool. To enable file provenance tracking, set the ``file_provenance`` flag to ``True`` in the `parsl.monitoring.MonitoringHub` configuration.
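
For illustration, the per-file metadata listed above (creation time, size in bytes, and MD5 checksum) can be gathered with ordinary standard-library calls. The sketch below is illustrative only and is not the exact code Parsl runs; ``describe_file`` is a hypothetical helper name.

.. code-block:: python

    import os
    from datetime import datetime, timezone
    from hashlib import md5

    def describe_file(path: str) -> dict:
        # Collect the same kind of per-file metadata that provenance tracking records.
        file_stat = os.stat(path)
        with open(path, 'rb') as handle:
            checksum = md5(handle.read()).hexdigest()
        return {
            'created': datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc),
            'size': file_stat.st_size,
            'md5sum': checksum,
        }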

This functionality also enables you to log informational messages from your scripts, to capture anything not automatically gathered. To use it, assign the return value of ``parsl.load`` to a variable, then call its ``log_info`` method to record messages in the database. Note that this feature is only available in the main script, not inside Apps: passing this variable (``my_cfg`` in the example below) to an App results in undefined behavior. The following example shows how to use this feature.

.. code-block:: python

    import parsl
    from parsl.monitoring.monitoring import MonitoringHub
    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.addresses import address_by_hostname

    import logging

    config = Config(
        executors=[
            HighThroughputExecutor(
                label="local_htex",
                cores_per_worker=1,
                max_workers_per_node=4,
                address=address_by_hostname(),
            )
        ],
        monitoring=MonitoringHub(
            hub_address=address_by_hostname(),
            hub_port=55055,
            monitoring_debug=False,
            resource_monitoring_interval=10,
            file_provenance=True,
        ),
        strategy='none'
    )

    my_cfg = parsl.load(config)

    my_cfg.log_info("This is an informational message")

Known limitations: the file provenance feature captures the creation of files and the use of files in an App, but it does not capture modifications to files it already knows about.

Visualization
-------------

@@ -88,7 +137,7 @@ It provides a high level summary of workflow state as shown below:

.. image:: ../../images/mon_workflows_page.png

Throughout the dashboard, all blue elements are clickable. For example, clicking a specific worklow
Throughout the dashboard, all blue elements are clickable. For example, clicking a specific workflow
name from the table takes you to the Workflow Summary page described in the next section.

Workflow Summary
@@ -117,3 +166,4 @@ The workflow summary also presents three different views of the workflow:

.. image:: ../../images/mon_resource_summary.png

* Workflow file provenance: This visualization gives a tabular listing of each task that created (output) or used (input) a file. Each listed file links to a page showing that file's details.
58 changes: 52 additions & 6 deletions parsl/app/futures.py
@@ -1,7 +1,12 @@
"""This module implements DataFutures.
"""
import logging
import os.path
from concurrent.futures import Future
from datetime import datetime, timezone
from hashlib import md5
from os import stat
from typing import Optional

import typeguard

@@ -14,7 +19,7 @@ class DataFuture(Future):
"""A datafuture points at an AppFuture.

We are simply wrapping a AppFuture, and adding the specific case where, if
the future is resolved i.e file exists, then the DataFuture is assumed to be
the future is resolved i.e. file exists, then the DataFuture is assumed to be
resolved.
"""

@@ -30,15 +35,15 @@ def parent_callback(self, parent_fu):
Returns:
- None
"""

e = parent_fu._exception
if e:
self.set_exception(e)
else:
self.set_result(self.file_obj)
self.update_file_provenance()

@typeguard.typechecked
def __init__(self, fut: Future, file_obj: File, tid: int) -> None:
def __init__(self, fut: Future, file_obj: File, tid: int, track_provenance: Optional[bool] = False) -> None:
"""Construct the DataFuture object.

If the file_obj is a string convert to a File.
@@ -48,16 +53,22 @@ def __init__(self, fut: Future, file_obj: File, tid: int) -> None:
Completion of ``fut`` indicates that the data is
ready.
- file_obj (File) : File that this DataFuture represents the availability of

Kwargs:
- tid (task_id) : Task id that this DataFuture tracks
Kwargs:
- track_provenance (bool) : If True then track the underlying file's provenance. Default is False.
"""
super().__init__()
self._tid = tid
self.file_obj = file_obj
self.parent = fut

self.track_provenance = track_provenance
self.parent.add_done_callback(self.parent_callback)
# only capture this if needed
if self.track_provenance and self.file_obj.scheme == 'file' and os.path.exists(file_obj.path):
file_stat = os.stat(file_obj.path)
self.file_obj.timestamp = datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc)
self.file_obj.size = file_stat.st_size
self.file_obj.md5sum = md5(open(self.file_obj, 'rb').read()).hexdigest()

logger.debug("Creating DataFuture with parent: %s and file: %s", self.parent, repr(self.file_obj))

@@ -76,6 +87,30 @@ def filename(self):
"""Filepath of the File object this datafuture represents."""
return self.filepath

@property
def uu_id(self):
"""UUID of the File object this datafuture represents."""
return self.file_obj.uu_id

@property
def timestamp(self):
"""Timestamp when the future was marked done."""
return self.file_obj.timestamp

@timestamp.setter
def timestamp(self, value: Optional[datetime]) -> None:
self.file_obj.timestamp = value

@property
def size(self):
"""Size of the file."""
return self.file_obj.size

@property
def md5sum(self):
"""MD5 sum of the file."""
return self.file_obj.md5sum

def cancel(self):
raise NotImplementedError("Cancel not implemented")

@@ -97,3 +132,14 @@ def __repr__(self) -> str:
else:
done = "not done"
return f"<{module}.{qualname} object at {hex(id(self))} representing {repr(self.file_obj)} {done}>"

def update_file_provenance(self):
""" Update any file provenance information, but only if the file object if it is a File
"""
if self.track_provenance and self.file_obj.scheme == 'file' and os.path.isfile(self.file_obj.filepath):
if not self.file_obj.timestamp:
self.file_obj.timestamp = datetime.fromtimestamp(stat(self.file_obj.filepath).st_ctime, tz=timezone.utc)
if not self.file_obj.size:
self.file_obj.size = stat(self.file_obj.filepath).st_size
if not self.file_obj.md5sum:
self.file_obj.md5sum = md5(open(self.file_obj, 'rb').read()).hexdigest()
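
A side note on the metadata capture above: md5(open(self.file_obj, 'rb').read()).hexdigest() reads the whole file into memory and leaves the handle for the garbage collector to close. The sketch below is illustrative only and not part of this diff (chunked_md5 is a hypothetical helper name); it computes the same digest while streaming the file in chunks and closing the handle explicitly:

    from hashlib import md5

    def chunked_md5(path, chunk_size=1 << 20):
        # Stream the file in 1 MiB chunks so large files need not fit in memory.
        digest = md5()
        with open(path, 'rb') as handle:
            for chunk in iter(lambda: handle.read(chunk_size), b''):
                digest.update(chunk)
        return digest.hexdigest()
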
2 changes: 1 addition & 1 deletion parsl/curvezmq.py
@@ -144,7 +144,7 @@ def _start_auth_thread(self) -> ThreadAuthenticator:
auth_thread.start()
# Only allow certs that are in the cert dir
assert self.cert_dir # For mypy
auth_thread.configure_curve(domain="*", location=self.cert_dir)
auth_thread.configure_curve(domain="*", location=str(self.cert_dir))
return auth_thread

def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket:
2 changes: 1 addition & 1 deletion parsl/data_provider/data_manager.py
@@ -63,7 +63,7 @@ def optionally_stage_in(self, input, func, executor):
# replace the input DataFuture with a new DataFuture which will complete at
# the same time as the original one, but will contain the newly
# copied file
input = DataFuture(input, file, tid=input.tid)
input = DataFuture(input, file, input.tid, track_provenance=self.dfk.file_provenance)
elif isinstance(input, File):
file = input.cleancopy()
input = file
18 changes: 15 additions & 3 deletions parsl/data_provider/files.py
@@ -5,8 +5,10 @@
on where (client-side, remote-side, intermediary-side) the File.filepath is
being called from.
"""
import datetime
import logging
import os
import uuid
from typing import Optional, Union
from urllib.parse import urlparse

@@ -28,8 +30,9 @@ class File:
"""

@typeguard.typechecked
def __init__(self, url: Union[os.PathLike, str]):
"""Construct a File object from a url string.
def __init__(self, url: Union[os.PathLike, str], file_uuid: Optional[uuid.UUID] = None,
timestamp: Optional[datetime.datetime] = None):
"""Construct a File object from an url string.

Args:
- url (string or PathLike) : url of the file e.g.
@@ -38,22 +41,30 @@ def __init__(self, url: Union[os.PathLike, str]):
- 'file:///scratch/proj101/input.txt'
- 'globus://go#ep1/~/data/input.txt'
- 'globus://ddb59aef-6d04-11e5-ba46-22000b92c6ec/home/johndoe/data/input.txt'
- file_uuid (uuid.UUID) : unique identifier for the file, default is `None`
- timestamp (datetime.datetime) : creation timestamp for the file, default is `None`
"""
self.url = str(url)
parsed_url = urlparse(self.url)
self.scheme = parsed_url.scheme if parsed_url.scheme else 'file'
self.netloc = parsed_url.netloc
self.path = parsed_url.path
self.filename = os.path.basename(self.path)
# let the DFK set these values, if needed
self.size: Optional[int] = None
self.md5sum: Optional[str] = None
self.timestamp = timestamp

self.local_path: Optional[str] = None
self.uu_id = uuid.uuid4() if file_uuid is None else file_uuid

def cleancopy(self) -> "File":
"""Returns a copy of the file containing only the global immutable state,
without any mutable site-local local_path information. The returned File
object will be as the original object was when it was constructed.
"""
logger.debug("Making clean copy of File object {}".format(repr(self)))
return File(self.url)
return File(self.url, self.uu_id, self.timestamp)

def __str__(self) -> str:
return self.filepath
@@ -67,6 +78,7 @@ def __repr__(self) -> str:
f"netloc={self.netloc}",
f"path={self.path}",
f"filename={self.filename}",
f"uuid={self.uu_id}",
]
if self.local_path is not None:
content.append(f"local_path={self.local_path}")