Skip to content

Commit 457b580

Browse files
rdhabaliamukesh-ctds
authored andcommitted
[improve][cli] Support additional msg metadata for V1 topic on peek message cmd (apache#23978)
(cherry picked from commit 626b211) (cherry picked from commit 46ff9f6)
1 parent 7e52b06 commit 457b580

File tree

2 files changed

+52
-66
lines changed

2 files changed

+52
-66
lines changed

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.admin.cli;
2020

2121
import static org.apache.commons.lang3.StringUtils.isNotBlank;
22+
import static org.apache.pulsar.admin.cli.CmdTopics.printMessages;
2223
import com.beust.jcommander.Parameter;
2324
import com.beust.jcommander.ParameterException;
2425
import com.beust.jcommander.Parameters;
@@ -39,8 +40,6 @@
3940
import org.apache.pulsar.client.api.Message;
4041
import org.apache.pulsar.client.api.MessageId;
4142
import org.apache.pulsar.client.cli.NoSplitter;
42-
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
43-
import org.apache.pulsar.client.impl.MessageIdImpl;
4443
import org.apache.pulsar.common.util.RelativeTimeUtil;
4544

4645
@Parameters(commandDescription = "Operations on persistent topics. The persistent-topics "
@@ -606,26 +605,7 @@ private class PeekMessages extends CliCommand {
606605
void run() throws PulsarAdminException {
607606
String persistentTopic = validatePersistentTopic(params);
608607
List<Message<byte[]>> messages = getPersistentTopics().peekMessages(persistentTopic, subName, numMessages);
609-
int position = 0;
610-
for (Message<byte[]> msg : messages) {
611-
if (++position != 1) {
612-
System.out.println("-------------------------------------------------------------------------\n");
613-
}
614-
if (msg.getMessageId() instanceof BatchMessageIdImpl) {
615-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
616-
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
617-
+ msgId.getBatchIndex());
618-
} else {
619-
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
620-
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
621-
}
622-
if (msg.getProperties().size() > 0) {
623-
System.out.println("Properties:");
624-
print(msg.getProperties());
625-
}
626-
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
627-
System.out.println(ByteBufUtil.prettyHexDump(data));
628-
}
608+
printMessages(messages, false, this);
629609
}
630610
}
631611

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java

Lines changed: 50 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,50 +1246,7 @@ void run() throws PulsarAdminException {
12461246
String persistentTopic = validatePersistentTopic(params);
12471247
List<Message<byte[]>> messages = getTopics().peekMessages(persistentTopic, subName, numMessages,
12481248
showServerMarker, transactionIsolationLevel);
1249-
int position = 0;
1250-
for (Message<byte[]> msg : messages) {
1251-
MessageImpl message = (MessageImpl) msg;
1252-
if (++position != 1) {
1253-
System.out.println("-------------------------------------------------------------------------\n");
1254-
}
1255-
if (message.getMessageId() instanceof BatchMessageIdImpl) {
1256-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
1257-
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
1258-
+ msgId.getBatchIndex());
1259-
} else {
1260-
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
1261-
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
1262-
}
1263-
1264-
System.out.println("Publish time: " + message.getPublishTime());
1265-
System.out.println("Event time: " + message.getEventTime());
1266-
1267-
if (message.getDeliverAtTime() != 0) {
1268-
System.out.println("Deliver at time: " + message.getDeliverAtTime());
1269-
}
1270-
MessageMetadata msgMetaData = message.getMessageBuilder();
1271-
if (showServerMarker && msgMetaData.hasMarkerType()) {
1272-
System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType()));
1273-
}
1274-
1275-
if (message.getBrokerEntryMetadata() != null) {
1276-
if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) {
1277-
System.out.println("Broker entry metadata timestamp: "
1278-
+ message.getBrokerEntryMetadata().getBrokerTimestamp());
1279-
}
1280-
if (message.getBrokerEntryMetadata().hasIndex()) {
1281-
System.out.println("Broker entry metadata index: "
1282-
+ message.getBrokerEntryMetadata().getIndex());
1283-
}
1284-
}
1285-
1286-
if (message.getProperties().size() > 0) {
1287-
System.out.println("Properties:");
1288-
print(msg.getProperties());
1289-
}
1290-
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
1291-
System.out.println(ByteBufUtil.prettyHexDump(data));
1292-
}
1249+
printMessages(messages, showServerMarker, this);
12931250
}
12941251
}
12951252

@@ -1508,6 +1465,55 @@ static MessageId findFirstLedgerWithinThreshold(List<PersistentTopicInternalStat
15081465
return null;
15091466
}
15101467

1468+
public static void printMessages(List<Message<byte[]>> messages, boolean showServerMarker, CliCommand cli) {
1469+
if (messages == null) {
1470+
return;
1471+
}
1472+
int position = 0;
1473+
for (Message<byte[]> msg : messages) {
1474+
MessageImpl message = (MessageImpl) msg;
1475+
if (++position != 1) {
1476+
System.out.println("-------------------------------------------------------------------------\n");
1477+
}
1478+
if (message.getMessageId() instanceof BatchMessageIdImpl) {
1479+
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
1480+
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
1481+
+ msgId.getBatchIndex());
1482+
} else {
1483+
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
1484+
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
1485+
}
1486+
1487+
System.out.println("Publish time: " + message.getPublishTime());
1488+
System.out.println("Event time: " + message.getEventTime());
1489+
1490+
if (message.getDeliverAtTime() != 0) {
1491+
System.out.println("Deliver at time: " + message.getDeliverAtTime());
1492+
}
1493+
MessageMetadata msgMetaData = message.getMessageBuilder();
1494+
if (showServerMarker && msgMetaData.hasMarkerType()) {
1495+
System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType()));
1496+
}
1497+
1498+
if (message.getBrokerEntryMetadata() != null) {
1499+
if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) {
1500+
System.out.println("Broker entry metadata timestamp: "
1501+
+ message.getBrokerEntryMetadata().getBrokerTimestamp());
1502+
}
1503+
if (message.getBrokerEntryMetadata().hasIndex()) {
1504+
System.out.println("Broker entry metadata index: " + message.getBrokerEntryMetadata().getIndex());
1505+
}
1506+
}
1507+
1508+
if (message.getProperties().size() > 0) {
1509+
System.out.println("Properties:");
1510+
cli.print(msg.getProperties());
1511+
}
1512+
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
1513+
System.out.println(ByteBufUtil.prettyHexDump(data));
1514+
}
1515+
}
1516+
15111517
@Parameters(commandDescription = "Trigger offload of data from a topic to long-term storage (e.g. Amazon S3)")
15121518
private class Offload extends CliCommand {
15131519
@Parameter(names = { "-s", "--size-threshold" },

0 commit comments

Comments
 (0)