Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions clarifai/cli/templates/pipeline_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ def get_pipeline_config_template(
- name: sequence
steps:
{steps_yaml}
# Optional: Define secrets for pipeline steps
# step_version_secrets:
# step-0:
# secrets:
# API_KEY: users/{user_id}/secrets/my-api-key
# DB_PASSWORD: users/{user_id}/secrets/db-secret
# step-1:
# secrets:
# EMAIL_TOKEN: users/{user_id}/secrets/email-token
"""


Expand Down
159 changes: 159 additions & 0 deletions clarifai/client/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,162 @@ def _display_new_logs(self, run_id: str, seen_logs: set, current_page: int = 1)
logger.debug(f"Error fetching logs: {e}")
# Return current page on error to retry the same page next fetch
return current_page

def get_pipeline_version(self, pipeline_version_id: Optional[str] = None) -> Dict:
"""Get pipeline version details including step secrets.

Args:
pipeline_version_id (Optional[str]): The pipeline version ID. If None, uses self.pipeline_version_id.

Returns:
Dict: Pipeline version information including step_version_secrets if configured.
"""
version_id = pipeline_version_id or self.pipeline_version_id
if not version_id:
raise UserError("pipeline_version_id is required")

request = service_pb2.GetPipelineVersionRequest()
request.user_app_id.CopyFrom(self.user_app_id)
request.pipeline_id = self.pipeline_id
request.pipeline_version_id = version_id

response = self.STUB.GetPipelineVersion(request, metadata=self.auth_helper.metadata)

if response.status.code != status_code_pb2.StatusCode.SUCCESS:
raise UserError(
f"Failed to get pipeline version: {response.status.description}. "
f"Details: {response.status.details}"
)

return json_format.MessageToDict(
response.pipeline_version, preserving_proto_field_name=True
)

def create_pipeline_version(
self,
orchestration_spec: Dict,
step_version_secrets: Optional[Dict[str, Dict[str, str]]] = None,
description: Optional[str] = None,
) -> str:
"""Create a new pipeline version with optional step secrets.

Note: This creates a new version by patching the pipeline with a new version.

Args:
orchestration_spec (Dict): The orchestration specification for the pipeline.
step_version_secrets (Optional[Dict[str, Dict[str, str]]]): Map of step references to their secrets.
Format: {step_ref: {secret_name: secret_path}}
Example: {"step-0": {"API_KEY": "users/user123/secrets/my-api-key"}}
description (Optional[str]): Description for the pipeline version.

Returns:
str: The created pipeline version ID.
"""
pipeline_version = resources_pb2.PipelineVersion()
if description:
pipeline_version.description = description

# Set orchestration spec
if "argo_orchestration_spec" in orchestration_spec:
argo_spec_str = orchestration_spec["argo_orchestration_spec"]
import yaml

argo_spec = yaml.safe_load(argo_spec_str)
api_version = argo_spec.get("apiVersion", "argoproj.io/v1alpha1")

orchestration_spec_proto = resources_pb2.OrchestrationSpec()
argo_orchestration_spec_proto = resources_pb2.ArgoOrchestrationSpec()
argo_orchestration_spec_proto.api_version = api_version
import json

argo_orchestration_spec_proto.spec_json = json.dumps(argo_spec)

orchestration_spec_proto.argo_orchestration_spec.CopyFrom(
argo_orchestration_spec_proto
)
pipeline_version.orchestration_spec.CopyFrom(orchestration_spec_proto)

# Add step_version_secrets if provided
if step_version_secrets:
for step_ref, secrets in step_version_secrets.items():
if not secrets:
continue
step_secret_config = resources_pb2.StepSecretConfig()
for secret_name, secret_path in secrets.items():
step_secret_config.secrets[secret_name] = secret_path
pipeline_version.config.step_version_secrets[step_ref].CopyFrom(step_secret_config)

# Make the API call using PatchPipelineVersions
# This creates a new version for an existing pipeline
request = service_pb2.PatchPipelineVersionsRequest()
request.user_app_id.CopyFrom(self.user_app_id)
request.pipeline_id = self.pipeline_id
request.pipeline_versions.append(pipeline_version)
request.action = "overwrite" # Create a new version

response = self.STUB.PatchPipelineVersions(request, metadata=self.auth_helper.metadata)

if response.status.code != status_code_pb2.StatusCode.SUCCESS:
raise UserError(
f"Failed to create pipeline version: {response.status.description}. "
f"Details: {response.status.details}"
)

if not response.pipeline_versions:
raise UserError("No pipeline version was created")

created_version = response.pipeline_versions[0]
logger.info(f"Created pipeline version: {created_version.id}")
return created_version.id

def add_step_secret(
self,
step_ref: str,
secret_name: str,
secret_ref: str,
pipeline_version_id: Optional[str] = None,
) -> None:
"""Add a secret to a specific pipeline step.

Args:
step_ref (str): The step reference (e.g., "step-0", "step-1").
secret_name (str): The name of the secret environment variable.
secret_ref (str): The secret reference path (e.g., "users/user123/secrets/my-api-key").
pipeline_version_id (Optional[str]): The pipeline version ID. If None, uses self.pipeline_version_id.

Note:
This is a convenience method. For production use, manage secrets via the config.yaml
orchestration spec and use the pipeline upload command.
"""
raise NotImplementedError(
"Adding secrets to existing pipeline versions is not supported. "
"Please define step secrets in your config.yaml orchestration spec "
"and use 'clarifai pipeline upload' to create a new pipeline version."
)

def list_step_secrets(
self, step_ref: Optional[str] = None, pipeline_version_id: Optional[str] = None
) -> Dict[str, Dict[str, str]]:
"""List secrets configured for pipeline steps.

Args:
step_ref (Optional[str]): If provided, only return secrets for this step.
pipeline_version_id (Optional[str]): The pipeline version ID. If None, uses self.pipeline_version_id.

Returns:
Dict[str, Dict[str, str]]: Map of step references to their secrets.
Format: {step_ref: {secret_name: secret_path}}
"""
version_data = self.get_pipeline_version(pipeline_version_id)
config = version_data.get("config", {})
step_version_secrets = config.get("step_version_secrets", {})

if step_ref:
# Return only the specified step's secrets
return {step_ref: step_version_secrets.get(step_ref, {}).get("secrets", {})}

# Return all step secrets
result = {}
for step, config in step_version_secrets.items():
result[step] = config.get("secrets", {})
return result
46 changes: 46 additions & 0 deletions clarifai/runners/pipelines/pipeline_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@
}
}

# Include step_version_secrets if present in config
step_version_secrets = orchestration_spec.get("step_version_secrets", {})
if step_version_secrets:
lockfile_data["pipeline"]["orchestration_spec"]["step_version_secrets"] = (
step_version_secrets
)

return lockfile_data

def update_lockfile_with_pipeline_info(
Expand Down Expand Up @@ -246,6 +253,13 @@
}
}

# Include step_version_secrets if present in config
step_version_secrets = orchestration_spec.get("step_version_secrets", {})
if step_version_secrets:
lockfile_data["pipeline"]["orchestration_spec"]["step_version_secrets"] = (
step_version_secrets
)

return lockfile_data

def save_lockfile(self, lockfile_data: Dict[str, Any], lockfile_path: str = None) -> None:
Expand Down Expand Up @@ -362,6 +376,33 @@

return None

def _add_step_version_secrets(
self, pipeline_version: resources_pb2.PipelineVersion, step_version_secrets: Dict[str, Any]
) -> None:
"""Add step_version_secrets to the pipeline version config.
Args:
pipeline_version: The PipelineVersion proto to update
step_version_secrets: Dictionary mapping step references to their secret configs
Format: {step_ref: {secrets: {secret_name: secret_path}}}
"""
logger.info(f"Adding step version secrets for {len(step_version_secrets)} steps")

for step_ref, step_config in step_version_secrets.items():
secrets = step_config.get("secrets", {})
if not secrets:
logger.warning(f"No secrets found for step {step_ref}, skipping")
continue

# Create StepSecretConfig proto
step_secret_config = resources_pb2.StepSecretConfig()
for secret_name, secret_ref in secrets.items():
step_secret_config.secrets[secret_name] = secret_ref
logger.info(f"Added secret {secret_name} for step {step_ref}")

# Add to pipeline version config
pipeline_version.config.step_version_secrets[step_ref].CopyFrom(step_secret_config)

def create_pipeline(self) -> tuple[bool, str]:
"""Create the pipeline using PostPipelines RPC.
Expand Down Expand Up @@ -404,6 +445,11 @@
)
pipeline_version.orchestration_spec.CopyFrom(orchestration_spec_proto)

# Add step_version_secrets if present in config
step_version_secrets = orchestration_spec.get("step_version_secrets", {})
if step_version_secrets:
self._add_step_version_secrets(pipeline_version, step_version_secrets)

pipeline.pipeline_version.CopyFrom(pipeline_version)

# Make the RPC call
Expand Down
Loading
Loading