Skip to content

Commit fb92e8b

Browse files
Added provenance_manager extension point
This commit adds the ability to load custom `ProvenanceManager` classes through StreamFlow plugins and to inspect them with the `streamflow ext list` and `streamflow ext show` commands.
1 parent 2670704 commit fb92e8b

File tree

7 files changed

+113
-33
lines changed

7 files changed

+113
-33
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ zip-safe = true
7272
"streamflow.deployment.connector" = ["schemas/base/*.json", "schemas/*.json"]
7373
"streamflow.deployment.filter" = ["schemas/*.json"]
7474
"streamflow.persistence" = ["schemas/*.sql", "schemas/*.json"]
75+
"streamflow.provenance" = ["schemas/**/*.json"]
7576
"streamflow.recovery" = ["schemas/*.json"]
7677
"streamflow.scheduling" = ["schemas/*.json"]
7778
"streamflow.scheduling.policy" = ["schemas/*.json"]

streamflow/ext/plugin.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from streamflow.core.data import DataManager
88
from streamflow.core.deployment import BindingFilter, Connector, DeploymentManager
99
from streamflow.core.persistence import Database
10+
from streamflow.core.provenance import ProvenanceManager
1011
from streamflow.core.recovery import CheckpointManager, FailureManager
1112
from streamflow.core.scheduling import Policy, Scheduler
1213
from streamflow.cwl.requirement.docker import cwl_docker_translator_classes
@@ -17,6 +18,7 @@
1718
from streamflow.deployment.filter import binding_filter_classes
1819
from streamflow.log_handler import logger
1920
from streamflow.persistence import database_classes
21+
from streamflow.provenance import provenance_manager_classes
2022
from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes
2123
from streamflow.scheduling import scheduler_classes
2224
from streamflow.scheduling.policy import policy_classes
@@ -31,6 +33,7 @@
3133
"deployment_manager": deployment_manager_classes,
3234
"failure_manager": failure_manager_classes,
3335
"policy": policy_classes,
36+
"provenance_manager": provenance_manager_classes,
3437
"scheduler": scheduler_classes,
3538
}
3639

@@ -85,6 +88,28 @@ def register_failure_manager(self, name: str, cls: type[FailureManager]):
8588
def register_policy(self, name: str, cls: type[Policy]):
8689
self._register(name, cls, "policy")
8790

91+
def register_provenance_manager(
    self, name: str, wf_type: str, cls: type[ProvenanceManager]
):
    """Register a provenance manager class for one workflow type.

    The class is recorded in this plugin's ``classes_`` under the qualified
    name ``"<name>/<wf_type>"`` and installed into the module-level
    ``extension_points["provenance_manager"]`` registry, which is a nested
    mapping of the form ``{name: {wf_type: class}}``.

    :param name: the provenance format name (first-level registry key)
    :param wf_type: the workflow type handled by ``cls`` (second-level key)
    :param cls: the provenance manager class to install
    """
    qualified_name = f"{name}/{wf_type}"
    self.classes_.setdefault("provenance_manager", []).append(
        {
            "name": qualified_name,
            "class": cls,
        }
    )
    registry = extension_points["provenance_manager"]
    if name in registry and wf_type in registry[name]:
        if logger.isEnabledFor(logging.WARNING):
            logger.warning(
                "{} is already installed and will be overridden by {}".format(
                    qualified_name,
                    self.__class__.__module__ + "." + self.__class__.__name__,
                )
            )
    # Only the (name, wf_type) slot is replaced: assigning a fresh
    # ``{wf_type: cls}`` dict to ``registry[name]`` would silently discard
    # every other workflow type already registered under the same name.
    registry.setdefault(name, {})[wf_type] = cls
112+
88113
def register_scheduler(self, name: str, cls: type[Scheduler]):
89114
self._register(name, cls, "scheduler")
90115

streamflow/ext/utils.py

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,28 @@
1919
PLUGIN_ENTRY_POINT = "unito.streamflow.plugin"
2020

2121

22+
def _add_extension(
    extensions: MutableMapping[str, Any],
    ext_name: str,
    name: str,
    plugin_name: str,
    value: type | MutableMapping[str, type],
) -> None:
    """Store the listing entry for one extension in ``extensions``.

    The entry is written to ``extensions[ext_name][name]`` as a dictionary
    with the ``name``, ``class`` and ``plugin`` keys.

    :param extensions: the accumulator, indexed by extension point and name
    :param ext_name: the extension point the entry belongs to
    :param name: the key under which the entry is stored
    :param plugin_name: the value stored in the ``plugin`` field
    :param value: either a single class, or, for ``provenance_manager``
        entries, a mapping from workflow type to class
    """
    # A provenance manager may arrive as a ``{wf_type: class}`` mapping, or as
    # a single class whose ``name`` is already qualified as "<name>/<wf_type>".
    # Iterating a class raises TypeError, so only real mappings take the
    # per-workflow-type branch.
    if ext_name == "provenance_manager" and not isinstance(value, type):
        entries = extensions.setdefault(ext_name, {})
        for wf_type in value:
            # NOTE: the entry is keyed by ``name`` alone, so when several
            # workflow types are present the last one wins. The key is left
            # unchanged because callers read the entry back by ``name``.
            entries[name] = {
                "name": f"{name}/{wf_type}",
                "class": get_class_fullname(value[wf_type]),
                "plugin": plugin_name,
            }
    else:
        extensions.setdefault(ext_name, {})[name] = {
            "name": name,
            "class": get_class_fullname(value),
            "plugin": plugin_name,
        }
42+
43+
2244
def _filter_by_name(classes: MutableMapping[str, Any], name: str):
2345
filtered_classes = {}
2446
for class_ in classes:
@@ -39,6 +61,31 @@ def _flatten_all_of(entity_schema):
3961
return dict(sorted(entity_schema["properties"].items()))
4062

4163

64+
def _get_plugin_and_class(name: str, type_: str) -> tuple[str, type | None]:
    """Find the class registered as ``name`` for the ``type_`` extension point.

    The ``extension_points`` registry is consulted first; a hit there is
    reported with ``"-"`` as the plugin name. Provenance managers are
    addressed as ``"<name>/<wf_type>"``, and a provenance manager name
    without a ``/`` is reported as not found straight away. Otherwise each
    entry point in the ``PLUGIN_ENTRY_POINT`` group is loaded, instantiated
    and registered in turn until one exposes a matching class.

    :param name: the extension name to look up
    :param type_: the extension point to search
    :return: a ``(plugin name, class)`` pair, or ``("-", None)`` when
        nothing matches
    """
    if type_ != "provenance_manager":
        if name in extension_points[type_]:
            return "-", extension_points[type_][name]
    else:
        if "/" not in name:
            return "-", None
        # Split on the first slash only: the remainder is the workflow type.
        prov_name, _, wf_type = name.partition("/")
        if (
            prov_name in extension_points[type_]
            and wf_type in extension_points[type_][prov_name]
        ):
            return "-", extension_points[type_][prov_name][wf_type]
    for entry_point in entry_points(group=PLUGIN_ENTRY_POINT):
        plugin = entry_point.load()()
        if not isinstance(plugin, StreamFlowPlugin):
            continue
        plugin.register()
        same_type = {
            ext: items for ext, items in plugin.classes_.items() if ext == type_
        }
        candidates = _filter_by_name(same_type, name)
        for candidate in candidates.get(type_, []):
            if candidate["name"] == name:
                return entry_point.name, candidate["class"]
    return "-", None
87+
88+
4289
def _get_property_desc(
4390
k: str, obj: MutableMapping[str, Any], refs: MutableMapping[str, Any]
4491
) -> str:
@@ -151,11 +198,13 @@ def list_extensions(name: str | None, type_: str | None):
151198
if type_ is None or ext == type_:
152199
for n, v in classes.items():
153200
if name is None or name == n:
154-
extensions.setdefault(ext, {})[n] = {
155-
"name": n,
156-
"class": get_class_fullname(v),
157-
"plugin": "-",
158-
}
201+
_add_extension(
202+
extensions=extensions,
203+
ext_name=ext,
204+
name=n,
205+
plugin_name="-",
206+
value=v,
207+
)
159208
for k in max_sizes:
160209
max_sizes[k] = max(max_sizes[k], len(extensions[ext][n][k]))
161210
if plugins := entry_points(group=PLUGIN_ENTRY_POINT):
@@ -172,11 +221,13 @@ def list_extensions(name: str | None, type_: str | None):
172221
plugin_classes = _filter_by_name(plugin_classes, name)
173222
for ext, items in plugin_classes.items():
174223
for item in items:
175-
extensions.setdefault(ext, {})[item["name"]] = {
176-
"name": item["name"],
177-
"class": get_class_fullname(item["class"]),
178-
"plugin": plugin.name,
179-
}
224+
_add_extension(
225+
extensions=extensions,
226+
ext_name=ext,
227+
name=item["name"],
228+
plugin_name=plugin.name,
229+
value=item["class"],
230+
)
180231
for k in max_sizes:
181232
max_sizes[k] = max(
182233
max_sizes[k], len(extensions[ext][item["name"]][k])
@@ -288,22 +339,7 @@ def load_extensions():
288339

289340

290341
def show_extension(name: str, type_: str):
291-
plugin = "-"
292-
if name in extension_points[type_]:
293-
class_ = extension_points[type_][name]
294-
else:
295-
class_ = None
296-
for plugin_obj in entry_points(group=PLUGIN_ENTRY_POINT):
297-
plugin_class = (plugin_obj.load())()
298-
if isinstance(plugin_class, StreamFlowPlugin):
299-
plugin_class.register()
300-
plugin_classes = _filter_by_name(
301-
{k: v for k, v in plugin_class.classes_.items() if k == type_}, name
302-
)
303-
for item in plugin_classes.get(type_, []):
304-
if item["name"] == name:
305-
class_ = item["class"]
306-
plugin = plugin_obj.name
342+
plugin, class_ = _get_plugin_and_class(name, type_)
307343
if class_ is not None:
308344
class_name = get_class_fullname(class_)
309345
entity_schema = class_.get_schema()

streamflow/main.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from streamflow.parser import parser
3232
from streamflow.persistence import database_classes
3333
from streamflow.persistence.loading_context import DefaultDatabaseLoadingContext
34-
from streamflow.provenance import prov_classes
34+
from streamflow.provenance import provenance_manager_classes
3535
from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes
3636
from streamflow.scheduling import scheduler_classes
3737

@@ -126,20 +126,20 @@ async def _async_prov(args: argparse.Namespace):
126126
f"Workflow {args.workflow} is associated to the following types: {','.join(wf_type)}"
127127
)
128128
wf_type = list(wf_type)[0]
129-
if args.type not in prov_classes:
129+
if args.type not in provenance_manager_classes:
130130
raise WorkflowProvenanceException(
131131
f"{args.type} provenance format is not supported."
132132
)
133-
elif wf_type not in prov_classes[args.type]:
133+
elif wf_type not in provenance_manager_classes[args.type]:
134134
raise WorkflowProvenanceException(
135135
"{} provenance format is not supported for workflows of type {}.".format(
136136
args.type, wf_type
137137
)
138138
)
139139
else:
140-
provenance_manager: ProvenanceManager = prov_classes[args.type][wf_type](
141-
context, db_context, workflows
142-
)
140+
provenance_manager: ProvenanceManager = provenance_manager_classes[
141+
args.type
142+
][wf_type](context, db_context, workflows)
143143
await provenance_manager.create_archive(
144144
outdir=args.outdir,
145145
filename=args.name,

streamflow/provenance/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
from streamflow.provenance.run_crate import CWLRunCrateProvenanceManager
22

3-
prov_classes = {"run_crate": {"cwl": CWLRunCrateProvenanceManager}}
3+
provenance_manager_classes = {"run_crate": {"cwl": CWLRunCrateProvenanceManager}}

streamflow/provenance/run_crate.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import uuid
1313
from abc import ABC, abstractmethod
1414
from collections.abc import MutableMapping, MutableSequence
15+
from importlib.resources import files
1516
from typing import Any, cast, get_args
1617
from zipfile import ZipFile
1718

@@ -1424,6 +1425,16 @@ async def get_main_entity(self) -> MutableMapping[str, Any]:
14241425
)
14251426
return main_entity
14261427

1428+
@classmethod
def get_schema(cls) -> str:
    """Return the contents of the bundled ``schemas/run_crate/cwl.json`` file.

    The file is resolved relative to this module's package and read as
    UTF-8 text.
    """
    schema_dir = files(__package__).joinpath("schemas").joinpath("run_crate")
    schema_file = schema_dir.joinpath("cwl.json")
    return schema_file.read_text(encoding="utf-8")
1437+
14271438
async def get_property_value(
14281439
self, name: str, token: Token
14291440
) -> MutableMapping[str, Any] | None:
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"$schema": "https://json-schema.org/draft/2019-09/schema",
3+
"$id": "https://streamflow.di.unito.it/schemas/provenance/run_crate/cwl.json",
4+
"type": "object",
5+
"properties": {},
6+
"additionalProperties": false
7+
}

0 commit comments

Comments
 (0)