Skip to content

Commit f30d397

Browse files
committed
More code review suggestions
1 parent 16855f8 commit f30d397

File tree

11 files changed

+185
-132
lines changed

11 files changed

+185
-132
lines changed

src/zenml/config/compiler.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -650,17 +650,8 @@ def _compute_pipeline_spec(
650650
)
651651
for output_artifact in pipeline._output_artifacts
652652
]
653-
try:
654-
output_schema = pipeline._compute_output_schema()
655-
except Exception as e:
656-
logger.warning("Failed to compute pipeline output schema: %s", e)
657-
output_schema = None
658-
659-
parameters_model = pipeline.get_parameters_model()
660-
if parameters_model:
661-
input_schema = parameters_model.model_json_schema()
662-
else:
663-
input_schema = None
653+
input_schema = pipeline._compute_input_schema()
654+
output_schema = pipeline._compute_output_schema()
664655

665656
return PipelineSpec(
666657
steps=step_specs,

src/zenml/config/pipeline_spec.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,17 @@ class PipelineSpec(FrozenBaseModel):
4242
# inputs in the step specs refer to the pipeline parameter names
4343
# - 0.4: New Pipeline class, the upstream steps and
4444
# inputs in the step specs refer to the pipeline parameter names
45-
# - 0.5: Adds outputs and output schema
45+
# - 0.5: Adds input schema, outputs and output schema
4646
version: str = "0.5"
4747
source: Optional[SourceWithValidator] = None
4848
parameters: Dict[str, Any] = {}
49-
input_schema: Dict[str, Any] = {}
49+
input_schema: Optional[Dict[str, Any]] = Field(
50+
default=None,
51+
description="JSON schema of the pipeline inputs. This is only set "
52+
"for pipeline specs with version >= 0.5. If the value is None, the "
53+
"schema generation failed, which is most likely because some of the "
54+
"pipeline inputs are not JSON serializable.",
55+
)
5056
steps: List[StepSpec]
5157
outputs: List[OutputSpec] = []
5258
output_schema: Optional[Dict[str, Any]] = Field(

src/zenml/deployers/base_deployer.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,35 @@ def _update_deployment(
163163
DeploymentUpdate.from_operational_state(operational_state),
164164
)
165165

166+
def _check_deployment_inputs_outputs(
167+
self,
168+
snapshot: PipelineSnapshotResponse,
169+
) -> None:
170+
"""Check if the deployment has compiled schemas for the pipeline inputs and outputs.
171+
172+
Args:
173+
snapshot: The pipeline snapshot to check.
174+
175+
Raises:
176+
DeploymentProvisionError: if the deployment has no compiled schemas
177+
for the pipeline inputs and outputs.
178+
"""
179+
if (
180+
not snapshot.pipeline_spec
181+
or not snapshot.pipeline_spec.input_schema
182+
or not snapshot.pipeline_spec.output_schema
183+
):
184+
raise DeploymentProvisionError(
185+
f"The pipeline with name '{snapshot.pipeline.name}' referenced "
186+
f"by the deployment with name or ID "
187+
f"'{snapshot.name or snapshot.id}' "
188+
"is missing the compiled schemas for the pipeline inputs or "
189+
"outputs. This is most likely because some of the pipeline "
190+
"inputs or outputs are not JSON serializable. Please check that "
191+
"all the pipeline input arguments and return values have data "
192+
"types that are JSON serializable."
193+
)
194+
166195
def _check_deployment_deployer(
167196
self,
168197
deployment: DeploymentResponse,
@@ -383,6 +412,8 @@ def provision_deployment(
383412
"already exists"
384413
)
385414

415+
self._check_deployment_inputs_outputs(snapshot)
416+
386417
client = Client()
387418

388419
settings = cast(

src/zenml/deployers/exceptions.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,5 @@ class DeploymentHTTPError(DeployerError):
5959
"""Error raised when an HTTP request to a deployment fails."""
6060

6161

62-
class DeploymentSchemaNotFoundError(KeyError, DeployerError):
63-
"""Error raised when a deployment schema is not found."""
64-
65-
6662
class DeploymentInvalidParametersError(DeployerError):
6763
"""Error raised when the parameters for a deployment are invalid."""

src/zenml/deployers/server/parameters.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,13 @@ def build_params_model_from_snapshot(
6464
logger.error(f"Failed to load pipeline class from snapshot: {e}")
6565
raise RuntimeError(f"Failed to load pipeline class from snapshot: {e}")
6666

67-
model = pipeline_class.get_parameters_model()
67+
model = pipeline_class._compute_input_model()
6868
if not model:
6969
message = (
7070
f"Failed to construct parameters model from pipeline "
7171
f"`{snapshot.pipeline_configuration.name}`."
7272
)
7373
logger.error(message)
7474
raise RuntimeError(message)
75-
else:
76-
logger.debug(message)
7775

7876
return model

src/zenml/deployers/server/service.py

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -507,29 +507,33 @@ def _build_response(
507507
# ----------
508508

509509
@property
510-
def input_schema(self) -> Optional[Dict[str, Any]]:
511-
"""Return the JSON schema for pipeline input parameters if available.
510+
def input_schema(self) -> Dict[str, Any]:
511+
"""Return the JSON schema for pipeline input parameters.
512512
513513
Returns:
514-
The JSON schema for pipeline parameters if available.
514+
The JSON schema for pipeline parameters.
515515
"""
516-
try:
517-
if self.snapshot.pipeline_spec:
518-
return self.snapshot.pipeline_spec.input_schema
519-
except Exception:
520-
return None
521-
return None
516+
if (
517+
self.snapshot.pipeline_spec
518+
and self.snapshot.pipeline_spec.input_schema
519+
):
520+
return self.snapshot.pipeline_spec.input_schema
521+
# This should never happen, given that we check for this in the
522+
# base deployer.
523+
raise RuntimeError("The pipeline input schema is not available.")
522524

523525
@property
524-
def output_schema(self) -> Optional[Dict[str, Any]]:
525-
"""Return the JSON schema for the deployment response if available.
526+
def output_schema(self) -> Dict[str, Any]:
527+
"""Return the JSON schema for the pipeline outputs.
526528
527529
Returns:
528-
The JSON schema for the deployment response if available.
530+
The JSON schema for the pipeline outputs.
529531
"""
530-
try:
531-
if self.snapshot.pipeline_spec:
532-
return self.snapshot.pipeline_spec.output_schema
533-
except Exception:
534-
return None
535-
return None
532+
if (
533+
self.snapshot.pipeline_spec
534+
and self.snapshot.pipeline_spec.output_schema
535+
):
536+
return self.snapshot.pipeline_spec.output_schema
537+
# This should never happen, given that we check for this in the
538+
# base deployer.
539+
raise RuntimeError("The pipeline output schema is not available.")

src/zenml/deployers/utils.py

Lines changed: 21 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
DeploymentHTTPError,
2626
DeploymentNotFoundError,
2727
DeploymentProvisionError,
28-
DeploymentSchemaNotFoundError,
2928
)
3029
from zenml.enums import DeploymentStatus
3130
from zenml.models import DeploymentResponse
@@ -45,25 +44,18 @@ def get_deployment_input_schema(
4544
The schema for the deployment's input parameters.
4645
4746
Raises:
48-
DeploymentSchemaNotFoundError: If the deployment has no associated
49-
snapshot, pipeline spec, or parameters schema.
47+
RuntimeError: If the deployment has no associated input schema.
5048
"""
51-
if not deployment.snapshot:
52-
raise DeploymentSchemaNotFoundError(
53-
f"Deployment {deployment.name} has no associated snapshot."
54-
)
55-
56-
if not deployment.snapshot.pipeline_spec:
57-
raise DeploymentSchemaNotFoundError(
58-
f"Deployment {deployment.name} has no associated pipeline spec."
59-
)
49+
if (
50+
deployment.snapshot
51+
and deployment.snapshot.pipeline_spec
52+
and deployment.snapshot.pipeline_spec.input_schema
53+
):
54+
return deployment.snapshot.pipeline_spec.input_schema
6055

61-
if not deployment.snapshot.pipeline_spec.input_schema:
62-
raise DeploymentSchemaNotFoundError(
63-
f"Deployment {deployment.name} has no associated parameters schema."
64-
)
65-
66-
return deployment.snapshot.pipeline_spec.input_schema
56+
raise RuntimeError(
57+
f"Deployment {deployment.name} has no associated input schema."
58+
)
6759

6860

6961
def get_deployment_output_schema(
@@ -78,25 +70,18 @@ def get_deployment_output_schema(
7870
The schema for the deployment's output parameters.
7971
8072
Raises:
81-
DeploymentSchemaNotFoundError: If the deployment has no associated
82-
snapshot, pipeline spec, or output schema.
73+
RuntimeError: If the deployment has no associated output schema.
8374
"""
84-
if not deployment.snapshot:
85-
raise DeploymentSchemaNotFoundError(
86-
f"Deployment {deployment.name} has no associated snapshot."
87-
)
88-
89-
if not deployment.snapshot.pipeline_spec:
90-
raise DeploymentSchemaNotFoundError(
91-
f"Deployment {deployment.name} has no associated pipeline spec."
92-
)
93-
94-
if not deployment.snapshot.pipeline_spec.output_schema:
95-
raise DeploymentSchemaNotFoundError(
96-
f"Deployment {deployment.name} has no associated output schema."
97-
)
98-
99-
return deployment.snapshot.pipeline_spec.output_schema
75+
if (
76+
deployment.snapshot
77+
and deployment.snapshot.pipeline_spec
78+
and deployment.snapshot.pipeline_spec.output_schema
79+
):
80+
return deployment.snapshot.pipeline_spec.output_schema
81+
82+
raise RuntimeError(
83+
f"Deployment {deployment.name} has no associated output schema."
84+
)
10085

10186

10287
def get_deployment_invocation_example(

src/zenml/integrations/aws/deployers/aws_deployer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,11 @@ def _select_aws_cpu_memory_combination(
10971097
Returns:
10981098
Tuple of (cpu, memory) that best matches requirements, in AWS App
10991099
Runner format.
1100+
1101+
Raises:
1102+
ValueError: If the requested resource requirements cannot be matched
1103+
to any of the supported combinations for the AWS App Runner
1104+
service and strict_resource_matching is True.
11001105
"""
11011106
if requested_cpu is None and requested_memory_gb is None:
11021107
return f"{DEFAULT_CPU:g} vCPU", f"{DEFAULT_MEMORY:g} GB"

src/zenml/models/v2/core/deployment.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ class DeploymentUpdate(BaseUpdate):
100100
status: Optional[str] = Field(
101101
default=None,
102102
title="The new status of the deployment.",
103+
description="Possible values are: "
104+
f"{', '.join(DeploymentStatus.values())}",
103105
)
104106
deployment_metadata: Optional[Dict[str, Any]] = Field(
105107
default=None,
@@ -143,7 +145,8 @@ class DeploymentResponseBody(ProjectScopedResponseBody):
143145
status: Optional[str] = Field(
144146
default=None,
145147
title="The status of the deployment.",
146-
description="Current operational status of the deployment.",
148+
description="Current operational status of the deployment. Possible "
149+
f"values are: {', '.join(DeploymentStatus.values())}",
147150
)
148151

149152

0 commit comments

Comments
 (0)