1
+ # parseable_connector.py
2
+ from typing import Any , Dict , List , Optional , Tuple
3
+ from datetime import datetime
4
+ import requests
5
+ import json
6
+ import sys
7
+ from sqlalchemy .engine import default
8
+ from sqlalchemy .sql import compiler
9
+ from sqlalchemy import types
10
+ from sqlalchemy .engine import reflection
11
+ from sqlalchemy .engine .base import Connection
12
+ from sqlalchemy .engine .interfaces import Dialect
13
+ import base64
14
+
15
+ # DBAPI required attributes
16
+ apilevel = '2.0'
17
+ threadsafety = 1
18
+ paramstyle = 'named'
19
+
20
+ # DBAPI exceptions
21
+ class Error (Exception ):
22
+ pass
23
+
24
+ class InterfaceError (Error ):
25
+ pass
26
+
27
+ class DatabaseError (Error ):
28
+ pass
29
+
30
+ def parse_timestamp (timestamp_str : str ) -> datetime :
31
+ try :
32
+ return datetime .fromisoformat (timestamp_str .replace ('Z' , '+00:00' ))
33
+ except ValueError :
34
+ return None
35
+
36
+ class ParseableCursor :
37
+ def __init__ (self , connection ):
38
+ self .connection = connection
39
+ self ._rows = []
40
+ self ._rowcount = 0
41
+ self .description = None
42
+ self .arraysize = 1
43
+
44
+ def execute (self , operation : str , parameters : Optional [Dict ] = None ):
45
+ # Extract time range from query parameters if provided
46
+ start_time = "10m" # default
47
+ end_time = "now" # default
48
+
49
+ if parameters and 'start_time' in parameters :
50
+ start_time = parameters ['start_time' ]
51
+ if parameters and 'end_time' in parameters :
52
+ end_time = parameters ['end_time' ]
53
+
54
+ # Prepare request
55
+ headers = {
56
+ 'Content-Type' : 'application/json' ,
57
+ 'Authorization' : f'Basic { self .connection .credentials } '
58
+ }
59
+
60
+ data = {
61
+ 'query' : operation ,
62
+ 'startTime' : start_time ,
63
+ 'endTime' : end_time
64
+ }
65
+
66
+
67
+ # Log the request details
68
+ print ("Debug: Sending request to Parseable" , file = sys .stderr )
69
+ print (f"URL: { self .connection .host } /api/v1/query" , file = sys .stderr )
70
+ print (f"Headers: { headers } " , file = sys .stderr )
71
+ print (f"Payload: { json .dumps (data , indent = 2 )} " , file = sys .stderr )
72
+
73
+
74
+ # Make request to Parseable
75
+ response = requests .post (
76
+ f"{ self .connection .host } /api/v1/query" ,
77
+ headers = headers ,
78
+ json = data
79
+ )
80
+
81
+ print (f"Response Status: { response .status_code } " , file = sys .stderr )
82
+ print (f"Response Content: { response .text } " , file = sys .stderr )
83
+
84
+ if response .status_code != 200 :
85
+ raise DatabaseError (f"Query failed: { response .text } " )
86
+
87
+ # Process response
88
+ result = response .json ()
89
+
90
+ if not result :
91
+ self ._rows = []
92
+ self ._rowcount = 0
93
+ self .description = None
94
+ return
95
+
96
+ # Set up column descriptions (required for DBAPI compliance)
97
+ if result and len (result ) > 0 :
98
+ first_row = result [0 ]
99
+ self .description = []
100
+ for column_name in first_row .keys ():
101
+ # (name, type_code, display_size, internal_size, precision, scale, null_ok)
102
+ self .description .append ((column_name , None , None , None , None , None , None ))
103
+
104
+ self ._rows = result
105
+ self ._rowcount = len (result )
106
+
107
+ def executemany (self , operation : str , seq_of_parameters : List [Dict ]):
108
+ raise NotImplementedError ("executemany is not supported" )
109
+
110
+ def fetchall (self ) -> List [Tuple ]:
111
+ return [tuple (row .values ()) for row in self ._rows ]
112
+
113
+ def fetchone (self ) -> Optional [Tuple ]:
114
+ if not self ._rows :
115
+ return None
116
+ return tuple (self ._rows .pop (0 ).values ())
117
+
118
+ def fetchmany (self , size : Optional [int ] = None ) -> List [Tuple ]:
119
+ if size is None :
120
+ size = self .arraysize
121
+ result = self ._rows [:size ]
122
+ self ._rows = self ._rows [size :]
123
+ return [tuple (row .values ()) for row in result ]
124
+
125
+ @property
126
+ def rowcount (self ) -> int :
127
+ return self ._rowcount
128
+
129
+ def close (self ):
130
+ self ._rows = []
131
+
132
+ def setinputsizes (self , sizes ):
133
+ pass
134
+
135
+ def setoutputsize (self , size , column = None ):
136
+ pass
137
+
138
+ class ParseableConnection :
139
+ def __init__ (self , host : str , port : str , username : str , password : str ):
140
+ self .host = f"http://{ host } :{ port } " .rstrip ('/' )
141
+ credentials = f"{ username } :{ password } "
142
+ self .credentials = base64 .b64encode (credentials .encode ()).decode ()
143
+
144
+ def cursor (self ):
145
+ return ParseableCursor (self )
146
+
147
+ def close (self ):
148
+ pass
149
+
150
+ def commit (self ):
151
+ pass
152
+
153
+ def rollback (self ):
154
+ pass
155
+
156
+ def connect (username : Optional [str ] = None ,
157
+ password : Optional [str ] = None ,
158
+ host : Optional [str ] = None ,
159
+ port : Optional [str ] = None ,
160
+ ** kwargs ) -> ParseableConnection :
161
+ """
162
+ Connect to a Parseable instance.
163
+
164
+ :param username: Username for authentication (default: admin)
165
+ :param password: Password for authentication (default: admin)
166
+ :param host: Host address (default: localhost)
167
+ :param port: Port number (default: 8000)
168
+ :return: ParseableConnection object
169
+ """
170
+ username = username or 'admin'
171
+ password = password or 'admin'
172
+ host = host or 'localhost'
173
+ port = port or '8000'
174
+
175
+ return ParseableConnection (host = host , port = port , username = username , password = password )
176
+
177
+ # SQLAlchemy dialect
178
+ class ParseableCompiler (compiler .SQLCompiler ):
179
+ def visit_select (self , select , ** kwargs ):
180
+ return super ().visit_select (select , ** kwargs )
181
+
182
+ class ParseableDialect (default .DefaultDialect ):
183
+ name = 'parseable'
184
+ driver = 'rest'
185
+
186
+ supports_alter = False
187
+ supports_pk_autoincrement = False
188
+ supports_default_values = False
189
+ supports_empty_insert = False
190
+ supports_unicode_statements = True
191
+ supports_unicode_binds = True
192
+ returns_unicode_strings = True
193
+ description_encoding = None
194
+ supports_native_boolean = True
195
+
196
+ @classmethod
197
+ def dbapi (cls ):
198
+ return sys .modules [__name__ ]
199
+
200
+ def create_connect_args (self , url ):
201
+ kwargs = {
202
+ 'host' : url .host or 'localhost' ,
203
+ 'port' : str (url .port or 8000 ),
204
+ 'username' : url .username or 'admin' ,
205
+ 'password' : url .password or 'admin'
206
+ }
207
+ return [], kwargs
208
+
209
+ def do_ping (self , dbapi_connection ):
210
+ try :
211
+ cursor = dbapi_connection .cursor ()
212
+ cursor .execute ('SELECT * FROM "adheip" LIMIT 1' )
213
+ return True
214
+ except :
215
+ return False
216
+
217
+ def get_columns (self , connection : Connection , table_name : str , schema : Optional [str ] = None , ** kw ) -> List [Dict ]:
218
+ return [
219
+ {
220
+ 'name' : 'timestamp' ,
221
+ 'type' : types .TIMESTAMP (),
222
+ 'nullable' : True ,
223
+ 'default' : None ,
224
+ },
225
+ {
226
+ 'name' : 'message' ,
227
+ 'type' : types .String (),
228
+ 'nullable' : True ,
229
+ 'default' : None ,
230
+ }
231
+ ]
232
+
233
+ def get_table_names (self , connection : Connection , schema : Optional [str ] = None , ** kw ) -> List [str ]:
234
+ return ["adheip" ]
235
+
236
+ def get_view_names (self , connection : Connection , schema : Optional [str ] = None , ** kw ) -> List [str ]:
237
+ return []
238
+
239
+ def get_schema_names (self , connection : Connection , ** kw ) -> List [str ]:
240
+ return ['default' ]
241
+
242
+ def get_pk_constraint (self , connection : Connection , table_name : str , schema : Optional [str ] = None , ** kw ) -> Dict [str , Any ]:
243
+ return {'constrained_columns' : [], 'name' : None }
244
+
245
+ def get_foreign_keys (self , connection : Connection , table_name : str , schema : Optional [str ] = None , ** kw ) -> List [Dict [str , Any ]]:
246
+ return []
247
+
248
+ def get_indexes (self , connection : Connection , table_name : str , schema : Optional [str ] = None , ** kw ) -> List [Dict [str , Any ]]:
249
+ return []
250
+
251
+ def do_rollback (self , dbapi_connection ):
252
+ pass
253
+
254
+ def _check_unicode_returns (self , connection : Connection , additional_tests : Optional [List ] = None ):
255
+ pass
256
+
257
+ def _check_unicode_description (self , connection : Connection ):
258
+ pass
259
+
0 commit comments