Skip to content

Commit 7c9619c

Browse files
committed
finish sleeper load test
1 parent c9b0394 commit 7c9619c

File tree

1 file changed

+208
-0
lines changed

1 file changed

+208
-0
lines changed
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
# /// script
2+
# requires-python = ">=3.11"
3+
# dependencies = [
4+
# "httpx",
5+
# "matplotlib",
6+
# "osparc>=0.8.3.post0.dev26",
7+
# "tenacity",
8+
# ]
9+
# ///
10+
11+
12+
import argparse
13+
import json
14+
import os
15+
from datetime import datetime, timedelta
16+
from pathlib import Path
17+
from tempfile import TemporaryDirectory
18+
19+
import matplotlib.pyplot as plt
20+
import osparc_client
21+
from httpx import BasicAuth, Client, HTTPStatusError
22+
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential
23+
24+
# Directory containing this script; companion assets are resolved relative to it.
_SCRIPT_DIR = Path(__file__).parent
# NOTE(review): _MAIN_FILE, _NERVE_MODEL_FILE and _VALUES_FILE are asserted to
# exist but never referenced again in this script — presumably leftovers from a
# sibling (sim4life) load test; confirm before removing.
_MAIN_FILE = _SCRIPT_DIR / "main.py"
assert _MAIN_FILE.is_file(), f"Main file not found: {_MAIN_FILE}"
_NERVE_MODEL_FILE = _SCRIPT_DIR / "Nerve_Model.sab"
assert _NERVE_MODEL_FILE.is_file(), f"Nerve model file not found: {_NERVE_MODEL_FILE}"
_VALUES_FILE = _SCRIPT_DIR / "values.json"
assert _VALUES_FILE.is_file(), f"Values file not found: {_VALUES_FILE}"

# Catalogue coordinates of the "sleeper" test solver this load test exercises.
_SOLVER_KEY = "simcore/services/comp/itis/sleeper"
_SOLVER_VERSION = "2.2.1"
35+
36+
def main(njobs: int, sleep_seconds: int, log_job: bool = False):
    """Run a sleeper-solver load test against an oSPARC deployment.

    Registers the sleeper solver as a function, maps ``njobs`` identical jobs
    over it, waits for every job to reach a terminal state and finally shows a
    bar chart of the status counts. Uploaded files and registered functions
    are cleaned up best-effort, even when the test fails midway.

    Credentials are read from the ``OSPARC_API_URL`` / ``OSPARC_API_KEY`` /
    ``OSPARC_API_SECRET`` environment variables.

    Args:
        njobs: number of jobs to submit (must be > 0).
        sleep_seconds: seconds each sleeper job sleeps (must be > 0).
        log_job: when True, stream the log of the first submitted job.
    """
    assert njobs > 0, "Number of jobs must be greater than 0"
    assert sleep_seconds > 0, "Sleep seconds must be greater than 0"

    url = os.environ.get("OSPARC_API_URL")
    assert url, "OSPARC_API_URL must be set"
    key = os.environ.get("OSPARC_API_KEY")
    assert key, "OSPARC_API_KEY must be set"
    secret = os.environ.get("OSPARC_API_SECRET")
    assert secret, "OSPARC_API_SECRET must be set"
    configuration = osparc_client.Configuration(host=url, username=key, password=secret)

    uploaded_files = []
    registered_functions = []
    job_statuses = dict()

    with osparc_client.ApiClient(configuration) as api_client:
        # Instantiate the API facades *before* the try-block: the finally
        # clause references api_instance and file_client_instance, which would
        # raise NameError if construction happened inside the try and failed.
        api_instance = osparc_client.FunctionsApi(api_client)
        job_api_instance = osparc_client.FunctionJobsApi(api_client)
        # NOTE(review): currently unused; kept for parity with the original.
        job_collection_api_instance = osparc_client.FunctionJobCollectionsApi(
            api_client
        )
        file_client_instance = osparc_client.FilesApi(api_client)
        user_api_instance = osparc_client.UsersApi(api_client)
        try:
            # Fails fast if the credentials are invalid.
            user_api_instance.get_my_profile()

            with TemporaryDirectory() as temp_dir:
                # `with Path(temp_dir) as tmpdir:` relied on pathlib's
                # deprecated context-manager protocol (removed in Python
                # 3.13); a plain assignment is equivalent and portable.
                tmpdir = Path(temp_dir)
                _file = tmpdir / "file_with_number.txt"
                _file.write_text(f"{sleep_seconds}")
                file_with_number = file_client_instance.upload_file(
                    file=f"{_file.resolve()}"
                )
            assert file_with_number.id
            uploaded_files.append(file_with_number)

            solver_function = osparc_client.Function(
                osparc_client.SolverFunction(
                    uid=None,
                    title="s4l-python-runner",
                    description="Run Python code using sim4life",
                    input_schema=osparc_client.JSONFunctionInputSchema(),
                    output_schema=osparc_client.JSONFunctionOutputSchema(),
                    solver_key=_SOLVER_KEY,
                    solver_version=_SOLVER_VERSION,
                    default_inputs={},
                )
            )
            print(f"Built function: {solver_function.to_dict()}\n")

            registered_function = api_instance.register_function(
                solver_function.model_dump()
            )
            registered_functions.append(registered_function)

            print(f"Registered function: {registered_function.to_dict()}\n")

            function_id = registered_function.to_dict().get("uid")
            assert function_id

            # Every job gets the same payload; input_1 is the uploaded file
            # holding the sleep duration.
            inputs = njobs * [
                {
                    "input_1": file_with_number,
                    "input_2": 5,
                    "input_3": "false",
                    "input_4": 0,
                    "input_5": 0,
                }
            ]

            function_jobs = api_instance.map_function(
                function_id=function_id,
                request_body=inputs,
            )

            print(f"function_job: {function_jobs.to_dict()}")
            function_job_ids = function_jobs.job_ids
            assert function_job_ids

            if log_job:
                job = job_api_instance.get_function_job(function_job_ids[0])
                print_job_logs(configuration, job.actual_instance.solver_job_id)

            # Poll each job until it reaches a terminal state and tally the
            # outcomes.
            for job_uid in function_job_ids:
                status = wait_until_done(job_api_instance, job_uid)
                job_statuses[status] = job_statuses.get(status, 0) + 1

            statuses = list(job_statuses.keys())
            counts = [job_statuses[status] for status in statuses]

            plt.figure(figsize=(6, 4))
            plt.bar(statuses, counts, color="skyblue")
            plt.xlabel("Job Status")
            plt.ylabel("Count")
            plt.title("Function Job Status Counts")
            plt.tight_layout()
            plt.show(block=True)

        finally:
            # Best-effort cleanup: deletion failures are reported, not raised,
            # so one failure does not abort the remaining cleanup.
            for file in uploaded_files:
                try:
                    file_client_instance.delete_file(file.id)
                    print(f"Deleted file {file.id}")
                except Exception as e:
                    print(f"Failed to delete file {file.id}: {e}")

            for function in registered_functions:
                function_uid = function.actual_instance.uid
                try:
                    api_instance.delete_function(function_uid)
                    print(f"Deleted function {function_uid}")
                except Exception as e:
                    print(f"Failed to delete function {function_uid}: {e}")
151+
152+
153+
@retry(
    stop=stop_after_delay(timedelta(minutes=10)),
    wait=wait_exponential(multiplier=1, min=1, max=5),
    retry=retry_if_exception_type(AssertionError),
    reraise=True,
)
def wait_until_done(
    function_api: osparc_client.FunctionJobsApi, function_job_uid: str
) -> str:
    """Poll a function job until it reaches a terminal state.

    Retries (via tenacity) for up to 10 minutes with exponential backoff
    while the job is still running; the last error is re-raised on timeout.

    Returns:
        The terminal status, either ``"SUCCESS"`` or ``"FAILED"``.
    """
    job_status = function_api.function_job_status(function_job_uid).status
    # Raise explicitly instead of using `assert`: asserts are stripped under
    # `python -O`, which would make this return a non-terminal status on the
    # first call. AssertionError is kept as the exception type because the
    # retry decorator above filters on it.
    if job_status not in ("SUCCESS", "FAILED"):
        raise AssertionError(
            f"Job {function_job_uid} not done yet (status: {job_status})"
        )
    return job_status
163+
164+
165+
@retry(
    stop=stop_after_delay(timedelta(minutes=5)),
    wait=wait_exponential(multiplier=1, min=1, max=5),
    retry=retry_if_exception_type(HTTPStatusError),
    reraise=False,
)
def print_job_logs(configuration: osparc_client.Configuration, solver_job_uid: str):
    """Stream the log of a single solver job and print each message.

    Retries (via tenacity) on HTTP errors for up to 5 minutes; failures after
    that are swallowed (``reraise=False``) since logging is best-effort.

    Args:
        configuration: osparc client configuration providing host + credentials.
        solver_job_uid: UID of the solver job whose log stream to print.
    """
    print(f"Logging job log for solver job UID: {solver_job_uid}")
    # Use the client as a context manager so its connection pool is closed
    # deterministically — the original leaked one Client per (retried) call.
    with Client(
        base_url=configuration.host,
        auth=BasicAuth(
            username=configuration.username, password=configuration.password
        ),
    ) as client:
        with client.stream(
            "GET",
            f"/v0/solvers/{_SOLVER_KEY}/releases/{_SOLVER_VERSION}/jobs/{solver_job_uid}/logstream",
            timeout=10,
        ) as response:
            response.raise_for_status()
            for line in response.iter_lines():
                # Default to an empty list: a line without a "messages" key
                # would otherwise make the inner for-loop raise TypeError.
                for msg in json.loads(line).get("messages", []):
                    print(f"{datetime.now().isoformat()}: {msg}")
188+
189+
190+
if __name__ == "__main__":
    # CLI entry point: configure the load test from the command line.
    parser = argparse.ArgumentParser(
        description=(
            "Load-test an oSPARC deployment by registering the sleeper "
            "solver as a function and mapping many jobs over it."
        )
    )
    parser.add_argument(
        "--log-job", action="store_true", help="Log details of a single job"
    )
    parser.add_argument(
        "--sleep-seconds",
        type=int,
        default=10,
        help="Number of seconds for the sleeper function (default: 10)",
    )
    parser.add_argument(
        "--njobs",
        type=int,
        default=100,
        help="Number of jobs to run (default: 100)",
    )
    args = parser.parse_args()
    main(njobs=args.njobs, sleep_seconds=args.sleep_seconds, log_job=args.log_job)

0 commit comments

Comments
 (0)