
Commit 4eecda3

feat(jobs): improve MLOps example (#82)
1 parent f02cc9d commit 4eecda3

11 files changed, +142 -63 lines changed

jobs/ml-ops/README.md

Lines changed: 15 additions & 2 deletions
````diff
@@ -37,14 +37,22 @@ Set your Scaleway access key, secret key and project ID in environment variables
 export TF_VAR_access_key=<your-access-key>
 export TF_VAR_secret_key=<your-secret-key>
 export TF_VAR_project_id=<your-project-id> # you can create a separate project for this example
+```
+
+By default, both the jobs and the container trigger in this example run regularly on a schedule. The default values for these schedules are configured in `jobs/ml-ops/terraform/variables.tf`, and can be overridden using Terraform variables, e.g. `export TF_VAR_data_fetch_cron_schedule="0 10 * * *"`.
 
+Then deploy the MLOps infrastructure using the following:
+
+```console
 cd terraform
 terraform init
 terraform plan
 terraform apply
 ```
 
-### Step 2. Run the data and training Jobs
+### Step 2. Optional: trigger jobs manually
+
+The pipeline is fully automated: each job runs at its scheduled time, so this step is only needed for debugging or testing.
 
 To run the jobs for the data and training, we can use the Scaleway CLI:
 
@@ -60,12 +68,17 @@ You can also trigger the jobs from the [Jobs section](https://console.scaleway.c
 
 ### Step 3. Use the inference API
 
+Load the latest model version using:
+
 ```
 cd terraform
 export INFERENCE_URL=$(terraform output -raw endpoint)
+curl -X POST ${INFERENCE_URL}
+```
 
-curl -X POST ${INFERENCE_URL}/load
+Then post data to infer the class:
 
+```
 curl -X POST \
 -H "Content-Type: application/json" \
 -d @../inference/example.json
````
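As a companion to Step 3, the same flow can be scripted from Python. This is a minimal sketch, not part of the commit: it assumes `INFERENCE_URL` is set from `terraform output -raw endpoint`, that the repository's `inference/example.json` is at hand, and that the classify route lives at `/classify` (its path is outside this diff's hunks, so that name is hypothetical).

```python
# Minimal sketch of the Step 3 flow; the /classify path is an assumption.
import json
import os

import requests

# e.g. export INFERENCE_URL=$(terraform output -raw endpoint)
INFERENCE_URL = os.environ["INFERENCE_URL"]

# POST / tells the server to (re)load the latest classifier from S3
print(requests.post(INFERENCE_URL).json())  # {"message": "model loaded successfully"}

# GET / reports which model version is currently being served
print(requests.get(INFERENCE_URL).json())

# post a client profile for classification (hypothetical /classify path)
with open("inference/example.json") as fh:
    profile = json.load(fh)
print(requests.post(f"{INFERENCE_URL}/classify", json=profile).json())
```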

jobs/ml-ops/data/main.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -1,8 +1,9 @@
-import boto3
 import os
 import urllib.request
 import zipfile
 
+import boto3
+
 DATA_DIR = "dataset"
 
 ZIP_URL = "http://archive.ics.uci.edu/static/public/222/bank+marketing.zip"
```

jobs/ml-ops/inference/data.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,5 +1,5 @@
-import pandas as pd
 import numpy as np
+import pandas as pd
 from pydantic import BaseModel
 
 
```
jobs/ml-ops/inference/loader.py

Lines changed: 60 additions & 0 deletions
```diff
@@ -0,0 +1,60 @@
+import os
+import pickle
+
+import boto3
+
+
+class ClassifierLoader:
+    _classifier = None
+    _classifier_version = ""
+
+    @classmethod
+    def load(cls, force=False):
+        if force or cls._classifier is None:
+            access_key = os.environ["ACCESS_KEY"]
+            secret_key = os.environ["SECRET_KEY"]
+            region_name = os.environ["REGION"]
+
+            bucket_name = os.environ["S3_BUCKET_NAME"]
+            s3_url = os.environ["S3_URL"]
+
+            s3 = boto3.client(
+                "s3",
+                region_name=region_name,
+                endpoint_url=s3_url,
+                aws_access_key_id=access_key,
+                aws_secret_access_key=secret_key,
+            )
+
+            # get the model file with the latest version (newest LastModified first)
+            bucket_objects = s3.list_objects(Bucket=bucket_name)
+            get_last_modified = lambda object: int(
+                object["LastModified"].strftime("%s")
+            )
+            model_objects = [
+                model_object
+                for model_object in bucket_objects["Contents"]
+                if "classifier" in model_object["Key"]
+            ]
+            latest_model_file = [
+                object["Key"]
+                for object in sorted(model_objects, key=get_last_modified, reverse=True)
+            ][0]
+
+            s3.download_file(bucket_name, latest_model_file, latest_model_file)
+
+            with open(latest_model_file, "rb") as fh:
+                cls._classifier = pickle.load(fh)
+            cls._classifier_version = latest_model_file[11:-4]
+
+            print(
+                "Successfully loaded model file: {latest_model_file}".format(
+                    latest_model_file=latest_model_file
+                ),
+                flush=True,
+            )
+
+        return cls._classifier
+
+    @classmethod
+    def model_version(cls):
+        return cls._classifier_version
```
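A note on the selection above: `sorted(..., key=get_last_modified, reverse=True)[0]` keeps the most recently modified `classifier_*` object, and `latest_model_file[11:-4]` strips the 11-character `classifier_` prefix and the 4-character `.pkl` suffix to recover the version. A self-contained sketch against mocked `list_objects` contents, illustrative only:

```python
# Mocked version of the latest-model selection in ClassifierLoader.load().
from datetime import datetime, timezone

contents = [  # stand-in for s3.list_objects(Bucket=...)["Contents"]
    {"Key": "classifier_202401011200.pkl",
     "LastModified": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)},
    {"Key": "classifier_202402011200.pkl",
     "LastModified": datetime(2024, 2, 1, 12, 0, tzinfo=timezone.utc)},
    {"Key": "performance_202402011200.pkl",
     "LastModified": datetime(2024, 2, 1, 12, 5, tzinfo=timezone.utc)},
]

# timestamp() is a portable stand-in for the strftime("%s") used in loader.py
get_last_modified = lambda obj: int(obj["LastModified"].timestamp())
model_objects = [o for o in contents if "classifier" in o["Key"]]
latest = sorted(model_objects, key=get_last_modified, reverse=True)[0]["Key"]

assert latest == "classifier_202402011200.pkl"
assert latest[11:-4] == "202402011200"  # len("classifier_") == 11, len(".pkl") == 4
```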

jobs/ml-ops/inference/main.py

Lines changed: 25 additions & 36 deletions
```diff
@@ -1,54 +1,38 @@
+import data
 from fastapi import FastAPI
+from loader import ClassifierLoader
 from sklearn.ensemble import RandomForestClassifier
-from sklearn.metrics import RocCurveDisplay
-import pickle
-import boto3
-import pandas
-import os
-
-import data
 
 classifier = RandomForestClassifier()
 
 app = FastAPI()
 
-MODEL_FILE = "classifier.pkl"
-
-
-class ClassifierLoader:
-    _classifier = None
 
-    @classmethod
-    def load(cls, force=False):
-        if force or cls._classifier is None:
-            access_key = os.environ["ACCESS_KEY"]
-            secret_key = os.environ["SECRET_KEY"]
-            region_name = os.environ["REGION"]
+@app.get("/")
+def hello():
+    """Get Model Version"""
 
-            bucket_name = os.environ["S3_BUCKET_NAME"]
-            s3_url = os.environ["S3_URL"]
+    model_version = ClassifierLoader.model_version()
 
-            s3 = boto3.client(
-                "s3",
-                region_name=region_name,
-                endpoint_url=s3_url,
-                aws_access_key_id=access_key,
-                aws_secret_access_key=secret_key,
-            )
+    if model_version == "":
+        return {
+            "message": "Hello, this is the inference server! No classifier loaded in memory."
+        }
 
-            s3.download_file(bucket_name, MODEL_FILE, MODEL_FILE)
+    return {
+        "message": "Hello, this is the inference server! Serving classifier with version {model_version}".format(
+            model_version=model_version
+        )
+    }
 
-            with open(MODEL_FILE, "rb") as fh:
-                cls._classifier = pickle.load(fh)
 
-        return cls._classifier
-
-
-@app.post("/load")
+# this endpoint is used by the cron trigger to load the model from S3
+@app.post("/")
 def load():
     """Reloads classifier from model registry bucket"""
 
     ClassifierLoader.load(force=True)
+
    return {"message": "model loaded successfully"}
 
 
@@ -59,8 +43,13 @@ def classify(profile: data.ClientProfile):
     cleaned_data = data.clean_profile(profile)
     data_point_processed = data.transform_data(cleaned_data)
 
-    # Lazy-loads classifer from S3
+    # Lazy-loads classifier from S3
     classifier = ClassifierLoader.load()
     prediction = classifier.predict(data_point_processed)
 
-    return {"predicted_class": int(prediction)}
+    response = "This client is likely to respond positively to a cold call"
+
+    if int(prediction) == 0:
+        response = "This client is likely to respond negatively to a cold call"
+
+    return {"prediction": response}
```
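Because `GET /` only consults `ClassifierLoader`'s cached state, the reworked endpoints can be smoke-tested locally without S3 credentials. A minimal sketch using FastAPI's `TestClient`, assuming it is run from `jobs/ml-ops/inference/` with the project's Python dependencies installed:

```python
# Local smoke test: GET / never touches S3, it only reads the cached version.
from fastapi.testclient import TestClient

from main import app  # jobs/ml-ops/inference/main.py

client = TestClient(app)

resp = client.get("/")
assert resp.status_code == 200
# with no model loaded yet, the server reports an empty classifier
print(resp.json()["message"])  # "...No classifier loaded in memory."
```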

jobs/ml-ops/terraform/container.tf

Lines changed: 7 additions & 1 deletion
```diff
@@ -12,7 +12,7 @@ resource "scaleway_container" "inference" {
   cpu_limit    = 2000
   memory_limit = 2048
   min_scale    = 1
-  max_scale    = 5
+  max_scale    = 1
   environment_variables = {
     "S3_BUCKET_NAME" = scaleway_object_bucket.main.name
     "S3_URL"         = var.s3_url
@@ -24,3 +24,9 @@ resource "scaleway_container" "inference" {
   }
   deploy = true
 }
+
+resource "scaleway_container_cron" "inference_cron" {
+  container_id = scaleway_container.inference.id
+  schedule     = var.inference_cron_schedule
+  args         = jsonencode({})
+}
```

jobs/ml-ops/terraform/jobs.tf

Lines changed: 8 additions & 2 deletions
```diff
@@ -4,7 +4,10 @@ resource "scaleway_job_definition" "fetch_data" {
   memory_limit = 1024
   image_uri    = docker_image.data.name
   timeout      = "10m"
-
+  cron {
+    schedule = var.data_fetch_cron_schedule
+    timezone = "Europe/Paris"
+  }
   env = {
     "S3_BUCKET_NAME" : scaleway_object_bucket.main.name,
     "S3_URL" : var.s3_url,
@@ -20,7 +23,10 @@ resource "scaleway_job_definition" "training" {
   memory_limit = 4096
   image_uri    = docker_image.training.name
   timeout      = "10m"
-
+  cron {
+    schedule = var.training_cron_schedule
+    timezone = "Europe/Paris"
+  }
   env = {
     "S3_BUCKET_NAME" : scaleway_object_bucket.main.name,
     "S3_URL" : var.s3_url,
```

jobs/ml-ops/terraform/variables.tf

Lines changed: 10 additions & 8 deletions
```diff
@@ -25,15 +25,17 @@ variable "s3_url" {
   default     = "https://s3.fr-par.scw.cloud"
 }
 
-variable "data_file" {
-  type        = string
-  description = "name data file in data store"
-  default     = "bank_telemarketing.csv"
+variable "data_fetch_cron_schedule" {
+  type    = string
+  default = "0 */10 * * *"
 }
 
-variable "model_object" {
-  type        = string
-  description = "name of model object stored in model registry"
-  default     = "classifier.pkl"
+variable "training_cron_schedule" {
+  type    = string
+  default = "0 */11 * * *"
 }
 
+variable "inference_cron_schedule" {
+  type    = string
+  default = "0 */12 * * *"
+}
```
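In these defaults, `0 */10 * * *` fires at minute 0 of hours 0, 10 and 20, and the 11- and 12-hour variants give training and inference reloads their own cadences. A quick way to sanity-check such expressions is the third-party `croniter` package; this is an illustration, not a dependency of the example:

```python
# Preview upcoming firing times for the default cron schedules.
from datetime import datetime

from croniter import croniter  # pip install croniter

start = datetime(2024, 1, 1)
for name, expr in [
    ("data_fetch", "0 */10 * * *"),
    ("training", "0 */11 * * *"),
    ("inference", "0 */12 * * *"),
]:
    itr = croniter(expr, start)
    runs = [itr.get_next(datetime).strftime("%d %H:%M") for _ in range(3)]
    print(name, runs)

# data_fetch ['01 10:00', '01 20:00', '02 00:00']
```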

jobs/ml-ops/terraform/versions.tf

Lines changed: 1 addition & 0 deletions
```diff
@@ -2,6 +2,7 @@ terraform {
   required_providers {
     scaleway = {
       source = "scaleway/scaleway"
+      version = ">= 2.39"
     }
     docker = {
       source = "kreuzwerker/docker"
```

jobs/ml-ops/training/main.py

Lines changed: 10 additions & 8 deletions
```diff
@@ -1,17 +1,19 @@
-import pandas as pd
 import os
 import pickle
+from datetime import datetime
+
 import boto3
+import pandas as pd
 import training as ml
-from sklearn.metrics import RocCurveDisplay
-from sklearn.metrics import ConfusionMatrixDisplay
+from sklearn.metrics import ConfusionMatrixDisplay, RocCurveDisplay
 
-DATA_FILE_NAME = "bank-additional-full.csv"
+VERSION = datetime.now().strftime("%Y%m%d%H%M")
 
-MODEL_FILE = "classifier.pkl"
-PERF_FILE = "performance.pkl"
-ROC_AUC_FILE = "roc_auc.png"
-CONFUSION_MATRIX_FILE = "confusion_matrix.png"
+DATA_FILE_NAME = "bank-additional-full.csv"
+MODEL_FILE = "classifier_" + VERSION + ".pkl"
+PERF_FILE = "performance_" + VERSION + ".pkl"
+ROC_AUC_FILE = "roc_auc_" + VERSION + ".png"
+CONFUSION_MATRIX_FILE = "confusion_matrix_" + VERSION + ".png"
 
 
 def main() -> int:
```
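The loader's `latest_model_file[11:-4]` slice depends on this naming scheme keeping the `classifier_` prefix and `.pkl` suffix stable, so the version string survives the round trip between training and inference. A small illustrative check:

```python
# Round trip: training embeds VERSION in MODEL_FILE; the loader slices it out.
from datetime import datetime

version = datetime(2024, 5, 17, 9, 30).strftime("%Y%m%d%H%M")  # fixed time for the demo
model_file = "classifier_" + version + ".pkl"

assert model_file == "classifier_202405170930.pkl"
assert model_file[11:-4] == version  # len("classifier_") == 11, len(".pkl") == 4
```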
