Commit a5efb43

Merge branch 'feature/served-pipelines' of https://github.com/zenml-io/zenml into feature/served-pipelines

2 parents 67cba90 + e74e8af

40 files changed: +893 -367 lines

docs/book/getting-started/core-concepts.md

Lines changed: 24 additions & 8 deletions

@@ -60,7 +60,7 @@ As seen in the image, a step might use the outputs from a previous step and thus

Pipelines and steps are defined in code using Python _decorators_ or _classes_. This is where the core business logic and value of your work live, and you will spend most of your time defining these two things.

-Even though pipelines are simple Python functions, you are only allowed to call steps within this function. The inputs for steps called within a pipeline can either be the outputs of previous steps or alternatively, you can pass in values directly (as long as they're JSON-serializable).
+Even though pipelines are simple Python functions, you are only allowed to call steps within this function. The inputs for steps called within a pipeline can either be the outputs of previous steps or alternatively, you can pass in values directly or map them onto pipeline parameters (as long as they're JSON-serializable). Similarly, you can return values from a pipeline that are step outputs as long as they are JSON-serializable.

```python
from zenml import pipeline
@@ -71,19 +71,19 @@ def my_pipeline():
    step_2(input_one="hello", input_two=output_step_one)

@pipeline
-def agent_evaluation_pipeline():
+def agent_evaluation_pipeline(query: str = "What is machine learning?") -> str:
    """An AI agent evaluation pipeline."""
    prompt = "You are a helpful assistant. Please answer: {query}"
-    test_query = "What is machine learning?"
-    evaluation_result = evaluate_agent_response(prompt, test_query)
+    evaluation_result = evaluate_agent_response(prompt, query)
+    return evaluation_result
```

Executing the Pipeline is as easy as calling the function that you decorated with the `@pipeline` decorator.

```python
if __name__ == "__main__":
    my_pipeline()
-    agent_evaluation_pipeline()
+    agent_evaluation_pipeline(query="What is an LLM?")
```

#### Artifacts
@@ -118,9 +118,11 @@ Once you have implemented your workflow by using the concepts described above, y

#### Stacks & Components

-When you want to execute a pipeline run with ZenML, **Stacks** come into play. A **Stack** is a collection of **stack components**, where each component represents the respective configuration regarding a particular function in your MLOps pipeline, such as orchestration systems, artifact repositories, and model deployment platforms.
+When you want to execute a pipeline run with ZenML, **Stacks** come into play. A **Stack** is a collection of **stack components**, where each component represents the respective configuration regarding a particular function in your MLOps pipeline, such as pipeline orchestration or deployment systems, artifact repositories and container registries.

-For instance, if you take a close look at the default local stack of ZenML, you will see two components that are **required** in every stack in ZenML, namely an _orchestrator_ and an _artifact store_.
+Pipelines can be executed in two ways: in **batch mode** (traditional execution through an orchestrator) or in **online mode** (long-running HTTP servers that can be invoked via REST API calls). Deploying pipelines for online mode execution allows you to serve your ML workflows as real-time endpoints, making them accessible for live inference and interactive use cases.
+
+For instance, if you take a close look at the default local stack of ZenML, you will see two components that are **required** in every stack in ZenML, namely an _orchestrator_ and an _artifact store_. Additional components like _deployers_ can be added to enable specific functionality such as deploying pipelines as HTTP endpoints.

![ZenML running code on the Local Stack.](../.gitbook/assets/02_pipeline_local_stack.png)

@@ -130,16 +132,30 @@ Keep in mind that each one of these components is built on top of base abstracti

#### Orchestrator

-An **Orchestrator** is a workhorse that coordinates all the steps to run in a pipeline. Since pipelines can be set up with complex combinations of steps with various asynchronous dependencies between them, the orchestrator acts as the component that decides what steps to run and when to run them.
+An **Orchestrator** is a workhorse that coordinates all the steps to run in a pipeline in batch mode. Since pipelines can be set up with complex combinations of steps with various asynchronous dependencies between them, the orchestrator acts as the component that decides what steps to run and when to run them.

ZenML comes with a default _local orchestrator_ designed to run on your local machine. This is useful, especially during the exploration phase of your project. You don't have to rent a cloud instance just to try out basic things.

+#### Deployer
+
+A **Deployer** is a stack component that manages the deployment of pipelines as long-running HTTP servers useful for online mode execution. Unlike orchestrators that execute pipelines in batch mode, deployers can create and manage persistent services that wrap your pipeline in a web application, usually containerized, allowing it to be invoked through HTTP requests.
+
+ZenML comes with a _Docker deployer_ that can run deployments on your local machine as Docker containers, making it easy to test and develop real-time pipeline endpoints before moving to production infrastructure.
+
+#### Pipeline Run
+
+A **Pipeline Run** is a record of a pipeline execution. When you run a pipeline using an orchestrator, a pipeline run is created tracking information about the execution such as the status, the artifacts and metadata produced by the pipeline and all its steps. When a pipeline is deployed for online mode execution, a pipeline run is similarly created for every HTTP request made to it.
+
#### Artifact Store

An **Artifact Store** is a component that houses all data that passes through the pipeline as inputs and outputs. Each artifact that gets stored in the artifact store is tracked and versioned and this allows for extremely useful features like data caching, which speeds up your workflows.

Similar to the orchestrator, ZenML comes with a default _local artifact store_ designed to run on your local machine. This is useful, especially during the exploration phase of your project. You don't have to set up a cloud storage system to try out basic things.

+#### Deployment
+
+A **Deployment** is a running instance of a pipeline deployed as an HTTP endpoint. When you deploy a pipeline using a deployer, it becomes a long-running service that can be invoked through REST API calls. Each HTTP request to a deployment triggers a new pipeline run, creating the same artifacts and metadata tracking as traditional batch pipeline executions. This enables real-time inference, interactive ML workflows, and seamless integration with web applications and external services.
+
#### Flavor

ZenML provides a dedicated base abstraction for each stack component type. These abstractions are used to develop solutions, called **Flavors**, tailored to specific use cases/tools. With ZenML installed, you get access to a variety of built-in and integrated Flavors for each component type, but users can also leverage the base abstractions to create their own custom flavors.
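The documentation changes above introduce deployers and deployments alongside the existing batch concepts. As a rough illustration of the online-mode flow they describe, the sketch below looks up a deployment with `Client().list_deployments()` (using the `pipeline` filter added elsewhere in this commit; `url` and `auth_key` also appear in this commit) and calls it over HTTP. The `/invoke` route, the payload shape and the bearer-token header are assumptions for illustration; the pipeline name is a placeholder.

```python
import requests

from zenml.client import Client

# Look up a deployment to call; the `pipeline` filter is one of the new
# `list_deployments` arguments in this commit (the name is a placeholder).
deployments = Client().list_deployments(pipeline="agent_evaluation_pipeline")
deployment = deployments.items[0]

# Hypothetical invocation: the "/invoke" route, payload shape and bearer-token
# header are illustrative assumptions -- the docs above only state that each
# HTTP request to a deployment creates a new pipeline run.
headers = (
    {"Authorization": f"Bearer {deployment.auth_key}"}
    if deployment.auth_key
    else {}
)
response = requests.post(
    f"{deployment.url}/invoke",
    json={"parameters": {"query": "What is an LLM?"}},
    headers=headers,
    timeout=30,
)
print(response.json())
```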

src/zenml/client.py

Lines changed: 19 additions & 0 deletions

@@ -3393,6 +3393,8 @@ def list_snapshots(
        schedule_id: Optional[Union[str, UUID]] = None,
        source_snapshot_id: Optional[Union[str, UUID]] = None,
        runnable: Optional[bool] = None,
+        deployable: Optional[bool] = None,
+        deployed: Optional[bool] = None,
        tag: Optional[str] = None,
        tags: Optional[List[str]] = None,
        hydrate: bool = False,
@@ -3418,6 +3420,8 @@ def list_snapshots(
            schedule_id: The ID of the schedule to filter by.
            source_snapshot_id: The ID of the source snapshot to filter by.
            runnable: Whether the snapshot is runnable.
+            deployable: Whether the snapshot is deployable.
+            deployed: Whether the snapshot is deployed.
            tag: Filter by tag.
            tags: Filter by tags.
            hydrate: Flag deciding whether to hydrate the output model(s)
@@ -3444,6 +3448,8 @@ def list_snapshots(
                schedule_id=schedule_id,
                source_snapshot_id=source_snapshot_id,
                runnable=runnable,
+                deployable=deployable,
+                deployed=deployed,
                tag=tag,
                tags=tags,
            )
@@ -3745,6 +3751,9 @@ def list_deployments(
        status: Optional[DeploymentStatus] = None,
        url: Optional[str] = None,
        user: Optional[Union[UUID, str]] = None,
+        pipeline: Optional[Union[UUID, str]] = None,
+        tag: Optional[str] = None,
+        tags: Optional[List[str]] = None,
        hydrate: bool = False,
    ) -> Page[DeploymentResponse]:
        """List deployments.
@@ -3764,6 +3773,9 @@ def list_deployments(
            status: The status of the deployment to filter by.
            url: The url of the deployment to filter by.
            user: Filter by user name/ID.
+            pipeline: Filter by pipeline name/ID.
+            tag: Tag to filter by.
+            tags: Tags to filter by.
            hydrate: Flag deciding whether to hydrate the output model(s)
                by including metadata fields in the response.

@@ -3786,6 +3798,9 @@ def list_deployments(
                deployer_id=deployer_id,
                status=status,
                url=url,
+                pipeline=pipeline,
+                tag=tag,
+                tags=tags,
            ),
            hydrate=hydrate,
        )
@@ -4632,6 +4647,7 @@ def list_pipeline_runs(
        hydrate: bool = False,
        include_full_metadata: bool = False,
        triggered_by_step_run_id: Optional[Union[UUID, str]] = None,
+        triggered_by_deployment_id: Optional[Union[UUID, str]] = None,
    ) -> Page[PipelineRunResponse]:
        """List all pipeline runs.

@@ -4678,6 +4694,8 @@
                the response.
            triggered_by_step_run_id: The ID of the step run that triggered
                the pipeline run.
+            triggered_by_deployment_id: The ID of the deployment that triggered
+                the pipeline run.

        Returns:
            A page with Pipeline Runs fitting the filter description
@@ -4719,6 +4737,7 @@
                in_progress=in_progress,
                templatable=templatable,
                triggered_by_step_run_id=triggered_by_step_run_id,
+                triggered_by_deployment_id=triggered_by_deployment_id,
            )
        return self.zen_store.list_runs(
            runs_filter_model=runs_filter_model,
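Taken together, the new filters added to `Client.list_snapshots`, `Client.list_deployments` and `Client.list_pipeline_runs` make it possible to trace a deployed pipeline from snapshot to the runs created by HTTP requests. A minimal usage sketch; the pipeline name and tag values are placeholders.

```python
from zenml.client import Client

client = Client()

# Snapshots that can be deployed as HTTP endpoints but are not deployed yet.
deployable_snapshots = client.list_snapshots(deployable=True, deployed=False)

# Deployments narrowed down by pipeline and tags (placeholder values).
deployments = client.list_deployments(
    pipeline="agent_evaluation_pipeline",
    tags=["staging"],
)

# Runs created by HTTP requests made to a specific deployment.
if deployments.items:
    runs = client.list_pipeline_runs(
        triggered_by_deployment_id=deployments.items[0].id,
    )
    print(f"{runs.total} runs were triggered by this deployment")
```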

src/zenml/deployers/base_deployer.py

Lines changed: 1 addition & 1 deletion

@@ -174,7 +174,7 @@ def _check_deployment_inputs_outputs(

        Raises:
            DeploymentProvisionError: if the deployment has no compiled schemas
-            for the pipeline inputs and outputs.
+                for the pipeline inputs and outputs.
        """
        if (
            not snapshot.pipeline_spec

src/zenml/deployers/containerized_deployer.py

Lines changed: 8 additions & 0 deletions

@@ -19,7 +19,9 @@
    Set,
)

+import zenml
from zenml.config.build_configuration import BuildConfiguration
+from zenml.config.global_config import GlobalConfiguration
from zenml.constants import (
    DEPLOYER_DOCKER_IMAGE_KEY,
)
@@ -69,6 +71,12 @@ def requirements(self) -> Set[str]:
        """
        requirements = super().requirements
        requirements.update(self.CONTAINER_REQUIREMENTS)
+
+        if self.config.is_local and GlobalConfiguration().uses_sql_store:
+            # If we're directly connected to a DB, we need to install the
+            # `local` extra in the Docker image to include the DB dependencies.
+            requirements.add(f"'zenml[local]=={zenml.__version__}'")
+
        return requirements

    def get_docker_builds(
src/zenml/deployers/docker/docker_deployer.py

Lines changed: 2 additions & 5 deletions

@@ -54,12 +54,10 @@
)
from zenml.deployers.server.entrypoint_configuration import (
    AUTH_KEY_OPTION,
+    DEPLOYMENT_ID_OPTION,
    PORT_OPTION,
    DeploymentEntrypointConfiguration,
)
-from zenml.entrypoints.base_entrypoint_configuration import (
-    SNAPSHOT_ID_OPTION,
-)
from zenml.enums import DeploymentStatus, StackComponentType
from zenml.logger import get_logger
from zenml.models import (
@@ -251,7 +249,6 @@ def _get_container_operational_state(
        state.url = "http://localhost"
        if metadata.port:
            state.url += f":{metadata.port}"
-        # TODO: check if the deployment is healthy.

        return state

@@ -304,7 +301,7 @@ def do_provision_deployment(
        entrypoint = DeploymentEntrypointConfiguration.get_entrypoint_command()

        entrypoint_kwargs = {
-            SNAPSHOT_ID_OPTION: snapshot.id,
+            DEPLOYMENT_ID_OPTION: deployment.id,
            PORT_OPTION: 8000,
        }
        if deployment.auth_key:
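The provisioning change above swaps the snapshot ID for the deployment ID in the entrypoint arguments. A sketch of how those kwargs could surface as the CLI flags parsed by the serving app later in this commit; the concrete values of `DEPLOYMENT_ID_OPTION` and `PORT_OPTION` and the flattening logic are assumptions for illustration, not taken from the diff.

```python
from uuid import uuid4

# Assumed values of the option constants (not taken from this diff).
DEPLOYMENT_ID_OPTION = "deployment_id"
PORT_OPTION = "port"

# Mirrors the entrypoint_kwargs built by the Docker deployer above.
entrypoint_kwargs = {
    DEPLOYMENT_ID_OPTION: uuid4(),  # deployment.id in the deployer
    PORT_OPTION: 8000,
}

# One plausible flattening into CLI flags consumed by app.py's argparse:
# ["--deployment_id", "<uuid>", "--port", "8000"]
args = [
    part
    for key, value in entrypoint_kwargs.items()
    for part in (f"--{key}", str(value))
]
print(args)
```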

src/zenml/deployers/server/app.py

Lines changed: 12 additions & 20 deletions

@@ -27,7 +27,6 @@
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
-from pydantic import BaseModel

from zenml.deployers.server.models import (
    ExecutionMetrics,
@@ -65,24 +64,17 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    # Startup
    logger.info("🚀 Starting ZenML Pipeline Serving service...")

-    snapshot_id = os.getenv("ZENML_SNAPSHOT_ID")
-    if not snapshot_id:
-        raise ValueError("ZENML_SNAPSHOT_ID environment variable is required")
+    deployment_id = os.getenv("ZENML_DEPLOYMENT_ID")
+    if not deployment_id:
+        raise ValueError(
+            "ZENML_DEPLOYMENT_ID environment variable is required"
+        )

    try:
        global _service
-        _service = PipelineDeploymentService(snapshot_id)
+        _service = PipelineDeploymentService(deployment_id)
        _service.initialize()
-        # params model is available.
-        try:
-            params_model = _service.params_model
-            if isinstance(params_model, type) and issubclass(
-                params_model, BaseModel
-            ):
-                app.include_router(_build_invoke_router(_service))
-        except Exception:
-            # Skip router installation if parameter model is not ready
-            pass
+        app.include_router(_build_invoke_router(_service))
        logger.info("✅ Pipeline deployment service initialized successfully")
    except Exception as e:
        logger.error(f"❌ Failed to initialize: {e}")
@@ -107,7 +99,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:

# Create FastAPI application with OpenAPI security scheme
app = FastAPI(
-    title="ZenML Pipeline Deployment",
+    title=f"ZenML Pipeline Deployment {os.getenv('ZENML_DEPLOYMENT_ID')}",
    description="deploy ZenML pipelines as FastAPI endpoints",
    version="0.2.0",
    lifespan=lifespan,
@@ -346,8 +338,8 @@ def runtime_error_handler(request: Request, exc: RuntimeError) -> JSONResponse:

    parser = argparse.ArgumentParser()
    parser.add_argument(
-        "--snapshot_id",
-        default=os.getenv("ZENML_SNAPSHOT_ID"),
+        "--deployment_id",
+        default=os.getenv("ZENML_DEPLOYMENT_ID"),
        help="Pipeline snapshot ID",
    )
    parser.add_argument(
@@ -371,8 +363,8 @@ def runtime_error_handler(request: Request, exc: RuntimeError) -> JSONResponse:
    )
    args = parser.parse_args()

-    if args.snapshot_id:
-        os.environ["ZENML_SNAPSHOT_ID"] = args.snapshot_id
+    if args.deployment_id:
+        os.environ["ZENML_DEPLOYMENT_ID"] = args.deployment_id
    if args.auth_key:
        os.environ["ZENML_DEPLOYMENT_AUTH_KEY"] = args.auth_key

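With the switch from `ZENML_SNAPSHOT_ID` to `ZENML_DEPLOYMENT_ID`, the serving app is keyed entirely off a deployment. A minimal local-launch sketch, assuming the module-level `app` object shown above, an installed `uvicorn`, and an existing deployment ID registered in your ZenML store; the Docker deployer normally sets these variables for you inside the container.

```python
import os

import uvicorn

# Point the serving app at an existing deployment (placeholder UUID).
os.environ["ZENML_DEPLOYMENT_ID"] = "<your-deployment-uuid>"
# Optional: require a bearer token on incoming requests.
os.environ["ZENML_DEPLOYMENT_AUTH_KEY"] = "<secret-token>"

# `app` is defined at module level in src/zenml/deployers/server/app.py.
uvicorn.run("zenml.deployers.server.app:app", host="0.0.0.0", port=8000)
```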