Skip to content

Commit 53b1bd5

Browse files
committed
Implement PIP-452
1 parent fcbafaa commit 53b1bd5

File tree

26 files changed

+726
-124
lines changed

26 files changed

+726
-124
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1257,6 +1257,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
12571257
)
12581258
private boolean enableBrokerSideSubscriptionPatternEvaluation = true;
12591259

1260+
@FieldContext(
1261+
dynamic = false,
1262+
category = CATEGORY_POLICIES,
1263+
doc = "Enables watching topic add/remove events on broker side for "
1264+
+ "subscription pattern evaluation."
1265+
)
1266+
private boolean enableBrokerTopicListWatcher = true;
1267+
12601268
@FieldContext(
12611269
dynamic = false,
12621270
category = CATEGORY_POLICIES,
@@ -1599,7 +1607,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
15991607
category = CATEGORY_SERVER,
16001608
doc = "List of broker interceptor to load, which is a list of broker interceptor names"
16011609
)
1602-
private Set<String> brokerInterceptors = new TreeSet<>();
1610+
private Set<String> brokerInterceptors = new LinkedHashSet<>();
16031611

16041612
@FieldContext(
16051613
category = CATEGORY_SERVER,

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@
7575
import org.apache.pulsar.broker.admin.AdminResource;
7676
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
7777
import org.apache.pulsar.broker.authorization.AuthorizationService;
78+
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
79+
import org.apache.pulsar.broker.namespace.TopicListingResult;
7880
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
7981
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
8082
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -102,6 +104,7 @@
102104
import org.apache.pulsar.client.impl.MessageImpl;
103105
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
104106
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
107+
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
105108
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
106109
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
107110
import org.apache.pulsar.common.api.proto.EncryptionKeys;
@@ -151,6 +154,7 @@
151154
import org.apache.pulsar.metadata.api.MetadataStoreException;
152155
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
153156
import org.jspecify.annotations.NonNull;
157+
import org.jspecify.annotations.Nullable;
154158
import org.slf4j.Logger;
155159
import org.slf4j.LoggerFactory;
156160

@@ -163,15 +167,29 @@ public class PersistentTopicsBase extends AdminResource {
163167
private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
164168
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);
165169

166-
protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle) {
170+
protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle,
171+
@Nullable Map<String, String> properties) {
167172
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
168173
.thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
169174
.thenAccept(exists -> {
170175
if (!exists) {
171176
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
172177
}
173178
})
174-
.thenCompose(__ -> topicResources().listPersistentTopicsAsync(namespaceName))
179+
.thenCompose(__ -> {
180+
BrokerInterceptor brokerInterceptor = pulsar().getBrokerInterceptor();
181+
CompletableFuture<Optional<TopicListingResult>> interceptorFuture =
182+
brokerInterceptor == null ? CompletableFuture.completedFuture(null) :
183+
brokerInterceptor.interceptGetTopicsOfNamespace(namespaceName,
184+
CommandGetTopicsOfNamespace.Mode.PERSISTENT, Optional.empty(), properties);
185+
return interceptorFuture.thenCompose(topicListingResult -> {
186+
if (topicListingResult != null && topicListingResult.isPresent()) {
187+
return CompletableFuture.completedFuture(topicListingResult.get().topics());
188+
} else {
189+
return topicResources().listPersistentTopicsAsync(namespaceName);
190+
}
191+
});
192+
})
175193
.thenApply(topics ->
176194
topics.stream()
177195
.filter(topic -> {
@@ -4413,7 +4431,7 @@ private CompletableFuture<Topic> topicNotFoundReasonAsync(TopicName topicName) {
44134431
"Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
44144432
}
44154433
})
4416-
.thenCompose(__ -> internalGetListAsync(Optional.empty()))
4434+
.thenCompose(__ -> internalGetListAsync(Optional.empty(), null))
44174435
.thenApply(topics -> {
44184436
if (!topics.contains(topicName.toString())) {
44194437
throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
7676
@ApiParam(value = "Specify the bundle name", required = false)
7777
@QueryParam("bundle") String bundle) {
7878
validateNamespaceName(property, cluster, namespace);
79-
internalGetListAsync(Optional.ofNullable(bundle))
79+
internalGetListAsync(Optional.ofNullable(bundle), null)
8080
.thenAccept(asyncResponse::resume)
8181
.exceptionally(ex -> {
8282
if (!isRedirectException(ex)) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,8 @@ public void getList(
381381
@ApiParam(value = "Specify the bundle name", required = false)
382382
@QueryParam("bundle") String nsBundle,
383383
@ApiParam(value = "Include system topic")
384-
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
384+
@QueryParam("includeSystemTopic") boolean includeSystemTopic,
385+
@QueryParam("properties") String propertiesStr) {
385386
Policies policies = null;
386387
try {
387388
validateNamespaceName(tenant, namespace);

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.swagger.annotations.ApiParam;
2626
import io.swagger.annotations.ApiResponse;
2727
import io.swagger.annotations.ApiResponses;
28+
import java.util.Collections;
29+
import java.util.HashMap;
2830
import java.util.List;
2931
import java.util.Map;
3032
import java.util.Optional;
@@ -121,9 +123,11 @@ public void getList(
121123
@ApiParam(value = "Specify the bundle name", required = false)
122124
@QueryParam("bundle") String bundle,
123125
@ApiParam(value = "Include system topic")
124-
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
126+
@QueryParam("includeSystemTopic") boolean includeSystemTopic,
127+
@ApiParam(value = "properties for customized topic listing intercept, format: k1=v1,k2=v2")
128+
@QueryParam("properties") String propertiesStr) {
125129
validateNamespaceName(tenant, namespace);
126-
internalGetListAsync(Optional.ofNullable(bundle))
130+
internalGetListAsync(Optional.ofNullable(bundle), parseProperties(propertiesStr))
127131
.thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic)))
128132
.exceptionally(ex -> {
129133
if (isNot307And404Exception(ex)) {
@@ -5131,5 +5135,28 @@ public void getMessageIDByIndex(@Suspended final AsyncResponse asyncResponse,
51315135
});
51325136
}
51335137

5138+
private Map<String, String> parseProperties(String propertiesStr) {
5139+
if (propertiesStr == null || propertiesStr.trim().isEmpty()) {
5140+
return Collections.emptyMap();
5141+
}
5142+
Map<String, String> map = new HashMap<>();
5143+
String[] pairs = propertiesStr.split(",");
5144+
for (String pair : pairs) {
5145+
String[] parts = pair.split("=", 2);
5146+
if (parts.length == 2) {
5147+
try {
5148+
String key = Codec.decode(parts[0].trim());
5149+
String value = Codec.decode(parts[1].trim());
5150+
if (!key.isEmpty()) {
5151+
map.put(key, value);
5152+
}
5153+
} catch (Exception e) {
5154+
log.warn("Failed to decode property: {}", pair, e);
5155+
}
5156+
}
5157+
}
5158+
return map;
5159+
}
5160+
51345161
private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
51355162
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,28 @@
2121
import io.netty.buffer.ByteBuf;
2222
import java.io.IOException;
2323
import java.util.Map;
24+
import java.util.Optional;
25+
import java.util.concurrent.CompletableFuture;
2426
import javax.servlet.FilterChain;
2527
import javax.servlet.ServletException;
2628
import javax.servlet.ServletRequest;
2729
import javax.servlet.ServletResponse;
2830
import org.apache.bookkeeper.mledger.Entry;
2931
import org.apache.pulsar.broker.PulsarService;
32+
import org.apache.pulsar.broker.namespace.TopicListingResult;
3033
import org.apache.pulsar.broker.service.Consumer;
3134
import org.apache.pulsar.broker.service.Producer;
3235
import org.apache.pulsar.broker.service.ServerCnx;
3336
import org.apache.pulsar.broker.service.Subscription;
3437
import org.apache.pulsar.broker.service.Topic;
3538
import org.apache.pulsar.common.api.proto.BaseCommand;
3639
import org.apache.pulsar.common.api.proto.CommandAck;
40+
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
3741
import org.apache.pulsar.common.api.proto.MessageMetadata;
3842
import org.apache.pulsar.common.classification.InterfaceAudience;
3943
import org.apache.pulsar.common.classification.InterfaceStability;
4044
import org.apache.pulsar.common.intercept.InterceptException;
45+
import org.apache.pulsar.common.naming.NamespaceName;
4146

4247
/**
4348
* A plugin interface that allows you to intercept the
@@ -224,6 +229,28 @@ default void onFilter(ServletRequest request, ServletResponse response, FilterCh
224229
chain.doFilter(request, response);
225230
}
226231

232+
/**
233+
* Intercept the GetTopicsOfNamespace request.
234+
* <p>
235+
* This method allows plugins to override the default topic discovery logic (ZooKeeper scan).
236+
* It enables fetching topics from external sources (e.g., databases, other metadata stores)
237+
* based on the provided client context properties.
238+
*
239+
* @param namespace The namespace being queried.
240+
* @param mode The query mode (PERSISTENT, NON_PERSISTENT, or ALL).
241+
* @param topicsPattern Optional regex pattern provided by the client.
242+
* @param properties Context properties provided by the client.
243+
* @return A CompletableFuture containing the result:
244+
* If the future completes with {@code Optional.empty()}, proceed to the next interceptor or Broker's default logic.
245+
*/
246+
default CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace(
247+
NamespaceName namespace,
248+
CommandGetTopicsOfNamespace.Mode mode,
249+
Optional<String> topicsPattern,
250+
Map<String, String> properties) {
251+
return CompletableFuture.completedFuture(Optional.empty());
252+
}
253+
227254
/**
228255
* Initialize the broker interceptor.
229256
*

pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.netty.buffer.ByteBuf;
2323
import java.io.IOException;
2424
import java.util.Map;
25+
import java.util.Optional;
26+
import java.util.concurrent.CompletableFuture;
2527
import javax.servlet.FilterChain;
2628
import javax.servlet.ServletException;
2729
import javax.servlet.ServletRequest;
@@ -31,15 +33,18 @@
3133
import lombok.extern.slf4j.Slf4j;
3234
import org.apache.bookkeeper.mledger.Entry;
3335
import org.apache.pulsar.broker.PulsarService;
36+
import org.apache.pulsar.broker.namespace.TopicListingResult;
3437
import org.apache.pulsar.broker.service.Consumer;
3538
import org.apache.pulsar.broker.service.Producer;
3639
import org.apache.pulsar.broker.service.ServerCnx;
3740
import org.apache.pulsar.broker.service.Subscription;
3841
import org.apache.pulsar.broker.service.Topic;
3942
import org.apache.pulsar.common.api.proto.BaseCommand;
4043
import org.apache.pulsar.common.api.proto.CommandAck;
44+
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
4145
import org.apache.pulsar.common.api.proto.MessageMetadata;
4246
import org.apache.pulsar.common.intercept.InterceptException;
47+
import org.apache.pulsar.common.naming.NamespaceName;
4348
import org.apache.pulsar.common.nar.NarClassLoader;
4449

4550
/**
@@ -285,6 +290,21 @@ public void onFilter(ServletRequest request, ServletResponse response, FilterCha
285290
}
286291
}
287292

293+
@Override
294+
public CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace(
295+
NamespaceName namespace,
296+
CommandGetTopicsOfNamespace.Mode mode,
297+
Optional<String> topicsPattern,
298+
Map<String, String> properties) {
299+
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
300+
try {
301+
Thread.currentThread().setContextClassLoader(narClassLoader);
302+
return this.interceptor.interceptGetTopicsOfNamespace(namespace, mode, topicsPattern, properties);
303+
} finally {
304+
Thread.currentThread().setContextClassLoader(previousContext);
305+
}
306+
}
307+
288308
@Override
289309
public void close() {
290310
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();

0 commit comments

Comments
 (0)