Commit c6153aa: "WIP"
1 parent: 240fb64
21 files changed: +1397 −18 lines

src/zenml/config/compiler.py (2 additions, 1 deletion)

@@ -153,6 +153,7 @@ def compile(
 
         snapshot = PipelineSnapshotBase(
             run_name_template=run_name,
+            is_dynamic=pipeline.is_dynamic,
             pipeline_configuration=pipeline.configuration,
             step_configurations=steps,
             client_environment=get_run_environment_dict(),
@@ -634,7 +635,7 @@ def _compute_pipeline_spec(
         Raises:
             ValueError: If the pipeline has no steps.
         """
-        if not step_specs:
+        if not step_specs and not pipeline.is_dynamic:
            raise ValueError(
                f"Pipeline '{pipeline.name}' cannot be compiled because it has "
                f"no steps. Please make sure that your steps are decorated "

src/zenml/config/step_configurations.py (7 additions, 0 deletions)

@@ -214,6 +214,13 @@ class StepConfigurationUpdate(FrozenBaseModel):
         default=None,
         description="The cache policy for the step.",
     )
+    in_process: Optional[bool] = Field(
+        default=None,
+        description="Whether to run the step in-process. This is only "
+        "applicable for dynamic pipelines. If not set, the step will by "
+        "default run in-process unless it requires a different Docker image "
+        "or has special resource requirements.",
+    )
 
     outputs: Mapping[str, PartialArtifactConfiguration] = {}
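A usage sketch for the new field, assuming it is surfaced through the usual step-configuration path such as `with_options`; whether `with_options` actually accepts `in_process` is not confirmed by this commit:

```python
# Sketch only: assumes `in_process` is accepted wherever other
# StepConfigurationUpdate fields (e.g. `cache_policy`) are accepted.
from zenml import step


@step
def heavy_training() -> None:
    ...


# Force this step out of the orchestrator process and into its own job,
# e.g. because it needs dedicated resources.
heavy_training_oop = heavy_training.with_options(in_process=False)
```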

src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py (285 additions, 3 deletions)

@@ -49,6 +49,7 @@
 from zenml.config.base_settings import BaseSettings
 from zenml.constants import (
     METADATA_ORCHESTRATOR_RUN_ID,
+    ORCHESTRATOR_DOCKER_IMAGE_KEY,
 )
 from zenml.enums import ExecutionMode, ExecutionStatus, StackComponentType
 from zenml.integrations.kubernetes.constants import (
@@ -80,6 +81,7 @@
 from zenml.stack import StackValidator
 
 if TYPE_CHECKING:
+    from zenml.config.step_run_info import StepRunInfo
     from zenml.models import (
         PipelineRunResponse,
         PipelineSnapshotBase,
@@ -110,7 +112,10 @@ def should_build_pipeline_image(
         settings = cast(
             KubernetesOrchestratorSettings, self.get_settings(snapshot)
         )
-        return settings.always_build_pipeline_image
+        if settings.always_build_pipeline_image:
+            return True
+        else:
+            return super().should_build_pipeline_image(snapshot)
 
     def get_kube_client(
         self, incluster: Optional[bool] = None
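The change above turns `always_build_pipeline_image` from the sole decision into an override: when it is left unset, the base class now decides whether an orchestrator image is needed. Forcing the build from user code might look like this, assuming the standard ZenML settings mechanism (the `"orchestrator.kubernetes"` key follows the usual `<type>.<flavor>` convention):

```python
# Sketch of the standard settings path; `always_build_pipeline_image`
# is an existing Kubernetes orchestrator setting per this diff.
from zenml import pipeline
from zenml.integrations.kubernetes.flavors import (
    KubernetesOrchestratorSettings,
)


@pipeline(
    settings={
        "orchestrator.kubernetes": KubernetesOrchestratorSettings(
            always_build_pipeline_image=True,
        )
    }
)
def my_pipeline() -> None:
    ...
```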
@@ -446,8 +451,6 @@ def submit_pipeline(
             KubernetesOrchestratorSettings, self.get_settings(snapshot)
         )
 
-        assert stack.container_registry
-
         # Get Docker image for the orchestrator pod
         try:
             image = self.get_image(snapshot=snapshot)
@@ -656,6 +659,285 @@ def _wait_for_run_to_finish() -> None:
             )
             return None
 
+    def submit_dynamic_pipeline(
+        self,
+        snapshot: "PipelineSnapshotResponse",
+        stack: "Stack",
+        environment: Dict[str, str],
+        placeholder_run: Optional["PipelineRunResponse"] = None,
+    ) -> Optional[SubmissionResult]:
+        """Submits a dynamic pipeline to the orchestrator."""
+        from zenml.pipelines.dynamic.entrypoint_configuration import (
+            DynamicPipelineEntrypointConfiguration,
+        )
+
+        pipeline_name = snapshot.pipeline_configuration.name
+        settings = cast(
+            KubernetesOrchestratorSettings, self.get_settings(snapshot)
+        )
+        image = self.get_image(snapshot=snapshot)
+
+        command = (
+            DynamicPipelineEntrypointConfiguration.get_entrypoint_command()
+        )
+        args = DynamicPipelineEntrypointConfiguration.get_entrypoint_arguments(
+            snapshot_id=snapshot.id,
+            run_id=placeholder_run.id if placeholder_run else None,
+        )
+
+        # Authorize pod to run Kubernetes commands inside the cluster.
+        service_account_name = self._get_service_account_name(settings)
+
+        # We set some default minimum resource requests for the orchestrator
+        # pod here if the user has not specified any, because the orchestrator
+        # pod takes up some memory resources itself and, if not specified, the
+        # pod will be scheduled on any node regardless of available memory and
+        # risk negatively impacting or even crashing the node due to memory
+        # pressure.
+        orchestrator_pod_settings = kube_utils.apply_default_resource_requests(
+            memory="400Mi",
+            cpu="100m",
+            pod_settings=settings.orchestrator_pod_settings,
+        )
+
+        if self.config.pass_zenml_token_as_secret:
+            secret_name = self.get_token_secret_name(snapshot.id)
+            token = environment.pop("ZENML_STORE_API_TOKEN")
+            kube_utils.create_or_update_secret(
+                core_api=self._k8s_core_api,
+                namespace=self.config.kubernetes_namespace,
+                secret_name=secret_name,
+                data={KUBERNETES_SECRET_TOKEN_KEY_NAME: token},
+            )
+            orchestrator_pod_settings.env.append(
+                {
+                    "name": "ZENML_STORE_API_TOKEN",
+                    "valueFrom": {
+                        "secretKeyRef": {
+                            "name": secret_name,
+                            "key": KUBERNETES_SECRET_TOKEN_KEY_NAME,
+                        }
+                    },
+                }
+            )
+
+        orchestrator_pod_labels = {
+            "pipeline": kube_utils.sanitize_label(pipeline_name),
+        }
+
+        if placeholder_run:
+            orchestrator_pod_labels["run_id"] = kube_utils.sanitize_label(
+                str(placeholder_run.id)
+            )
+            orchestrator_pod_labels["run_name"] = kube_utils.sanitize_label(
+                placeholder_run.name
+            )
+
+        pod_manifest = build_pod_manifest(
+            pod_name=None,
+            image_name=image,
+            command=command,
+            args=args,
+            privileged=False,
+            pod_settings=orchestrator_pod_settings,
+            service_account_name=service_account_name,
+            env=environment,
+            labels=orchestrator_pod_labels,
+            mount_local_stores=self.config.is_local,
+            termination_grace_period_seconds=settings.pod_stop_grace_period,
+        )
+
+        pod_failure_policy = settings.pod_failure_policy or {
+            # These rules are applied sequentially. This means any failure in
+            # the main container will count towards the max retries. Any other
+            # disruption will not count towards the max retries.
+            "rules": [
+                # If the main container fails, we count it towards the max
+                # retries.
+                {
+                    "action": "Count",
+                    "onExitCodes": {
+                        "containerName": "main",
+                        "operator": "NotIn",
+                        "values": [0],
+                    },
+                },
+                # If the pod is interrupted at any other time, we don't count
+                # it as a retry
+                {
+                    "action": "Ignore",
+                    "onPodConditions": [
+                        {
+                            "type": "DisruptionTarget",
+                            "status": "True",
+                        }
+                    ],
+                },
+            ]
+        }
+
+        job_name = settings.job_name_prefix or ""
+        random_prefix = "".join(random.choices("0123456789abcdef", k=8))
+        job_name += f"-{random_prefix}-{snapshot.pipeline_configuration.name}"
+        # The job name will be used as a label on the pods, so we need to make
+        # sure it doesn't exceed the label length limit
+        job_name = kube_utils.sanitize_label(job_name)
+
+        job_manifest = build_job_manifest(
+            job_name=job_name,
+            pod_template=pod_template_manifest_from_pod(pod_manifest),
+            backoff_limit=settings.orchestrator_job_backoff_limit,
+            ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
+            active_deadline_seconds=settings.active_deadline_seconds,
+            pod_failure_policy=pod_failure_policy,
+            labels=orchestrator_pod_labels,
+            annotations={
+                ORCHESTRATOR_ANNOTATION_KEY: str(self.id),
+            },
+        )
+
+        if snapshot.schedule:
+            raise RuntimeError("Dynamic pipelines cannot be scheduled yet.")
+        else:
+            try:
+                kube_utils.create_job(
+                    batch_api=self._k8s_batch_api,
+                    namespace=self.config.kubernetes_namespace,
+                    job_manifest=job_manifest,
+                )
+            except Exception as e:
+                if self.config.pass_zenml_token_as_secret:
+                    secret_name = self.get_token_secret_name(snapshot.id)
+                    try:
+                        kube_utils.delete_secret(
+                            core_api=self._k8s_core_api,
+                            namespace=self.config.kubernetes_namespace,
+                            secret_name=secret_name,
+                        )
+                    except Exception as cleanup_error:
+                        logger.error(
+                            "Error cleaning up secret %s: %s",
+                            secret_name,
+                            cleanup_error,
+                        )
+                raise e
+
+        if settings.synchronous:
+
+            def _wait_for_run_to_finish() -> None:
+                logger.info("Waiting for orchestrator job to finish...")
+                kube_utils.wait_for_job_to_finish(
+                    batch_api=self._k8s_batch_api,
+                    core_api=self._k8s_core_api,
+                    namespace=self.config.kubernetes_namespace,
+                    job_name=job_name,
+                    backoff_interval=settings.job_monitoring_interval,
+                    fail_on_container_waiting_reasons=settings.fail_on_container_waiting_reasons,
+                    stream_logs=True,
+                )
+
+            return SubmissionResult(
+                wait_for_completion=_wait_for_run_to_finish,
+            )
+        else:
+            logger.info(
+                f"Orchestrator job `{job_name}` started. "
+                f"Run the following command to inspect the logs: "
+                f"`kubectl -n {self.config.kubernetes_namespace} logs "
+                f"job/{job_name}`"
+            )
+            return None
+
+    def run_dynamic_out_of_process_step(
+        self, step_run_info: "StepRunInfo", environment: Dict[str, str]
+    ) -> None:
+        from zenml.step_operators.step_operator_entrypoint_configuration import (
+            StepOperatorEntrypointConfiguration,
+        )
+
+        settings = cast(
+            KubernetesOrchestratorSettings, self.get_settings(step_run_info)
+        )
+        image_name = step_run_info.get_image(key=ORCHESTRATOR_DOCKER_IMAGE_KEY)
+        command = StepOperatorEntrypointConfiguration.get_entrypoint_command()
+        args = StepOperatorEntrypointConfiguration.get_entrypoint_arguments(
+            step_name=step_run_info.pipeline_step_name,
+            snapshot_id=step_run_info.snapshot_id,
+            step_run_id=str(step_run_info.step_run_id),
+        )
+
+        step_labels = {
+            "run_id": kube_utils.sanitize_label(str(step_run_info.run_id)),
+            "run_name": kube_utils.sanitize_label(str(step_run_info.run_name)),
+            "pipeline": kube_utils.sanitize_label(step_run_info.pipeline.name),
+            "step_name": kube_utils.sanitize_label(
+                step_run_info.pipeline_step_name
+            ),
+        }
+        step_annotations = {
+            STEP_NAME_ANNOTATION_KEY: step_run_info.pipeline_step_name,
+        }
+
+        # We set some default minimum memory resource requests for the step
+        # pod here if the user has not specified any, because the step pod
+        # takes up some memory resources itself and, if not specified, the pod
+        # will be scheduled on any node regardless of available memory and
+        # risk negatively impacting or even crashing the node due to memory
+        # pressure.
+        pod_settings = kube_utils.apply_default_resource_requests(
+            memory="400Mi",
+            pod_settings=settings.pod_settings,
+        )
+
+        pod_manifest = build_pod_manifest(
+            pod_name=None,
+            image_name=image_name,
+            command=command,
+            args=args,
+            env=environment,
+            privileged=settings.privileged,
+            pod_settings=pod_settings,
+            service_account_name=settings.service_account_name,
+            labels=step_labels,
+        )
+
+        job_name = settings.job_name_prefix or ""
+        random_prefix = "".join(random.choices("0123456789abcdef", k=8))
+        job_name += f"-{random_prefix}-{step_run_info.pipeline_step_name}-{step_run_info.pipeline.name}"
+        # The job name will be used as a label on the pods, so we need to make
+        # sure it doesn't exceed the label length limit
+        job_name = kube_utils.sanitize_label(job_name)
+
+        job_manifest = build_job_manifest(
+            job_name=job_name,
+            pod_template=pod_template_manifest_from_pod(pod_manifest),
+            # The orchestrator already handles retries, so we don't need to
+            # retry the step operator job.
+            backoff_limit=0,
+            ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
+            active_deadline_seconds=settings.active_deadline_seconds,
+            labels=step_labels,
+            annotations=step_annotations,
+        )
+
+        kube_utils.create_job(
+            batch_api=self._k8s_batch_api,
+            namespace=self.config.kubernetes_namespace,
+            job_manifest=job_manifest,
+        )
+
+        logger.info(
+            "Waiting for job `%s` to finish...",
+            job_name,
+        )
+        kube_utils.wait_for_job_to_finish(
+            batch_api=self._k8s_batch_api,
+            core_api=self._k8s_core_api,
+            namespace=self.config.kubernetes_namespace,
+            job_name=job_name,
+            fail_on_container_waiting_reasons=settings.fail_on_container_waiting_reasons,
+            stream_logs=True,
+        )
+        logger.info("Job completed.")
+
     def _get_service_account_name(
         self, settings: KubernetesOrchestratorSettings
     ) -> str: