
Commit a50fd5f

[hud][ch][drci] add api to cache ch queries + cache issues query (#6578)
As evident from the [hud](https://hud.pytorch.org/query_execution_metrics), issues_query is the biggest cumulative time and memory hog, simply because it runs too frequently (≈27 times / minute).

(screenshot: https://github.com/user-attachments/assets/28c46f90-0ebf-49d7-9601-3e83a544b3f4)

Since only three parameter values dominate the executions:

- ci: sev
- skipped
- unstable

it makes sense to cache this query.

This PR:

* adds a `use_query_cache` option to our ch apis (1 minute TTL by default)
* adds a python test (intended to be run locally, as we don't expose CH credentials in CI)
* caches `issues_query` in several places
* updates the ch client node package (the old one didn't expose `use_query_cache`)

---

The intended effect of using the query cache is to drop time and memory to 0 for cache hits.
1 parent 75212b8 commit a50fd5f
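A minimal sketch of how a caller opts into the ClickHouse-side cache through the new Python API (the call shape mirrors the fetch_latest_green_commit.py change below; the label value is only an example, and the CLICKHOUSE_* environment variables are assumed to be set):

from torchci.clickhouse import query_clickhouse_saved

# The first execution runs on ClickHouse and populates its query cache;
# repeats within roughly a minute are served from that cache.
issues = query_clickhouse_saved(
    "issue_query", {"label": "unstable"}, useChQueryCache=True
)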

File tree

10 files changed: +271 -33 lines changed


tools/scripts/fetch_latest_green_commit.py

Lines changed: 3 additions & 1 deletion
@@ -83,7 +83,9 @@ def get_commit_results(
 
 @lru_cache
 def fetch_unstable_issues() -> List[str]:
-    issues = query_clickhouse_saved("issue_query", {"label": "unstable"})
+    issues = query_clickhouse_saved(
+        "issue_query", {"label": "unstable"}, useChQueryCache=True
+    )
     return [
         issue["title"][len("UNSTABLE") :].strip()
         for issue in issues

tools/torchci/clickhouse.py

Lines changed: 30 additions & 9 deletions
@@ -1,14 +1,15 @@
 import json
 import os
 from functools import lru_cache
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional
 
 import clickhouse_connect
+from clickhouse_connect.driver import Client
 from torchci.utils import cache_json, REPO_ROOT
 
 
 @lru_cache(maxsize=1)
-def get_clickhouse_client() -> Any:
+def get_clickhouse_client() -> Client:
     endpoint = os.environ["CLICKHOUSE_ENDPOINT"]
     # I cannot figure out why these values aren't being handled automatically
     # when it is fine in the lambda
@@ -26,23 +27,37 @@ def get_clickhouse_client() -> Any:
     )
 
 
-def query_clickhouse_saved(queryName: str, inputParams: Dict[str, Any]) -> Any:
+def query_clickhouse_saved(
+    queryName: str, inputParams: Dict[str, Any], useChQueryCache=False
+) -> Any:
+    """
+    Queries ClickHouse using a saved query file and parameters.
+    :param useChQueryCache: If True, caches the query result on ClickHouse side (1 minute TTL).
+    :return:
+    """
     path = REPO_ROOT / "torchci" / "clickhouse_queries" / queryName
     with open(path / "query.sql") as f:
         queryText = f.read()
     with open(path / "params.json") as f:
         paramsText = json.load(f).get("params", {})
 
     queryParams = {name: inputParams[name] for name in paramsText}
-    return query_clickhouse(queryText, queryParams)
+    return query_clickhouse(queryText, queryParams, use_ch_query_cache=useChQueryCache)
 
 
 def query_clickhouse(
-    query: str, params: Dict[str, Any], use_cache: bool = False
+    query: str,
+    params: Dict[str, Any],
+    use_cache: bool = False,
+    use_ch_query_cache=False,
 ) -> Any:
     """
     Queries ClickHouse. Returns datetime in YYYY-MM-DD HH:MM:SS format.
+    :param use_ch_query_cache: If True, uses ClickHouse's query cache (1 minute TTL).
     """
+    settings = None
+    if use_ch_query_cache:
+        settings = {"use_query_cache": 1}
 
     def convert_to_json_list(res: str) -> List[Dict[str, Any]]:
         rows = []
@@ -52,13 +67,19 @@ def convert_to_json_list(res: str) -> List[Dict[str, Any]]:
         return rows
 
     if not use_cache:
-        res = get_clickhouse_client().raw_query(query, params, fmt="JSONEachRow")
+        res = get_clickhouse_client().raw_query(
+            query, params, settings=settings, fmt="JSONEachRow"
+        )
         return convert_to_json_list(res)
     else:
 
         @cache_json
-        def cache_query_clickhouse(query, params):
-            res = get_clickhouse_client().raw_query(query, params, fmt="JSONEachRow")
+        def cache_query_clickhouse(
+            query, params, settings: Optional[Dict[str, Any]] = None
+        ) -> Any:
+            res = get_clickhouse_client().raw_query(
+                query, params, settings=settings, fmt="JSONEachRow"
+            )
             return convert_to_json_list(res)
 
-        return cache_query_clickhouse(query, params)
+        return cache_query_clickhouse(query, params, settings)
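A rough sketch of what the flag does under the hood, based on the diff above: when use_ch_query_cache is set, a settings dict is forwarded to clickhouse_connect's raw_query, which applies use_query_cache as a per-query ClickHouse setting (the SELECT below is just a placeholder query):

from torchci.clickhouse import get_clickhouse_client, query_clickhouse

# Low-level equivalent of query_clickhouse(..., use_ch_query_cache=True):
# ClickHouse caches the result server-side for repeated identical queries.
res = get_clickhouse_client().raw_query(
    "SELECT 1 AS value", {}, settings={"use_query_cache": 1}, fmt="JSONEachRow"
)

# Via the helper, the same settings dict is built when the flag is set.
rows = query_clickhouse("SELECT 1 AS value", {}, use_ch_query_cache=True)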

tools/torchci/tests/test_ch_query.py

Lines changed: 194 additions & 0 deletions
@@ -0,0 +1,194 @@
import os
import time
import unittest

from dotenv import load_dotenv
from torchci.clickhouse import (
    get_clickhouse_client,
    query_clickhouse,
    query_clickhouse_saved,
)


# This test is intended to run locally against a real ClickHouse instance
# Provide the necessary environment variables (e.g., in a .env file)
class TestClickhouseQueries(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        load_dotenv()
        # Check if ClickHouse credentials are available
        cls.can_run = all(
            env_var in os.environ
            for env_var in [
                "CLICKHOUSE_ENDPOINT",
                "CLICKHOUSE_USERNAME",
                "CLICKHOUSE_PASSWORD",
            ]
        )
        if not cls.can_run:
            print("Skipping ClickHouse tests: required environment variables not set")
        else:
            # Test connection before running tests
            try:
                client = get_clickhouse_client()
                # Simple query to check connection
                client.query("SELECT 1")
                cls.can_run = True
            except Exception as e:
                print(f"ClickHouse connection failed: {e}")
                cls.can_run = False

    def setUp(self):
        """Skip tests if ClickHouse is not available"""
        if not self.can_run:
            self.skipTest(
                "ClickHouse environment variables not set or connection failed"
            )

    def test_simple_query_no_cache(self):
        """Test a simple SELECT 1 query without cache"""
        query = "SELECT 1 AS value"
        results = query_clickhouse(query, {}, use_cache=False)

        self.assertIsInstance(results, list)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["value"], 1)

    def test_simple_query_with_cache(self):
        """Test a simple SELECT 1 query with cache"""
        query = "SELECT 1 AS value"

        # First call should hit database
        start_time = time.time()
        results1 = query_clickhouse(query, {}, use_cache=True)
        first_call_time = time.time() - start_time

        # Second call should use cache
        start_time = time.time()
        results2 = query_clickhouse(query, {}, use_cache=True)
        second_call_time = time.time() - start_time

        # Both should return same result
        self.assertEqual(results1, results2)
        self.assertEqual(results1[0]["value"], 1)

        # Second call should be faster or similar (allowing for measurement noise)
        # We don't assert on exact timing as it depends on many factors
        print(
            f"First call: {first_call_time:.6f}s, Second call: {second_call_time:.6f}s"
        )

    def test_simple_query_with_clickhouse_cache(self):
        """Test a simple query with ClickHouse's query cache"""
        query = "SELECT 1 AS value"

        # First call
        results1 = query_clickhouse(query, {}, use_ch_query_cache=True)

        # Second call
        results2 = query_clickhouse(query, {}, use_ch_query_cache=True)

        # Both should return same result
        self.assertEqual(results1, results2)
        self.assertEqual(results1[0]["value"], 1)

    def test_parameterized_query(self):
        """Test a query with parameters"""
        query = "SELECT {value:UInt8} AS value"
        params = {"value": 42}

        results = query_clickhouse(query, params)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["value"], 42)

    def test_saved_query(self):
        """Test using a saved query (issue_query)"""
        try:
            results = query_clickhouse_saved("issue_query", {"label": "flaky"})
            self.assertIsInstance(results, list)

            # Check structure of results based on query.sql
            if results:
                expected_columns = [
                    "number",
                    "title",
                    "html_url",
                    "state",
                    "body",
                    "updated_at",
                    "author_association",
                    "labels",
                ]
                for col in expected_columns:
                    self.assertIn(col, results[0], f"Missing expected column: {col}")
        except Exception as e:
            self.fail(f"Saved query test failed with: {e}")

    def test_saved_query_with_cache(self):
        """Test saved query with cache"""
        params = {"label": "bug"}

        # First call with timing
        start_time = time.time()
        results1 = query_clickhouse_saved("issue_query", params, useChQueryCache=True)
        first_call_time = time.time() - start_time

        # Second call with timing
        start_time = time.time()
        results2 = query_clickhouse_saved("issue_query", params, useChQueryCache=True)
        second_call_time = time.time() - start_time

        # Print timing information
        print(
            f"Saved query - First call: {first_call_time:.6f}s, Second call: {second_call_time:.6f}s"
        )
        print(
            f"Speedup ratio: {first_call_time/second_call_time if second_call_time > 0 else 'inf':.2f}x"
        )

        # Both should return same data structure
        self.assertEqual(type(results1), type(results2))

        # Verify the results are identical (same number of rows)
        self.assertEqual(
            len(results1),
            len(results2),
            "Cached query returned different number of results",
        )

        # If we got results, check they match expected structure based on query.sql
        if results1:
            expected_columns = [
                "number",
                "title",
                "html_url",
                "state",
                "body",
                "updated_at",
                "author_association",
                "labels",
            ]
            for col in expected_columns:
                self.assertIn(col, results1[0], f"Missing expected column: {col}")

            # Verify the labels array contains the search parameter
            if results1[0]["labels"]:
                # At least one issue should have the label we searched for
                found_label = False
                for issue in results1:
                    if any(label == params["label"] for label in issue["labels"]):
                        found_label = True
                        break
                self.assertTrue(
                    found_label,
                    f"Couldn't find any issue with label '{params['label']}'",
                )
            else:
                # If there are no labels, the test will pass but we'll print a warning
                print(
                    "Warning: No labels found in results, can't verify label filtering"
                )


if __name__ == "__main__":
    unittest.main()

torchci/lib/clickhouse.ts

Lines changed: 9 additions & 3 deletions
@@ -30,7 +30,8 @@ export function getClickhouseClientWritable() {
 export async function queryClickhouse(
   query: string,
   params: Record<string, unknown>,
-  query_id?: string
+  query_id?: string,
+  useQueryCache?: boolean
 ): Promise<any[]> {
   if (query_id === undefined) {
     query_id = "adhoc";
@@ -41,6 +42,7 @@ export async function queryClickhouse(
    * queryClickhouse
    * @param query: string, the sql query
    * @param params: Record<string, unknown>, the parameters to the query ex { sha: "abcd" }
+   * @param useQueryCache: boolean, if true, cache the query result on Ch side (1 minute TTL)
    */
   const clickhouseClient = getClickhouseClient();
 
@@ -51,6 +53,7 @@ export async function queryClickhouse(
     clickhouse_settings: {
       output_format_json_quote_64bit_integers: 0,
       date_time_output_format: "iso",
+      use_query_cache: useQueryCache ? 1 : 0,
     },
     query_id,
   });
@@ -60,12 +63,14 @@ export async function queryClickhouse(
 
 export async function queryClickhouseSaved(
   queryName: string,
-  inputParams: Record<string, unknown>
+  inputParams: Record<string, unknown>,
+  useQueryCache?: boolean
 ) {
   /**
    * queryClickhouseSaved
    * @param queryName: string, the name of the query, which is the name of the folder in clickhouse_queries
    * @param inputParams: Record<string, unknown>, the parameters to the query, an object where keys are the parameter names
+   * @param useQueryCache: boolean, if true, cache the query result on Ch side (1 minute TTL)
    *
    * This function will filter the inputParams to only include the parameters
    * that are in the query params json file.
@@ -90,6 +95,7 @@ export async function queryClickhouseSaved(
   return await thisModule.queryClickhouse(
     query,
     Object.fromEntries(queryParams),
-    queryName
+    queryName,
+    useQueryCache
   );
 }

torchci/lib/drciUtils.ts

Lines changed: 3 additions & 1 deletion
@@ -204,7 +204,9 @@ export async function upsertDrCiComment(
   );
   const existingDrciID = existingDrciData.id;
   const existingDrciComment = existingDrciData.body;
-  const sev = getActiveSEVs(await fetchIssuesByLabel("ci: sev"));
+  const sev = getActiveSEVs(
+    await fetchIssuesByLabel("ci: sev", /*cache*/ true)
+  );
   const drciComment = formDrciComment(
     prNum,
     owner,

torchci/lib/fetchHud.ts

Lines changed: 1 addition & 1 deletion
@@ -83,7 +83,7 @@ export default async function fetchHud(
     results = results?.filter((job: JobData) => !isRerunDisabledTestsJob(job));
   }
   if (params.filter_unstable) {
-    const unstableIssues = await fetchIssuesByLabel("unstable");
+    const unstableIssues = await fetchIssuesByLabel("unstable", /*cache*/ true);
     results = results?.filter(
       (job: JobData) => !isUnstableJob(job, unstableIssues ?? [])
     );

torchci/lib/fetchIssuesByLabel.ts

Lines changed: 9 additions & 4 deletions
@@ -2,9 +2,14 @@ import { queryClickhouseSaved } from "./clickhouse";
 import { IssueData } from "./types";
 
 export default async function fetchIssuesByLabel(
-  label: string
+  label: string,
+  useChCache?: boolean
 ): Promise<IssueData[]> {
-  return await queryClickhouseSaved("issue_query", {
-    label,
-  });
+  return await queryClickhouseSaved(
+    "issue_query",
+    {
+      label,
+    },
+    useChCache
+  );
 }

torchci/package.json

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@
     "@aws-sdk/client-dynamodb": "^3.347.1",
     "@aws-sdk/client-s3": "^3.347.1",
     "@aws-sdk/lib-dynamodb": "^3.72.0",
-    "@clickhouse/client": "^0.1.0",
+    "@clickhouse/client": "^1.11.1",
     "@codemirror/basic-setup": "^0.20.0",
     "@codemirror/state": "^0.20.0",
     "@codemirror/theme-one-dark": "^0.20.0",
