Skip to content

Commit ccff0ac

Browse files
committed
Log jobs and job definitions in MLflow
1 parent e7a34bc commit ccff0ac

File tree

10 files changed

+145
-23
lines changed

10 files changed

+145
-23
lines changed

jupyter_scheduler/executors.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
from typing import Dict
88

99
import fsspec
10+
import mlflow
1011
import nbconvert
1112
import nbformat
1213
from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor
1314

1415
from jupyter_scheduler.models import DescribeJob, JobFeature, Status
1516
from jupyter_scheduler.orm import Job, create_session
1617
from jupyter_scheduler.parameterize import add_parameters
18+
from jupyter_scheduler.scheduler import MLFLOW_SERVER_URI
1719
from jupyter_scheduler.utils import get_utc_timestamp
1820

1921

@@ -136,13 +138,17 @@ def execute(self):
136138
kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=staging_dir
137139
)
138140

139-
try:
140-
ep.preprocess(nb, {"metadata": {"path": staging_dir}})
141-
except CellExecutionError as e:
142-
raise e
143-
finally:
144-
self.add_side_effects_files(staging_dir)
145-
self.create_output_files(job, nb)
141+
mlflow.set_tracking_uri(MLFLOW_SERVER_URI)
142+
with mlflow.start_run(run_id=job.mlflow_run_id):
143+
try:
144+
ep.preprocess(nb, {"metadata": {"path": staging_dir}})
145+
if job.parameters:
146+
mlflow.log_params(job.parameters)
147+
except CellExecutionError as e:
148+
raise e
149+
finally:
150+
self.add_side_effects_files(staging_dir)
151+
self.create_output_files(job, nb)
146152

147153
def add_side_effects_files(self, staging_dir: str):
148154
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""
@@ -170,8 +176,10 @@ def create_output_files(self, job: DescribeJob, notebook_node):
170176
for output_format in job.output_formats:
171177
cls = nbconvert.get_exporter(output_format)
172178
output, _ = cls().from_notebook_node(notebook_node)
173-
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
179+
output_path = self.staging_paths[output_format]
180+
with fsspec.open(output_path, "w", encoding="utf-8") as f:
174181
f.write(output)
182+
mlflow.log_artifact(output_path)
175183

176184
def supported_features(cls) -> Dict[JobFeature, bool]:
177185
return {

jupyter_scheduler/models.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ class CreateJob(BaseModel):
8686
output_filename_template: Optional[str] = OUTPUT_FILENAME_TEMPLATE
8787
compute_type: Optional[str] = None
8888
package_input_folder: Optional[bool] = None
89+
mlflow_logging: Optional[bool] = None
90+
mlflow_experiment_id: Optional[str] = None
91+
mlflow_run_id: Optional[str] = None
8992

9093
@root_validator
9194
def compute_input_filename(cls, values) -> Dict:
@@ -148,6 +151,9 @@ class DescribeJob(BaseModel):
148151
downloaded: bool = False
149152
package_input_folder: Optional[bool] = None
150153
packaged_files: Optional[List[str]] = []
154+
mlflow_logging: Optional[bool] = None
155+
mlflow_experiment_id: Optional[str] = None
156+
mlflow_run_id: Optional[str] = None
151157

152158
class Config:
153159
orm_mode = True
@@ -213,6 +219,8 @@ class CreateJobDefinition(BaseModel):
213219
schedule: Optional[str] = None
214220
timezone: Optional[str] = None
215221
package_input_folder: Optional[bool] = None
222+
mlflow_logging: Optional[bool] = None
223+
mlflow_experiment_id: Optional[str] = None
216224

217225
@root_validator
218226
def compute_input_filename(cls, values) -> Dict:
@@ -240,6 +248,8 @@ class DescribeJobDefinition(BaseModel):
240248
active: bool
241249
package_input_folder: Optional[bool] = None
242250
packaged_files: Optional[List[str]] = []
251+
mlflow_logging: Optional[bool] = None
252+
mlflow_experiment_id: Optional[str] = None
243253

244254
class Config:
245255
orm_mode = True

jupyter_scheduler/orm.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ class CommonColumns:
8787
create_time = Column(Integer, default=get_utc_timestamp)
8888
package_input_folder = Column(Boolean)
8989
packaged_files = Column(JsonType, default=[])
90+
mlflow_logging = Column(Boolean)
91+
mlflow_experiment_id = Column(String(256), nullable=True)
9092

9193

9294
class Job(CommonColumns, Base):
@@ -100,6 +102,7 @@ class Job(CommonColumns, Base):
100102
url = Column(String(256), default=generate_jobs_url)
101103
pid = Column(Integer)
102104
idempotency_token = Column(String(256))
105+
mlflow_run_id = Column(String(256), nullable=True)
103106

104107

105108
class JobDefinition(CommonColumns, Base):

jupyter_scheduler/scheduler.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
import shutil
55
from typing import Dict, List, Optional, Type, Union
66
import subprocess
7+
from typing import Dict, Optional, Type, Union
8+
from uuid import uuid4
79

810
import fsspec
11+
import mlflow
912
import psutil
1013
from jupyter_core.paths import jupyter_data_dir
1114
from jupyter_server.transutils import _i18n
@@ -46,6 +49,10 @@
4649
create_output_filename,
4750
)
4851

52+
MLFLOW_SERVER_HOST = "127.0.0.1"
53+
MLFLOW_SERVER_PORT = "5000"
54+
MLFLOW_SERVER_URI = f"http://{MLFLOW_SERVER_HOST}:{MLFLOW_SERVER_PORT}"
55+
4956

5057
class BaseScheduler(LoggingConfigurable):
5158
"""Base class for schedulers. A default implementation
@@ -405,16 +412,13 @@ def start_mlflow_server(self):
405412
[
406413
"mlflow",
407414
"server",
408-
"--backend-store-uri",
409-
"./mlruns",
410-
"--default-artifact-root",
411-
"./mlartifacts",
412415
"--host",
413-
"0.0.0.0",
416+
MLFLOW_SERVER_HOST,
414417
"--port",
415-
"5000",
418+
MLFLOW_SERVER_PORT,
416419
]
417420
)
421+
mlflow.set_tracking_uri(MLFLOW_SERVER_URI)
418422

419423
def __init__(
420424
self,
@@ -481,6 +485,19 @@ def create_job(self, model: CreateJob) -> str:
481485
if not model.output_formats:
482486
model.output_formats = []
483487

488+
mlflow_client = mlflow.MlflowClient()
489+
490+
if model.job_definition_id and model.mlflow_experiment_id:
491+
experiment_id = model.mlflow_experiment_id
492+
else:
493+
experiment_id = mlflow_client.create_experiment(f"{model.name}-{uuid4()}")
494+
model.mlflow_experiment_id = experiment_id
495+
input_file_path = os.path.join(self.root_dir, model.input_uri)
496+
mlflow.log_artifact(input_file_path, "input")
497+
498+
mlflow_run = mlflow_client.create_run(experiment_id=experiment_id, run_name=model.name)
499+
model.mlflow_run_id = mlflow_run.info.run_id
500+
484501
job = Job(**model.dict(exclude_none=True, exclude={"input_uri"}))
485502

486503
session.add(job)
@@ -628,6 +645,12 @@ def create_job_definition(self, model: CreateJobDefinition) -> str:
628645
if not self.file_exists(model.input_uri):
629646
raise InputUriError(model.input_uri)
630647

648+
mlflow_client = mlflow.MlflowClient()
649+
experiment_id = mlflow_client.create_experiment(f"{model.name}-{uuid4()}")
650+
model.mlflow_experiment_id = experiment_id
651+
input_file_path = os.path.join(self.root_dir, model.input_uri)
652+
mlflow.log_artifact(input_file_path, "input")
653+
631654
job_definition = JobDefinition(**model.dict(exclude_none=True, exclude={"input_uri"}))
632655
session.add(job_definition)
633656
session.commit()

src/components/mlflow-checkbox.tsx

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@ import React, { ChangeEvent } from 'react';
22

33
import { Checkbox, FormControlLabel, FormGroup } from '@mui/material';
44

5-
export type MLFlowCheckboxProps = {
5+
export function MLFlowLoggingControl(props: {
66
onChange: (event: ChangeEvent<HTMLInputElement>) => void;
7-
};
8-
9-
export function MLFlowCheckbox(props: MLFlowCheckboxProps): JSX.Element {
7+
}): JSX.Element {
108
return (
119
<FormGroup>
1210
<FormControlLabel
13-
control={<Checkbox onChange={props.onChange} value={'mlflowLogging'} />}
11+
control={<Checkbox onChange={props.onChange} name={'mlflowLogging'} />}
1412
label="Log with MLFlow"
1513
/>
1614
</FormGroup>

src/handler.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,8 @@ export namespace Scheduler {
364364
schedule?: string;
365365
timezone?: string;
366366
package_input_folder?: boolean;
367+
mlflow_logging?: boolean;
368+
mlflow_experiment_id?: string;
367369
}
368370

369371
export interface IUpdateJobDefinition {
@@ -391,6 +393,8 @@ export namespace Scheduler {
391393
update_time: number;
392394
active: boolean;
393395
package_input_folder?: boolean;
396+
mlflow_logging: boolean;
397+
mlflow_experiment_id?: string;
394398
}
395399

396400
export interface IEmailNotifications {
@@ -418,6 +422,9 @@ export namespace Scheduler {
418422
output_formats?: string[];
419423
compute_type?: string;
420424
package_input_folder?: boolean;
425+
mlflow_logging?: boolean;
426+
mlflow_experiment_id?: string;
427+
mlflow_run_id?: string;
421428
}
422429

423430
export interface ICreateJobFromDefinition {
@@ -467,6 +474,9 @@ export namespace Scheduler {
467474
end_time?: number;
468475
downloaded: boolean;
469476
package_input_folder?: boolean;
477+
mlflow_logging?: boolean;
478+
mlflow_experiment_id?: string;
479+
mlflow_run_id?: string;
470480
}
471481

472482
export interface ICreateJobResponse {

src/mainviews/create-job.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import {
4343
import { Box, Stack } from '@mui/system';
4444
import { getErrorMessage } from '../util/errors';
4545
import { PackageInputFolderControl } from '../components/input-folder-checkbox';
46-
import { MLFlowCheckbox } from '../components/mlflow-checkbox';
46+
import { MLFlowLoggingControl } from '../components/mlflow-checkbox';
4747

4848
export interface ICreateJobProps {
4949
model: ICreateJobModel;
@@ -512,7 +512,7 @@ export function CreateJob(props: ICreateJobProps): JSX.Element {
512512
onChange={handleInputChange}
513513
inputFile={props.model.inputFile}
514514
/>
515-
<MLFlowCheckbox onChange={handleInputChange} />
515+
<MLFlowLoggingControl onChange={handleInputChange} />
516516
<OutputFormatPicker
517517
label={trans.__('Output formats')}
518518
name="outputFormat"

src/mainviews/detail-view/job-definition.tsx

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import { Scheduler as SchedulerTokens } from '../../tokens';
3232

3333
import { timestampLocalize } from './job-detail';
3434
import { getErrorMessage } from '../../util/errors';
35+
import { OpenInNew } from '@mui/icons-material';
3536

3637
export interface IJobDefinitionProps {
3738
app: JupyterFrontEnd;
@@ -175,6 +176,18 @@ export function JobDefinition(props: IJobDefinitionProps): JSX.Element {
175176
>
176177
{trans.__('Edit Job Definition')}
177178
</Button>
179+
{model.mlflowLogging === true && (
180+
<Button
181+
variant="outlined"
182+
onClick={() => {
183+
const mlFlowUrl = `http://127.0.0.1:5000/#/experiments/${props.model?.mlflowExperimentId}`;
184+
window.open(mlFlowUrl);
185+
}}
186+
endIcon={<OpenInNew />}
187+
>
188+
{trans.__('Open in MLFlow')}
189+
</Button>
190+
)}
178191
<ConfirmDialogDeleteButton
179192
handleDelete={async () => {
180193
log('job-definition-detail.delete');
@@ -231,6 +244,16 @@ export function JobDefinition(props: IJobDefinitionProps): JSX.Element {
231244
label: trans.__('Time zone')
232245
}
233246
],
247+
[
248+
{
249+
value: model.mlflowLogging ? trans.__('Yes') : trans.__('No'),
250+
label: trans.__('MLFlow Logging')
251+
},
252+
{
253+
value: props.model.mlflowExperimentId,
254+
label: trans.__('MLFlow Experiment Id')
255+
}
256+
],
234257
[
235258
{
236259
value: model.packageInputFolder ? trans.__('Yes') : trans.__('No'),

src/mainviews/detail-view/job-detail.tsx

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ import {
3939
LabeledValue
4040
} from '../../components/labeled-value';
4141
import { getErrorMessage } from '../../util/errors';
42+
import { OpenInNew } from '@mui/icons-material';
43+
44+
const MLFLOW_SERVER_HOST = '127.0.0.1';
45+
const MLFLOW_SERVER_PORT = '5000';
46+
const MLFLOW_SERVER_URI = `http://${MLFLOW_SERVER_HOST}:${MLFLOW_SERVER_PORT}`;
4247

4348
export interface IJobDetailProps {
4449
app: JupyterFrontEnd;
@@ -167,6 +172,18 @@ export function JobDetail(props: IJobDetailProps): JSX.Element {
167172
{trans.__('Download Job Files')}
168173
</Button>
169174
)}
175+
{props.model?.mlflowLogging === true && (
176+
<Button
177+
variant="outlined"
178+
onClick={() => {
179+
const mlFlowUrl = `${MLFLOW_SERVER_URI}/#/experiments/${props.model?.mlflowExperimentId}/runs/${props.model?.mlflowRunId}`;
180+
window.open(mlFlowUrl);
181+
}}
182+
endIcon={<OpenInNew />}
183+
>
184+
{trans.__('Open in MLFlow')}
185+
</Button>
186+
)}
170187
{props.model !== null && props.model.status === 'IN_PROGRESS' && (
171188
<ConfirmDialogStopButton
172189
handleStop={handleStopJob}
@@ -250,6 +267,23 @@ export function JobDetail(props: IJobDetailProps): JSX.Element {
250267
value: timestampLocalize(props.model.endTime ?? ''),
251268
label: trans.__('End time')
252269
},
270+
{
271+
value: props.model.mlflowLogging ? trans.__('Yes') : trans.__('No'),
272+
label: trans.__('MLFlow Logging')
273+
}
274+
],
275+
[
276+
{
277+
value: props.model.mlflowExperimentId,
278+
label: trans.__('MLFlow Experiment Id')
279+
},
280+
{
281+
value: props.model.mlflowRunId,
282+
283+
label: trans.__('MLFlow Run Id')
284+
}
285+
],
286+
[
253287
{
254288
value: props.model.packageInputFolder
255289
? trans.__('Yes')

0 commit comments

Comments
 (0)