99from helpers .cluster import ClickHouseCluster
1010from helpers .network import PartitionManager
1111
def wait_for_export_status(
    node,
    mt_table: str,
    s3_table: str,
    partition_id: str,
    expected_status: str = "COMPLETED",
    timeout: int = 30,
    poll_interval: float = 0.5,
):
    """Poll system.replicated_partition_exports until the export of
    *partition_id* from *mt_table* to *s3_table* reaches *expected_status*.

    Returns the matched status string. Raises TimeoutError (including the
    last observed status, to aid debugging) if the status is not reached
    within *timeout* seconds.
    """
    last_status = None
    start_time = time.time()
    while time.time() - start_time < timeout:
        last_status = node.query(
            f"""
                SELECT status FROM system.replicated_partition_exports
                WHERE source_table = '{mt_table}'
                AND destination_table = '{s3_table}'
                AND partition_id = '{partition_id}'
            """
        ).strip()

        # Empty string means the export row is not visible yet.
        if last_status == expected_status:
            return last_status

        time.sleep(poll_interval)

    raise TimeoutError(
        f"Export status did not reach '{expected_status}' within {timeout}s "
        f"(last observed status: {last_status!r})."
    )
41+
def wait_for_export_to_start(
    node,
    mt_table: str,
    s3_table: str,
    partition_id: str,
    timeout: int = 10,
    poll_interval: float = 0.2,
):
    """Poll system.replicated_partition_exports until at least one export
    row for *partition_id* (from *mt_table* to *s3_table*) exists.

    Returns True once the export is visible. Raises TimeoutError if no row
    appears within *timeout* seconds.
    """
    start_time = time.time()
    while time.time() - start_time < timeout:
        count = node.query(
            f"""
                SELECT count() FROM system.replicated_partition_exports
                WHERE source_table = '{mt_table}'
                AND destination_table = '{s3_table}'
                AND partition_id = '{partition_id}'
            """
        ).strip()

        # Guard against an empty result: '' != '0' would be a false
        # positive, so require a non-empty value that is not '0'.
        if count and count != '0':
            return True

        time.sleep(poll_interval)

    raise TimeoutError(
        f"Export of partition '{partition_id}' from '{mt_table}' to "
        f"'{s3_table}' did not start within {timeout}s."
    )
67+
68+
1269@pytest .fixture (scope = "module" )
1370def cluster ():
1471 try :
@@ -115,7 +172,8 @@ def test_restart_nodes_during_export(cluster):
115172 node .query (export_queries )
116173
117174 # wait for the exports to start
118- time .sleep (3 )
175+ wait_for_export_to_start (node , mt_table , s3_table , "2020" )
176+ wait_for_export_to_start (node , mt_table , s3_table , "2021" )
119177
120178 node .stop_clickhouse (kill = True )
121179 node2 .stop_clickhouse (kill = True )
@@ -128,7 +186,8 @@ def test_restart_nodes_during_export(cluster):
128186 node .start_clickhouse ()
129187 node2 .start_clickhouse ()
130188
131- time .sleep (5 )
189+ wait_for_export_status (node , mt_table , s3_table , "2020" , "COMPLETED" )
190+ wait_for_export_status (node , mt_table , s3_table , "2021" , "COMPLETED" )
132191
133192 assert node .query (f"SELECT count() FROM { s3_table } WHERE year = 2020" ) != f'0\n ' , "Export of partition 2020 did not resume after crash"
134193
@@ -184,7 +243,7 @@ def test_kill_export(cluster):
184243 node .query (f"KILL EXPORT PARTITION WHERE partition_id = '2020' and source_table = '{ mt_table } ' and destination_table = '{ s3_table } '" )
185244
186245 # wait for 2021 to finish
187- time . sleep ( 5 )
246+ wait_for_export_status ( node , mt_table , s3_table , "2021" , "COMPLETED" )
188247
189248 # checking for the commit file because maybe the data file was too fast?
190249 assert node .query (f"SELECT count() FROM s3(s3_conn, filename='{ s3_table } /commit_2020_*', format=LineAsString)" ) == '0\n ' , "Partition 2020 was written to S3, it was not killed as expected"
@@ -268,7 +327,8 @@ def test_concurrent_exports_to_different_targets(cluster):
268327 f"ALTER TABLE { mt_table } EXPORT PARTITION ID '2020' TO TABLE { s3_table_b } "
269328 )
270329
271- time .sleep (5 )
330+ wait_for_export_status (node , mt_table , s3_table_a , "2020" , "COMPLETED" )
331+ wait_for_export_status (node , mt_table , s3_table_b , "2020" , "COMPLETED" )
272332
273333 # Both targets should receive the same data independently
274334 assert node .query (f"SELECT count() FROM { s3_table_a } WHERE year = 2020" ) == '3\n ' , "First target did not receive expected rows"
@@ -317,7 +377,7 @@ def test_failure_is_logged_in_system_table(cluster):
317377 )
318378
319379 # Wait so that the export fails
320- time . sleep ( 5 )
380+ wait_for_export_status ( node , mt_table , s3_table , "2020" , "FAILED" , timeout = 10 )
321381
322382 # Network restored; verify the export is marked as FAILED in the system table
323383 # Also verify we captured at least one exception and no commit file exists
@@ -386,7 +446,7 @@ def test_inject_short_living_failures(cluster):
386446 time .sleep (5 )
387447
388448 # wait for the export to finish
389- time . sleep ( 5 )
449+ wait_for_export_status ( node , mt_table , s3_table , "2020" , "COMPLETED" )
390450
391451 # Assert the export succeeded
392452 assert node .query (f"SELECT count() FROM { s3_table } WHERE year = 2020" ) == '3\n ' , "Export did not succeed"
@@ -419,7 +479,7 @@ def test_export_ttl(cluster):
419479 mt_table = "export_ttl_mt_table"
420480 s3_table = "export_ttl_s3_table"
421481
422- expiration_time = 5
482+ expiration_time = 3
423483
424484 create_tables_and_insert_data (node , mt_table , s3_table , "replica1" )
425485
@@ -431,7 +491,8 @@ def test_export_ttl(cluster):
431491 assert "Export with key" in error , "Expected error about expired export"
432492
433493 # wait for the export to finish and for the manifest to expire
434- time .sleep (expiration_time )
494+ wait_for_export_status (node , mt_table , s3_table , "2020" , "COMPLETED" )
495+ time .sleep (expiration_time * 2 )
435496
436497 # assert that the export succeeded, check the commit file
437498 assert node .query (f"SELECT count() FROM s3(s3_conn, filename='{ s3_table } /commit_2020_*', format=LineAsString)" ) == '1\n ' , "Export did not succeed"
@@ -440,7 +501,7 @@ def test_export_ttl(cluster):
440501 node .query (f"ALTER TABLE { mt_table } EXPORT PARTITION ID '2020' TO TABLE { s3_table } " )
441502
442503 # wait for the export to finish
443- time . sleep ( expiration_time )
504+ wait_for_export_status ( node , mt_table , s3_table , "2020" , "COMPLETED" )
444505
445506 # assert that the export succeeded, check the commit file
446507 # there should be two commit files now, one for the first export and one for the second export
@@ -474,14 +535,14 @@ def test_export_partition_file_already_exists_policy(cluster):
474535 ) == "COMPLETED\n " , "Export should be marked as COMPLETED"
475536
476537 # wait for the exports to finish
477- time . sleep ( 3 )
538+ wait_for_export_status ( node , mt_table , s3_table , "2020" , "COMPLETED" )
478539
479540 # try to export the partition
480541 node .query (
481542 f"ALTER TABLE { mt_table } EXPORT PARTITION ID '2020' TO TABLE { s3_table } SETTINGS export_merge_tree_partition_force_export=1"
482543 )
483544
484- time . sleep ( 3 )
545+ wait_for_export_status ( node , mt_table , s3_table , "2020" , "COMPLETED" )
485546
486547 assert node .query (
487548 f"""
@@ -499,7 +560,7 @@ def test_export_partition_file_already_exists_policy(cluster):
499560 )
500561
501562 # wait for the export to finish
502- time . sleep ( 3 )
563+ wait_for_export_status ( node , mt_table , s3_table , "2020" , "COMPLETED" )
503564
504565 # check system.replicated_partition_exports for the export
505566 # ideally we would make sure the transaction id is different, but I do not have the time to do that now
@@ -520,7 +581,7 @@ def test_export_partition_file_already_exists_policy(cluster):
520581 )
521582
522583 # wait for the export to finish
523- time . sleep ( 3 )
584+ wait_for_export_status ( node , mt_table , s3_table , "2020" , "FAILED" )
524585
525586 # check system.replicated_partition_exports for the export
526587 assert node .query (
@@ -603,7 +664,7 @@ def test_export_partition_permissions(cluster):
603664 )
604665
605666 # Wait for export to complete
606- time . sleep ( 5 )
667+ wait_for_export_status ( node , mt_table , s3_table , "2020" , "COMPLETED" )
607668
608669 # Verify the export succeeded
609670 result = node .query (f"SELECT count() FROM { s3_table } WHERE year = 2020" )
@@ -633,7 +694,8 @@ def test_multiple_exports_within_a_single_query(cluster):
633694
634695 node .query (f"ALTER TABLE { mt_table } EXPORT PARTITION ID '2020' TO TABLE { s3_table } , EXPORT PARTITION ID '2021' TO TABLE { s3_table } ;" )
635696
636- time .sleep (5 )
697+ wait_for_export_status (node , mt_table , s3_table , "2020" , "COMPLETED" )
698+ wait_for_export_status (node , mt_table , s3_table , "2021" , "COMPLETED" )
637699
638700 # assert the exports have been executed
639701 assert node .query (f"SELECT count() FROM { s3_table } WHERE year = 2020" ) == '3\n ' , "Export did not succeed"
0 commit comments