Skip to content

Commit ecc7225

Browse files
committed
Admin API: stream stats
1 parent 5ae9877 commit ecc7225

File tree

1 file changed

+49
-1
lines changed

1 file changed

+49
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,15 @@
2525
import io.swagger.annotations.ApiParam;
2626
import io.swagger.annotations.ApiResponse;
2727
import io.swagger.annotations.ApiResponses;
28+
import java.io.IOException;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Optional;
3132
import java.util.Set;
33+
import javax.servlet.AsyncContext;
34+
import javax.servlet.ServletOutputStream;
35+
import javax.servlet.WriteListener;
36+
import javax.servlet.http.HttpServletRequest;
3237
import javax.ws.rs.DELETE;
3338
import javax.ws.rs.DefaultValue;
3439
import javax.ws.rs.Encoded;
@@ -42,6 +47,7 @@
4247
import javax.ws.rs.WebApplicationException;
4348
import javax.ws.rs.container.AsyncResponse;
4449
import javax.ws.rs.container.Suspended;
50+
import javax.ws.rs.core.Context;
4551
import javax.ws.rs.core.MediaType;
4652
import javax.ws.rs.core.Response;
4753
import org.apache.bookkeeper.mledger.Position;
@@ -83,6 +89,7 @@
8389
import org.apache.pulsar.common.policies.data.stats.PartitionedTopicStatsImpl;
8490
import org.apache.pulsar.common.util.Codec;
8591
import org.apache.pulsar.common.util.FutureUtil;
92+
import org.apache.pulsar.common.util.ObjectMapperFactory;
8693
import org.apache.pulsar.metadata.api.MetadataStoreException;
8794
import org.slf4j.Logger;
8895
import org.slf4j.LoggerFactory;
@@ -1215,6 +1222,8 @@ public void getStats(
12151222
});
12161223
}
12171224

1225+
@Context HttpServletRequest servletRequest;
1226+
12181227
@GET
12191228
@Path("{tenant}/{namespace}/{topic}/internalStats")
12201229
@ApiOperation(value = "Get the internal stats for the topic.", response = PersistentTopicInternalStats.class)
@@ -1239,7 +1248,46 @@ public void getInternalStats(
12391248
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
12401249
validateTopicName(tenant, namespace, encodedTopic);
12411250
internalGetInternalStatsAsync(authoritative, metadata)
1242-
.thenAccept(asyncResponse::resume)
1251+
.thenAccept(stats -> {
1252+
final AsyncContext asyncContext = servletRequest.getAsyncContext();
1253+
final ServletOutputStream s;
1254+
try {
1255+
s = asyncContext.getResponse().getOutputStream();
1256+
} catch (IOException e) {
1257+
log.error("Cannot get the outputstream to write the response", e);
1258+
resumeAsyncResponseExceptionally(asyncResponse, e);
1259+
return;
1260+
}
1261+
s.setWriteListener(new WriteListener() {
1262+
1263+
public void onWritePossible() throws IOException {
1264+
log.info("Serializing internal stats for topic {} to outputstream", topicName);
1265+
if (!s.isReady()) {
1266+
return;
1267+
}
1268+
byte[] asArray = ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsBytes(stats);
1269+
s.write(asArray);
1270+
if (!s.isReady()) {
1271+
return;
1272+
}
1273+
s.close();
1274+
if (!s.isReady()) {
1275+
return;
1276+
}
1277+
asyncContext.complete();
1278+
log.info("Finished writing internal stats for topic {} to outputstream", topicName);
1279+
1280+
}
1281+
1282+
@Override
1283+
public void onError(Throwable ex) {
1284+
if (isNot307And404Exception(ex)) {
1285+
log.error("[{}] Failed to server internal stats for topic {}",
1286+
clientAppId(), topicName, ex);
1287+
}
1288+
}
1289+
});
1290+
})
12431291
.exceptionally(ex -> {
12441292
if (isNot307And404Exception(ex)) {
12451293
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);

0 commit comments

Comments
 (0)