@@ -126,6 +126,23 @@ private void waitTransactionDone(TransactionState transaction) throws Exception
126126 }
127127 }
128128
129+ /**
130+ * Simulates the real PublishVersionDaemon's periodic behavior by retrying runAfterCatalogReady()
131+ * until all waiters are satisfied. This prevents flakiness caused by transient RPC failures or
132+ * thread pool scheduling delays under CI load.
133+ */
134+ private void awaitPublish (PublishVersionDaemon daemon , VisibleStateWaiter ... waiters ) {
135+ Awaitility .await ().atMost (60 , TimeUnit .SECONDS ).pollInterval (1 , TimeUnit .SECONDS ).until (() -> {
136+ daemon .runAfterCatalogReady ();
137+ for (VisibleStateWaiter waiter : waiters ) {
138+ if (!waiter .await (500 , TimeUnit .MILLISECONDS )) {
139+ return false ;
140+ }
141+ }
142+ return true ;
143+ });
144+ }
145+
129146 @ BeforeAll
130147 public static void setUp () throws Exception {
131148 FeConstants .runningUnitTest = true ;
@@ -253,12 +270,7 @@ public void testNormal(boolean enableAggregation) throws Exception {
253270 Lists .newArrayList (), null );
254271
255272 PublishVersionDaemon publishVersionDaemon = new PublishVersionDaemon ();
256- publishVersionDaemon .runAfterCatalogReady ();
257-
258- Assertions .assertTrue (waiter1 .await (60 , TimeUnit .SECONDS ));
259- Assertions .assertTrue (waiter2 .await (60 , TimeUnit .SECONDS ));
260- Assertions .assertTrue (waiter3 .await (60 , TimeUnit .SECONDS ));
261- Assertions .assertTrue (waiter4 .await (60 , TimeUnit .SECONDS ));
273+ awaitPublish (publishVersionDaemon , waiter1 , waiter2 , waiter3 , waiter4 );
262274 }
263275
264276 // @ParameterizedTest
@@ -468,10 +480,7 @@ public void testTransformBatchToSingle(boolean enableAggregation) throws Excepti
468480 Lists .newArrayList (), null );
469481
470482 PublishVersionDaemon publishVersionDaemon = new PublishVersionDaemon ();
471- publishVersionDaemon .runAfterCatalogReady ();
472-
473- Assertions .assertTrue (waiter1 .await (60 , TimeUnit .SECONDS ));
474- Assertions .assertTrue (waiter2 .await (60 , TimeUnit .SECONDS ));
483+ awaitPublish (publishVersionDaemon , waiter1 , waiter2 );
475484
476485 // Ensure publishingLakeTransactionsBatchTableId has been cleared, otherwise the following single publish may fail.
477486 publishVersionDaemon .publishingLakeTransactionsBatchTableId .clear ();
@@ -486,8 +495,7 @@ public void testTransformBatchToSingle(boolean enableAggregation) throws Excepti
486495 VisibleStateWaiter waiter3 = globalTransactionMgr .commitTransaction (db .getId (), transactionId3 , transTablets1 ,
487496 Lists .newArrayList (), null );
488497
489- publishVersionDaemon .runAfterCatalogReady ();
490- Assertions .assertTrue (waiter3 .await (60 , TimeUnit .SECONDS ));
498+ awaitPublish (publishVersionDaemon , waiter3 );
491499
492500 Config .lake_enable_batch_publish_version = true ;
493501 }
@@ -514,8 +522,7 @@ public void testTransformSingleToBatch(boolean enableAggregation) throws Excepti
514522
515523 Config .lake_enable_batch_publish_version = false ;
516524 PublishVersionDaemon publishVersionDaemon = new PublishVersionDaemon ();
517- publishVersionDaemon .runAfterCatalogReady ();
518- Assertions .assertTrue (waiter5 .await (60 , TimeUnit .SECONDS ));
525+ awaitPublish (publishVersionDaemon , waiter5 );
519526
520527 long transactionId6 = globalTransactionMgr .
521528 beginTransaction (db .getId (), Lists .newArrayList (table .getId ()),
@@ -545,9 +552,7 @@ public void testTransformSingleToBatch(boolean enableAggregation) throws Excepti
545552 Assertions .assertFalse (waiter7 .await (5 , TimeUnit .SECONDS ));
546553
547554 publishVersionDaemon .publishingTransactionIds .clear ();
548- publishVersionDaemon .runAfterCatalogReady ();
549- Assertions .assertTrue (waiter6 .await (60 , TimeUnit .SECONDS ));
550- Assertions .assertTrue (waiter7 .await (60 , TimeUnit .SECONDS ));
555+ awaitPublish (publishVersionDaemon , waiter6 , waiter7 );
551556 }
552557
553558 @ ParameterizedTest
@@ -699,11 +704,7 @@ public void testBatchPublishShadowIndex() throws Exception {
699704 Lists .newArrayList (), null );
700705
701706 PublishVersionDaemon publishVersionDaemon = new PublishVersionDaemon ();
702- publishVersionDaemon .runAfterCatalogReady ();
703-
704- Assertions .assertTrue (waiter1 .await (1 , TimeUnit .MINUTES ));
705- Assertions .assertTrue (waiter2 .await (1 , TimeUnit .MINUTES ));
706- Assertions .assertTrue (waiter3 .await (1 , TimeUnit .MINUTES ));
707+ awaitPublish (publishVersionDaemon , waiter1 , waiter2 , waiter3 );
707708
708709 ComputeNode shadowTabletNode = GlobalStateMgr .getCurrentState ().getWarehouseMgr ()
709710 .getComputeNodeAssignedToTablet (WarehouseManager .DEFAULT_RESOURCE , shadowTablet .getId ());
0 commit comments