235 changes: 122 additions & 113 deletions sdk/python/kfp/dsl/__init__.py
@@ -73,168 +73,172 @@

# hack: constants and custom type generics have to be defined here to be captured by autodoc and autodocsumm used in ./docs/conf.py
PIPELINE_JOB_NAME_PLACEHOLDER = '{{$.pipeline_job_name}}'
"""A placeholder used to obtain a pipeline job name within a task at pipeline runtime.
In Kubeflow Pipelines, this maps to the pipeline run display name.

Example:
::

@dsl.pipeline
def my_pipeline():
print_op(
msg='Job name:',
value=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
)
"""A placeholder used to obtain a pipeline job name within a task at pipeline
runtime. In Kubeflow Pipelines, this maps to the pipeline run display name.

Example:
::

@dsl.pipeline
def my_pipeline():
print_op(
msg='Job name:',
value=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
)
"""

PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER = '{{$.pipeline_job_resource_name}}'
"""A placeholder used to obtain a pipeline job resource name within a task at pipeline runtime.
In Kubeflow Pipelines, this maps to the pipeline run name in the underlying pipeline engine (e.g. an Argo Workflow
object name).

Example:
::

@dsl.pipeline
def my_pipeline():
print_op(
msg='Job resource name:',
value=dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER,
)
"""A placeholder used to obtain a pipeline job resource name within a task at
pipeline runtime. In Kubeflow Pipelines, this maps to the pipeline run name in
the underlying pipeline engine (e.g. an Argo Workflow object name).

Example:
::

@dsl.pipeline
def my_pipeline():
print_op(
msg='Job resource name:',
value=dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER,
)
"""

PIPELINE_JOB_ID_PLACEHOLDER = '{{$.pipeline_job_uuid}}'
"""A placeholder used to obtain a pipeline job ID within a task at pipeline runtime.
In Kubeflow Pipelines, this maps to the pipeline run UUID.

Example:
::

@dsl.pipeline
def my_pipeline():
print_op(
msg='Job ID:',
value=dsl.PIPELINE_JOB_ID_PLACEHOLDER,
)
"""A placeholder used to obtain a pipeline job ID within a task at pipeline
runtime. In Kubeflow Pipelines, this maps to the pipeline run UUID.

Example:
::

@dsl.pipeline
def my_pipeline():
print_op(
msg='Job ID:',
value=dsl.PIPELINE_JOB_ID_PLACEHOLDER,
)
"""

PIPELINE_TASK_NAME_PLACEHOLDER = '{{$.pipeline_task_name}}'
"""A placeholder used to obtain a task name within a task at pipeline runtime.
In Kubeflow Pipelines, this maps to the component name.

Example:
  ::

    @dsl.pipeline
    def my_pipeline():
        print_op(
            msg='Task name:',
            value=dsl.PIPELINE_TASK_NAME_PLACEHOLDER,
        )
"""

PIPELINE_TASK_ID_PLACEHOLDER = '{{$.pipeline_task_uuid}}'
"""A placeholder used to obtain a task ID within a task at pipeline runtime.
In Kubeflow Pipelines, this maps to the component's ML Metadata (MLMD) execution ID.

Example:
::

@dsl.pipeline
def my_pipeline():
print_op(
msg='Task ID:',
value=dsl.PIPELINE_TASK_ID_PLACEHOLDER,
)
"""A placeholder used to obtain a task ID within a task at pipeline runtime. In
Kubeflow Pipelines, this maps to the component's ML Metadata (MLMD) execution
ID.

Example:
::

@dsl.pipeline
def my_pipeline():
print_op(
msg='Task ID:',
value=dsl.PIPELINE_TASK_ID_PLACEHOLDER,
)
"""

PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER = '{{$.outputs.output_file}}'
"""A placeholder used to obtain the path to the executor_output.json file within the task container.
"""A placeholder used to obtain the path to the executor_output.json file
within the task container.

Example:
::
Example:
::

@dsl.pipeline
def my_pipeline():
create_artifact_with_metadata(
metadata={'foo': 'bar'},
executor_output_destination=dsl.PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER,
)
@dsl.pipeline
def my_pipeline():
create_artifact_with_metadata(
metadata={'foo': 'bar'},
executor_output_destination=dsl.PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER,
)
"""

PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER = '{{$}}'
"""A placeholder used to obtain executor input message passed to the task.

Example:
::
Example:
::

@dsl.pipeline
def my_pipeline():
custom_container_op(
executor_input=dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER,
)
@dsl.pipeline
def my_pipeline():
custom_container_op(
executor_input=dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER,
)
"""

PIPELINE_ROOT_PLACEHOLDER = '{{$.pipeline_root}}'
"""A placeholder used to obtain the pipeline root.

Example:
  ::

    @dsl.pipeline
    def my_pipeline():
        store_model(
            tmp_dir=dsl.PIPELINE_ROOT_PLACEHOLDER + '/tmp',
        )
"""

PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER = '{{$.pipeline_job_create_time_utc}}'
"""A placeholder used to obtain the time that a pipeline job was created.

Example:
  ::

    @dsl.pipeline
    def my_pipeline():
        print_op(
            msg='Job created at:',
            value=dsl.PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER,
        )
"""
PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER = '{{$.pipeline_job_schedule_time_utc}}'
"""A placeholder used to obtain the time for which a pipeline job is scheduled.

Example:
  ::

    @dsl.pipeline
    def my_pipeline():
        print_op(
            msg='Job scheduled at:',
            value=dsl.PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER,
        )
"""

WORKSPACE_PATH_PLACEHOLDER = '{{$.workspace_path}}'
"""A placeholder used to obtain the path to the shared workspace within a component.

Example:
::

@dsl.pipeline(
pipeline_config=dsl.PipelineConfig(
workspace=dsl.WorkspaceConfig(size='100Gi'),
),
"""A placeholder used to obtain the path to the shared workspace within a
component.

Example:
::

@dsl.pipeline(
pipeline_config=dsl.PipelineConfig(
workspace=dsl.WorkspaceConfig(size='100Gi'),
),
)
def my_pipeline():
clone_repo_task = clone_repo(
workspacePath=dsl.WORKSPACE_PATH_PLACEHOLDER, repo='https://github.com/example/repo',
)
def my_pipeline():
clone_repo_task = clone_repo(
workspacePath=dsl.WORKSPACE_PATH_PLACEHOLDER, repo='https://github.com/example/repo',
)
"""

T = TypeVar('T')
Input = Annotated[T, InputAnnotation]
"""Type generic used to represent an input artifact of type ``T``, where ``T`` is an artifact class.
"""Type generic used to represent an input artifact of type ``T``, where ``T``
is an artifact class.

Use ``Input[Artifact]`` or ``Output[Artifact]`` to indicate whether the enclosed artifact is a component input or output.

@@ -262,7 +266,10 @@ def my_pipeline():
# Runtime-only input for accessing embedded artifacts extracted at runtime.
EmbeddedInput = Annotated[T, EmbeddedAnnotation]

Output = Annotated[T, OutputAnnotation]
"""A type generic used to represent an output artifact of type ``T``, where
``T`` is an artifact class. The argument typed with this annotation is provided
at runtime by the executing backend and does not need to be passed as an input
by the pipeline author (see example).

Use ``Input[Artifact]`` or ``Output[Artifact]`` to indicate whether the enclosed artifact is a component input or output.
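
A minimal usage sketch (not part of this diff) showing both generics together.
The component body and byte-counting logic are hypothetical stand-ins for real
work; only ``Input``, ``Output``, and the artifact classes come from the SDK:
  ::

    from kfp import dsl
    from kfp.dsl import Dataset, Input, Model, Output

    @dsl.component
    def train(data: Input[Dataset], model: Output[Model]):
        # `data` is materialized from the upstream task's output; `model.path`
        # is a writable location provided by the backend at runtime.
        with open(data.path) as f:
            num_bytes = len(f.read())
        with open(model.path, 'w') as f:
            f.write(f'trained on {num_bytes} bytes')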

@@ -290,6 +297,7 @@ def my_pipeline():
# compile-time only dependencies
if os.environ.get('_KFP_RUNTIME', 'false') != 'true':
    from kfp.dsl.component_decorator import component
    from kfp.dsl.component_decorator import KubeflowPackageInstallMode
    from kfp.dsl.component_task_config import TaskConfigField
    from kfp.dsl.component_task_config import TaskConfigPassthrough
    from kfp.dsl.container_component_decorator import container_component
Expand Down Expand Up @@ -318,5 +326,6 @@ def my_pipeline():
    'ExitHandler', 'ParallelFor', 'Collected', 'IfPresentPlaceholder',
    'ConcatPlaceholder', 'PipelineTask', 'PipelineConfig',
    'WorkspaceConfig', 'KubernetesWorkspaceConfig', 'TaskConfigField',
    'TaskConfigPassthrough', 'notebook_component',
    'KubeflowPackageInstallMode'
])
25 changes: 25 additions & 0 deletions sdk/python/kfp/dsl/component_decorator.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import enum
import functools
from typing import Callable, List, Optional, Union
import warnings
@@ -21,6 +22,19 @@
from kfp.dsl.component_task_config import TaskConfigPassthrough


class KubeflowPackageInstallMode(str, enum.Enum):
    """Installation mode for the Kubeflow SDK package in components.

    Attributes:
        AUTO: Automatically detect and install if kubeflow is imported in the
            component function.
        INSTALL: Always install the kubeflow package regardless of usage.
        SKIP: Never install the kubeflow package, even if detected in the
            component function.
    """
    AUTO = 'auto'
    INSTALL = 'install'
    SKIP = 'skip'
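
Because the enum subclasses ``str``, each member compares equal to its string
value, so callers can pass either the member or the raw string; a short
illustration (not from this diff) of that equivalence:
  ::

    mode = KubeflowPackageInstallMode('auto')
    assert mode is KubeflowPackageInstallMode.AUTO
    assert mode == 'auto'  # str subclass: compares equal to its value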


def component(
    func: Optional[Callable] = None,
    *,
@@ -30,6 +44,8 @@ def component(
    pip_index_urls: Optional[List[str]] = None,
    output_component_file: Optional[str] = None,
    install_kfp_package: bool = True,
    install_kubeflow_package:
    KubeflowPackageInstallMode = KubeflowPackageInstallMode.AUTO,
    kfp_package_path: Optional[str] = None,
    pip_trusted_hosts: Optional[List[str]] = None,
    use_venv: bool = False,
@@ -88,6 +104,13 @@ def component(
            This flag is ignored when ``target_image`` is specified, which implies
            a choice to build a containerized component. Containerized components
            will always install KFP as part of the build process.
        install_kubeflow_package: Controls whether the Kubeflow SDK is installed.
            Can be KubeflowPackageInstallMode.AUTO (default),
            KubeflowPackageInstallMode.INSTALL, or KubeflowPackageInstallMode.SKIP.

            - AUTO: Detects kubeflow imports in the component function via AST
              parsing and automatically adds 'kubeflow' to packages_to_install
              if detected.
            - INSTALL: Always installs the kubeflow package regardless of
              whether it's used.
            - SKIP: Never installs kubeflow, even if detected (useful if
              pre-installed in base_image).
        kfp_package_path: Specifies the location from which to install KFP. By
            default, this will try to install from PyPI using the same version
            as that used when this component was created. Component authors can
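
A hedged usage sketch of the new parameter. The component bodies and the base
image name are hypothetical; only ``install_kubeflow_package`` and
``KubeflowPackageInstallMode`` come from this diff:
  ::

    from kfp import dsl
    from kfp.dsl import KubeflowPackageInstallMode

    # AUTO (default): 'kubeflow' is appended to packages_to_install only if
    # the function body imports it.
    @dsl.component(install_kubeflow_package=KubeflowPackageInstallMode.AUTO)
    def launch_job():
        import kubeflow  # detected via AST parsing, triggering the install
        print(kubeflow.__name__)

    # SKIP: never install, e.g. when the base image already ships the SDK.
    @dsl.component(
        base_image='registry.example.com/kubeflow-ready:latest',  # hypothetical
        install_kubeflow_package=KubeflowPackageInstallMode.SKIP,
    )
    def evaluate():
        pass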
@@ -156,6 +179,7 @@ def pipeline():
        pip_index_urls=pip_index_urls,
        output_component_file=output_component_file,
        install_kfp_package=install_kfp_package,
        install_kubeflow_package=install_kubeflow_package,
        kfp_package_path=kfp_package_path,
        pip_trusted_hosts=pip_trusted_hosts,
        use_venv=use_venv,
@@ -171,6 +195,7 @@ def pipeline():
        pip_index_urls=pip_index_urls,
        output_component_file=output_component_file,
        install_kfp_package=install_kfp_package,
        install_kubeflow_package=install_kubeflow_package,
        kfp_package_path=kfp_package_path,
        pip_trusted_hosts=pip_trusted_hosts,
        use_venv=use_venv,