Skip to content

Commit 7b77959

Browse files
authored
[OSO-731] add DataAnalytics feature to pyoso (#4723)
* feat(pyoso): add DataAnalytics feature to pyoso * chore(pyoso): bump package version * refactor(pyoso/analytics): make variables snake case * feat(pyoso): add and use pydantic models for JSON validation * fix(pyoso): fix README
1 parent 0360251 commit 7b77959

File tree

10 files changed

+1886
-1607
lines changed

10 files changed

+1886
-1607
lines changed

uv.lock

Lines changed: 1412 additions & 1410 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

warehouse/pyoso/README.md

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ _WARNING: THIS IS A WORK IN PROGRESS_
44

55
`pyoso` is a Python package for fetching models and metrics from OSO. This package provides an easy-to-use interface to interact with oso and retrieve valuable data for analysis and monitoring.
66

7-
## Current Features
7+
## Features
88

9-
- Execute custom SQL queries for analyzing the OSO dataset
9+
- Execute custom SQL queries for analyzing the OSO dataset.
10+
- Inspect data dependencies and freshness with an analytics tree.
11+
- Semantic modeling layer to build and execute complex queries (optional).
1012

1113
## Installation
1214

@@ -28,12 +30,13 @@ This will include the `oso_semantic` package for building semantic models and qu
2830

2931
## Usage
3032

31-
Here is a basic example of how to use `pyoso`:
33+
Here is a basic example of how to use `pyoso` to fetch data directly into a pandas DataFrame:
3234

3335
```python
36+
import os
3437
from pyoso import Client
3538

36-
# Initialize the client
39+
# Initialize the client with an API key
3740
os.environ["OSO_API_KEY"] = 'your_api_key'
3841
client = Client()
3942

@@ -44,6 +47,34 @@ artifacts = client.to_pandas(query)
4447
print(artifacts)
4548
```
4649

50+
### Inspecting Data Dependencies
51+
52+
For more advanced use cases, the `client.query()` method returns a `QueryResponse` object that contains both the data and analytics metadata. This allows you to inspect the dependency tree of the data sources used in your query.
53+
54+
```python
55+
import os
56+
from pyoso import Client
57+
58+
# Initialize the client
59+
os.environ["OSO_API_KEY"] = "your_api_key"
60+
client = Client()
61+
62+
# Execute a query to get a QueryResponse object
63+
query = "SELECT * FROM artifacts_v1 LIMIT 5"
64+
response = client.query(query)
65+
66+
# You can still get the DataFrame as before
67+
df = response.to_pandas()
68+
print("\n--- Query Data ---")
69+
print(df)
70+
71+
# Now, inspect the analytics to see the dependency tree
72+
print("\n--- Data Dependency Tree ---")
73+
response.analytics.print_tree()
74+
```
75+
76+
This will output a tree structure showing how the final `artifacts_v1` table was constructed from its upstream dependencies, helping you understand the data's origin and freshness.
77+
4778
## Documentation
4879

4980
For detailed documentation about the OSO dataset, please refer to the [official documentation](https://docs.opensource.observer/docs/integrate/datasets/).

warehouse/pyoso/pyoso/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@
77
except PackageNotFoundError:
88
__version__ = "unknown"
99

10-
from .client import Client
10+
from .analytics import DataAnalytics
11+
from .client import Client, QueryResponse
1112
from .exceptions import HTTPError, OsoError

warehouse/pyoso/pyoso/analytics.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
from datetime import datetime
2+
3+
from pydantic import BaseModel, Field
4+
5+
6+
class PartitionStatusRange(BaseModel):
7+
end_key: str = Field(alias="endKey")
8+
start_key: str = Field(alias="startKey")
9+
status: str
10+
11+
12+
class PartitionStatus(BaseModel):
13+
num_failed: int = Field(alias="numFailed")
14+
num_materialized: int = Field(alias="numMaterialized")
15+
num_materializing: int = Field(alias="numMaterializing")
16+
num_partitions: int = Field(alias="numPartitions")
17+
ranges: list[PartitionStatusRange]
18+
19+
20+
class MaterializationStatus(BaseModel):
21+
partition_status: PartitionStatus | None = Field(
22+
default=None, alias="partitionStatus"
23+
)
24+
latest_materialization: datetime | None = Field(
25+
default=None, alias="latestMaterialization"
26+
)
27+
28+
29+
class DataStatus(BaseModel):
30+
key: str
31+
status: MaterializationStatus
32+
dependencies: list[str]
33+
34+
35+
class DataAnalytics:
36+
"""Container for analytics data with tree-structured display methods."""
37+
38+
def __init__(self, analytics_data: dict[str, DataStatus]):
39+
self._analytics_data = analytics_data
40+
self._root_keys = self._calculate_root_keys()
41+
42+
def _calculate_root_keys(self) -> list[str]:
43+
"""Calculate root nodes (nodes that are not dependencies of others)."""
44+
all_dependencies = set()
45+
for data_status in self._analytics_data.values():
46+
all_dependencies.update(data_status.dependencies)
47+
48+
root_keys = [
49+
k for k in self._analytics_data.keys() if k not in all_dependencies
50+
]
51+
52+
# If no clear roots, return all keys
53+
if not root_keys:
54+
root_keys = list(self._analytics_data.keys())
55+
56+
return root_keys
57+
58+
def __iter__(self):
59+
"""Iterate over the analytics data keys."""
60+
return iter(self._analytics_data.keys())
61+
62+
def __contains__(self, key: str) -> bool:
63+
"""Check if a key exists in the analytics data."""
64+
return key in self._analytics_data
65+
66+
def __len__(self):
67+
"""Get the number of analytics entries."""
68+
return len(self._analytics_data)
69+
70+
@property
71+
def root_keys(self) -> list[str]:
72+
"""Get the root keys (top-level nodes in the dependency tree)."""
73+
return self._root_keys.copy()
74+
75+
def get(self, key: str) -> DataStatus | None:
76+
"""Get analytics data for a specific key."""
77+
return self._analytics_data.get(key)
78+
79+
def print_tree(self, key: str | None = None):
80+
"""Print analytics data as a tree structure.
81+
82+
Args:
83+
key: If provided, print analytics for this specific key and its dependencies.
84+
If None, print analytics for all root keys.
85+
"""
86+
if key is not None:
87+
self._print_analytics_tree(key, set())
88+
else:
89+
# Print all root nodes
90+
num_roots = len(self._root_keys)
91+
for i, root_key in enumerate(self._root_keys):
92+
self._print_analytics_tree(
93+
root_key, set(), is_last=(i == num_roots - 1)
94+
)
95+
96+
def _print_analytics_tree(
97+
self, key: str, visited: set, indent: str = "", is_last: bool = True
98+
):
99+
"""Recursively print analytics tree for a given key."""
100+
if key in visited:
101+
print(f"{indent}├──{key} (circular dependency)")
102+
return
103+
104+
visited.add(key)
105+
data_status = self._analytics_data[key]
106+
107+
# Print the current node with inline status information
108+
status = data_status.status
109+
status_parts = []
110+
111+
if not status.latest_materialization and not status.partition_status:
112+
status_parts.append("No analytics data")
113+
if status.latest_materialization:
114+
status_parts.append(
115+
f"Last: {status.latest_materialization.strftime('%Y-%m-%d %H:%M:%S')}"
116+
)
117+
if status.partition_status:
118+
ps = status.partition_status
119+
status_parts.append(
120+
f"Partitions: {ps.num_materialized}/{ps.num_partitions}"
121+
)
122+
123+
status_text = f" ({', '.join(status_parts)})" if status_parts else ""
124+
125+
# Determine the prefix for the current node
126+
prefix = "└── " if is_last else "├── "
127+
print(f"{indent}{prefix}{key}{status_text}")
128+
129+
# Print dependencies
130+
if data_status.dependencies:
131+
num_dependencies = len(data_status.dependencies)
132+
for i, dep in enumerate(data_status.dependencies):
133+
is_last_dep = i == num_dependencies - 1
134+
dep_indent = indent + (" " if is_last else "│ ")
135+
self._print_analytics_tree(
136+
dep, visited.copy(), dep_indent, is_last=is_last_dep
137+
)
138+
139+
visited.remove(key)

warehouse/pyoso/pyoso/client.py

Lines changed: 72 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import json
22
import os
3-
from dataclasses import dataclass
4-
from typing import Any, Optional
3+
from typing import Any
54

65
import pandas as pd
76
import requests
7+
from pydantic import BaseModel
8+
from pyoso.analytics import DataAnalytics, DataStatus
89
from pyoso.constants import DEFAULT_BASE_URL, OSO_API_KEY
910
from pyoso.exceptions import OsoError, OsoHTTPError
1011
from pyoso.semantic import create_registry
@@ -19,20 +20,66 @@
1920
pass
2021

2122

22-
@dataclass
23-
class ClientConfig:
24-
base_url: Optional[str]
23+
class ClientConfig(BaseModel):
24+
base_url: str | None
2525

2626

27-
@dataclass
28-
class QueryData:
27+
class QueryData(BaseModel):
2928
columns: list[str]
3029
data: list[list[Any]]
3130

3231

32+
class QueryResponse:
33+
"""Response object containing query data and optional analytics."""
34+
35+
def __init__(self, data: QueryData, analytics: DataAnalytics):
36+
self._data = data
37+
self._analytics = analytics
38+
39+
def to_pandas(self) -> pd.DataFrame:
40+
"""Convert the query data to a pandas DataFrame."""
41+
return pd.DataFrame(
42+
self._data.data, columns=self._data.columns
43+
).convert_dtypes()
44+
45+
@property
46+
def analytics(self) -> DataAnalytics:
47+
"""Get the analytics data."""
48+
return self._analytics
49+
50+
@property
51+
def data(self) -> QueryData:
52+
"""Get the raw query data."""
53+
return self._data
54+
55+
@staticmethod
56+
def from_response_chunks(response_chunks) -> "QueryResponse":
57+
"""Parse HTTP response chunks into QueryResponse."""
58+
columns = []
59+
data = []
60+
analytics = {}
61+
62+
for chunk in response_chunks:
63+
if chunk:
64+
parsed_obj = json.loads(chunk)
65+
66+
if "assetStatus" in parsed_obj:
67+
for asset in parsed_obj["assetStatus"]:
68+
data_status = DataStatus.model_validate(asset)
69+
analytics[data_status.key] = data_status
70+
elif "columns" in parsed_obj:
71+
columns.extend(parsed_obj["columns"])
72+
73+
if "data" in parsed_obj:
74+
data.extend(parsed_obj["data"])
75+
76+
query_data = QueryData(columns=columns, data=data)
77+
return QueryResponse(data=query_data, analytics=DataAnalytics(analytics))
78+
79+
3380
class Client:
3481
def __init__(
35-
self, api_key: Optional[str] = None, client_opts: Optional[ClientConfig] = None
82+
self, api_key: str | None = None, client_opts: ClientConfig | None = None
3683
):
3784
self.__api_key = api_key if api_key else os.environ.get(OSO_API_KEY)
3885
if not self.__api_key:
@@ -51,8 +98,12 @@ def __init__(
5198
)
5299

53100
def __query(
54-
self, query: str, input_dialect="trino", output_dialect="trino"
55-
) -> QueryData:
101+
self,
102+
query: str,
103+
input_dialect="trino",
104+
output_dialect="trino",
105+
include_analytics: bool = True,
106+
) -> QueryResponse:
56107
# The following checks are only for providing better error messages as
57108
# the oso api does _not_ support multiple queries nor the use of
58109
# semicolons.
@@ -78,26 +129,22 @@ def __query(
78129
json={
79130
"query": query_expression.sql(dialect=output_dialect),
80131
"format": "minimal",
132+
"includeAnalytics": include_analytics,
81133
},
82134
stream=True,
83135
)
84136
response.raise_for_status()
85-
columns = []
86-
data = []
87-
for chunk in response.iter_lines(chunk_size=None):
88-
if chunk:
89-
parsed_obj = json.loads(chunk)
90-
if "columns" in parsed_obj:
91-
columns.extend(parsed_obj["columns"])
92-
if "data" in parsed_obj:
93-
data.extend(parsed_obj["data"])
94-
95-
return QueryData(columns=columns, data=data)
137+
138+
return QueryResponse.from_response_chunks(
139+
response.iter_lines(chunk_size=None)
140+
)
96141
except requests.HTTPError as e:
97142
raise OsoHTTPError(e, response=e.response) from None
98143

99144
def to_pandas(self, query: str):
100-
query_data = self.__query(query)
101-
return pd.DataFrame(
102-
query_data.data, columns=query_data.columns
103-
).convert_dtypes()
145+
query_response = self.__query(query)
146+
return query_response.to_pandas()
147+
148+
def query(self, query: str, include_analytics: bool = True) -> QueryResponse:
149+
"""Execute a SQL query and return the full response including analytics data."""
150+
return self.__query(query, include_analytics=include_analytics)

0 commit comments

Comments
 (0)