This repository was archived by the owner on Jun 7, 2023. It is now read-only.

Commit 7c37460

Author: Anand Sanmukhani
Merge pull request #147 from arjunshenoymec/master
adding a new env variable that provides an option between sequential …
2 parents 20dc284 + 026aec2 commit 7c37460

File tree

2 files changed: +51 -29 lines


app.py

Lines changed: 44 additions & 29 deletions
@@ -3,7 +3,9 @@
 import os
 import logging
 from datetime import datetime
-from multiprocessing import Process, Queue
+from multiprocessing import Pool, Process, Queue
+from multiprocessing import cpu_count
+from functools import partial
 from queue import Empty as EmptyQueueException
 import tornado.ioloop
 import tornado.web
@@ -117,37 +119,50 @@ def make_app(data_queue):
         ]
     )
 
+def train_individual_model(predictor_model, initial_run):
+    metric_to_predict = predictor_model.metric
+    pc = PrometheusConnect(
+        url=Configuration.prometheus_url,
+        headers=Configuration.prom_connect_headers,
+        disable_ssl=True,
+    )
 
-def train_model(initial_run=False, data_queue=None):
-    """Train the machine learning model."""
-    for predictor_model in PREDICTOR_MODEL_LIST:
-        metric_to_predict = predictor_model.metric
-        data_start_time = datetime.now() - Configuration.metric_chunk_size
-        if initial_run:
-            data_start_time = (
-                datetime.now() - Configuration.rolling_training_window_size
-            )
-
-        # Download new metric data from prometheus
-        new_metric_data = pc.get_metric_range_data(
-            metric_name=metric_to_predict.metric_name,
-            label_config=metric_to_predict.label_config,
-            start_time=data_start_time,
-            end_time=datetime.now(),
-        )[0]
-
-        # Train the new model
-        start_time = datetime.now()
-        predictor_model.train(
-            new_metric_data, Configuration.retraining_interval_minutes
-        )
-        _LOGGER.info(
-            "Total Training time taken = %s, for metric: %s %s",
-            str(datetime.now() - start_time),
-            metric_to_predict.metric_name,
-            metric_to_predict.label_config,
+    data_start_time = datetime.now() - Configuration.metric_chunk_size
+    if initial_run:
+        data_start_time = (
+            datetime.now() - Configuration.rolling_training_window_size
         )
 
+    # Download new metric data from prometheus
+    new_metric_data = pc.get_metric_range_data(
+        metric_name=metric_to_predict.metric_name,
+        label_config=metric_to_predict.label_config,
+        start_time=data_start_time,
+        end_time=datetime.now(),
+    )[0]
+
+    # Train the new model
+    start_time = datetime.now()
+    predictor_model.train(
+        new_metric_data, Configuration.retraining_interval_minutes)
+
+    _LOGGER.info(
+        "Total Training time taken = %s, for metric: %s %s",
+        str(datetime.now() - start_time),
+        metric_to_predict.metric_name,
+        metric_to_predict.label_config,
+    )
+    return predictor_model
+
+def train_model(initial_run=False, data_queue=None):
+    """Train the machine learning model."""
+    global PREDICTOR_MODEL_LIST
+    parallelism = min(Configuration.parallelism, cpu_count())
+    _LOGGER.info(f"Training models using ProcessPool of size:{parallelism}")
+    training_partial = partial(train_individual_model, initial_run=initial_run)
+    with Pool(parallelism) as p:
+        result = p.map(training_partial, PREDICTOR_MODEL_LIST)
+    PREDICTOR_MODEL_LIST = result
     data_queue.put(PREDICTOR_MODEL_LIST)
 
 
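For context on the pattern introduced above: train_model() now wraps train_individual_model() in a functools.partial, maps it over PREDICTOR_MODEL_LIST with a multiprocessing.Pool, and reassigns the global list from the pool's results. The snippet below is a minimal, self-contained sketch of that Pool/partial structure; DummyModel and train_one() are illustrative stand-ins and are not code from this repository.

```python
# Minimal sketch of the Pool + partial pattern used by the new train_model().
# DummyModel and train_one() are hypothetical stand-ins; only the
# partial -> Pool.map -> reassign structure mirrors the diff above.
from functools import partial
from multiprocessing import Pool, cpu_count


class DummyModel:
    """Stand-in for a predictor model; the real models hold metric data."""

    def __init__(self, name):
        self.name = name
        self.trained = False


def train_one(model, initial_run):
    # Each worker process receives its own copy of the model, trains it,
    # and returns it; the parent collects the updated copies from map().
    model.trained = True
    return model


if __name__ == "__main__":
    models = [DummyModel(f"metric_{i}") for i in range(4)]
    parallelism = min(4, cpu_count())  # cap workers at available cores
    train_partial = partial(train_one, initial_run=False)
    with Pool(parallelism) as pool:
        models = pool.map(train_partial, models)
    print([m.trained for m in models])  # -> [True, True, True, True]
```

Because Pool workers operate on pickled copies, the trained objects only reach the parent through map()'s return value; that is why train_model() assigns result back to PREDICTOR_MODEL_LIST rather than relying on in-place mutation.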
configuration.py

Lines changed: 7 additions & 0 deletions
@@ -57,3 +57,10 @@ class Configuration:
         "Metric data rolling training window size: %s", rolling_training_window_size
     )
     _LOGGER.info("Model retraining interval: %s minutes", retraining_interval_minutes)
+
+    # An option for Parallelism.
+    # An Integer specifying the number of metrics to be trained in parallel.
+    # Default: 1.
+    # Note: The upper limit to this will be decided by the number of CPU cores
+    # available to the container.
+    parallelism = int(os.getenv("FLT_PARALLELISM", "1"))
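A minimal sketch (not part of the commit) of how the two pieces interact: configuration.py reads the requested parallelism from the FLT_PARALLELISM environment variable, and train_model() in app.py caps the actual Pool size at the CPU count visible to the container.

```python
# Sketch of the effective worker count derived from FLT_PARALLELISM.
# Mirrors Configuration.parallelism plus the min(..., cpu_count()) cap
# applied in app.py; the variable names here are illustrative.
import os
from multiprocessing import cpu_count

requested = int(os.getenv("FLT_PARALLELISM", "1"))  # default of 1 keeps training sequential
effective = min(requested, cpu_count())             # upper bound: available CPU cores
print(f"requested={requested}, effective pool size={effective}")
```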
