Skip to content

Commit b37569b

Browse files
Demogorgon314mukesh-ctds
authored andcommitted
[improve][broker] Fix non-persistent system topic schema compatibility (apache#23286)
When upgrading broker version from `3.0.x` to `3.3.x` with `ExtensibleLoadManagerImpl` enabled, it will have an `Unable to read schema` exception. And the broker will fail to start. This issue is caused by apache#22055 . Add a new class `NonPersistentSystemTopic`, and it will use for system non-persistent topic. (cherry picked from commit 7dbd8a5) (cherry picked from commit 1c86d25)
1 parent 59e90e6 commit b37569b

File tree

3 files changed

+80
-1
lines changed

3 files changed

+80
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
121121
import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
122122
import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
123+
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic;
123124
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
124125
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
125126
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -1326,7 +1327,11 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
13261327
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
13271328
NonPersistentTopic nonPersistentTopic;
13281329
try {
1329-
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
1330+
if (isSystemTopic(topic)) {
1331+
nonPersistentTopic = new NonPersistentSystemTopic(topic, this);
1332+
} else {
1333+
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
1334+
}
13301335
nonPersistentTopic.setCreateFuture(topicFuture);
13311336
} catch (Throwable e) {
13321337
log.warn("Failed to create topic {}", topic, e);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service.nonpersistent;
20+
21+
import org.apache.pulsar.broker.service.BrokerService;
22+
23+
public class NonPersistentSystemTopic extends NonPersistentTopic {
24+
public NonPersistentSystemTopic(String topic, BrokerService brokerService) {
25+
super(topic, brokerService);
26+
}
27+
28+
@Override
29+
public boolean isSystemTopic() {
30+
return true;
31+
}
32+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import java.util.concurrent.atomic.AtomicReference;
7474
import java.util.stream.Collectors;
7575
import lombok.Cleanup;
76+
import lombok.Data;
7677
import lombok.extern.slf4j.Slf4j;
7778
import org.apache.commons.lang3.reflect.FieldUtils;
7879
import org.apache.commons.lang3.tuple.Pair;
@@ -94,13 +95,15 @@
9495
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
9596
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
9697
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
98+
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
9799
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
98100
import org.apache.pulsar.broker.lookup.LookupResult;
99101
import org.apache.pulsar.broker.namespace.LookupOptions;
100102
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
101103
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
102104
import org.apache.pulsar.broker.namespace.NamespaceService;
103105
import org.apache.pulsar.broker.service.BrokerServiceException;
106+
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic;
104107
import org.apache.pulsar.client.admin.PulsarAdminException;
105108
import org.apache.pulsar.client.api.Consumer;
106109
import org.apache.pulsar.client.api.Producer;
@@ -114,13 +117,15 @@
114117
import org.apache.pulsar.common.policies.data.BrokerAssignment;
115118
import org.apache.pulsar.common.policies.data.BundlesData;
116119
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
120+
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
117121
import org.apache.pulsar.common.stats.Metrics;
118122
import org.apache.pulsar.common.util.FutureUtil;
119123
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
120124
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
121125
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
122126
import org.awaitility.Awaitility;
123127
import org.mockito.MockedStatic;
128+
import org.testng.Assert;
124129
import org.testng.AssertJUnit;
125130
import org.testng.annotations.Test;
126131

@@ -1476,6 +1481,43 @@ public void compactionScheduleTest() {
14761481
});
14771482
}
14781483

1484+
@Test
1485+
public void testSystemNonPersistentTopicSchemaCompatibility() throws Exception {
1486+
String topicName = ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
1487+
NonPersistentSystemTopic topic = new NonPersistentSystemTopic(topicName, pulsar.getBrokerService());
1488+
Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
1489+
1490+
var brokerLoadDataStore = LoadDataStoreFactory.create(pulsar, topicName, BrokerLoadDataV1.class);
1491+
brokerLoadDataStore.init();
1492+
brokerLoadDataStore.pushAsync("key", new BrokerLoadDataV1()).get();
1493+
Awaitility.await().until(() -> {
1494+
var data = brokerLoadDataStore.get("key");
1495+
return data.isPresent();
1496+
});
1497+
brokerLoadDataStore.pushAsync("key", null).get();
1498+
brokerLoadDataStore.close();
1499+
}
1500+
1501+
@Data
1502+
private static class BrokerLoadDataV1 {
1503+
private ResourceUsage cpu;
1504+
private ResourceUsage memory;
1505+
private ResourceUsage directMemory;
1506+
private ResourceUsage bandwidthIn;
1507+
private ResourceUsage bandwidthOut;
1508+
private double msgThroughputIn;
1509+
private double msgThroughputOut;
1510+
private double msgRateIn;
1511+
private double msgRateOut;
1512+
private int bundleCount;
1513+
private int topics;
1514+
private double maxResourceUsage;
1515+
private double weightedMaxEMA;
1516+
private double msgThroughputEMA;
1517+
private long updatedAt;
1518+
private long reportedAt;
1519+
}
1520+
14791521
@Test(timeOut = 10 * 1000)
14801522
public void unloadTimeoutCheckTest()
14811523
throws Exception {

0 commit comments

Comments
 (0)