Skip to content

Commit a3f54d1

Browse files
Add parca-agent integration for self-contained coordinator (#348)
* feat: add parca-agent integration for self-contained coordinator - Add parca_agent.py module with helper functions for snap integration - Check parca-agent availability on coordinator startup - Update external labels with benchmark metadata before each test run - Labels include: platform, arch, coordinator_version, git_hash, git_branch, git_version, build_variant, github_org, github_repo, test_name, topology, client_tool, tested_commands, tested_groups, dataset_name * fix: set TOX_DOCKER_GATEWAY to fix CI KeyError on Gateway * ci: don't fail CI on codecov errors
1 parent ddbe551 commit a3f54d1

File tree

3 files changed

+286
-1
lines changed

3 files changed

+286
-1
lines changed

.github/workflows/tox.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ jobs:
1717
ACTIONS_ALLOW_UNSECURE_COMMANDS: true
1818
USING_COVERAGE: "3.10"
1919
USING_COVERAGE_OS: "ubuntu-latest"
20+
TOX_DOCKER_GATEWAY: localhost
2021

2122
runs-on: "ubuntu-latest"
2223
name: os ${{ matrix.os }} python ${{ matrix.python-version }} Linting, testing, and compliance
@@ -49,5 +50,5 @@ jobs:
4950
file: ./coverage.xml
5051
flags: unittests
5152
name: codecov-umbrella
52-
fail_ci_if_error: true
53+
fail_ci_if_error: false
5354
verbose: true
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
"""
2+
Parca Agent integration for the self-contained coordinator.
3+
4+
This module provides functions to check if parca-agent snap is available
5+
and to update its external labels with benchmark metadata for profiling correlation.
6+
"""
7+
8+
import logging
9+
import shutil
10+
import subprocess
11+
from typing import Dict
12+
13+
14+
def check_parca_agent_available() -> bool:
15+
"""
16+
Check if snap and parca-agent are available on the system.
17+
18+
Returns True only if:
19+
1. snap command exists
20+
2. parca-agent snap is installed
21+
3. parca-agent service is running
22+
"""
23+
# Step 1: Check if snap command exists
24+
if shutil.which("snap") is None:
25+
logging.info("snap command not found - parca-agent integration disabled")
26+
return False
27+
28+
# Step 2: Check if parca-agent snap is installed
29+
try:
30+
result = subprocess.run(
31+
["snap", "list", "parca-agent"],
32+
capture_output=True,
33+
text=True,
34+
timeout=10,
35+
)
36+
if result.returncode != 0:
37+
logging.info(
38+
"parca-agent snap not installed - parca-agent integration disabled"
39+
)
40+
return False
41+
except subprocess.TimeoutExpired:
42+
logging.warning("Timeout checking parca-agent snap installation")
43+
return False
44+
except Exception as e:
45+
logging.warning(f"Failed to check parca-agent snap: {e}")
46+
return False
47+
48+
# Step 3: Check if parca-agent service is running
49+
try:
50+
result = subprocess.run(
51+
["snap", "services", "parca-agent"],
52+
capture_output=True,
53+
text=True,
54+
timeout=10,
55+
)
56+
if "active" in result.stdout:
57+
logging.info(
58+
"parca-agent snap is available and running - integration enabled"
59+
)
60+
return True
61+
else:
62+
logging.info(
63+
"parca-agent snap is installed but not running - integration disabled"
64+
)
65+
return False
66+
except subprocess.TimeoutExpired:
67+
logging.warning("Timeout checking parca-agent service status")
68+
return False
69+
except Exception as e:
70+
logging.warning(f"Failed to check parca-agent service status: {e}")
71+
return False
72+
73+
74+
def sanitize_label_value(value: str, max_length: int = 64) -> str:
75+
"""
76+
Sanitize a label value for use in parca-agent external labels.
77+
78+
- Replaces '=' and ',' with '_' (these are delimiters in the label format)
79+
- Replaces ':' with '-' (common in build variants)
80+
- Truncates to max_length
81+
- Returns 'unknown' for empty/None values
82+
"""
83+
if not value:
84+
return "unknown"
85+
86+
# Convert to string if needed
87+
value = str(value)
88+
89+
# Replace problematic characters
90+
value = value.replace("=", "_")
91+
value = value.replace(",", "_")
92+
value = value.replace(":", "-")
93+
value = value.replace("'", "")
94+
value = value.replace('"', "")
95+
96+
# Truncate if too long
97+
if len(value) > max_length:
98+
value = value[:max_length]
99+
100+
return value
101+
102+
103+
def build_labels_string(labels: Dict[str, str]) -> str:
104+
"""
105+
Build the labels string for the snap set command.
106+
107+
Format: key1=value1,key2=value2,...
108+
"""
109+
parts = []
110+
for key, value in labels.items():
111+
if value is not None:
112+
sanitized_value = sanitize_label_value(value)
113+
parts.append(f"{key}={sanitized_value}")
114+
return ",".join(parts)
115+
116+
117+
def update_parca_agent_labels(labels: Dict[str, str], timeout: int = 30) -> bool:
118+
"""
119+
Update parca-agent external labels and restart the agent.
120+
121+
Args:
122+
labels: Dictionary of label key-value pairs
123+
timeout: Timeout in seconds for each subprocess call
124+
125+
Returns:
126+
True if successful, False otherwise
127+
"""
128+
labels_string = build_labels_string(labels)
129+
130+
# Set the external labels
131+
try:
132+
logging.info(f"Setting parca-agent external labels: {labels_string}")
133+
result = subprocess.run(
134+
[
135+
"sudo",
136+
"snap",
137+
"set",
138+
"parca-agent",
139+
f"metadata-external-labels={labels_string}",
140+
],
141+
capture_output=True,
142+
text=True,
143+
timeout=timeout,
144+
)
145+
if result.returncode != 0:
146+
logging.warning(f"Failed to set parca-agent labels: {result.stderr}")
147+
return False
148+
except subprocess.TimeoutExpired:
149+
logging.warning("Timeout setting parca-agent labels")
150+
return False
151+
except Exception as e:
152+
logging.warning(f"Failed to set parca-agent labels: {e}")
153+
return False
154+
155+
# Restart parca-agent to apply the new labels
156+
try:
157+
logging.info("Restarting parca-agent to apply new labels")
158+
result = subprocess.run(
159+
["sudo", "snap", "restart", "parca-agent"],
160+
capture_output=True,
161+
text=True,
162+
timeout=timeout,
163+
)
164+
if result.returncode != 0:
165+
logging.warning(f"Failed to restart parca-agent: {result.stderr}")
166+
return False
167+
except subprocess.TimeoutExpired:
168+
logging.warning("Timeout restarting parca-agent")
169+
return False
170+
except Exception as e:
171+
logging.warning(f"Failed to restart parca-agent: {e}")
172+
return False
173+
174+
logging.info("Successfully updated parca-agent labels")
175+
return True
176+
177+
178+
def extract_test_labels_from_benchmark_config(benchmark_config: dict) -> Dict[str, str]:
179+
"""
180+
Extract test-level labels from a benchmark configuration YAML.
181+
182+
Extracts:
183+
- test_name: from 'name' field
184+
- topology: from 'redis-topologies' (first one)
185+
- client_tool: from 'clientconfig.tool'
186+
- tested_commands: from 'tested-commands' (joined with '+')
187+
- tested_groups: from 'tested-groups' (joined with '+')
188+
- dataset_name: from 'dbconfig.dataset_name' (if present)
189+
"""
190+
labels = {}
191+
192+
# Test name
193+
if "name" in benchmark_config:
194+
labels["test_name"] = benchmark_config["name"]
195+
196+
# Topology (take first one if list)
197+
if "redis-topologies" in benchmark_config:
198+
topologies = benchmark_config["redis-topologies"]
199+
if isinstance(topologies, list) and len(topologies) > 0:
200+
labels["topology"] = topologies[0]
201+
elif isinstance(topologies, str):
202+
labels["topology"] = topologies
203+
204+
# Client tool
205+
if "clientconfig" in benchmark_config:
206+
clientconfig = benchmark_config["clientconfig"]
207+
if isinstance(clientconfig, dict) and "tool" in clientconfig:
208+
labels["client_tool"] = clientconfig["tool"]
209+
210+
# Tested commands (join multiple with '+')
211+
if "tested-commands" in benchmark_config:
212+
commands = benchmark_config["tested-commands"]
213+
if isinstance(commands, list):
214+
labels["tested_commands"] = "+".join(str(cmd) for cmd in commands)
215+
elif isinstance(commands, str):
216+
labels["tested_commands"] = commands
217+
218+
# Tested groups (join multiple with '+')
219+
if "tested-groups" in benchmark_config:
220+
groups = benchmark_config["tested-groups"]
221+
if isinstance(groups, list):
222+
labels["tested_groups"] = "+".join(str(grp) for grp in groups)
223+
elif isinstance(groups, str):
224+
labels["tested_groups"] = groups
225+
226+
# Dataset name (from dbconfig)
227+
if "dbconfig" in benchmark_config:
228+
dbconfig = benchmark_config["dbconfig"]
229+
if isinstance(dbconfig, dict) and "dataset_name" in dbconfig:
230+
labels["dataset_name"] = dbconfig["dataset_name"]
231+
elif isinstance(dbconfig, list):
232+
for item in dbconfig:
233+
if isinstance(item, dict) and "dataset_name" in item:
234+
labels["dataset_name"] = item["dataset_name"]
235+
break
236+
237+
return labels

redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@
119119
from redis_benchmarks_specification.__self_contained_coordinator__.build_info import (
120120
extract_build_info_from_streamdata,
121121
)
122+
from redis_benchmarks_specification.__self_contained_coordinator__.parca_agent import (
123+
check_parca_agent_available,
124+
update_parca_agent_labels,
125+
extract_test_labels_from_benchmark_config,
126+
)
122127

123128
# Global variables for HTTP server control
124129
_reset_queue_requested = False
@@ -127,6 +132,10 @@
127132
_http_auth_password = None
128133
_flush_timestamp = None
129134

135+
# Global variables for parca-agent integration
136+
_parca_agent_available = False
137+
_parca_startup_labels = {}
138+
130139

131140
class CoordinatorHTTPHandler(BaseHTTPRequestHandler):
132141
"""HTTP request handler for coordinator endpoints"""
@@ -743,6 +752,18 @@ def main():
743752
override_memtier_test_time
744753
)
745754
)
755+
756+
# Check parca-agent availability and set startup labels
757+
global _parca_agent_available, _parca_startup_labels
758+
_parca_agent_available = check_parca_agent_available()
759+
if _parca_agent_available:
760+
_parca_startup_labels = {
761+
"platform": running_platform,
762+
"arch": arch,
763+
"coordinator_version": project_version,
764+
}
765+
logging.info(f"Parca-agent startup labels: {_parca_startup_labels}")
766+
746767
logging.info("Entering blocking read waiting for work.")
747768
if stream_id is None:
748769
stream_id = args.consumer_start_id
@@ -1032,6 +1053,16 @@ def process_self_contained_coordinator_stream(
10321053
f"detected a github_org definition on the streamdata: {tf_github_repo}. Overriding the default one: {default_github_repo}"
10331054
)
10341055

1056+
# Build parca-agent build-level labels
1057+
parca_build_labels = {
1058+
"git_hash": git_hash,
1059+
"git_branch": git_branch,
1060+
"git_version": git_version,
1061+
"build_variant": build_variant_name,
1062+
"github_org": tf_github_org,
1063+
"github_repo": tf_github_repo,
1064+
}
1065+
10351066
mnt_point = "/mnt/redis/"
10361067
if b"mnt_point" in testDetails:
10371068
mnt_point = testDetails[b"mnt_point"].decode()
@@ -1329,6 +1360,22 @@ def process_self_contained_coordinator_stream(
13291360
logging.info(
13301361
f"Running topology named {topology_spec_name} of type {setup_type}"
13311362
)
1363+
1364+
# Update parca-agent labels if available
1365+
global _parca_agent_available, _parca_startup_labels
1366+
if _parca_agent_available:
1367+
test_labels = extract_test_labels_from_benchmark_config(
1368+
benchmark_config
1369+
)
1370+
# Override topology with current iteration's topology
1371+
test_labels["topology"] = topology_spec_name
1372+
all_labels = {
1373+
**_parca_startup_labels,
1374+
**parca_build_labels,
1375+
**test_labels,
1376+
}
1377+
update_parca_agent_labels(all_labels)
1378+
13321379
test_result = False
13331380
redis_container = None
13341381
try:

0 commit comments

Comments
 (0)