Skip to content

Commit a436699

Browse files
committed
test: adds integration test for client.query_async()
1 parent 299bc83 commit a436699

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

tests/test_influxdb_client_3_integration.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from pyarrow._flight import FlightError
1111

1212
from influxdb_client_3 import InfluxDBClient3, InfluxDBError, write_client_options, WriteOptions
13+
from tests.util import asyncio_run, lp_to_py_object
1314

1415

1516
def random_hex(len=6):
@@ -249,3 +250,27 @@ def test_verify_ssl_false(self):
249250
assert len(list_results) > 0
250251
finally:
251252
self.remove_test_cert(cert_file)
253+
254+
@asyncio_run
255+
async def test_verify_query_async(self):
256+
measurement = f'test{random_hex(6)}'
257+
data = []
258+
lp_template = "%s,location=%s val=%f,ival=%di,index=%di %d"
259+
data_size = 10
260+
interval = 1_000_000_000 * 10
261+
ts = time.time_ns() - interval * data_size
262+
locations = ['springfield', 'gotham', 'balbec', 'yonville']
263+
for i in range(data_size):
264+
data.append(lp_template % (measurement, locations[random.randint(0, len(locations) - 1)],
265+
random.random() * 10,
266+
random.randint(0, 6), i, ts))
267+
ts = ts + interval
268+
269+
self.client.write(data)
270+
query = f"SELECT * FROM \"{measurement}\" ORDER BY time DESC"
271+
272+
result = await self.client.query_async(query)
273+
274+
result_list = result.to_pylist()
275+
for item in data:
276+
assert lp_to_py_object(item) in result_list, f"original lp data \"{item}\" should be in result list"

tests/util/__init__.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import sys
55
import traceback
66

7+
import pandas
8+
79

810
def asyncio_run(async_func):
911
def wrapper(*args, **kwargs):
@@ -15,3 +17,44 @@ def wrapper(*args, **kwargs):
1517

1618
wrapper.__signature__ = inspect.signature(async_func)
1719
return wrapper
20+
21+
22+
def lp_to_py_object(lp: str):
23+
"""
24+
Result format matches the format of objects returned in pyarrow.Table.to_pylist.
25+
26+
For verifying test data returned from queries.
27+
28+
:param lp: a lineprotocol formatted string
29+
:return: a list object
30+
"""
31+
result = {}
32+
groups = lp.split(' ')
33+
34+
tags = groups[0].split(',')
35+
tags.remove(tags[0])
36+
for tag in tags:
37+
t_set = tag.split('=')
38+
result[t_set[0]] = t_set[1]
39+
40+
fields = groups[1].split(',')
41+
for field in fields:
42+
f_set = field.split('=')
43+
lastchar = f_set[1][len(f_set[1]) - 1]
44+
match lastchar:
45+
case 'i': # integer
46+
result[f_set[0]] = int(f_set[1].replace('i',''))
47+
case 'u': # unsigned integer
48+
result[f_set[0]] = int(f_set[1].replace('u',''))
49+
case '"': # string
50+
result[f_set[0]] = f_set[1].replace('"',"")
51+
case 'e' | 'E' | 't' | 'T' | 'f' | 'F':
52+
if f_set[1][0].lower() == 't':
53+
result[f_set[0]] = True
54+
else:
55+
result[f_set[0]] = False
56+
case _: # assume float
57+
result[f_set[0]] = float(f_set[1])
58+
59+
result['time'] = pandas.Timestamp(int(groups[2]))
60+
return result

0 commit comments

Comments
 (0)