Skip to content

Server-side support for event-driven notebook execution #416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion jupyter_scheduler/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from enum import Enum
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, root_validator

Expand All @@ -11,6 +11,17 @@
SCHEDULE_RE = ""


class EventType(BaseModel):
name: str
parameters: Dict[str, Any]


class Event(BaseModel):
event_id: str
event_type: str
parameters: Dict[str, Any]


class RuntimeEnvironment(BaseModel):
"""Defines a runtime context where job
execution will happen. For example, conda
Expand All @@ -26,6 +37,7 @@ class RuntimeEnvironment(BaseModel):
compute_types: Optional[List[str]]
default_compute_type: Optional[str] # Should be a member of the compute_types list
utc_only: Optional[bool]
event_types: Optional[List[EventType]]

def __str__(self):
return self.json()
Expand Down Expand Up @@ -85,6 +97,7 @@ class CreateJob(BaseModel):
name: str
output_filename_template: Optional[str] = OUTPUT_FILENAME_TEMPLATE
compute_type: Optional[str] = None
create_event: Optional[Event] = None

@root_validator
def compute_input_filename(cls, values) -> Dict:
Expand Down Expand Up @@ -145,6 +158,7 @@ class DescribeJob(BaseModel):
status: Status = Status.CREATED
status_message: Optional[str] = None
downloaded: bool = False
create_event: Optional[Event] = None

class Config:
orm_mode = True
Expand Down Expand Up @@ -209,6 +223,7 @@ class CreateJobDefinition(BaseModel):
compute_type: Optional[str] = None
schedule: Optional[str] = None
timezone: Optional[str] = None
events: List[EventType] = []

@root_validator
def compute_input_filename(cls, values) -> Dict:
Expand All @@ -234,6 +249,7 @@ class DescribeJobDefinition(BaseModel):
create_time: int
update_time: int
active: bool
events: List[EventType] = []

class Config:
orm_mode = True
Expand All @@ -253,6 +269,7 @@ class UpdateJobDefinition(BaseModel):
active: Optional[bool] = None
compute_type: Optional[str] = None
input_uri: Optional[str] = None
events: List[EventType] = []


class ListJobDefinitionsQuery(BaseModel):
Expand Down
2 changes: 2 additions & 0 deletions jupyter_scheduler/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class Job(CommonColumns, Base):
url = Column(String(256), default=generate_jobs_url)
pid = Column(Integer)
idempotency_token = Column(String(256))
create_event = Column(JsonType(1024), nullable=True)


class JobDefinition(CommonColumns, Base):
Expand All @@ -108,6 +109,7 @@ class JobDefinition(CommonColumns, Base):
url = Column(String(256), default=generate_job_definitions_url)
create_time = Column(Integer, default=get_utc_timestamp)
active = Column(Boolean, default=True)
events = Column(JsonType(1024), default=list)


def create_tables(db_url, drop_tables=False):
Expand Down
42 changes: 35 additions & 7 deletions jupyter_scheduler/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,38 @@

from jupyter_scheduler.models import (
CreateJobDefinition,
EventType,
ListJobDefinitionsQuery,
SortDirection,
SortField,
UpdateJobDefinition,
)
from jupyter_scheduler.orm import JobDefinition

test_job_def_params = {
"input_uri": "helloworld.ipynb",
"runtime_environment_name": "default",
"name": "hello world",
"output_formats": ["ipynb"],
}

def test_create_job_definition(jp_scheduler):
event_type_params = {"name": "a", "parameters": {"foo": "bar"}}


def create_job_definition(jp_scheduler, job_def_params):
with patch("jupyter_scheduler.scheduler.fsspec") as mock_fsspec:
with patch("jupyter_scheduler.scheduler.Scheduler.file_exists") as mock_file_exists:
mock_file_exists.return_value = True
job_definition_id = jp_scheduler.create_job_definition(
CreateJobDefinition(
input_uri="helloworld.ipynb",
runtime_environment_name="default",
name="hello world",
output_formats=["ipynb"],
)
CreateJobDefinition(**job_def_params)
)

return job_definition_id


def test_create_job_definition(jp_scheduler):
job_definition_id = create_job_definition(jp_scheduler, test_job_def_params)

with jp_scheduler.db_session() as session:
definitions = session.query(JobDefinition).all()
assert 1 == len(definitions)
Expand All @@ -37,6 +48,20 @@ def test_create_job_definition(jp_scheduler):
assert "helloworld.ipynb" == definition.input_filename
assert "default" == definition.runtime_environment_name
assert "hello world" == definition.name
assert [] == definition.events


def test_create_job_definition_with_events(jp_scheduler):
params_with_events = {
**test_job_def_params,
"events": [EventType(**event_type_params)],
}
create_job_definition(jp_scheduler, params_with_events)

with jp_scheduler.db_session() as session:
definitions = session.query(JobDefinition).all()
definition = definitions[0]
assert [{"name": "a", "parameters": {"foo": "bar"}}] == definition.events


job_definition_1 = {
Expand All @@ -49,6 +74,7 @@ def test_create_job_definition(jp_scheduler):
"update_time": 1,
"create_time": 1,
"active": True,
"events": [],
}

job_definition_2 = {
Expand All @@ -62,6 +88,7 @@ def test_create_job_definition(jp_scheduler):
"create_time": 2,
"active": True,
"tags": ["tag_2"],
"events": [],
}

job_definition_3 = {
Expand All @@ -75,6 +102,7 @@ def test_create_job_definition(jp_scheduler):
"create_time": 3,
"active": False,
"tags": ["tag_3"],
"events": [],
}


Expand Down