|
38 | 38 | class BoilingData: |
39 | 39 | """Run SQL with BoilingData and local DuckDB""" |
40 | 40 |
|
41 | | - def __init__(self, log_level=logging.ERROR): |
| 41 | + def __init__(self, log_level=logging.DEBUG): |
| 42 | + logging.basicConfig() |
42 | 43 | self.log_level = log_level |
43 | 44 | self.logger = logging.getLogger("BoilingData") |
44 | 45 | self.logger.setLevel(self.log_level) |
45 | 46 | self.bd_conn = BoilingDataConnection(log_level=self.log_level) |
46 | 47 | self.conn = duckdb.connect(":memory:") |
47 | 48 |
|
48 | 49 | async def _populate(self): |
| 50 | + self.logger.debug("Creating local boilingdata data catalog") |
49 | 51 | self.conn.execute("ATTACH ':memory:' AS boilingdata;") |
50 | 52 | self.conn.execute("SET search_path='memory,boilingdata';") |
51 | 53 | # Boiling specific table, contains data shares |
52 | 54 | q = "SELECT * FROM information_schema.create_tables_statements" |
53 | 55 | tables = await self.execute(q, None, True) |
54 | 56 | if tables: |
55 | 57 | for table in tables: |
| 58 | + self.logger.debug(f"Creating table {table}") |
56 | 59 | self.conn.execute(table) |
57 | 60 |
|
58 | 61 | def _is_boiling_execute(self, sql): |
@@ -98,6 +101,7 @@ async def close(self): |
98 | 101 |
|
99 | 102 | async def execute(self, sql, cb=None, force_boiling=False): |
100 | 103 | """Send SQL Query to Boiling or run locally""" |
| 104 | + sql = sql.replace("\n", " ") |
101 | 105 | if not force_boiling and not self._is_boiling_execute(sql): |
102 | 106 | return self.conn.execute(sql).fetchall() |
103 | 107 | fut = await self.bd_conn.bd_execute(sql, cb) |
@@ -199,7 +203,10 @@ def _on_msg(self, ws_app, data): |
199 | 203 | if not reqId: |
200 | 204 | return |
201 | 205 | msg_type = msg.get("messageType") |
202 | | - # TODO: Store statistics sent from Boiling (INFO messages) |
| 206 | + if msg_type == "LOG_MESSAGE": |
| 207 | + log_level = msg.get("logLevel") |
| 208 | + if log_level == "ERROR": |
| 209 | + raise Exception(msg.get("logMessage")) |
203 | 210 | if msg_type != "DATA": |
204 | 211 | return |
205 | 212 | req = self.requests.get(reqId) |
@@ -252,6 +259,7 @@ async def connect(self): |
252 | 259 | """Connect to BoilingData WebSocket API""" |
253 | 260 | if self.websocket is not None: |
254 | 261 | raise Exception("WebSocket already exists") |
| 262 | + self.logger.info("Connecting") |
255 | 263 | self.websocket = websocket.WebSocket() |
256 | 264 | websocket.enableTrace(self.ws_trace) |
257 | 265 | auth_headers = self._get_auth_headers() |
|
0 commit comments