Skip to content

Commit 1d62a88

Browse files
committed
Use StreamingOutput
1 parent ecc7225 commit 1d62a88

File tree

4 files changed

+63
-70
lines changed

4 files changed

+63
-70
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323
import io.swagger.annotations.ApiParam;
2424
import io.swagger.annotations.ApiResponse;
2525
import io.swagger.annotations.ApiResponses;
26+
import java.io.BufferedOutputStream;
27+
import java.io.IOException;
2628
import java.util.ArrayList;
2729
import java.util.Collections;
2830
import java.util.List;
2931
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.CompletionException;
3033
import java.util.concurrent.ExecutionException;
3134
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.TimeoutException;
@@ -47,6 +50,7 @@
4750
import javax.ws.rs.core.MediaType;
4851
import javax.ws.rs.core.Response;
4952
import javax.ws.rs.core.Response.Status;
53+
import javax.ws.rs.core.StreamingOutput;
5054
import org.apache.commons.lang3.StringUtils;
5155
import org.apache.pulsar.broker.PulsarServerException;
5256
import org.apache.pulsar.broker.service.Topic;
@@ -60,6 +64,7 @@
6064
import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
6165
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
6266
import org.apache.pulsar.common.util.FutureUtil;
67+
import org.apache.pulsar.common.util.ObjectMapperFactory;
6368
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
6469
import org.slf4j.Logger;
6570
import org.slf4j.LoggerFactory;
@@ -111,8 +116,7 @@ public void getPartitionedMetadata(
111116
@ApiResponse(code = 412, message = "Topic name is not valid"),
112117
@ApiResponse(code = 500, message = "Internal server error"),
113118
})
114-
public void getInternalStats(
115-
@Suspended final AsyncResponse asyncResponse,
119+
public StreamingOutput getInternalStats(
116120
@ApiParam(value = "Specify the tenant", required = true)
117121
@PathParam("tenant") String tenant,
118122
@ApiParam(value = "Specify the namespace", required = true)
@@ -123,21 +127,22 @@ public void getInternalStats(
123127
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
124128
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
125129
validateTopicName(tenant, namespace, encodedTopic);
126-
validateTopicOwnershipAsync(topicName, authoritative)
127-
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
128-
.thenCompose(__ -> {
129-
Topic topic = getTopicReference(topicName);
130-
boolean includeMetadata = metadata && hasSuperUserAccess();
131-
return topic.getInternalStats(includeMetadata);
132-
})
133-
.thenAccept(asyncResponse::resume)
134-
.exceptionally(ex -> {
135-
if (isNot307And404Exception(ex)) {
136-
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
137-
}
138-
resumeAsyncResponseExceptionally(asyncResponse, ex);
139-
return null;
140-
});
130+
return output -> {
131+
internalGetInternalStatsAsync(authoritative, metadata)
132+
.thenAccept(stats -> {
133+
try {
134+
ObjectMapperFactory.getMapper().getObjectMapper().writeValue(output, stats);
135+
} catch (Throwable e) {
136+
throw new CompletionException(e);
137+
}
138+
})
139+
.exceptionally(ex -> {
140+
if (isNot307And404Exception(ex)) {
141+
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
142+
}
143+
throw translateToWebApplicationException(ex);
144+
});
145+
};
141146
}
142147

143148
@PUT

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 24 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,21 @@
1919
package org.apache.pulsar.broker.admin.v2;
2020

2121
import static org.apache.pulsar.common.util.Codec.decode;
22+
import static org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException;
2223
import com.fasterxml.jackson.core.JsonProcessingException;
2324
import io.swagger.annotations.Api;
2425
import io.swagger.annotations.ApiOperation;
2526
import io.swagger.annotations.ApiParam;
2627
import io.swagger.annotations.ApiResponse;
2728
import io.swagger.annotations.ApiResponses;
29+
import java.io.BufferedOutputStream;
2830
import java.io.IOException;
31+
import java.io.OutputStream;
2932
import java.util.List;
3033
import java.util.Map;
3134
import java.util.Optional;
3235
import java.util.Set;
36+
import java.util.concurrent.CompletionException;
3337
import javax.servlet.AsyncContext;
3438
import javax.servlet.ServletOutputStream;
3539
import javax.servlet.WriteListener;
@@ -50,6 +54,7 @@
5054
import javax.ws.rs.core.Context;
5155
import javax.ws.rs.core.MediaType;
5256
import javax.ws.rs.core.Response;
57+
import javax.ws.rs.core.StreamingOutput;
5358
import org.apache.bookkeeper.mledger.Position;
5459
import org.apache.bookkeeper.mledger.impl.PositionImpl;
5560
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
@@ -1127,7 +1132,7 @@ public void deleteTopic(
11271132
internalDeleteTopicAsync(authoritative, force)
11281133
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
11291134
.exceptionally(ex -> {
1130-
Throwable t = FutureUtil.unwrapCompletionException(ex);
1135+
Throwable t = unwrapCompletionException(ex);
11311136
if (!force && (t instanceof BrokerServiceException.TopicBusyException)) {
11321137
ex = new RestException(Response.Status.PRECONDITION_FAILED,
11331138
t.getMessage());
@@ -1235,8 +1240,7 @@ public void getStats(
12351240
@ApiResponse(code = 412, message = "Topic name is not valid"),
12361241
@ApiResponse(code = 500, message = "Internal server error"),
12371242
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
1238-
public void getInternalStats(
1239-
@Suspended final AsyncResponse asyncResponse,
1243+
public StreamingOutput getInternalStats(
12401244
@ApiParam(value = "Specify the tenant", required = true)
12411245
@PathParam("tenant") String tenant,
12421246
@ApiParam(value = "Specify the namespace", required = true)
@@ -1247,54 +1251,22 @@ public void getInternalStats(
12471251
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
12481252
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
12491253
validateTopicName(tenant, namespace, encodedTopic);
1250-
internalGetInternalStatsAsync(authoritative, metadata)
1251-
.thenAccept(stats -> {
1252-
final AsyncContext asyncContext = servletRequest.getAsyncContext();
1253-
final ServletOutputStream s;
1254-
try {
1255-
s = asyncContext.getResponse().getOutputStream();
1256-
} catch (IOException e) {
1257-
log.error("Cannot get the outputstream to write the response", e);
1258-
resumeAsyncResponseExceptionally(asyncResponse, e);
1259-
return;
1260-
}
1261-
s.setWriteListener(new WriteListener() {
1262-
1263-
public void onWritePossible() throws IOException {
1264-
log.info("Serializing internal stats for topic {} to outputstream", topicName);
1265-
if (!s.isReady()) {
1266-
return;
1267-
}
1268-
byte[] asArray = ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsBytes(stats);
1269-
s.write(asArray);
1270-
if (!s.isReady()) {
1271-
return;
1272-
}
1273-
s.close();
1274-
if (!s.isReady()) {
1275-
return;
1276-
}
1277-
asyncContext.complete();
1278-
log.info("Finished writing internal stats for topic {} to outputstream", topicName);
1279-
1254+
return output -> {
1255+
internalGetInternalStatsAsync(authoritative, metadata)
1256+
.thenAccept(stats -> {
1257+
try {
1258+
ObjectMapperFactory.getMapper().getObjectMapper().writeValue(output, stats);
1259+
} catch (IOException error) {
1260+
throw new CompletionException(error);
12801261
}
1281-
1282-
@Override
1283-
public void onError(Throwable ex) {
1284-
if (isNot307And404Exception(ex)) {
1285-
log.error("[{}] Failed to server internal stats for topic {}",
1286-
clientAppId(), topicName, ex);
1287-
}
1262+
})
1263+
.exceptionally(ex -> {
1264+
if (isNot307And404Exception(ex)) {
1265+
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
12881266
}
1267+
throw translateToWebApplicationException(ex);
12891268
});
1290-
})
1291-
.exceptionally(ex -> {
1292-
if (isNot307And404Exception(ex)) {
1293-
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
1294-
}
1295-
resumeAsyncResponseExceptionally(asyncResponse, ex);
1296-
return null;
1297-
});
1269+
};
12981270
}
12991271

13001272
@GET
@@ -1435,7 +1407,7 @@ public void deleteSubscription(
14351407
internalDeleteSubscriptionAsync(subName, authoritative, force)
14361408
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
14371409
.exceptionally(ex -> {
1438-
Throwable cause = FutureUtil.unwrapCompletionException(ex);
1410+
Throwable cause = unwrapCompletionException(ex);
14391411

14401412
// If the exception is not redirect exception we need to log it.
14411413
if (!isRedirectException(cause)) {
@@ -1727,7 +1699,7 @@ public void resetCursor(
17271699
internalResetCursorAsync(decode(encodedSubName), timestamp, authoritative)
17281700
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
17291701
.exceptionally(ex -> {
1730-
Throwable t = FutureUtil.unwrapCompletionException(ex);
1702+
Throwable t = unwrapCompletionException(ex);
17311703
if (!isRedirectException(t)) {
17321704
log.error("[{}][{}] Failed to reset cursor on subscription {} to time {}",
17331705
clientAppId(), topicName, encodedSubName, timestamp, t);
@@ -2093,7 +2065,7 @@ public void getBacklog(
20932065
.thenCompose(__ -> internalGetBacklogAsync(authoritative))
20942066
.thenAccept(asyncResponse::resume)
20952067
.exceptionally(ex -> {
2096-
Throwable t = FutureUtil.unwrapCompletionException(ex);
2068+
Throwable t = unwrapCompletionException(ex);
20972069
if (t instanceof MetadataStoreException.NotFoundException) {
20982070
log.warn("[{}] Failed to get topic backlog {}: Namespace does not exist", clientAppId(),
20992071
namespaceName);
@@ -4247,7 +4219,7 @@ public void truncateTopic(
42474219
internalTruncateTopicAsync(authoritative)
42484220
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
42494221
.exceptionally(ex -> {
4250-
Throwable t = FutureUtil.unwrapCompletionException(ex);
4222+
Throwable t = unwrapCompletionException(ex);
42514223
if (!isRedirectException(t)) {
42524224
log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, t);
42534225
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2387,7 +2387,6 @@ public CompletableFuture<SchemaVersion> deleteSchema() {
23872387

23882388
@Override
23892389
public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {
2390-
23912390
CompletableFuture<PersistentTopicInternalStats> statFuture = new CompletableFuture<>();
23922391
PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
23932392

pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1314,6 +1314,23 @@ public <T> T sync(Supplier<CompletableFuture<T>> supplier) {
13141314
}
13151315
}
13161316

1317+
protected static WebApplicationException translateToWebApplicationException(Throwable exception) {
1318+
Throwable realCause = FutureUtil.unwrapCompletionException(exception);
1319+
if (realCause instanceof WebApplicationException) {
1320+
return (WebApplicationException) realCause;
1321+
} else if (realCause instanceof BrokerServiceException.NotAllowedException) {
1322+
return new RestException(Status.CONFLICT, realCause);
1323+
} else if (realCause instanceof MetadataStoreException.NotFoundException) {
1324+
return new RestException(Status.NOT_FOUND, realCause);
1325+
} else if (realCause instanceof MetadataStoreException.BadVersionException) {
1326+
return new RestException(Status.CONFLICT, "Concurrent modification");
1327+
} else if (realCause instanceof PulsarAdminException) {
1328+
return new RestException(((PulsarAdminException) realCause));
1329+
} else {
1330+
return new RestException(realCause);
1331+
}
1332+
}
1333+
13171334
protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) {
13181335
Throwable realCause = FutureUtil.unwrapCompletionException(exception);
13191336
if (realCause instanceof WebApplicationException) {

0 commit comments

Comments
 (0)