Skip to content

Commit d33dd31

Browse files
committed
Simpler API
1 parent 440a049 commit d33dd31

File tree

7 files changed

+135
-113
lines changed

7 files changed

+135
-113
lines changed

README.md

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,14 @@ A python client for sending SQL queries and receive results from BoilingData Web
1414
1515
```python
1616
import asyncio
17-
from pprint import pprint
1817
from py_boilingdata import BoilingData
1918

20-
2119
async def main():
2220
boiling = BoilingData()
2321
await boiling.connect()
24-
await boiling.populate()
25-
26-
def cb(resp):
27-
pprint(resp)
28-
2922
q = "SELECT first_name, email FROM parquet_scan('s3://boilingdata-demo/test.parquet') LIMIT 10"
30-
await boiling.execute(q, cb)
31-
await asyncio.sleep(10) # There is no way to await for the cb call..
32-
print("DONE.")
23+
resp = await boiling.execute(q)
24+
print(resp)
3325

3426
loop = asyncio.get_event_loop()
3527
loop.run_until_complete(main())

main.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
import asyncio
2-
from pprint import pprint
32
from py_boilingdata import BoilingData
43

4+
boiling = BoilingData()
5+
56

67
async def main():
7-
boiling = BoilingData()
88
await boiling.connect()
9-
await boiling.populate()
10-
11-
def cb(resp):
12-
pprint(resp)
13-
14-
q = "SELECT first_name, email FROM parquet_scan('s3://boilingdata-demo/test.parquet') LIMIT 10"
15-
await boiling.execute(q, cb)
16-
await asyncio.sleep(10)
17-
print("DONE.")
9+
results = await boiling.execute(
10+
"""
11+
SELECT first_name, email
12+
FROM parquet_scan('s3://boilingdata-demo/test.parquet')
13+
LIMIT 2
14+
"""
15+
)
16+
print(results)
17+
await boiling.close()
1818

1919

2020
loop = asyncio.get_event_loop()

py_boilingdata/__init__.py

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
"""BoilingData Client"""
2-
import os, json, uuid, time
3-
import rel
4-
import duckdb
2+
import os, json, uuid
3+
import logging
54
import threading
5+
import duckdb
66
import websocket
77
import boto3
88
import asyncio
99
import botocore.auth
10-
from pprint import pprint
1110
from botocore.exceptions import NoCredentialsError
1211
from botocore.awsrequest import AWSRequest
1312
from botocore.credentials import Credentials
@@ -39,22 +38,22 @@
3938
class BoilingData:
4039
"""Run SQL with BoilingData and local DuckDB"""
4140

42-
def __init__(self):
43-
self.bd_conn = BoilingDataConnection()
41+
def __init__(self, log_level=logging.ERROR):
42+
self.log_level = log_level
43+
self.logger = logging.getLogger("BoilingData")
44+
self.logger.setLevel(self.log_level)
45+
self.bd_conn = BoilingDataConnection(log_level=self.log_level)
4446
self.conn = duckdb.connect(":memory:")
4547

46-
async def populate(self):
48+
async def _populate(self):
4749
self.conn.execute("ATTACH ':memory:' AS boilingdata;")
4850
self.conn.execute("SET search_path='memory,boilingdata';")
49-
# Boiling specific "information_schema" table
51+
# Boiling specific table, contains data shares
5052
q = "SELECT * FROM information_schema.create_tables_statements"
51-
52-
def cb(bd_tables):
53-
if bd_tables:
54-
for table in bd_tables:
55-
self.conn.execute(table)
56-
57-
await self.bd_conn.bd_execute(q, cb)
53+
tables = await self.execute(q, None, True)
54+
if tables:
55+
for table in tables:
56+
self.conn.execute(table)
5857

5958
def _is_boiling_execute(self, sql):
6059
## 1) Get all Boiling tables so we know what to intercept
@@ -91,22 +90,32 @@ def _is_boiling_execute(self, sql):
9190
async def connect(self):
9291
"""Connect to BoilingData"""
9392
await self.bd_conn.connect()
93+
await self._populate() # get catalog entries
9494

9595
async def close(self):
9696
"""Close WebSocket connection to Boiling"""
9797
await self.bd_conn.close()
9898

99-
async def execute(self, sql, cb):
99+
async def execute(self, sql, cb=None, force_boiling=False):
100100
"""Send SQL Query to Boiling or run locally"""
101-
if not self._is_boiling_execute(sql):
101+
if not force_boiling and not self._is_boiling_execute(sql):
102102
return self.conn.execute(sql).fetchall()
103-
return await self.bd_conn.bd_execute(sql, cb)
103+
fut = await self.bd_conn.bd_execute(sql, cb)
104+
if cb is not None:
105+
return
106+
# TODO: Get rid of this while loop?!
107+
while not fut.done():
108+
await asyncio.sleep(0.005)
109+
return fut.result()
104110

105111

106112
class BoilingDataConnection:
107113
"""Create authenticated WebSocket connection to BoilingData"""
108114

109-
def __init__(self, region=AWS_REGION):
115+
def __init__(self, region=AWS_REGION, log_level=logging.ERROR):
116+
self.log_level = log_level
117+
self.logger = logging.getLogger("BoilingDataConnection")
118+
self.logger.setLevel(self.log_level)
110119
self.region = region
111120
self.username = os.getenv("BD_USERNAME", "")
112121
self.password = os.getenv("BD_PASSWORD", "")
@@ -115,6 +124,7 @@ def __init__(self, region=AWS_REGION):
115124
"Missing username (BD_USERNAME) and/or "
116125
+ "password (BD_PASSWORD) environment variable(s)"
117126
)
127+
self.wsConnectTimeoutS = 10
118128
self.websocket = None
119129
self.aws_creds = None
120130
self.ws_app = None
@@ -150,10 +160,10 @@ def _get_cognito_tokens(self, username, password):
150160
)
151161
return response["AuthenticationResult"]
152162
except self.idp_client.exceptions.NotAuthorizedException as e:
153-
print("The username or password is incorrect.")
163+
self.logger.error("The username or password is incorrect.")
154164
raise e
155165
except NoCredentialsError as e:
156-
print("Credentials not available.")
166+
self.logger.error("Credentials not available.")
157167
raise e
158168

159169
def _get_credentials(self):
@@ -175,20 +185,21 @@ def _get_credentials(self):
175185
return self.aws_creds
176186

177187
async def _ws_send(self, msg):
178-
# print(f"> {msg}")
188+
self.logger.debug(f"> {msg}")
179189
return self.ws_app.send(msg)
180190

181191
def _on_open(self, ws_app):
182-
print("WS OPEN")
192+
self.logger.info("WS OPEN")
183193
self.bd_is_open = True
184194

185195
def _on_msg(self, ws_app, data):
186-
# print(f"< {data}")
196+
self.logger.debug(f"< {data}")
187197
msg = json.loads(data)
188198
reqId = msg.get("requestId")
189199
if not reqId:
190200
return
191201
msg_type = msg.get("messageType")
202+
# TODO: Store statistics sent from Boiling (INFO messages)
192203
if msg_type != "DATA":
193204
return
194205
req = self.requests.get(reqId)
@@ -201,32 +212,41 @@ def _on_msg(self, ws_app, data):
201212
del self.requests[reqId]
202213

203214
def _on_error(self, ws_app, error):
204-
print(f"WS ERROR: {error}")
215+
self.logger.error(f"WS ERROR: {error}")
205216

206217
def _on_close(self, ws_app, code, msg):
207-
print(f"WS CLOSE: {code} {msg}")
208-
209-
##
210-
## public
211-
##
218+
self.logger.info(f"WS CLOSE: {code} {msg}")
219+
self.is_open = False
212220

213221
def _all_messages_received(self, event):
214222
requestId = event["requestId"]
215223
data = event["data"]
216224
cb = self.requests.get(requestId)
217-
cb["callback"](data)
225+
cb.get("callback")(data) if cb.get("callback") else None
226+
cb.get("fut").set_result(data) if cb.get("fut") else None
227+
228+
##
229+
## public
230+
##
218231

219232
async def bd_execute(self, sql, cb):
220-
if self.bd_is_open is not True:
233+
if not self.bd_is_open:
234+
await self.connect()
235+
if not self.bd_is_open:
221236
raise Exception("No Boiling connection")
222237
reqId = uuid.uuid4().hex
223238
body = '{"sql":"' + sql + '","requestId":"' + reqId + '"}'
239+
loop = asyncio.get_running_loop()
240+
fut = loop.create_future()
224241
self.requests[reqId] = {
225-
"q": DataQueue(reqId, self._all_messages_received),
242+
"q": DataQueue(reqId, self._all_messages_received, fut),
243+
"sql": sql,
226244
"reqId": reqId,
227245
"callback": cb,
246+
"future": fut,
228247
}
229248
await self._ws_send(body)
249+
return fut
230250

231251
async def connect(self):
232252
"""Connect to BoilingData WebSocket API"""
@@ -247,7 +267,7 @@ async def connect(self):
247267
wst.daemon = True
248268
wst.start()
249269
timeoutS = 1
250-
while self.bd_is_open is not True and timeoutS < 10:
270+
while not self.bd_is_open and timeoutS < self.wsConnectTimeoutS:
251271
await asyncio.sleep(1)
252272
timeoutS = timeoutS + 1
253273

py_boilingdata/data_queue.py

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@
22

33

44
class DataQueue:
5-
def __init__(self, requestId, callback):
5+
def __init__(self, requestId, callback, fut=None):
66
self.requestId = requestId
77
self.callback = callback
8+
self.fut = fut
89
self.data = []
910
self.batchCounters = dict()
1011
self.parts_done = False
1112
self.have_new_messages = False
1213
self.is_deleted = False
1314
return
1415

15-
def have_all_parts(self):
16+
def _have_all_parts(self):
1617
if self.is_deleted:
1718
raise Exception("Deleted queue!")
1819
if not self.have_new_messages:
@@ -75,29 +76,7 @@ def have_all_parts(self):
7576
self.parts_done = True
7677
return True
7778

78-
def push(self, msg):
79-
if self.is_deleted:
80-
raise Exception("Deleted queue!")
81-
self.data.append(msg)
82-
self.have_new_messages = True
83-
if self.have_all_parts():
84-
self.notify()
85-
return
86-
87-
# Prefer the callback method
88-
def get_data(self):
89-
if self.is_deleted:
90-
raise Exception("Deleted queue!")
91-
if self.is_done():
92-
return self.compile()
93-
return None
94-
95-
def is_done(self):
96-
if self.is_deleted:
97-
raise Exception("Deleted queue!")
98-
return self.parts_done == True or self.have_all_parts()
99-
100-
def compile(self):
79+
def _compile(self):
10180
if self.is_deleted:
10281
raise Exception("Deleted queue!")
10382
data = []
@@ -115,12 +94,42 @@ def compile(self):
11594
data.extend(value3["data"])
11695
return data
11796

118-
def notify(self):
97+
def _notify(self):
11998
if self.is_deleted:
12099
raise Exception("Deleted queue!")
121-
data = self.compile()
100+
data = self._compile()
122101
if self.callback:
102+
# print(f"CALLING CALLBACK {data}")
123103
self.callback({"data": data, "requestId": self.requestId})
104+
if self.fut:
105+
# print(f"SETTING FUT RESULT: {data}")
106+
self.fut.set_result(data)
107+
108+
##
109+
## public
110+
##
111+
112+
def push(self, msg):
113+
if self.is_deleted:
114+
raise Exception("Deleted queue!")
115+
self.data.append(msg)
116+
self.have_new_messages = True
117+
if self._have_all_parts():
118+
self._notify() # callback and future
119+
return
120+
121+
# Prefer the callback method
122+
def get_data(self):
123+
if self.is_deleted:
124+
raise Exception("Deleted queue!")
125+
if self.is_done():
126+
return self._compile()
127+
return None
128+
129+
def is_done(self):
130+
if self.is_deleted:
131+
raise Exception("Deleted queue!")
132+
return self.parts_done == True or self._have_all_parts()
124133

125134
def delete(self):
126135
self.is_deleted = True

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ asyncio_mode = "auto"
33

44
[tool.poetry]
55
name = "py-boilingdata"
6-
version = "0.2.1"
6+
version = "0.2.3"
77
description = "BoilingData (websockets) client for Python"
88
authors = ["Dan Forsberg <dan@boilingdata.com>"]
99
readme = "README.md"

0 commit comments

Comments
 (0)