|
26 | 26 | import java.util.Optional; |
27 | 27 | import java.util.concurrent.CompletableFuture; |
28 | 28 | import java.util.concurrent.TimeUnit; |
| 29 | +import java.util.stream.Collectors; |
29 | 30 | import lombok.extern.slf4j.Slf4j; |
30 | 31 | import org.apache.pulsar.broker.BrokerTestUtil; |
31 | 32 | import org.apache.pulsar.broker.ServiceConfiguration; |
|
39 | 40 | import org.testng.annotations.Test; |
40 | 41 |
|
41 | 42 | @Slf4j |
42 | | -@Test(groups = "flaky") |
| 43 | +@Test(groups = "broker") |
43 | 44 | public class ZkSessionExpireTest extends NetworkErrorTestBase { |
44 | 45 |
|
45 | 46 | private java.util.function.Consumer<ServiceConfiguration> settings; |
@@ -94,7 +95,7 @@ public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic, Class |
94 | 95 | admin2.namespaces().unload(defaultNamespace); |
95 | 96 |
|
96 | 97 | // Confirm all brokers registered. |
97 | | - Awaitility.await().untilAsserted(() -> { |
| 98 | + Awaitility.await().atMost(Duration.ofSeconds(20)).untilAsserted(() -> { |
98 | 99 | assertEquals(getAvailableBrokers(pulsar1).size(), 2); |
99 | 100 | assertEquals(getAvailableBrokers(pulsar2).size(), 2); |
100 | 101 | }); |
@@ -160,7 +161,21 @@ public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic, Class |
160 | 161 | // Verify: the topic on broker-2 is fine. |
161 | 162 | Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { |
162 | 163 | CompletableFuture<Optional<Topic>> future = pulsar1.getBrokerService().getTopic(topicName, false); |
163 | | - assertTrue(future == null || future.isCompletedExceptionally()); |
| 164 | + log.info("broker 1 topics {}", pulsar1.getBrokerService().getTopics().keySet()); |
| 165 | + log.info("broker 2 topics {}", pulsar2.getBrokerService().getTopics().keySet()); |
| 166 | + log.info("broker 1 bundles {}", pulsar1.getNamespaceService().getOwnershipCache().getOwnedBundles() |
| 167 | + .keySet().stream().map(k -> k.getNamespaceObject().toString() + "/" + k.getBundleRange()) |
| 168 | + .filter(s -> s.contains(defaultNamespace)).collect(Collectors.toList())); |
| 169 | + log.info("broker 2 bundles {}", pulsar2.getNamespaceService().getOwnershipCache().getOwnedBundles() |
| 170 | + .keySet().stream().map(k -> k.getNamespaceObject().toString() + "/" + k.getBundleRange()) |
| 171 | + .filter(s -> s.contains(defaultNamespace)).collect(Collectors.toList())); |
| 172 | + log.info("future: {}, isDone: {}, isCompletedExceptionally: {}", |
| 173 | + future, future == null ? "null" : future.isDone(), |
| 174 | + future, future == null ? "null" : future.isCompletedExceptionally()); |
| 175 | + assertTrue(future == null |
| 176 | + || !pulsar1.getBrokerService().getTopics().containsKey(topicName) |
| 177 | + || (future.isDone() && !future.isCompletedExceptionally() && future.get().isEmpty()) |
| 178 | + || future.isCompletedExceptionally()); |
164 | 179 | }); |
165 | 180 | Topic broker2Topic3 = pulsar2.getBrokerService().getTopic(topicName, false).join().get(); |
166 | 181 | assertNotNull(broker2Topic3); |
|
0 commit comments