@@ -26,7 +26,7 @@ def tearDown(self) -> None:
2626 shutil .rmtree (test_streaming_query_dir , ignore_errors = True )
2727 return super ().tearDown ()
2828
29- def test_streaming_query (self ):
29+ def test_streaming_query_with_session (self ):
3030 self .sess .query ("CREATE DATABASE IF NOT EXISTS test" )
3131 self .sess .query ("USE test" )
3232 self .sess .query ("CREATE TABLE IF NOT EXISTS streaming_test (id Int64) ENGINE = MergeTree() ORDER BY id" )
@@ -56,6 +56,26 @@ def test_streaming_query(self):
5656 with self .assertRaises (Exception ):
5757 ret = self .sess .send_query ("SELECT * FROM streaming_test;SELECT * FROM streaming_test" , "CSVWITHNAMES" )
5858
59+ def test_streaming_query_with_connection (self ):
60+ self .sess .close ()
61+ conn = chdb .connect (":memory:" )
62+ total_rows = 0
63+ with conn .send_query ("SELECT * FROM numbers(200000)" ) as stream :
64+ for chunk in stream :
65+ total_rows += chunk .rows_read ()
66+ self .assertEqual (total_rows , 200000 )
67+ conn .close ()
68+ self .sess = session .Session (test_streaming_query_dir )
69+
70+ def test_large_table (self ):
71+ # Test querying extremely large dataset (1 billion rows)
72+ # with stable memory usage around 56MB
73+ total_rows = 0
74+ with self .sess .send_query ("SELECT * FROM numbers(1073741824)" , "CSVWITHNAMES" ) as stream :
75+ for chunk in stream :
76+ total_rows += chunk .rows_read ()
77+ self .assertEqual (total_rows , 1073741824 )
78+
5979 def test_cancel_streaming_query (self ):
6080 self .sess .query ("CREATE DATABASE IF NOT EXISTS test" )
6181 self .sess .query ("USE test" )
0 commit comments