Skip to content

Commit fe1c2c7

Browse files
feat(workflow): option to ignore deleted outputs in status/update (#2832)
1 parent 6759d33 commit fe1c2c7

File tree

17 files changed

+555
-317
lines changed

17 files changed

+555
-317
lines changed

docs/conf.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,9 +349,11 @@
349349
("py:class", "CommandResult"),
350350
("py:class", "CommunicationCallback"),
351351
("py:class", "DynamicProxy"),
352+
("py:class", "IActivityGateway"),
352353
("py:class", "IClientDispatcher"),
353354
("py:class", "IDatabaseDispatcher"),
354355
("py:class", "IDatasetGateway"),
356+
("py:class", "IPlanGateway"),
355357
("py:class", "LocalClient"),
356358
("py:class", "OID_TYPE"),
357359
("py:class", "Path"),

renku/command/status.py

Lines changed: 32 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,95 +18,74 @@
1818
"""Renku ``status`` command."""
1919

2020
from collections import defaultdict
21-
from typing import Dict, Set, Tuple
21+
from pathlib import Path
22+
from typing import Dict, Set
2223

2324
from renku.command.command_builder import inject
2425
from renku.command.command_builder.command import Command
25-
from renku.core.interface.activity_gateway import IActivityGateway
2626
from renku.core.interface.client_dispatcher import IClientDispatcher
27-
from renku.core.util.metadata import filter_overridden_activities, get_modified_activities
2827
from renku.core.util.os import get_relative_path_to_cwd, get_relative_paths
29-
from renku.domain_model.entity import Entity
30-
from renku.domain_model.provenance.activity import Activity
28+
from renku.core.workflow.activity import (
29+
get_all_modified_and_deleted_activities_and_entities,
30+
get_downstream_generating_activities,
31+
is_activity_valid,
32+
)
3133

3234

3335
def get_status_command():
3436
"""Show a status of the repository."""
35-
return Command().command(_get_status).require_migration().require_clean().with_database(write=False)
37+
return Command().command(_get_status).require_migration().with_database(write=False)
3638

3739

3840
@inject.autoparams()
39-
def _get_status(client_dispatcher: IClientDispatcher, activity_gateway: IActivityGateway, paths=None):
40-
def get_dependant_activities_from(start_activity):
41-
"""Return a set of activity and all its downstream activities.
42-
43-
Args:
44-
start_activity: Root activity to start from.
45-
46-
Returns:
47-
Root activity and all its downstream activities.
48-
"""
49-
all_activities = activity_gateway.get_downstream_activities(start_activity)
50-
all_activities.add(start_activity)
51-
return all_activities
52-
41+
def _get_status(ignore_deleted: bool, client_dispatcher: IClientDispatcher, paths=None):
5342
def mark_generations_as_stale(activity):
5443
for generation in activity.generations:
5544
generation_path = get_relative_path_to_cwd(client.path / generation.entity.path)
5645
stale_outputs[generation_path].add(usage_path)
5746

5847
client = client_dispatcher.current_client
5948

60-
paths = paths or []
61-
paths = get_relative_paths(base=client.path, paths=paths)
49+
ignore_deleted = ignore_deleted or client.get_value("renku", "update_ignore_delete")
50+
51+
modified, deleted = get_all_modified_and_deleted_activities_and_entities(client.repository)
6252

63-
modified, deleted = _get_modified_paths(activity_gateway=activity_gateway, repository=client.repository)
53+
modified = {(a, e) for a, e in modified if is_activity_valid(a)}
54+
deleted = {(a, e) for a, e in deleted if is_activity_valid(a)}
6455

6556
if not modified and not deleted:
6657
return None, None, None, None
6758

59+
paths = paths or []
60+
paths = get_relative_paths(base=client.path, paths=[Path.cwd() / p for p in paths])
61+
6862
modified_inputs: Set[str] = set()
6963
stale_outputs: Dict[str, Set[str]] = defaultdict(set)
7064
stale_activities: Dict[str, Set[str]] = defaultdict(set)
7165

7266
for start_activity, entity in modified:
7367
usage_path = get_relative_path_to_cwd(client.path / entity.path)
7468

75-
activities = get_dependant_activities_from(start_activity)
76-
77-
if not paths or entity.path in paths: # add all downstream activities
69+
# NOTE: Add all downstream activities if the modified entity is in paths; otherwise, add only activities that
70+
# chain-generate at least one of the paths
71+
generation_paths = [] if not paths or entity.path in paths else paths
72+
73+
activities = get_downstream_generating_activities(
74+
starting_activities={start_activity},
75+
paths=generation_paths,
76+
ignore_deleted=ignore_deleted,
77+
client_path=client.path,
78+
)
79+
if activities:
7880
modified_inputs.add(usage_path)
81+
7982
for activity in activities:
8083
if len(activity.generations) == 0:
8184
stale_activities[activity.id].add(usage_path)
8285
else:
8386
mark_generations_as_stale(activity)
84-
else:
85-
for activity in activities:
86-
if any(g.entity.path in paths for g in activity.generations):
87-
modified_inputs.add(usage_path)
88-
mark_generations_as_stale(activity)
89-
90-
deleted = {get_relative_path_to_cwd(client.path / d) for d in deleted if not paths or d in paths}
91-
92-
return stale_outputs, stale_activities, modified_inputs, deleted
93-
94-
95-
def _get_modified_paths(activity_gateway, repository) -> Tuple[Set[Tuple[Activity, Entity]], Set[str]]:
96-
"""Get modified and deleted usages/inputs of a list of activities.
97-
98-
Args:
99-
activity_gateway: Activity gateway.
100-
repository: Current ``Repository``.
101-
102-
Returns:
103-
Tuple[Set[Tuple[Activity, Entity]], Set[str]]: Tuple of Activities with their modified paths
104-
and deleted paths.
105-
"""
106-
all_activities = activity_gateway.get_all_activities()
107-
108-
relevant_activities = filter_overridden_activities(all_activities)
10987

110-
modified, deleted = get_modified_activities(activities=relevant_activities, repository=repository)
88+
deleted_paths = {e.path for _, e in deleted}
89+
deleted_paths = {get_relative_path_to_cwd(client.path / d) for d in deleted_paths if not paths or d in paths}
11190

112-
return modified, {e.path for _, e in deleted}
91+
return stale_outputs, stale_activities, modified_inputs, deleted_paths

renku/command/update.py

Lines changed: 21 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,23 @@
1717
# limitations under the License.
1818
"""Renku ``update`` command."""
1919

20-
from collections import defaultdict
2120
from pathlib import Path
22-
from typing import Dict, List, Optional, Set, Tuple
21+
from typing import Optional
2322

2423
from renku.command.command_builder import inject
2524
from renku.command.command_builder.command import Command
2625
from renku.command.workflow import execute_workflow
2726
from renku.core import errors
2827
from renku.core.errors import ParameterError
29-
from renku.core.interface.activity_gateway import IActivityGateway
3028
from renku.core.interface.client_dispatcher import IClientDispatcher
31-
from renku.core.interface.plan_gateway import IPlanGateway
32-
from renku.core.util.metadata import add_activity_if_recent, filter_overridden_activities, get_modified_activities
3329
from renku.core.util.os import get_relative_paths
34-
from renku.core.workflow.activity import sort_activities
30+
from renku.core.workflow.activity import (
31+
get_all_modified_and_deleted_activities_and_entities,
32+
get_downstream_generating_activities,
33+
is_activity_valid,
34+
sort_activities,
35+
)
3536
from renku.core.workflow.concrete_execution_graph import ExecutionGraph
36-
from renku.domain_model.provenance.activity import Activity
37-
from renku.domain_model.workflow.plan import AbstractPlan
3837

3938

4039
def update_command():
@@ -44,10 +43,10 @@ def update_command():
4443

4544
@inject.autoparams()
4645
def _update(
47-
update_all,
48-
dry_run,
46+
update_all: bool,
47+
dry_run: bool,
48+
ignore_deleted: bool,
4949
client_dispatcher: IClientDispatcher,
50-
activity_gateway: IActivityGateway,
5150
provider: str,
5251
config: Optional[str],
5352
paths=None,
@@ -60,10 +59,18 @@ def _update(
6059
client = client_dispatcher.current_client
6160

6261
paths = paths or []
63-
paths = get_relative_paths(base=client.path, paths=paths)
62+
paths = get_relative_paths(base=client.path, paths=[Path.cwd() / p for p in paths])
6463

65-
modified_activities, modified_paths = _get_modified_activities_and_paths(client.repository, activity_gateway)
66-
activities = _get_downstream_activities(modified_activities, activity_gateway, paths)
64+
modified, _ = get_all_modified_and_deleted_activities_and_entities(client.repository)
65+
modified_activities = {a for a, _ in modified if is_activity_valid(a)}
66+
modified_paths = {e.path for _, e in modified}
67+
68+
activities = get_downstream_generating_activities(
69+
starting_activities=modified_activities,
70+
paths=paths,
71+
ignore_deleted=ignore_deleted,
72+
client_path=client.path,
73+
)
6774

6875
if len(activities) == 0:
6976
raise errors.NothingToExecuteError()
@@ -75,110 +82,3 @@ def _update(
7582

7683
graph = ExecutionGraph([a.plan_with_values for a in activities], virtual_links=True)
7784
execute_workflow(dag=graph.workflow_graph, provider=provider, config=config)
78-
79-
80-
@inject.autoparams()
81-
def _is_activity_valid(activity: Activity, plan_gateway: IPlanGateway, client_dispatcher: IClientDispatcher) -> bool:
82-
"""Return whether this plan is current and has not been deleted.
83-
84-
Args:
85-
activity(Activity): The Activity whose Plan should be checked.
86-
plan_gateway(IPlanGateway): The injected Plan gateway.
87-
client_dispatcher(IClientDispatcher): The injected client dispatcher.
88-
89-
Returns:
90-
bool: True if the activities' Plan is still valid, False otherwise.
91-
92-
"""
93-
client = client_dispatcher.current_client
94-
95-
for usage in activity.usages:
96-
if not (client.path / usage.entity.path).exists():
97-
return False
98-
99-
plan = activity.association.plan
100-
101-
if plan.invalidated_at is not None:
102-
return False
103-
104-
# get newest with same name
105-
newest_plan = plan_gateway.get_by_name(plan.name)
106-
107-
if newest_plan is None or newest_plan.invalidated_at is not None:
108-
return False
109-
110-
all_plans = plan_gateway.get_all_plans()
111-
112-
derived: Optional[AbstractPlan] = plan
113-
while derived:
114-
plan = derived
115-
derived = next((p for p in all_plans if p.derived_from is not None and p.derived_from == plan.id), None)
116-
117-
return plan.invalidated_at is None
118-
119-
120-
def _get_modified_activities_and_paths(repository, activity_gateway) -> Tuple[Set[Activity], Set[str]]:
121-
"""Return latest activities that one of their inputs is modified.
122-
123-
Args:
124-
repository: The current ``Repository``.
125-
activity_gateway: The injected Activity gateway.
126-
127-
Returns:
128-
Tuple[Set[Activity],Set[str]]: Tuple of modified activites and modified paths.
129-
130-
"""
131-
all_activities = activity_gateway.get_all_activities()
132-
relevant_activities = filter_overridden_activities(all_activities)
133-
modified, _ = get_modified_activities(activities=list(relevant_activities), repository=repository)
134-
return {a for a, _ in modified if _is_activity_valid(a)}, {e.path for _, e in modified}
135-
136-
137-
def _get_downstream_activities(
138-
starting_activities: Set[Activity], activity_gateway: IActivityGateway, paths: List[str]
139-
) -> List[Activity]:
140-
"""Return activities downstream of passed activities.
141-
142-
Args:
143-
starting_activities(Set[Activity]): Activities to use as starting/upstream nodes.
144-
activity_gateway(IActivityGateway): The injected Activity gateway.
145-
paths(List[str]): Optional gnerated paths to end downstream chains at.
146-
147-
Returns:
148-
Set[Activity]: All activites and their downstream activities.
149-
150-
"""
151-
all_activities: Dict[str, Set[Activity]] = defaultdict(set)
152-
153-
def include_newest_activity(activity):
154-
existing_activities = all_activities[activity.association.plan.id]
155-
add_activity_if_recent(activity=activity, activities=existing_activities)
156-
157-
def does_activity_generate_any_paths(activity):
158-
is_same = any(g.entity.path in paths for g in activity.generations)
159-
is_parent = any(Path(p) in Path(g.entity.path).parents for p in paths for g in activity.generations)
160-
161-
return is_same or is_parent
162-
163-
for activity in starting_activities:
164-
downstream_chains = activity_gateway.get_downstream_activity_chains(activity)
165-
166-
if paths:
167-
# NOTE: Add the activity to check if it also matches the condition
168-
downstream_chains.append((activity,))
169-
downstream_chains = [c for c in downstream_chains if does_activity_generate_any_paths(c[-1])]
170-
171-
# NOTE: Include activity only if any of its downstream match the condition
172-
if downstream_chains:
173-
include_newest_activity(activity)
174-
else:
175-
include_newest_activity(activity)
176-
177-
for chain in downstream_chains:
178-
for activity in chain:
179-
if not _is_activity_valid(activity):
180-
# don't process further downstream activities as the plan in question was deleted
181-
break
182-
include_newest_activity(activity)
183-
184-
return list({a for activities in all_activities.values() for a in activities})

renku/command/workflow.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -661,13 +661,15 @@ def _visualize_graph(
661661
client = client_dispatcher.current_client
662662

663663
sources = sources or []
664-
sources = get_relative_paths(base=client.path, paths=sources)
664+
sources = get_relative_paths(base=client.path, paths=[Path.cwd() / p for p in sources])
665665

666666
if not targets:
667667
usages = activity_gateway.get_all_usage_paths()
668668
generations = activity_gateway.get_all_generation_paths()
669669

670670
targets = [g for g in generations if all(not are_paths_related(g, u) for u in usages)]
671+
else:
672+
targets = get_relative_paths(base=client.path, paths=[Path.cwd() / p for p in targets])
671673

672674
activities = get_activities_until_paths(
673675
paths=targets,

0 commit comments

Comments
 (0)