Commit a7f81a2

add iris pipeline example
Signed-off-by: Trevor Royer <[email protected]>
1 parent 01e370f commit a7f81a2

File tree

4 files changed
+491 -0 lines changed

samples/iris-sklearn/README.md

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
# Iris SKLearn Pipeline

This pipeline demonstrates a basic data science workflow on the [Iris Dataset](https://scikit-learn.org/stable/auto_examples/datasets/plot_iris_dataset.html) using the sklearn library.

## Prerequisites

- Install [KFP Tekton prerequisites](/samples/README.md)
- Install the additional requirements from [requirements.txt](./requirements.txt), as shown below
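Assuming a standard Python environment, the additional requirements can typically be installed with pip:

```sh
pip install -r requirements.txt
```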
## Instructions

This sample provides two different ways to execute the pipeline: the first option is to compile the pipeline to a Tekton yaml file; the second is to connect directly to the Kubeflow Pipeline UI using the kfp-tekton `TektonClient` and run the pipeline directly.

### Compiled

The compiled pipeline uses the kfp-tekton `TektonCompiler()` to generate a yaml object. The `TektonCompiler()` will produce a Tekton PipelineRun yaml object in the same directory called `iris-pipeline-compiled.yaml`.
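For reference, the compile call inside `iris-pipeline-compiled.py` (shown in full further down this commit) boils down to the following, where `iris_pipeline` is the decorated pipeline function defined in that file:

```python
import kfp_tekton

# Writes a Tekton PipelineRun yaml next to the script
kfp_tekton.compiler.TektonCompiler().compile(
    iris_pipeline, __file__.replace(".py", ".yaml")
)
```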
This pipeline utilizes a PVC, so you may need to set a storage class as an environment variable to match one that is available on your cluster.

To compile the pipeline, run:
```sh
# Optional: Set the storage class for the pipeline
export DEFAULT_STORAGE_CLASS="my-storage-class"

python iris-pipeline-compiled.py
```
Once the pipeline is compiled, upload the `iris-pipeline-compiled.yaml` file to the Kubeflow Pipeline dashboard with the Tekton backend. You can then create a new pipeline run from the dashboard.
### Direct Run

The direct run uses the kfp-tekton `TektonClient()` to connect directly to Kubeflow and create a pipeline run (see the sketch at the end of this section).

Like the compiled pipeline, this pipeline utilizes a PVC, so you may need to set a storage class as an environment variable to match one that is available on your cluster. This example also relies on some additional environment variables to set the Kubeflow UI endpoint and the user's bearer token to authenticate to the UI.

To execute the pipeline, run:
```sh
# Optional: Set the storage class for the pipeline
export DEFAULT_STORAGE_CLASS="my-storage-class"

DS_PIPELINE_NAMESPACE="my-namespace"
export KUBEFLOW_ENDPOINT="https://$(oc get route ds-pipeline-ui -n ${DS_PIPELINE_NAMESPACE} -o jsonpath='{.spec.host}')"

export BEARER_TOKEN=$(oc whoami --show-token)

python iris-pipeline-direct-run.py
```
A pipeline run will automatically kick off in the UI.
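For reference, the core of the direct-run script looks roughly like the sketch below. This is an illustration of the kfp-tekton `TektonClient` API using the environment variables above, not the exact contents of `iris-pipeline-direct-run.py`:

```python
import os

import kfp_tekton

# iris_pipeline is the @kfp.dsl.pipeline function from iris-pipeline-compiled.py
# (shown below); the direct-run script presumably defines or imports it

# Authenticate to the Kubeflow Pipelines UI route with the user's bearer token
client = kfp_tekton.TektonClient(
    host=os.environ["KUBEFLOW_ENDPOINT"],
    existing_token=os.environ["BEARER_TOKEN"],
)

# Compile the pipeline and launch it as a new run
client.create_run_from_pipeline_func(iris_pipeline, arguments={})
```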
samples/iris-sklearn/iris-pipeline-compiled.py

Lines changed: 214 additions & 0 deletions

@@ -0,0 +1,214 @@
"""Example of a pipeline to demonstrate a simple data science workflow."""
import os

import kfp

import kfp_tekton


def data_prep(
    X_train_file: kfp.components.OutputPath(),
    X_test_file: kfp.components.OutputPath(),
    y_train_file: kfp.components.OutputPath(),
    y_test_file: kfp.components.OutputPath(),
):
    import pickle

    import pandas as pd

    from sklearn import datasets
    from sklearn.model_selection import train_test_split

    def get_iris_data() -> pd.DataFrame:
        iris = datasets.load_iris()
        data = pd.DataFrame(
            {
                "sepalLength": iris.data[:, 0],
                "sepalWidth": iris.data[:, 1],
                "petalLength": iris.data[:, 2],
                "petalWidth": iris.data[:, 3],
                "species": iris.target,
            }
        )

        print("Initial Dataset:")
        print(data.head())

        return data

    def create_training_set(dataset: pd.DataFrame, test_size: float = 0.3):
        # Features
        X = dataset[["sepalLength", "sepalWidth", "petalLength", "petalWidth"]]
        # Labels
        y = dataset["species"]

        # Split dataset into training set and test set
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=11
        )

        return X_train, X_test, y_train, y_test

    def save_pickle(object_file, target_object):
        with open(object_file, "wb") as f:
            pickle.dump(target_object, f)

    dataset = get_iris_data()
    X_train, X_test, y_train, y_test = create_training_set(dataset)

    # Persist each split as a pickle file so downstream components can reload it
    save_pickle(X_train_file, X_train)
    save_pickle(X_test_file, X_test)
    save_pickle(y_train_file, y_train)
    save_pickle(y_test_file, y_test)


def train_model(
    X_train_file: kfp.components.InputPath(),
    y_train_file: kfp.components.InputPath(),
    model_file: kfp.components.OutputPath(),
):
    import pickle

    import pandas as pd

    from sklearn.ensemble import RandomForestClassifier

    def load_pickle(object_file):
        with open(object_file, "rb") as f:
            target_object = pickle.load(f)

        return target_object

    def save_pickle(object_file, target_object):
        with open(object_file, "wb") as f:
            pickle.dump(target_object, f)

    def train_iris(X_train: pd.DataFrame, y_train: pd.DataFrame):
        model = RandomForestClassifier(n_estimators=100)
        model.fit(X_train, y_train)

        return model

    X_train = load_pickle(X_train_file)
    y_train = load_pickle(y_train_file)

    model = train_iris(X_train, y_train)

    save_pickle(model_file, model)


def validate_model(model_file: kfp.components.InputPath()):
    import pickle

    def load_pickle(object_file):
        with open(object_file, "rb") as f:
            target_object = pickle.load(f)

        return target_object

    model = load_pickle(model_file)

    # A single sample (sepal length/width, petal length/width) to smoke-test the model
    input_values = [[5, 3, 1.6, 0.2]]

    print(f"Performing test prediction on {input_values}")
    result = model.predict(input_values)

    print(f"Response: {result}")


def evaluate_model(
    X_test_file: kfp.components.InputPath(),
    y_test_file: kfp.components.InputPath(),
    model_file: kfp.components.InputPath(),
    mlpipeline_metrics_file: kfp.components.OutputPath("Metrics"),
):
    import json
    import pickle

    from sklearn.metrics import accuracy_score

    def load_pickle(object_file):
        with open(object_file, "rb") as f:
            target_object = pickle.load(f)

        return target_object

    X_test = load_pickle(X_test_file)
    y_test = load_pickle(y_test_file)
    model = load_pickle(model_file)

    y_pred = model.predict(X_test)

    accuracy_score_metric = accuracy_score(y_test, y_pred)
    print(f"Accuracy: {accuracy_score_metric}")

    # Emit the score in the Kubeflow Pipelines metrics format so it shows up
    # in the run's metrics view
    metrics = {
        "metrics": [
            {
                "name": "accuracy-score",
                "numberValue": accuracy_score_metric,
                "format": "PERCENTAGE",
            },
        ]
    }

    with open(mlpipeline_metrics_file, "w") as f:
        json.dump(metrics, f)


# Wrap each function as a lightweight component; the listed packages are
# installed in the base image at runtime
data_prep_op = kfp.components.create_component_from_func(
    data_prep,
    base_image="image-registry.openshift-image-registry.svc:5000/openshift/python:latest",
    packages_to_install=["pandas", "scikit-learn"],
)

train_model_op = kfp.components.create_component_from_func(
    train_model,
    base_image="image-registry.openshift-image-registry.svc:5000/openshift/python:latest",
    packages_to_install=["pandas", "scikit-learn"],
)

evaluate_model_op = kfp.components.create_component_from_func(
    evaluate_model,
    base_image="image-registry.openshift-image-registry.svc:5000/openshift/python:latest",
    packages_to_install=["pandas", "scikit-learn"],
)

validate_model_op = kfp.components.create_component_from_func(
    validate_model,
    base_image="image-registry.openshift-image-registry.svc:5000/openshift/python:latest",
    packages_to_install=["pandas", "scikit-learn"],
)


@kfp.dsl.pipeline(
    name="Iris Pipeline",
)
def iris_pipeline(model_obc: str = "iris-model"):
    data_prep_task = data_prep_op()

    # KFP strips the "_file" suffix from the OutputPath parameter names, so the
    # outputs are referenced as "X_train", "y_train", etc.
    train_model_task = train_model_op(
        data_prep_task.outputs["X_train"],
        data_prep_task.outputs["y_train"],
    )

    evaluate_model_task = evaluate_model_op(  # noqa: F841
        data_prep_task.outputs["X_test"],
        data_prep_task.outputs["y_test"],
        train_model_task.output,
    )

    validate_model_task = validate_model_op(train_model_task.output)  # noqa: F841


if __name__ == "__main__":
    # set the default storage class and access mode if they don't already exist
    os.environ["DEFAULT_STORAGE_CLASS"] = os.environ.get(
        "DEFAULT_STORAGE_CLASS", "ocs-storagecluster-ceph-rbd"
    )
    os.environ["DEFAULT_ACCESSMODES"] = os.environ.get(
        "DEFAULT_ACCESSMODES", "ReadWriteOnce"
    )

    kfp_tekton.compiler.TektonCompiler().compile(
        iris_pipeline, __file__.replace(".py", ".yaml")
    )
