Skip to content

Commit a42b430

Browse files
committed
feat: add cloud storage via rclone
1 parent b0a55dc commit a42b430

File tree

6 files changed

+143
-61
lines changed

6 files changed

+143
-61
lines changed

bases/renku_data_services/data_api/app.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
142142
nb_config=config.nb_config,
143143
internal_gitlab_authenticator=config.gitlab_authenticator,
144144
git_repo=config.git_repositories_repo,
145+
rp_repo=config.rp_repo,
145146
)
146147
notebooks_new = NotebooksNewBP(
147148
name="notebooks",

components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,15 @@
66
class ICloudStorageRequest(Protocol):
77
"""The abstract class for cloud storage."""
88

9-
exists: bool
109
mount_folder: str
11-
source_folder: str
12-
bucket: str
10+
source_path: str
1311

1412
def get_manifest_patch(
1513
self,
1614
base_name: str,
1715
namespace: str,
18-
labels: dict[str, str] = {},
19-
annotations: dict[str, str] = {},
16+
labels: dict[str, str] | None = None,
17+
annotations: dict[str, str] | None = None,
2018
) -> list[dict[str, Any]]:
2119
"""The patches applied to a jupyter server to insert the storage in the session."""
2220
...

components/renku_data_services/notebooks/api/schemas/cloud_storage.py

Lines changed: 78 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22

33
from configparser import ConfigParser
44
from io import StringIO
5-
from pathlib import Path
6-
from typing import Any, Optional, Self
5+
from pathlib import PurePosixPath
6+
from typing import Any, Final, Optional, Self
77

8+
from kubernetes import client
89
from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema
910

1011
from renku_data_services.base_models import APIUser
1112
from renku_data_services.notebooks.api.classes.cloud_storage import ICloudStorageRequest
1213
from renku_data_services.notebooks.config import _NotebooksConfig
1314

15+
_sanitize_for_serialization = client.ApiClient().sanitize_for_serialization
16+
1417

1518
class RCloneStorageRequest(Schema):
1619
"""Request for RClone based storage."""
@@ -36,6 +39,8 @@ def validate_storage(self, data: dict, **kwargs: dict) -> None:
3639
class RCloneStorage(ICloudStorageRequest):
3740
"""RClone based storage."""
3841

42+
pvc_secret_annotation_name: Final[str] = "csi-rclone.dev/secretName"
43+
3944
def __init__(
4045
self,
4146
source_path: str,
@@ -60,7 +65,7 @@ async def storage_from_schema(
6065
user: APIUser,
6166
internal_gitlab_user: APIUser,
6267
project_id: int,
63-
work_dir: Path,
68+
work_dir: PurePosixPath,
6469
config: _NotebooksConfig,
6570
) -> Self:
6671
"""Create storage object from request."""
@@ -92,8 +97,73 @@ async def storage_from_schema(
9297
await config.storage_validator.validate_storage_configuration(configuration, source_path)
9398
return cls(source_path, configuration, readonly, mount_folder, name, config)
9499

100+
def pvc(
101+
self,
102+
base_name: str,
103+
namespace: str,
104+
labels: dict[str, str] | None = None,
105+
annotations: dict[str, str] | None = None,
106+
) -> client.V1PersistentVolumeClaim:
107+
"""The PVC for mounting cloud storage."""
108+
return client.V1PersistentVolumeClaim(
109+
metadata=client.V1ObjectMeta(
110+
name=base_name,
111+
namespace=namespace,
112+
annotations={self.pvc_secret_annotation_name: base_name} | (annotations or {}),
113+
labels={"name": base_name} | (labels or {}),
114+
),
115+
spec=client.V1PersistentVolumeClaimSpec(
116+
access_modes=["ReadOnlyMany" if self.readonly else "ReadWriteMany"],
117+
resources=client.V1VolumeResourceRequirements(requests={"storage": "10Gi"}),
118+
storage_class_name=self.config.cloud_storage.storage_class,
119+
),
120+
)
121+
122+
def volume_mount(self, base_name: str) -> client.V1VolumeMount:
123+
"""The volume mount for cloud storage."""
124+
return client.V1VolumeMount(
125+
mount_path=self.mount_folder,
126+
name=base_name,
127+
read_only=self.readonly,
128+
)
129+
130+
def volume(self, base_name: str) -> client.V1Volume:
131+
"""The volume entry for the statefulset specification."""
132+
return client.V1Volume(
133+
name=base_name,
134+
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
135+
claim_name=base_name, read_only=self.readonly
136+
),
137+
)
138+
139+
def secret(
140+
self,
141+
base_name: str,
142+
namespace: str,
143+
labels: dict[str, str] | None = None,
144+
annotations: dict[str, str] | None = None,
145+
) -> client.V1Secret:
146+
"""The secret containing the configuration for the rclone csi driver."""
147+
return client.V1Secret(
148+
metadata=client.V1ObjectMeta(
149+
name=base_name,
150+
namespace=namespace,
151+
annotations=annotations,
152+
labels={"name": base_name} | (labels or {}),
153+
),
154+
string_data={
155+
"remote": self.name or base_name,
156+
"remotePath": self.source_path,
157+
"configData": self.config_string(self.name or base_name),
158+
},
159+
)
160+
95161
def get_manifest_patch(
96-
self, base_name: str, namespace: str, labels: dict = {}, annotations: dict = {}
162+
self,
163+
base_name: str,
164+
namespace: str,
165+
labels: dict[str, str] | None = None,
166+
annotations: dict[str, str] | None = None,
97167
) -> list[dict[str, Any]]:
98168
"""Get server manifest patch."""
99169
patches = []
@@ -104,57 +174,22 @@ def get_manifest_patch(
104174
{
105175
"op": "add",
106176
"path": f"/{base_name}-pv",
107-
"value": {
108-
"apiVersion": "v1",
109-
"kind": "PersistentVolumeClaim",
110-
"metadata": {
111-
"name": base_name,
112-
"labels": {"name": base_name},
113-
},
114-
"spec": {
115-
"accessModes": ["ReadOnlyMany" if self.readonly else "ReadWriteMany"],
116-
"resources": {"requests": {"storage": "10Gi"}},
117-
"storageClassName": self.config.cloud_storage.storage_class,
118-
},
119-
},
177+
"value": _sanitize_for_serialization(self.pvc(base_name, namespace, labels, annotations)),
120178
},
121179
{
122180
"op": "add",
123181
"path": f"/{base_name}-secret",
124-
"value": {
125-
"apiVersion": "v1",
126-
"kind": "Secret",
127-
"metadata": {
128-
"name": base_name,
129-
"labels": {"name": base_name},
130-
},
131-
"type": "Opaque",
132-
"stringData": {
133-
"remote": self.name or base_name,
134-
"remotePath": self.source_path,
135-
"configData": self.config_string(self.name or base_name),
136-
},
137-
},
182+
"value": _sanitize_for_serialization(self.secret(base_name, namespace, labels, annotations)),
138183
},
139184
{
140185
"op": "add",
141186
"path": "/statefulset/spec/template/spec/containers/0/volumeMounts/-",
142-
"value": {
143-
"mountPath": self.mount_folder,
144-
"name": base_name,
145-
"readOnly": self.readonly,
146-
},
187+
"value": _sanitize_for_serialization(self.volume_mount(base_name)),
147188
},
148189
{
149190
"op": "add",
150191
"path": "/statefulset/spec/template/spec/volumes/-",
151-
"value": {
152-
"name": base_name,
153-
"persistentVolumeClaim": {
154-
"claimName": base_name,
155-
"readOnly": self.readonly,
156-
},
157-
},
192+
"value": _sanitize_for_serialization(self.volume(base_name)),
158193
},
159194
],
160195
}

components/renku_data_services/notebooks/blueprints.py

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from dataclasses import dataclass
88
from datetime import UTC, datetime
99
from math import floor
10-
from pathlib import Path
10+
from pathlib import PurePosixPath
1111
from typing import Any
1212
from urllib.parse import urljoin, urlparse
1313

@@ -55,6 +55,7 @@
5555
Authentication,
5656
AuthenticationType,
5757
Culling,
58+
DataSource,
5859
ExtraContainer,
5960
ExtraVolume,
6061
ExtraVolumeMount,
@@ -64,7 +65,8 @@
6465
Resources,
6566
SecretAsVolume,
6667
SecretAsVolumeItem,
67-
SecretRef,
68+
SecretRefKey,
69+
SecretRefWhole,
6870
Session,
6971
SessionEnvItem,
7072
Storage,
@@ -85,6 +87,7 @@
8587
from renku_data_services.project.db import ProjectRepository
8688
from renku_data_services.repositories.db import GitRepositoriesRepository
8789
from renku_data_services.session.db import SessionRepository
90+
from renku_data_services.storage.db import StorageV2Repository
8891

8992

9093
@dataclass(kw_only=True)
@@ -414,7 +417,7 @@ async def launch_notebook_helper(
414417
if lfs_auto_fetch is not None:
415418
parsed_server_options.lfs_auto_fetch = lfs_auto_fetch
416419

417-
image_work_dir = image_repo.image_workdir(parsed_image) or Path("/")
420+
image_work_dir = image_repo.image_workdir(parsed_image) or PurePosixPath("/")
418421
mount_path = image_work_dir / "work"
419422

420423
server_work_dir = mount_path / gl_project_path
@@ -429,7 +432,7 @@ async def launch_notebook_helper(
429432
cstorage.model_dump(),
430433
user=user,
431434
project_id=gl_project_id,
432-
work_dir=server_work_dir.absolute(),
435+
work_dir=server_work_dir,
433436
config=nb_config,
434437
internal_gitlab_user=internal_gitlab_user,
435438
)
@@ -773,6 +776,7 @@ class NotebooksNewBP(CustomBlueprint):
773776
project_repo: ProjectRepository
774777
session_repo: SessionRepository
775778
rp_repo: ResourcePoolRepository
779+
storage_repo: StorageV2Repository
776780

777781
def start(self) -> BlueprintFactoryResponse:
778782
"""Start a session with the new operator."""
@@ -804,17 +808,49 @@ async def _handler(
804808
parsed_server_options = await self.nb_config.crc_validator.validate_class_storage(
805809
user, resource_class_id, body.disk_storage
806810
)
807-
work_dir = Path("/home/jovyan/work")
811+
work_dir = environment.working_directory
808812
user_secrets: K8sUserSecrets | None = None
809813
# if body.user_secrets:
810814
# user_secrets = K8sUserSecrets(
811815
# name=server_name,
812816
# user_secret_ids=body.user_secrets.user_secret_ids,
813817
# mount_path=body.user_secrets.mount_path,
814818
# )
815-
cloud_storage: list[RCloneStorage] = []
819+
cloud_storages_db = await self.storage_repo.get_storage(
820+
user=user, project_id=project.id, include_secrets=True
821+
)
822+
cloud_storage: dict[str, RCloneStorage] = {
823+
str(s.storage_id): RCloneStorage(
824+
source_path=s.source_path,
825+
mount_folder=(work_dir / s.target_path).as_posix(),
826+
configuration=s.configuration.model_dump(mode="python"),
827+
readonly=s.readonly,
828+
config=self.nb_config,
829+
name=s.name,
830+
)
831+
for s in cloud_storages_db
832+
}
833+
cloud_storage_request: dict[str, RCloneStorage] = {
834+
s.storage_id: RCloneStorage(
835+
source_path=s.source_path,
836+
mount_folder=(work_dir / s.target_path).as_posix(),
837+
configuration=s.configuration,
838+
readonly=s.readonly,
839+
config=self.nb_config,
840+
name=None,
841+
)
842+
for s in body.cloudstorage or []
843+
if s.storage_id is not None
844+
}
845+
# NOTE: Check the cloud storage in the request body and if any match
846+
# then overwrite the projects cloud storages
847+
# NOTE: Cloud storages in the session launch request body that are not form the DB are ignored
848+
for csr_id, csr in cloud_storage_request.items():
849+
if csr_id in cloud_storage:
850+
cloud_storage[csr_id] = csr
816851
# repositories = [Repository(i.url, branch=i.branch, commit_sha=i.commit_sha) for i in body.repositories]
817852
repositories = [Repository(url=i) for i in project.repositories]
853+
secrets_to_create: list[V1Secret] = []
818854
server = Renku2UserServer(
819855
user=user,
820856
image=image,
@@ -824,7 +860,7 @@ async def _handler(
824860
server_options=parsed_server_options,
825861
environment_variables={},
826862
user_secrets=user_secrets,
827-
cloudstorage=cloud_storage,
863+
cloudstorage=[i for i in cloud_storage.values()],
828864
k8s_client=self.nb_config.k8s_v2_client,
829865
workspace_mount_path=work_dir,
830866
work_dir=work_dir,
@@ -834,6 +870,14 @@ async def _handler(
834870
is_image_private=False,
835871
internal_gitlab_user=internal_gitlab_user,
836872
)
873+
# Generate the cloud starge secrets
874+
data_sources: list[DataSource] = []
875+
for ics, cs in enumerate(cloud_storage.values()):
876+
secret_name = f"{server_name}-ds-{ics}"
877+
secrets_to_create.append(cs.secret(secret_name, server.k8s_client.preferred_namespace))
878+
data_sources.append(
879+
DataSource(mountPath=cs.mount_folder, secretRef=SecretRefWhole(name=secret_name, adopt=True))
880+
)
837881
cert_init, cert_vols = init_containers.certificates_container(self.nb_config)
838882
session_init_containers = [InitContainer.model_validate(self.nb_config.k8s_v2_client.sanitize(cert_init))]
839883
extra_volumes = [ExtraVolume.model_validate(self.nb_config.k8s_v2_client.sanitize(i)) for i in cert_vols]
@@ -867,7 +911,6 @@ async def _handler(
867911
metadata=Metadata(name=server_name, annotations=annotations),
868912
spec=AmaltheaSessionSpec(
869913
codeRepositories=[],
870-
dataSources=[],
871914
hibernated=False,
872915
session=Session(
873916
image=image,
@@ -914,13 +957,14 @@ async def _handler(
914957
type=AuthenticationType.oauth2proxy
915958
if isinstance(user, AuthenticatedAPIUser)
916959
else AuthenticationType.token,
917-
secretRef=SecretRef(name=server_name, key="auth", adopt=True),
960+
secretRef=SecretRefKey(name=server_name, key="auth", adopt=True),
918961
extraVolumeMounts=[
919962
ExtraVolumeMount(name="renku-authorized-emails", mountPath="/authorized_emails")
920963
]
921964
if isinstance(user, AuthenticatedAPIUser)
922965
else [],
923966
),
967+
dataSources=data_sources,
924968
),
925969
)
926970
parsed_proxy_url = urlparse(urljoin(server.server_url + "/", "oauth2"))
@@ -951,12 +995,14 @@ async def _handler(
951995
"verbose": True,
952996
}
953997
)
954-
secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
955-
secret = await self.nb_config.k8s_v2_client.create_secret(secret)
998+
secrets_to_create.append(V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data))
999+
for s in secrets_to_create:
1000+
await self.nb_config.k8s_v2_client.create_secret(s)
9561001
try:
9571002
manifest = await self.nb_config.k8s_v2_client.create_server(manifest, user.id)
9581003
except Exception:
959-
await self.nb_config.k8s_v2_client.delete_secret(secret.metadata.name)
1004+
for s in secrets_to_create:
1005+
await self.nb_config.k8s_v2_client.delete_secret(s.metadata.name)
9601006
raise errors.ProgrammingError(message="Could not start the amalthea session")
9611007

9621008
return json(manifest.as_apispec().model_dump(mode="json", exclude_none=True), 201)

components/renku_data_services/notebooks/cr_amalthea_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# generated by datamodel-codegen:
22
# filename: <stdin>
3-
# timestamp: 2024-09-04T22:45:28+00:00
3+
# timestamp: 2024-09-04T21:22:45+00:00
44

55
from __future__ import annotations
66

0 commit comments

Comments
 (0)