Skip to content

Commit 805c71d

Browse files
authored
[improve][broker] PIP-442: Add memory limits for topic list watcher (part 2) (#25070)
1 parent 0bf560f commit 805c71d

File tree

14 files changed

+979
-127
lines changed

14 files changed

+979
-127
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020

2121

2222
import io.netty.util.concurrent.Future;
23+
import java.util.Collection;
2324
import java.util.List;
2425
import java.util.Optional;
2526
import java.util.concurrent.CompletableFuture;
26-
import java.util.function.Consumer;
27+
import java.util.function.Function;
2728
import org.apache.bookkeeper.mledger.Entry;
2829
import org.apache.pulsar.client.api.transaction.TxnID;
2930
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
@@ -55,7 +56,8 @@ void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, l
5556

5657
CompletableFuture<Void> sendGetTopicsOfNamespaceResponse(List<String> topics, String topicsHash, boolean filtered,
5758
boolean changed, long requestId,
58-
Consumer<Throwable> permitAcquireErrorHandler);
59+
Function<Throwable, CompletableFuture<Void>>
60+
permitAcquireErrorHandler);
5961

6062
void sendGetSchemaResponse(long requestId, SchemaInfo schema, SchemaVersion version);
6163

@@ -96,8 +98,13 @@ Future<Void> sendMessagesToConsumer(long consumerId, String topicName, Subscript
9698

9799
void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError error, String message);
98100

99-
void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List<String> topics);
101+
CompletableFuture<Void> sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash,
102+
Collection<String> topics,
103+
Function<Throwable, CompletableFuture<Void>>
104+
permitAcquireErrorHandler);
100105

101-
void sendWatchTopicListUpdate(long watcherId,
102-
List<String> newTopics, List<String> deletedTopics, String topicsHash);
106+
CompletableFuture<Void> sendWatchTopicListUpdate(long watcherId,
107+
List<String> newTopics, List<String> deletedTopics, String topicsHash,
108+
Function<Throwable, CompletableFuture<Void>>
109+
permitAcquireErrorHandler);
103110
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@
2424
import io.netty.channel.ChannelHandlerContext;
2525
import io.netty.channel.ChannelPromise;
2626
import java.util.ArrayList;
27+
import java.util.Collection;
2728
import java.util.List;
2829
import java.util.Optional;
2930
import java.util.concurrent.CompletableFuture;
30-
import java.util.function.Consumer;
31+
import java.util.function.Function;
3132
import lombok.extern.slf4j.Slf4j;
3233
import org.apache.bookkeeper.mledger.Entry;
3334
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
@@ -129,7 +130,9 @@ public void sendSendError(long producerId, long sequenceId, ServerError error, S
129130
@Override
130131
public CompletableFuture<Void> sendGetTopicsOfNamespaceResponse(List<String> topics, String topicsHash,
131132
boolean filtered, boolean changed, long requestId,
132-
Consumer<Throwable> permitAcquireErrorHandler) {
133+
Function<Throwable,
134+
CompletableFuture<Void>>
135+
permitAcquireErrorHandler) {
133136
BaseCommand command = Commands.newGetTopicsOfNamespaceResponseCommand(topics, topicsHash,
134137
filtered, changed, requestId);
135138
safeIntercept(command, cnx);
@@ -366,27 +369,32 @@ public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError err
366369

367370
/***
368371
* @param topics topic names which are matching, the topic name contains the partition suffix.
372+
* @return a CompletableFuture&lt;Void&gt; that completes when the operation finishes
369373
*/
370374
@Override
371-
public void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List<String> topics) {
375+
public CompletableFuture<Void> sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash,
376+
Collection<String> topics,
377+
Function<Throwable, CompletableFuture<Void>>
378+
permitAcquireErrorHandler) {
372379
BaseCommand command = Commands.newWatchTopicListSuccess(requestId, watcherId, topicsHash, topics);
373-
interceptAndWriteCommand(command);
380+
safeIntercept(command, cnx);
381+
return acquireDirectMemoryPermitsAndWriteAndFlush(cnx.ctx(), maxTopicListInFlightLimiter, () -> !cnx.isActive(),
382+
command, permitAcquireErrorHandler);
374383
}
375384

376385
/***
377386
* {@inheritDoc}
387+
* @return a CompletableFuture that completes when the watch topic list update operation finishes
378388
*/
379389
@Override
380-
public void sendWatchTopicListUpdate(long watcherId,
381-
List<String> newTopics, List<String> deletedTopics, String topicsHash) {
390+
public CompletableFuture<Void> sendWatchTopicListUpdate(long watcherId, List<String> newTopics,
391+
List<String> deletedTopics, String topicsHash,
392+
Function<Throwable, CompletableFuture<Void>>
393+
permitAcquireErrorHandler) {
382394
BaseCommand command = Commands.newWatchTopicUpdate(watcherId, newTopics, deletedTopics, topicsHash);
383-
interceptAndWriteCommand(command);
384-
}
385-
386-
private void interceptAndWriteCommand(BaseCommand command) {
387395
safeIntercept(command, cnx);
388-
ByteBuf outBuf = Commands.serializeWithSize(command);
389-
writeAndFlush(outBuf);
396+
return acquireDirectMemoryPermitsAndWriteAndFlush(cnx.ctx(), maxTopicListInFlightLimiter, () -> !cnx.isActive(),
397+
command, permitAcquireErrorHandler);
390398
}
391399

392400
private void writeAndFlush(ByteBuf outBuf) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2644,6 +2644,7 @@ private void internalHandleGetTopicsOfNamespace(String namespace, NamespaceName
26442644
commandSender.sendErrorResponse(requestId,
26452645
ServerError.TooManyRequests,
26462646
"Cannot acquire permits for direct memory");
2647+
return CompletableFuture.completedFuture(null);
26472648
});
26482649
}, t -> {
26492650
log.warn("[{}] Failed to acquire heap memory permits for "

0 commit comments

Comments
 (0)