Skip to content

Commit bdd3cb6

Browse files
committed
clean up
1 parent 1d62a88 commit bdd3cb6

File tree

3 files changed

+12
-5
lines changed

3 files changed

+12
-5
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,13 @@ public StreamingOutput getInternalStats(
128128
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
129129
validateTopicName(tenant, namespace, encodedTopic);
130130
return output -> {
131-
internalGetInternalStatsAsync(authoritative, metadata)
131+
validateTopicOwnershipAsync(topicName, authoritative)
132+
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
133+
.thenCompose(__ -> {
134+
Topic topic = getTopicReference(topicName);
135+
boolean includeMetadata = metadata && hasSuperUserAccess();
136+
return topic.getInternalStats(includeMetadata);
137+
})
132138
.thenAccept(stats -> {
133139
try {
134140
ObjectMapperFactory.getMapper().getObjectMapper().writeValue(output, stats);

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1407,7 +1407,7 @@ public void deleteSubscription(
14071407
internalDeleteSubscriptionAsync(subName, authoritative, force)
14081408
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
14091409
.exceptionally(ex -> {
1410-
Throwable cause = unwrapCompletionException(ex);
1410+
Throwable cause = FutureUtil.unwrapCompletionException(ex);
14111411

14121412
// If the exception is not redirect exception we need to log it.
14131413
if (!isRedirectException(cause)) {
@@ -1699,7 +1699,7 @@ public void resetCursor(
16991699
internalResetCursorAsync(decode(encodedSubName), timestamp, authoritative)
17001700
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
17011701
.exceptionally(ex -> {
1702-
Throwable t = unwrapCompletionException(ex);
1702+
Throwable t = FutureUtil.unwrapCompletionException(ex);
17031703
if (!isRedirectException(t)) {
17041704
log.error("[{}][{}] Failed to reset cursor on subscription {} to time {}",
17051705
clientAppId(), topicName, encodedSubName, timestamp, t);
@@ -2065,7 +2065,7 @@ public void getBacklog(
20652065
.thenCompose(__ -> internalGetBacklogAsync(authoritative))
20662066
.thenAccept(asyncResponse::resume)
20672067
.exceptionally(ex -> {
2068-
Throwable t = unwrapCompletionException(ex);
2068+
Throwable t = FutureUtil.unwrapCompletionException(ex);
20692069
if (t instanceof MetadataStoreException.NotFoundException) {
20702070
log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(),
20712071
namespaceName);
@@ -4219,7 +4219,7 @@ public void truncateTopic(
42194219
internalTruncateTopicAsync(authoritative)
42204220
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
42214221
.exceptionally(ex -> {
4222-
Throwable t = unwrapCompletionException(ex);
4222+
Throwable t = FutureUtil.unwrapCompletionException(ex);
42234223
if (!isRedirectException(t)) {
42244224
log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, t);
42254225
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2387,6 +2387,7 @@ public CompletableFuture<SchemaVersion> deleteSchema() {
23872387

23882388
@Override
23892389
public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {
2390+
23902391
CompletableFuture<PersistentTopicInternalStats> statFuture = new CompletableFuture<>();
23912392
PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
23922393

0 commit comments

Comments
 (0)