Skip to content

Commit 1605f3a

Browse files
authored
Merge pull request ClickHouse#89789 from ClickHouse/backport/25.8/89692
Backport ClickHouse#89692 to 25.8: Speed up test_refreshable_mv_skip_old_temp_table_ddls
2 parents cc0e812 + d0b2a03 commit 1605f3a

File tree

3 files changed

+21
-18
lines changed

3 files changed

+21
-18
lines changed

src/Common/FailPoint.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ static struct InitFiu
117117
REGULAR(sleep_in_logs_flush) \
118118
ONCE(smt_commit_exception_before_op) \
119119
ONCE(backup_add_empty_memory_table) \
120-
REGULAR(refresh_task_delay_update_coordination_state_running)
120+
REGULAR(refresh_task_stop_racing_for_running_refresh)
121121

122122

123123
namespace FailPoints

src/Storages/MaterializedView/RefreshTask.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ namespace ErrorCodes
7878

7979
namespace FailPoints
8080
{
81-
extern const char refresh_task_delay_update_coordination_state_running[];
81+
extern const char refresh_task_stop_racing_for_running_refresh[];
8282
}
8383

8484
RefreshTask::RefreshTask(
@@ -1000,10 +1000,11 @@ bool RefreshTask::updateCoordinationState(CoordinationZnode root, bool running,
10001000
ops.emplace_back(zkutil::makeSetRequest(coordination.path, root.toString(), root.version));
10011001
if (running)
10021002
{
1003-
fiu_do_on(FailPoints::refresh_task_delay_update_coordination_state_running, {
1004-
std::chrono::milliseconds sleep_time{3000 + thread_local_rng() % 2000};
1005-
std::this_thread::sleep_for(sleep_time);
1006-
});
1003+
bool stop_racing_for_running_refresh = false;
1004+
fiu_do_on(FailPoints::refresh_task_stop_racing_for_running_refresh, { stop_racing_for_running_refresh = true; });
1005+
if (stop_racing_for_running_refresh)
1006+
return false;
1007+
10071008
ops.emplace_back(
10081009
zkutil::makeCreateRequest(coordination.path + "/running", coordination.replica_name, zkutil::CreateMode::Ephemeral));
10091010
}

tests/integration/test_refreshable_mv_skip_old_temp_table_ddls/test.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,7 @@ def test_refreshable_mv_skip_old_temp_tables_ddls(
8080
node1.query(f"DROP DATABASE IF EXISTS {db_name} SYNC")
8181
node2.query(f"DROP DATABASE IF EXISTS {db_name} SYNC")
8282
# Make sure that the MV is refreshed on node1
83-
node2.query(
84-
"SYSTEM ENABLE FAILPOINT refresh_task_delay_update_coordination_state_running"
85-
)
83+
node2.query("SYSTEM ENABLE FAILPOINT refresh_task_stop_racing_for_running_refresh")
8684
node2.query("SYSTEM ENABLE FAILPOINT database_replicated_delay_entry_execution")
8785

8886
node1.query(
@@ -98,11 +96,11 @@ def test_refreshable_mv_skip_old_temp_tables_ddls(
9896
f"CREATE TABLE {db_name}.target (x DateTime) ENGINE ReplicatedMergeTree ORDER BY x"
9997
)
10098
node1.query(
101-
f"CREATE MATERIALIZED VIEW {db_name}.mv REFRESH EVERY 1 SECOND {append_clause} TO {db_name}.target AS SELECT now() AS x"
99+
f"CREATE MATERIALIZED VIEW {db_name}.mv REFRESH EVERY 1 HOUR {append_clause} TO {db_name}.target AS SELECT now() AS x"
102100
)
103101
else:
104102
node1.query(
105-
f"CREATE MATERIALIZED VIEW {db_name}.mv REFRESH EVERY 1 SECOND {append_clause} (x DateTime) ENGINE ReplicatedMergeTree ORDER BY x AS SELECT now() AS x"
103+
f"CREATE MATERIALIZED VIEW {db_name}.mv REFRESH EVERY 1 HOUR {append_clause} (x DateTime) ENGINE ReplicatedMergeTree ORDER BY x AS SELECT now() AS x"
106104
)
107105

108106
node2.query(
@@ -122,10 +120,16 @@ def test_refreshable_mv_skip_old_temp_tables_ddls(
122120

123121
last_log_ts = get_last_ddl_worker_log_ts(node2, db_name)
124122

125-
# Wait for node1 to refresh the view several times
126-
time.sleep(5)
127-
128-
node1.query(f"ALTER TABLE {db_name}.mv MODIFY REFRESH EVERY 1 HOUR {append_clause}")
123+
last_refresh_time = node1.query(
124+
"SELECT last_refresh_time FROM system.view_refreshes WHERE view='mv'"
125+
)
126+
for i in range(2):
127+
node1.query(f"SYSTEM REFRESH VIEW {db_name}.mv")
128+
# Ensure that the mv is refresh
129+
node1.query_with_retry(
130+
"SELECT last_refresh_time FROM system.view_refreshes WHERE view='mv'",
131+
check_callback=lambda x: x != last_refresh_time,
132+
)
129133

130134
# Make sure that the view is not refreshing, and it is scheduled to be refreshed in at least 10 minutes
131135
node1.query_with_retry(
@@ -135,9 +139,7 @@ def test_refreshable_mv_skip_old_temp_tables_ddls(
135139
> datetime.timedelta(minutes=10),
136140
)
137141

138-
node2.query(
139-
"SYSTEM DISABLE FAILPOINT refresh_task_delay_update_coordination_state_running"
140-
)
142+
node2.query("SYSTEM DISABLE FAILPOINT refresh_task_stop_racing_for_running_refresh")
141143
node2.query("SYSTEM DISABLE FAILPOINT database_replicated_delay_entry_execution")
142144

143145
table_info1 = node1.query(f"SELECT uuid, name FROM system.tables WHERE database='{db_name}'")

0 commit comments

Comments
 (0)