Skip to content

Commit e82964c

Browse files
committed
Misc fixes
1 parent 60e9358 commit e82964c

File tree

4 files changed

+89
-3
lines changed

4 files changed

+89
-3
lines changed

src/zenml/entrypoints/step_entrypoint_configuration.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,20 @@ def load_snapshot(self) -> "PipelineSnapshotResponse":
161161
snapshot_id=snapshot_id, step_configuration_filter=[step_name]
162162
)
163163

164+
def get_step_configuration(
165+
self, snapshot: "PipelineSnapshotResponse", step_name: str
166+
) -> "Step":
167+
"""Gets the step configuration.
168+
169+
Args:
170+
snapshot: The snapshot.
171+
step_name: The name of the step.
172+
173+
Returns:
174+
The step configuration.
175+
"""
176+
return snapshot.step_configurations[step_name]
177+
164178
def run(self) -> None:
165179
"""Prepares the environment and runs the configured step."""
166180
snapshot = self.load_snapshot()
@@ -188,7 +202,9 @@ def run(self) -> None:
188202

189203
pipeline_name = snapshot.pipeline_configuration.name
190204

191-
step = snapshot.step_configurations[step_name]
205+
step = self.get_step_configuration(
206+
snapshot=snapshot, step_name=step_name
207+
)
192208
self._run_step(step, snapshot=snapshot)
193209

194210
self.post_run(

src/zenml/step_operators/step_operator_entrypoint_configuration.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# permissions and limitations under the License.
1414
"""Abstract base class for entrypoint configurations that run a single step."""
1515

16-
from typing import TYPE_CHECKING, Any, Dict, List
16+
from typing import TYPE_CHECKING, Any, Dict, List, Optional
1717
from uuid import UUID
1818

1919
from zenml.client import Client
@@ -64,6 +64,60 @@ def get_entrypoint_arguments(
6464
kwargs[STEP_RUN_ID_OPTION],
6565
]
6666

67+
def _should_download_code(
68+
self,
69+
snapshot: "PipelineSnapshotResponse",
70+
step_name: Optional[str] = None,
71+
) -> bool:
72+
"""Checks whether code should be downloaded.
73+
74+
Args:
75+
snapshot: The snapshot to check.
76+
step_name: Name of the step to be run. This will be used to
77+
determine whether code download is necessary. If not given,
78+
the DockerSettings of the pipeline will be used to make that
79+
decision instead.
80+
81+
Returns:
82+
Whether code should be downloaded.
83+
"""
84+
step_run_id = UUID(self.entrypoint_args[STEP_RUN_ID_OPTION])
85+
step_run = Client().zen_store.get_run_step(step_run_id)
86+
87+
docker_settings = step_run.config.docker_settings
88+
89+
if (
90+
snapshot.code_reference
91+
and docker_settings.allow_download_from_code_repository
92+
):
93+
return True
94+
95+
if (
96+
snapshot.code_path
97+
and docker_settings.allow_download_from_artifact_store
98+
):
99+
return True
100+
101+
return False
102+
103+
def get_step_configuration(
104+
self, snapshot: "PipelineSnapshotResponse", step_name: str
105+
) -> "Step":
106+
"""Gets the step configuration.
107+
108+
Args:
109+
snapshot: The snapshot.
110+
step_name: The name of the step.
111+
112+
Returns:
113+
The step configuration.
114+
"""
115+
step_run_id = UUID(self.entrypoint_args[STEP_RUN_ID_OPTION])
116+
step_run = Client().zen_store.get_run_step(step_run_id)
117+
from zenml.config.step_configurations import Step
118+
119+
return Step(spec=step_run.spec, config=step_run.config)
120+
67121
def _run_step(
68122
self,
69123
step: "Step",

src/zenml/steps/base_step.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ def __init__(
125125
retry: Optional[StepRetryConfig] = None,
126126
substitutions: Optional[Dict[str, str]] = None,
127127
cache_policy: Optional[CachePolicyOrString] = None,
128+
in_process: Optional[bool] = None,
128129
) -> None:
129130
"""Initializes a step.
130131
@@ -158,6 +159,8 @@ def __init__(
158159
retry: Configuration for retrying the step in case of failure.
159160
substitutions: Extra placeholders to use in the name template.
160161
cache_policy: Cache policy for this step.
162+
in_process: Whether to run the step in process. This is only
163+
applicable for dynamic pipelines.
161164
"""
162165
from zenml.config.step_configurations import PartialStepConfiguration
163166

@@ -222,6 +225,7 @@ def __init__(
222225
retry=retry,
223226
substitutions=substitutions,
224227
cache_policy=cache_policy,
228+
in_process=in_process,
225229
)
226230

227231
notebook_utils.try_to_save_notebook_cell_code(self.source_object)
@@ -661,6 +665,7 @@ def configure(
661665
retry: Optional[StepRetryConfig] = None,
662666
substitutions: Optional[Dict[str, str]] = None,
663667
cache_policy: Optional[CachePolicyOrString] = None,
668+
in_process: Optional[bool] = None,
664669
merge: bool = True,
665670
) -> T:
666671
"""Configures the step.
@@ -704,6 +709,8 @@ def configure(
704709
retry: Configuration for retrying the step in case of failure.
705710
substitutions: Extra placeholders to use in the name template.
706711
cache_policy: Cache policy for this step.
712+
in_process: Whether to run the step in process. This is only
713+
applicable for dynamic pipelines.
707714
merge: If `True`, will merge the given dictionary configurations
708715
like `parameters` and `settings` with existing
709716
configurations. If `False` the given configurations will
@@ -782,6 +789,7 @@ def _convert_to_tuple(value: Any) -> Tuple[Source, ...]:
782789
"retry": retry,
783790
"substitutions": substitutions,
784791
"cache_policy": cache_policy,
792+
"in_process": in_process,
785793
}
786794
)
787795
config = StepConfigurationUpdate(**values)
@@ -810,6 +818,7 @@ def with_options(
810818
retry: Optional[StepRetryConfig] = None,
811819
substitutions: Optional[Dict[str, str]] = None,
812820
cache_policy: Optional[CachePolicyOrString] = None,
821+
in_process: Optional[bool] = None,
813822
merge: bool = True,
814823
) -> "BaseStep":
815824
"""Copies the step and applies the given configurations.
@@ -843,6 +852,8 @@ def with_options(
843852
retry: Configuration for retrying the step in case of failure.
844853
substitutions: Extra placeholders for the step name.
845854
cache_policy: Cache policy for this step.
855+
in_process: Whether to run the step in process. This is only
856+
applicable for dynamic pipelines.
846857
merge: If `True`, will merge the given dictionary configurations
847858
like `parameters` and `settings` with existing
848859
configurations. If `False` the given configurations will
@@ -872,6 +883,7 @@ def with_options(
872883
retry=retry,
873884
substitutions=substitutions,
874885
cache_policy=cache_policy,
886+
in_process=in_process,
875887
merge=merge,
876888
)
877889
return step_copy

src/zenml/steps/step_decorator.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def step(
8080
retry: Optional["StepRetryConfig"] = None,
8181
substitutions: Optional[Dict[str, str]] = None,
8282
cache_policy: Optional["CachePolicyOrString"] = None,
83+
in_process: Optional[bool] = None,
8384
) -> Callable[["F"], "BaseStep"]: ...
8485

8586

@@ -104,6 +105,7 @@ def step(
104105
retry: Optional["StepRetryConfig"] = None,
105106
substitutions: Optional[Dict[str, str]] = None,
106107
cache_policy: Optional["CachePolicyOrString"] = None,
108+
in_process: Optional[bool] = None,
107109
) -> Union["BaseStep", Callable[["F"], "BaseStep"]]:
108110
"""Decorator to create a ZenML step.
109111
@@ -139,7 +141,8 @@ def step(
139141
retry: configuration of step retry in case of step failure.
140142
substitutions: Extra placeholders for the step name.
141143
cache_policy: Cache policy for this step.
142-
144+
in_process: Whether to run the step in process. This is only
145+
applicable for dynamic pipelines.
143146
Returns:
144147
The step instance.
145148
"""
@@ -176,6 +179,7 @@ def inner_decorator(func: "F") -> "BaseStep":
176179
retry=retry,
177180
substitutions=substitutions,
178181
cache_policy=cache_policy,
182+
in_process=in_process,
179183
)
180184

181185
return step_instance

0 commit comments

Comments
 (0)