Skip to content

Commit 65effdf

Browse files
poorbarcodesrinath-ctds
authored andcommitted
[fix][broker] fix null lookup result when brokers are starting (apache#23642)
(cherry picked from commit bd3b3b8) (cherry picked from commit a6b93a6)
1 parent c691742 commit 65effdf

File tree

3 files changed

+29
-4
lines changed

3 files changed

+29
-4
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,9 @@ public void start() throws PulsarServerException {
854854
this.webSocketService.setLocalCluster(clusterData);
855855
}
856856

857+
// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
858+
this.nsService.initialize();
859+
857860
// Start the leader election service
858861
startLeaderElectionService();
859862

@@ -865,9 +868,6 @@ public void start() throws PulsarServerException {
865868
// (namespace service depends on load manager)
866869
this.startLoadManagementService();
867870

868-
// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
869-
this.nsService.initialize();
870-
871871
// Start topic level policies service
872872
if (config.isTopicLevelPoliciesEnabled() && config.isSystemTopicEnabled()) {
873873
this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this);

pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
314314
}
315315

316316
LookupData lookupData = lookupResult.get().getLookupData();
317+
printWarnLogIfLookupResUnexpected(topicName, lookupData, options, pulsarService);
317318
if (lookupResult.get().isRedirect()) {
318319
boolean newAuthoritative = lookupResult.get().isAuthoritativeRedirect();
319320
lookupfuture.complete(
@@ -338,6 +339,24 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
338339
return lookupfuture;
339340
}
340341

342+
/**
343+
* Check if a internal client will get a null lookup result.
344+
*/
345+
private static void printWarnLogIfLookupResUnexpected(TopicName topic, LookupData lookupData, LookupOptions options,
346+
PulsarService pulsar) {
347+
if (!pulsar.getBrokerService().isSystemTopic(topic)) {
348+
return;
349+
}
350+
boolean tlsEnabled = pulsar.getConfig().isBrokerClientTlsEnabled();
351+
if (!tlsEnabled && StringUtils.isBlank(lookupData.getBrokerUrl())) {
352+
log.warn("[{}] Unexpected lookup result: brokerUrl is required when TLS isn't enabled. options: {},"
353+
+ " result {}", topic, options, lookupData);
354+
} else if (tlsEnabled && StringUtils.isBlank(lookupData.getBrokerUrlTls())) {
355+
log.warn("[{}] Unexpected lookup result: brokerUrlTls is required when TLS is enabled. options: {},"
356+
+ " result {}", topic, options, lookupData);
357+
}
358+
}
359+
341360
private static void handleLookupError(CompletableFuture<ByteBuf> lookupFuture, String topicName, String clientAppId,
342361
long requestId, Throwable ex){
343362
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public class OwnershipCache {
7373
/**
7474
* The NamespaceEphemeralData objects that can be associated with the current owner, when the broker is disabled.
7575
*/
76-
private final NamespaceEphemeralData selfOwnerInfoDisabled;
76+
private NamespaceEphemeralData selfOwnerInfoDisabled;
7777

7878
private final LockManager<NamespaceEphemeralData> lockManager;
7979

@@ -121,6 +121,9 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
121121
this.pulsar = pulsar;
122122
this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
123123
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
124+
// At this moment, the variables "webServiceAddress" and "webServiceAddressTls" and so on have not been
125+
// initialized, so we will get an empty "selfOwnerInfo" and an empty "selfOwnerInfoDisabled" here.
126+
// But do not worry, these two fields will be set by the method "refreshSelfOwnerInfo" soon.
124127
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
125128
pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
126129
false, pulsar.getAdvertisedListeners());
@@ -353,6 +356,9 @@ public synchronized boolean refreshSelfOwnerInfo() {
353356
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
354357
pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
355358
pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
359+
this.selfOwnerInfoDisabled = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
360+
pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
361+
pulsar.getWebServiceAddressTls(), true, pulsar.getAdvertisedListeners());
356362
return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null;
357363
}
358364
}

0 commit comments

Comments
 (0)