diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index f996d328090ca..c7a36ad1b21d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -33,6 +33,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; @@ -57,6 +58,8 @@ public abstract class AbstractReplicator implements Replicator { protected final String remoteCluster; protected final PulsarClientImpl replicationClient; protected final PulsarClientImpl client; + protected final PulsarAdmin replicationAdmin; + protected final PulsarAdmin admin; protected String replicatorId; @Getter protected final Topic localTopic; @@ -107,7 +110,8 @@ public enum State { } public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName, - String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient) + String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient, + PulsarAdmin replicationAdmin) throws PulsarServerException { this.brokerService = brokerService; this.localTopic = localTopic; @@ -117,7 +121,9 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl this.remoteTopicName = remoteTopicName; this.remoteCluster = StringInterner.intern(remoteCluster); this.replicationClient = replicationClient; + this.replicationAdmin = replicationAdmin; this.client = (PulsarClientImpl) brokerService.pulsar().getClient(); + this.admin = brokerService.pulsar().getAdminClient(); this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); this.replicatorId = String.format("%s | %s", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index 38e1894c17854..38320e5be70c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -30,6 +30,7 @@ import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Replicator; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; @@ -51,9 +52,10 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl(); public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster, - BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException { + BrokerService brokerService, PulsarClientImpl replicationClient, + PulsarAdmin replicationAdmin) throws PulsarServerException { super(localCluster, topic, remoteCluster, topic.getName(), topic.getReplicatorPrefix(), brokerService, - replicationClient); + replicationClient, replicationAdmin); // NonPersistentReplicator does not support limitation so far, so reset pending queue size to the default value. producerBuilder.maxPendingMessages(1000); producerBuilder.blockIfQueueFull(false); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 6021c41142a5e..96a9f97d70f50 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -71,7 +71,9 @@ import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; @@ -628,14 +630,15 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, No String localCluster) { return AbstractReplicator.validatePartitionedTopicAsync(nonPersistentTopic.getName(), brokerService) .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources() - .getClusterAsync(remoteCluster) - .thenApply(clusterData -> - brokerService.getReplicationClient(remoteCluster, clusterData))) - .thenAccept(replicationClient -> { + .getClusterAsync(remoteCluster)) + .thenAccept((clusterData) -> { + PulsarClient replicationClient = brokerService.getReplicationClient(remoteCluster, clusterData); + PulsarAdmin replicationAdmin = brokerService.getClusterPulsarAdmin(remoteCluster, clusterData); replicators.computeIfAbsent(remoteCluster, r -> { try { return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, - remoteCluster, brokerService, (PulsarClientImpl) replicationClient); + remoteCluster, brokerService, (PulsarClientImpl) replicationClient, + replicationAdmin); } catch (PulsarServerException e) { log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index 46f8a27d58020..922a0e42c3ddb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -27,10 +27,12 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Markers; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.util.FutureUtil; @@ -40,9 +42,10 @@ public class GeoPersistentReplicator extends PersistentReplicator { public GeoPersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, BrokerService brokerService, - PulsarClientImpl replicationClient) + PulsarClientImpl replicationClient, PulsarAdmin replicationAdmin) throws PulsarServerException { - super(localCluster, topic, cursor, remoteCluster, topic.getName(), brokerService, replicationClient); + super(localCluster, topic, cursor, remoteCluster, topic.getName(), brokerService, replicationClient, + replicationAdmin); } /** @@ -55,29 +58,85 @@ protected String getProducerName() { @Override protected CompletableFuture prepareCreateProducer() { - if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) { - return CompletableFuture.completedFuture(null); - } else { - CompletableFuture topicCheckFuture = new CompletableFuture<>(); - replicationClient.getPartitionedTopicMetadata(localTopic.getName(), false, false) - .whenComplete((metadata, ex) -> { - if (ex == null) { - if (metadata.partitions == 0) { - topicCheckFuture.complete(null); - } else { - String errorMsg = String.format("%s Can not create the replicator due to the partitions in the" - + " remote cluster is not 0, but is %s", - replicatorId, metadata.partitions); - log.error(errorMsg); - topicCheckFuture.completeExceptionally( - new PulsarClientException.NotAllowedException(errorMsg)); + return createRemoteTopicIfDoesNotExist(TopicName.get(localTopicName).getPartitionedTopicName()); + } + + private CompletableFuture getLocalPartitionMetadata(String topic) { + return admin.topics().getPartitionedTopicMetadataAsync(topic) + .thenApply(metadata -> metadata.partitions) + .exceptionallyCompose(t -> { + Throwable actEx = FutureUtil.unwrapCompletionException(t); + if (actEx instanceof PulsarAdminException.NotFoundException) { + // Legacy edge case: Local topic is non-partitioned but name ends with "-partition-{num}". + // This should never happen in practice because PIP-414 disables this naming pattern. + return createRemoteTopicIfDoesNotExist(localTopicName) + .thenApply(__ -> -1); // Special marker } - } else { - topicCheckFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); + return CompletableFuture.failedFuture(actEx); + }); + } + + private CompletableFuture getRemotePartitionMetadata(String topic) { + return replicationAdmin.topics().getPartitionedTopicMetadataAsync(topic) + .thenApply(metadata -> metadata.partitions) + .exceptionallyCompose(t -> { + Throwable actEx = FutureUtil.unwrapCompletionException(t); + if (actEx instanceof PulsarAdminException.NotFoundException) { + return CompletableFuture.completedFuture(-1); // Topic doesn't exist + } + return CompletableFuture.failedFuture(actEx); + }); + } + + private CompletableFuture handlePartitionComparison(String topic, int localPartitions, int remotePartitions) { + // Skip if already handled by recursion + if (localPartitions == -1) { + return CompletableFuture.completedFuture(null); + } + + // Remote topic doesn't exist - create it + if (remotePartitions == -1) { + if (!brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) { + String errorMsg = String.format("[%s] Can not start replicator because there is no topic on" + + " the remote cluster. Please create a %s on the remote cluster", + replicatorId, localPartitions == 0 ? "non-partitioned topic" + : "partitioned topic with " + localPartitions + " partitions"); + log.error(errorMsg); + return CompletableFuture.failedFuture(new PulsarServerException(errorMsg)); + } + + CompletableFuture createFuture = localPartitions == 0 + ? replicationAdmin.topics().createNonPartitionedTopicAsync(topic) + : replicationAdmin.topics().createPartitionedTopicAsync(topic, localPartitions); + + return createFuture.whenComplete((__, t) -> { + if (t != null) { + log.error("[{}] Failed to create topic on remote cluster. Local has {} partitions", + replicatorId, localPartitions, t); } }); - return topicCheckFuture; } + + // Both exist - verify compatibility + if (localPartitions == remotePartitions) { + return CompletableFuture.completedFuture(null); + } + + // Incompatible partitions + String errorMsg = String.format("[%s] Can not start replicator because the partitions between" + + " local and remote cluster are different. local: %s, remote: %s", + replicatorId, localPartitions, remotePartitions); + log.error(errorMsg); + return CompletableFuture.failedFuture(new PulsarServerException(errorMsg)); + } + + private CompletableFuture createRemoteTopicIfDoesNotExist(String partitionedTopic) { + return getLocalPartitionMetadata(partitionedTopic) + .thenCompose(localPartitions -> + getRemotePartitionMetadata(partitionedTopic).thenCompose(remotePartitions -> + handlePartitionComparison(partitionedTopic, localPartitions, remotePartitions) + ) + ); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index e0a31476fc9f7..c1d73cd389183 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -58,6 +58,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.MessageExpirer; import org.apache.pulsar.broker.service.Replicator; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; @@ -119,11 +120,12 @@ protected enum ReasonOfWaitForCursorRewinding { protected final LinkedList inFlightTasks = new LinkedList<>(); public PersistentReplicator(String localCluster, PersistentTopic localTopic, ManagedCursor cursor, - String remoteCluster, String remoteTopic, - BrokerService brokerService, PulsarClientImpl replicationClient) + String remoteCluster, String remoteTopic, + BrokerService brokerService, PulsarClientImpl replicationClient, + PulsarAdmin replicationAdmin) throws PulsarServerException { super(localCluster, localTopic, remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(), - brokerService, replicationClient); + brokerService, replicationClient, replicationAdmin); this.topic = localTopic; this.localSchemaTopicName = TopicName.getPartitionedTopicName(localTopicName).toString(); this.cursor = Objects.requireNonNull(cursor); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 451471e215ea5..5a1e0e940a802 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -148,6 +148,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -2343,11 +2344,11 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma String localCluster) { return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService) .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources() - .getClusterAsync(remoteCluster) - .thenApply(clusterData -> - brokerService.getReplicationClient(remoteCluster, clusterData))) - .thenAccept(replicationClient -> { - if (replicationClient == null) { + .getClusterAsync(remoteCluster)) + .thenAccept((clusterData) -> { + PulsarClient replicationClient = brokerService.getReplicationClient(remoteCluster, clusterData); + PulsarAdmin replicationAdmin = brokerService.getClusterPulsarAdmin(remoteCluster, clusterData); + if (replicationClient == null || replicationAdmin == null) { log.error("[{}] Can not create replicator because the remote client can not be created." + " remote cluster: {}. State of transferring : {}", topic, remoteCluster, transferring); @@ -2365,7 +2366,8 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { try { return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster, - remoteCluster, brokerService, (PulsarClientImpl) replicationClient); + remoteCluster, brokerService, (PulsarClientImpl) replicationClient, + replicationAdmin); } catch (PulsarServerException e) { log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e); } @@ -2431,9 +2433,10 @@ protected CompletableFuture addShadowReplicationCluster(String shadowTopic String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService) .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources() - .getClusterAsync(localCluster) - .thenApply(clusterData -> brokerService.getReplicationClient(localCluster, clusterData))) - .thenAccept(replicationClient -> { + .getClusterAsync(localCluster)) + .thenAccept((clusterData) -> { + PulsarClient replicationClient = brokerService.getReplicationClient(localCluster, clusterData); + PulsarAdmin replicationAdmin = brokerService.getClusterPulsarAdmin(localCluster, clusterData); Replicator replicator = shadowReplicators.computeIfAbsent(shadowTopic, r -> { try { TopicName sourceTopicName = TopicName.get(getName()); @@ -2442,7 +2445,7 @@ protected CompletableFuture addShadowReplicationCluster(String shadowTopic shadowPartitionTopic += "-partition-" + sourceTopicName.getPartitionIndex(); } return new ShadowReplicator(shadowPartitionTopic, PersistentTopic.this, cursor, - brokerService, (PulsarClientImpl) replicationClient); + brokerService, (PulsarClientImpl) replicationClient, replicationAdmin); } catch (PulsarServerException e) { log.error("[{}] ShadowReplicator startup failed {}", topic, shadowTopic, e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java index a334fd86dd02f..2e5e91fb9a633 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java @@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -39,11 +40,12 @@ public class ShadowReplicator extends PersistentReplicator { public ShadowReplicator(String shadowTopic, PersistentTopic sourceTopic, ManagedCursor cursor, - BrokerService brokerService, PulsarClientImpl replicationClient) + BrokerService brokerService, PulsarClientImpl replicationClient, + PulsarAdmin replicationAdmin) throws PulsarServerException { super(brokerService.pulsar().getConfiguration().getClusterName(), sourceTopic, cursor, brokerService.pulsar().getConfiguration().getClusterName(), shadowTopic, brokerService, - replicationClient); + replicationClient, replicationAdmin); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 39252ac0a94e6..5f1d3a8a6c50b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import io.netty.channel.DefaultEventLoop; @@ -39,12 +40,15 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @@ -94,9 +98,19 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception { when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex")); when(producerBuilder.createAsync()) .thenReturn(CompletableFuture.failedFuture(new RuntimeException("mocked ex"))); + + @Cleanup + PulsarAdmin admin = mock(PulsarAdmin.class); + Topics adminTopics = mock(Topics.class); + doReturn(adminTopics).when(admin).topics(); + doReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))).when(adminTopics) + .getPartitionedTopicMetadataAsync(anyString()); + doReturn(CompletableFuture.completedFuture(null)).when(adminTopics) + .createNonPartitionedTopicAsync(anyString()); + // Make race condition: "retry start producer" and "close replicator". final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName, - replicatorPrefix, broker, remoteClient); + replicatorPrefix, broker, remoteClient, admin); replicator.startProducer(); replicator.terminate(); @@ -122,9 +136,10 @@ private static class ReplicatorInTest extends AbstractReplicator { public ReplicatorInTest(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName, String replicatorPrefix, BrokerService brokerService, - PulsarClientImpl replicationClient) throws PulsarServerException { + PulsarClientImpl replicationClient, PulsarAdmin replicationAdmin) + throws PulsarServerException { super(localCluster, localTopic, remoteCluster, remoteTopicName, replicatorPrefix, brokerService, - replicationClient); + replicationClient, replicationAdmin); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 8a80f57f5b29f..8971e42f1d11a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -63,6 +64,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.AllArgsConstructor; +import lombok.Cleanup; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -90,6 +92,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ClientCnx; @@ -108,6 +111,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -1889,4 +1893,193 @@ public void testReplicatorsInflightTaskListIsEmptyAfterReplicationFinished() thr // Verify: all inflight tasks are done. ensureNoBacklogByInflightTask(getReplicator(topicName)); } + + @DataProvider + public Object[] isPartitioned() { + return new Object[]{ + true, + false + }; + } + + @Test(dataProvider = "isPartitioned") + public void testReplicatorCreateTopic(boolean isPartitioned) throws Exception { + String ns = defaultTenant + "/" + UUID.randomUUID().toString().replace("-", ""); + admin1.namespaces().createNamespace(ns); + if (!usingGlobalZK){ + admin2.namespaces().createNamespace(ns); + } + + int numPartitions = 4; + List partitions = new ArrayList<>(); + final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_"); + if (isPartitioned) { + admin1.topics().createPartitionedTopic(tp, numPartitions); + for (int i = 0; i < numPartitions; i++) { + partitions.add(TopicName.getTopicPartitionNameString(tp, i)); + } + } else { + admin1.topics().createNonPartitionedTopic(tp); + } + + admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2))); + + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = + (PersistentTopic) broker1.getTopic( + isPartitioned ? TopicName.get(tp).getPartition(0).toString() : tp, false).join() + .get(); + assertFalse(persistentTopic.getReplicators().isEmpty()); + }); + + @Cleanup + Producer p1 = client1.newProducer(Schema.STRING).topic(tp).create(); + p1.send("msg-1"); + + Awaitility.await().untilAsserted(() -> { + List partitionedTopicList = admin2.topics().getPartitionedTopicList(ns); + if (isPartitioned) { + assertThat(partitionedTopicList).contains(tp); + assertThat(admin2.topics().getList(ns)).containsAll(partitions); + } else { + assertThat(partitionedTopicList).doesNotContain(tp); + assertThat(admin2.topics().getList(ns)).contains(tp); + } + }); + } + + @Test + public void testReplicatorCreateTopicWhenTopicExistsWithDifferentTypeAcrossClusters() throws Exception { + if (usingGlobalZK) { + // This test case is not applicable when using global ZK, because the namespace policies + // are shared among clusters. + return; + } + + String ns = defaultTenant + "/" + UUID.randomUUID().toString().replace("-", ""); + admin1.namespaces().createNamespace(ns); + admin2.namespaces().createNamespace(ns); + + final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_"); + admin1.topics().createPartitionedTopic(tp, 4); + admin2.topics().createNonPartitionedTopic(tp); + + admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2))); + admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2))); + + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = + (PersistentTopic) broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join() + .get(); + assertFalse(persistentTopic.getReplicators().isEmpty()); + }); + + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = (PersistentTopic) broker2.getTopic(tp, false).join().get(); + assertFalse(persistentTopic.getReplicators().isEmpty()); + }); + + @Cleanup + Producer p1 = client1.newProducer(Schema.STRING).topic(tp).create(); + p1.send("msg-p1-1"); + @Cleanup + Producer p2 = client2.newProducer(Schema.STRING).topic(tp).create(); + p2.send("msg-p2-1"); + + // The topic exists, but its type differs between the local and remote clusters. The replicator should not + // recreate the topic. + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = + (PersistentTopic) broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join() + .get(); + persistentTopic.getReplicators().forEach((key, value) -> { + assertFalse(value.isConnected()); + }); + }); + assertThat(admin2.topics().getPartitionedTopicList(ns)).doesNotContain(tp); + + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = (PersistentTopic) broker2.getTopic(tp, false).join().get(); + persistentTopic.getReplicators().forEach((key, value) -> { + assertFalse(value.isConnected()); + }); + }); + assertThat(admin1.topics().getList(ns)).doesNotContain(tp); + } + + @Test + public void testReplicatorWhenPartitionCountsDiffer() throws Exception { + if (usingGlobalZK) { + // This test case is not applicable when using global ZK, because the namespace policies + // are shared among clusters. + return; + } + + String ns = defaultTenant + "/" + UUID.randomUUID().toString().replace("-", ""); + + admin1.namespaces().createNamespace(ns); + admin1.namespaces().setAutoTopicCreation(ns, AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType(TopicType.PARTITIONED.toString()) + .defaultNumPartitions(12) + .build()); + + admin2.namespaces().createNamespace(ns); + admin2.namespaces().setAutoTopicCreation(ns, AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType(TopicType.NON_PARTITIONED.toString()) + .build()); + + final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_"); + admin1.topics().createPartitionedTopic(tp, 4); + admin2.topics().createPartitionedTopic(tp, 8); + + admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2))); + admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2))); + + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = + (PersistentTopic) broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join() + .get(); + assertFalse(persistentTopic.getReplicators().isEmpty()); + }); + + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = + (PersistentTopic) broker2.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join() + .get(); + assertFalse(persistentTopic.getReplicators().isEmpty()); + }); + + // Trigger the replicator. + @Cleanup + Producer p1 = client1.newProducer(Schema.STRING).topic(tp).create(); + p1.send("msg-p1-1"); + @Cleanup + Producer p2 = client2.newProducer(Schema.STRING).topic(tp).create(); + p2.send("msg-p2-1"); + + // Topic partition counts differ between the local and remote clusters. + // The replicator should not replicate the messages. + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = + (PersistentTopic) broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join() + .get(); + persistentTopic.getReplicators().forEach((key, value) -> { + assertFalse(value.isConnected()); + }); + }); + + @Cleanup + Consumer c2 = client2.newConsumer(Schema.STRING).topic(tp).subscriptionName("test-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); + + while (true) { + Message receive = c2.receive(3, TimeUnit.SECONDS); + if (receive == null) { + break; + } + assertEquals(receive.getValue(), "msg-p2-1"); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 3b64b2ecc2cd0..c83f3750ff03f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -111,11 +111,14 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription; import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.ConnectionPool; +import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -130,6 +133,8 @@ import org.apache.pulsar.common.api.proto.ProducerAccessMode; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl; @@ -233,6 +238,13 @@ public void setup() throws Exception { doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists())).when(nsSvc) .checkTopicExistsAsync(any()); + PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + LookupService lookupService = mock(LookupService.class); + doReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))).when(lookupService) + .getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean()); + doReturn(lookupService).when(pulsarClient).getLookup(); + doReturn(pulsarClient).when(pulsarTestContext.getPulsarService()).getClient(); + setupMLAsyncCallbackMocks(); } @@ -1653,6 +1665,16 @@ public void testFailoverSubscription() throws Exception { assertNull(topic2.getSubscription(successSubName)); } + private PulsarAdmin mockReplicationAdmin() { + PulsarAdmin admin = mock(PulsarAdmin.class); + Topics topics = mock(Topics.class); + doReturn(topics).when(admin).topics(); + doReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))).when(topics) + .getPartitionedTopicMetadataAsync(anyString()); + doReturn(CompletableFuture.completedFuture(null)).when(topics).createNonPartitionedTopicAsync(anyString()); + return admin; + } + /** * NonPersistentReplicator.removeReplicator doesn't remove replicator in atomic way and does in multiple step: * 1. disconnect replicator producer @@ -1699,11 +1721,18 @@ public CompletableFuture createAsync() { return producerBuilder; }); brokerService.getReplicationClients().put(remoteCluster, pulsarClientMock); + + @Cleanup + PulsarAdmin admin = mockReplicationAdmin(); + PulsarService pulsar = brokerService.getPulsar(); + doReturn(admin).when(pulsar).getAdminClient(); + brokerService.getClusterAdmins().put(remoteCluster, admin); + Optional clusterData = brokerService.pulsar().getPulsarResources().getClusterResources() + .getCluster(remoteCluster); PersistentReplicator replicator = spy( new GeoPersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService, - (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster, - brokerService.pulsar().getPulsarResources().getClusterResources() - .getCluster(remoteCluster)))); + (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster, clusterData), + brokerService.getClusterPulsarAdmin(remoteCluster, clusterData))); replicatorMap.put(remoteReplicatorName, replicator); // step-1 remove replicator : it will disconnect the producer but it will wait for callback to be completed @@ -1751,10 +1780,16 @@ public void testClosingReplicationProducerTwice() throws Exception { ManagedCursor cursor = mock(ManagedCursorImpl.class); doReturn(remoteCluster).when(cursor).getName(); brokerService.getReplicationClients().put(remoteCluster, client); + + @Cleanup + PulsarAdmin admin = mockReplicationAdmin(); + doReturn(admin).when(pulsar).getAdminClient(); + brokerService.getClusterAdmins().put(remoteCluster, admin); + Optional clusterData = brokerService.pulsar().getPulsarResources().getClusterResources() + .getCluster(remoteCluster); PersistentReplicator replicator = new GeoPersistentReplicator(topic, cursor, localCluster, remoteCluster, - brokerService, (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster, - brokerService.pulsar().getPulsarResources().getClusterResources() - .getCluster(remoteCluster))); + brokerService, (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster, clusterData), + brokerService.getClusterPulsarAdmin(remoteCluster, clusterData)); // PersistentReplicator constructor calls startProducer() verify(clientImpl)