Skip to content

Commit a9ed7bd

Browse files
yangw-dev and huydhn
authored and committed
[utilization] pipeline to create clean db records (pytorch#145327)
Add upload_utilization_script to generate DB-ready insert records and upload them to S3:
- generate two files (metadata and timeseries) in the ossci-utilization buckets
- convert log records to DB-format records
- add a unit test job for tools/stats/

Related PRs:
- set up composite action for data pipeline: pytorch#145310
- add permission for composite action to access S3 bucket: meta-pytorch/pytorch-gha-infra#595
- add insert logic in S3 replicator: pytorch/test-infra#6217

Pull Request resolved: pytorch#145327
Approved by: https://github.com/huydhn
Co-authored-by: Huy Do <[email protected]>
1 parent 18a7a04 commit a9ed7bd

File tree

7 files changed

+738
-20
lines changed

7 files changed

+738
-20
lines changed

.github/workflows/lint.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
name: Lint
2+
# Workflow that runs lint checks and also unit tests for tools and scripts.
23

34
on:
45
pull_request:
@@ -207,6 +208,7 @@ jobs:
207208
conda activate "${CONDA_ENV}"
208209
209210
# Test tools
211+
PYTHONPATH=$(pwd) pytest tools/stats
210212
PYTHONPATH=$(pwd) pytest tools/test -o "python_files=test*.py"
211213
PYTHONPATH=$(pwd) pytest .github/scripts -o "python_files=test*.py"
212214

.lintrunner.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ init_command = [
161161
'rich==10.9.0',
162162
'pyyaml==6.0.1',
163163
'optree==0.13.0',
164-
'dataclasses_json==0.6.7'
164+
'dataclasses_json==0.6.7',
165+
'pandas==2.2.3',
165166
]
166167

167168
[[linter]]

tools/stats/monitor.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ def __init__(
190190
job_name=_job_name,
191191
workflow_id=_workflow_run_id,
192192
workflow_name=_workflow_name,
193+
start_at=datetime.datetime.now().timestamp(),
193194
)
194195
self._data_collect_interval = data_collect_interval
195196
self._has_pynvml = pynvml_enabled
@@ -257,7 +258,11 @@ def _output_data(self) -> None:
257258

258259
while not self.exit_event.is_set():
259260
collecting_start_time = time.time()
260-
stats = UtilizationRecord()
261+
stats = UtilizationRecord(
262+
level="record",
263+
timestamp=datetime.datetime.now().timestamp(),
264+
)
265+
261266
try:
262267
data_list, error_list = self.shared_resource.get_and_reset()
263268
if self._debug_mode:
@@ -275,8 +280,6 @@ def _output_data(self) -> None:
275280
if not data_list:
276281
# pass since no data is collected
277282
continue
278-
stats.level = "record"
279-
stats.timestamp = datetime.datetime.now().timestamp()
280283

281284
cpu_stats = self._generate_stats(
282285
[data.cpu_percent for data in data_list]

tools/stats/upload_stats_lib.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ def get_s3_resource() -> Any:
2323
return boto3.resource("s3")
2424

2525

26+
GHA_ARTIFACTS_BUCKET = "gha-artifacts"
27+
28+
2629
# NB: In CI, a flaky test is usually retried 3 times, then the test file would be rerun
2730
# 2 more times
2831
MAX_RETRY_IN_NON_DISABLED_MODE = 3 * 3
@@ -84,16 +87,22 @@ def _download_artifact(
8487

8588

8689
def download_s3_artifacts(
87-
prefix: str, workflow_run_id: int, workflow_run_attempt: int
90+
prefix: str,
91+
workflow_run_id: int,
92+
workflow_run_attempt: int,
93+
job_id: Optional[int] = None,
8894
) -> list[Path]:
89-
bucket = get_s3_resource().Bucket("gha-artifacts")
95+
bucket = get_s3_resource().Bucket(GHA_ARTIFACTS_BUCKET)
9096
objs = bucket.objects.filter(
9197
Prefix=f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}"
9298
)
93-
9499
found_one = False
95100
paths = []
96101
for obj in objs:
102+
object_name = Path(obj.key).name
103+
# If job_id is provided, only download artifacts whose file name contains it; skip all others.
104+
if job_id is not None and str(job_id) not in object_name:
105+
continue
97106
found_one = True
98107
p = Path(Path(obj.key).name)
99108
print(f"Downloading {p}")
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
import os
2+
import sys
3+
import unittest
4+
from collections import Counter
5+
from datetime import datetime, timedelta
6+
7+
8+
# Add the directory three levels up (presumably the repository root) to sys.path so this test module can import the tools.stats packages.
9+
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
10+
from tools.stats.upload_utilization_stats.upload_utilization_stats import (
11+
SegmentGenerator,
12+
)
13+
from tools.stats.utilization_stats_lib import OssCiSegmentV1, UtilizationRecord
14+
15+
16+
# Reference instants used by the tests. Every value is an offset from
# January 1, 2022 12:00:00; each datetime (TEST_DT_*) is immediately followed
# by the matching POSIX timestamp (TEST_TS_*) derived from it.
TEST_DT_BASE = datetime(2022, 1, 1, 12, 0, 0)
TEST_TS_BASE = TEST_DT_BASE.timestamp()

TEST_DT_PLUS_5S = TEST_DT_BASE + timedelta(seconds=5)
TEST_TS_PLUS_5S = TEST_DT_PLUS_5S.timestamp()

TEST_DT_PLUS_10S = TEST_DT_BASE + timedelta(seconds=10)
TEST_TS_PLUS_10S = TEST_DT_PLUS_10S.timestamp()

TEST_DT_PLUS_15S = TEST_DT_BASE + timedelta(seconds=15)
TEST_TS_PLUS_15S = TEST_DT_PLUS_15S.timestamp()

TEST_DT_PLUS_30S = TEST_DT_BASE + timedelta(seconds=30)
TEST_TS_PLUS_30S = TEST_DT_PLUS_30S.timestamp()

TEST_DT_PLUS_40S = TEST_DT_BASE + timedelta(seconds=40)
TEST_TS_PLUS_40S = TEST_DT_PLUS_40S.timestamp()

# Command names placed in the cmd_names field of the test records.
PYTEST1_NAME = "python test1.py"
PYTEST2_NAME = "python test2.py"
PYPIP_INSTALL_NAME = "python pip install install1"
36+
37+
38+
class TestSegmentGenerator(unittest.TestCase):
    """Unit tests for ``SegmentGenerator.generate``.

    Each test feeds a list of ``UtilizationRecord`` objects to the generator
    and checks the number, names and time bounds of the returned segments.
    """

    def validate_segment(
        self, segment: OssCiSegmentV1, name: str, start_at: float, end_at: float
    ) -> None:
        """Assert that ``segment`` carries the given name, start_at and end_at."""
        for field_name, expected in (
            ("name", name),
            ("start_at", start_at),
            ("end_at", end_at),
        ):
            self.assertEqual(getattr(segment, field_name), expected)

    def _validate_segments_in_order(self, segments, expected_rows) -> None:
        """Validate ``segments`` pairwise against (name, start_at, end_at) rows."""
        for segment, (name, start_at, end_at) in zip(segments, expected_rows):
            self.validate_segment(segment, name, start_at, end_at)

    def test_generate_empty_records(self) -> None:
        """An empty record list produces an empty segment list."""
        no_records: list[UtilizationRecord] = []

        result = SegmentGenerator().generate(no_records)

        self.assertEqual(result, [])

    def test_generate_single_record(self) -> None:
        """One record with one command produces exactly one segment."""
        only_record = UtilizationRecord(
            timestamp=TEST_TS_BASE, cmd_names=[PYTEST1_NAME], level="PYTHON_CMD"
        )

        result = SegmentGenerator().generate([only_record])

        self.assertEqual(len(result), 1)

    def test_generate_single_record_with_multiple_cmds(self) -> None:
        """One record listing two commands produces two segments."""
        only_record = UtilizationRecord(
            timestamp=TEST_TS_BASE,
            cmd_names=[PYTEST1_NAME, PYPIP_INSTALL_NAME],
            level="PYTHON_CMD",
        )

        result = SegmentGenerator().generate([only_record])

        self.assertEqual(len(result), 2)

    def test_generate_multiple_records(self) -> None:
        """The base records produce one segment per command with expected bounds."""
        result = SegmentGenerator().generate(get_base_test_records())

        self.assertEqual(len(result), 2)
        self._validate_segments_in_order(
            result,
            [
                (PYTEST1_NAME, TEST_TS_BASE, TEST_TS_PLUS_30S),
                (PYPIP_INSTALL_NAME, TEST_TS_PLUS_10S, TEST_TS_PLUS_15S),
            ],
        )

    def test_generate_cmd_interval_larger_than_default_threshold_setting(self) -> None:
        """A command reappearing after a long gap starts a second segment.

        Two extra PYTEST1 records are appended 80s and 85s after the +30s base
        record; generate() is called without an explicit threshold.
        """
        late_start_ts = (TEST_DT_PLUS_30S + timedelta(seconds=80)).timestamp()
        late_end_ts = (TEST_DT_PLUS_30S + timedelta(seconds=85)).timestamp()
        late_records = [
            UtilizationRecord(
                timestamp=late_ts,
                cmd_names=[PYTEST1_NAME],
                level="PYTHON_CMD",
            )
            for late_ts in (late_start_ts, late_end_ts)
        ]
        all_records = get_base_test_records()
        all_records.extend(late_records)

        result = SegmentGenerator().generate(all_records)

        names_seen = Counter([segment.name for segment in result])
        self.assertEqual(names_seen[PYTEST1_NAME], 2)
        self.assertEqual(names_seen[PYPIP_INSTALL_NAME], 1)
        self.assertEqual(len(result), 3)

        self._validate_segments_in_order(
            result,
            [
                (PYTEST1_NAME, TEST_TS_BASE, TEST_TS_PLUS_30S),
                (PYTEST1_NAME, late_start_ts, late_end_ts),
                (PYPIP_INSTALL_NAME, TEST_TS_PLUS_10S, TEST_TS_PLUS_15S),
            ],
        )

    def test_generate_multiple_segments_with_customized_threshold(self) -> None:
        """Passing a 10-second threshold splits PYTEST1 into two segments."""
        # Threshold (in seconds) passed as generate()'s second positional argument.
        custom_threshold = 10

        result = SegmentGenerator().generate(get_base_test_records(), custom_threshold)

        names_seen = Counter([segment.name for segment in result])
        self.assertEqual(names_seen[PYTEST1_NAME], 2)
        self.assertEqual(names_seen[PYPIP_INSTALL_NAME], 1)
        self.assertEqual(len(result), 3)

        self._validate_segments_in_order(
            result,
            [
                (PYTEST1_NAME, TEST_TS_BASE, TEST_TS_PLUS_15S),
                (PYTEST1_NAME, TEST_TS_PLUS_30S, TEST_TS_PLUS_30S),
                (PYPIP_INSTALL_NAME, TEST_TS_PLUS_10S, TEST_TS_PLUS_15S),
            ],
        )
160+
161+
162+
def get_base_test_records() -> list[UtilizationRecord]:
    """Build the shared fixture: six PYTHON_CMD records in timestamp order.

    PYTEST1 is listed from the base time through +30s, PYPIP_INSTALL is listed
    at +10s and +15s, and the final record (+40s) lists no commands. A fresh
    list of fresh records is returned on every call.
    """
    # (timestamp, commands listed in the record at that timestamp)
    timeline = [
        (TEST_TS_BASE, [PYTEST1_NAME]),
        (TEST_TS_PLUS_5S, [PYTEST1_NAME]),
        (TEST_TS_PLUS_10S, [PYTEST1_NAME, PYPIP_INSTALL_NAME]),
        (TEST_TS_PLUS_15S, [PYTEST1_NAME, PYPIP_INSTALL_NAME]),
        (TEST_TS_PLUS_30S, [PYTEST1_NAME]),
        (TEST_TS_PLUS_40S, []),
    ]
    return [
        UtilizationRecord(timestamp=ts, cmd_names=cmds, level="PYTHON_CMD")
        for ts, cmds in timeline
    ]
192+
193+
194+
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()

0 commit comments

Comments
 (0)