11import pytest
2+ import time
23
34from helpers .cluster import ClickHouseCluster
45
@@ -25,46 +26,131 @@ def started_cluster():
2526 try :
2627 cluster .start ()
2728
28- for i , node in enumerate ([node1 , node2 ]):
29- node .query ("CREATE DATABASE testdb" )
30- node .query (
31- """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table1', '{}') ORDER BY id;""" .format (
32- i
33- )
34- )
35- for i , node in enumerate ([node3 , node4 ]):
36- node .query ("CREATE DATABASE testdb" )
37- node .query (
38- """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table2', '{}') ORDER BY id;""" .format (
39- i
40- )
41- )
4229 yield cluster
4330
4431 finally :
4532 cluster .shutdown ()
4633
4734
def maintain_test_table(test_table):
    """(Re)create ``testdb.<test_table>`` as a ReplicatedMergeTree on all four nodes.

    Nodes 1/2 replicate one table (ZK path suffix ``1``), nodes 3/4 another
    (suffix ``2``).  A wall-clock timestamp is baked into the ZooKeeper path
    so repeated test runs never collide with leftover replica metadata from
    a previous table of the same name.
    """
    tmark = time.time()  # to guarantee ZK path uniqueness across runs

    def _recreate_on(group, shard):
        # One replica per node; the loop index doubles as the replica name.
        for i, node in enumerate(group):
            node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
            node.query("DROP DATABASE IF EXISTS testdb")

            node.query("CREATE DATABASE testdb")
            node.query(
                f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}{shard}-{tmark}', '{i}') ORDER BY id;"
            )

    _recreate_on([node1, node2], 1)
    _recreate_on([node3, node4], 2)
55+
def test_distributed_ddl_queue(started_cluster):
    """An ON CLUSTER ALTER must land on every replica and show up in system.distributed_ddl_queue."""
    test_table = "test_table"
    maintain_test_table(test_table)

    # Populate both shards, then wait for the second replica of each to catch up.
    for writer in (node1, node3):
        writer.query(
            f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
        )
    for follower in (node2, node4):
        follower.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")

    node1.query(
        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
        settings={"replication_alter_partitions_sync": "2"},
    )

    queue_check = "SELECT If((SELECT count(*) FROM system.distributed_ddl_queue WHERE cluster='test_cluster' AND entry='query-0000000000') > 0, 'ok', 'fail')"
    for node in nodes:
        node.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
        # The added column reads as its default (0) on every replica.
        assert node.query(f"SELECT somecolumn FROM testdb.{test_table} LIMIT 1") == "0\n"
        # The DDL entry must be observable through the system table on each host.
        assert node.query(queue_check) == "ok\n"

    # Revert the schema change so later tests start from the original layout.
    node1.query(
        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somecolumn",
        settings={"replication_alter_partitions_sync": "2"},
    )
88+
89+
def test_distributed_ddl_rubbish(started_cluster):
    """A corrupted DDL entry in ZooKeeper must still be listed by system.distributed_ddl_queue.

    Runs a real ON CLUSTER ALTER, clones its ZooKeeper queue entry (and the
    entry's whole subtree) under a fresh artificial query ID with the query
    text broken (TABLE => TUBLE), then checks the broken entry is reported
    for all four hosts instead of breaking the system table.
    """
    test_table = "test_table_rubbish"
    maintain_test_table(test_table)
    node1.query(
        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somenewcolumn UInt8 AFTER val",
        settings={"replication_alter_partitions_sync": "2"},
    )

    zk_content = node1.query(
        "SELECT name, value, path FROM system.zookeeper WHERE path LIKE '/clickhouse/task_queue/ddl%' SETTINGS allow_unrestricted_reads_from_keeper=true",
        parse=True,
    ).to_dict("records")

    # Locate the queue entry of the ALTER we just executed.
    original_query = ""
    for row in zk_content:
        if "somenewcolumn" in row["value"]:
            original_query = row["name"]
            break
    # Without this guard an empty original_query would make the substring
    # match below hit every path and clone the whole queue.
    assert original_query, "DDL entry for the ALTER was not found in ZooKeeper"

    # Monotonic nanoseconds keep the artificial ID unique across runs.
    new_query = "query-artificial-" + str(time.monotonic_ns())

    # Copy the entry under the new query ID with broken query text
    # (TABLE => TUBLE); copy its child nodes verbatim under the new path.
    rows_to_insert = []
    for row in zk_content:
        if row["name"] == original_query:
            rows_to_insert.append(
                {
                    "name": new_query,
                    "path": row["path"],
                    "value": row["value"].replace("TABLE", "TUBLE"),
                }
            )
        elif original_query in row["path"]:
            rows_to_insert.append(
                {
                    "name": row["name"],
                    "path": row["path"].replace(original_query, new_query),
                    "value": row["value"],
                }
            )

    # Ingest the clone into ZK.  NOTE(review): values are embedded into the
    # SQL verbatim; this assumes DDL entry payloads contain no single
    # quotes -- confirm if the entry format ever changes.
    for row in rows_to_insert:
        node1.query(
            "insert into system.zookeeper (name, path, value) values ('{}', '{}', '{}')".format(
                row["name"], row["path"], row["value"]
            )
        )

    # The rubbish entry must be visible for all 4 hosts, with an empty
    # cluster name, via system.distributed_ddl_queue.
    assert (
        int(
            node1.query(
                f"SELECT count(1) FROM system.distributed_ddl_queue WHERE entry='{new_query}' AND cluster=''"
            )
        )
        == 4
    )

    # Revert the schema change so the table returns to its initial layout.
    node1.query(
        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somenewcolumn",
        settings={"replication_alter_partitions_sync": "2"},
    )
0 commit comments