Commit 60579e2 (2 parents: 6073faa + 18ae139)

Merge pull request ClickHouse#79178 from ClickHouse/rmvz

Fix znode_exists flakiness in test_refreshable_mv_in_replicated_db
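The flaky pattern being fixed: the test asserted on a single snapshot of a coordination znode, but apparently the znode can take a moment to disappear even after `drop table ... sync` returns, so the diff below switches to a retried assertion (and adds a `cleanup` fixture plus a per-test `test_idx` so every test gets a fresh znode path). A minimal sketch of the retry idea, assuming a `node.query(...)` interface like the integration tests' ClickHouseInstance; the test itself uses the existing `assert_eq_with_retry` helper rather than this illustrative function:

import time

def assert_query_converges(node, query, expected, retries=20, sleep=0.5):
    # Re-run the query until it returns the expected result, instead of
    # asserting on a single racy snapshot.
    for _ in range(retries):
        if node.query(query) == expected:
            return
        time.sleep(sleep)
    raise AssertionError(f"query did not converge to {expected!r}: {query}")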

2 files changed: 32 additions, 34 deletions
Lines changed: 1 addition & 0 deletions

@@ -1,3 +1,4 @@
 <clickhouse>
     <disable_insertion_and_mutation>true</disable_insertion_and_mutation>
+    <text_log><flush_interval_milliseconds>1000000000000</flush_interval_milliseconds></text_log>
 </clickhouse>
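A flush interval of 10^12 ms effectively disables the periodic background flush of system.text_log on this node. If a test still needed the log contents, it could trigger the flush itself; a hypothetical snippet (not part of this commit), using ClickHouse's real `SYSTEM FLUSH LOGS` statement and the `node1` instance defined in test.py:

# Hypothetical: flush text_log on demand instead of waiting for the
# (now effectively infinite) periodic flush.
node1.query("SYSTEM FLUSH LOGS")
recent_errors = node1.query("select count() from system.text_log where level = 'Error'")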

tests/integration/test_refreshable_mv/test.py

Lines changed: 31 additions & 34 deletions
@@ -40,6 +40,8 @@
 )
 nodes = [node1, node2]
 
+test_idx = 0
+
 
 @pytest.fixture(scope="module")
 def started_cluster():
@@ -50,11 +52,25 @@ def started_cluster():
     finally:
         cluster.shutdown()
 
+@pytest.fixture
+def cleanup():
+    yield
+
+    for node in nodes + [reading_node]:
+        node.query(
+            "drop database if exists re sync;"
+            "drop table if exists system.a;")
+
+    global test_idx
+    test_idx += 1
 
-def test_refreshable_mv_in_replicated_db(started_cluster):
+def test_refreshable_mv_in_replicated_db(started_cluster, cleanup):
     for node in nodes:
+        # (Use different znode path for each test because even `drop database ... sync` doesn't seem
+        # to guarantee that a new database can be immediately created with the same znode path:
+        # https://github.com/ClickHouse/ClickHouse/issues/76418 )
         node.query(
-            "create database re engine = Replicated('/test/re', 'shard1', '{replica}');"
+            f"create database re engine = Replicated('/test/re_{test_idx}', 'shard1', '{{replica}}');"
         )
 
     # Table engine check.
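The new `cleanup` fixture relies on pytest's yield-fixture idiom: everything after `yield` is teardown that runs once the test finishes, pass or fail, so leftover `re` databases and `system.a` never leak into the next test, and bumping the global `test_idx` afterwards guarantees the next test creates its database under a fresh znode path. A self-contained sketch of the idiom, with illustrative names not taken from the test suite:

import pytest

state = {"runs": 0}

@pytest.fixture
def cleanup_example():
    yield  # the test body executes here
    # Teardown: runs even if the test raised, keeping state isolated
    # for whichever test comes next.
    state["runs"] += 1

def test_uses_cleanup(cleanup_example):
    # The fixture's teardown has not run yet while the test body executes.
    assert state["runs"] == 0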
@@ -162,12 +178,7 @@ def test_refreshable_mv_in_replicated_db(started_cluster):
     )
 
     # Locate coordination znodes.
-    znode_exists = (
-        lambda uuid: nodes[randint(0, 1)].query(
-            f"select count() from system.zookeeper where path = '/clickhouse/tables/{uuid}' and name = 'shard1'"
-        )
-        == "1\n"
-    )
+    znode_exists_query = lambda uuid: f"select count() from system.zookeeper where path = '/clickhouse/tables/{uuid}' and name = 'shard1'"
     tables = []
     for row in node1.query(
         "select table, uuid from system.tables where database = 're'"
@@ -178,7 +189,8 @@ def test_refreshable_mv_in_replicated_db(started_cluster):
             continue
         coordinated = not name.endswith("uncoordinated")
         tables.append((name, uuid, coordinated))
-        assert coordinated == znode_exists(uuid)
+        znode_exists = nodes[randint(0, 1)].query(znode_exists_query(uuid)) == '1\n'
+        assert coordinated == znode_exists
     assert sorted([name for (name, _, _) in tables]) == [
         "a",
         "append",
@@ -194,7 +206,7 @@ def test_refreshable_mv_in_replicated_db(started_cluster):
         nodes[randint(0, 1)].query(f"drop table re.{name}{' sync' if sync else ''}")
         # TODO: After https://github.com/ClickHouse/ClickHouse/issues/61065 is done (for MVs, not ReplicatedMergeTree), check the parent znode instead.
         if sync:
-            assert not znode_exists(uuid)
+            assert_eq_with_retry(nodes[randint(0, 1)], znode_exists_query(uuid), '0\n')
 
     # A little stress test dropping MV while it's refreshing, hoping to hit various cases where the
     # drop happens while creating/exchanging/dropping the inner table.
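Splitting the old `znode_exists` lambda into `znode_exists_query`, which returns the SQL text instead of evaluating it once, is what makes the retry possible: `assert_eq_with_retry` has to re-issue the query itself on every attempt. Usage, sketched with a placeholder uuid (the helper comes from the integration tests' `helpers.test_tools`):

from helpers.test_tools import assert_eq_with_retry

uuid = "00000000-0000-0000-0000-000000000000"  # illustrative placeholder
# Polls until the znode count query returns '0\n' (znode gone) or the
# helper's retry budget is exhausted, then asserts.
assert_eq_with_retry(
    node1,
    f"select count() from system.zookeeper where path = '/clickhouse/tables/{uuid}' and name = 'shard1'",
    "0\n",
)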
@@ -216,11 +228,8 @@ def test_refreshable_mv_in_replicated_db(started_cluster):
     for node in nodes:
         assert node.query("show tables from re") == ""
 
-    node1.query("drop database re sync")
-    node2.query("drop database re sync")
 
-
-def test_refreshable_mv_in_system_db(started_cluster):
+def test_refreshable_mv_in_system_db(started_cluster, cleanup):
     node1.query(
         "create materialized view system.a refresh every 1 second (x Int64) engine Memory as select number+1 as x from numbers(2);"
         "system refresh view system.a;"
@@ -230,17 +239,15 @@ def test_refreshable_mv_in_system_db(started_cluster):
     node1.query("system refresh view system.a")
     assert node1.query("select count(), sum(x) from system.a") == "2\t3\n"
 
-    node1.query("drop table system.a")
-
-def test_refreshable_mv_in_read_only_node(started_cluster):
+def test_refreshable_mv_in_read_only_node(started_cluster, cleanup):
     # writable node
     node1.query(
-        "create database re engine = Replicated('/test/re', 'shard1', '{replica}');"
+        f"create database re engine = Replicated('/test/re_{test_idx}', 'shard1', '{{replica}}');"
     )
 
     # read_only node
     reading_node.query(
-        "create database re engine = Replicated('/test/re', 'shard1', '{replica}');"
+        f"create database re engine = Replicated('/test/re_{test_idx}', 'shard1', '{{replica}}');"
     )
 
     # disable view sync on writable node, see if there's RefreshTask on read_only node
@@ -284,13 +291,10 @@ def test_refreshable_mv_in_read_only_node(started_cluster):
         "1\n",
     )
 
-    reading_node.query("drop database re sync")
-    node1.query("drop database re sync")
-
-def test_refresh_vs_shutdown_smoke(started_cluster):
+def test_refresh_vs_shutdown_smoke(started_cluster, cleanup):
     for node in nodes:
         node.query(
-            "create database re engine = Replicated('/test/re', 'shard1', '{replica}');"
+            f"create database re engine = Replicated('/test/re_{test_idx}', 'shard1', '{{replica}}');"
         )
 
     node1.stop_clickhouse()
@@ -335,10 +339,8 @@ def test_refresh_vs_shutdown_smoke(started_cluster):
     )
 
     node1.start_clickhouse()
-    node1.query("drop database re sync")
-    node2.query("drop database re sync")
 
-def test_adding_replica(started_cluster):
+def test_adding_replica(started_cluster, cleanup):
     node1.query(
         "create database re engine = Replicated('/test/re', 'shard1', 'r1');"
         "create materialized view re.a refresh every 1 second (x Int64) engine ReplicatedMergeTree order by x as select number*10 as x from numbers(2);"
@@ -359,13 +361,10 @@ def test_adding_replica(started_cluster):
         "select last_refresh_replica from system.view_refreshes")
     assert r == "2\n"
 
-    node1.query("drop database re sync")
-    node2.query("drop database re sync")
-
-def test_replicated_db_startup_race(started_cluster):
+def test_replicated_db_startup_race(started_cluster, cleanup):
     for node in nodes:
         node.query(
-            "create database re engine = Replicated('/test/re', 'shard1', '{replica}');"
+            f"create database re engine = Replicated('/test/re_{test_idx}', 'shard1', '{{replica}}');"
         )
     node1.query(
         "create materialized view re.a refresh every 1 second (x Int64) engine ReplicatedMergeTree order by x as select number*10 as x from numbers(2);\
@@ -382,5 +381,3 @@ def test_replicated_db_startup_race(started_cluster):
     node1.query("system disable failpoint database_replicated_startup_pause")
     _, err = drop_query_handle.get_answer_and_error()
     assert err == ""
-
-    node2.query("drop database re sync")
