|
21 | 21 | LOGGING_NAMESPACE,
|
22 | 22 | )
|
23 | 23 | from .process import run_command, stream_command
|
| 24 | +from .constants import KUBECONFIG |
24 | 25 |
|
25 | 26 |
|
26 | 27 | def get_static_client() -> CoreV1Api:
|
@@ -100,17 +101,30 @@ def create_kubernetes_object(
|
100 | 101 | return obj
|
101 | 102 |
|
102 | 103 |
|
103 |
| -def set_kubectl_context(namespace: str) -> bool: |
| 104 | +def set_context_namespace(namespace: str): |
104 | 105 | """
|
105 |
| - Set the default kubectl context to the specified namespace. |
| 106 | + Set the default kubeconfig context to the specified namespace. |
106 | 107 | """
|
107 |
| - command = f"kubectl config set-context --current --namespace={namespace}" |
108 |
| - result = stream_command(command) |
109 |
| - if result: |
110 |
| - print(f"Kubectl context set to namespace: {namespace}") |
111 |
| - else: |
112 |
| - print(f"Failed to set kubectl context to namespace: {namespace}") |
113 |
| - return result |
| 108 | + with open(KUBECONFIG) as file: |
| 109 | + kubeconfig_data = yaml.safe_load(file) |
| 110 | + |
| 111 | + current_context_name = kubeconfig_data.get("current-context") |
| 112 | + if not current_context_name: |
| 113 | + raise ValueError("No current context set in kubeconfig.") |
| 114 | + |
| 115 | + context_entry = None |
| 116 | + for context in kubeconfig_data.get("contexts", []): |
| 117 | + if context["name"] == current_context_name: |
| 118 | + context_entry = context |
| 119 | + break |
| 120 | + |
| 121 | + if not context_entry: |
| 122 | + raise ValueError(f"Context '{current_context_name}' not found in kubeconfig.") |
| 123 | + |
| 124 | + context_entry["context"]["namespace"] = namespace |
| 125 | + |
| 126 | + with open(KUBECONFIG, "w") as file: |
| 127 | + yaml.safe_dump(kubeconfig_data, file) |
114 | 128 |
|
115 | 129 |
|
116 | 130 | def apply_kubernetes_yaml(yaml_file: str) -> bool:
|
|
0 commit comments