1
- # TODO: decide how to treat these imports & possibly an extras_require
2
1
import logging
3
- import subprocess
4
2
import uuid
5
3
from typing import Any
6
4
@@ -38,14 +36,7 @@ def __init__(
38
36
try :
39
37
import kfp
40
38
41
- result = subprocess .run (
42
- ["oc" , "whoami" , "-t" ],
43
- capture_output = True ,
44
- text = True ,
45
- check = True ,
46
- timeout = 5 ,
47
- )
48
- token = result .stdout .strip ()
39
+ token = self ._get_token ()
49
40
if not token :
50
41
raise RagasEvaluationError (
51
42
"No token found. Please run `oc login` and try again."
@@ -70,10 +61,6 @@ def __init__(
70
61
raise RagasEvaluationError (
71
62
"Kubeflow Pipelines SDK not available. Install with: pip install .[remote]"
72
63
) from e
73
- except subprocess .CalledProcessError as e :
74
- raise RagasEvaluationError (
75
- f"Failed to get OpenShift token. Command failed with exit code { e .returncode } : { e .stderr .strip ()} "
76
- ) from e
77
64
except requests .exceptions .RequestException as e :
78
65
raise RagasEvaluationError (
79
66
f"Failed to connect to Kubeflow Pipelines server at { self .config .kubeflow_config .pipelines_endpoint } , "
@@ -84,6 +71,25 @@ def __init__(
84
71
"Failed to initialize Kubeflow Pipelines client."
85
72
) from e
86
73
74
+ def _get_token (self ) -> str :
75
+ try :
76
+ from kubernetes .client .configuration import Configuration
77
+ from kubernetes .config .kube_config import load_kube_config
78
+
79
+ config = Configuration ()
80
+ load_kube_config (client_configuration = config )
81
+ token = str (config .api_key ["authorization" ].split (" " )[- 1 ])
82
+ except ImportError as e :
83
+ raise RagasEvaluationError (
84
+ "Kubernetes client is not installed. Install with: pip install .[remote]"
85
+ ) from e
86
+ except Exception as e :
87
+ raise RagasEvaluationError (
88
+ "Failed to get OpenShift token. Please run `oc login` and try again."
89
+ ) from e
90
+
91
+ return token
92
+
87
93
async def run_eval (
88
94
self ,
89
95
benchmark_id : str ,
0 commit comments