Skip to content

Commit 38d7eec

Browse files
committed
Refactor and adjusted for better doce design
1 parent eeaac76 commit 38d7eec

File tree

4 files changed

+222
-152
lines changed

4 files changed

+222
-152
lines changed

src/zenml/container_registries/base_container_registry.py

Lines changed: 13 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -236,77 +236,19 @@ def get_image_repo_digest(self, image_name: str) -> Optional[str]:
236236

237237
return cast(str, metadata.id.split(":")[-1])
238238

239-
def get_kubernetes_image_pull_secret_data(
240-
self,
241-
) -> Optional[Tuple[str, str, str]]:
242-
"""Get container registry credentials for Kubernetes imagePullSecrets.
239+
@property
240+
def registry_server_uri(self) -> str:
241+
"""Get the normalized registry server URI.
243242
244-
This method only returns credentials when running with a Kubernetes-based
245-
orchestrator (kubernetes, kubeflow, etc.). For local orchestrators, it
246-
returns None since imagePullSecrets are not needed.
243+
This property returns a normalized registry server URI suitable for
244+
authentication and API access. Subclasses can override this property
245+
to provide registry-specific normalization logic.
247246
248247
Returns:
249-
Tuple of (registry_uri, username, password) if credentials are available
250-
and running with a Kubernetes orchestrator, None otherwise. The
251-
registry_uri is normalized for use in Kubernetes imagePullSecrets.
248+
The normalized registry server URI.
252249
"""
253-
from zenml.client import Client
254-
from zenml.logger import get_logger
255-
256-
logger = get_logger(__name__)
257-
258-
# Check if we're using a Kubernetes-based orchestrator
259-
try:
260-
stack = Client().active_stack
261-
orchestrator_flavor = stack.orchestrator.flavor
262-
263-
# List of orchestrator flavors that use Kubernetes and need imagePullSecrets
264-
kubernetes_orchestrators = {
265-
"kubernetes",
266-
"kubeflow",
267-
"vertex",
268-
"sagemaker",
269-
"tekton",
270-
"airflow", # when running on Kubernetes
271-
}
272-
273-
if orchestrator_flavor not in kubernetes_orchestrators:
274-
logger.debug(
275-
f"Skipping ImagePullSecret generation for non-Kubernetes orchestrator: {orchestrator_flavor}"
276-
)
277-
return None
278-
279-
# Additional check for Kubernetes orchestrator with local flag
280-
if orchestrator_flavor == "kubernetes" and hasattr(
281-
stack.orchestrator.config, "local"
282-
):
283-
if stack.orchestrator.config.local:
284-
logger.debug(
285-
"Skipping ImagePullSecret generation for local Kubernetes orchestrator"
286-
)
287-
return None
288-
289-
except Exception as e:
290-
logger.debug(
291-
f"Could not determine orchestrator type: {e}. Proceeding with ImagePullSecret generation."
292-
)
293-
294-
logger.debug(
295-
f"Getting ImagePullSecret data for registry: {self.config.uri}"
296-
)
297-
298-
credentials = self.credentials
299-
if not credentials:
300-
logger.debug("No credentials found for container registry")
301-
return None
302-
303-
username, password = credentials
304250
registry_uri = self.config.uri
305251

306-
logger.debug(
307-
f"Found credentials - username: {username[:3]}***, registry_uri: {registry_uri}"
308-
)
309-
310252
# Check if there's a service connector with a different registry setting
311253
connector = self.get_connector()
312254
if connector:
@@ -319,35 +261,21 @@ def get_kubernetes_image_pull_secret_data(
319261
and connector.config.registry
320262
):
321263
# Use the service connector's registry setting
322-
original_registry_uri = registry_uri
323264
registry_uri = connector.config.registry
324-
logger.debug(
325-
f"Service connector override: {original_registry_uri} -> {registry_uri}"
326-
)
327265

328266
# Normalize registry URI for consistency
329-
original_registry_uri = registry_uri
330267
if registry_uri.startswith("https://"):
331268
registry_uri = registry_uri[8:]
332269
elif registry_uri.startswith("http://"):
333270
registry_uri = registry_uri[7:]
334271

335-
if original_registry_uri != registry_uri:
336-
logger.debug(
337-
f"Normalized registry URI: {original_registry_uri} -> {registry_uri}"
338-
)
272+
# For generic registries, extract just the domain part for better image matching
273+
if not registry_uri.startswith(("http://", "https://")):
274+
domain = registry_uri.split("/")[0]
275+
return f"https://{domain}"
276+
277+
return registry_uri
339278

340-
# Validate the final result
341-
if not registry_uri or not username or not password:
342-
logger.warning(
343-
f"Invalid ImagePullSecret data: registry_uri='{registry_uri}', username='{username}', password_length={len(password) if password else 0}"
344-
)
345-
return None
346-
347-
logger.debug(
348-
f"Returning ImagePullSecret data: registry='{registry_uri}', username='{username[:3]}***'"
349-
)
350-
return registry_uri, username, password
351279

352280

353281
class BaseContainerRegistryFlavor(Flavor):

src/zenml/container_registries/dockerhub_container_registry.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,34 @@
1313
# permissions and limitations under the License.
1414
"""Implementation of a DockerHub Container Registry class."""
1515

16-
from typing import Optional
16+
from typing import Optional, Type
1717

1818
from zenml.constants import DOCKER_REGISTRY_RESOURCE_TYPE
1919
from zenml.container_registries.base_container_registry import (
20+
BaseContainerRegistry,
2021
BaseContainerRegistryFlavor,
2122
)
2223
from zenml.enums import ContainerRegistryFlavor
2324
from zenml.models import ServiceConnectorRequirements
2425

2526

27+
class DockerHubContainerRegistry(BaseContainerRegistry):
28+
"""Container registry implementation for DockerHub."""
29+
30+
@property
31+
def registry_server_uri(self) -> str:
32+
"""Get the DockerHub registry server URI.
33+
34+
DockerHub requires authentication against the specific
35+
'https://index.docker.io/v1/' endpoint regardless of how
36+
the registry URI is configured.
37+
38+
Returns:
39+
The DockerHub registry server URI.
40+
"""
41+
return "https://index.docker.io/v1/"
42+
43+
2644
class DockerHubContainerRegistryFlavor(BaseContainerRegistryFlavor):
2745
"""Class for DockerHub Container Registry."""
2846

@@ -80,3 +98,12 @@ def logo_url(self) -> str:
8098
The flavor logo.
8199
"""
82100
return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/container_registry/docker.png"
101+
102+
@property
103+
def implementation_class(self) -> Type[DockerHubContainerRegistry]:
104+
"""Implementation class for DockerHub container registry.
105+
106+
Returns:
107+
The DockerHub container registry implementation class.
108+
"""
109+
return DockerHubContainerRegistry

src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py

Lines changed: 121 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,12 @@ def submit_pipeline(
518518
"schedule. Use `Schedule(cron_expression=...)` instead."
519519
)
520520
cron_expression = deployment.schedule.cron_expression
521+
522+
# Prepare dependencies for pod creation
523+
registry_credentials, local_stores_path = self._prepare_pod_dependencies(
524+
settings, stack
525+
)
526+
521527
cron_job_manifest, secret_manifests = build_cron_job_manifest(
522528
cron_expression=cron_expression,
523529
run_name=orchestrator_run_name,
@@ -535,18 +541,14 @@ def submit_pipeline(
535541
failed_jobs_history_limit=settings.failed_jobs_history_limit,
536542
ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
537543
namespace=self.config.kubernetes_namespace,
538-
container_registry=stack.container_registry,
539544
auto_generate_image_pull_secrets=settings.auto_generate_image_pull_secrets,
540545
core_api=self._k8s_core_api,
546+
registry_credentials=registry_credentials,
547+
local_stores_path=local_stores_path,
541548
)
542549

543550
# Create imagePullSecrets first
544-
create_image_pull_secrets_from_manifests(
545-
secret_manifests=secret_manifests,
546-
core_api=self._k8s_core_api,
547-
namespace=self.config.kubernetes_namespace,
548-
reuse_existing=False, # Orchestrator creates/updates all secrets
549-
)
551+
self._create_image_pull_secrets(secret_manifests)
550552

551553
self._k8s_batch_api.create_namespaced_cron_job(
552554
body=cron_job_manifest,
@@ -558,6 +560,11 @@ def submit_pipeline(
558560
)
559561
return None
560562
else:
563+
# Prepare dependencies for pod creation
564+
registry_credentials, local_stores_path = self._prepare_pod_dependencies(
565+
settings, stack
566+
)
567+
561568
# Create and run the orchestrator pod.
562569
pod_manifest, secret_manifests = build_pod_manifest(
563570
run_name=orchestrator_run_name,
@@ -572,18 +579,14 @@ def submit_pipeline(
572579
env=environment,
573580
mount_local_stores=self.config.is_local,
574581
namespace=self.config.kubernetes_namespace,
575-
container_registry=stack.container_registry,
576582
auto_generate_image_pull_secrets=settings.auto_generate_image_pull_secrets,
577583
core_api=self._k8s_core_api,
584+
registry_credentials=registry_credentials,
585+
local_stores_path=local_stores_path,
578586
)
579587

580588
# Create imagePullSecrets first
581-
create_image_pull_secrets_from_manifests(
582-
secret_manifests=secret_manifests,
583-
core_api=self._k8s_core_api,
584-
namespace=self.config.kubernetes_namespace,
585-
reuse_existing=False, # Orchestrator creates/updates all secrets
586-
)
589+
self._create_image_pull_secrets(secret_manifests)
587590

588591
kube_utils.create_and_wait_for_pod_to_start(
589592
core_api=self._k8s_core_api,
@@ -650,6 +653,54 @@ def _get_service_account_name(
650653
)
651654
return service_account_name
652655

656+
def _prepare_pod_dependencies(
657+
self,
658+
settings: KubernetesOrchestratorSettings,
659+
stack: "Stack"
660+
) -> Tuple[Optional[Tuple[str, str, str]], Optional[str]]:
661+
"""Prepare dependencies needed for pod manifest creation.
662+
663+
Args:
664+
settings: The orchestrator settings.
665+
stack: The stack the pipeline will run on.
666+
667+
Returns:
668+
Tuple of (registry_credentials, local_stores_path).
669+
"""
670+
# Get registry credentials if auto-generation is enabled
671+
registry_credentials = None
672+
if settings.auto_generate_image_pull_secrets:
673+
registry_credentials = self.get_kubernetes_image_pull_secret_data(
674+
stack.container_registry
675+
)
676+
677+
# Get local stores path if mounting local stores
678+
local_stores_path = None
679+
if self.config.is_local:
680+
from zenml.config.global_config import GlobalConfiguration
681+
682+
stack.check_local_paths()
683+
local_stores_path = GlobalConfiguration().local_stores_path
684+
685+
return registry_credentials, local_stores_path
686+
687+
def _create_image_pull_secrets(
688+
self,
689+
secret_manifests: List[Dict[str, str]],
690+
) -> None:
691+
"""Create imagePullSecrets in the cluster.
692+
693+
Args:
694+
secret_manifests: List of secret manifests for imagePullSecrets.
695+
"""
696+
# Create imagePullSecrets first
697+
create_image_pull_secrets_from_manifests(
698+
secret_manifests=secret_manifests,
699+
core_api=self._k8s_core_api,
700+
namespace=self.config.kubernetes_namespace,
701+
reuse_existing=False, # Orchestrator creates/updates all secrets
702+
)
703+
653704
def get_orchestrator_run_id(self) -> str:
654705
"""Returns the active orchestrator run id.
655706
@@ -667,3 +718,59 @@ def get_orchestrator_run_id(self) -> str:
667718
"Unable to read run id from environment variable "
668719
f"{ENV_ZENML_KUBERNETES_RUN_ID}."
669720
)
721+
722+
def get_kubernetes_image_pull_secret_data(
723+
self, container_registry
724+
) -> Optional[Tuple[str, str, str]]:
725+
"""Get container registry credentials for Kubernetes imagePullSecrets.
726+
727+
This method extracts credentials from the container registry for use
728+
in Kubernetes imagePullSecrets. It only works with this Kubernetes
729+
orchestrator since other orchestrators don't need imagePullSecrets.
730+
731+
Args:
732+
container_registry: The container registry to get credentials from.
733+
734+
Returns:
735+
Tuple of (registry_uri, username, password) if credentials are available,
736+
None otherwise. The registry_uri is normalized for use in Kubernetes
737+
imagePullSecrets.
738+
"""
739+
from zenml.logger import get_logger
740+
741+
logger = get_logger(__name__)
742+
743+
# Check if this is a local Kubernetes orchestrator
744+
if self.config.is_local:
745+
logger.debug(
746+
"Skipping ImagePullSecret generation for local Kubernetes orchestrator"
747+
)
748+
return None
749+
750+
logger.debug(
751+
f"Getting ImagePullSecret data for registry: {container_registry.config.uri}"
752+
)
753+
754+
credentials = container_registry.credentials
755+
if not credentials:
756+
logger.debug("No credentials found for container registry")
757+
return None
758+
759+
username, password = credentials
760+
registry_uri = container_registry.registry_server_uri
761+
762+
logger.debug(
763+
f"Found credentials - username: {username[:3]}***, registry_uri: {registry_uri}"
764+
)
765+
766+
# Validate the final result
767+
if not registry_uri or not username or not password:
768+
logger.warning(
769+
f"Invalid ImagePullSecret data: registry_uri='{registry_uri}', username='{username}', password_length={len(password) if password else 0}"
770+
)
771+
return None
772+
773+
logger.debug(
774+
f"Returning ImagePullSecret data: registry='{registry_uri}', username='{username[:3]}***'"
775+
)
776+
return registry_uri, username, password

0 commit comments

Comments
 (0)