
Commit 295b423

christinaexyou and Christina Xu authored:
add methods to execute and return queries from trustyai service (#202)

Co-authored-by: Christina Xu <[email protected]>
1 parent e40dc58 · commit 295b423

File tree: 5 files changed, +334 -0 lines changed

.github/workflows/workflow.yml
pyproject.toml
src/trustyai/utils/api/api.py
src/trustyai/utils/extras/metrics_service.py
tests/extras/test_metrics_service.py

.github/workflows/workflow.yml

Lines changed: 1 addition & 0 deletions
@@ -28,6 +28,7 @@ jobs:
           pip install .
           pip install ".[dev]"
           pip install ".[extras]"
+          pip install ".[api]"
       - name: Lint
         run: |
           pylint --ignore-imports=yes $(find src/trustyai -type f -name "*.py")

pyproject.toml

Lines changed: 4 additions & 0 deletions
@@ -63,6 +63,10 @@ detoxify = [
     "trl"
 ]
 
+api = [
+    "kubernetes"
+]
+
 [project.urls]
 homepage = "https://github.com/trustyai-explainability/trustyai-explainability-python"
 documentation = "https://trustyai-explainability-python.readthedocs.io/en/latest/"
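
The new "api" extra only declares the Kubernetes Python client as an optional dependency. Below is a minimal sketch, not part of this commit (the flag name is illustrative), of how calling code might check that the extra is installed before touching the Kubernetes-backed helpers:

# Probe for the optional dependency pulled in by: pip install "trustyai[api]"
try:
    import kubernetes  # pylint: disable=unused-import
    HAVE_API_EXTRA = True
except ImportError:
    HAVE_API_EXTRA = False

print("trustyai[api] dependencies available:", HAVE_API_EXTRA)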

src/trustyai/utils/api/api.py

Lines changed: 45 additions & 0 deletions
"""
Server module
"""

# pylint: disable = import-error, too-few-public-methods, assignment-from-no-return
__SUCCESSFUL_IMPORT = True

try:
    from kubernetes import config, dynamic
    from kubernetes.dynamic.exceptions import ResourceNotFoundError
    from kubernetes.client import api_client

except ImportError:
    print(
        "Warning: api dependencies not found. "
        "Dependencies can be installed with 'pip install trustyai[api]'"
    )
    __SUCCESSFUL_IMPORT = False

if __SUCCESSFUL_IMPORT:

    class TrustyAIApi:
        """
        Gets TrustyAI service information
        """

        def __init__(self):
            # Prefer in-cluster configuration; fall back to the local kubeconfig.
            try:
                k8s_client = config.load_incluster_config()
            except config.ConfigException:
                k8s_client = config.load_kube_config()
            self.dyn_client = dynamic.DynamicClient(
                api_client.ApiClient(configuration=k8s_client)
            )

        def get_service_route(self, name: str, namespace: str):
            """
            Gets the route for a service under a specified namespace
            """
            route_api = self.dyn_client.resources.get(api_version="v1", kind="Route")
            try:
                service = route_api.get(name=name, namespace=namespace)
                return f"https://{service.spec.host}"
            except ResourceNotFoundError:
                return f"Error accessing service {name} in namespace {namespace}."
src/trustyai/utils/extras/metrics_service.py

Lines changed: 208 additions & 0 deletions

"""Python client for TrustyAI metrics"""

from typing import List
import json
import datetime as dt
import pandas as pd
import requests

from trustyai.utils.api.api import TrustyAIApi


def json_to_df(data_path: str, batch_list: List[int]) -> pd.DataFrame:
    """
    Converts batched data in JSON files to a single pandas DataFrame
    """
    final_df = pd.DataFrame()
    for batch in batch_list:
        file = data_path + f"{batch}.json"
        with open(file, encoding="utf8") as train_file:
            batch_data = json.load(train_file)["inputs"][0]
        batch_df = pd.DataFrame.from_dict(batch_data["data"]).T
        final_df = pd.concat([final_df, batch_df])
    return final_df


def df_to_json(final_df: pd.DataFrame, name: str, json_file: str) -> None:
    """
    Converts a pandas DataFrame to a JSON file
    """
    inputs = [
        {
            "name": name,
            "shape": list(final_df.shape),
            "datatype": "FP64",
            "data": final_df.values.tolist(),
        }
    ]
    data_dict = {"inputs": inputs}
    with open(json_file, "w", encoding="utf8") as outfile:
        json.dump(data_dict, outfile)


class TrustyAIMetricsService:
    """
    Executes and returns queries from the TrustyAI service on ODH
    """

    def __init__(self, token: str, namespace: str, verify=True):
        """
        :param token: OpenShift login token
        :param namespace: model namespace
        :param verify: enable SSL verification for requests
        """
        self.token = token
        self.namespace = namespace
        self.trusty_url = TrustyAIApi().get_service_route(
            name="trustyai-service", namespace=self.namespace
        )
        self.thanos_url = TrustyAIApi().get_service_route(
            name="thanos-querier", namespace="openshift-monitoring"
        )
        self.headers = {
            "Authorization": "Bearer " + token,
            "Content-Type": "application/json",
        }
        self.verify = verify

    def upload_payload_data(self, json_file: str, timeout=5) -> None:
        """
        Uploads data to the TrustyAI service
        """
        with open(json_file, "r", encoding="utf8") as file:
            response = requests.post(
                f"{self.trusty_url}/data/upload",
                data=file,
                headers=self.headers,
                verify=self.verify,
                timeout=timeout,
            )
        if response.status_code == 200:
            print("Data successfully uploaded to TrustyAI service")
        else:
            print(f"Error {response.status_code}: {response.reason}")

    def get_model_metadata(self, timeout=5):
        """
        Retrieves model metadata from the TrustyAI service
        """
        response = requests.get(
            f"{self.trusty_url}/info",
            headers=self.headers,
            verify=self.verify,
            timeout=timeout,
        )
        if response.status_code == 200:
            model_metadata = json.loads(response.text)
            return model_metadata
        raise RuntimeError(f"Error {response.status_code}: {response.reason}")

    def label_data_fields(self, payload: dict, timeout=5):
        """
        Assigns feature names to model input data
        """

        def print_name_mapping(self):
            response = requests.get(
                f"{self.trusty_url}/info",
                headers=self.headers,
                verify=self.verify,
                timeout=timeout,
            )
            name_mapping = json.loads(response.text)[0]
            for key, val in name_mapping["data"]["inputSchema"]["nameMapping"].items():
                print(f"{key} -> {val}")

        response = requests.get(
            f"{self.trusty_url}/info",
            headers=self.headers,
            verify=self.verify,
            timeout=timeout,
        )
        input_data_fields = list(
            json.loads(response.text)[0]["data"]["inputSchema"]["items"].keys()
        )
        input_mapping_keys = list(payload["inputMapping"].keys())
        if len(list(set(input_mapping_keys) - set(input_data_fields))) == 0:
            response = requests.post(
                f"{self.trusty_url}/info/names",
                json=payload,
                headers=self.headers,
                verify=self.verify,
                timeout=timeout,
            )
            if response.status_code == 200:
                print_name_mapping(self)
                return response.text
            print(f"Error {response.status_code}: {response.reason}")
        raise ValueError("Field does not exist")

    def get_metric_request(
        self, payload: dict, metric: str, reoccuring: bool, timeout=5
    ):
        """
        Retrieve or schedule a metric request
        """
        if reoccuring:
            response = requests.post(
                f"{self.trusty_url}/metrics/{metric}/request",
                json=payload,
                headers=self.headers,
                verify=self.verify,
                timeout=timeout,
            )
        else:
            response = requests.post(
                f"{self.trusty_url}/metrics/{metric}",
                json=payload,
                headers=self.headers,
                verify=self.verify,
                timeout=timeout,
            )
        if response.status_code == 200:
            return response.text
        raise RuntimeError(f"Error {response.status_code}: {response.reason}")

    def upload_data_to_model(self, model_name: str, json_file: str, timeout=5):
        """
        Sends an inference request to the model
        """
        # get_service_route already returns an https:// URL
        model_route = TrustyAIApi().get_service_route(
            name=model_name, namespace=self.namespace
        )
        with open(json_file, encoding="utf8") as batch_file:
            response = requests.post(
                url=f"{model_route}/infer",
                data=batch_file,
                headers=self.headers,
                verify=self.verify,
                timeout=timeout,
            )
        if response.status_code == 200:
            return response.text
        raise RuntimeError(f"Error {response.status_code}: {response.reason}")

    def get_metric_data(
        self, namespace: str, metric: str, time_interval: str, timeout=5
    ):
        """
        Retrieves metric data for a specified time range
        """
        params = {"query": f"{metric}{{namespace='{namespace}'}}{time_interval}"}
        response = requests.get(
            f"{self.thanos_url}/api/v1/query?",
            params=params,
            headers=self.headers,
            verify=self.verify,
            timeout=timeout,
        )
        if response.status_code == 200:
            data_dict = json.loads(response.text)["data"]["result"][0]["values"]
            metric_df = pd.DataFrame(data_dict, columns=["timestamp", metric])
            metric_df["timestamp"] = metric_df["timestamp"].apply(
                lambda epoch: dt.datetime.fromtimestamp(epoch).strftime(
                    "%Y-%m-%d %H:%M:%S"
                )
            )
            return metric_df
        raise RuntimeError(f"Error {response.status_code}: {response.reason}")

tests/extras/test_metrics_service.py

Lines changed: 76 additions & 0 deletions
"""Test suite for TrustyAI metrics service data conversions"""
import json
import os
import random
import unittest
import numpy as np
import pandas as pd

from trustyai.utils.extras.metrics_service import (
    json_to_df,
    df_to_json,
)


def generate_json_data(batch_list, data_path):
    """Writes one randomly generated, single-row inference payload per batch"""
    for batch in batch_list:
        data = {
            "inputs": [
                {
                    "name": "test_data_input",
                    "shape": [1, 100],
                    "datatype": "FP64",
                    "data": [random.uniform(a=100, b=200) for _ in range(100)],
                }
            ]
        }
        with open(data_path + f"{batch}.json", "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)


def generate_test_df():
    """Builds a 100x5 DataFrame of random values"""
    data = {
        "0": np.random.uniform(low=100, high=200, size=100),
        "1": np.random.uniform(low=5000, high=10000, size=100),
        "2": np.random.uniform(low=100, high=200, size=100),
        "3": np.random.uniform(low=5000, high=10000, size=100),
        "4": np.random.uniform(low=5000, high=10000, size=100),
    }
    return pd.DataFrame(data=data)


class TestMetricsService(unittest.TestCase):
    def setUp(self):
        self.df = generate_test_df()
        self.data_path = "data/"
        if not os.path.exists(self.data_path):
            os.mkdir("data/")
        self.batch_list = list(range(0, 5))

    def test_json_to_df(self):
        """Test json data to pandas dataframe conversion"""
        generate_json_data(batch_list=self.batch_list, data_path=self.data_path)
        df = json_to_df(self.data_path, self.batch_list)
        n_rows, n_cols = 0, 0
        for batch in self.batch_list:
            file = self.data_path + f"{batch}.json"
            with open(file, encoding="utf8") as f:
                data = json.load(f)["inputs"][0]
            n_rows += data["shape"][0]
            n_cols = data["shape"][1]
        self.assertEqual(df.shape, (n_rows, n_cols))

    def test_df_to_json(self):
        """Test pandas dataframe to json data conversion"""
        df = generate_test_df()
        name = "test_data_input"
        json_file = "data/test.json"
        df_to_json(df, name, json_file)
        with open(json_file, encoding="utf8") as f:
            data = json.load(f)["inputs"][0]
        n_rows = data["shape"][0]
        n_cols = data["shape"][1]
        self.assertEqual(df.shape, (n_rows, n_cols))


if __name__ == "__main__":
    unittest.main()
