Commit 064bf5b

Apply first PR review comments
1 parent 7f983a2 commit 064bf5b

10 files changed

+450
-111
lines changed

docs/book/component-guide/orchestrators/kubernetes.md

Lines changed: 33 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -130,6 +130,7 @@ Some configuration options for the Kubernetes orchestrator can only be set throu
130130
- **`skip_local_validations`** (default: False): If `True`, skips the local validations that would otherwise be performed when `local` is set.
131131
- **`parallel_step_startup_waiting_period`**: How long (in seconds) to wait between starting parallel steps, useful for distributing server load in highly parallel pipelines.
132132
- **`pass_zenml_token_as_secret`** (default: False): By default, the Kubernetes orchestrator passes a short-lived API token for authenticating to the ZenML server as an environment variable in the Pod manifest. If you want this token to be stored in a Kubernetes secret instead, set `pass_zenml_token_as_secret=True` when registering your orchestrator. If you do so, make sure the service connector that you configure for your orchestrator has permissions to create Kubernetes secrets. Additionally, the service account used for the Pods running your pipeline must have permissions to delete secrets; otherwise the cleanup will fail and you'll be left with orphaned secrets.
133+
- **`auto_generate_image_pull_secrets`** (default: True): If `True`, automatically generates imagePullSecrets from container registry credentials in the stack. If `False`, relies on manually configured imagePullSecrets. When enabled, the service account used for running pipelines must have permissions to create and list secrets in the target namespace.
133134

134135
The following configuration options can be set either through the orchestrator config or overridden using `KubernetesOrchestratorSettings` (at the pipeline or step level):
135136

@@ -464,6 +465,38 @@ kubectl logs job/<job-name> -n zenml
464465
465466
Common issues include incorrect cron expressions, insufficient permissions for the service account, or resource constraints.
466467
468+
#### Required Kubernetes RBAC Permissions
469+
470+
When using the automatic imagePullSecret generation feature (`auto_generate_image_pull_secrets=True`), the service account used by ZenML must have the following additional permissions in the target namespace:
471+
472+
```yaml
473+
apiVersion: rbac.authorization.k8s.io/v1
474+
kind: Role
475+
metadata:
476+
namespace: zenml # or your configured namespace
477+
name: zenml-image-pull-secret-manager
478+
rules:
479+
- apiGroups: [""]
480+
resources: ["secrets"]
481+
verbs: ["create", "get", "list", "update", "delete"]
482+
---
483+
apiVersion: rbac.authorization.k8s.io/v1
484+
kind: RoleBinding
485+
metadata:
486+
name: zenml-image-pull-secret-binding
487+
namespace: zenml # or your configured namespace
488+
subjects:
489+
- kind: ServiceAccount
490+
name: zenml-service-account # or your configured service account
491+
namespace: zenml
492+
roleRef:
493+
kind: Role
494+
name: zenml-image-pull-secret-manager
495+
apiGroup: rbac.authorization.k8s.io
496+
```
497+
498+
If you're using a custom service account (via `service_account_name`), ensure it has these permissions. The default `edit` role granted to `zenml-service-account` includes these permissions.
499+
467500
For a tutorial on how to work with schedules in ZenML, check out our ['Managing
468501
Scheduled
469502
Pipelines'](https://docs.zenml.io/user-guides/tutorial/managing-scheduled-pipelines)
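
Review note on the RBAC section added above: a quick way to check whether a Role grants the verbs listed in that section is to compare its rules against the required set. The following is a minimal sketch in plain Python. The helper name `missing_secret_verbs` and the `read_only_role` example are hypothetical and not part of ZenML; the Role is represented as a dictionary so that no YAML parser or cluster access is needed.

```python
from typing import Any, Dict, Set

# Verbs listed in the Role from the documentation diff above.
REQUIRED_SECRET_VERBS: Set[str] = {"create", "get", "list", "update", "delete"}


def missing_secret_verbs(role: Dict[str, Any]) -> Set[str]:
    """Return the required verbs the Role does not grant on core-group secrets."""
    granted: Set[str] = set()
    for rule in role.get("rules", []):
        # Secrets live in the core API group, written as "" in RBAC rules.
        in_core_group = bool({"", "*"} & set(rule.get("apiGroups", [])))
        covers_secrets = bool({"secrets", "*"} & set(rule.get("resources", [])))
        if in_core_group and covers_secrets:
            granted.update(rule.get("verbs", []))
    if "*" in granted:
        return set()
    return REQUIRED_SECRET_VERBS - granted


# Same rules as the Role manifest in the documentation diff.
role = {
    "apiVersion": "rbac.authorization.k8s.io/v1",
    "kind": "Role",
    "metadata": {"namespace": "zenml", "name": "zenml-image-pull-secret-manager"},
    "rules": [
        {
            "apiGroups": [""],
            "resources": ["secrets"],
            "verbs": ["create", "get", "list", "update", "delete"],
        }
    ],
}

# Hypothetical Role that can only read secrets.
read_only_role = {
    "rules": [{"apiGroups": [""], "resources": ["secrets"], "verbs": ["get", "list"]}]
}

print(missing_secret_verbs(role))
print(sorted(missing_secret_verbs(read_only_role)))
```

This only inspects a single Role object. It does not account for permissions granted through other Roles or ClusterRoles bound to the same service account.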

docs/book/component-guide/step-operators/kubernetes.md

Lines changed: 1 addition & 0 deletions
@@ -126,6 +126,7 @@ The following configuration options can be set either through the step operator
126126
- **`pod_failure_max_retries`** (default: 3): The maximum number of times to retry a step pod if it fails to start.
127127
- **`pod_failure_retry_delay`** (default: 10): The delay (in seconds) between pod failure retries and pod startup retries.
128128
- **`pod_failure_backoff`** (default: 1.0): The backoff factor for pod failure retries and pod startup retries.
129+
- **`auto_generate_image_pull_secrets`** (default: True): If `True`, automatically generates imagePullSecrets from container registry credentials in the stack. If `False`, relies on manually configured imagePullSecrets. When enabled, the service account used for running steps must have permissions to create and list secrets in the target namespace.
129130
130131
```python
131132
from zenml.integrations.kubernetes.flavors import KubernetesStepOperatorSettings

src/zenml/container_registries/base_container_registry.py

Lines changed: 123 additions & 6 deletions
@@ -94,16 +94,13 @@ def requires_authentication(self) -> bool:
9494
def credentials(self) -> Optional[Tuple[str, str]]:
9595
"""Username and password to authenticate with this container registry.
9696
97+
Service connector credentials take precedence over direct authentication secrets.
98+
9799
Returns:
98100
Tuple with username and password if this container registry
99101
requires authentication, `None` otherwise.
100102
"""
101-
secret = self.get_typed_authentication_secret(
102-
expected_schema_type=BasicAuthSecretSchema
103-
)
104-
if secret:
105-
return secret.username, secret.password
106-
103+
# Check service connector credentials first as they take precedence
107104
connector = self.get_connector()
108105
if connector:
109106
from zenml.service_connectors.docker_service_connector import (
@@ -116,6 +113,13 @@ def credentials(self) -> Optional[Tuple[str, str]]:
116113
connector.config.password.get_secret_value(),
117114
)
118115

116+
# Fall back to direct authentication secrets
117+
secret = self.get_typed_authentication_secret(
118+
expected_schema_type=BasicAuthSecretSchema
119+
)
120+
if secret:
121+
return secret.username, secret.password
122+
119123
return None
120124

121125
@property
@@ -232,6 +236,119 @@ def get_image_repo_digest(self, image_name: str) -> Optional[str]:
232236

233237
return cast(str, metadata.id.split(":")[-1])
234238

239+
def get_kubernetes_image_pull_secret_data(
240+
self,
241+
) -> Optional[Tuple[str, str, str]]:
242+
"""Get container registry credentials for Kubernetes imagePullSecrets.
243+
244+
This method only returns credentials when running with a Kubernetes-based
245+
orchestrator (kubernetes, kubeflow, etc.). For local orchestrators, it
246+
returns None since imagePullSecrets are not needed.
247+
248+
Returns:
249+
Tuple of (registry_uri, username, password) if credentials are available
250+
and running with a Kubernetes orchestrator, None otherwise. The
251+
registry_uri is normalized for use in Kubernetes imagePullSecrets.
252+
"""
253+
from zenml.client import Client
254+
from zenml.logger import get_logger
255+
256+
logger = get_logger(__name__)
257+
258+
# Check if we're using a Kubernetes-based orchestrator
259+
try:
260+
stack = Client().active_stack
261+
orchestrator_flavor = stack.orchestrator.flavor
262+
263+
# List of orchestrator flavors that use Kubernetes and need imagePullSecrets
264+
kubernetes_orchestrators = {
265+
"kubernetes",
266+
"kubeflow",
267+
"vertex",
268+
"sagemaker",
269+
"tekton",
270+
"airflow", # when running on Kubernetes
271+
}
272+
273+
if orchestrator_flavor not in kubernetes_orchestrators:
274+
logger.debug(
275+
f"Skipping ImagePullSecret generation for non-Kubernetes orchestrator: {orchestrator_flavor}"
276+
)
277+
return None
278+
279+
# Additional check for Kubernetes orchestrator with local flag
280+
if orchestrator_flavor == "kubernetes" and hasattr(
281+
stack.orchestrator.config, "local"
282+
):
283+
if stack.orchestrator.config.local:
284+
logger.debug(
285+
"Skipping ImagePullSecret generation for local Kubernetes orchestrator"
286+
)
287+
return None
288+
289+
except Exception as e:
290+
logger.debug(
291+
f"Could not determine orchestrator type: {e}. Proceeding with ImagePullSecret generation."
292+
)
293+
294+
logger.debug(
295+
f"Getting ImagePullSecret data for registry: {self.config.uri}"
296+
)
297+
298+
credentials = self.credentials
299+
if not credentials:
300+
logger.debug("No credentials found for container registry")
301+
return None
302+
303+
username, password = credentials
304+
registry_uri = self.config.uri
305+
306+
logger.debug(
307+
f"Found credentials - username: {username[:3]}***, registry_uri: {registry_uri}"
308+
)
309+
310+
# Check if there's a service connector with a different registry setting
311+
connector = self.get_connector()
312+
if connector:
313+
from zenml.service_connectors.docker_service_connector import (
314+
DockerServiceConnector,
315+
)
316+
317+
if (
318+
isinstance(connector, DockerServiceConnector)
319+
and connector.config.registry
320+
):
321+
# Use the service connector's registry setting
322+
original_registry_uri = registry_uri
323+
registry_uri = connector.config.registry
324+
logger.debug(
325+
f"Service connector override: {original_registry_uri} -> {registry_uri}"
326+
)
327+
328+
# Normalize registry URI for consistency
329+
original_registry_uri = registry_uri
330+
if registry_uri.startswith("https://"):
331+
registry_uri = registry_uri[8:]
332+
elif registry_uri.startswith("http://"):
333+
registry_uri = registry_uri[7:]
334+
335+
if original_registry_uri != registry_uri:
336+
logger.debug(
337+
f"Normalized registry URI: {original_registry_uri} -> {registry_uri}"
338+
)
339+
340+
# Validate the final result
341+
if not registry_uri or not username or not password:
342+
logger.warning(
343+
f"Invalid ImagePullSecret data: registry_uri='{registry_uri}', username='{username}', password_length={len(password) if password else 0}"
344+
)
345+
return None
346+
347+
logger.debug(
348+
f"Returning ImagePullSecret data: registry='{registry_uri}', username='{username[:3]}***'"
349+
)
350+
return registry_uri, username, password
351+
235352

236353
class BaseContainerRegistryFlavor(Flavor):
237354
"""Base flavor for container registries."""
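
Review note on `get_kubernetes_image_pull_secret_data`: the resolution order introduced in this file (service connector first, authentication secret as fallback, then scheme stripping) can be illustrated in isolation. The sketch below is a simplification, not the ZenML implementation. The function names and parameters are hypothetical, and plain tuples and strings stand in for the connector and secret objects.

```python
from typing import Optional, Tuple


def normalize_registry_uri(registry_uri: str) -> str:
    """Strip a leading http:// or https:// scheme, as the diff does."""
    for scheme in ("https://", "http://"):
        if registry_uri.startswith(scheme):
            return registry_uri[len(scheme):]
    return registry_uri


def resolve_pull_secret_data(
    registry_uri: str,
    connector_credentials: Optional[Tuple[str, str]] = None,
    connector_registry: Optional[str] = None,
    secret_credentials: Optional[Tuple[str, str]] = None,
) -> Optional[Tuple[str, str, str]]:
    """Return (registry_uri, username, password), or None if incomplete."""
    # Service connector credentials take precedence over the direct secret.
    credentials = connector_credentials or secret_credentials
    if not credentials:
        return None
    username, password = credentials
    # A registry set on the connector overrides the component's own URI.
    uri = normalize_registry_uri(connector_registry or registry_uri)
    if not uri or not username or not password:
        return None
    return uri, username, password


print(
    resolve_pull_secret_data(
        "https://registry.example.com",
        connector_credentials=("conn-user", "conn-pass"),
        secret_credentials=("secret-user", "secret-pass"),
    )
)
```

The orchestrator flavor check and the `local` flag handling from the diff are left out here, since they depend on the active ZenML stack.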

src/zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py

Lines changed: 4 additions & 0 deletions
@@ -69,6 +69,9 @@ class KubernetesOrchestratorSettings(BaseSettings):
6969
scheduling a pipeline.
7070
prevent_orchestrator_pod_caching: If `True`, the orchestrator pod will
7171
not try to compute cached steps before starting the step pods.
72+
auto_generate_image_pull_secrets: If `True`, automatically generates
73+
imagePullSecrets from container registry credentials in the stack.
74+
If `False`, relies on manually configured imagePullSecrets.
7275
"""
7376

7477
synchronous: bool = True
@@ -88,6 +91,7 @@ class KubernetesOrchestratorSettings(BaseSettings):
8891
failed_jobs_history_limit: Optional[NonNegativeInt] = None
8992
ttl_seconds_after_finished: Optional[NonNegativeInt] = None
9093
prevent_orchestrator_pod_caching: bool = False
94+
auto_generate_image_pull_secrets: bool = True
9195

9296

9397
class KubernetesOrchestratorConfig(

src/zenml/integrations/kubernetes/flavors/kubernetes_step_operator_flavor.py

Lines changed: 4 additions & 0 deletions
@@ -43,6 +43,9 @@ class KubernetesStepOperatorSettings(BaseSettings):
4343
failure retries and pod startup retries (in seconds)
4444
pod_failure_backoff: The backoff factor for pod failure retries and
4545
pod startup retries.
46+
auto_generate_image_pull_secrets: If `True`, automatically generates
47+
imagePullSecrets from container registry credentials in the stack.
48+
If `False`, relies on manually configured imagePullSecrets.
4649
"""
4750

4851
pod_settings: Optional[KubernetesPodSettings] = None
@@ -52,6 +55,7 @@ class KubernetesStepOperatorSettings(BaseSettings):
5255
pod_failure_max_retries: int = 3
5356
pod_failure_retry_delay: int = 10
5457
pod_failure_backoff: float = 1.0
58+
auto_generate_image_pull_secrets: bool = True
5559

5660

5761
class KubernetesStepOperatorConfig(

src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py

Lines changed: 19 additions & 10 deletions
@@ -58,6 +58,7 @@
5858
from zenml.integrations.kubernetes.orchestrators.manifest_utils import (
5959
build_cron_job_manifest,
6060
build_pod_manifest,
61+
create_image_pull_secrets_from_manifests,
6162
)
6263
from zenml.integrations.kubernetes.pod_settings import KubernetesPodSettings
6364
from zenml.logger import get_logger
@@ -534,14 +535,18 @@ def submit_pipeline(
534535
failed_jobs_history_limit=settings.failed_jobs_history_limit,
535536
ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
536537
namespace=self.config.kubernetes_namespace,
538+
container_registry=stack.container_registry,
539+
auto_generate_image_pull_secrets=settings.auto_generate_image_pull_secrets,
540+
core_api=self._k8s_core_api,
537541
)
538542

539543
# Create imagePullSecrets first
540-
for secret_manifest in secret_manifests:
541-
kube_utils.create_or_update_secret_from_manifest(
542-
core_api=self._k8s_core_api,
543-
secret_manifest=secret_manifest,
544-
)
544+
create_image_pull_secrets_from_manifests(
545+
secret_manifests=secret_manifests,
546+
core_api=self._k8s_core_api,
547+
namespace=self.config.kubernetes_namespace,
548+
reuse_existing=False, # Orchestrator creates/updates all secrets
549+
)
545550

546551
self._k8s_batch_api.create_namespaced_cron_job(
547552
body=cron_job_manifest,
@@ -567,14 +572,18 @@ def submit_pipeline(
567572
env=environment,
568573
mount_local_stores=self.config.is_local,
569574
namespace=self.config.kubernetes_namespace,
575+
container_registry=stack.container_registry,
576+
auto_generate_image_pull_secrets=settings.auto_generate_image_pull_secrets,
577+
core_api=self._k8s_core_api,
570578
)
571579

572580
# Create imagePullSecrets first
573-
for secret_manifest in secret_manifests:
574-
kube_utils.create_or_update_secret_from_manifest(
575-
core_api=self._k8s_core_api,
576-
secret_manifest=secret_manifest,
577-
)
581+
create_image_pull_secrets_from_manifests(
582+
secret_manifests=secret_manifests,
583+
core_api=self._k8s_core_api,
584+
namespace=self.config.kubernetes_namespace,
585+
reuse_existing=False, # Orchestrator creates/updates all secrets
586+
)
578587

579588
kube_utils.create_and_wait_for_pod_to_start(
580589
core_api=self._k8s_core_api,
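
Review note on the secret manifests passed to `create_image_pull_secrets_from_manifests`: the code that builds them (`manifest_utils`) is not shown in this part of the diff. For orientation, this is a minimal sketch of the standard Kubernetes `kubernetes.io/dockerconfigjson` Secret layout built from a `(registry_uri, username, password)` tuple. The function name and the secret name are hypothetical and may differ from what the commit generates.

```python
import base64
import json
from typing import Any, Dict


def build_image_pull_secret_manifest(
    secret_name: str,
    namespace: str,
    registry_uri: str,
    username: str,
    password: str,
) -> Dict[str, Any]:
    """Build a Secret manifest of type kubernetes.io/dockerconfigjson."""
    # The "auth" field is base64("username:password").
    auth = base64.b64encode(f"{username}:{password}".encode()).decode()
    docker_config = {
        "auths": {
            registry_uri: {
                "username": username,
                "password": password,
                "auth": auth,
            }
        }
    }
    # Values under "data" must themselves be base64-encoded.
    encoded_config = base64.b64encode(json.dumps(docker_config).encode()).decode()
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": secret_name, "namespace": namespace},
        "type": "kubernetes.io/dockerconfigjson",
        "data": {".dockerconfigjson": encoded_config},
    }


manifest = build_image_pull_secret_manifest(
    secret_name="example-registry-credentials",  # hypothetical name
    namespace="zenml",
    registry_uri="registry.example.com",
    username="user",
    password="pass",
)
print(manifest["type"])
```

A Pod would then reference the secret by name under `spec.imagePullSecrets`.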

src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py

Lines changed: 23 additions & 27 deletions
@@ -37,6 +37,8 @@
3737
)
3838
from zenml.integrations.kubernetes.orchestrators.manifest_utils import (
3939
build_pod_manifest,
40+
cleanup_old_image_pull_secrets,
41+
create_image_pull_secrets_from_manifests,
4042
)
4143
from zenml.logger import get_logger
4244
from zenml.orchestrators import publish_utils
@@ -192,6 +194,8 @@ def run_step_on_kubernetes(step_name: str) -> None:
192194
}
193195
)
194196

197+
logger.info(f"Container registry: {active_stack.container_registry}")
198+
195199
# Define Kubernetes pod manifest and any required secrets.
196200
pod_manifest, secret_manifests = build_pod_manifest(
197201
pod_name=pod_name,
@@ -208,37 +212,19 @@ def run_step_on_kubernetes(step_name: str) -> None:
208212
mount_local_stores=mount_local_stores,
209213
owner_references=owner_references,
210214
namespace=args.kubernetes_namespace,
215+
auto_generate_image_pull_secrets=pipeline_settings.auto_generate_image_pull_secrets,
216+
container_registry=active_stack.container_registry,
217+
core_api=core_api,
211218
)
212219

213220
# Step pods should reuse secrets created by the orchestrator pod
214221
# Only create secrets if they don't already exist
215-
for secret_manifest in secret_manifests:
216-
secret_name = secret_manifest["metadata"]["name"]
217-
try:
218-
# Check if secret already exists
219-
core_api.read_namespaced_secret(
220-
name=secret_name, namespace=args.kubernetes_namespace
221-
)
222-
logger.debug(
223-
f"imagePullSecret {secret_name} already exists, reusing it"
224-
)
225-
except k8s_client.rest.ApiException as e:
226-
if e.status == 404:
227-
# Secret doesn't exist, create it
228-
try:
229-
kube_utils.create_or_update_secret_from_manifest(
230-
core_api=core_api,
231-
secret_manifest=secret_manifest,
232-
)
233-
logger.debug(f"Created imagePullSecret {secret_name}")
234-
except Exception as create_e:
235-
logger.warning(
236-
f"Failed to create imagePullSecret {secret_name}: {create_e}"
237-
)
238-
else:
239-
logger.warning(
240-
f"Failed to check for existing secret {secret_name}: {e}"
241-
)
222+
create_image_pull_secrets_from_manifests(
223+
secret_manifests=secret_manifests,
224+
core_api=core_api,
225+
namespace=args.kubernetes_namespace,
226+
reuse_existing=True, # Step pods reuse orchestrator-created secrets
227+
)
242228

243229
kube_utils.create_and_wait_for_pod_to_start(
244230
core_api=core_api,
@@ -370,6 +356,16 @@ def finalize_run(node_states: Dict[str, NodeStatus]) -> None:
370356
except k8s_client.rest.ApiException as e:
371357
logger.error(f"Error cleaning up secret {secret_name}: {e}")
372358

359+
# Clean up old imagePullSecrets to prevent accumulation
360+
# Only clean up for non-scheduled runs to avoid interfering with running schedules
361+
if deployment.schedule is None:
362+
logger.info("Cleaning up old imagePullSecrets...")
363+
cleanup_old_image_pull_secrets(
364+
core_api=core_api,
365+
namespace=args.kubernetes_namespace,
366+
max_age_hours=24, # Keep secrets for 24 hours
367+
)
368+
373369

374370
if __name__ == "__main__":
375371
main()
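
Review note on `reuse_existing` and `cleanup_old_image_pull_secrets`: the bodies of both helpers live in `manifest_utils` and are not shown in this part of the diff. The sketch below illustrates the behaviour the call sites suggest, using an in-memory stand-in for the cluster. The class `InMemorySecretStore` and the functions `create_secrets` and `cleanup_old_secrets` are hypothetical and only assume that the real helpers create or update when `reuse_existing=False`, skip existing secrets when `reuse_existing=True`, and delete secrets older than `max_age_hours`.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List


class InMemorySecretStore:
    """Hypothetical stand-in for the namespaced secret API of a cluster."""

    def __init__(self) -> None:
        self.secrets: Dict[str, Dict[str, Any]] = {}

    def apply(self, manifest: Dict[str, Any], now: datetime) -> None:
        name = manifest["metadata"]["name"]
        # An update keeps the original creation time, as Kubernetes does.
        created = self.secrets.get(name, {}).get("created", now)
        self.secrets[name] = {"manifest": manifest, "created": created}


def create_secrets(
    store: InMemorySecretStore,
    manifests: List[Dict[str, Any]],
    reuse_existing: bool,
    now: datetime,
) -> List[str]:
    """Write the secrets and return the names that were created or updated."""
    written: List[str] = []
    for manifest in manifests:
        name = manifest["metadata"]["name"]
        if reuse_existing and name in store.secrets:
            continue  # step pods keep what the orchestrator already created
        store.apply(manifest, now)
        written.append(name)
    return written


def cleanup_old_secrets(
    store: InMemorySecretStore, max_age_hours: int, now: datetime
) -> List[str]:
    """Delete secrets created more than max_age_hours ago; return their names."""
    cutoff = now - timedelta(hours=max_age_hours)
    stale = [n for n, entry in store.secrets.items() if entry["created"] < cutoff]
    for name in stale:
        del store.secrets[name]
    return stale


now = datetime(2025, 1, 2, tzinfo=timezone.utc)
store = InMemorySecretStore()
manifest = {"metadata": {"name": "registry-creds"}}

# Orchestrator path: always create or update.
first = create_secrets(store, [manifest], reuse_existing=False, now=now - timedelta(hours=30))
# Step pod path: the secret exists, so nothing is written.
second = create_secrets(store, [manifest], reuse_existing=True, now=now)
# Cleanup at the end of a non-scheduled run with a 24 hour limit.
removed = cleanup_old_secrets(store, max_age_hours=24, now=now)
print(first, second, removed)
```

One design point this makes visible: an age-based cleanup can remove a secret that a concurrent run in the same namespace still relies on, which is presumably why the commit skips the cleanup for scheduled runs.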
