Skip to content
This repository was archived by the owner on Oct 16, 2024. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 68 additions & 36 deletions clusterman/kubernetes/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,48 +53,75 @@
logger = colorlog.getLogger(__name__)


class KubeApiClientWrapper:
def __init__(self, kubeconfig_path: str, client_class: Type) -> None:
"""Init k8s API client

:param str kubeconfig_path: k8s configuration path
:param Type client_class: k8s client class to initialize
"""
try:
"""
https://kubernetes.io/docs/concepts/containers/container-environment/#container-environment
Every pod in k8s gets some default environment variable injected, including KUBERNETES_SERVICE_HOST
which points to default kuberbetes service. We are using this variable to distinguise between
when cluterman is started in a pod vs when it's started on host. For clusterman instances running inside
a k8s cluster, we prioritise using K8s Service account since that let us avoid creating any kubeconfig
in advance. For clusterman CLI invocation we continue using provided KUBECONFIG file
"""
if os.getenv("KUBERNETES_SERVICE_HOST"):
kubernetes.config.load_incluster_config()
else:
kubernetes.config.load_kube_config(kubeconfig_path, context=os.getenv("KUBECONTEXT"))
except (TypeError, ConfigException):
error_msg = "Could not load KUBECONFIG; is this running on Kubernetes master?"
if "yelpcorp" in socket.getfqdn():
error_msg += "\nHint: try using the clusterman-k8s-<clustername> wrapper script!"
logger.error(error_msg)
raise

self._client = client_class()
# class KubeApiClientWrapper:

# _client = None

# def __init__(self, kubeconfig_path: str, client_class: Type) -> None:
# """Init k8s API client

# :param str kubeconfig_path: k8s configuration path
# :param Type client_class: k8s client class to initialize
# """

# # By making client a class variable we are avoiding re-creating kubernetes
# # client object multiple times ( due to call to reload_state which calls reload_client in return)
# if self._client is None:
# try:
# """
# https://kubernetes.io/docs/concepts/containers/container-environment/#container-environment
# Every pod in k8s gets some default environment variable injected, including KUBERNETES_SERVICE_HOST
# which points to default kuberbetes service. We are using this variable to distinguise between
# when cluterman is started in a pod vs when it's started on host. For clusterman instances running
# inside a k8s cluster, we prioritise using K8s Service account since that let us avoid creating
# any kubeconfig in advance. For clusterman CLI invocation we continue using provided KUBECONFIG file
# """
# if os.getenv("KUBERNETES_SERVICE_HOST"):
# kubernetes.config.load_incluster_config()
# else:
# kubernetes.config.load_kube_config(kubeconfig_path, context=os.getenv("KUBECONTEXT"))
# except (TypeError, ConfigException):
# error_msg = "Could not load KUBECONFIG; is this running on Kubernetes master?"
# if "yelpcorp" in socket.getfqdn():
# error_msg += "\nHint: try using the clusterman-k8s-<clustername> wrapper script!"
# logger.error(error_msg)
# raise

# KubeApiClientWrapper._client = client_class()

# def __getattr__(self, attr):
# return getattr(self._client, attr)


def load_kube_client(kubeconfig_path: str, client_class: Type):
try:
print("load_kube_config is called with client_type: ", client_class)
if os.getenv("KUBERNETES_SERVICE_HOST"):
kubernetes.config.load_incluster_config()
else:
kubernetes.config.load_kube_config(kubeconfig_path, context=os.getenv("KUBECONTEXT"))
except (TypeError, ConfigException):
error_msg = "Could not load KUBECONFIG; is this running on Kubernetes master?"
if "yelpcorp" in socket.getfqdn():
error_msg += "\nHint: try using the clusterman-k8s-<clustername> wrapper script!"
logger.error(error_msg)
raise

def __getattr__(self, attr):
return getattr(self._client, attr)
return client_class()


class CachedCoreV1Api(KubeApiClientWrapper):
class CachedCoreV1Api:
CACHED_FUNCTION_CALLS = {"list_node", "list_pod_for_all_namespaces"}
_client = None

def __init__(self, kubeconfig_path: str):
super().__init__(kubeconfig_path, kubernetes.client.CoreV1Api)
if CachedCoreV1Api._client is None:
CachedCoreV1Api._client = load_kube_client(kubeconfig_path, kubernetes.client.CoreV1Api)
print(CachedCoreV1Api._client)

def __getattr__(self, attr):
global KUBERNETES_API_CACHE
func = getattr(self._client, attr)
func = getattr(CachedCoreV1Api._client, attr)

if os.environ.get("KUBE_CACHE_ENABLED", "") and attr in self.CACHED_FUNCTION_CALLS:

Expand All @@ -117,16 +144,21 @@ def wrapper(*args, **kwargs):
return func


class ConciseCRDApi(KubeApiClientWrapper):
class ConciseCRDApi:
_client = None

def __init__(self, kubeconfig_path: str, group: str, version: str, plural: str) -> None:
super().__init__(kubeconfig_path, kubernetes.client.CustomObjectsApi)
if ConciseCRDApi._client is None:
ConciseCRDApi._client = load_kube_client(kubeconfig_path, kubernetes.client.CustomObjectsApi)
print(type(ConciseCRDApi._client))

self.group = group
self.version = version
self.plural = plural

def __getattr__(self, attr):
return partial(
getattr(self._client, attr),
getattr(ConciseCRDApi._client, attr),
group=self.group,
version=self.version,
plural=self.plural,
Expand Down
36 changes: 24 additions & 12 deletions tests/kubernetes/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

@pytest.fixture
def mock_cached_core_v1_api():
with mock.patch("clusterman.kubernetes.util.kubernetes"):
yield CachedCoreV1Api("/foo/bar/admin.conf")
with mock.patch("clusterman.kubernetes.util.kubernetes") as mock_k8s_client:
api_instance = CachedCoreV1Api("/foo/bar/admin.conf")
yield api_instance
print("Clean up mock before reuse")
mock_k8s_client.reset_mock()


def test_cached_corev1_api_no_kubeconfig(caplog):
Expand All @@ -27,22 +30,31 @@ def test_cached_corev1_api_no_kubeconfig(caplog):
assert "Could not load KUBECONFIG" in caplog.text


def test_cached_corev1_api_use_load_incluster_config_when_running_in_pod():
with mock.patch.dict(os.environ, {"KUBERNETES_SERVICE_HOST": "ABC"}):
with mock.patch(
"clusterman.kubernetes.util.kubernetes.config.load_incluster_config"
) as mock_load_incluster_config:
_ = CachedCoreV1Api("/foo/bar/admin.conf")
assert mock_load_incluster_config.called
# def test_cached_corev1_api_use_load_incluster_config_when_running_in_pod(mock_cached_core_v1_api):
# with mock.patch.dict(os.environ, {"KUBERNETES_SERVICE_HOST": "ABC"}):
# with mock.patch(
# "clusterman.kubernetes.util.kubernetes.config.load_incluster_config"
# ) as mock_load_incluster_config:
# _ = CachedCoreV1Api("/foo/bar/admin.conf")
# assert mock_load_incluster_config.called


def test_cached_corev1_api_use_load_kubeconfig_config_when_running_as_cli():
with mock.patch("clusterman.kubernetes.util.kubernetes.config.load_kube_config") as mock_load_kube_config:
# def test_cached_corev1_api_use_load_kubeconfig_config_when_running_as_cli():
# print(0)
# with mock.patch("clusterman.kubernetes.util.kubernetes.config.load_kube_config") as mock_load_kube_config:
# _ = CachedCoreV1Api("/foo/bar/admin.conf")
# assert mock_load_kube_config.called


def test_client_initialization_happen_only_once():
with mock.patch("clusterman.kubernetes.util.load_kube_client") as mock_load_kube_client:
_ = CachedCoreV1Api("/foo/bar/admin.conf")
assert mock_load_kube_config.called
_ = CachedCoreV1Api("/foo/bar/admin1.conf")
assert mock_load_kube_client.call_count == 1


def test_cached_corev1_api_caches_non_cached_function(mock_cached_core_v1_api):
print("1")
mock_cached_core_v1_api.list_namespace()
assert mock_cached_core_v1_api._client.list_namespace.call_count == 1

Expand Down