@@ -2082,106 +2082,4 @@ public void testReplicatorWhenPartitionCountsDiffer() throws Exception {
20822082 assertEquals (receive .getValue (), "msg-p2-1" );
20832083 }
20842084 }
2085-
2086- // https://github.com/apache/pulsar/pull/24118 has denied a client to access a non-partitioned topic that end with
2087- // "-partition-{num}", but it is not cherry-picked to previous branches.
2088- // To guarantee compatibility with previous releases, the following tests should work for the releases that is
2089- // earlier than "4.1.0".
2090- // When cherry-picking the PR into previous branches, the following tests should be enabled.
2091- // I will push a seperate PR to remove these tests.
2092- // @Test
2093- // public void testReplicatorWithSpecialNonPartitionedTopic() throws Exception {
2094- // if (usingGlobalZK) {
2095- // // This test case is not applicable when using global ZK, because the namespace policies
2096- // // are shared among clusters.
2097- // return;
2098- // }
2099- //
2100- // String tp = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp") + "-partition-4";
2101- // String mlName = TopicName.get(tp).getPersistenceNamingEncoding();
2102- // pulsar1.getDefaultManagedLedgerFactory().open(mlName);
2103- // pulsar2.getDefaultManagedLedgerFactory().open(mlName);
2104- // Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create();
2105- // Consumer<String> c2 = client2.newConsumer(Schema.STRING).topic(tp).subscriptionName("test-sub")
2106- // .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
2107- //
2108- // waitReplicatorStarted(tp);
2109- // p1.send("abc");
2110- // Message<String> msg = c2.receive(2, TimeUnit.SECONDS);
2111- // assertNotNull(msg);
2112- // assertEquals(msg.getValue(), "abc");
2113- //
2114- // // cleanup
2115- // p1.close();
2116- // c2.close();
2117- // }
2118- //
2119- // @Test
2120- // public void testFailureReplicatorWithSpecialNonPartitionedTopic() throws Exception {
2121- // if (usingGlobalZK) {
2122- // // This test case is not applicable when using global ZK, because the namespace policies
2123- // // are shared among clusters.
2124- // return;
2125- // }
2126- //
2127- // String tp = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp") + "-partition-0";
2128- // String mlName = TopicName.get(tp).getPersistenceNamingEncoding();
2129- // pulsar2.getDefaultManagedLedgerFactory().open(mlName);
2130- // Thread.sleep(3000);
2131- // admin1.topics().createPartitionedTopic(TopicName.get(tp).getPartitionedTopicName(), 1);
2132- // Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create();
2133- // Producer<String> p2 = client2.newProducer(Schema.STRING).topic(tp).create();
2134- // p1.send("123");
2135- // p2.send("321");
2136- // Thread.sleep(3000);
2137- //
2138- // Awaitility.await().untilAsserted(() -> {
2139- // PersistentTopic persistentTopic = (PersistentTopic) broker1
2140- // .getTopic(TopicName.get(tp).getPartition(0).toString(), false).join().get();
2141- // assertTrue(persistentTopic.getReplicators().isEmpty()
2142- // || !persistentTopic.getReplicators().get(cluster2).isConnected());
2143- // });
2144- //
2145- // // cleanup
2146- // p1.close();
2147- // p2.close();
2148- // cleanupTopics(() -> {
2149- // admin1.topics().deletePartitionedTopic(TopicName.get(tp).getPartitionedTopicName());
2150- // admin2.topics().delete(tp);
2151- // });
2152- // }
2153- //
2154- // @Test
2155- // public void testFailureReplicatorWithSpecialNonPartitionedTopic2() throws Exception {
2156- // if (usingGlobalZK) {
2157- // // This test case is not applicable when using global ZK, because the namespace policies
2158- // // are shared among clusters.
2159- // return;
2160- // }
2161- //
2162- // String tp = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp") + "-partition-0";
2163- // String mlName = TopicName.get(tp).getPersistenceNamingEncoding();
2164- // pulsar1.getDefaultManagedLedgerFactory().open(mlName);
2165- // Thread.sleep(3000);
2166- // admin2.topics().createPartitionedTopic(TopicName.get(tp).getPartitionedTopicName(), 1);
2167- // Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create();
2168- // Producer<String> p2 = client2.newProducer(Schema.STRING).topic(tp).create();
2169- // p1.send("123");
2170- // p2.send("321");
2171- // Thread.sleep(3000);
2172- //
2173- // Awaitility.await().untilAsserted(() -> {
2174- // PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get();
2175- // assertTrue(persistentTopic.getReplicators().isEmpty()
2176- // || !persistentTopic.getReplicators().get(cluster2).isConnected());
2177- // });
2178- //
2179- // // cleanup
2180- // p1.close();
2181- // p2.close();
2182- // cleanupTopics(() -> {
2183- // admin1.topics().delete(tp);
2184- // admin2.topics().deletePartitionedTopic(TopicName.get(tp).getPartitionedTopicName());
2185- // });
2186- // }
21872085}
0 commit comments