Skip to content

Commit b0d7a63

Browse files
committed
add python backend
1 parent 7520d32 commit b0d7a63

File tree

4 files changed

+344
-0
lines changed

4 files changed

+344
-0
lines changed

jupyter_scheduler/backends.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,18 @@ class JupyterServerNotebookBackend(BaseBackend):
1717
priority = 0
1818

1919

20+
class JupyterServerPythonBackend(BaseBackend):
21+
"""Built-in backend executing Python scripts via subprocess on the Jupyter server."""
22+
23+
id = "jupyter_server_py"
24+
name = "Python Script"
25+
description = "Execute Python scripts on the Jupyter server"
26+
scheduler_class = "jupyter_scheduler.scheduler.Scheduler"
27+
execution_manager_class = "jupyter_scheduler.python_executor.PythonScriptExecutionManager"
28+
file_extensions = ["py"]
29+
priority = 0
30+
31+
2032
@dataclass
2133
class BackendConfig:
2234
"""Runtime configuration for an initialized backend instance."""
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import os
2+
import subprocess
3+
import sys
4+
from typing import Dict
5+
6+
import fsspec
7+
8+
from jupyter_scheduler.executors import ExecutionManager
9+
from jupyter_scheduler.models import JobFeature
10+
from jupyter_scheduler.orm import Job
11+
12+
13+
class PythonScriptExecutionManager(ExecutionManager):
14+
"""Execute Python scripts via subprocess."""
15+
16+
def execute(self):
17+
"""Execute the Python script and capture output."""
18+
job = self.model
19+
staging_dir = os.path.dirname(self.staging_paths["input"])
20+
21+
# Build environment with job parameters as JUPYTER_PARAM_* vars
22+
env = os.environ.copy()
23+
if job.parameters:
24+
for key, value in job.parameters.items():
25+
env[f"JUPYTER_PARAM_{key}"] = str(value)
26+
27+
# Execute script using sys.executable (guaranteed to work in all environments)
28+
result = subprocess.run(
29+
[sys.executable, self.staging_paths["input"]],
30+
cwd=staging_dir,
31+
capture_output=True,
32+
text=True,
33+
env=env,
34+
)
35+
36+
# Capture side effect files (same pattern as DefaultExecutionManager)
37+
self.add_side_effects_files(staging_dir)
38+
39+
# Write stdout/stderr to staging directory
40+
stdout_path = os.path.join(staging_dir, "stdout.log")
41+
stderr_path = os.path.join(staging_dir, "stderr.log")
42+
43+
with fsspec.open(stdout_path, "w", encoding="utf-8") as f:
44+
f.write(result.stdout)
45+
with fsspec.open(stderr_path, "w", encoding="utf-8") as f:
46+
f.write(result.stderr)
47+
48+
if result.returncode != 0:
49+
raise RuntimeError(
50+
f"Script exited with code {result.returncode}\nstderr: {result.stderr[:500]}"
51+
)
52+
53+
def add_side_effects_files(self, staging_dir: str):
54+
"""Scan for files created during execution and update job's packaged_files."""
55+
input_script = os.path.basename(self.staging_paths["input"])
56+
new_files = set()
57+
for root, _, files in os.walk(staging_dir):
58+
for file in files:
59+
rel_path = os.path.relpath(os.path.join(root, file), staging_dir)
60+
if rel_path != input_script:
61+
new_files.add(rel_path)
62+
63+
if new_files:
64+
with self.db_session() as session:
65+
current = set(
66+
session.query(Job.packaged_files)
67+
.filter(Job.job_id == self.job_id)
68+
.scalar()
69+
or []
70+
)
71+
session.query(Job).filter(Job.job_id == self.job_id).update(
72+
{"packaged_files": list(current.union(new_files))}
73+
)
74+
session.commit()
75+
76+
@classmethod
77+
def supported_features(cls) -> Dict[JobFeature, bool]:
78+
return {
79+
JobFeature.job_name: True,
80+
JobFeature.output_formats: False, # No notebook conversion for .py
81+
JobFeature.job_definition: False,
82+
JobFeature.idempotency_token: False,
83+
JobFeature.tags: False,
84+
JobFeature.email_notifications: False,
85+
JobFeature.timeout_seconds: False,
86+
JobFeature.retry_on_timeout: False,
87+
JobFeature.max_retries: False,
88+
JobFeature.min_retry_interval_millis: False,
89+
JobFeature.output_filename_template: False,
90+
JobFeature.stop_job: True,
91+
JobFeature.delete_job: True,
92+
}
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
import os
2+
from pathlib import Path
3+
from unittest.mock import MagicMock, patch
4+
5+
import pytest
6+
7+
from jupyter_scheduler.models import DescribeJob
8+
from jupyter_scheduler.orm import Job
9+
from jupyter_scheduler.python_executor import PythonScriptExecutionManager
10+
11+
12+
@pytest.fixture
13+
def python_script_staging_dir(jp_scheduler_staging_dir) -> Path:
14+
"""Create a staging directory with a simple Python script."""
15+
job_staging_dir = jp_scheduler_staging_dir / "job-py-1"
16+
job_staging_dir.mkdir()
17+
return job_staging_dir
18+
19+
20+
@pytest.fixture
21+
def simple_script(python_script_staging_dir) -> Path:
22+
"""Create a simple print script."""
23+
script_path = python_script_staging_dir / "test_script.py"
24+
script_path.write_text('print("Hello from Python script!")\n')
25+
return script_path
26+
27+
28+
@pytest.fixture
29+
def script_with_params(python_script_staging_dir) -> Path:
30+
"""Create a script that reads JUPYTER_PARAM_* env vars."""
31+
script_path = python_script_staging_dir / "param_script.py"
32+
script_path.write_text(
33+
"""import os
34+
learning_rate = os.environ.get('JUPYTER_PARAM_learning_rate', 'not_set')
35+
batch_size = os.environ.get('JUPYTER_PARAM_batch_size', 'not_set')
36+
print(f"lr={learning_rate}, batch={batch_size}")
37+
"""
38+
)
39+
return script_path
40+
41+
42+
@pytest.fixture
43+
def failing_script(python_script_staging_dir) -> Path:
44+
"""Create a script that exits with non-zero code."""
45+
script_path = python_script_staging_dir / "failing_script.py"
46+
script_path.write_text('import sys; print("error message", file=sys.stderr); sys.exit(1)\n')
47+
return script_path
48+
49+
50+
@pytest.fixture
51+
def script_with_side_effects(python_script_staging_dir) -> Path:
52+
"""Create a script that creates output files."""
53+
script_path = python_script_staging_dir / "side_effects_script.py"
54+
script_path.write_text(
55+
"""
56+
with open('output.txt', 'w') as f:
57+
f.write('Generated output')
58+
print("Created output.txt")
59+
"""
60+
)
61+
return script_path
62+
63+
64+
@pytest.fixture
65+
def python_job_record(simple_script, jp_scheduler_db) -> str:
66+
"""Create a job record for the Python script."""
67+
job = Job(
68+
name="test_python_job",
69+
runtime_environment_name="default",
70+
input_filename=simple_script.name,
71+
)
72+
jp_scheduler_db.add(job)
73+
jp_scheduler_db.commit()
74+
return job.job_id
75+
76+
77+
@pytest.fixture
78+
def python_job_with_params(script_with_params, jp_scheduler_db) -> str:
79+
"""Create a job record with parameters."""
80+
job = Job(
81+
name="test_python_job_with_params",
82+
runtime_environment_name="default",
83+
input_filename=script_with_params.name,
84+
parameters={"learning_rate": "0.01", "batch_size": "32"},
85+
)
86+
jp_scheduler_db.add(job)
87+
jp_scheduler_db.commit()
88+
return job.job_id
89+
90+
91+
class TestPythonScriptExecutionManager:
92+
def test_execute_simple_script(
93+
self,
94+
python_job_record,
95+
simple_script,
96+
jp_scheduler_root_dir,
97+
jp_scheduler_db_url,
98+
jp_scheduler_db,
99+
):
100+
"""Execute a simple print script and verify stdout is captured."""
101+
manager = PythonScriptExecutionManager(
102+
job_id=python_job_record,
103+
root_dir=str(jp_scheduler_root_dir),
104+
db_url=jp_scheduler_db_url,
105+
staging_paths={"input": str(simple_script)},
106+
)
107+
108+
# Execute should not raise
109+
manager.execute()
110+
111+
# Check stdout.log was created
112+
stdout_path = simple_script.parent / "stdout.log"
113+
assert stdout_path.exists()
114+
assert "Hello from Python script!" in stdout_path.read_text()
115+
116+
def test_execute_with_parameters(
117+
self,
118+
python_job_with_params,
119+
script_with_params,
120+
jp_scheduler_root_dir,
121+
jp_scheduler_db_url,
122+
jp_scheduler_db,
123+
):
124+
"""Parameters are passed as JUPYTER_PARAM_* env vars."""
125+
manager = PythonScriptExecutionManager(
126+
job_id=python_job_with_params,
127+
root_dir=str(jp_scheduler_root_dir),
128+
db_url=jp_scheduler_db_url,
129+
staging_paths={"input": str(script_with_params)},
130+
)
131+
132+
manager.execute()
133+
134+
stdout_path = script_with_params.parent / "stdout.log"
135+
content = stdout_path.read_text()
136+
assert "lr=0.01" in content
137+
assert "batch=32" in content
138+
139+
def test_execute_script_failure(
140+
self,
141+
failing_script,
142+
jp_scheduler_root_dir,
143+
jp_scheduler_db_url,
144+
jp_scheduler_db,
145+
):
146+
"""Non-zero exit code raises RuntimeError."""
147+
job = Job(
148+
name="test_failing_script",
149+
runtime_environment_name="default",
150+
input_filename=failing_script.name,
151+
)
152+
jp_scheduler_db.add(job)
153+
jp_scheduler_db.commit()
154+
155+
manager = PythonScriptExecutionManager(
156+
job_id=job.job_id,
157+
root_dir=str(jp_scheduler_root_dir),
158+
db_url=jp_scheduler_db_url,
159+
staging_paths={"input": str(failing_script)},
160+
)
161+
162+
with pytest.raises(RuntimeError) as exc_info:
163+
manager.execute()
164+
165+
assert "exited with code 1" in str(exc_info.value)
166+
assert "error message" in str(exc_info.value)
167+
168+
def test_stdout_stderr_captured(
169+
self,
170+
failing_script,
171+
jp_scheduler_root_dir,
172+
jp_scheduler_db_url,
173+
jp_scheduler_db,
174+
):
175+
"""Both stdout and stderr are written to files even on failure."""
176+
job = Job(
177+
name="test_stderr_capture",
178+
runtime_environment_name="default",
179+
input_filename=failing_script.name,
180+
)
181+
jp_scheduler_db.add(job)
182+
jp_scheduler_db.commit()
183+
184+
manager = PythonScriptExecutionManager(
185+
job_id=job.job_id,
186+
root_dir=str(jp_scheduler_root_dir),
187+
db_url=jp_scheduler_db_url,
188+
staging_paths={"input": str(failing_script)},
189+
)
190+
191+
with pytest.raises(RuntimeError):
192+
manager.execute()
193+
194+
stderr_path = failing_script.parent / "stderr.log"
195+
assert stderr_path.exists()
196+
assert "error message" in stderr_path.read_text()
197+
198+
def test_side_effects_captured(
199+
self,
200+
script_with_side_effects,
201+
jp_scheduler_root_dir,
202+
jp_scheduler_db_url,
203+
jp_scheduler_db,
204+
):
205+
"""Files created by the script are recorded in packaged_files."""
206+
job = Job(
207+
name="test_side_effects",
208+
runtime_environment_name="default",
209+
input_filename=script_with_side_effects.name,
210+
)
211+
jp_scheduler_db.add(job)
212+
jp_scheduler_db.commit()
213+
214+
manager = PythonScriptExecutionManager(
215+
job_id=job.job_id,
216+
root_dir=str(jp_scheduler_root_dir),
217+
db_url=jp_scheduler_db_url,
218+
staging_paths={"input": str(script_with_side_effects)},
219+
)
220+
221+
manager.execute()
222+
223+
# Refresh job from DB
224+
jp_scheduler_db.expire_all()
225+
job = jp_scheduler_db.query(Job).filter(Job.job_id == job.job_id).one()
226+
227+
# output.txt should be in packaged_files
228+
assert "output.txt" in job.packaged_files
229+
230+
def test_supported_features(self):
231+
"""Verify supported features match expected values."""
232+
from jupyter_scheduler.models import JobFeature
233+
234+
features = PythonScriptExecutionManager.supported_features()
235+
236+
assert features[JobFeature.job_name] is True
237+
assert features[JobFeature.output_formats] is False
238+
assert features[JobFeature.stop_job] is True
239+
assert features[JobFeature.delete_job] is True

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Homepage = "https://github.com/jupyter-server/jupyter-scheduler"
6060

6161
[project.entry-points."jupyter_scheduler.backends"]
6262
jupyter_server_nb = "jupyter_scheduler.backends:JupyterServerNotebookBackend"
63+
jupyter_server_py = "jupyter_scheduler.backends:JupyterServerPythonBackend"
6364

6465
[tool.check-wheel-contents]
6566
ignore = ["W002"]

0 commit comments

Comments
 (0)