Skip to content

Commit 2e46872

Browse files
JackieTien97, CRZbulabula, and HTHou
authored
Support stream DataFrame interface in iotdb python client (#17035)
* Support stream DataFrame interface in iotdb python client
* change according to code review
* format code
* Bug fix "optional"
* add IT
* fix illegal fetch_size
---------
Co-authored-by: Yongzao <532741407@qq.com>
Co-authored-by: HTHou <haonan@apache.org>
1 parent 8e5fcba commit 2e46872

File tree

6 files changed

+154
-1
lines changed

6 files changed

+154
-1
lines changed

iotdb-client/client-py/iotdb/Session.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,13 @@ def __init__(
9595
self.__default_endpoint = TEndPoint(self.__host, self.__port)
9696
self.__user = user
9797
self.__password = password
98-
self.__fetch_size = fetch_size
98+
if fetch_size > 0:
99+
self.__fetch_size = fetch_size
100+
else:
101+
logger.warning(
102+
f"fetch_size {fetch_size} is illegal, use default fetch_size {self.DEFAULT_FETCH_SIZE}"
103+
)
104+
self.__fetch_size = self.DEFAULT_FETCH_SIZE
99105
self.__is_close = True
100106
self.__client = None
101107
self.__default_connection = None

iotdb-client/client-py/iotdb/utils/SessionDataSet.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717
#
1818
import logging
19+
from typing import Optional
1920

2021
from iotdb.utils.Field import Field
2122

@@ -143,6 +144,24 @@ def construct_row_record_from_data_frame(self):
143144
def close_operation_handle(self):
    """
    Close the underlying RPC data set by delegating to its ``close()``.
    """
    rpc_data_set = self.iotdb_rpc_data_set
    rpc_data_set.close()
145146

147+
def has_next_df(self) -> bool:
    """
    Tell whether another DataFrame can still be obtained.
    :return: the result of the buffered-data check when it is truthy,
             otherwise the result of asking the RPC data set for a
             further result set
    """
    source = self.iotdb_rpc_data_set
    # Look at already-buffered rows first; only when there are none is the
    # RPC data set asked whether another result set is available.
    buffered = source._has_buffered_data()
    return buffered if buffered else source._has_next_result_set()
155+
156+
def next_df(self) -> Optional[pd.DataFrame]:
    """
    Fetch the next DataFrame chunk of the result set.
    The work is delegated to ``next_dataframe()`` of the underlying RPC
    data set, which yields chunks of fetch_size rows (the final chunk may
    be smaller).
    :return: the next DataFrame, or None if no more data
    """
    rpc_data_set = self.iotdb_rpc_data_set
    return rpc_data_set.next_dataframe()
164+
146165
def todf(self) -> pd.DataFrame:
    """
    Convert this data set to a DataFrame via ``result_set_to_pandas``.
    :return: the DataFrame produced by ``result_set_to_pandas(self)``
    """
    frame = result_set_to_pandas(self)
    return frame
148167

iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
# for package
2020
import logging
21+
from typing import Optional
2122

2223
import numpy as np
2324
import pandas as pd
@@ -120,10 +121,12 @@ def __init__(
120121
self.data_frame = None
121122
self.__zone_id = zone_id
122123
self.__time_precision = time_precision
124+
self.__df_buffer = None # Buffer for streaming DataFrames
123125

124126
def close(self):
125127
if self.__is_closed:
126128
return
129+
self.__df_buffer = None # Clean up streaming DataFrame buffer
127130
if self.__client is not None:
128131
try:
129132
status = self.__client.closeOperation(
@@ -243,11 +246,73 @@ def _has_next_result_set(self):
243246
return True
244247
return False
245248

249+
def _has_buffered_data(self) -> bool:
    """
    Report whether the streaming DataFrame buffer currently holds rows.
    :return: True if the buffer exists and has at least one row,
             False otherwise
    """
    buffered = self.__df_buffer
    if buffered is None:
        return False
    return len(buffered) > 0
255+
256+
def next_dataframe(self) -> Optional[pd.DataFrame]:
    """
    Return the next chunk of the result set as a DataFrame.
    Batches are appended to an internal buffer until it holds at least
    fetch_size rows or no further result set is available. At most
    fetch_size rows are then handed out; any surplus stays in the buffer
    for the following call.
    :return: a DataFrame with at most fetch_size rows, or None when the
             buffer is empty and nothing more can be fetched
    """
    # Top up the buffer. The row-count test comes first so that no further
    # result set is requested once enough rows are already buffered.
    while (
        0 if self.__df_buffer is None else len(self.__df_buffer)
    ) < self.__fetch_size and self._has_next_result_set():
        batch_df = self._build_dataframe(self._process_buffer())
        self.__df_buffer = (
            batch_df
            if self.__df_buffer is None
            else pd.concat([self.__df_buffer, batch_df], ignore_index=True)
        )

    pending = self.__df_buffer
    if pending is None or len(pending) == 0:
        return None

    if len(pending) > self.__fetch_size:
        # More rows than one chunk: hand out the head, keep the tail.
        head = pending.iloc[: self.__fetch_size].reset_index(drop=True)
        self.__df_buffer = pending.iloc[self.__fetch_size :].reset_index(
            drop=True
        )
        return head

    # Everything that is left fits into a single chunk.
    self.__df_buffer = None
    return pending
298+
246299
def result_set_to_pandas(self):
    """
    Drain every remaining result set and build one DataFrame from it.
    Note: this method does not read the streaming DataFrame buffer, so
    rows already buffered by ``next_dataframe()`` are not part of the
    returned frame.
    :return: the DataFrame built by ``_build_dataframe`` from all batches
    """
    column_count = len(self.__column_index_2_tsblock_column_index_list)
    merged = {column: [] for column in range(column_count)}

    while self._has_next_result_set():
        # Append this batch's per-column pieces to the running lists.
        for key, pieces in self._process_buffer().items():
            merged[key].extend(pieces)

    return self._build_dataframe(merged)
309+
310+
def _process_buffer(self):
311+
result = {}
312+
for i in range(len(self.__column_index_2_tsblock_column_index_list)):
313+
result[i] = []
314+
315+
while self.__query_result_index < len(self.__query_result):
251316
time_array, column_arrays, null_indicators, array_length = deserialize(
252317
memoryview(self.__query_result[self.__query_result_index])
253318
)
@@ -339,6 +404,9 @@ def result_set_to_pandas(self):
339404

340405
result[i].append(data_array)
341406

407+
return result
408+
409+
def _build_dataframe(self, result):
342410
for k, v in result.items():
343411
if v is None or len(v) < 1 or v[0] is None:
344412
result[k] = []

iotdb-client/client-py/session_example.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,12 @@
411411
df = dataset.todf()
412412
print(df.to_string())
413413

414+
with session.execute_query_statement(
415+
"select s_01,s_02,s_03,s_04 from root.sg_test_01.d_04"
416+
) as dataset:
417+
while dataset.has_next_df():
418+
print(dataset.next_df())
419+
414420
# delete database
415421
session.delete_storage_group("root.sg_test_01")
416422

iotdb-client/client-py/table_model_session_example.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,5 +158,9 @@
158158
df = dataset.todf()
159159
print(df)
160160

161+
with session.execute_query_statement("select * from table5 order by time") as dataset:
162+
while dataset.has_next_df():
163+
print(dataset.next_df())
164+
161165
# close session connection.
162166
session.close()

iotdb-client/client-py/tests/integration/test_dataframe.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,56 @@ def test_simple_query():
4242
assert_array_equal(df.values, [[123.0, 15.0]])
4343

4444

45+
def test_stream_query():
    """
    Stream a three-row result with fetch_size=1 and expect three
    single-row DataFrames, one per inserted timestamp.
    """
    with IoTDBContainer("iotdb:dev") as db:
        db: IoTDBContainer
        session = Session(
            db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=1
        )
        session.open(False)
        try:
            session.execute_non_query_statement("CREATE DATABASE root.device0")

            # Write data
            session.insert_str_record("root.device0", 123, "pressure", "15.0")
            session.insert_str_record("root.device0", 124, "pressure", "15.0")
            session.insert_str_record("root.device0", 125, "pressure", "15.0")

            # Read; the with-block releases the query's operation handle
            # even when an assertion below fails.
            index = 0
            with session.execute_query_statement(
                "SELECT * FROM root.device0"
            ) as session_data_set:
                while session_data_set.has_next_df():
                    df = session_data_set.next_df()
                    # Fail with a clear message instead of an AttributeError
                    # if has_next_df() and next_df() ever disagree.
                    assert df is not None
                    assert list(df.columns) == ["Time", "root.device0.pressure"]
                    assert_array_equal(df.values, [[123.0 + index, 15.0]])
                    index += 1
        finally:
            # Always close the session, including on assertion failure.
            session.close()
        assert index == 3
69+
70+
71+
def test_stream_query_with_illegal_fetch_size():
    """
    Create a session with fetch_size=-1 and expect the streaming interface
    to deliver all three rows in exactly one DataFrame.
    """
    with IoTDBContainer("iotdb:dev") as db:
        db: IoTDBContainer
        session = Session(
            db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=-1
        )
        session.open(False)
        try:
            session.execute_non_query_statement("CREATE DATABASE root.device0")

            # Write data
            session.insert_str_record("root.device0", 123, "pressure", "15.0")
            session.insert_str_record("root.device0", 124, "pressure", "15.0")
            session.insert_str_record("root.device0", 125, "pressure", "15.0")

            # Read; the with-block releases the query's operation handle
            # even when an assertion below fails.
            chunk_count = 0
            with session.execute_query_statement(
                "SELECT * FROM root.device0"
            ) as session_data_set:
                while session_data_set.has_next_df():
                    df = session_data_set.next_df()
                    assert df is not None
                    assert list(df.columns) == ["Time", "root.device0.pressure"]
                    assert_array_equal(
                        df.values, [[123.0, 15.0], [124.0, 15.0], [125.0, 15.0]]
                    )
                    chunk_count += 1
        finally:
            # Always close the session, including on assertion failure.
            session.close()
        # Without this check the test would pass vacuously if the loop
        # body never ran.
        assert chunk_count == 1
93+
94+
4595
def test_non_time_query():
4696
with IoTDBContainer("iotdb:dev") as db:
4797
db: IoTDBContainer

0 commit comments

Comments
 (0)