Skip to content

Commit 8149ea9

Browse files
committed
feat(utils): Add shared manifest inspection utilities
- describe_manifest(): Extract structured schema from manifests
- format_arrow_type(): Format Arrow types into readable strings
- print_schema(): Pretty-print schema in human-readable format
1 parent 651bd29 commit 8149ea9

File tree

3 files changed

+253
-0
lines changed

3 files changed

+253
-0
lines changed

src/amp/utils/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""Utility modules for amp Python client."""
2+
3+
from .manifest_inspector import describe_manifest, format_arrow_type, print_schema
4+
5+
__all__ = ['describe_manifest', 'format_arrow_type', 'print_schema']
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""Shared utilities for inspecting dataset manifests.
2+
3+
This module provides functions to parse and display dataset schemas
4+
from manifest files in a human-readable format.
5+
"""
6+
7+
from typing import Any, Dict
8+
9+
10+
def describe_manifest(manifest: dict) -> Dict[str, list[Dict[str, str | bool]]]:
11+
"""Extract structured schema information from a manifest.
12+
13+
Args:
14+
manifest: Dataset manifest dictionary
15+
16+
Returns:
17+
dict: Mapping of table names to column information. Each column is a dict with:
18+
- name: Column name
19+
- type: Arrow type (simplified string representation)
20+
- nullable: Whether the column allows NULL values
21+
"""
22+
tables = manifest.get('tables', {})
23+
24+
result = {}
25+
for table_name, table_def in tables.items():
26+
schema = table_def.get('schema', {}).get('arrow', {})
27+
fields = schema.get('fields', [])
28+
29+
columns = []
30+
for field in fields:
31+
col_type = format_arrow_type(field.get('type'))
32+
columns.append(
33+
{
34+
'name': field.get('name', ''),
35+
'type': col_type,
36+
'nullable': field.get('nullable', True),
37+
}
38+
)
39+
40+
result[table_name] = columns
41+
42+
return result
43+
44+
45+
def format_arrow_type(type_def: Any) -> str:
    """Format Arrow type definition into a readable string.

    Strings are returned unchanged. Dicts keyed by ``Timestamp``,
    ``FixedSizeBinary`` or ``Decimal128`` are rendered as ``Name(params)``;
    for ``Timestamp`` only the first parameter is shown. Anything else,
    including a recognised key whose parameters do not have the expected
    shape, falls back to ``str(type_def)`` so a malformed payload degrades
    gracefully instead of raising.

    Args:
        type_def: Arrow type definition (str or dict)

    Returns:
        str: Human-readable type string
    """
    if isinstance(type_def, str):
        return type_def
    if not isinstance(type_def, dict):
        return str(type_def)

    # Handle complex types like Timestamp, FixedSizeBinary, Decimal128
    if 'Timestamp' in type_def:
        params = type_def['Timestamp']
        if not params:
            return 'Timestamp(Unknown)'
        # Only index into real sequences: indexing a bare string would
        # silently return just its first character.
        if isinstance(params, (list, tuple)):
            return f'Timestamp({params[0]})'
    elif 'FixedSizeBinary' in type_def:
        size = type_def['FixedSizeBinary']
        return f'FixedSizeBinary({size})'
    elif 'Decimal128' in type_def:
        params = type_def['Decimal128']
        # Guard the unpack so a payload without exactly two items falls
        # through to the generic rendering rather than raising.
        if isinstance(params, (list, tuple)) and len(params) == 2:
            precision, scale = params
            return f'Decimal128({precision},{scale})'

    # Fallback for unknown or malformed complex types
    return str(type_def)
72+
73+
74+
def print_schema(schema: Dict[str, list[Dict[str, Any]]], header: str = None) -> None:
75+
"""Pretty-print a schema dictionary.
76+
77+
Args:
78+
schema: Schema dictionary from describe_manifest()
79+
header: Optional header text to print before the schema
80+
"""
81+
if header:
82+
print(f'\n{header}')
83+
84+
if not schema:
85+
print('\n (No tables found in manifest)')
86+
return
87+
88+
# Print each table
89+
for table_name, columns in schema.items():
90+
print(f'\n{table_name} ({len(columns)} columns)')
91+
for col in columns:
92+
nullable_str = 'NULL ' if col['nullable'] else 'NOT NULL'
93+
# Pad column name for alignment
94+
col_name = col['name'].ljust(20)
95+
col_type = col['type'].ljust(20)
96+
print(f' {col_name} {col_type} {nullable_str}')
97+
98+
print() # Empty line at end
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
"""Unit tests for registry dataset inspection methods."""
2+
3+
import pytest
4+
5+
from amp.registry.datasets import RegistryDatasetsClient
6+
from amp.utils.manifest_inspector import format_arrow_type
7+
8+
9+
class MockRegistryClient:
    """Mock registry client for testing.

    Stands in for the real registry client: every ``_request`` call returns a
    response object whose ``json()`` yields the manifest given at construction.
    """

    def __init__(self, manifest):
        # Manifest that every mocked request will return.
        self.manifest = manifest

    def _request(self, method, path, params=None):
        """Mock HTTP request.

        ``method``, ``path`` and ``params`` are accepted and ignored.
        """
        # Bind this instance's manifest for the nested class. Referring to the
        # bare name `manifest` inside json() would resolve to the module-level
        # sample fixture instead of the manifest this client was built with.
        payload = self.manifest

        class MockResponse:
            def json(self):
                return payload

        return MockResponse()
23+
24+
25+
# Sample manifest for testing
26+
manifest = {
    'kind': 'manifest',
    'dependencies': {},
    # Each table is listed as (column name, Arrow type, nullable) rows and
    # expanded into the nested schema -> arrow -> fields layout.
    'tables': {
        table: {
            'schema': {
                'arrow': {
                    'fields': [
                        {'name': name, 'type': arrow_type, 'nullable': nullable}
                        for name, arrow_type, nullable in rows
                    ]
                }
            },
        }
        for table, rows in (
            (
                'blocks',
                (
                    ('block_num', 'UInt64', False),
                    ('timestamp', {'Timestamp': ['Nanosecond', '+00:00']}, False),
                    ('hash', {'FixedSizeBinary': 32}, False),
                    ('base_fee_per_gas', {'Decimal128': [38, 0]}, True),
                ),
            ),
            (
                'transactions',
                (
                    ('tx_hash', {'FixedSizeBinary': 32}, False),
                    ('from', {'FixedSizeBinary': 20}, False),
                    ('value', {'Decimal128': [38, 0]}, True),
                ),
            ),
        )
    },
}
55+
56+
57+
@pytest.mark.unit
class TestDatasetInspection:
    """Checks for Arrow type formatting and for RegistryDatasetsClient.describe()."""

    @staticmethod
    def _client_serving(fixture):
        """Build a datasets client whose get_manifest always returns ``fixture``."""
        client = RegistryDatasetsClient(MockRegistryClient(fixture))
        # Replace get_manifest on the instance so describe() sees the fixture.
        client.get_manifest = lambda ns, name, ver: fixture
        return client

    def test_format_arrow_type_primitive(self):
        """Primitive type names are returned unchanged."""
        for primitive in ('UInt64', 'Binary', 'Boolean'):
            assert format_arrow_type(primitive) == primitive

    def test_format_arrow_type_timestamp(self):
        """Timestamp types render with their unit only."""
        for unit in ('Nanosecond', 'Microsecond'):
            rendered = format_arrow_type({'Timestamp': [unit, '+00:00']})
            assert rendered == f'Timestamp({unit})'

    def test_format_arrow_type_fixed_binary(self):
        """FixedSizeBinary types render with their byte width."""
        for width in (32, 20):
            rendered = format_arrow_type({'FixedSizeBinary': width})
            assert rendered == f'FixedSizeBinary({width})'

    def test_format_arrow_type_decimal(self):
        """Decimal128 types render as precision,scale."""
        expectations = (
            ([38, 0], 'Decimal128(38,0)'),
            ([18, 6], 'Decimal128(18,6)'),
        )
        for params, expected in expectations:
            assert format_arrow_type({'Decimal128': params}) == expected

    def test_describe_returns_correct_structure(self):
        """describe() maps each table to its formatted column list."""
        schema = self._client_serving(manifest).describe('test', 'dataset', 'latest')

        # Both tables from the sample manifest are present.
        assert 'blocks' in schema
        assert 'transactions' in schema

        # blocks: four columns, in manifest order.
        blocks = schema['blocks']
        assert len(blocks) == 4
        block_num, timestamp, block_hash, base_fee = blocks

        assert block_num['name'] == 'block_num'
        assert block_num['type'] == 'UInt64'
        assert block_num['nullable'] is False

        # Complex types come back already formatted.
        assert timestamp['name'] == 'timestamp'
        assert timestamp['type'] == 'Timestamp(Nanosecond)'

        assert block_hash['name'] == 'hash'
        assert block_hash['type'] == 'FixedSizeBinary(32)'

        assert base_fee['name'] == 'base_fee_per_gas'
        assert base_fee['type'] == 'Decimal128(38,0)'
        assert base_fee['nullable'] is True

    def test_describe_handles_empty_manifest(self):
        """A manifest without tables describes to an empty mapping."""
        empty_manifest = {'kind': 'manifest', 'dependencies': {}, 'tables': {}}

        schema = self._client_serving(empty_manifest).describe('test', 'dataset', 'latest')
        assert schema == {}

    def test_describe_handles_nullable_field(self):
        """The nullable flag of each column is carried through describe()."""
        schema = self._client_serving(manifest).describe('test', 'dataset', 'latest')
        transactions = schema['transactions']

        def column(wanted):
            # First column with the requested name.
            return next(col for col in transactions if col['name'] == wanted)

        assert column('value')['nullable'] is True
        assert column('from')['nullable'] is False

0 commit comments

Comments
 (0)