Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions sdk/python/kfp/dsl/component_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def component(
pip_index_urls: Optional[List[str]] = None,
output_component_file: Optional[str] = None,
install_kfp_package: bool = True,
install_kubeflow_package: bool = True,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make this an enum value instead? I'm thinking of these three options, or something similar:

  1. auto
  2. install
  3. skip

Then we can default to auto but allow the user to override (force an installation) in cases where auto doesn't work. I suppose the user could always force it by adding a value to packages_to_install, but this makes the behavior clearer.

kfp_package_path: Optional[str] = None,
pip_trusted_hosts: Optional[List[str]] = None,
use_venv: bool = False,
Expand Down Expand Up @@ -88,6 +89,10 @@ def component(
This flag is ignored when ``target_image`` is specified, which implies
a choice to build a containerized component. Containerized components
will always install KFP as part of the build process.
install_kubeflow_package: If True (default), automatically detects kubeflow
imports in the component function and adds 'kubeflow' to packages_to_install.
Set to False if kubeflow is pre-installed in your base image to avoid
duplicate installation.
kfp_package_path: Specifies the location from which to install KFP. By
default, this will try to install from PyPI using the same version
as that used when this component was created. Component authors can
Expand Down Expand Up @@ -156,6 +161,7 @@ def pipeline():
pip_index_urls=pip_index_urls,
output_component_file=output_component_file,
install_kfp_package=install_kfp_package,
install_kubeflow_package=install_kubeflow_package,
kfp_package_path=kfp_package_path,
pip_trusted_hosts=pip_trusted_hosts,
use_venv=use_venv,
Expand All @@ -171,6 +177,7 @@ def pipeline():
pip_index_urls=pip_index_urls,
output_component_file=output_component_file,
install_kfp_package=install_kfp_package,
install_kubeflow_package=install_kubeflow_package,
kfp_package_path=kfp_package_path,
pip_trusted_hosts=pip_trusted_hosts,
use_venv=use_venv,
Expand Down
114 changes: 114 additions & 0 deletions sdk/python/kfp/dsl/component_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import base64
import dataclasses
import gzip
Expand Down Expand Up @@ -47,6 +48,97 @@
SINGLE_OUTPUT_NAME = 'Output'


def _detect_kubeflow_imports_in_function(func: Callable) -> bool:
"""Detects if the function imports kubeflow package using AST parsing.

Args:
func: The function to analyze for kubeflow imports.

Returns:
bool: True if any kubeflow import is found, False otherwise.

Detects these import patterns:
- import kubeflow
- import kubeflow.training (any submodule)
- from kubeflow import X
- from kubeflow.submodule import X
"""
try:
# Get function source code
source = inspect.getsource(func)

# Remove leading indentation to handle nested/indented functions
source = textwrap.dedent(source)

# Parse the source into an AST
tree = ast.parse(source)

for node in ast.walk(tree):
if isinstance(node, ast.Import):
# Handle "import kubeflow" or "import kubeflow.submodule"
for alias in node.names:
if alias.name == 'kubeflow' or alias.name.startswith(
'kubeflow.'):
return True
elif isinstance(node, ast.ImportFrom):
# Handle "from kubeflow import X" or "from kubeflow.submodule import X"
if node.module and (node.module == 'kubeflow' or
node.module.startswith('kubeflow.')):
return True

return False

except (OSError, TypeError, SyntaxError, AttributeError):
# Handle cases where:
# - Source is not available (OSError)
# - Function is built-in or C extension (TypeError)
# - Source has syntax errors (SyntaxError)
# - inspect module issues (AttributeError)
return False


def _parse_package_name(package_spec: str) -> str:
"""Extract base package name from package specification.

Args:
package_spec: Package specification like 'kubeflow==2.0.0', 'kubeflow>=1.5.0',
'kubeflow[extras]', 'git+https://github.com/kubeflow/sdk.git', etc.

Returns:
str: The base package name without version/extras/operators.

Examples:
'kubeflow==2.0.0' -> 'kubeflow'
'kubeflow>=1.5.0' -> 'kubeflow'
'kubeflow[extras]' -> 'kubeflow'
'git+https://github.com/kubeflow/sdk.git' -> 'kubeflow' (special case for kubeflow detection)
"""
# Handle VCS URLs (git+https://, git+ssh://, etc.)
if '+' in package_spec and '://' in package_spec:
# Special case: if it's a kubeflow URL, return 'kubeflow' for auto-detection logic
if 'kubeflow' in package_spec.lower():
return 'kubeflow'
# For other VCS URLs, extract package name from URL path
# e.g., 'git+https://github.com/org/package.git' -> 'package'
match = re.search(r'/([^/]+?)(?:\.git)?(?:[#@].*)?$', package_spec)
if match:
return match.group(1)
return package_spec # fallback

# Handle standard package specs: package[extras]==version, package>=version, etc.
package_name = package_spec

# Remove extras in brackets: package[extras] -> package
if '[' in package_name:
package_name = package_name.split('[')[0]

# Remove version operators: package>=1.0 -> package, package==2.0 -> package
package_name = re.split(r'[<>=!~]', package_name)[0]

# Remove any remaining whitespace
return package_name.strip()


@dataclasses.dataclass
class ComponentInfo():
"""A dataclass capturing registered components.
Expand Down Expand Up @@ -152,15 +244,32 @@ def make_pip_install_command(


def _get_packages_to_install_command(
func: Optional[Callable] = None,
kfp_package_path: Optional[str] = None,
pip_index_urls: Optional[List[str]] = None,
packages_to_install: Optional[List[str]] = None,
install_kfp_package: bool = True,
install_kubeflow_package: bool = True,
target_image: Optional[str] = None,
pip_trusted_hosts: Optional[List[str]] = None,
use_venv: bool = False,
) -> List[str]:
packages_to_install = packages_to_install or []

# Auto-detect and add kubeflow if needed
if install_kubeflow_package and func is not None:
detected_kubeflow = _detect_kubeflow_imports_in_function(func)

if detected_kubeflow:
# Parse existing packages to check for kubeflow
existing_package_names = [
_parse_package_name(pkg) for pkg in packages_to_install
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference is to use a regex to specifically detect the presence of kubeflow rather than having generic package name detection.

]

# Only add if not already specified
if 'kubeflow' not in existing_package_names:
packages_to_install.append('kubeflow')

kfp_in_user_pkgs = any(pkg.startswith('kfp') for pkg in packages_to_install)
# if the user doesn't say "don't install", they aren't building a
# container component, and they haven't already specified a KFP dep
Expand Down Expand Up @@ -645,6 +754,7 @@ def create_notebook_component_from_func(
pip_index_urls: Optional[List[str]] = None,
output_component_file: Optional[str] = None,
install_kfp_package: bool = True,
install_kubeflow_package: bool = True,
kfp_package_path: Optional[str] = None,
pip_trusted_hosts: Optional[List[str]] = None,
use_venv: bool = False,
Expand Down Expand Up @@ -709,6 +819,7 @@ def create_notebook_component_from_func(
pip_index_urls=pip_index_urls,
output_component_file=output_component_file,
install_kfp_package=install_kfp_package,
install_kubeflow_package=install_kubeflow_package,
kfp_package_path=kfp_package_path,
pip_trusted_hosts=pip_trusted_hosts,
use_venv=use_venv,
Expand Down Expand Up @@ -738,6 +849,7 @@ def create_component_from_func(
pip_index_urls: Optional[List[str]] = None,
output_component_file: Optional[str] = None,
install_kfp_package: bool = True,
install_kubeflow_package: bool = True,
kfp_package_path: Optional[str] = None,
pip_trusted_hosts: Optional[List[str]] = None,
use_venv: bool = False,
Expand All @@ -752,7 +864,9 @@ def create_component_from_func(
"""

packages_to_install_command = _get_packages_to_install_command(
func=func,
install_kfp_package=install_kfp_package,
install_kubeflow_package=install_kubeflow_package,
target_image=target_image,
kfp_package_path=kfp_package_path,
packages_to_install=packages_to_install,
Expand Down
Loading
Loading