29 | 29 | keeper_required_feature_flags=["multi_read", "create_if_not_exists"], |
30 | 30 | macros={"shard": "shard1", "replica": "2"}, |
31 | 31 | ) |
| 32 | + |
| 33 | +reading_node = cluster.add_instance( |
| 34 | + "reading_node", |
| 35 | + main_configs=["configs/read_only.xml"], |
| 36 | + user_configs=["configs/users.xml"], |
| 37 | + with_zookeeper=True, |
| 38 | + keeper_required_feature_flags=["multi_read", "create_if_not_exists"], |
| 39 | + macros={"shard": "shard1", "replica": "3"}, |
| 40 | +) |
32 | 41 | nodes = [node1, node2] |
33 | 42 |
34 | 43 |
@@ -223,6 +232,60 @@ def test_refreshable_mv_in_system_db(started_cluster): |
223 | 232 |
224 | 233 | node1.query("drop table system.a") |
225 | 234 |
| 235 | +def test_refreshable_mv_in_read_only_node(started_cluster): |
| 236 | + # writable node |
| 237 | + node1.query( |
| 238 | + "create database re engine = Replicated('/test/re', 'shard1', '{replica}');" |
| 239 | + ) |
| 240 | + |
| 241 | + # read_only node |
| 242 | + reading_node.query( |
| 243 | + "create database re engine = Replicated('/test/re', 'shard1', '{replica}');" |
| 244 | + ) |
| 245 | + |
| 246 | + # disable view sync on the writable node to see whether the RefreshTask works on the read_only node
| 247 | + node1.query( |
| 248 | + "system stop view sync" |
| 249 | + ) |
| 250 | + |
| 251 | + # clear text_log to ensure all logged messages relate to this test
| 252 | + reading_node.query( |
| 253 | + "system flush logs;" |
| 254 | + "truncate table system.text_log;" |
| 255 | + ) |
| 256 | + |
| 257 | + # this MV will be replicated to the read_only node
| 258 | + node1.query( |
| 259 | + "create materialized view re.a refresh every 1 second (x Int64) engine ReplicatedMergeTree order by x as select number*10 as x from numbers(2)" |
| 260 | + ) |
| 261 | + |
| 262 | + # refresh the view manually |
| 263 | + reading_node.query("system refresh view re.a") |
| 264 | + |
| 265 | + # sleep 3 seconds to make sure the view has been refreshed
| 266 | + reading_node.query("select sleep(3)") |
| 267 | + |
| 268 | + # check that the RefreshTask on the read_only node produced no errors
| 269 | + reading_node.query("system flush logs") |
| 270 | + assert reading_node.query("select count() from system.text_log where message like '%QUERY_IS_PROHIBITED%'") == "0\n" |
| 271 | + assert reading_node.query("select count() from system.view_refreshes where exception != ''") == "0\n" |
| 272 | + |
| 273 | + # start view sync and check that the refresh task works on node1
| 274 | + node1.query("system start view sync") |
| 275 | + node1.query("system refresh view re.a") |
| 276 | + assert_eq_with_retry( |
| 277 | + node1, |
| 278 | + "select * from re.a order by x", |
| 279 | + "0\n10\n", |
| 280 | + ) |
| 281 | + assert_eq_with_retry( |
| 282 | + node1, |
| 283 | + "select count() from system.view_refreshes where exception = '' and last_refresh_replica = '1'", |
| 284 | + "1\n", |
| 285 | + ) |
| 286 | + |
| 287 | + reading_node.query("drop database re sync") |
| 288 | + node1.query("drop database re sync") |
226 | 289 |
227 | 290 | def test_refresh_vs_shutdown_smoke(started_cluster): |
228 | 291 | for node in nodes: |