Skip to content

Commit 1d474ed

Browse files
authored
Add MetricsCollector (#809)
* feat: add MetricsCollector * feat: improvements to api design * style: lint * style: lint * feat: weird exit code type conversion logic * fix: typing * feat: add missing except * feat: log crash * add raise * feat: add re-raise
1 parent 59c506a commit 1d474ed

File tree

6 files changed

+306
-29
lines changed

6 files changed

+306
-29
lines changed

src/xpk/core/telemetry.py

Lines changed: 136 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,143 @@
1414
limitations under the License.
1515
"""
1616

17+
import platform
1718
import uuid
18-
from .config import xpk_config, CLIENT_ID_KEY
19+
import json
20+
import time
21+
from enum import Enum
22+
from dataclasses import dataclass
23+
from .config import xpk_config, CLIENT_ID_KEY, __version__ as xpk_version
24+
from ..utils.execution_context import is_dry_run
1925

2026

21-
def generate_client_id():
27+
def ensure_client_id() -> str:
2228
"""Generates Client ID and stores in configuration if not already present."""
23-
if xpk_config.get(CLIENT_ID_KEY) is None:
24-
xpk_config.set(CLIENT_ID_KEY, str(uuid.uuid4()))
29+
current_client_id = xpk_config.get(CLIENT_ID_KEY)
30+
if current_client_id is not None:
31+
return current_client_id
32+
33+
new_client_id = str(uuid.uuid4())
34+
xpk_config.set(CLIENT_ID_KEY, new_client_id)
35+
return new_client_id
36+
37+
38+
class MetricsEventMetadataKey(Enum):
  """Keys allowed in the metadata attached to a metrics event.

  Each member's value is the literal key string written into the serialized
  payload.
  """

  # Attached to every event when the payload is generated.
  SESSION_ID = "XPK_SESSION_ID"
  DRY_RUN = "XPK_DRY_RUN"
  PYTHON_VERSION = "XPK_PYTHON_VERSION"

  # Available for custom events.
  ZONE = "XPK_ZONE"
  SYSTEM_CHARACTERISTICS = "XPK_SYSTEM_CHARACTERISTICS"
  PROVISIONING_MODE = "XPK_PROVISIONING_MODE"

  # Set by the "start" and "complete" command events respectively.
  COMMAND = "XPK_COMMAND"
  EXIT_CODE = "XPK_EXIT_CODE"
47+
48+
49+
@dataclass
class _MetricsEvent:
  """A single recorded metrics event, kept in memory until it is flushed."""

  # Timestamp in seconds, as returned by time.time() when the event was logged.
  time: float
  # Event category; the collector uses "commands" and "custom".
  type: str
  # Event name within the category, e.g. "start" or "complete".
  name: str
  # Event-specific key/value pairs; merged over the base metadata on flush.
  metadata: dict[MetricsEventMetadataKey, str]
55+
56+
57+
class _MetricsCollector:
  """Metrics collector for collecting various metrics and events across application.

  Events are buffered in memory in the order they are logged. `flush`
  serializes the buffer into a single payload string and then empties it.
  """

  def __init__(self) -> None:
    # The buffer is created per instance. Declaring it as a class attribute
    # would make every collector instance share (and clear) the same list.
    self._events: list[_MetricsEvent] = []

  def _record(
      self,
      event_type: str,
      name: str,
      metadata: dict[MetricsEventMetadataKey, str],
  ) -> None:
    """Appends an event stamped with the current time to the buffer."""
    self._events.append(
        _MetricsEvent(
            time=time.time(),
            type=event_type,
            name=name,
            metadata=metadata,
        )
    )

  def log_start(self, command: str) -> None:
    """Logs start event.

    Args:
      command: the command being started; stored under the COMMAND key.
    """
    self._record(
        "commands", "start", {MetricsEventMetadataKey.COMMAND: command}
    )

  def log_complete(self, exit_code: int) -> None:
    """Logs complete event.

    Args:
      exit_code: exit code of the command; stored as a string under the
        EXIT_CODE key.
    """
    self._record(
        "commands",
        "complete",
        {MetricsEventMetadataKey.EXIT_CODE: str(exit_code)},
    )

  def log_custom(
      self,
      name: str,
      metadata: dict[MetricsEventMetadataKey, str] | None = None,
  ) -> None:
    """Logs custom event.

    Args:
      name: name of the custom event.
      metadata: optional event metadata; an empty dict is used when omitted.
    """
    self._record("custom", name, metadata if metadata is not None else {})

  def flush(self) -> str:
    """Flushes collected events into concord payload.

    Returns:
      The serialized payload produced by `_generate_payload`.
    """
    result = _generate_payload(self._events)
    # Cleared only after the payload was generated, so events are kept if
    # serialization raises.
    self._events.clear()
    return result
104+
105+
106+
MetricsCollector = _MetricsCollector()
107+
108+
109+
def _generate_payload(events: list[_MetricsEvent]) -> str:
  """Serializes events into a JSON envelope string.

  Each event becomes one entry of "log_event". The entry's
  "source_extension_json" is itself a JSON-encoded string holding the base
  concord fields, the event type and name, and the merged metadata as a list
  of key/value objects.

  Args:
    events: events to serialize, in the order they should appear.

  Returns:
    The JSON-encoded envelope.
  """
  concord_fields = _get_base_concord_event()
  shared_metadata = _get_base_event_metadata()

  def _serialize(event: _MetricsEvent) -> dict:
    # Event-specific metadata is applied last, so it wins on key clashes.
    merged = {**shared_metadata, **event.metadata}
    extension = {
        **concord_fields,
        "event_type": event.type,
        "event_name": event.name,
        "event_metadata": [
            {"key": key.value, "value": value}
            for key, value in merged.items()
        ],
    }
    return {
        # Event time is stored in seconds; the payload wants milliseconds.
        "event_time_ms": int(event.time * 1000),
        "source_extension_json": json.dumps(extension),
    }

  log_events = [_serialize(event) for event in events]
  envelope = {
      "client_info": {"client_type": "XPK"},
      "log_source_name": "CONCORD",
      "request_time_ms": int(time.time() * 1000),
      "log_event": log_events,
  }
  return json.dumps(envelope)
137+
138+
139+
def _get_base_event_metadata() -> dict[MetricsEventMetadataKey, str]:
  """Builds the metadata entries that are attached to every event.

  Returns:
    A dict with the session id, the lower-cased string form of the dry-run
    flag, and the running Python version.
  """
  session_id = _get_session_id()
  dry_run = str(is_dry_run()).lower()
  python_version = platform.python_version()
  return {
      MetricsEventMetadataKey.SESSION_ID: session_id,
      MetricsEventMetadataKey.DRY_RUN: dry_run,
      MetricsEventMetadataKey.PYTHON_VERSION: python_version,
  }
145+
146+
147+
def _get_base_concord_event() -> dict[str, str]:
  """Builds the fields shared by every serialized concord event.

  Returns:
    A dict with the xpk release version, the console type and the client
    install id (created and stored by `ensure_client_id` if missing).
  """
  client_install_id = ensure_client_id()
  return {
      "release_version": xpk_version,
      "console_type": "XPK",
      "client_install_id": client_install_id,
  }
153+
154+
155+
def _get_session_id() -> str:
156+
return str(uuid.uuid4())

src/xpk/core/telemetry_test.py

Lines changed: 110 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,123 @@
1414
limitations under the License.
1515
"""
1616

17+
import pytest
18+
import json
1719
from .config import xpk_config, CLIENT_ID_KEY
18-
from .telemetry import generate_client_id
20+
from .telemetry import ensure_client_id, MetricsCollector, MetricsEventMetadataKey
21+
from ..utils.execution_context import set_dry_run
1922

2023

21-
def test_generates_client_id_when_its_not_present():
24+
@pytest.fixture(autouse=True)
25+
def setup_mocks(mocker):
26+
mocker.patch('xpk.core.telemetry._get_session_id', return_value='321231')
27+
mocker.patch('time.time', return_value=0)
28+
mocker.patch('platform.python_version', return_value='99.99.99')
29+
xpk_config.set(CLIENT_ID_KEY, 'client_id')
30+
yield
2231
xpk_config.set(CLIENT_ID_KEY, None)
23-
generate_client_id()
32+
33+
34+
def test_ensure_client_id_generates_client_id_when_its_not_present():
35+
xpk_config.set(CLIENT_ID_KEY, None)
36+
ensure_client_id()
2437
assert xpk_config.get(CLIENT_ID_KEY) is not None
2538

2639

27-
def test_does_not_generate_client_id_when_its_present():
40+
def test_ensure_client_id_does_not_regenerate_id_when_its_present():
2841
client_id = '1337'
2942
xpk_config.set(CLIENT_ID_KEY, client_id)
30-
generate_client_id()
43+
ensure_client_id()
3144
assert xpk_config.get(CLIENT_ID_KEY) == client_id
45+
46+
47+
def test_metrics_collector_generates_client_id_if_not_present():
  """A flushed event carries a non-empty install id when none was stored."""
  xpk_config.set(CLIENT_ID_KEY, None)
  MetricsCollector.log_start(command='test')
  raw_payload = MetricsCollector.flush()
  first_event = json.loads(raw_payload)['log_event'][0]
  extension_json = json.loads(first_event['source_extension_json'])
  client_install_id = extension_json['client_install_id']
  assert client_install_id is not None
  assert len(client_install_id) > 0
54+
55+
56+
def test_metrics_collector_logs_start_event_correctly():
  """The start event is serialized with base metadata plus the command."""
  # Imported locally so the expectation follows the packaged version instead
  # of a literal that has to be edited on every release.
  from .config import __version__ as xpk_version

  set_dry_run(False)
  MetricsCollector.log_start(command='test')
  payload = json.loads(MetricsCollector.flush())
  extension_json = json.loads(payload['log_event'][0]['source_extension_json'])
  assert extension_json == {
      'client_install_id': 'client_id',
      'console_type': 'XPK',
      'event_metadata': [
          {'key': 'XPK_SESSION_ID', 'value': '321231'},
          {'key': 'XPK_DRY_RUN', 'value': 'false'},
          {'key': 'XPK_PYTHON_VERSION', 'value': '99.99.99'},
          {'key': 'XPK_COMMAND', 'value': 'test'},
      ],
      'event_name': 'start',
      'event_type': 'commands',
      'release_version': xpk_version,
  }
74+
75+
76+
def test_metrics_collector_logs_complete_event_correctly():
  """The complete event is serialized with base metadata plus the exit code."""
  # Imported locally so the expectation follows the packaged version instead
  # of a literal that has to be edited on every release.
  from .config import __version__ as xpk_version

  set_dry_run(True)
  MetricsCollector.log_complete(exit_code=2)
  payload = json.loads(MetricsCollector.flush())
  extension_json = json.loads(payload['log_event'][0]['source_extension_json'])
  assert extension_json == {
      'client_install_id': 'client_id',
      'console_type': 'XPK',
      'event_metadata': [
          {'key': 'XPK_SESSION_ID', 'value': '321231'},
          {'key': 'XPK_DRY_RUN', 'value': 'true'},
          {'key': 'XPK_PYTHON_VERSION', 'value': '99.99.99'},
          {'key': 'XPK_EXIT_CODE', 'value': '2'},
      ],
      'event_name': 'complete',
      'event_type': 'commands',
      'release_version': xpk_version,
  }
94+
95+
96+
def test_metrics_collector_logs_custom_event_correctly():
  """A custom event is serialized with base metadata plus its own metadata."""
  # Imported locally so the expectation follows the packaged version instead
  # of a literal that has to be edited on every release.
  from .config import __version__ as xpk_version

  set_dry_run(False)
  MetricsCollector.log_custom(
      name='test', metadata={MetricsEventMetadataKey.PROVISIONING_MODE: 'flex'}
  )
  payload = json.loads(MetricsCollector.flush())
  extension_json = json.loads(payload['log_event'][0]['source_extension_json'])
  assert extension_json == {
      'client_install_id': 'client_id',
      'console_type': 'XPK',
      'event_metadata': [
          {'key': 'XPK_SESSION_ID', 'value': '321231'},
          {'key': 'XPK_DRY_RUN', 'value': 'false'},
          {'key': 'XPK_PYTHON_VERSION', 'value': '99.99.99'},
          {'key': 'XPK_PROVISIONING_MODE', 'value': 'flex'},
      ],
      'event_name': 'test',
      'event_type': 'custom',
      'release_version': xpk_version,
  }
116+
117+
118+
def test_metrics_collector_logs_correct_envelope():
  """The envelope has the fixed header fields and one entry per event."""
  provisioning_metadata = {MetricsEventMetadataKey.PROVISIONING_MODE: 'flex'}
  MetricsCollector.log_start(command='test')
  MetricsCollector.log_custom(name='test', metadata=provisioning_metadata)
  MetricsCollector.log_complete(exit_code=2)

  envelope = json.loads(MetricsCollector.flush())

  assert envelope['client_info'] == {'client_type': 'XPK'}
  assert envelope['log_source_name'] == 'CONCORD'
  # time.time is patched to return 0 by the autouse fixture.
  assert envelope['request_time_ms'] == 0
  assert len(envelope['log_event']) == 3
129+
130+
131+
def test_metrics_collector_does_not_flush_event_twice():
  """Events returned by one flush are not included in the next one."""
  MetricsCollector.log_start(command='test')
  MetricsCollector.flush()

  MetricsCollector.log_start(command='version')
  second_flush = json.loads(MetricsCollector.flush())

  flushed_events = second_flush['log_event']
  assert len(flushed_events) == 1

src/xpk/main.py

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@
3737

3838
from .parser.core import set_parser
3939
from .core.updates import print_xpk_hello
40-
from .core.telemetry import generate_client_id
41-
from .utils.console import xpk_print
40+
from .core.telemetry import MetricsCollector
41+
from .utils.feature_flags import FeatureFlags
42+
from .utils.console import xpk_print, exit_code_to_int
4243
from .utils.execution_context import set_context
4344
################### Compatibility Check ###################
4445
# Check that the user runs the below version or greater.
@@ -61,24 +62,36 @@
6162

6263

6364
def main() -> None:
64-
# Create top level parser for xpk command.
65-
parser = argparse.ArgumentParser(description='xpk command', prog='xpk')
66-
set_parser(parser=parser)
67-
argcomplete.autocomplete(parser)
65+
try:
66+
# Create top level parser for xpk command.
67+
parser = argparse.ArgumentParser(description='xpk command', prog='xpk')
68+
set_parser(parser=parser)
69+
argcomplete.autocomplete(parser)
6870

69-
main_args = parser.parse_args()
70-
main_args.enable_ray_cluster = False
71-
set_context(
72-
dry_run_value='dry_run' in main_args and main_args.dry_run,
73-
quiet_value=(
74-
('quiet' in main_args and main_args.quiet)
75-
or ('force' in main_args and main_args.force)
76-
),
77-
)
78-
generate_client_id()
79-
print_xpk_hello()
80-
main_args.func(main_args)
81-
xpk_print('XPK Done.', flush=True)
71+
main_args = parser.parse_args()
72+
main_args.enable_ray_cluster = False
73+
set_context(
74+
dry_run_value='dry_run' in main_args and main_args.dry_run,
75+
quiet_value=(
76+
('quiet' in main_args and main_args.quiet)
77+
or ('force' in main_args and main_args.force)
78+
),
79+
)
80+
MetricsCollector.log_start(main_args.xpk_subcommands)
81+
print_xpk_hello()
82+
main_args.func(main_args)
83+
xpk_print('XPK Done.', flush=True)
84+
MetricsCollector.log_complete(0)
85+
except SystemExit as e:
86+
MetricsCollector.log_complete(exit_code_to_int(e.code))
87+
raise
88+
except:
89+
MetricsCollector.log_complete(-1)
90+
raise
91+
finally:
92+
if FeatureFlags.TELEMETRY_ENABLED:
93+
# TODO(@scaliby): Flush to server instead of a console
94+
xpk_print(MetricsCollector.flush())
8295

8396

8497
if __name__ == '__main__':

src/xpk/utils/console.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,15 @@ def ask_for_user_consent(
7373
return False
7474
else:
7575
xpk_print("Invalid input. Please enter: yes/no/y/n.")
76+
77+
78+
def exit_code_to_int(exit_code: str | int | None) -> int:
79+
"""
80+
Converts sys._ExitCode to an int value that is used to exit the program.
81+
See more: https://github.com/python/typeshed/issues/8513#issue-1333671093
82+
"""
83+
if isinstance(exit_code, int):
84+
return int(exit_code)
85+
if exit_code is None:
86+
return 0
87+
return 1

src/xpk/utils/console_test.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import pytest
1919
from pytest_mock import MockerFixture
2020

21-
from xpk.utils.console import ask_for_user_consent
21+
from .console import exit_code_to_int, ask_for_user_consent
2222

2323

2424
@pytest.fixture(autouse=True)
@@ -90,3 +90,17 @@ def test_ask_for_user_consent_with_quiet_mode_always_agrees(
9090

9191
assert agreed is True
9292
mock_input.assert_not_called()
93+
94+
95+
@pytest.mark.parametrize(
    "exit_code,expected",
    [
        (0, 0),
        (1, 1),
        ("Error", 1),
        ({"foo": "bar"}, 1),
        (None, 0),
    ],
)
def test_exit_code_to_int_returns_correct_value(exit_code, expected):
  """Ints pass through, None maps to 0, anything else maps to 1."""
  actual = exit_code_to_int(exit_code)
  assert actual == expected

src/xpk/utils/feature_flags.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def _get_boolean_flag(flag: str, default: bool) -> bool:
2323

2424
class _FeatureFlags:
2525
SUB_SLICING_ENABLED = _get_boolean_flag("SUB_SLICING_ENABLED", default=False)
26+
TELEMETRY_ENABLED = _get_boolean_flag("TELEMETRY_ENABLED", default=False)
2627

2728

2829
FeatureFlags = _FeatureFlags()

0 commit comments

Comments
 (0)