Skip to content

Commit 3d8be24

Browse files
authored
Merge pull request #327 from wudidapaopao/yuxiaozhe-dev
Fix signal loss by removing the signal handler in the ClickHouse engine
2 parents 6cdf9d2 + 0809c7d commit 3d8be24

File tree

2 files changed

+102
-1
lines changed

2 files changed

+102
-1
lines changed

programs/local/LocalServer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ try
481481

482482
StackTrace::setShowAddresses(server_settings.show_addresses_in_stack_traces);
483483

484-
setupSignalHandler();
484+
// setupSignalHandler();
485485

486486
std::cout << std::fixed << std::setprecision(3);
487487
std::cerr << std::fixed << std::setprecision(3);

tests/test_signal_handler.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#!python3
2+
3+
import unittest
4+
import os
5+
import shutil
6+
import signal
7+
import threading
8+
import time
9+
from chdb import session
10+
11+
test_signal_handler_dir = ".tmp_test_signal_handler"
12+
insert_counter = 0
13+
exit_event1 = threading.Event()
14+
exit_event2 = threading.Event()
15+
16+
class TestSignalHandler(unittest.TestCase):
17+
def setUp(self) -> None:
18+
shutil.rmtree(test_signal_handler_dir, ignore_errors=True)
19+
self.sess = session.Session(test_signal_handler_dir)
20+
self.signal_received = False
21+
return super().setUp()
22+
23+
def tearDown(self) -> None:
24+
self.sess.close()
25+
shutil.rmtree(test_signal_handler_dir, ignore_errors=True)
26+
return super().tearDown()
27+
28+
def background_sender(self, delay):
29+
time.sleep(delay)
30+
print(f"send signal after {delay} seconds")
31+
os.kill(os.getpid(), signal.SIGINT)
32+
33+
def test_signal_response(self):
34+
sender_thread = threading.Thread(
35+
target=self.background_sender,
36+
daemon=True,
37+
args=(3,)
38+
)
39+
sender_thread.start()
40+
41+
start_time = time.time()
42+
try:
43+
while time.time() - start_time < 10:
44+
time.sleep(0.1)
45+
self.sess.query("SELECT 1")
46+
except KeyboardInterrupt:
47+
print("receive signal")
48+
self.signal_received = True
49+
50+
self.assertTrue(self.signal_received)
51+
52+
def test_data_integrity_after_interrupt(self):
53+
def data_writer():
54+
global insert_counter
55+
i = 500000
56+
while not exit_event1.is_set():
57+
self.sess.query(f"INSERT INTO signal_handler_table VALUES ({i})")
58+
insert_counter += 1
59+
i += 1
60+
exit_event2.set()
61+
62+
self.sess.query("CREATE DATABASE IF NOT EXISTS test")
63+
self.sess.query("USE test")
64+
self.sess.query("CREATE TABLE signal_handler_table (id Int64) ENGINE = MergeTree() ORDER BY id")
65+
self.sess.query("INSERT INTO signal_handler_table SELECT number FROM numbers(500000)")
66+
67+
writer_thread = threading.Thread(
68+
target=data_writer,
69+
daemon=False
70+
)
71+
sender_thread = threading.Thread(
72+
target=self.background_sender,
73+
daemon=True,
74+
args=(30,)
75+
)
76+
77+
writer_thread.start()
78+
sender_thread.start()
79+
80+
start_time = time.time()
81+
try:
82+
while time.time() - start_time < 60:
83+
self.sess.query("SELECT * FROM signal_handler_table")
84+
except KeyboardInterrupt:
85+
print("receive signal")
86+
exit_event1.set()
87+
self.signal_received = True
88+
89+
self.assertTrue(self.signal_received)
90+
91+
while not exit_event2.is_set():
92+
continue
93+
self.sess.close()
94+
self.sess = session.Session(test_signal_handler_dir)
95+
96+
final_count = self.sess.query("SELECT * FROM test.signal_handler_table").rows_read()
97+
print(f"final count is {final_count}")
98+
self.assertEqual(int(final_count), 500000 + insert_counter)
99+
100+
if __name__ == "__main__":
101+
unittest.main()

0 commit comments

Comments
 (0)