Skip to content

Commit e3f8f9d

Browse files
feat: improve error handling
1 parent cd55bcd commit e3f8f9d

File tree

5 files changed

+49
-22
lines changed

5 files changed

+49
-22
lines changed

influxdb_client_3/__init__.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
from typing import Any
55

66
import pyarrow as pa
7+
from pyarrow import ArrowException
78

9+
from influxdb_client_3.influxdb_client_error import InfluxdbClientQueryError
810
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
911
from influxdb_client_3.read_file import UploadFile
1012
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
@@ -400,8 +402,8 @@ def query(self, query: str, language: str = "sql", mode: str = "all", database:
400402

401403
try:
402404
return self._query_api.query(query=query, language=language, mode=mode, database=database, **kwargs)
403-
except InfluxDBError as e:
404-
raise e
405+
except ArrowException as e:
406+
raise InfluxdbClientQueryError(f"Error while executing query: {e}")
405407

406408
async def query_async(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs):
407409
"""Query data from InfluxDB asynchronously.
@@ -428,13 +430,13 @@ async def query_async(self, query: str, language: str = "sql", mode: str = "all"
428430
database = self._database
429431

430432
try:
431-
return await self._query_api.query_async(query=query,
433+
return await self._query_api.query_async(query=query,
432434
language=language,
433435
mode=mode,
434436
database=database,
435437
**kwargs)
436-
except InfluxDBError as e:
437-
raise e
438+
except ArrowException as e:
439+
raise InfluxdbClientQueryError(f"Error while executing query: {e}")
438440

439441
def close(self):
440442
"""Close the client and clean up resources."""
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
class InfluxDBClientError(Exception):
2+
pass
3+
4+
5+
class InfluxdbClientQueryError(InfluxDBClientError):
6+
def __init__(self, error_message, *args, **kwargs):
7+
super().__init__(error_message, *args, **kwargs)
8+
self.message = error_message
9+
10+
11+
class InfluxdbClientWriteError(InfluxDBClientError):
12+
pass

influxdb_client_3/query/query_api.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,12 @@ def query(self, query: str, language: str, mode: str, database: str, **kwargs):
162162
It should be a ``dictionary`` of key-value pairs.
163163
:return: The query result in the specified mode.
164164
"""
165-
try:
166-
ticket, _options = self._prepare_query(query, language, database, **kwargs)
165+
ticket, _options = self._prepare_query(query, language, database, **kwargs)
167166

168-
flight_reader = self._do_get(ticket, _options)
167+
flight_reader = self._do_get(ticket, _options)
168+
169+
return self._translate_stream_reader(flight_reader, mode)
169170

170-
return self._translate_stream_reader(flight_reader, mode)
171-
except Exception as e:
172-
raise e
173171

174172
async def query_async(self, query: str, language: str, mode: str, database: str, **kwargs):
175173
"""Query data from InfluxDB asynchronously.
@@ -187,16 +185,13 @@ async def query_async(self, query: str, language: str, mode: str, database: str,
187185
It should be a ``dictionary`` of key-value pairs.
188186
:return: The query result in the specified mode.
189187
"""
190-
try:
191-
ticket, options = self._prepare_query(query, language, database, **kwargs)
192-
loop = asyncio.get_running_loop()
193-
_flight_reader = await loop.run_in_executor(None,
194-
self._flight_client.do_get, ticket, options)
195-
return await loop.run_in_executor(None, self._translate_stream_reader,
188+
ticket, options = self._prepare_query(query, language, database, **kwargs)
189+
loop = asyncio.get_running_loop()
190+
_flight_reader = await loop.run_in_executor(None,
191+
self._flight_client.do_get, ticket, options)
192+
return await loop.run_in_executor(None, self._translate_stream_reader,
196193
_flight_reader,
197194
mode)
198-
except Exception as e:
199-
raise e
200195

201196
def _translate_stream_reader(self, reader: FlightStreamReader, mode: str):
202197
from influxdb_client_3 import polars as has_polars

tests/test_influxdb_client_3.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
from unittest.mock import patch
33

44
from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions, Point, WriteOptions, WriteType
5+
from influxdb_client_3.influxdb_client_error import InfluxdbClientQueryError
56
from tests.util import asyncio_run
6-
from tests.util.mocks import ConstantFlightServer, ConstantData
7+
from tests.util.mocks import ConstantFlightServer, ConstantData, ErrorFlightServer
78

89

910
class TestInfluxDBClient3(unittest.TestCase):
@@ -142,6 +143,20 @@ def test_parse_invalid_write_precision(self):
142143
InfluxDBClient3.from_env()
143144
self.assertIn("Invalid precision value: invalid_value", str(context.exception))
144145

146+
def test_query_with_arrow_error(self):
147+
f = ErrorFlightServer()
148+
with InfluxDBClient3(f"http://localhost:{f.port}", "my_org", "my_db", "my_token") as c:
149+
with self.assertRaises(InfluxdbClientQueryError) as err:
150+
c.query("SELECT * FROM my_data")
151+
self.assertIn("Error while executing query", str(err.exception))
152+
153+
@asyncio_run
154+
async def test_async_query_with_arrow_error(self):
155+
f = ErrorFlightServer()
156+
with InfluxDBClient3(f"http://localhost:{f.port}", "my_org", "my_db", "my_token") as c:
157+
with self.assertRaises(InfluxdbClientQueryError) as err:
158+
await c.query_async("SELECT * FROM my_data")
159+
self.assertIn("Error while executing query", str(err.exception))
145160

146161
if __name__ == '__main__':
147162
unittest.main()

tests/util/mocks.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@
55
from pyarrow import (
66
array,
77
Table,
8-
concat_tables
8+
concat_tables, ArrowException
99
)
10-
1110
from pyarrow.flight import (
1211
FlightServerBase,
1312
RecordBatchStream,
@@ -158,3 +157,7 @@ def number_batches(table):
158157
for idx, batch in enumerate(table.to_batches()):
159158
buf = struct.pack('<i', idx)
160159
yield batch, buf
160+
161+
class ErrorFlightServer(FlightServerBase):
162+
def do_get(self, context, ticket):
163+
raise ArrowException

0 commit comments

Comments
 (0)