@@ -49,16 +49,23 @@ def test_concurrent_queries_sends_telemetry(self):
4949 num_threads = 5
5050 captured_telemetry = []
5151 captured_telemetry_lock = threading .Lock ()
52+ captured_responses = []
53+ captured_responses_lock = threading .Lock ()
5254
53- def mock_send_telemetry (self , events ):
54- """
55- This is our telemetry interceptor. It captures events into our list
56- instead of sending them over the network.
57- """
55+ original_send_telemetry = TelemetryClient ._send_telemetry
56+ original_callback = TelemetryClient ._telemetry_request_callback
57+
58+ def send_telemetry_wrapper (self_client , events ):
5859 with captured_telemetry_lock :
5960 captured_telemetry .extend (events )
61+ original_send_telemetry (self_client , events )
62+
63+ def callback_wrapper (self_client , response ):
64+ with captured_responses_lock :
65+ captured_responses .append (response )
66+ original_callback (self_client , response )
6067
61- with patch .object (TelemetryClient , ' _send_telemetry' , mock_send_telemetry ):
68+ with patch .object (TelemetryClient , " _send_telemetry" , send_telemetry_wrapper ), patch . object ( TelemetryClient , "_telemetry_request_callback" , callback_wrapper ):
6269
6370 def execute_query_worker (thread_id ):
6471 """Each thread creates a connection and executes a query."""
@@ -74,6 +81,16 @@ def execute_query_worker(thread_id):
7481 TelemetryClientFactory ._executor .shutdown (wait = True )
7582
7683 # --- VERIFICATION ---
84+ # print event by event in a readable format
85+ for event in captured_telemetry :
86+ print (event )
87+ print ("-" * 100 )
88+
89+ # print response by response in a readable format
90+ for response in captured_responses :
91+ print (response )
92+ print ("-" * 100 )
93+
7794 assert len (captured_telemetry ) == num_threads * 4 # 4 events per thread (initial_telemetry_log, 3 latency_logs (execute_command, fetchall_arrow, _convert_arrow_table))
7895
7996 events_with_latency = [
0 commit comments