Skip to content

Commit 6f6e159

Browse files
Akash Vermaharshachulixius9
authored andcommitted
Feat: BurstIQ database Connector (#25268)
* Feat: BurstIQ database Connector * ts files * burstiq python and ui changes * Added tests * Added req for sdz and customer name in schema and ts files * Fixed tests for new changes * fix code and python checkstyle --------- Co-authored-by: Akash Verma <akashverma@Akashs-MacBook-Pro-2.local> Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com> Co-authored-by: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
1 parent 96a5090 commit 6f6e159

File tree

37 files changed

+3098
-66
lines changed

37 files changed

+3098
-66
lines changed

ingestion/src/metadata/ingestion/source/database/burstiq/__init__.py

Whitespace-only changes.
Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
"""
12+
Client to interact with BurstIQ LifeGraph APIs
13+
"""
14+
import traceback
15+
from datetime import datetime, timedelta
16+
from typing import Any, Dict, List, Optional
17+
18+
import requests
19+
20+
from metadata.generated.schema.entity.services.connections.database.burstIQConnection import (
21+
BurstIQConnection,
22+
)
23+
from metadata.utils.logger import ingestion_logger
24+
25+
logger = ingestion_logger()
26+
27+
AUTH_TIMEOUT = (10, 30) # 10s connect, 30s read for authentication
28+
API_TIMEOUT = (
29+
10,
30+
120,
31+
) # 10s connect, 120s read for API calls (handles 600+ dictionaries)
32+
33+
AUTH_SERVER_BASE = "https://auth.burstiq.com"
34+
API_BASE_URL = "https://api.burstiq.com"
35+
36+
37+
class BurstIQClient:
38+
"""
39+
BurstIQClient creates a REST API connection to BurstIQ LifeGraph platform.
40+
Handles OAuth2 authentication and API requests.
41+
"""
42+
43+
def __init__(self, config: BurstIQConnection):
44+
"""
45+
Initialize BurstIQ Client
46+
47+
Args:
48+
config: BurstIQConnection configuration
49+
"""
50+
self.config = config
51+
self.api_base_url = getattr(config, "apiUrl", API_BASE_URL).rstrip("/")
52+
53+
# Token management
54+
self.access_token: Optional[str] = None
55+
self.token_expires_at: Optional[datetime] = None
56+
57+
def test_authenticate(self):
58+
"""
59+
Explicitly test authentication with BurstIQ.
60+
This is used during test_connection to validate credentials.
61+
62+
Raises:
63+
Exception: If authentication fails
64+
"""
65+
self._authenticate()
66+
67+
def _authenticate(self):
68+
"""Authenticate with BurstIQ and get access token"""
69+
# Get configuration values
70+
realm_name = getattr(self.config, "realmName", None)
71+
username = getattr(self.config, "username", None)
72+
password = getattr(self.config, "password", None)
73+
74+
# Validate required fields
75+
if not realm_name:
76+
raise ValueError("realmName is required for authentication")
77+
if not username:
78+
raise ValueError("username is required for authentication")
79+
if not password:
80+
raise ValueError("password is required for authentication")
81+
82+
auth_server_url = getattr(self.config, "authServerUrl", AUTH_SERVER_BASE)
83+
client_id = getattr(self.config, "clientId", "burst")
84+
token_url = (
85+
f"{auth_server_url}/realms/{realm_name}/protocol/openid-connect/token"
86+
)
87+
88+
payload = {
89+
"client_id": client_id,
90+
"grant_type": "password",
91+
"username": username,
92+
"password": password.get_secret_value(),
93+
}
94+
95+
headers = {"Content-Type": "application/x-www-form-urlencoded"}
96+
97+
try:
98+
logger.info(f"Authenticating with BurstIQ at: {token_url}")
99+
response = requests.post(
100+
token_url, data=payload, headers=headers, timeout=AUTH_TIMEOUT
101+
)
102+
response.raise_for_status()
103+
104+
token_data = response.json()
105+
106+
self.access_token = token_data.get("access_token")
107+
108+
# Calculate token expiration
109+
expires_in = token_data.get("expires_in", 3600)
110+
self.token_expires_at = datetime.now() + timedelta(
111+
seconds=expires_in - 60
112+
) # 60s buffer
113+
114+
customer_name = getattr(self.config, "biqCustomerName", None)
115+
sdz_name = getattr(self.config, "biqSdzName", None)
116+
117+
logger.info(
118+
f"Authentication successful. Token expires in {expires_in} seconds"
119+
)
120+
if customer_name and sdz_name:
121+
logger.info(f"Customer: {customer_name}, SDZ: {sdz_name}")
122+
123+
except Exception as exc:
124+
logger.error(f"Authentication failed: {exc}")
125+
logger.debug(traceback.format_exc())
126+
raise Exception("Failed to authenticate with BurstIQ") from exc
127+
128+
def _get_auth_header(self) -> Dict[str, str]:
129+
"""
130+
Get authentication headers with current access token.
131+
Authenticates on first call if not already authenticated.
132+
133+
Returns:
134+
Dictionary of headers
135+
"""
136+
# Authenticate if not already done (lazy authentication)
137+
if not self.access_token:
138+
logger.info("No access token found, authenticating...")
139+
self._authenticate()
140+
# Check if token needs refresh
141+
elif self.token_expires_at and datetime.now() >= self.token_expires_at:
142+
logger.info("Access token expired, re-authenticating...")
143+
self._authenticate()
144+
145+
headers = {
146+
"Authorization": f"Bearer {self.access_token}",
147+
"Content-Type": "application/json",
148+
"Accept": "application/json",
149+
}
150+
151+
# Add BurstIQ-specific headers from config
152+
customer_name = getattr(self.config, "biqCustomerName", None)
153+
sdz_name = getattr(self.config, "biqSdzName", None)
154+
155+
if customer_name:
156+
headers["biq_customer_name"] = customer_name
157+
if sdz_name:
158+
headers["biq_sdz_name"] = sdz_name
159+
160+
return headers
161+
162+
def _make_request(
163+
self, method: str, endpoint: str, **kwargs
164+
) -> Optional[Dict[str, Any]]:
165+
"""
166+
Make HTTP request to BurstIQ API
167+
168+
Args:
169+
method: HTTP method (GET, POST, etc.)
170+
endpoint: API endpoint path
171+
**kwargs: Additional arguments for requests
172+
173+
Returns:
174+
JSON response or None
175+
"""
176+
import time
177+
178+
url = f"{self.api_base_url}/{endpoint.lstrip('/')}"
179+
headers = self._get_auth_header()
180+
181+
# Merge with any additional headers provided
182+
if "headers" in kwargs:
183+
headers.update(kwargs.pop("headers"))
184+
185+
# Log request params for debugging
186+
params = kwargs.get("params", {})
187+
logger.debug(f"Making {method} request to {url} with params: {params}")
188+
189+
try:
190+
start_time = time.time()
191+
response = requests.request(
192+
method, url, headers=headers, timeout=API_TIMEOUT, **kwargs
193+
)
194+
elapsed_time = time.time() - start_time
195+
196+
logger.debug(
197+
f"Request completed in {elapsed_time:.2f}s - Status: {response.status_code}"
198+
)
199+
200+
response.raise_for_status()
201+
202+
# Parse JSON response
203+
json_data = response.json()
204+
205+
# Log response size
206+
if isinstance(json_data, list):
207+
logger.debug(f"Received {len(json_data)} items in response")
208+
else:
209+
logger.debug(f"Received single item response")
210+
211+
return json_data
212+
213+
except requests.exceptions.Timeout as exc:
214+
logger.error(f"Request timeout after {API_TIMEOUT}s for {url}: {exc}")
215+
logger.debug(traceback.format_exc())
216+
raise ConnectionError(
217+
f"BurstIQ API request timed out after {API_TIMEOUT}s for {url}. "
218+
"Please check your network connection and BurstIQ API availability."
219+
) from exc
220+
except requests.exceptions.ConnectionError as exc:
221+
logger.error(f"Connection error for {url}: {exc}")
222+
logger.debug(traceback.format_exc())
223+
raise ConnectionError(
224+
f"Failed to connect to BurstIQ API at {url}. "
225+
"Please verify the API URL and network connectivity."
226+
) from exc
227+
except Exception as exc:
228+
logger.error(f"API request failed for {url}: {exc}")
229+
logger.debug(traceback.format_exc())
230+
raise
231+
232+
def get_dictionaries(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
233+
"""
234+
Fetch all data dictionaries from BurstIQ
235+
236+
Args:
237+
limit: Optional limit on number of dictionaries to fetch
238+
239+
Returns:
240+
List of dictionary objects
241+
"""
242+
params = {}
243+
if limit:
244+
params["limit"] = limit
245+
246+
logger.info("Fetching dictionaries from BurstIQ...")
247+
data = self._make_request("GET", "/api/metadata/dictionary", params=params)
248+
249+
if data is None:
250+
return []
251+
252+
dictionaries = data if isinstance(data, list) else [data]
253+
logger.info(f"Found {len(dictionaries)} dictionaries")
254+
return dictionaries
255+
256+
def get_dictionary_by_name(self, name: str) -> Optional[Dict[str, Any]]:
257+
"""
258+
Get a specific dictionary by name
259+
260+
Args:
261+
name: Dictionary name
262+
263+
Returns:
264+
Dictionary object or None
265+
"""
266+
logger.debug(f"Fetching dictionary: {name}")
267+
return self._make_request("GET", f"/api/metadata/dictionary/{name}")
268+
269+
def get_edges(
270+
self,
271+
name: Optional[str] = None,
272+
from_dictionary: Optional[str] = None,
273+
to_dictionary: Optional[str] = None,
274+
limit: Optional[int] = None,
275+
skip: Optional[int] = None,
276+
) -> List[Dict[str, Any]]:
277+
"""
278+
Query edge definitions (lineage relationships) from BurstIQ
279+
280+
Args:
281+
name: Optional edge name filter
282+
from_dictionary: Optional source dictionary filter
283+
to_dictionary: Optional target dictionary filter
284+
limit: Optional limit on number of edges to fetch
285+
skip: Optional number of edges to skip (pagination)
286+
287+
Returns:
288+
List of edge definition objects
289+
"""
290+
params = {}
291+
if name:
292+
params["name"] = name
293+
if from_dictionary:
294+
params["fromDictionary"] = from_dictionary
295+
if to_dictionary:
296+
params["toDictionary"] = to_dictionary
297+
if limit:
298+
params["limit"] = limit
299+
if skip:
300+
params["skip"] = skip
301+
302+
logger.info(
303+
f"Fetching edges from BurstIQ (filters: name={name}, from={from_dictionary}, to={to_dictionary})"
304+
)
305+
data = self._make_request("GET", "/api/metadata/edge", params=params)
306+
307+
if data is None:
308+
return []
309+
310+
edges = data if isinstance(data, list) else [data]
311+
logger.info(f"Found {len(edges)} edge definitions")
312+
return edges
313+
314+
def close(self):
315+
"""Cleanup method - no session to close when using plain requests"""
316+
pass

0 commit comments

Comments
 (0)