
Commit 7401309

implement K8sDatabaseManager
1 parent c4918f4 commit 7401309

3 files changed: +79 -129 lines changed


src/jupyter_scheduler_k8s/__init__.py

Lines changed: 2 additions & 4 deletions
@@ -1,9 +1,7 @@
 """Kubernetes backend for jupyter-scheduler."""
 
-# Import k8s_orm FIRST to auto-install K8s database backend before anything else
-from . import k8s_orm
-
 from .executors import K8sExecutionManager
+from .database_manager import K8sDatabaseManager
 
 __version__ = "0.1.0"
-__all__ = ["K8sExecutionManager"]
+__all__ = ["K8sExecutionManager", "K8sDatabaseManager"]
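
For reference, a minimal sketch of the package's import surface after this change (not part of the commit; assumes jupyter_scheduler_k8s is installed). Importing the package no longer triggers the k8s_orm side-effect patching removed later in this commit:

from jupyter_scheduler_k8s import K8sDatabaseManager, K8sExecutionManager

# Both managers are now exported from the package root: the execution manager
# runs jobs, the database manager handles k8s:// metadata storage.
print(K8sDatabaseManager.__name__, K8sExecutionManager.__name__)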
src/jupyter_scheduler_k8s/database_manager.py

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+from kubernetes import client, config
+from jupyter_scheduler.managers import DatabaseManager
+
+from .k8s_orm import K8sSession
+
+
+class K8sDatabaseManager(DatabaseManager):
+    """Database manager that uses Kubernetes Jobs for storage."""
+
+    def create_session(self, db_url: str):
+        """Create K8s session factory."""
+        if not db_url.startswith("k8s://"):
+            raise ValueError(f"K8sDatabaseManager only supports k8s:// URLs, got: {db_url}")
+
+        namespace = db_url[6:] or "default"
+
+        def session_factory():
+            return K8sSession(namespace=namespace)
+        return session_factory
+
+    def create_tables(self, db_url: str, drop_tables: bool = False):
+        """Ensure K8s namespace exists."""
+        if not db_url.startswith("k8s://"):
+            return
+
+        namespace = db_url[6:] or "default"
+
+        try:
+            config.load_incluster_config()
+        except config.ConfigException:
+            config.load_kube_config()
+
+        v1 = client.CoreV1Api()
+
+        try:
+            v1.read_namespace(name=namespace)
+        except client.ApiException as e:
+            if e.status == 404:
+                namespace_body = client.V1Namespace(
+                    metadata=client.V1ObjectMeta(name=namespace)
+                )
+                v1.create_namespace(body=namespace_body)
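
A rough usage sketch for the new manager (not part of this commit). It assumes a reachable cluster with a valid kubeconfig or in-cluster service account, that the DatabaseManager base class can be constructed without arguments, and that K8sSession is usable as a context manager (it defines __exit__ in k8s_orm.py); the k8s://<namespace> URL form follows the db_url[6:] parsing above:

from jupyter_scheduler_k8s import K8sDatabaseManager

db_url = "k8s://jupyter-scheduler"        # everything after "k8s://" is the namespace

manager = K8sDatabaseManager()            # assumed no-arg constructor
manager.create_tables(db_url)             # ensures the "jupyter-scheduler" namespace exists
session_factory = manager.create_session(db_url)

with session_factory() as session:        # K8sSession buffers adds and commits them as K8s Jobs
    ...                                   # query/add/commit against Kubernetes Job metadata

Note that anything other than a k8s:// URL raises ValueError in create_session and is a no-op in create_tables, so SQLAlchemy URLs are deliberately left to the default backend rather than patched around.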

src/jupyter_scheduler_k8s/k8s_orm.py

Lines changed: 35 additions & 125 deletions
@@ -2,12 +2,10 @@
 
 import json
 import logging
-from datetime import datetime
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict
 from kubernetes import client, config
 from kubernetes.client.rest import ApiException
 
-from jupyter_scheduler.models import Status
 from jupyter_scheduler.utils import get_utc_timestamp
 
 logger = logging.getLogger(__name__)
@@ -42,7 +40,7 @@ def _init_k8s_client(self):
             self.k8s_batch = client.BatchV1Api()
             self.k8s_core = client.CoreV1Api()
 
-            # Test connection
+            # Validate connectivity before proceeding
             self.k8s_core.get_api_versions()
         except Exception as e:
             logger.error(f"Failed to initialize K8s clients: {e}")
@@ -58,15 +56,15 @@ def __exit__(self, exc_type, exc_val, exc_tb):
             self.rollback()
 
     def query(self, model_class):
-        """Return K8s query object."""
+        """Create query for model class."""
         return K8sQuery(self, model_class)
 
     def add(self, job):
-        """Buffer job creation for batch commit."""
+        """Buffer job for batch commit."""
         self._pending_operations.append(('create', job))
 
     def commit(self):
-        """Execute all buffered operations."""
+        """Execute buffered operations."""
         if not self._pending_operations:
             return
@@ -86,11 +84,12 @@ def commit(self):
             raise
 
     def rollback(self):
-        """Clear pending operations (K8s doesn't support true rollback)."""
+        """Clear pending operations."""
+        # K8s doesn't support transactions, only clear pending operations
         self._pending_operations.clear()
 
     def _job_to_dict(self, job) -> Dict[str, Any]:
-        """Convert SQLAlchemy Job model to dict."""
+        """Convert Job model to dict."""
         return {
             "job_id": job.job_id,
             "name": job.name,
@@ -104,11 +103,12 @@ def _job_to_dict(self, job) -> Dict[str, Any]:
         }
 
     def _create_k8s_job(self, job_data: Dict):
-        """Create placeholder K8s Job for database storage."""
+        """Create K8s Job for metadata storage."""
+        # Creates minimal busybox job that stores metadata in labels/annotations
         job_id = job_data['job_id']
         job_name = f"js-{job_id[:8]}-{job_id[-4:]}"
 
-        # Create minimal job for metadata storage
+        # Busybox container runs once then exits, leaving metadata intact
         job_spec = client.V1JobSpec(
             template=client.V1PodTemplateSpec(
                 spec=client.V1PodSpec(
@@ -126,14 +126,14 @@ def _create_k8s_job(self, job_data: Dict):
             backoff_limit=0
         )
 
-        # Database labels for querying
+        # Labels enable fast K8s label selector queries
         labels = {
             "app.kubernetes.io/managed-by": "jupyter-scheduler-k8s",
             "jupyter-scheduler.io/job-id": self._sanitize(job_data["job_id"]),
             "jupyter-scheduler.io/status": self._sanitize(job_data["status"]),
         }
 
-        # Add schedule indicator for Job vs JobDefinition differentiation
+        # Differentiate Job from JobDefinition using schedule presence
         if job_data.get("schedule"):
             labels["jupyter-scheduler.io/has-schedule"] = "true"
         else:
@@ -142,7 +142,7 @@ def _create_k8s_job(self, job_data: Dict):
         if job_data.get("name"):
             labels["jupyter-scheduler.io/name"] = self._sanitize(job_data["name"])
 
-        # Full metadata in annotation
+        # Store complete job data in annotation for retrieval
         annotations = {
             "jupyter-scheduler.io/job-data": json.dumps(job_data)
         }
@@ -169,7 +169,8 @@ def _create_k8s_job(self, job_data: Dict):
             raise
 
     def _sanitize(self, value: str) -> str:
-        """Sanitize for K8s labels."""
+        """Sanitize value for K8s labels."""
+        # K8s labels must be alphanumeric, max 63 chars
         value = str(value).lower()
         value = ''.join(c if c.isalnum() or c in '-_.' else '-' for c in value)
         return value.strip('-_.')[:63] or "none"
@@ -194,63 +195,63 @@ def __init__(self, session: K8sSession, model_class):
 
     def filter(self, condition):
         """Add filter condition."""
-        # Parse SQLAlchemy-style condition
+        # Convert SQLAlchemy conditions to K8s label selectors or annotation filters
         if hasattr(condition, 'left') and hasattr(condition.left, 'name'):
             field_name = condition.left.name
             value = getattr(condition.right, 'value', condition.right)
 
             if field_name in ['job_id', 'status', 'name']:
                 self._label_filters[f'jupyter-scheduler.io/{field_name.replace("_", "-")}'] = self.session._sanitize(str(value))
             else:
-                # Store for annotation-based filtering
+                # Complex fields stored in annotations, filtered post-query
                 self._filters[field_name] = value
         elif hasattr(condition, 'type') and condition.type.name == 'in_':
-            # Handle IN clauses like Job.status.in_(['COMPLETED', 'FAILED'])
+            # IN clauses require annotation filtering since K8s labels don't support OR
            field_name = condition.left.name
             if field_name == 'status':
-                # For IN clauses, we'll need to handle multiple label selectors
+                # Multiple values require post-query filtering
                 self._filters['status_in'] = [self.session._sanitize(str(v)) for v in condition.right.value]
 
         return self
 
     def update(self, values: Dict):
-        """Update job in K8s."""
-        # Build label selector from all label filters
+        """Update matching jobs."""
+        # Use labels for efficient K8s filtering
         label_selector = ",".join([f"{k}={v}" for k, v in self._label_filters.items()])
         if not label_selector:
             raise ValueError("Update requires filterable conditions")
 
-        # Find and update K8s Jobs
+        # Query matching jobs using label selector
         jobs = self.session.k8s_batch.list_namespaced_job(
             namespace=self.session.namespace,
             label_selector=label_selector
         )
 
         for job in jobs.items:
-            # Update annotation with new data
+            # Merge new values into existing job data
             if job.metadata.annotations and "jupyter-scheduler.io/job-data" in job.metadata.annotations:
                 job_data = json.loads(job.metadata.annotations["jupyter-scheduler.io/job-data"])
                 job_data.update(values)
                 job_data["update_time"] = get_utc_timestamp()
 
-                # Update annotation
+                # Store updated data back to annotation
                 job.metadata.annotations["jupyter-scheduler.io/job-data"] = json.dumps(job_data)
 
-                # Update corresponding labels if changed
+                # Sync searchable fields to labels for query performance
                 for field, value in values.items():
                     if field in ['status', 'name']:
                         label_key = f"jupyter-scheduler.io/{field.replace('_', '-')}"
                         job.metadata.labels[label_key] = self.session._sanitize(str(value))
 
-                # Patch the job
+                # Apply changes to K8s resource
                 self.session.k8s_batch.patch_namespaced_job(
                     name=job.metadata.name,
                     namespace=self.session.namespace,
                     body=job
                 )
 
     def one(self):
-        """Get single job (throw if not found)."""
+        """Get single job or raise."""
         result = self.first()
         if result is None:
             raise ValueError("Job not found")
@@ -283,8 +284,8 @@ def delete(self):
         )
 
     def _get_matching_jobs(self):
-        """Get K8s jobs matching current filters."""
-        # Build label selector
+        """Query jobs matching filters."""
+        # Use labels for efficient server-side filtering
         label_selector = ",".join([f"{k}={v}" for k, v in self._label_filters.items()])
 
         jobs = self.session.k8s_batch.list_namespaced_job(
@@ -297,14 +298,14 @@ def _get_matching_jobs(self):
             if job.metadata.annotations and "jupyter-scheduler.io/job-data" in job.metadata.annotations:
                 job_data = json.loads(job.metadata.annotations["jupyter-scheduler.io/job-data"])
 
-                # Apply annotation-based filters
+                # Post-filter using annotation data for complex conditions
                 if self._matches_annotation_filters(job_data):
                     results.append(self._dict_to_job(job_data))
 
         return results
 
     def _matches_annotation_filters(self, job_data: Dict) -> bool:
-        """Check if job data matches annotation-based filters."""
+        """Check annotation-based filter matches."""
         for field, value in self._filters.items():
             if field == 'status_in':
                 if job_data.get('status') not in value:
@@ -316,7 +317,7 @@ def _matches_annotation_filters(self, job_data: Dict) -> bool:
                 if not job_data.get('start_time') or job_data['start_time'] < value:
                     return False
             elif field.endswith('_like'):
-                # Handle LIKE queries (e.g., name LIKE 'prefix%')
+                # SQL LIKE converted to string prefix matching
                 actual_field = field[:-5]
                 actual_value = job_data.get(actual_field, "")
                 if not actual_value.startswith(str(value).rstrip('%')):
@@ -327,101 +328,10 @@ def _matches_annotation_filters(self, job_data: Dict) -> bool:
         return True
 
     def _dict_to_job(self, job_data: Dict):
-        """Convert dict back to Job-like object."""
+        """Convert dict to Job-like object."""
         class JobRecord:
             def __init__(self, data):
                 for k, v in data.items():
                     setattr(self, k, v)
 
-        return JobRecord(job_data)
-
-
-# Store original functions for fallback
-_original_create_session = None
-_original_create_tables = None
-
-
-def k8s_create_session(db_url):
-    """K8s session factory that replaces SQLAlchemy."""
-    if db_url.startswith("k8s://"):
-        namespace = db_url[6:] or "default"
-        def session_factory():
-            return K8sSession(namespace=namespace)
-        return session_factory
-    else:
-        # Fallback to original SQLAlchemy implementation
-        if _original_create_session:
-            return _original_create_session(db_url)
-        else:
-            # Import here to avoid circular imports
-            from jupyter_scheduler.orm import create_session as original_create_session
-            return original_create_session(db_url)
-
-
-def k8s_create_tables(db_url, drop_tables=False, Base=None):
-    """K8s equivalent of create_tables - ensure namespace exists."""
-    if db_url.startswith("k8s://"):
-        namespace = db_url[6:] or "default"
-
-        try:
-            config.load_incluster_config()
-        except config.ConfigException:
-            config.load_kube_config()
-
-        k8s_core = client.CoreV1Api()
-
-        # Ensure namespace exists
-        try:
-            k8s_core.read_namespace(name=namespace)
-        except ApiException as e:
-            if e.status == 404:
-                ns = client.V1Namespace(metadata=client.V1ObjectMeta(name=namespace))
-                k8s_core.create_namespace(body=ns)
-                logger.info(f"Created K8s namespace: {namespace}")
-
-        logger.info(f"K8s database initialized in namespace: {namespace}")
-    else:
-        # Fallback to original SQLAlchemy implementation
-        if _original_create_tables:
-            return _original_create_tables(db_url, drop_tables, Base)
-        else:
-            from jupyter_scheduler.orm import create_tables as original_create_tables
-            return original_create_tables(db_url, drop_tables, Base)
-
-
-def install_k8s_backend():
-    """Install K8s backend by monkey patching jupyter_scheduler.orm functions."""
-    try:
-        import jupyter_scheduler.orm as orm
-
-        # Store originals for fallback
-        global _original_create_session, _original_create_tables
-        _original_create_session = orm.create_session
-        _original_create_tables = orm.create_tables
-
-        # Replace with K8s-aware functions
-        orm.create_session = k8s_create_session
-        orm.create_tables = k8s_create_tables
-
-        # Also monkey patch SQLAlchemy's create_engine to handle k8s:// URLs
-        import sqlalchemy
-        original_create_engine = sqlalchemy.create_engine
-
-        def k8s_aware_create_engine(url, *args, **kwargs):
-            if str(url).startswith("k8s://"):
-                # Return a dummy engine object that won't be used
-                # since our k8s_create_tables handles k8s:// URLs
-                class DummyEngine:
-                    dialect = type('dialect', (), {'name': 'k8s'})()
-                return DummyEngine()
-            return original_create_engine(url, *args, **kwargs)
-
-        sqlalchemy.create_engine = k8s_aware_create_engine
-
-        logger.info("K8s database backend installed successfully")
-    except ImportError:
-        logger.warning("jupyter_scheduler not found, K8s backend not installed")
-
-
-# Auto-install K8s backend when this module is imported
-install_k8s_backend()
+        return JobRecord(job_data)
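
To make the storage model concrete, here is an illustrative sketch (not in the commit) of how a status filter maps onto a Kubernetes label selector and how the full record is read back from the annotation, mirroring K8sQuery.filter() and _get_matching_jobs() above. The "jupyter-scheduler" namespace is a placeholder; the label and annotation keys are the ones defined in k8s_orm.py:

import json
from kubernetes import client, config

config.load_kube_config()                  # or config.load_incluster_config() inside a pod
batch = client.BatchV1Api()

# Roughly what session.query(Job).filter(Job.status == "COMPLETED") becomes:
selector = ",".join([
    "app.kubernetes.io/managed-by=jupyter-scheduler-k8s",
    "jupyter-scheduler.io/status=completed",   # label values are lowercased by _sanitize()
])
jobs = batch.list_namespaced_job(namespace="jupyter-scheduler", label_selector=selector)

for job in jobs.items:
    # Labels carry only sanitized, queryable fields; the complete record is the JSON annotation.
    data = json.loads(job.metadata.annotations["jupyter-scheduler.io/job-data"])
    print(data["job_id"], data["status"])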

0 commit comments
