Skip to content

Commit 9a36cd0

Browse files
authored
Merge pull request Jacoo-Zhao#14 from Jacoo-Zhao/feature-cicd
fix: api server; pipeline refine
2 parents dd8d9bd + 84a8020 commit 9a36cd0

File tree

5 files changed

+152
-89
lines changed

5 files changed

+152
-89
lines changed

.github/workflows/pipeline.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,8 @@ jobs:
3434
3535
- name: Run pipeline
3636
run: |
37+
# clearml-agent daemon --queue "task" --detached
38+
# python main.py
3739
python s1_dataset_artifact.py
40+
41+

pipeline_from_tasks.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def run_pipeline():
3030
pipe = PipelineController(
3131
name="AI_Studio_Pipeline_Demo", project="AI_Studio_Demo", version="0.0.1", add_pipeline_tags=False
3232
)
33-
#
33+
3434
# pipe.add_parameter(
3535
# "url",
3636
# "dataset_url",
@@ -72,9 +72,9 @@ def run_pipeline():
7272
)
7373

7474
# for debugging purposes use local jobs
75-
# pipe.start_locally()
75+
pipe.start_locally()
7676

7777
# Starting the pipeline (in the background)
7878
# pipe.start(queue="task")
79-
pipe.start(queue="pipeline_controller")
80-
print("done")
79+
# pipe.start(queue="pipeline_controller")
80+
# print("done")

s1_dataset_artifact.py

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,14 @@
77
task = Task.init(project_name="AI_Studio_Demo", task_name="Pipeline step 1 dataset artifact")
88

99
# only create the task, we will actually execute it later
10-
# task.execute_remotely()
10+
task.execute_remotely()
1111

12-
# Check if the local dataset file exists
13-
local_iris_csv_path = 'work_dataset/Iris.csv'
14-
if not os.path.exists(local_iris_csv_path):
15-
print(f"Local file '{local_iris_csv_path}' not found. Downloading...")
16-
local_iris_pkl = StorageManager.get_local_copy(
17-
remote_url='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
18-
)
19-
else:
20-
print(f"Using existing local file: '{local_iris_csv_path}'")
12+
13+
local_iris_pkl = StorageManager.get_local_copy(remote_url='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl')
2114

2215
# Add and upload the dataset file
23-
task.upload_artifact('dataset', artifact_object=local_iris_csv_path)
24-
print('uploading artifacts in the background')
16+
# task.upload_artifact('dataset', artifact_object=local_iris_csv_path)
17+
task.upload_artifact('dataset', artifact_object=local_iris_pkl)
2518

26-
# we are done
27-
print('Done')
19+
print('uploading artifacts in the background')
20+
print('Done🔥')

s2_data_preprocessing.py

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
import pickle
22
from clearml import Task, StorageManager
33
from sklearn.model_selection import train_test_split
4-
import pandas as pd
5-
4+
# import pandas as pd
5+
#
66
# Connecting ClearML with the current process,
77
# from here on everything is logged automatically
88
task = Task.init(project_name="AI_Studio_Demo", task_name="Pipeline step 2 process dataset")
99

1010
# program arguments
11-
# Use either dataset_task_id to point to a tasks artifact or
12-
# use a direct url with dataset_url
11+
# Use dataset_task_id to point to a task's artifact (the direct dataset_url option is no longer supported)
1312
args = {
14-
'dataset_task_id': '3ceaa4409a70486b846e35bcbf229eab', #update id if it needs running locally
15-
# 'dataset_task_id': '', # update id if it needs running locally
16-
'dataset_url': '',
13+
'dataset_task_id': '',
1714
'random_state': 42,
1815
'test_size': 0.2,
1916
}
@@ -23,31 +20,24 @@
2320
print('Arguments: {}'.format(args))
2421

2522
# only create the task, we will actually execute it later
26-
# task.execute_remotely()
23+
task.execute_remotely()
24+
#
2725

2826
# get dataset from task's artifact
2927
if args['dataset_task_id']:
3028
dataset_upload_task = Task.get_task(task_id=args['dataset_task_id'])
3129
print('Input task id={} artifacts {}'.format(args['dataset_task_id'], list(dataset_upload_task.artifacts.keys())))
3230
# download the artifact
33-
iris_csv = dataset_upload_task.artifacts['dataset'].get_local_copy()
34-
# # get the dataset from a direct url
35-
# elif args['dataset_url']:
36-
# iris_pickle = StorageManager.get_local_copy(remote_url=args['dataset_url'])
31+
iris_pickle = dataset_upload_task.artifacts['dataset'].get_local_copy()
3732
else:
3833
raise ValueError("Missing dataset link")
3934

40-
iris_df = pd.read_csv(iris_csv)
41-
35+
# open the local copy
36+
iris = pickle.load(open(iris_pickle, 'rb'))
4237

4338
# "process" data
44-
# Extract features (X) and target (y)
45-
X = iris_df[['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']].values
46-
y = iris_df['Species'].astype('category').cat.codes.values # Convert species to numeric codes
47-
48-
species_mapping = dict(enumerate(iris_df['Species'].astype('category').cat.categories))
49-
print(species_mapping)
50-
39+
X = iris.data
40+
y = iris.target
5141
X_train, X_test, y_train, y_test = train_test_split(
5242
X, y, test_size=args['test_size'], random_state=args['random_state'])
5343

@@ -59,4 +49,4 @@
5949
task.upload_artifact('y_test', y_test)
6050

6151
print('Notice, artifacts are uploaded in the background')
62-
print('Done')
52+
print('Done🔥')

s3_train_model.py

Lines changed: 125 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,119 @@
1+
2+
# task.connect(args)
3+
#
4+
# # only create the task, we will actually execute it later
5+
# # task.execute_remotely() # After passing local testing, you should uncomment this command to initial task to ClearML
6+
#
7+
# print('Retrieving Iris dataset')
8+
# dataset_task = Task.get_task(task_id=args['dataset_task_id'])
9+
# X_train = dataset_task.artifacts['X_train'].get()
10+
# X_test = dataset_task.artifacts['X_test'].get()
11+
# y_train = dataset_task.artifacts['y_train'].get()
12+
# y_test = dataset_task.artifacts['y_test'].get()
13+
# print('Iris dataset loaded')
14+
#
15+
#
16+
# # Define a simple neural network
17+
# class SimpleNN(nn.Module):
18+
# def __init__(self, input_size, num_classes):
19+
# super(SimpleNN, self).__init__()
20+
# self.fc1 = nn.Linear(input_size, 50)
21+
# self.fc2 = nn.Linear(50, num_classes)
22+
#
23+
# def forward(self, x):
24+
# x = torch.relu(self.fc1(x))
25+
# x = self.fc2(x)
26+
# return x
27+
#
28+
# # Convert data to PyTorch tensors
29+
# X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
30+
# y_train_tensor = torch.tensor(y_train, dtype=torch.long)
31+
# X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
32+
# y_test_tensor = torch.tensor(y_test, dtype=torch.long)
33+
#
34+
# # Create DataLoader
35+
# train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
36+
# train_loader = DataLoader(train_dataset, batch_size=args['batch_size'], shuffle=True)
37+
# # Hyperparameters
38+
# # Initialize the model, loss function, and optimizer
39+
# model = SimpleNN(input_size=X_train.shape[1], num_classes=len(set(y_train)))
40+
# criterion = nn.CrossEntropyLoss()
41+
# optimizer = optim.Adam(
42+
# model.parameters(),
43+
# lr=args['learning_rate'],
44+
# weight_decay=args['weight_decay']
45+
# )
46+
#
47+
# for epoch in tqdm(range(args['num_epochs']), desc='Training Epochs'):
48+
# epoch_loss = 0.0
49+
#
50+
# for inputs, labels in train_loader:
51+
# optimizer.zero_grad()
52+
# outputs = model(inputs)
53+
# loss = criterion(outputs, labels)
54+
# loss.backward()
55+
# optimizer.step()
56+
#
57+
# # accumulate the loss
58+
# epoch_loss += loss.item()
59+
#
60+
# avg_loss = epoch_loss / len(train_loader)
61+
# logger.report_scalar(title='train', series='epoch_loss', value=avg_loss, iteration=epoch)
62+
#
63+
# # Save model
64+
# model_path = 'assets/model.pkl'
65+
# torch.save(model.state_dict(), model_path)
66+
# task.upload_artifact(name='model', artifact_object=model_path)
67+
# print('Model saved and uploaded as artifact')
68+
#
69+
# # Load model for evaluation
70+
# model.load_state_dict(torch.load(model_path))
71+
# model.eval()
72+
# with torch.no_grad():
73+
# outputs = model(X_test_tensor)
74+
# _, predicted = torch.max(outputs, 1)
75+
# accuracy = (predicted == y_test_tensor).float().mean().item()
76+
# logger.report_scalar("validation_accuracy", "score", value=accuracy, iteration=0)
77+
#
78+
# print(f'Model trained & stored with accuracy: {accuracy:.4f}')
79+
#
80+
#
81+
# # Plotting confusion matrix
82+
# species_mapping = {0: 'Setosa', 1: 'Versicolor', 2: 'Virginica'}
83+
# y_test_names = [species_mapping[label.item()] for label in y_test]
84+
# predicted_names = [species_mapping[label.item()] for label in predicted]
85+
#
86+
# cm = confusion_matrix(y_test_names, predicted_names, labels=list(species_mapping.values()))
87+
# disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=list(species_mapping.values()))
88+
# disp.plot(cmap=plt.cm.Blues)
89+
#
90+
# plt.title('Confusion Matrix')
91+
# plt.savefig('figs/confusion_matrix.png')
92+
#
93+
# print('Confusion matrix plotted and saved as confusion_matrix.png')
94+
195
import matplotlib.pyplot as plt
96+
import numpy as np
297
from clearml import Task, Logger
398
import torch
499
import torch.nn as nn
5100
import torch.optim as optim
6101
from torch.utils.data import DataLoader, TensorDataset
7-
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
8-
from tqdm import tqdm
102+
9103

10104
# Connecting ClearML with the current process,
105+
# from here on everything is logged automatically
11106
task = Task.init(project_name="AI_Studio_Demo", task_name="Pipeline step 3 train model")
12107
logger = Logger.current_logger()
13108

109+
# Arguments
14110
args = {
15-
'dataset_task_id': '86f09f66d88c4ec7819f05b086deea15', # update id if it needs running locally
16-
'num_epochs': 20,
17-
'batch_size': 16,
18-
'dataset_task_id': '',
19-
20-
# ✅ HPO
21-
'learning_rate': 1e-3,
22-
'weight_decay': 1e-5,
111+
'dataset_task_id': '', # replace the value only when you need debug locally
23112
}
24-
25113
task.connect(args)
26114

27115
# only create the task, we will actually execute it later
28-
# task.execute_remotely() # After passing local testing, you should uncomment this command to initial task to ClearML
29-
116+
task.execute_remotely() # After passing local testing, you should uncomment this command to initial task to ClearML
30117

31118
print('Retrieving Iris dataset')
32119
dataset_task = Task.get_task(task_id=args['dataset_task_id'])
@@ -36,7 +123,6 @@
36123
y_test = dataset_task.artifacts['y_test'].get()
37124
print('Iris dataset loaded')
38125

39-
40126
# Define a simple neural network
41127
class SimpleNN(nn.Module):
42128
def __init__(self, input_size, num_classes):
@@ -57,61 +143,51 @@ def forward(self, x):
57143

58144
# Create DataLoader
59145
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
60-
train_loader = DataLoader(train_dataset, batch_size=args['batch_size'], shuffle=True)
61-
# Hyperparameters
146+
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
147+
62148
# Initialize the model, loss function, and optimizer
63149
model = SimpleNN(input_size=X_train.shape[1], num_classes=len(set(y_train)))
64150
criterion = nn.CrossEntropyLoss()
65-
optimizer = optim.Adam(
66-
model.parameters(),
67-
lr=args['learning_rate'],
68-
weight_decay=args['weight_decay']
69-
)
70-
71-
for epoch in tqdm(range(args['num_epochs']), desc='Training Epochs'):
72-
epoch_loss = 0.0
151+
optimizer = optim.Adam(model.parameters(), lr=0.001)
73152

153+
# Train the model
154+
num_epochs = 20
155+
for epoch in range(num_epochs):
74156
for inputs, labels in train_loader:
75157
optimizer.zero_grad()
76158
outputs = model(inputs)
77159
loss = criterion(outputs, labels)
78160
loss.backward()
79161
optimizer.step()
162+
logger.report_scalar(title='train', series='loss', value=loss.item(), iteration=epoch)
80163

81-
# 累积 loss
82-
epoch_loss += loss.item()
83-
84-
avg_loss = epoch_loss / len(train_loader)
85-
logger.report_scalar(title='train', series='epoch_loss', value=avg_loss, iteration=epoch)
86-
87-
# Save model
88-
model_path = 'assets/model.pkl'
89-
torch.save(model.state_dict(), model_path)
90-
task.upload_artifact(name='model', artifact_object=model_path)
91-
print('Model saved and uploaded as artifact')
92-
93-
# Load model for evaluation
94-
model.load_state_dict(torch.load(model_path))
164+
# Evaluate the model
95165
model.eval()
96166
with torch.no_grad():
97167
outputs = model(X_test_tensor)
98168
_, predicted = torch.max(outputs, 1)
99169
accuracy = (predicted == y_test_tensor).float().mean().item()
100-
logger.report_scalar("validation_accuracy", "score", value=accuracy, iteration=0)
101170

102171
print(f'Model trained & stored with accuracy: {accuracy:.4f}')
103172

173+
# Plot the test samples by their first two features (sepal length vs. sepal width), colored by class
174+
x_min, x_max = X_test[:, 0].min() - .5, X_test[:, 0].max() + .5
175+
y_min, y_max = X_test[:, 1].min() - .5, X_test[:, 1].max() + .5
176+
h = .02 # step size in the mesh
177+
xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))
178+
plt.figure(1, figsize=(4, 3))
179+
104180

105-
# Plotting confusion matrix
106-
species_mapping = {0: 'Setosa', 1: 'Versicolor', 2: 'Virginica'}
107-
y_test_names = [species_mapping[label.item()] for label in y_test]
108-
predicted_names = [species_mapping[label.item()] for label in predicted]
181+
plt.scatter(X_test[:, 0], X_test[:, 1], c=y_test, edgecolors='k', cmap=plt.cm.Paired)
182+
plt.xlabel('Sepal length')
183+
plt.ylabel('Sepal width')
109184

110-
cm = confusion_matrix(y_test_names, predicted_names, labels=list(species_mapping.values()))
111-
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=list(species_mapping.values()))
112-
disp.plot(cmap=plt.cm.Blues)
185+
plt.xlim(xx.min(), xx.max())
186+
plt.ylim(yy.min(), yy.max())
187+
plt.xticks(())
188+
plt.yticks(())
113189

114-
plt.title('Confusion Matrix')
115-
plt.savefig('figs/confusion_matrix.png')
190+
plt.title('Iris Types')
191+
plt.savefig('iris_plot.png')
116192

117-
print('Confusion matrix plotted and saved as confusion_matrix.png')
193+
print('Done🔥')

0 commit comments

Comments
 (0)