22
33import unittest
44import os
5+ import shutil
56import signal
67import threading
78import time
89from chdb import session
910
11+ test_signal_handler_dir = ".tmp_test_signal_handler"
12+ insert_counter = 0
13+ exit_event = threading .Event ()
14+
1015class TestSignalHandler (unittest .TestCase ):
1116 def setUp (self ) -> None :
12- self .sess = session .Session ()
17+ shutil .rmtree (test_signal_handler_dir , ignore_errors = True )
18+ self .sess = session .Session (test_signal_handler_dir )
1319 self .signal_received = False
1420 return super ().setUp ()
1521
1622 def tearDown (self ) -> None :
1723 self .sess .close ()
24+ shutil .rmtree (test_signal_handler_dir , ignore_errors = True )
1825 return super ().tearDown ()
1926
20- def background_sender (self ):
21- time .sleep (5 )
22- print ("send signal" )
27+ def background_sender (self , delay ):
28+ time .sleep (delay )
29+ print (f "send signal after { delay } seconds " )
2330 os .kill (os .getpid (), signal .SIGINT )
2431
2532 def test_signal_response (self ):
26- sender_thread = threading .Thread (target = self .background_sender , daemon = True )
33+ sender_thread = threading .Thread (
34+ target = self .background_sender ,
35+ daemon = True ,
36+ args = (3 ,)
37+ )
2738 sender_thread .start ()
2839
2940 start_time = time .time ()
@@ -37,5 +48,51 @@ def test_signal_response(self):
3748
3849 self .assertTrue (self .signal_received )
3950
51+ def test_data_integrity_after_interrupt (self ):
52+ def data_writer ():
53+ global insert_counter
54+ i = 500000
55+ while not exit_event .is_set ():
56+ self .sess .query (f"INSERT INTO signal_handler_table VALUES ({ i } )" )
57+ insert_counter += 1
58+ i += 1
59+
60+ self .sess .query ("CREATE DATABASE IF NOT EXISTS test" )
61+ self .sess .query ("USE test" )
62+ self .sess .query ("CREATE TABLE signal_handler_table (id Int64) ENGINE = MergeTree() ORDER BY id" )
63+ self .sess .query ("INSERT INTO signal_handler_table SELECT number FROM numbers(500000)" )
64+
65+ writer_thread = threading .Thread (
66+ target = data_writer ,
67+ daemon = False
68+ )
69+ sender_thread = threading .Thread (
70+ target = self .background_sender ,
71+ daemon = True ,
72+ args = (30 ,)
73+ )
74+
75+ writer_thread .start ()
76+ sender_thread .start ()
77+
78+ start_time = time .time ()
79+ try :
80+ while time .time () - start_time < 60 :
81+ self .sess .query ("SELECT * FROM signal_handler_table" )
82+ except KeyboardInterrupt :
83+ print ("receive signal" )
84+ exit_event .set ()
85+ self .signal_received = True
86+
87+ self .assertTrue (self .signal_received )
88+
89+ time .sleep (5 )
90+ self .sess .close ()
91+ self .sess = session .Session (test_signal_handler_dir )
92+
93+ final_count = self .sess .query ("SELECT * FROM test.signal_handler_table" ).rows_read ()
94+ print (f"final count is { final_count } " )
95+ self .assertEqual (int (final_count ), 500000 + insert_counter )
96+
4097if __name__ == "__main__" :
4198 unittest .main ()
0 commit comments