-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_doris_cmd.py
More file actions
executable file
·313 lines (248 loc) · 10.1 KB
/
test_doris_cmd.py
File metadata and controls
executable file
·313 lines (248 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Python test script for doris-cmd.
This script tests core functionalities of doris-cmd, such as connection, query execution, and progress reporting.
"""
import os
import time
import argparse
import threading
from doris_cmd.connection import DorisConnection
from doris_cmd.progress import ProgressTracker
def test_connection(host, port, user, password, database):
"""Test connection to Apache Doris."""
print("\n==== Testing Connection ====")
connection = DorisConnection(host, port, user, password, database)
success = connection.connect()
if success:
print("✅ Connection successful")
print(f"Initial Query ID: {connection.query_id}")
# Test getting current database
current_db = connection.get_current_database()
print(f"Current database: {current_db}")
print(f"Set Query ID: {connection.query_id}")
# Close connection
connection.close()
return connection
else:
print("❌ Connection failed")
return None
def test_query_id_generation(connection):
"""Test that each query generates a new query_id."""
print("\n==== Testing Query ID Generation ====")
# Reconnect because the connection was closed earlier
success = connection.connect()
if not success:
print("❌ Connection failed")
return
try:
# Execute first query and record query_id
print("Executing first query: SELECT 1")
_, _ = connection.execute_query("SELECT 1")
first_query_id = connection.query_id
print(f"First query's Query ID: {first_query_id}")
# Execute second query and record query_id
print("Executing second query: SELECT 2")
_, _ = connection.execute_query("SELECT 2")
second_query_id = connection.query_id
print(f"Second query's Query ID: {second_query_id}")
# Check if the two query_ids are different
if first_query_id != second_query_id:
print("✅ Verification successful: Each query generated a new Query ID")
else:
print("❌ Verification failed: Both queries used the same Query ID")
finally:
# Close connection
connection.close()
def display_results(column_names, results, query_id=None):
"""Display query results.
Args:
column_names (list): List of column names
results (list): List of result dictionaries
query_id (str, optional): Query ID
"""
if not column_names or not results:
return
print(f"Column names: {column_names}")
print(f"Number of result rows: {len(results)}")
print("Results preview:")
for i, row in enumerate(results[:5]): # Only show the first 5 rows
print(f" {i+1}. {row}")
if len(results) > 5:
print(f" ... Total {len(results)} rows")
# Display query ID
if query_id:
print(f"Query ID: {query_id}")
def test_simple_query(connection, mock_mode=False):
"""Test simple query and progress reporting."""
print("\n==== Testing Simple Query and Progress Reporting ====")
# Reconnect because the connection was closed earlier
success = connection.connect()
if not success:
print("❌ Connection failed")
return
try:
# Execute simple query - SHOW DATABASES
print("Executing query: SHOW DATABASES")
# Create progress tracker
progress_tracker = ProgressTracker(
host=connection.host,
connection=connection,
query_id=None,
mock_mode=mock_mode
)
# Execute query (this will set a new query_id)
column_names, results = connection.execute_query("SHOW DATABASES")
query_id = connection.query_id
# Update progress tracker's query_id and start tracking
progress_tracker.query_id = query_id
progress_tracker.start_tracking()
# Wait a short time to let progress tracking display
time.sleep(0.5)
# Stop progress tracking
progress_tracker.stop_tracking()
print() # New line
if column_names and results:
print("✅ Query successful")
display_results(column_names, results, query_id)
else:
print("❌ Query returned no results or failed")
finally:
# Close connection
connection.close()
def test_long_running_query(connection, mock_mode=False):
"""Test long-running query and progress reporting."""
print("\n==== Testing Long-Running Query and Progress Reporting ====")
# Reconnect because the connection was closed earlier
success = connection.connect()
if not success:
print("❌ Connection failed")
return
try:
# Create progress tracker
progress_tracker = ProgressTracker(
host=connection.host,
connection=connection,
query_id=None,
mock_mode=mock_mode
)
query_id = None
# Execute a long-running query, here using SELECT SLEEP(5) as an example
print("Executing long-running query: SELECT SLEEP(5)")
# Execute query in a separate thread
def execute_query():
nonlocal query_id
# Execute query (will set a new query_id)
connection.execute_query("SELECT SLEEP(5)")
query_id = connection.query_id
query_thread = threading.Thread(target=execute_query)
query_thread.daemon = True
query_thread.start()
# Wait a short time for the query to start
time.sleep(0.2)
# Set progress tracker's query_id and start tracking
progress_tracker.query_id = connection.query_id
progress_tracker.start_tracking()
# Wait a few seconds to let progress display
time.sleep(2)
# Simulate query cancellation
print("\nSimulating query cancellation...")
connection.cancel_query()
# Wait for query thread to finish
query_thread.join(timeout=2)
# Stop progress tracking
progress_tracker.stop_tracking()
print() # New line
print("✅ Long-running query test completed")
if query_id:
print(f"Query ID: {query_id}")
finally:
# Close connection
connection.close()
def test_query_with_results(connection, database, mock_mode=False):
"""Test query returning results."""
print("\n==== Testing Query with Results ====")
# Reconnect because the connection was closed earlier
success = connection.connect()
if not success:
print("❌ Connection failed")
return
# If a database is specified, switch to that database
if database:
print(f"Switching to database: {database}")
connection.use_database(database)
try:
# Execute query - simple SELECT query
print("Executing query: SELECT 1 AS test_col, 'Hello Doris' AS message")
# Create progress tracker
progress_tracker = ProgressTracker(
host=connection.host,
connection=connection,
query_id=None,
mock_mode=mock_mode
)
# Execute query
column_names, results = connection.execute_query(
"SELECT 1 AS test_col, 'Hello Doris' AS message"
)
query_id = connection.query_id
# Update progress tracker's query_id and start tracking
progress_tracker.query_id = query_id
progress_tracker.start_tracking()
# Wait a short time
time.sleep(0.5)
# Stop progress tracking
progress_tracker.stop_tracking()
print() # New line
if column_names and results:
print("✅ Query successful")
display_results(column_names, results, query_id)
else:
print("❌ Query returned no results or failed")
finally:
# Close connection
connection.close()
def main():
"""Main function."""
parser = argparse.ArgumentParser(description="doris-cmd test script")
parser.add_argument("--host", default="localhost", help="Apache Doris host")
parser.add_argument("--port", type=int, default=9030, help="Apache Doris MySQL port")
parser.add_argument("--user", default="root", help="Username")
parser.add_argument("--password", default="", help="Password")
parser.add_argument("--database", default=None, help="Database name")
parser.add_argument("--mock", action="store_true", help="Enable mock mode for testing")
args = parser.parse_args()
print("doris-cmd test script")
print(f"Connection information: {args.host}:{args.port}")
print(f"User: {args.user}, Database: {args.database or '(not specified)'}")
print(f"Mock mode: {'Enabled' if args.mock else 'Disabled'}")
# Test connection
connection = test_connection(args.host, args.port, args.user, args.password, args.database)
if connection is None:
print("Cannot continue testing, connection failed")
return
# Test query_id generation
test_query_id_generation(connection)
# Test simple query
test_simple_query(connection, args.mock)
# Test query with results
test_query_with_results(connection, args.database, args.mock)
# Test long-running query
test_long_running_query(connection, args.mock)
# If mock mode is enabled, test progress tracking in pure mock mode
if args.mock:
print("\n==== Testing Pure Mock Mode Progress Tracking ====")
mock_tracker = ProgressTracker(
host=connection.host,
mock_mode=True
)
mock_tracker.query_id = "mock_query_test_123"
mock_tracker.start_tracking()
print("Mock mode progress tracking started, waiting 5 seconds...")
time.sleep(5)
mock_tracker.stop_tracking()
print("\n✅ Mock mode test completed")
print("\nAll tests completed!")
if __name__ == "__main__":
main()