Skip to content

Commit c72e22b

Browse files
committed
allow to stream logs of a single job
1 parent b74f64c commit c72e22b

File tree

1 file changed

+78
-29
lines changed

1 file changed

+78
-29
lines changed

tests/performance/locustfiles/functions/map_test.py

Lines changed: 78 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
# /// script
22
# requires-python = ">=3.11"
33
# dependencies = [
4+
# "httpx",
5+
# "matplotlib",
46
# "osparc>=0.8.3.post0.dev26",
57
# "tenacity",
68
# ]
79
# ///
810

911

12+
import argparse
1013
import json
1114
import os
12-
import time
13-
import zipfile
15+
from datetime import datetime, timedelta
1416
from pathlib import Path
1517

16-
# import osparc
18+
import matplotlib.pyplot as plt
1719
import osparc_client
20+
from httpx import BasicAuth, Client, HTTPStatusError
21+
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential
1822

1923
_SCRIPT_DIR = Path(__file__).parent
2024
_MAIN_FILE = _SCRIPT_DIR / "main.py"
@@ -28,7 +32,8 @@
2832
_SOLVER_VERSION = "1.2.200"
2933

3034

31-
def main():
35+
def main(log_job: bool = False):
36+
3237
url = os.environ.get("OSPARC_API_URL")
3338
assert url
3439
key = os.environ.get("OSPARC_API_KEY")
@@ -39,6 +44,7 @@ def main():
3944

4045
uploaded_files = []
4146
registered_functions = []
47+
job_statuses = dict()
4248

4349
with osparc_client.ApiClient(configuration) as api_client:
4450
try:
@@ -91,38 +97,33 @@ def main():
9197
function_id = registered_function.to_dict().get("uid")
9298
assert function_id
9399

94-
received_function = api_instance.get_function(function_id)
95-
96100
function_job = api_instance.run_function(
97101
function_id, {"input_3": values_file}
98102
)
99103

104+
print(f"function_job: {function_job.to_dict()}")
105+
100106
function_job_uid = function_job.to_dict().get("uid")
101107
assert function_job_uid
102108

103-
while (
104-
job_status := job_api_instance.function_job_status(
105-
function_job_uid
106-
).status
107-
) not in ("SUCCESS", "FAILED"):
108-
print(f"Job status: {job_status}")
109-
time.sleep(5)
110-
print(f"Job status: {job_status}")
111-
112-
job_output_dict = job_api_instance.function_job_outputs(function_job_uid)
113-
print(f"\nJob output: {job_output_dict}")
114-
115-
downloaded_file = file_instance.download_file(
116-
job_output_dict["output_1"]["id"],
117-
destination_folder=pl.Path("./solver_files"),
118-
)
119-
print(f"Downloaded file: {downloaded_file}")
120-
with zipfile.ZipFile(downloaded_file, "r") as zip_file:
121-
job_output = json.loads(
122-
zip_file.read("function_outputs.json").decode("utf-8")
123-
)
109+
if log_job:
110+
print(f"Logging job log for job UID: {function_job_uid}")
111+
print_job_logs(configuration, function_job_uid)
124112

125-
print(f"Job output: {job_output}")
113+
for job_uid in [function_job_uid]:
114+
status = wait_until_done(job_api_instance, job_uid)
115+
job_statuses[status] = job_statuses.get(status, 0) + 1
116+
117+
statuses = list(job_statuses.keys())
118+
counts = [job_statuses[status] for status in statuses]
119+
120+
plt.figure(figsize=(6, 4))
121+
plt.bar(statuses, counts, color="skyblue")
122+
plt.xlabel("Job Status")
123+
plt.ylabel("Count")
124+
plt.title("Function Job Status Counts")
125+
plt.tight_layout()
126+
plt.show(block=True)
126127

127128
finally:
128129

@@ -133,6 +134,54 @@ def main():
133134
except Exception as e:
134135
print(f"Failed to delete file {file.id}: {e}")
135136

137+
for function in registered_functions:
138+
try:
139+
api_instance.delete_function(function.uid)
140+
print(f"Deleted function {function.uid}")
141+
except Exception as e:
142+
print(f"Failed to delete function {function.uid}: {e}")
143+
144+
145+
@retry(
146+
stop=stop_after_delay(timedelta(minutes=10)),
147+
wait=wait_exponential(multiplier=1, min=1, max=5),
148+
retry=retry_if_exception_type(AssertionError),
149+
reraise=True,
150+
)
151+
def wait_until_done(function_api: osparc_client.FunctionJobsApi, function_job_uid: str):
152+
job_status = function_api.function_job_status(function_job_uid).status
153+
assert job_status in ("SUCCESS", "FAILED")
154+
return job_status
155+
156+
157+
@retry(
158+
stop=stop_after_delay(timedelta(minutes=10)),
159+
wait=wait_exponential(multiplier=1, min=1, max=5),
160+
retry=retry_if_exception_type(HTTPStatusError),
161+
reraise=True,
162+
)
163+
def print_job_logs(configuration: osparc_client.Configuration, job_uid: str):
164+
client = Client(
165+
base_url=configuration.host,
166+
auth=BasicAuth(
167+
username=configuration.username, password=configuration.password
168+
),
169+
)
170+
with client.stream(
171+
"GET",
172+
f"/v0/solvers/{_SOLVER_KEY}/releases/{_SOLVER_VERSION}/jobs/{job_uid}/logstream",
173+
timeout=10 * 60,
174+
) as response:
175+
response.raise_for_status()
176+
for line in response.iter_lines():
177+
for msg in json.loads(line).get("messages"):
178+
print(f"{datetime.now().isoformat()}: {msg}")
179+
136180

137181
if __name__ == "__main__":
138-
main()
182+
parser = argparse.ArgumentParser()
183+
parser.add_argument(
184+
"--log-job", action="store_true", help="Log details of a single job"
185+
)
186+
args = parser.parse_args()
187+
main(log_job=args.log_job)

0 commit comments

Comments
 (0)