Skip to content

Commit c9c4130

Browse files
committed
Initial commit
1 parent 6d977d8 commit c9c4130

7 files changed

Lines changed: 947 additions & 9 deletions

File tree

configs/throughput/example.json

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
{
2+
"INCLUDE": ["../common/sklearn.json"],
3+
"PARAMETERS_SETS": {
4+
"common parameters": {
5+
"data": {
6+
"split_kwargs": {
7+
"train_size": 8000,
8+
"test_size": 2000,
9+
"shuffle": true,
10+
"random_state": 42
11+
}
12+
},
13+
"algorithm": { "device": "default" }
14+
},
15+
"throughput settings": {
16+
"bench": {
17+
"num_instances": 4,
18+
"cores_per_instance": 4,
19+
"measurement_duration": 30
20+
}
21+
},
22+
"datasets": {
23+
"data": [
24+
{
25+
"source": "make_classification",
26+
"generation_kwargs": {
27+
"n_classes": 2,
28+
"n_samples": 10000,
29+
"n_features": 64,
30+
"n_informative": 32
31+
}
32+
}
33+
]
34+
},
35+
"algorithms": [
36+
{
37+
"algorithm": {
38+
"estimator": "RandomForestClassifier",
39+
"estimator_params": { "n_estimators": 50 }
40+
}
41+
},
42+
{
43+
"algorithm": {
44+
"estimator": "KMeans",
45+
"estimator_params": {
46+
"n_clusters": 10,
47+
"init": "random",
48+
"algorithm": "lloyd",
49+
"max_iter": 100
50+
}
51+
}
52+
},
53+
{
54+
"algorithm": {
55+
"estimator": "LinearRegression"
56+
}
57+
}
58+
]
59+
},
60+
"TEMPLATES": {
61+
"throughput_test": {
62+
"SETS": [
63+
"sklearn-ex[cpu] implementations",
64+
"common parameters",
65+
"throughput settings",
66+
"datasets",
67+
"algorithms"
68+
]
69+
}
70+
}
71+
}
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
# ===============================================================================
2+
# Copyright 2024 Intel Corporation
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
# ===============================================================================
16+
17+
import argparse
18+
import inspect
19+
import json
20+
import socket
21+
import sys
22+
import time
23+
from typing import Dict, List, Tuple
24+
25+
from ..datasets import load_data
26+
from ..datasets.transformer import split_and_transform_data
27+
from ..utils.bench_case import get_bench_case_value
28+
from ..utils.common import convert_to_numpy
29+
from ..utils.config import bench_case_filter
30+
from ..utils.custom_types import BenchCase
31+
from ..utils.logger import logger
32+
from ..utils.special_params import assign_case_special_values_on_run
33+
from .sklearn_estimator import (
34+
estimator_to_task,
35+
get_estimator,
36+
get_estimator_methods,
37+
get_subset_metrics_of_estimator,
38+
validate_estimator_params,
39+
)
40+
41+
42+
def barrier_wait(sock: socket.socket, msg_send: bytes, msg_expect_prefix: bytes):
    """Send a message and block until the expected response arrives.

    ``msg_send`` is written to ``sock`` first; the function then keeps reading
    until the accumulated reply starts with ``msg_expect_prefix``. An empty
    prefix is satisfied immediately, without reading anything.

    Raises:
        ConnectionError: if the peer closes the socket before the prefix
            arrives, or if the bytes received so far already rule out a
            match with the expected prefix.
    """
    sock.sendall(msg_send)
    received = b""
    prefix_len = len(msg_expect_prefix)
    while not received.startswith(msg_expect_prefix):
        chunk = sock.recv(1024)
        if not chunk:
            raise ConnectionError("Barrier socket closed unexpectedly")
        received += chunk
        # Once the leading bytes diverge from the expected prefix, no further
        # data can make startswith() succeed; fail instead of blocking forever.
        if not msg_expect_prefix.startswith(received[:prefix_len]):
            raise ConnectionError(
                f"Unexpected barrier response {received!r}; "
                f"expected prefix {msg_expect_prefix!r}"
            )
51+
52+
53+
def run_measurement_loop(
    func, args: tuple, measurement_duration: float
) -> Dict[str, List]:
    """Call ``func(*args)`` repeatedly for ``measurement_duration`` seconds.

    A new iteration is started only while the deadline has not passed; an
    iteration that is already running is always allowed to finish, so the
    total elapsed time may exceed ``measurement_duration``. A non-positive
    duration yields no iterations.

    Returns:
        A dict with two equally long lists:
        ``"start_ts"`` - wall-clock ``time.time()`` value taken right before
        each call; ``"duration_ms"`` - elapsed time of each call in
        milliseconds.
    """
    start_timestamps = []
    durations_ms = []
    # The deadline and the per-call durations use monotonic clocks so that a
    # wall-clock adjustment during the run cannot shorten/extend the loop or
    # produce negative durations.
    deadline = time.monotonic() + measurement_duration
    while time.monotonic() < deadline:
        started_at = time.time()  # reported timestamp stays wall-clock based
        tick = time.perf_counter()
        func(*args)
        elapsed = time.perf_counter() - tick
        start_timestamps.append(started_at)
        durations_ms.append(elapsed * 1000)
    return {"start_ts": start_timestamps, "duration_ms": durations_ms}
67+
68+
69+
def prepare_estimator(bench_case: BenchCase) -> Tuple:
    """Resolve the estimator and load its data ahead of the measurement.

    Returns a 9-tuple, in this order: estimator class, validated estimator
    parameters, estimator methods, task, x_train, x_test, y_train, y_test
    and the data description.
    """
    library = get_bench_case_value(bench_case, "algorithm:library")
    name = get_bench_case_value(bench_case, "algorithm:estimator")

    estimator_class = get_estimator(library, name)
    task = estimator_to_task(name)

    raw_data, data_description = load_data(bench_case)
    split_data, data_description = split_and_transform_data(
        bench_case, raw_data, data_description
    )
    x_train, x_test, y_train, y_test = split_data

    # Note the (train pair, test pair) ordering here, which differs from the
    # (x_train, x_test, y_train, y_test) ordering returned by the split.
    assign_case_special_values_on_run(
        bench_case, (x_train, y_train, x_test, y_test), data_description
    )

    requested_params = get_bench_case_value(
        bench_case, "algorithm:estimator_params", {}
    )
    validated_params = validate_estimator_params(estimator_class, requested_params)
    methods = get_estimator_methods(bench_case)

    return (
        estimator_class,
        validated_params,
        methods,
        task,
        x_train,
        x_test,
        y_train,
        y_test,
        data_description,
    )
103+
104+
105+
def get_method_and_args(
    estimator_instance, method_name, stage, x_train, x_test, y_train, y_test
):
    """Return the bound method and the positional data it should be called with.

    The ``"training"`` stage uses the train subset; any other stage uses the
    test subset. The target array is included only when the method's
    signature has a parameter named ``y``.
    """
    bound_method = getattr(estimator_instance, method_name)
    if stage == "training":
        features, targets = x_train, y_train
    else:
        features, targets = x_test, y_test
    accepts_y = "y" in inspect.signature(bound_method).parameters
    call_args = (features, targets) if accepts_y else (features,)
    return bound_method, call_args
119+
120+
121+
def _wait_for_go(sock: socket.socket) -> None:
    """Block until the bytes received on ``sock`` contain ``b"go"``.

    Raises:
        ConnectionError: if the peer closes the socket first.
    """
    received = b""
    while b"go" not in received:
        chunk = sock.recv(1024)
        if not chunk:
            raise ConnectionError("Barrier socket closed unexpectedly")
        received += chunk


def main():
    """Run one benchmark instance synchronized over a barrier socket.

    Steps, in order:

    1. Parse CLI arguments; ``--bench-case`` and ``--filters`` are JSON strings.
    2. If the case is filtered out, print a ``{"instance_id", "filtered"}``
       JSON object and return without connecting to the barrier.
    3. Prepare data and the estimator, then call the first available training
       method once as a warmup.
    4. Connect to ``localhost:<barrier-port>`` and send ``b"ready"``.
    5. For each of the ``training`` and ``inference`` stages that has an
       available method: wait for ``b"go"``, run the measurement loop on the
       first available method, then send ``b"done"``.
    6. Compute quality metrics, collect estimator parameters and print a
       single JSON object with the results to stdout.

    Raises:
        ConnectionError: if the barrier socket is closed while waiting
            for a ``b"go"`` signal.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--bench-case", required=True, type=str)
    parser.add_argument("--filters", required=True, type=str)
    parser.add_argument("--instance-id", required=True, type=int)
    parser.add_argument("--barrier-port", required=True, type=int)
    parser.add_argument("--measurement-duration", required=True, type=float)
    parser.add_argument(
        "--log-level",
        default="WARNING",
        type=str,
        choices=("ERROR", "WARNING", "INFO", "DEBUG"),
    )
    args = parser.parse_args()

    logger.setLevel(args.log_level)

    bench_case = json.loads(args.bench_case)
    filters = json.loads(args.filters)["filters"]

    if not bench_case_filter(bench_case, filters):
        logger.warning("Benchmarking case was filtered.")
        print(json.dumps({"instance_id": args.instance_id, "filtered": True}))
        return

    # --- Preparation phase (unlimited time) ---
    (
        estimator_class,
        estimator_params,
        estimator_methods,
        task,
        x_train,
        x_test,
        y_train,
        y_test,
        _data_description,  # not needed by this entry point
    ) = prepare_estimator(bench_case)

    estimator_instance = estimator_class(**estimator_params)

    # Warmup: run one fit to trigger JIT/allocations
    training_methods = estimator_methods.get("training", ["fit"])
    for method_name in training_methods:
        if hasattr(estimator_instance, method_name):
            method_instance, data_args = get_method_and_args(
                estimator_instance, method_name, "training",
                x_train, x_test, y_train, y_test,
            )
            method_instance(*data_args)
            break

    # --- Connect to barrier ---
    # The context manager guarantees the socket is closed even when a stage
    # or the metric computation raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect(("localhost", args.barrier_port))
        sock.sendall(b"ready")

        # --- Measurement stages ---
        stages_results = {}

        for stage in ["training", "inference"]:
            methods = estimator_methods.get(stage, [])
            available_methods = [
                m for m in methods if hasattr(estimator_instance, m)
            ]
            if not available_methods:
                continue

            # Wait for "go" signal from parent before each stage
            _wait_for_go(sock)

            # Only the first available method of the stage is measured
            method_name = available_methods[0]
            method_instance, data_args = get_method_and_args(
                estimator_instance, method_name, stage,
                x_train, x_test, y_train, y_test,
            )

            timing_data = run_measurement_loop(
                method_instance, data_args, args.measurement_duration
            )

            stages_results[stage] = {
                "method": method_name,
                "iterations_completed": len(timing_data["start_ts"]),
                "start_ts": timing_data["start_ts"],
                "duration_ms": timing_data["duration_ms"],
            }

            # Signal done to parent
            sock.sendall(b"done")

        # --- Compute quality metrics from final fitted model ---
        quality_metrics = {}
        quality_metrics.update(
            get_subset_metrics_of_estimator(
                task, "training", estimator_instance, (x_train, y_train)
            )
        )
        quality_metrics.update(
            get_subset_metrics_of_estimator(
                task, "inference", estimator_instance, (x_test, y_test)
            )
        )

        # Get final estimator params
        final_params = {}
        if hasattr(estimator_instance, "get_params"):
            final_params = estimator_instance.get_params()
            # NOTE(review): "handle" is presumably a non-serializable
            # library object; confirm which estimators expose it.
            final_params.pop("handle", None)

    # --- Output ---
    output = {
        "instance_id": args.instance_id,
        "stages": stages_results,
        "quality_metrics": quality_metrics,
        "estimator_params": final_params,
    }
    print(json.dumps(output))
244+
245+
246+
# Invoke main() only when this module is executed directly, not on import.
if __name__ == "__main__":
    main()

sklbench/runner/arguments.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,45 @@ def add_runner_arguments(parser: argparse.ArgumentParser) -> argparse.ArgumentPa
137137
action="store_true",
138138
help="Interrupt runner and exit if last benchmark failed with error.",
139139
)
140+
# throughput mode arguments
141+
parser.add_argument(
142+
"--throughput-mode",
143+
default=False,
144+
action="store_true",
145+
help="Run in throughput mode: multiple synchronized parallel instances "
146+
"with CPU pinning via numactl.",
147+
)
148+
parser.add_argument(
149+
"--num-instances",
150+
type=int,
151+
default=None,
152+
help="Number of parallel instances in throughput mode.",
153+
)
154+
parser.add_argument(
155+
"--cores-per-instance",
156+
type=int,
157+
default=None,
158+
help="CPU cores per instance in throughput mode.",
159+
)
160+
parser.add_argument(
161+
"--measurement-duration",
162+
type=float,
163+
default=60.0,
164+
help="Duration (seconds) for each measurement stage in throughput mode.",
165+
)
166+
parser.add_argument(
167+
"--emergency-timeout",
168+
type=float,
169+
default=3600.0,
170+
help="Emergency subprocess timeout (seconds). Safety net only.",
171+
)
172+
parser.add_argument(
173+
"--throughput-full-logs",
174+
default=False,
175+
action="store_true",
176+
help="Store per-iteration start_ts and duration_ms arrays in throughput results. "
177+
"Disabled by default to reduce output size.",
178+
)
140179
# option to get parser description in Markdown table format for READMEs
141180
parser.add_argument(
142181
"--describe-parser",

0 commit comments

Comments
 (0)