Skip to content

Commit 9885b01

Browse files
committed
update table, columns and compiler
1 parent 1e0970d commit 9885b01

File tree

1 file changed

+153
-17
lines changed

1 file changed

+153
-17
lines changed

__init__.py

Lines changed: 153 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,19 @@ def connect(username: Optional[str] = None,
176176

177177
# SQLAlchemy dialect
178178
class ParseableCompiler(compiler.SQLCompiler):
179-
def visit_select(self, select, **kwargs):
180-
return super().visit_select(select, **kwargs)
179+
def visit_table(self, table, asfrom=False, iscrud=False, ashint=False, fromhints=None, **kwargs):
180+
# Get the original table representation
181+
text = super().visit_table(table, asfrom, iscrud, ashint, fromhints, **kwargs)
182+
183+
# Remove schema prefix (anything before the dot)
184+
if '.' in text:
185+
return text.split('.')[-1]
186+
return text
181187

182188
class ParseableDialect(default.DefaultDialect):
183189
name = 'parseable'
184190
driver = 'rest'
191+
statement_compiler = ParseableCompiler
185192

186193
supports_alter = False
187194
supports_pk_autoincrement = False
@@ -215,23 +222,153 @@ def do_ping(self, dbapi_connection):
215222
return False
216223

217224
def get_columns(self, connection: Connection, table_name: str, schema: Optional[str] = None, **kw) -> List[Dict]:
218-
return [
219-
{
220-
'name': 'timestamp',
221-
'type': types.TIMESTAMP(),
222-
'nullable': True,
223-
'default': None,
224-
},
225-
{
226-
'name': 'message',
227-
'type': types.String(),
228-
'nullable': True,
229-
'default': None,
225+
try:
226+
# Get host and credentials from the connection object
227+
host = connection.engine.url.host
228+
port = connection.engine.url.port
229+
username = connection.engine.url.username
230+
password = connection.engine.url.password
231+
base_url = f"http://{host}:{port}"
232+
233+
# Prepare the headers for authorization
234+
credentials = f"{username}:{password}"
235+
encoded_credentials = base64.b64encode(credentials.encode()).decode()
236+
headers = {
237+
'Authorization': f'Basic {encoded_credentials}',
230238
}
231-
]
239+
240+
# Fetch the schema for the given table (log stream)
241+
response = requests.get(f"{base_url}/api/v1/logstream/{table_name}/schema", headers=headers)
242+
243+
# Log the response details for debugging
244+
print(f"Debug: Fetching schema for {table_name} from {base_url}/api/v1/logstream/{table_name}/schema", file=sys.stderr)
245+
print(f"Response Status: {response.status_code}", file=sys.stderr)
246+
print(f"Response Content: {response.text}", file=sys.stderr)
247+
248+
if response.status_code != 200:
249+
raise DatabaseError(f"Failed to fetch schema for {table_name}: {response.text}")
250+
251+
# Parse the schema response
252+
schema_data = response.json()
253+
254+
if not isinstance(schema_data, dict) or 'fields' not in schema_data:
255+
raise DatabaseError(f"Unexpected schema format for {table_name}: {response.text}")
256+
257+
columns = []
258+
259+
# Map each field to a SQLAlchemy column descriptor
260+
for field in schema_data['fields']:
261+
column_name = field['name']
262+
data_type = field['data_type']
263+
nullable = field['nullable']
264+
265+
# Map Parseable data types to SQLAlchemy types
266+
if data_type == 'Utf8':
267+
sql_type = types.String()
268+
elif data_type == 'Int64':
269+
sql_type = types.BigInteger()
270+
elif data_type == 'Float64':
271+
sql_type = types.Float()
272+
else:
273+
sql_type = types.String() # Default type if unknown
274+
275+
# Append column definition to columns list
276+
columns.append({
277+
'name': column_name,
278+
'type': sql_type,
279+
'nullable': nullable,
280+
'default': None, # Assuming no default for now, adjust as needed
281+
})
282+
283+
return columns
284+
285+
except Exception as e:
286+
raise DatabaseError(f"Error fetching columns for {table_name}: {str(e)}")
287+
232288

233289
def get_table_names(self, connection: Connection, schema: Optional[str] = None, **kw) -> List[str]:
234-
return ["adheip"]
290+
"""
291+
Fetch the list of log streams (tables) from the Parseable instance.
292+
293+
:param connection: SQLAlchemy Connection object.
294+
:param schema: Optional schema (not used for Parseable).
295+
:param kw: Additional keyword arguments.
296+
:return: List of table names (log streams).
297+
"""
298+
try:
299+
# Get host and credentials from the connection object
300+
host = connection.engine.url.host
301+
port = connection.engine.url.port
302+
username = connection.engine.url.username
303+
password = connection.engine.url.password
304+
base_url = f"http://{host}:{port}"
305+
306+
# Prepare the headers
307+
credentials = f"{username}:{password}"
308+
encoded_credentials = base64.b64encode(credentials.encode()).decode()
309+
headers = {
310+
'Authorization': f'Basic {encoded_credentials}',
311+
}
312+
313+
# Make the GET request
314+
response = requests.get(f"{base_url}/api/v1/logstream", headers=headers)
315+
316+
# Log the response details for debugging
317+
print(f"Debug: Fetching table names from {base_url}/api/v1/logstream", file=sys.stderr)
318+
print(f"Response Status: {response.status_code}", file=sys.stderr)
319+
print(f"Response Content: {response.text}", file=sys.stderr)
320+
321+
if response.status_code != 200:
322+
raise DatabaseError(f"Failed to fetch table names: {response.text}")
323+
324+
# Parse the response JSON
325+
log_streams = response.json()
326+
if not isinstance(log_streams, list):
327+
raise DatabaseError(f"Unexpected response format: {response.text}")
328+
329+
# Extract table names (log stream names)
330+
return [stream['name'] for stream in log_streams if 'name' in stream]
331+
except Exception as e:
332+
raise DatabaseError(f"Error fetching table names: {str(e)}")
333+
334+
def has_table(self, connection: Connection, table_name: str, schema: Optional[str] = None, **kw) -> bool:
335+
"""
336+
Check if a table (log stream) exists in Parseable.
337+
338+
:param connection: SQLAlchemy Connection object
339+
:param table_name: Name of the table (log stream) to check
340+
:param schema: Schema name (not used for Parseable)
341+
:return: True if the table exists, False otherwise
342+
"""
343+
try:
344+
# Get connection details
345+
host = connection.engine.url.host
346+
port = connection.engine.url.port
347+
username = connection.engine.url.username
348+
password = connection.engine.url.password
349+
base_url = f"http://{host}:{port}"
350+
351+
# Prepare headers
352+
credentials = f"{username}:{password}"
353+
encoded_credentials = base64.b64encode(credentials.encode()).decode()
354+
headers = {
355+
'Authorization': f'Basic {encoded_credentials}',
356+
}
357+
358+
# Make request to list log streams
359+
response = requests.get(f"{base_url}/api/v1/logstream", headers=headers)
360+
361+
if response.status_code != 200:
362+
return False
363+
364+
log_streams = response.json()
365+
366+
# Check if the table name exists in the list of log streams
367+
return any(stream['name'] == table_name for stream in log_streams if 'name' in stream)
368+
369+
except Exception as e:
370+
print(f"Error checking table existence: {str(e)}", file=sys.stderr)
371+
return False
235372

236373
def get_view_names(self, connection: Connection, schema: Optional[str] = None, **kw) -> List[str]:
237374
return []
@@ -256,4 +393,3 @@ def _check_unicode_returns(self, connection: Connection, additional_tests: Optio
256393

257394
def _check_unicode_description(self, connection: Connection):
258395
pass
259-

0 commit comments

Comments
 (0)