Skip to content

Commit a6a3de8

Browse files
authored
[TRTLLM-9003][infra] Add python OpenSearchDB query / push. (#8506)
Signed-off-by: ZhanruiSunCh <[email protected]> Signed-off-by: Zhanrui Sun <[email protected]>
1 parent 025d292 commit a6a3de8

File tree

2 files changed

+362
-0
lines changed

2 files changed

+362
-0
lines changed

jenkins/L0_Test.groovy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3247,6 +3247,8 @@ pipeline {
32473247
// force datasets to be offline mode, to prevent CI jobs are downloading HF dataset causing test failures
32483248
HF_DATASETS_OFFLINE=1
32493249
CMAKE_POLICY_VERSION_MINIMUM="3.5"
3250+
OPEN_SEARCH_DB_BASE_URL=credentials("open_search_db_base_url")
3251+
OPEN_SEARCH_DB_CREDENTIALS=credentials("open_search_db_credentials")
32503252
}
32513253
stages {
32523254
stage("Setup environment")

jenkins/scripts/open_search_db.py

Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# =============================================================================
17+
# open_search_db.py
18+
#
19+
# This module provides a client class for interacting with an OpenSearch database
20+
# used for CI/CD job and test result monitoring. It defines various project index
21+
# names, environment variable settings for the OpenSearch connection, and constants
22+
# for queries and timeouts.
23+
#
24+
# The primary interface is the `OpenSearchDB` class, which provides functionality for
25+
# querying and processing indexed test/job metadata within NVIDIA's CI infrastructure,
26+
# such as job information, stage results, test status, and related analytics for display
27+
# or automation.
28+
#
29+
# Typical usage requires setting environment variables for authentication and server
30+
# address, as used by Jenkins or related services.
31+
#
32+
# =============================================================================
33+
34+
import hashlib
35+
import json
36+
import logging
37+
import os
38+
import time
39+
40+
import requests
41+
from requests.auth import HTTPProxyAuth
42+
43+
PROJECT_ROOT = "swdl-trtllm-infra"
44+
MODE = "prod"
45+
VERSION = "v1"
46+
47+
# CI monitor indexes, now only support read access.
48+
JOB_PROJECT_NAME = f"{PROJECT_ROOT}-ci-{MODE}-job_info"
49+
STAGE_PROJECT_NAME = f"{PROJECT_ROOT}-ci-{MODE}-stage_info"
50+
TEST_PROJECT_NAME = f"{PROJECT_ROOT}-ci-{MODE}-test_info"
51+
JOB_MACHINE_PROJECT_NAME = f"{PROJECT_ROOT}-ci-{MODE}-job_machine_info"
52+
FAILED_STEP_PROJECT_NAME = f"{PROJECT_ROOT}-ci-{MODE}-failed_step_info"
53+
PR_PROJECT_NAME = f"{PROJECT_ROOT}-ci-{MODE}-pr_info"
54+
55+
READ_ACCESS_PROJECT_NAME = [
56+
JOB_PROJECT_NAME,
57+
STAGE_PROJECT_NAME,
58+
TEST_PROJECT_NAME,
59+
JOB_MACHINE_PROJECT_NAME,
60+
FAILED_STEP_PROJECT_NAME,
61+
PR_PROJECT_NAME,
62+
]
63+
64+
WRITE_ACCESS_PROJECT_NAME = []
65+
66+
DISABLE_OPEN_SEARCH_DB_FOR_LOCAL_TEST = False
67+
68+
DEFAULT_QUERY_SIZE = 3000
69+
DEFAULT_RETRY_COUNT = 5
70+
DEFAULT_LOOKBACK_DAYS = 7
71+
POST_TIMEOUT_SECONDS = 20
72+
QUERY_TIMEOUT_SECONDS = 10
73+
74+
OPEN_SEARCH_DB_BASE_URL = os.getenv("OPEN_SEARCH_DB_BASE_URL", "")
75+
OPEN_SEARCH_DB_USERNAME = os.getenv("OPEN_SEARCH_DB_CREDENTIALS_USR", "")
76+
OPEN_SEARCH_DB_PASSWORD = os.getenv("OPEN_SEARCH_DB_CREDENTIALS_PSW", "")
77+
78+
79+
class OpenSearchDB:
80+
logger = logging.getLogger(__name__)
81+
query_build_id_cache: dict = {}
82+
83+
def __init__(self) -> None:
84+
pass
85+
86+
@staticmethod
87+
def typeCheckForOpenSearchDB(json_data) -> bool:
88+
"""
89+
Check if the data is valid for OpenSearchDB.
90+
91+
:param json_data: Data to check, type dict or list.
92+
:return: bool, True if data is valid, False otherwise.
93+
"""
94+
if isinstance(json_data, list):
95+
return all(
96+
OpenSearchDB.typeCheckForOpenSearchDB(item)
97+
for item in json_data)
98+
if not isinstance(json_data, dict):
99+
OpenSearchDB.logger.error(
100+
f"OpenSearchDB type check failed! Expected dict, got {type(json_data).__name__}"
101+
)
102+
return False
103+
104+
allowed_keys = {"_id", "_project", "_shard", "_version"}
105+
type_map = {
106+
"l_": int,
107+
"d_": float,
108+
"s_": str,
109+
"b_": bool,
110+
"ts_": int,
111+
"flat_": dict,
112+
"ni_": (str, int, float),
113+
}
114+
115+
for key, value in json_data.items():
116+
matched = False
117+
for prefix, expected_type in type_map.items():
118+
if key.startswith(prefix):
119+
if not isinstance(value, expected_type):
120+
OpenSearchDB.logger.error(
121+
f"OpenSearchDB type check failed! key:{key}, value:{value} value_type:{type(value)}"
122+
)
123+
return False
124+
matched = True
125+
break
126+
if not matched:
127+
if key not in allowed_keys:
128+
OpenSearchDB.logger.error(
129+
f"Unknown key type! key:{key}, value_type:{type(value)}"
130+
)
131+
return False
132+
return True
133+
134+
@staticmethod
135+
def _calculate_timestamp(days_ago) -> int:
136+
"""
137+
Calculate timestamp in milliseconds.
138+
139+
:param days_ago: Number of days ago.
140+
:return: Timestamp in milliseconds.
141+
"""
142+
return int(time.time() -
143+
24 * 3600 * days_ago) // (24 * 3600) * 24 * 3600 * 1000
144+
145+
@staticmethod
146+
def add_id_of_json(data) -> None:
147+
"""
148+
Add _id field to the data.
149+
150+
:param data: Data to add _id field, type dict or list.
151+
:return: None.
152+
"""
153+
if isinstance(data, list):
154+
for d in data:
155+
OpenSearchDB.add_id_of_json(d)
156+
return
157+
if not isinstance(data, dict):
158+
raise TypeError("data is not a dict, type:{}".format(type(data)))
159+
data_str = json.dumps(data, sort_keys=True, indent=2).encode("utf-8")
160+
data["_id"] = hashlib.md5(data_str).hexdigest()
161+
162+
@staticmethod
163+
def postToOpenSearchDB(json_data, project) -> bool:
164+
"""
165+
Post data to OpenSearchDB.
166+
167+
:param json_data: Data to post, type dict or list.
168+
:param project: Name of the project.
169+
:return: bool, True if post successful, False otherwise.
170+
"""
171+
if not OPEN_SEARCH_DB_BASE_URL:
172+
OpenSearchDB.logger.error("OPEN_SEARCH_DB_BASE_URL is not set")
173+
return False
174+
if not OPEN_SEARCH_DB_USERNAME or not OPEN_SEARCH_DB_PASSWORD:
175+
OpenSearchDB.logger.error(
176+
"OPEN_SEARCH_DB_USERNAME or OPEN_SEARCH_DB_PASSWORD is not set")
177+
return False
178+
if project not in WRITE_ACCESS_PROJECT_NAME:
179+
OpenSearchDB.logger.error(
180+
f"project {project} is not in write access project list: {json.dumps(WRITE_ACCESS_PROJECT_NAME)}"
181+
)
182+
return False
183+
if not OpenSearchDB.typeCheckForOpenSearchDB(json_data):
184+
OpenSearchDB.logger.error(
185+
f"OpenSearchDB type check failed! json_data:{json_data}")
186+
return False
187+
188+
OpenSearchDB.add_id_of_json(json_data)
189+
json_data_dump = json.dumps(json_data)
190+
191+
if DISABLE_OPEN_SEARCH_DB_FOR_LOCAL_TEST:
192+
OpenSearchDB.logger.info(
193+
f"OpenSearchDB is disabled for local test, skip posting to OpenSearchDB: {json_data_dump}"
194+
)
195+
return True
196+
197+
url = f"{OPEN_SEARCH_DB_BASE_URL}/dataflow2/{project}/posting"
198+
headers = {
199+
"Content-Type": "application/json",
200+
"Accept-Charset": "UTF-8"
201+
}
202+
203+
for attempt in range(DEFAULT_RETRY_COUNT):
204+
try:
205+
res = requests.post(
206+
url,
207+
data=json_data_dump,
208+
headers=headers,
209+
auth=HTTPProxyAuth(OPEN_SEARCH_DB_USERNAME,
210+
OPEN_SEARCH_DB_PASSWORD),
211+
timeout=POST_TIMEOUT_SECONDS,
212+
)
213+
if res.status_code in (200, 201, 202):
214+
if res.status_code != 200 and project == JOB_PROJECT_NAME:
215+
OpenSearchDB.logger.info(
216+
f"OpenSearchDB post not 200, log:{res.status_code} {res.text}"
217+
)
218+
return True
219+
else:
220+
OpenSearchDB.logger.info(
221+
f"OpenSearchDB post failed, will retry, error:{res.status_code} {res.text}"
222+
)
223+
except Exception as e:
224+
OpenSearchDB.logger.info(
225+
f"OpenSearchDB post exception, attempt {attempt + 1} error: {e}"
226+
)
227+
OpenSearchDB.logger.error(
228+
f"Fail to postToOpenSearchDB after {DEFAULT_RETRY_COUNT} tries: {url}, json: {json_data_dump}, last error: {getattr(res, 'text', 'N/A') if 'res' in locals() else ''}"
229+
)
230+
return False
231+
232+
@staticmethod
233+
def queryFromOpenSearchDB(json_data, project) -> dict:
234+
"""
235+
Query data from OpenSearchDB.
236+
237+
:param json_data: Data to query, type dict or list.
238+
:param project: Name of the project.
239+
:return: dict, query result.
240+
"""
241+
if not OPEN_SEARCH_DB_BASE_URL:
242+
OpenSearchDB.logger.error("OPEN_SEARCH_DB_BASE_URL is not set")
243+
return {}
244+
if project not in READ_ACCESS_PROJECT_NAME:
245+
OpenSearchDB.logger.error(
246+
f"project {project} is not in read access project list: {json.dumps(READ_ACCESS_PROJECT_NAME)}"
247+
)
248+
return {}
249+
if not isinstance(json_data, str):
250+
json_data_dump = json.dumps(json_data)
251+
else:
252+
json_data_dump = json_data
253+
url = f"{OPEN_SEARCH_DB_BASE_URL}/opensearch/df-{project}-*/_search"
254+
headers = {
255+
"Content-Type": "application/json",
256+
"Accept-Charset": "UTF-8"
257+
}
258+
retry_time = DEFAULT_RETRY_COUNT
259+
while retry_time:
260+
res = requests.get(url,
261+
data=json_data_dump,
262+
headers=headers,
263+
timeout=QUERY_TIMEOUT_SECONDS)
264+
if res.status_code in [200, 201, 202]:
265+
return res.json()
266+
OpenSearchDB.logger.info(
267+
f"OpenSearchDB query failed, will retry, error:{res.status_code} {res.text}"
268+
)
269+
retry_time -= 1
270+
OpenSearchDB.logger.error(
271+
f"Fail to queryFromOpenSearchDB after {retry_time} retry: {url}, json: {json_data_dump}, error: {res.text}"
272+
)
273+
return {}
274+
275+
@staticmethod
276+
def queryBuildIdFromOpenSearchDB(job_name, last_days=DEFAULT_LOOKBACK_DAYS):
277+
if DISABLE_OPEN_SEARCH_DB_FOR_LOCAL_TEST:
278+
return []
279+
if job_name in OpenSearchDB.query_build_id_cache:
280+
return OpenSearchDB.query_build_id_cache[job_name]
281+
json_data = {
282+
"size": DEFAULT_QUERY_SIZE,
283+
"query": {
284+
"range": {
285+
"ts_created": {
286+
"gte": OpenSearchDB._calculate_timestamp(last_days),
287+
}
288+
}
289+
},
290+
"_source": ["ts_created", "s_job_name", "s_status", "s_build_id"],
291+
}
292+
build_ids = []
293+
try:
294+
query_res = OpenSearchDB.queryFromOpenSearchDB(
295+
json_data, JOB_PROJECT_NAME)
296+
for job in query_res["hits"]["hits"]:
297+
job_info = job.get("_source", {})
298+
if job_name == job_info.get("s_job_name"):
299+
build_ids.append(job_info.get("s_build_id"))
300+
OpenSearchDB.query_build_id_cache[job_name] = build_ids
301+
return build_ids
302+
except Exception as e:
303+
OpenSearchDB.logger.warning(
304+
f"Failed to query build IDs from OpenSearchDB: {e}")
305+
return []
306+
307+
@staticmethod
308+
def queryPRIdsFromOpenSearchDB(repo_name, last_days=DEFAULT_LOOKBACK_DAYS):
309+
"""
310+
Query existing PR IDs from OpenSearchDB for a specific repository.
311+
Mirrors queryBuildIdFromOpenSearchDB for PR monitoring.
312+
"""
313+
if DISABLE_OPEN_SEARCH_DB_FOR_LOCAL_TEST:
314+
return []
315+
316+
cache_key = f"pr_{repo_name}"
317+
if cache_key in OpenSearchDB.query_build_id_cache:
318+
return OpenSearchDB.query_build_id_cache[cache_key]
319+
320+
json_data = {
321+
"size": DEFAULT_QUERY_SIZE,
322+
"query": {
323+
"bool": {
324+
"must": [
325+
{
326+
"range": {
327+
"ts_created": {
328+
"gte":
329+
OpenSearchDB._calculate_timestamp(
330+
last_days),
331+
}
332+
}
333+
},
334+
{
335+
"term": {
336+
"s_repo_name": repo_name
337+
}
338+
},
339+
]
340+
}
341+
},
342+
"_source": ["ts_created", "s_repo_name", "l_pr_number", "s_pr_id"],
343+
}
344+
345+
pr_numbers = []
346+
try:
347+
query_res = OpenSearchDB.queryFromOpenSearchDB(
348+
json_data, PR_PROJECT_NAME)
349+
for pr in query_res["hits"]["hits"]:
350+
pr_info = pr.get("_source", {})
351+
if repo_name == pr_info.get("s_repo_name"):
352+
pr_number = pr_info.get("l_pr_number")
353+
if pr_number and pr_number not in pr_numbers:
354+
pr_numbers.append(pr_number)
355+
OpenSearchDB.query_build_id_cache[cache_key] = pr_numbers
356+
return pr_numbers
357+
except Exception as e:
358+
OpenSearchDB.logger.warning(
359+
f"Failed to query PR IDs from OpenSearchDB: {e}")
360+
return []

0 commit comments

Comments
 (0)