Skip to content

Commit 5fee02b

Browse files
committed
client: Integrate admin client with unified Client class
- Add query_url and admin_url parameters to Client (backward compatible with url) - Add datasets, jobs, schema properties for admin operations - Extend QueryBuilder with with_dependency() for manifest dependencies - Add to_manifest() for generating dataset manifests from SQL queries - Add register_as() for one-line registration returning DeploymentContext - Support fluent API: query → with_dependency → register_as → deploy - Maintain backward compatibility (existing Client(url=...) still works)
1 parent e7674f2 commit 5fee02b

File tree

2 files changed

+224
-4
lines changed

2 files changed

+224
-4
lines changed

src/amp/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""Amp - Flight SQL client with comprehensive data loading capabilities."""
2+
3+
from amp.client import Client, QueryBuilder
4+
5+
__all__ = ['Client', 'QueryBuilder']

src/amp/client.py

Lines changed: 219 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,17 @@
2020

2121

2222
class QueryBuilder:
23-
"""Chainable query builder for data loading operations"""
23+
"""Chainable query builder for data loading operations.
24+
25+
Supports both data loading to various destinations and manifest generation
26+
for dataset registration via the Admin API.
27+
"""
2428

2529
def __init__(self, client: 'Client', query: str):
2630
self.client = client
2731
self.query = query
2832
self._result_cache = None
33+
self._dependencies: Dict[str, str] = {} # For manifest generation
2934
self.logger = logging.getLogger(__name__)
3035

3136
def load(
@@ -115,19 +120,166 @@ def get_sql(self, read_all: bool = False):
115120
"""Backward compatibility with existing method"""
116121
return self.client.get_sql(self.query, read_all=read_all)
117122

123+
# Admin API manifest methods (require admin_url in Client)
124+
def with_dependency(self, alias: str, reference: str) -> 'QueryBuilder':
125+
"""Add a dataset dependency for manifest generation.
126+
127+
Use this to declare dependencies when generating manifests for derived datasets.
128+
The alias should match the dataset prefix used in your SQL query.
129+
130+
Args:
131+
alias: Local alias used in SQL (e.g., 'eth' for 'eth.blocks')
132+
reference: Full dataset reference (e.g., '_/[email protected]')
133+
134+
Returns:
135+
Self for method chaining
136+
137+
Example:
138+
>>> client.sql("SELECT block_num FROM eth.blocks WHERE block_num > 1000000") \\
139+
... .with_dependency("eth", "_/[email protected]") \\
140+
... .to_manifest("recent_blocks")
141+
"""
142+
self._dependencies[alias] = reference
143+
return self
144+
145+
def to_manifest(self, table_name: str, network: str = 'mainnet') -> dict:
146+
"""Generate a dataset manifest from this query.
147+
148+
Automatically fetches the Arrow schema using the Admin API /schema endpoint.
149+
Requires the Client to be initialized with admin_url.
150+
151+
Args:
152+
table_name: Name for the table in the manifest
153+
network: Network name (default: 'mainnet')
154+
155+
Returns:
156+
Complete manifest dict ready for registration
157+
158+
Raises:
159+
ValueError: If admin_url not configured in Client
160+
GetOutputSchemaError: If schema fetch fails
161+
162+
Example:
163+
>>> manifest = client.sql("SELECT block_num, hash FROM eth.blocks") \\
164+
... .with_dependency("eth", "_/[email protected]") \\
165+
... .to_manifest("blocks", network="mainnet")
166+
>>> print(manifest['kind'])
167+
'manifest'
168+
"""
169+
# Get schema from Admin API
170+
schema_response = self.client.schema.get_output_schema(self.query, is_sql_dataset=True)
171+
172+
# Build manifest structure matching tests/config/manifests/*.json format
173+
manifest = {
174+
'kind': 'manifest',
175+
'dependencies': self._dependencies,
176+
'tables': {
177+
table_name: {
178+
'input': {'sql': self.query},
179+
'schema': schema_response.schema_, # Use schema_ field (schema is aliased in Pydantic)
180+
'network': network,
181+
}
182+
},
183+
'functions': {},
184+
}
185+
return manifest
186+
187+
def register_as(self, namespace: str, name: str, version: str, table_name: str, network: str = 'mainnet'):
188+
"""Register this query as a new dataset.
189+
190+
Generates manifest and registers with Admin API in one call.
191+
Returns a DeploymentContext for optional chained deployment.
192+
193+
Args:
194+
namespace: Dataset namespace (e.g., '_')
195+
name: Dataset name
196+
version: Semantic version (e.g., '1.0.0')
197+
table_name: Table name in manifest
198+
network: Network name (default: 'mainnet')
199+
200+
Returns:
201+
DeploymentContext for optional deployment
202+
203+
Raises:
204+
ValueError: If admin_url not configured in Client
205+
InvalidManifestError: If manifest is invalid
206+
DependencyValidationError: If dependencies are invalid
207+
208+
Example:
209+
>>> # Register and deploy in one chain
210+
>>> client.sql("SELECT block_num, hash FROM eth.blocks WHERE block_num > 18000000") \\
211+
... .with_dependency("eth", "_/[email protected]") \\
212+
... .register_as("_", "recent_blocks", "1.0.0", "blocks") \\
213+
... .deploy(parallelism=4, wait=True)
214+
"""
215+
from amp.admin.deployment import DeploymentContext
216+
217+
# Generate manifest
218+
manifest = self.to_manifest(table_name, network)
219+
220+
# Register with Admin API
221+
self.client.datasets.register(namespace, name, version, manifest)
222+
223+
# Return deployment context for optional chaining
224+
return DeploymentContext(self.client, namespace, name, version)
225+
118226
def __repr__(self):
119227
return f"QueryBuilder(query='{self.query[:50]}{'...' if len(self.query) > 50 else ''}')"
120228

121229

122230
class Client:
123-
"""Enhanced Flight SQL client with data loading capabilities"""
231+
"""Enhanced Flight SQL client with data loading capabilities.
232+
233+
Supports both query operations (via Flight SQL) and optional admin operations
234+
(via HTTP Admin API).
235+
236+
Args:
237+
url: Flight SQL URL (for backward compatibility, treated as query_url)
238+
query_url: Query endpoint URL via Flight SQL (e.g., 'grpc://localhost:1602')
239+
admin_url: Optional Admin API URL (e.g., 'http://localhost:8080')
240+
auth_token: Optional Bearer token for Admin API authentication
241+
242+
Example:
243+
>>> # Query-only client (backward compatible)
244+
>>> client = Client(url='grpc://localhost:1602')
245+
>>>
246+
>>> # Client with admin capabilities
247+
>>> client = Client(
248+
... query_url='grpc://localhost:1602',
249+
... admin_url='http://localhost:8080'
250+
... )
251+
"""
252+
253+
def __init__(
254+
self,
255+
url: Optional[str] = None,
256+
query_url: Optional[str] = None,
257+
admin_url: Optional[str] = None,
258+
auth_token: Optional[str] = None,
259+
):
260+
# Backward compatibility: url parameter → query_url
261+
if url and not query_url:
262+
query_url = url
263+
264+
# Initialize Flight SQL client
265+
if query_url:
266+
self.conn = flight.connect(query_url)
267+
else:
268+
raise ValueError('Either url or query_url must be provided for Flight SQL connection')
124269

125-
def __init__(self, url):
126-
self.conn = flight.connect(url)
270+
# Initialize managers
127271
self.connection_manager = ConnectionManager()
128272
self.label_manager = LabelManager()
129273
self.logger = logging.getLogger(__name__)
130274

275+
# Initialize optional Admin API client
276+
if admin_url:
277+
from amp.admin.client import AdminClient
278+
279+
self._admin_client = AdminClient(admin_url, auth_token)
280+
else:
281+
self._admin_client = None
282+
131283
def sql(self, query: str) -> QueryBuilder:
132284
"""
133285
Create a chainable query builder
@@ -164,6 +316,69 @@ def get_available_loaders(self) -> List[str]:
164316
"""Get list of available data loaders"""
165317
return get_available_loaders()
166318

319+
# Admin API access (optional, requires admin_url)
320+
@property
321+
def datasets(self):
322+
"""Access datasets client for Admin API operations.
323+
324+
Returns:
325+
DatasetsClient for dataset registration, deployment, and management
326+
327+
Raises:
328+
ValueError: If admin_url was not provided during Client initialization
329+
330+
Example:
331+
>>> client = Client(query_url='...', admin_url='http://localhost:8080')
332+
>>> datasets = client.datasets.list_all()
333+
"""
334+
if not self._admin_client:
335+
raise ValueError(
336+
'Admin API not configured. Provide admin_url parameter to Client() '
337+
'to enable dataset management operations.'
338+
)
339+
return self._admin_client.datasets
340+
341+
@property
342+
def jobs(self):
343+
"""Access jobs client for Admin API operations.
344+
345+
Returns:
346+
JobsClient for job monitoring and management
347+
348+
Raises:
349+
ValueError: If admin_url was not provided during Client initialization
350+
351+
Example:
352+
>>> client = Client(query_url='...', admin_url='http://localhost:8080')
353+
>>> job = client.jobs.get(123)
354+
"""
355+
if not self._admin_client:
356+
raise ValueError(
357+
'Admin API not configured. Provide admin_url parameter to Client() to enable job monitoring operations.'
358+
)
359+
return self._admin_client.jobs
360+
361+
@property
362+
def schema(self):
363+
"""Access schema client for Admin API operations.
364+
365+
Returns:
366+
SchemaClient for SQL query schema analysis
367+
368+
Raises:
369+
ValueError: If admin_url was not provided during Client initialization
370+
371+
Example:
372+
>>> client = Client(query_url='...', admin_url='http://localhost:8080')
373+
>>> schema_resp = client.schema.get_output_schema('SELECT * FROM eth.blocks', True)
374+
"""
375+
if not self._admin_client:
376+
raise ValueError(
377+
'Admin API not configured. Provide admin_url parameter to Client() '
378+
'to enable schema analysis operations.'
379+
)
380+
return self._admin_client.schema
381+
167382
# Existing methods for backward compatibility
168383
def get_sql(self, query, read_all=False):
169384
"""Execute SQL query and return Arrow data"""

0 commit comments

Comments
 (0)