
Commit a1017c0

Merge branch 'feature/served-pipelines' of https://github.com/zenml-io/zenml into feature/served-pipelines
2 parents: aba28cf + b41700d

File tree

19 files changed: +576 additions, -458 deletions


pyproject.toml

Lines changed: 119 additions & 273 deletions
Large diffs are not rendered by default.

src/zenml/cli/pipeline.py

Lines changed: 0 additions & 1 deletion
@@ -1147,7 +1147,6 @@ def deploy_snapshot(
         pipeline_name_or_id: The name or ID of the pipeline.
         deployment_name_or_id: Name or ID of the deployment to use for the
             pipeline.
-        config_path: Path to pipeline configuration file.
         update: If True, update the deployment with the same name if it
             already exists.
         overtake: If True, update the deployment with the same name if

src/zenml/config/compiler.py

Lines changed: 2 additions & 11 deletions
@@ -650,17 +650,8 @@ def _compute_pipeline_spec(
             )
             for output_artifact in pipeline._output_artifacts
         ]
-        try:
-            output_schema = pipeline._compute_output_schema()
-        except Exception as e:
-            logger.warning("Failed to compute pipeline output schema: %s", e)
-            output_schema = None
-
-        parameters_model = pipeline.get_parameters_model()
-        if parameters_model:
-            input_schema = parameters_model.model_json_schema()
-        else:
-            input_schema = None
+        input_schema = pipeline._compute_input_schema()
+        output_schema = pipeline._compute_output_schema()

         return PipelineSpec(
             steps=step_specs,

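For context, the removed code derived the input schema from a pydantic parameters model via `model_json_schema()`, swallowing failures; the new `_compute_input_schema()` / `_compute_output_schema()` helpers move that logic onto the pipeline class and let failures surface. A minimal sketch of the underlying mechanism only, using a hypothetical `TrainParams` model that is not part of ZenML:

# Sketch: deriving a JSON input schema from a pydantic parameters model,
# mirroring the code path this hunk removes. `TrainParams` is hypothetical.
from typing import Any, Dict, Optional, Type

from pydantic import BaseModel


class TrainParams(BaseModel):
    epochs: int = 10
    learning_rate: float = 1e-3


def compute_input_schema(
    model: Optional[Type[BaseModel]],
) -> Optional[Dict[str, Any]]:
    # Return the JSON schema of the parameters model, or None if there is none.
    return model.model_json_schema() if model else None


print(compute_input_schema(TrainParams))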
src/zenml/config/pipeline_spec.py

Lines changed: 8 additions & 2 deletions
@@ -42,11 +42,17 @@ class PipelineSpec(FrozenBaseModel):
     #   inputs in the step specs refer to the pipeline parameter names
     # - 0.4: New Pipeline class, the upstream steps and
     #   inputs in the step specs refer to the pipeline parameter names
-    # - 0.5: Adds outputs and output schema
+    # - 0.5: Adds input schema, outputs and output schema
     version: str = "0.5"
     source: Optional[SourceWithValidator] = None
     parameters: Dict[str, Any] = {}
-    input_schema: Dict[str, Any] = {}
+    input_schema: Optional[Dict[str, Any]] = Field(
+        default=None,
+        description="JSON schema of the pipeline inputs. This is only set "
+        "for pipeline specs with version >= 0.5. If the value is None, the "
+        "schema generation failed, which is most likely because some of the "
+        "pipeline inputs are not JSON serializable.",
+    )
     steps: List[StepSpec]
     outputs: List[OutputSpec] = []
     output_schema: Optional[Dict[str, Any]] = Field(

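Because `input_schema` is now `Optional` with a `None` default, consumers of the spec have to treat `None` as "schema generation failed", not "no inputs". A small consumer-side sketch against a hypothetical loaded spec (`spec` is a stand-in, not a real ZenML object):

# Hypothetical consumer check; `spec` stands in for a loaded PipelineSpec.
def describe_inputs(spec) -> str:
    if spec.input_schema is None:
        # None means schema generation failed (e.g. non-JSON-serializable
        # inputs), not that the pipeline takes no parameters.
        return "input schema unavailable"
    props = spec.input_schema.get("properties", {})
    return f"pipeline accepts: {', '.join(props) or 'no parameters'}"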
src/zenml/config/resource_settings.py

Lines changed: 8 additions & 3 deletions
@@ -16,8 +16,13 @@
 from enum import Enum
 from typing import Literal, Optional, Union

-from pydantic import Field, NonNegativeInt, PositiveFloat, PositiveInt
-from pydantic_settings import SettingsConfigDict
+from pydantic import (
+    ConfigDict,
+    Field,
+    NonNegativeInt,
+    PositiveFloat,
+    PositiveInt,
+)

 from zenml.config.base_settings import BaseSettings

@@ -178,7 +183,7 @@ def get_memory(
         # Should never happen due to the regex validation
         raise ValueError(f"Unable to parse memory unit from '{memory}'.")

-    model_config = SettingsConfigDict(
+    model_config = ConfigDict(
         # public attributes are immutable
         frozen=True,
         # prevent extra attributes during model initialization

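The switch from `pydantic_settings.SettingsConfigDict` to pydantic's plain `ConfigDict` keeps the same model options shown in the hunk (frozen, no extra attributes). A standalone illustration of the frozen-model behaviour; the class and field names are illustrative, not ZenML's:

from pydantic import BaseModel, ConfigDict


class FrozenSettings(BaseModel):
    # Same options as in the diff: immutable instances, no extra attributes.
    model_config = ConfigDict(frozen=True, extra="forbid")

    cpu_count: int = 1


settings = FrozenSettings(cpu_count=4)
try:
    settings.cpu_count = 8  # rejected because the model is frozen
except Exception as e:
    print(type(e).__name__)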
src/zenml/deployers/base_deployer.py

Lines changed: 31 additions & 0 deletions
@@ -163,6 +163,35 @@ def _update_deployment(
             DeploymentUpdate.from_operational_state(operational_state),
         )

+    def _check_deployment_inputs_outputs(
+        self,
+        snapshot: PipelineSnapshotResponse,
+    ) -> None:
+        """Check if the deployment has compiled schemas for the pipeline inputs and outputs.
+
+        Args:
+            snapshot: The pipeline snapshot to check.
+
+        Raises:
+            DeploymentProvisionError: if the deployment has no compiled schemas
+                for the pipeline inputs and outputs.
+        """
+        if (
+            not snapshot.pipeline_spec
+            or not snapshot.pipeline_spec.input_schema
+            or not snapshot.pipeline_spec.output_schema
+        ):
+            raise DeploymentProvisionError(
+                f"The pipeline with name '{snapshot.pipeline.name}' referenced "
+                f"by the deployment with name or ID "
+                f"'{snapshot.name or snapshot.id}' "
+                "is missing the compiled schemas for the pipeline inputs or "
+                "outputs. This is most likely because some of the pipeline "
+                "inputs or outputs are not JSON serializable. Please check that "
+                "all the pipeline input arguments and return values have data "
+                "types that are JSON serializable."
+            )
+
     def _check_deployment_deployer(
         self,
         deployment: DeploymentResponse,

@@ -383,6 +412,8 @@ def provision_deployment(
                 "already exists"
             )

+        self._check_deployment_inputs_outputs(snapshot)
+
         client = Client()

         settings = cast(

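The new check fails provisioning early when the compiled spec lacks input or output schemas, and its error message pins such failures on values that are not JSON serializable. As a rough local sanity check for a given value, not ZenML's actual schema compilation, you can probe serializability like this:

import json


def is_json_serializable(value) -> bool:
    # Rough proxy for what the schema compilation requires of pipeline
    # inputs and outputs: the value must survive JSON encoding.
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


print(is_json_serializable({"epochs": 10}))  # True
print(is_json_serializable(object()))        # False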
src/zenml/deployers/docker/docker_deployer.py

Lines changed: 4 additions & 1 deletion
@@ -414,8 +414,11 @@ def do_provision_deployment(
             auto_remove=False,
             ports=ports,
             labels={
-                "zenml-deployment-uuid": str(deployment.id),
+                "zenml-deployment-id": str(deployment.id),
                 "zenml-deployment-name": deployment.name,
+                "zenml-deployer-name": str(self.name),
+                "zenml-deployer-id": str(self.id),
+                "managed-by": "zenml",
             },
             extra_hosts=extra_hosts,
             **run_args,

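The additional labels make deployer-managed containers discoverable from the Docker side. A hedged sketch using the Docker SDK for Python; the label keys come from the diff, but the lookup itself is illustrative and not necessarily how the deployer finds its containers:

import docker

client = docker.from_env()

# Find containers that carry the ZenML management label, narrowed down to a
# specific deployer via its ID label ("<deployer-uuid>" is a placeholder).
containers = client.containers.list(
    all=True,
    filters={"label": ["managed-by=zenml", "zenml-deployer-id=<deployer-uuid>"]},
)
for container in containers:
    print(container.name, container.labels.get("zenml-deployment-name"))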
src/zenml/deployers/exceptions.py

Lines changed: 0 additions & 4 deletions
@@ -59,9 +59,5 @@ class DeploymentHTTPError(DeployerError):
     """Error raised when an HTTP request to a deployment fails."""


-class DeploymentSchemaNotFoundError(KeyError, DeployerError):
-    """Error raised when a deployment schema is not found."""
-
-
 class DeploymentInvalidParametersError(DeployerError):
     """Error raised when the parameters for a deployment are invalid."""

src/zenml/deployers/server/parameters.py

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ def build_params_model_from_snapshot(
         logger.error(f"Failed to load pipeline class from snapshot: {e}")
         raise RuntimeError(f"Failed to load pipeline class from snapshot: {e}")

-    model = pipeline_class.get_parameters_model()
+    model = pipeline_class._compute_input_model()
    if not model:
         raise RuntimeError(
             f"Failed to construct parameters model from pipeline `{snapshot.pipeline_configuration.name}`."

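`_compute_input_model()` is expected to return a pydantic model describing the pipeline parameters. For intuition only, a dynamic parameters model can be assembled with pydantic's `create_model`; this is a sketch, not ZenML's implementation, and the field names are made up:

from pydantic import ValidationError, create_model

# Hypothetical parameter spec: name -> (type, default); `...` marks a required field.
Params = create_model("Params", epochs=(int, 10), learning_rate=(float, ...))

print(Params(learning_rate=0.001).model_dump())  # {'epochs': 10, 'learning_rate': 0.001}
try:
    Params(epochs=5)  # missing the required learning_rate
except ValidationError as e:
    print(e.errors()[0]["type"])  # 'missing'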
src/zenml/deployers/server/service.py

Lines changed: 23 additions & 19 deletions
@@ -137,7 +137,7 @@ def initialize(self) -> None:

         # Build parameter model
         self._params_model = build_params_model_from_snapshot(
-            snapshot=self.snapshot,
+            self.snapshot, strict=True
         )

         # Initialize orchestrator

@@ -504,29 +504,33 @@ def _build_response(
     # ----------

     @property
-    def input_schema(self) -> Optional[Dict[str, Any]]:
-        """Return the JSON schema for pipeline input parameters if available.
+    def input_schema(self) -> Dict[str, Any]:
+        """Return the JSON schema for pipeline input parameters.

         Returns:
-            The JSON schema for pipeline parameters if available.
+            The JSON schema for pipeline parameters.
         """
-        try:
-            if self.snapshot.pipeline_spec:
-                return self.snapshot.pipeline_spec.input_schema
-        except Exception:
-            return None
-        return None
+        if (
+            self.snapshot.pipeline_spec
+            and self.snapshot.pipeline_spec.input_schema
+        ):
+            return self.snapshot.pipeline_spec.input_schema
+        # This should never happen, given that we check for this in the
+        # base deployer.
+        raise RuntimeError("The pipeline input schema is not available.")

     @property
-    def output_schema(self) -> Optional[Dict[str, Any]]:
-        """Return the JSON schema for the deployment response if available.
+    def output_schema(self) -> Dict[str, Any]:
+        """Return the JSON schema for the pipeline outputs.

         Returns:
-            The JSON schema for the deployment response if available.
+            The JSON schema for the pipeline outputs.
         """
-        try:
-            if self.snapshot.pipeline_spec:
-                return self.snapshot.pipeline_spec.output_schema
-        except Exception:
-            return None
-        return None
+        if (
+            self.snapshot.pipeline_spec
+            and self.snapshot.pipeline_spec.output_schema
+        ):
+            return self.snapshot.pipeline_spec.output_schema
+        # This should never happen, given that we check for this in the
+        # base deployer.
+        raise RuntimeError("The pipeline output schema is not available.")

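With `input_schema` now guaranteed to be present for a provisioned deployment, a client can validate a payload against it before invoking the pipeline. A client-side sketch using the `jsonschema` package; the schema shown here is a toy example, and how you actually fetch the schema from the deployment is not prescribed by this diff:

import jsonschema

# In practice `input_schema` would come from the deployment's PipelineSpec.
input_schema = {
    "type": "object",
    "properties": {
        "epochs": {"type": "integer"},
        "learning_rate": {"type": "number"},
    },
    "required": ["learning_rate"],
}

payload = {"epochs": 5, "learning_rate": 0.001}
jsonschema.validate(instance=payload, schema=input_schema)  # raises on bad input
print("payload matches the pipeline input schema")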