Skip to content

Commit 23502f3

Browse files
authored
feat: move shipwright clients over to use multi cluster cached client (#822)
* feat: move shipwright clients over to use multi cluster cached client * address comments * fix tests
1 parent efda515 commit 23502f3

File tree

13 files changed

+370
-353
lines changed

13 files changed

+370
-353
lines changed

bases/renku_data_services/k8s_cache/main.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@
88
from renku_data_services.k8s.models import Cluster, ClusterId
99
from renku_data_services.k8s_cache.config import Config
1010
from renku_data_services.k8s_watcher import K8sWatcher, k8s_object_handler
11-
from renku_data_services.notebooks.constants import AMALTHEA_SESSION_KIND, JUPYTER_SESSION_KIND
11+
from renku_data_services.notebooks.constants import (
12+
AMALTHEA_SESSION_KIND,
13+
AMALTHEA_SESSION_VERSION,
14+
JUPYTER_SESSION_KIND,
15+
JUPYTER_SESSION_VERSION,
16+
)
17+
from renku_data_services.session.constants import BUILD_RUN_KIND, BUILD_RUN_VERSION, TASK_RUN_KIND, TASK_RUN_VERSION
1218

1319

1420
async def main() -> None:
@@ -23,7 +29,12 @@ async def main() -> None:
2329
watcher = K8sWatcher(
2430
handler=k8s_object_handler(config.k8s_cache, config.metrics, rp_repo=config.rp_repo),
2531
clusters={c.id: c for c in clusters},
26-
kinds=[AMALTHEA_SESSION_KIND, JUPYTER_SESSION_KIND],
32+
kinds=[
33+
f"{AMALTHEA_SESSION_KIND}.{AMALTHEA_SESSION_VERSION}",
34+
f"{JUPYTER_SESSION_KIND}.{JUPYTER_SESSION_VERSION}",
35+
f"{BUILD_RUN_KIND}.{BUILD_RUN_VERSION}",
36+
f"{TASK_RUN_KIND}.{TASK_RUN_VERSION}",
37+
],
2738
)
2839
await watcher.start()
2940
logging.info("started watching resources")

components/renku_data_services/app_config/config.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,25 @@
5555
)
5656
from renku_data_services.db_config import DBConfig
5757
from renku_data_services.git.gitlab import DummyGitlabAPI, GitlabAPI
58-
from renku_data_services.k8s.clients import DummyCoreClient, DummySchedulingClient, K8sCoreClient, K8sSchedulingClient
58+
from renku_data_services.k8s.clients import (
59+
DummyCoreClient,
60+
DummySchedulingClient,
61+
K8sClusterClientsPool,
62+
K8sCoreClient,
63+
K8sSchedulingClient,
64+
)
65+
from renku_data_services.k8s.config import KubeConfigEnv
5966
from renku_data_services.k8s.quota import QuotaRepository
67+
from renku_data_services.k8s_watcher.db import K8sDbCache
6068
from renku_data_services.message_queue.config import RedisConfig
6169
from renku_data_services.message_queue.db import EventRepository, ReprovisioningRepository
6270
from renku_data_services.message_queue.interface import IMessageQueue
6371
from renku_data_services.message_queue.redis_queue import RedisQueue
6472
from renku_data_services.metrics.core import StagingMetricsService
6573
from renku_data_services.metrics.db import MetricsRepository
6674
from renku_data_services.namespace.db import GroupRepository
67-
from renku_data_services.notebooks.config import NotebooksConfig
75+
from renku_data_services.notebooks.config import NotebooksConfig, get_clusters
76+
from renku_data_services.notebooks.constants import AMALTHEA_SESSION_KIND, JUPYTER_SESSION_KIND
6877
from renku_data_services.platform.db import PlatformRepository
6978
from renku_data_services.project.db import (
7079
ProjectMemberRepository,
@@ -77,6 +86,7 @@
7786
from renku_data_services.search.reprovision import SearchReprovision
7887
from renku_data_services.secrets.db import LowLevelUserSecretsRepo, UserSecretsRepo
7988
from renku_data_services.session import crs as session_crs
89+
from renku_data_services.session.constants import BUILD_RUN_KIND, TASK_RUN_KIND
8090
from renku_data_services.session.db import SessionRepository
8191
from renku_data_services.session.k8s_client import ShipwrightClient
8292
from renku_data_services.solr.solr_client import SolrClientConfig
@@ -182,7 +192,7 @@ class BuildsConfig:
182192
tolerations: list[session_crs.Toleration] | None = None
183193

184194
@classmethod
185-
def from_env(cls, prefix: str = "", namespace: str = "") -> "BuildsConfig":
195+
def from_env(cls, db: DBConfig, prefix: str = "", namespace: str = "") -> "BuildsConfig":
186196
"""Create a config from environment variables."""
187197
enabled = os.environ.get(f"{prefix}IMAGE_BUILDERS_ENABLED", "false").lower() == "true"
188198
build_output_image_prefix = os.environ.get(f"{prefix}BUILD_OUTPUT_IMAGE_PREFIX")
@@ -216,11 +226,18 @@ def from_env(cls, prefix: str = "", namespace: str = "") -> "BuildsConfig":
216226
elif not enabled:
217227
shipwright_client = None
218228
else:
219-
# TODO: is there a reason to use a different cache URL here?
220-
cache_url = os.environ["NB_AMALTHEA_V2__CACHE_URL"]
229+
# NOTE: we need to get an async client as a sync client can't be used in an async way
230+
# But all the config code is not async, so we need to drop into the running loop, if there is one
231+
kr8s_api = KubeConfigEnv().api()
232+
k8s_db_cache = K8sDbCache(db.async_session_maker)
233+
client = K8sClusterClientsPool(
234+
clusters=get_clusters("/secrets/kube_configs", namespace=namespace, api=kr8s_api),
235+
cache=k8s_db_cache,
236+
kinds_to_cache=[AMALTHEA_SESSION_KIND, JUPYTER_SESSION_KIND, BUILD_RUN_KIND, TASK_RUN_KIND],
237+
)
221238
shipwright_client = ShipwrightClient(
239+
client=client,
222240
namespace=namespace,
223-
cache_url=cache_url,
224241
)
225242

226243
node_selector: dict[str, str] | None = None
@@ -730,7 +747,7 @@ def from_env(cls, prefix: str = "") -> "Config":
730747
trusted_proxies = TrustedProxiesConfig.from_env(prefix)
731748
message_queue = RedisQueue(redis)
732749
nb_config = NotebooksConfig.from_env(db)
733-
builds_config = BuildsConfig.from_env(prefix, k8s_namespace)
750+
builds_config = BuildsConfig.from_env(db, prefix, k8s_namespace)
734751
posthog = PosthogConfig.from_env(prefix)
735752

736753
return cls(

components/renku_data_services/k8s/clients.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ async def __list(self, _filter: K8sObjectFilter) -> AsyncIterable[APIObjectInClu
245245

246246
try:
247247
res = await self.__cluster.api.async_get(
248-
_filter.kind,
248+
_filter.fully_qualified_kind,
249249
*names,
250250
label_selector=_filter.label_selector,
251251
namespace=_filter.namespace,
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
"""Base config for k8s."""
2+
3+
import glob
4+
import os
5+
6+
import kr8s
7+
import yaml
8+
from sanic.log import logger
9+
10+
from renku_data_services.k8s.constants import DEFAULT_K8S_CLUSTER
11+
from renku_data_services.k8s.models import Cluster, ClusterId
12+
13+
14+
class KubeConfig:
15+
"""Wrapper around kube config to get a kr8s api."""
16+
17+
def __init__(
18+
self,
19+
kubeconfig: str | None = None,
20+
current_context_name: str | None = None,
21+
ns: str | None = None,
22+
sa: str | None = None,
23+
url: str | None = None,
24+
) -> None:
25+
self._kubeconfig = kubeconfig
26+
self._ns = ns
27+
self._current_context_name = current_context_name
28+
self._sa = sa
29+
self._url = url
30+
31+
def _sync_api(self) -> kr8s.Api | kr8s._AsyncApi:
32+
return kr8s.api(
33+
kubeconfig=self._kubeconfig,
34+
namespace=self._ns,
35+
context=self._current_context_name,
36+
)
37+
38+
def _async_api(self) -> kr8s.asyncio.Api:
39+
"""Create an async api client from sync code.
40+
41+
Kr8s cannot return an AsyncAPI instance from sync code, and we can't easily make all our config code async,
42+
so this method is a direct copy of the kr8s sync client code, just that it returns an async client.
43+
"""
44+
ret = kr8s._async_utils.run_sync(kr8s.asyncio.api)(
45+
url=self._url,
46+
kubeconfig=self._kubeconfig,
47+
serviceaccount=self._sa,
48+
namespace=self._ns,
49+
context=self._current_context_name,
50+
_asyncio=True, # This is the only line that is different from kr8s code
51+
)
52+
assert isinstance(ret, kr8s.asyncio.Api)
53+
return ret
54+
55+
def api(self, _async: bool = True) -> kr8s.Api | kr8s._AsyncApi:
56+
"""Instantiate the Kr8s Api object based on the configuration."""
57+
if _async:
58+
return self._async_api()
59+
else:
60+
return self._sync_api()
61+
62+
63+
class KubeConfigEnv(KubeConfig):
64+
"""Get a kube config from the environment."""
65+
66+
def __init__(self) -> None:
67+
super().__init__(ns=os.environ.get("K8S_NAMESPACE", "default"))
68+
69+
70+
class KubeConfigYaml(KubeConfig):
71+
"""Get a kube config from a yaml file."""
72+
73+
def __init__(self, kubeconfig: str) -> None:
74+
super().__init__(kubeconfig=kubeconfig)
75+
76+
with open(kubeconfig) as stream:
77+
_conf = yaml.safe_load(stream)
78+
79+
self._current_context_name = _conf.get("current-context", None)
80+
if self._current_context_name is not None:
81+
for context in _conf.get("contexts", []):
82+
name = context.get("name", None)
83+
inner = context.get("context", None)
84+
if inner is not None and name is not None and name == self._current_context_name:
85+
self._ns = inner.get("namespace", None)
86+
break
87+
88+
89+
def get_clusters(kube_conf_root_dir: str, namespace: str, api: kr8s.asyncio.Api) -> list[Cluster]:
90+
"""Get all clusters accessible to the application."""
91+
clusters = [Cluster(id=DEFAULT_K8S_CLUSTER, namespace=namespace, api=api)]
92+
93+
if os.path.exists(kube_conf_root_dir):
94+
for filename in glob.glob(pathname="*.yaml", root_dir=kube_conf_root_dir):
95+
try:
96+
kube_config = KubeConfigYaml(filename)
97+
cluster = Cluster(
98+
id=ClusterId(filename.removesuffix(".yaml")),
99+
namespace=kube_config.api().namespace,
100+
api=kube_config.api(),
101+
)
102+
clusters.append(cluster)
103+
logger.info(f"Successfully loaded Kubernetes config: '{kube_conf_root_dir}/{filename}'")
104+
except Exception as e:
105+
logger.warning(
106+
f"Failed while loading '{kube_conf_root_dir}/{filename}', ignoring kube config. Error: {e}"
107+
)
108+
else:
109+
logger.warning(f"Cannot open directory '{kube_conf_root_dir}', ignoring kube configs...")
110+
111+
return clusters
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""Constant values for k8s."""
2+
3+
from typing import Final
4+
5+
from renku_data_services.k8s.models import ClusterId
6+
7+
DEFAULT_K8S_CLUSTER: Final[ClusterId] = ClusterId("renkulab")

components/renku_data_services/k8s/models.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,27 @@ class K8sObjectFilter:
140140
namespace: str | None = None
141141
cluster: ClusterId | None = None
142142
version: str | None = None
143+
"""Version should be of the form 'group/version', e.g. 'shipwright.io/v1' or 'core/v1'"""
144+
143145
label_selector: dict[str, str] | None = None
144146
user_id: str | None = None
145147

148+
@property
149+
def fully_qualified_kind(self) -> str:
150+
"""Returns the fully qualified kind string for this filter.
151+
152+
Note: This exists because kr8s has some methods where it only allows you to specify 'kind' and then has
153+
weird logic to split that. This method is essentially the reverse of the kr8s logic so we can hand it a
154+
string it will accept.
155+
"""
156+
if not self.version:
157+
return self.kind
158+
if "/" in self.version:
159+
# e.g. buildrun.shipwright.io/v1beta1
160+
return f"{self.kind}.{self.version}"
161+
# e.g. pod/v1
162+
return f"{self.kind}/{self.version}"
163+
146164

147165
@dataclass(eq=True, frozen=True)
148166
class Cluster:

components/renku_data_services/k8s_watcher/core.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from renku_data_services.k8s.models import Cluster, ClusterId, K8sObject, K8sObjectMeta
2323
from renku_data_services.k8s_watcher.db import K8sDbCache
2424
from renku_data_services.notebooks.crs import State
25+
from renku_data_services.session.constants import DUMMY_TASK_RUN_USER_ID
2526

2627

2728
@dataclass
@@ -39,6 +40,11 @@ def user_id(self) -> str | None:
3940
return cast(str, self.obj.metadata.labels["renku.io/userId"])
4041
case "amaltheasession":
4142
return cast(str, self.obj.metadata.labels["renku.io/safe-username"])
43+
case "buildrun":
44+
return cast(str, self.obj.metadata.labels["renku.io/safe-username"])
45+
46+
case "taskrun":
47+
return DUMMY_TASK_RUN_USER_ID
4248
case _:
4349
return None
4450

components/renku_data_services/notebooks/api/classes/k8s_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from renku_data_services.crc.db import ResourcePoolRepository
1717
from renku_data_services.errors import errors
1818
from renku_data_services.k8s.clients import K8sClusterClientsPool
19+
from renku_data_services.k8s.constants import DEFAULT_K8S_CLUSTER
1920
from renku_data_services.k8s.models import Cluster, ClusterId, K8sObject, K8sObjectFilter, K8sObjectMeta
2021
from renku_data_services.k8s_watcher.core import APIObjectInCluster
2122
from renku_data_services.notebooks.api.classes.auth import GitlabToken, RenkuTokens
@@ -25,7 +26,6 @@
2526
from renku_data_services.notebooks.util.kubernetes_ import find_env_var
2627
from renku_data_services.notebooks.util.retries import retry_with_exponential_backoff_async
2728

28-
DEFAULT_K8S_CLUSTER: ClusterId = ClusterId("renkulab")
2929
sanitizer = kubernetes.client.ApiClient().sanitize_for_serialization
3030

3131

0 commit comments

Comments
 (0)