Skip to content

Commit b837c6f

Browse files
committed
Multiprocessing works now
1 parent ca40be8 commit b837c6f

File tree

8 files changed

+174
-179
lines changed

8 files changed

+174
-179
lines changed

native-experiment-tracking/README.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,28 @@ Although ZenML plugs into many [experiment trackers](https://www.zenml.io/vs/zen
44
the functionality of experiment trackers is already covered by ZenML's native metadata and artifact tracking.
55
This project aims to show these capabilities.
66

7+
## 🎯 Project Overview
8+
We're tackling a simple classification task using the breast cancer dataset. Our goal is to showcase how ZenML can effortlessly track experiments, hyperparameters, and results throughout the machine learning workflow.
9+
### 🔍 What We're Doing
10+
11+
In this project, we begin by preparing the breast cancer dataset for our model through data preprocessing. For our machine learning task, we've chosen to use an SGDClassifier. Rather than relying on sklearn's GridSearchCV, we implement our own hyperparameter tuning process to showcase ZenML's robust tracking capabilities. Finally, we conduct a thorough analysis of the results, visualizing how various hyperparameters influence the model's accuracy. This approach allows us to demonstrate the power of ZenML in tracking and managing the machine learning workflow.
12+
13+
We are by no means claiming that our solution outperforms GridSearchCV, spoiler alert, this demo won't, rather, this project demonstrates how you would do hyperparameter tuning and experiment tracking with ZenML on large deep learning problems.
14+
15+
### 🛠 The Pipeline
16+
Our ZenML pipeline consists of the following steps:
17+
18+
The feature_engineering pipeline:
19+
* Data Loading: Load the breast cancer dataset.
20+
* Data Splitting: Split the data into training and testing sets.
21+
* Data Pre Processing: Pre process our dataset
22+
23+
The model training pipeline:
24+
* Model Training: Train multiple SGDClassifiers with different hyperparameters.
25+
* Model Evaluation: Evaluate each model's performance.
26+
27+
By running this pipeline iteratively
28+
729
## :running: Run locally
830

931

@@ -26,7 +48,7 @@ zenml integration install sklearn pandas -y
2648
zenml init
2749
```
2850

29-
## Explore your experiments
51+
## 📈 Explore your experiments
3052

3153
...
3254

native-experiment-tracking/analyze.py

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import numpy as np
22
from matplotlib import pyplot as plt
33
from zenml.client import Client
4+
import matplotlib.pyplot as plt
5+
import seaborn as sns
6+
import pandas as pd
47

58

69
def main():
@@ -23,10 +26,53 @@ def main():
2326
test_accuracies.append(mv_metadata.get("test_accuracy", None).value)
2427
train_accuracies.append(mv_metadata.get("train_accuracy", None).value)
2528

26-
generate_plot(alpha_values, losses, penalties, test_accuracies)
29+
generate_3d_plot(alpha_values, losses, penalties, test_accuracies)
30+
generate_2d_plots(alpha_values, losses, penalties, test_accuracies)
2731

2832

29-
def generate_plot(alpha_values, losses, penalties, test_accuracies):
33+
def generate_2d_plots(alpha_values, losses, penalties, test_accuracies):
34+
# Convert the data into a DataFrame
35+
df = pd.DataFrame({
36+
'Alpha': alpha_values,
37+
'Loss': losses,
38+
'Penalty': penalties,
39+
'Accuracy': test_accuracies
40+
})
41+
42+
# Get unique values
43+
unique_penalties = df['Penalty'].unique()
44+
unique_losses = df['Loss'].unique()
45+
unique_alphas = sorted(df['Alpha'].unique())
46+
47+
# Create a figure with subplots for each penalty
48+
fig, axes = plt.subplots(1, len(unique_penalties), figsize=(20, 6), sharey=True)
49+
fig.suptitle('Accuracy Heatmap for Different Penalties', fontsize=16)
50+
51+
for i, penalty in enumerate(unique_penalties):
52+
# Filter data for the current penalty
53+
df_penalty = df[df['Penalty'] == penalty]
54+
55+
# Create a pivot table
56+
pivot = df_penalty.pivot(index='Loss', columns='Alpha', values='Accuracy')
57+
58+
# Create heatmap
59+
sns.heatmap(pivot, ax=axes[i], cmap='viridis', annot=True, fmt='.3f', cbar=False)
60+
61+
axes[i].set_title(f'Penalty: {penalty}')
62+
axes[i].set_xlabel('Alpha')
63+
64+
if i == 0:
65+
axes[i].set_ylabel('Loss')
66+
67+
# Add a colorbar to the right of the subplots
68+
cbar_ax = fig.add_axes([.92, .15, .02, .7])
69+
fig.colorbar(axes[0].collections[0], cax=cbar_ax, label='Accuracy')
70+
71+
plt.tight_layout(rect=[0, 0, .9, 1])
72+
plt.show()
73+
74+
75+
def generate_3d_plot(alpha_values, losses, penalties, test_accuracies):
3076
# Convert losses and penalties to numerical indices
3177
unique_losses = list(set(losses))
3278
unique_penalties = list(set(penalties))
@@ -40,6 +86,16 @@ def generate_plot(alpha_values, losses, penalties, test_accuracies):
4086

4187
# Create a scatter plot
4288
scatter = ax.scatter(alpha_values, loss_indices, penalty_indices, c=test_accuracies, cmap='viridis')
89+
# Find the point with the highest accuracy
90+
max_accuracy_index = np.argmax(test_accuracies)
91+
max_accuracy = test_accuracies[max_accuracy_index]
92+
max_alpha = alpha_values[max_accuracy_index]
93+
max_loss = losses[max_accuracy_index]
94+
max_penalty = penalties[max_accuracy_index]
95+
96+
# Highlight the point with the highest accuracy
97+
ax.scatter([max_alpha], [loss_indices[max_accuracy_index]], [penalty_indices[max_accuracy_index]],
98+
c='red', s=100, edgecolors='black', linewidths=2, zorder=10)
4399

44100
# Set labels for each axis
45101
ax.set_xlabel('Alpha')
@@ -62,6 +118,11 @@ def generate_plot(alpha_values, losses, penalties, test_accuracies):
62118
# Adjust the viewing angle
63119
ax.view_init(elev=20, azim=45)
64120

121+
# Add legend with highest accuracy point description
122+
legend_text = f'Highest Accuracy:\nAccuracy: {max_accuracy:.4f}\nAlpha: {max_alpha}\nLoss: {max_loss}\nPenalty: {max_penalty}'
123+
ax.text2D(0.05, 0.95, legend_text, transform=ax.transAxes, fontsize=10, verticalalignment='top',
124+
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
125+
65126
# Show the plot
66127
plt.tight_layout()
67128
plt.show()
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# environment configuration
2+
settings:
3+
docker:
4+
required_integrations:
5+
- sklearn
6+
- pandas
7+
requirements:
8+
- pyarrow
9+
10+
# pipeline configuration
11+
test_size: 0.35

native-experiment-tracking/pipelines/training.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,23 @@
1616
#
1717

1818
from typing import Optional
19-
from uuid import UUID
20-
21-
from steps import model_evaluator, model_promoter, model_trainer
2219

2320
from pipelines import (
2421
feature_engineering,
2522
)
23+
from steps import model_evaluator, model_trainer
2624
from zenml import pipeline
27-
from zenml.client import Client
2825
from zenml.logger import get_logger
2926

30-
3127
logger = get_logger(__name__)
3228

3329

3430
@pipeline
3531
def training(
36-
alpha_value: float,
37-
penalty: str,
38-
loss: str,
39-
target: Optional[str] = "target",
32+
alpha_value: float,
33+
penalty: str,
34+
loss: str,
35+
target: Optional[str] = "target",
4036
):
4137
"""
4238
Model training pipeline.
@@ -63,9 +59,10 @@ def training(
6359
dataset_trn=dataset_trn, target=target, alpha_value=alpha_value, penalty=penalty, loss=loss
6460
)
6561

66-
acc, _ = model_evaluator(
62+
test_acc = model_evaluator(
6763
model=model,
6864
dataset_trn=dataset_trn,
6965
dataset_tst=dataset_tst,
7066
target=target,
7167
)
68+
return test_acc

native-experiment-tracking/run.py

Lines changed: 66 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,18 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616
#
17+
import concurrent
18+
import multiprocessing
1719
import os
20+
import traceback
21+
from itertools import product
1822

1923
import click
20-
from sklearn.utils._param_validation import InvalidParameterError
2124
from zenml import Model
2225
from zenml.client import Client
2326
from zenml.logger import get_logger
2427

25-
from pipelines import training
28+
from pipelines import training, feature_engineering
2629

2730
logger = get_logger(__name__)
2831

@@ -34,7 +37,8 @@
3437
help="Disable caching for the pipeline run.",
3538
)
3639
def main(
37-
no_cache: bool = False,
40+
no_cache: bool = False,
41+
parallel: bool = False
3842
):
3943
"""Main entry point for the pipeline execution.
4044
@@ -47,6 +51,7 @@ def main(
4751
4852
Args:
4953
no_cache: If `True` cache will be disabled.
54+
parallel: If `True` multiprocessing will be used for running hyperparameter tuning in parallel
5055
"""
5156
client = Client()
5257
config_path = os.path.join(
@@ -56,25 +61,64 @@ def main(
5661
)
5762
enable_cache = not no_cache
5863

59-
alpha_values = [0.0001, 0.001, 0.01]
60-
penalties = ["l2", "l1", "elasticnet"]
61-
losses = ["hinge", "squared_hinge", "modified_huber"]
62-
for penalty in penalties:
63-
for loss in losses:
64-
for alpha_value in alpha_values:
65-
logger.info(f"Training with alpha: {alpha_value}, penalty: {penalty}, loss: {loss}")
66-
67-
model = Model(
68-
name="breast_cancer_classifier",
69-
tags=[f"alpha: {alpha_value}", f"penalty: {penalty}", f"loss: {loss}"]
70-
)
71-
try:
72-
training.with_options(config_path=config_path, enable_cache=enable_cache, model=model)(
73-
alpha_value=alpha_value, penalty=penalty, loss=loss)
74-
except RuntimeError:
75-
pass
76-
else:
77-
logger.info("Training pipeline finished successfully!\n\n")
64+
# Run the feature engineering pipeline, this way all invocations within the training pipelines
65+
# will use the cached output from this pipeline
66+
feature_engineering()
67+
68+
# Here is our set of parameters that we want to explore to find the best combination
69+
alpha_values = [0.0001, 0.001] # , 0.01]
70+
penalties = ["l2", "l1"] # , "elasticnet"]
71+
losses = ["hinge", "squared_hinge"] #, "modified_huber"]
72+
73+
# Lets loop over these
74+
# Create a list of all parameter combinations
75+
parameter_combinations = list(product(alpha_values, penalties, losses))
76+
77+
if parallel:
78+
parallel_training(config_path, enable_cache, parameter_combinations)
79+
else:
80+
for alpha_value, penalty, loss in parameter_combinations:
81+
train_model(alpha_value, penalty, loss, config_path, enable_cache)
82+
83+
84+
def parallel_training(config_path, enable_cache, parameter_combinations):
85+
# Determine the number of CPU cores to use
86+
num_cores = multiprocessing.cpu_count()
87+
# Use ProcessPoolExecutor for CPU-bound tasks
88+
with concurrent.futures.ProcessPoolExecutor(max_workers=num_cores) as executor:
89+
# Submit all tasks to the executor
90+
futures = [executor.submit(train_model, alpha, penalty, loss, config_path, enable_cache)
91+
for alpha, penalty, loss in parameter_combinations]
92+
93+
# Wait for all tasks to complete
94+
concurrent.futures.wait(futures)
95+
96+
97+
def train_model(alpha_value: float, penalty: str, loss: str, config_path: str, enable_cache: bool):
98+
logger.info(f"Training with alpha: {alpha_value}, penalty: {penalty}, loss: {loss}")
99+
100+
model = Model(
101+
name="breast_cancer_classifier",
102+
tags=[f"alpha: {alpha_value}", f"penalty: {penalty}", f"loss: {loss}"]
103+
)
104+
try:
105+
logger.info(f"Starting training with alpha: {alpha_value}, penalty: {penalty}, loss: {loss}")
106+
training.with_options(
107+
config_path=config_path, enable_cache=enable_cache, model=model
108+
)(
109+
alpha_value=alpha_value, penalty=penalty, loss=loss
110+
)
111+
112+
logger.info(f"Training finished successfully for alpha: {alpha_value}, penalty: {penalty}, loss: {loss}")
113+
# except ValueError:
114+
# logger.info("Pipeline run aborted!\n\n")
115+
# pass
116+
except Exception as e:
117+
logger.error(f"Error in training with alpha: {alpha_value}, penalty: {penalty}, loss: {loss}")
118+
logger.error(f"Exception: {str(e)}")
119+
logger.error(f"Traceback: {traceback.format_exc()}")
120+
else:
121+
logger.info("Training pipeline finished successfully!\n\n")
78122

79123

80124
if __name__ == "__main__":

0 commit comments

Comments
 (0)