Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private int metadataStoreCacheExpirySeconds = 300;

private static final String DEFAULT_EXTENDED_RESOURCES_CLASS_NAME =
"org.apache.pulsar.broker.DefaultPulsarResourcesExtended";

@FieldContext(
category = CATEGORY_SERVER,
doc = "The class name of the PulsarResourcesExtended implementation. "
+ "This class must implement org.apache.pulsar.broker.PulsarResourcesExtended."
)
private String pulsarResourcesExtendedClassName = DEFAULT_EXTENDED_RESOURCES_CLASS_NAME;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Is metadata store read-only operations."
Expand Down Expand Up @@ -1263,6 +1273,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private boolean enableBrokerSideSubscriptionPatternEvaluation = true;

@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
doc = "Enables watching topic add/remove events on broker side for "
+ "subscription pattern evaluation."
)
private boolean enableBrokerTopicListWatcher = true;

@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
Expand Down Expand Up @@ -1605,7 +1623,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
category = CATEGORY_SERVER,
doc = "List of broker interceptor to load, which is a list of broker interceptor names"
)
private Set<String> brokerInterceptors = new TreeSet<>();
private Set<String> brokerInterceptors = new LinkedHashSet<>();

@FieldContext(
category = CATEGORY_SERVER,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamespaceName;

/**
* Default implementation of {@link PulsarResourcesExtended}.
*
* <p>This implementation provides the standard topic listing functionality
* by delegating to the {@link NamespaceService}.</p>
*/
public class DefaultPulsarResourcesExtended implements PulsarResourcesExtended {

@Getter
private PulsarService pulsarService;

@Override
public CompletableFuture<List<String>> listTopicOfNamespace(NamespaceName namespaceName,
CommandGetTopicsOfNamespace.Mode mode,
Map<String, String> properties) {
return pulsarService.getNamespaceService().getListOfTopics(namespaceName, mode);
}

@Override
public void initialize(PulsarService pulsarService) {
this.pulsarService = pulsarService;
}

@Override
public void close() {
// No specific resources to close in the default implementation
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.naming.NamespaceName;
import org.jspecify.annotations.Nullable;

/**
* Extended PulsarResources that provides additional functionality beyond PulsarResources.
*
* <p>This interface is designed to be pluggable, allowing custom implementations
* to provide extended PulsarResources capabilities such as custom topic listing strategies</p>
*
* <p>Implementations of this interface can be registered with PulsarService to
* provide extended resource management capabilities.</p>
*/
@InterfaceStability.Evolving
public interface PulsarResourcesExtended {

/**
* Lists topics in a namespace with optional property-based filtering.
*
* <p>This method provides a flexible way to list topics in a namespace,
* supporting property-based filtering through the properties parameter.</p>
*
* @param namespaceName the namespace to list topics from
* @param mode the listing mode (ALL, PERSISTENT, NON_PERSISTENT)
* @param properties optional property filters for topic listing, if null or empty, no filtering is applied
* @return a CompletableFuture containing the list of topic names
*/
CompletableFuture<List<String>> listTopicOfNamespace(NamespaceName namespaceName,
CommandGetTopicsOfNamespace.Mode mode,
@Nullable Map<String, String> properties);

/**
* Initializes this extended resources instance with the PulsarService.
*
* <p>This method is called during broker startup to provide the implementation
* with access to the PulsarService and its dependencies.</p>
*
* @param pulsarService the PulsarService instance
*/
void initialize(PulsarService pulsarService);

/**
* Closes this extended resources instance and releases any resources.
*
* <p>This method should be called during broker shutdown to clean up
* any resources held by the implementation.</p>
*/
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private boolean shouldShutdownConfigurationMetadataStore;

private PulsarResources pulsarResources;
private PulsarResourcesExtended pulsarResourcesExtended;

private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
private final ExecutorProvider transactionExecutorProvider;
Expand Down Expand Up @@ -1032,6 +1033,9 @@ public void start() throws PulsarServerException {

this.metricsGenerator = new MetricsGenerator(this);

// Initialize PulsarResourcesExtended
pulsarResourcesExtended = loadPulsarResourcesExtended();

// the broker is ready to accept incoming requests by Pulsar binary protocol and http/https
final List<Runnable> runnables;
synchronized (pendingTasksBeforeReadyForIncomingRequests) {
Expand Down Expand Up @@ -1158,6 +1162,14 @@ protected PulsarResources newPulsarResources() {
return pulsarResources;
}

protected PulsarResourcesExtended loadPulsarResourcesExtended() {
String className = config.getPulsarResourcesExtendedClassName();
PulsarResourcesExtended extendedResources = Reflections.createInstance(className,
PulsarResourcesExtended.class, Thread.currentThread().getContextClassLoader());
extendedResources.initialize(this);
return extendedResources;
}

private synchronized void createMetricsServlet() {
this.metricsServlet = new PulsarPrometheusMetricsServlet(
this, config.isExposeTopicLevelMetricsInPrometheus(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.EncryptionKeys;
Expand Down Expand Up @@ -151,6 +152,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -163,15 +165,17 @@ public class PersistentTopicsBase extends AdminResource {
private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);

protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle) {
protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle,
@Nullable Map<String, String> properties) {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
.thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
.thenAccept(exists -> {
if (!exists) {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
}
})
.thenCompose(__ -> topicResources().listPersistentTopicsAsync(namespaceName))
.thenCompose(__ -> pulsar().getNamespaceService().getListOfTopicsByProperties(namespaceName,
CommandGetTopicsOfNamespace.Mode.PERSISTENT, properties))
.thenApply(topics ->
topics.stream()
.filter(topic -> {
Expand Down Expand Up @@ -4413,7 +4417,7 @@ private CompletableFuture<Topic> topicNotFoundReasonAsync(TopicName topicName) {
"Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
}
})
.thenCompose(__ -> internalGetListAsync(Optional.empty()))
.thenCompose(__ -> internalGetListAsync(Optional.empty(), null))
.thenApply(topics -> {
if (!topics.contains(topicName.toString())) {
throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String bundle) {
validateNamespaceName(property, cluster, namespace);
internalGetListAsync(Optional.ofNullable(bundle))
internalGetListAsync(Optional.ofNullable(bundle), null)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ public void getList(
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String nsBundle,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
@QueryParam("includeSystemTopic") boolean includeSystemTopic,
@QueryParam("properties") String propertiesStr) {
Policies policies = null;
try {
validateNamespaceName(tenant, namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -121,9 +123,11 @@ public void getList(
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String bundle,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
@QueryParam("includeSystemTopic") boolean includeSystemTopic,
@ApiParam(value = "properties for customized topic listing plugin, format: k1=v1,k2=v2")
@QueryParam("properties") String propertiesStr) {
validateNamespaceName(tenant, namespace);
internalGetListAsync(Optional.ofNullable(bundle))
internalGetListAsync(Optional.ofNullable(bundle), parseProperties(propertiesStr))
.thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic)))
.exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
Expand Down Expand Up @@ -5131,5 +5135,28 @@ public void getMessageIDByIndex(@Suspended final AsyncResponse asyncResponse,
});
}

private Map<String, String> parseProperties(String propertiesStr) {
if (propertiesStr == null || propertiesStr.trim().isEmpty()) {
return Collections.emptyMap();
}
Map<String, String> map = new HashMap<>();
String[] pairs = propertiesStr.split(",");
for (String pair : pairs) {
String[] parts = pair.split("=", 2);
if (parts.length == 2) {
try {
String key = Codec.decode(parts[0].trim());
String value = Codec.decode(parts[1].trim());
if (!key.isEmpty()) {
map.put(key, value);
}
} catch (Exception e) {
log.warn("Failed to decode property: {}", pair, e);
}
}
}
return map;
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,28 @@
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.TopicListingResult;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.NamespaceName;

/**
* A plugin interface that allows you to intercept the
Expand Down Expand Up @@ -224,6 +229,28 @@ default void onFilter(ServletRequest request, ServletResponse response, FilterCh
chain.doFilter(request, response);
}

/**
* Intercept the GetTopicsOfNamespace request.
* <p>
* This method allows plugins to override the default topic discovery logic (ZooKeeper scan).
* It enables fetching topics from external sources (e.g., databases, other metadata stores)
* based on the provided client context properties.
*
* @param namespace The namespace being queried.
* @param mode The query mode (PERSISTENT, NON_PERSISTENT, or ALL).
* @param topicsPattern Optional regex pattern provided by the client.
* @param properties Context properties provided by the client.
* @return A CompletableFuture containing the result:
* If the future completes with {@code Optional.empty()}, proceed to the next interceptor or Broker's default logic.
*/
default CompletableFuture<Optional<TopicListingResult>> interceptGetTopicsOfNamespace(
NamespaceName namespace,
CommandGetTopicsOfNamespace.Mode mode,
Optional<String> topicsPattern,
Map<String, String> properties) {
return CompletableFuture.completedFuture(Optional.empty());
}

/**
* Initialize the broker interceptor.
*
Expand Down
Loading
Loading