Skip to content

Commit ceec337

Browse files
committed
add DatabaseManager class
1 parent 576fecf commit ceec337

File tree

9 files changed

+145
-25
lines changed

9 files changed

+145
-25
lines changed

conftest.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from sqlalchemy import create_engine
55
from sqlalchemy.orm import sessionmaker
66

7+
from jupyter_scheduler.managers import SQLAlchemyDatabaseManager
78
from jupyter_scheduler.orm import Base
89
from jupyter_scheduler.scheduler import Scheduler
910
from jupyter_scheduler.tests.mocks import MockEnvironmentManager
@@ -59,6 +60,8 @@ def jp_scheduler(jp_scheduler_db_url, jp_scheduler_root_dir, jp_scheduler_db):
5960
db_url=jp_scheduler_db_url,
6061
root_dir=str(jp_scheduler_root_dir),
6162
environments_manager=MockEnvironmentManager(),
63+
database_manager=SQLAlchemyDatabaseManager(),
64+
database_manager_class="jupyter_scheduler.managers.SQLAlchemyDatabaseManager",
6265
)
6366

6467

jupyter_scheduler/executors.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import importlib
12
import io
23
import os
34
import shutil
@@ -29,12 +30,30 @@ class ExecutionManager(ABC):
2930
_model = None
3031
_db_session = None
3132

32-
def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]):
33+
def __init__(
34+
self,
35+
job_id: str,
36+
root_dir: str,
37+
db_url: str,
38+
staging_paths: Dict[str, str],
39+
database_manager_class,
40+
):
3341
self.job_id = job_id
3442
self.staging_paths = staging_paths
3543
self.root_dir = root_dir
3644
self.db_url = db_url
3745

46+
self.database_manager = self._create_database_manager(database_manager_class)
47+
48+
def _create_database_manager(self, database_manager_class):
49+
try:
50+
module_name, class_name = database_manager_class.rsplit(".", 1)
51+
module = importlib.import_module(module_name)
52+
DatabaseManagerClass = getattr(module, class_name)
53+
return DatabaseManagerClass()
54+
except (ValueError, ImportError, AttributeError) as e:
55+
raise ValueError(f"Invalid database_manager_class '{database_manager_class}': {e}")
56+
3857
@property
3958
def model(self):
4059
if self._model is None:
@@ -46,7 +65,7 @@ def model(self):
4665
@property
4766
def db_session(self):
4867
if self._db_session is None:
49-
self._db_session = create_session(self.db_url)
68+
self._db_session = create_session(self.db_url, self.database_manager)
5069

5170
return self._db_session
5271

jupyter_scheduler/extension.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ class SchedulerApp(ExtensionApp):
4545
def _db_url_default(self):
4646
return f"sqlite:///{jupyter_data_dir()}/scheduler.sqlite"
4747

48+
database_manager_class = Type(
49+
default_value="jupyter_scheduler.managers.SQLAlchemyDatabaseManager",
50+
klass="jupyter_scheduler.managers.DatabaseManager",
51+
config=True,
52+
help=_i18n("Database manager class for custom database backends."),
53+
)
54+
4855
environment_manager_class = Type(
4956
default_value="jupyter_scheduler.environments.CondaEnvironmentManager",
5057
klass="jupyter_scheduler.environments.EnvironmentManager",
@@ -69,7 +76,8 @@ def _db_url_default(self):
6976
def initialize_settings(self):
7077
super().initialize_settings()
7178

72-
create_tables(self.db_url, self.drop_tables)
79+
database_manager = self.database_manager_class()
80+
create_tables(self.db_url, self.drop_tables, database_manager=database_manager)
7381

7482
environments_manager = self.environment_manager_class()
7583

@@ -78,6 +86,8 @@ def initialize_settings(self):
7886
environments_manager=environments_manager,
7987
db_url=self.db_url,
8088
config=self.config,
89+
database_manager=database_manager,
90+
database_manager_class=self.database_manager_class,
8191
)
8292

8393
job_files_manager = self.job_files_manager_class(scheduler=scheduler)

jupyter_scheduler/job_files_manager.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,23 @@ def generate_filepaths(self):
6161
output_filepath = os.path.join(self.output_dir, self.output_filenames[output_format])
6262
if not os.path.exists(output_filepath) or self.redownload:
6363
yield input_filepath, output_filepath
64+
65+
if self.staging_paths:
66+
staging_dir = os.path.dirname(next(iter(self.staging_paths.values())))
67+
if os.path.exists(staging_dir):
68+
explicit_files = set()
69+
for output_format in output_formats:
70+
if output_format in self.staging_paths:
71+
explicit_files.add(os.path.basename(self.staging_paths[output_format]))
72+
73+
for file_name in os.listdir(staging_dir):
74+
file_path = os.path.join(staging_dir, file_name)
75+
if os.path.isfile(file_path) and file_name not in explicit_files:
76+
input_filepath = file_path
77+
output_filepath = os.path.join(self.output_dir, file_name)
78+
if not os.path.exists(output_filepath) or self.redownload:
79+
yield input_filepath, output_filepath
80+
6481
if self.include_staging_files:
6582
staging_dir = os.path.dirname(self.staging_paths["input"])
6683
for file_relative_path in self.output_filenames["files"]:

jupyter_scheduler/managers.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
from abc import ABC, abstractmethod
2+
from sqlite3 import OperationalError
3+
4+
from sqlalchemy import create_engine
5+
from sqlalchemy.orm import sessionmaker
6+
7+
from jupyter_scheduler.orm import Base as DefaultBase
8+
from jupyter_scheduler.orm import update_db_schema
9+
10+
11+
class DatabaseManager(ABC):
12+
"""Base class for database managers.
13+
14+
Database managers handle database operations for jupyter-scheduler.
15+
Subclasses can implement custom storage backends (K8s, Redis, etc.)
16+
while maintaining compatibility with the scheduler's session interface.
17+
"""
18+
19+
@abstractmethod
20+
def create_session(self, db_url: str):
21+
"""Create a database session.
22+
23+
Args:
24+
db_url: Database URL (e.g., "k8s://namespace", "redis://localhost")
25+
26+
Returns:
27+
Session object compatible with SQLAlchemy session interface
28+
"""
29+
pass
30+
31+
@abstractmethod
32+
def create_tables(self, db_url: str, drop_tables: bool = False, Base=None):
33+
"""Create database tables/schema.
34+
35+
Args:
36+
db_url: Database URL
37+
drop_tables: Whether to drop existing tables first
38+
Base: SQLAlchemy Base for custom schemas (tests)
39+
"""
40+
pass
41+
42+
43+
class SQLAlchemyDatabaseManager(DatabaseManager):
44+
"""Default database manager using SQLAlchemy."""
45+
46+
def create_session(self, db_url: str):
47+
"""Create SQLAlchemy session factory."""
48+
engine = create_engine(db_url, echo=False)
49+
Session = sessionmaker(bind=engine)
50+
return Session
51+
52+
def create_tables(self, db_url: str, drop_tables: bool = False, Base=None):
53+
"""Create database tables using SQLAlchemy."""
54+
if Base is None:
55+
Base = DefaultBase
56+
57+
engine = create_engine(db_url)
58+
update_db_schema(engine, Base)
59+
60+
try:
61+
if drop_tables:
62+
Base.metadata.drop_all(engine)
63+
except OperationalError:
64+
pass
65+
finally:
66+
Base.metadata.create_all(engine)

jupyter_scheduler/orm.py

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -146,21 +146,9 @@ def update_db_schema(engine, Base):
146146
connection.execute(alter_statement)
147147

148148

149-
def create_tables(db_url, drop_tables=False, Base=Base):
150-
engine = create_engine(db_url)
151-
update_db_schema(engine, Base)
149+
def create_tables(db_url, drop_tables=False, Base=Base, *, database_manager):
150+
database_manager.create_tables(db_url, drop_tables, Base)
152151

153-
try:
154-
if drop_tables:
155-
Base.metadata.drop_all(engine)
156-
except OperationalError:
157-
pass
158-
finally:
159-
Base.metadata.create_all(engine)
160152

161-
162-
def create_session(db_url):
163-
engine = create_engine(db_url, echo=False)
164-
Session = sessionmaker(bind=engine)
165-
166-
return Session
153+
def create_session(db_url, database_manager):
154+
return database_manager.create_session(db_url)

jupyter_scheduler/scheduler.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,19 +405,23 @@ def __init__(
405405
environments_manager: Type[EnvironmentManager],
406406
db_url: str,
407407
config=None,
408+
database_manager=None,
409+
database_manager_class=None,
408410
**kwargs,
409411
):
410412
super().__init__(
411413
root_dir=root_dir, environments_manager=environments_manager, config=config, **kwargs
412414
)
413415
self.db_url = db_url
416+
self.database_manager = database_manager
417+
self.database_manager_class = database_manager_class
414418
if self.task_runner_class:
415419
self.task_runner = self.task_runner_class(scheduler=self, config=config)
416420

417421
@property
418422
def db_session(self):
419423
if not self._db_session:
420-
self._db_session = create_session(self.db_url)
424+
self._db_session = create_session(self.db_url, self.database_manager)
421425

422426
return self._db_session
423427

@@ -492,6 +496,7 @@ def create_job(self, model: CreateJob) -> str:
492496
staging_paths=staging_paths,
493497
root_dir=self.root_dir,
494498
db_url=self.db_url,
499+
database_manager_class=self.database_manager_class,
495500
).process
496501
)
497502
p.start()

jupyter_scheduler/tests/test_execution_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ def test_add_side_effects_files(
5353
root_dir=jp_scheduler_root_dir,
5454
db_url=jp_scheduler_db_url,
5555
staging_paths={"input": staged_notebook_file_path},
56+
database_manager_class="jupyter_scheduler.managers.SQLAlchemyDatabaseManager",
5657
)
5758
manager.add_side_effects_files(staged_notebook_dir)
5859

jupyter_scheduler/tests/test_orm.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,16 @@
1313

1414

1515
@pytest.fixture
16-
def initial_db(jp_scheduler_db_url) -> tuple[Type[DeclarativeMeta], sessionmaker, str]:
16+
def database_manager():
17+
from jupyter_scheduler.managers import SQLAlchemyDatabaseManager
18+
19+
return SQLAlchemyDatabaseManager()
20+
21+
22+
@pytest.fixture
23+
def initial_db(
24+
jp_scheduler_db_url, database_manager
25+
) -> tuple[Type[DeclarativeMeta], sessionmaker, str]:
1726
TestBase = declarative_base()
1827

1928
class MockInitialJob(TestBase):
@@ -24,9 +33,9 @@ class MockInitialJob(TestBase):
2433

2534
initial_job = MockInitialJob(runtime_environment_name="abc", input_filename="input.ipynb")
2635

27-
create_tables(db_url=jp_scheduler_db_url, Base=TestBase)
36+
create_tables(db_url=jp_scheduler_db_url, Base=TestBase, database_manager=database_manager)
2837

29-
Session = create_session(jp_scheduler_db_url)
38+
Session = create_session(jp_scheduler_db_url, database_manager)
3039
session = Session()
3140

3241
session.add(initial_job)
@@ -52,7 +61,9 @@ class MockUpdatedJob(TestBase):
5261
return MockUpdatedJob
5362

5463

55-
def test_create_tables_with_new_column(jp_scheduler_db_url, initial_db, updated_job_model):
64+
def test_create_tables_with_new_column(
65+
jp_scheduler_db_url, initial_db, updated_job_model, database_manager
66+
):
5667
TestBase, Session, initial_job_id = initial_db
5768

5869
session = Session()
@@ -61,7 +72,7 @@ def test_create_tables_with_new_column(jp_scheduler_db_url, initial_db, updated_
6172
session.close()
6273

6374
JobModel = updated_job_model
64-
create_tables(db_url=jp_scheduler_db_url, Base=TestBase)
75+
create_tables(db_url=jp_scheduler_db_url, Base=TestBase, database_manager=database_manager)
6576

6677
session = Session()
6778
updated_columns = {col["name"] for col in inspect(session.bind).get_columns("jobs")}

0 commit comments

Comments
 (0)