4141import io .netty .util .concurrent .FastThreadLocal ;
4242import io .netty .util .concurrent .Promise ;
4343import io .netty .util .concurrent .ScheduledFuture ;
44- import java .io .IOException ;
4544import java .net .InetSocketAddress ;
4645import java .net .SocketAddress ;
4746import java .util .ArrayList ;
137136import org .apache .pulsar .common .api .proto .CommandUnsubscribe ;
138137import org .apache .pulsar .common .api .proto .CommandWatchTopicList ;
139138import org .apache .pulsar .common .api .proto .CommandWatchTopicListClose ;
140- import org .apache .pulsar .common .api .proto .CompressionType ;
141139import org .apache .pulsar .common .api .proto .FeatureFlags ;
142140import org .apache .pulsar .common .api .proto .KeySharedMeta ;
143141import org .apache .pulsar .common .api .proto .KeySharedMode ;
148146import org .apache .pulsar .common .api .proto .ProtocolVersion ;
149147import org .apache .pulsar .common .api .proto .Schema ;
150148import org .apache .pulsar .common .api .proto .ServerError ;
151- import org .apache .pulsar .common .api .proto .SingleMessageMetadata ;
152149import org .apache .pulsar .common .api .proto .TxnAction ;
153- import org .apache .pulsar .common .compression .CompressionCodec ;
154- import org .apache .pulsar .common .compression .CompressionCodecProvider ;
155150import org .apache .pulsar .common .intercept .InterceptException ;
156151import org .apache .pulsar .common .lookup .data .LookupData ;
157152import org .apache .pulsar .common .naming .Metadata ;
@@ -2282,7 +2277,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
22822277 .thenApply (lastPosition -> {
22832278 int partitionIndex = TopicName .getPartitionIndex (topic .getName ());
22842279
2285- Position markDeletePosition = null ;
2280+ Position markDeletePosition = PositionFactory . EARLIEST ;
22862281 if (consumer .getSubscription () instanceof PersistentSubscription ) {
22872282 markDeletePosition = ((PersistentSubscription ) consumer .getSubscription ()).getCursor ()
22882283 .getMarkDeletedPosition ();
@@ -2343,8 +2338,7 @@ private void getLargestBatchIndexWhenPossible(
23432338 } else {
23442339 // if readCompacted is false, we need to return MessageId.earliest
23452340 writeAndFlush (Commands .newGetLastMessageIdResponse (requestId , -1 , -1 , partitionIndex , -1 ,
2346- markDeletePosition != null ? markDeletePosition .getLedgerId () : -1 ,
2347- markDeletePosition != null ? markDeletePosition .getEntryId () : -1 ));
2341+ markDeletePosition .getLedgerId (), markDeletePosition .getEntryId ()));
23482342 }
23492343 return ;
23502344 }
@@ -2403,47 +2397,19 @@ public String toString() {
24032397
24042398 writeAndFlush (Commands .newGetLastMessageIdResponse (requestId , lastPosition .getLedgerId (),
24052399 lastPosition .getEntryId (), partitionIndex , largestBatchIndex ,
2406- markDeletePosition != null ? markDeletePosition .getLedgerId () : -1 ,
2407- markDeletePosition != null ? markDeletePosition .getEntryId () : -1 ));
2400+ markDeletePosition .getLedgerId (), markDeletePosition .getEntryId ()));
24082401 }
24092402 });
24102403 });
24112404 }
24122405
24132406 private void handleLastMessageIdFromCompactionService (PersistentTopic persistentTopic , long requestId ,
24142407 int partitionIndex , Position markDeletePosition ) {
2415- persistentTopic .getTopicCompactionService ().readLastCompactedEntry ().thenAccept (entry -> {
2416- if (entry != null ) {
2417- try {
2418- // in this case, all the data has been compacted, so return the last position
2419- // in the compacted ledger to the client
2420- ByteBuf payload = entry .getDataBuffer ();
2421- MessageMetadata metadata = Commands .parseMessageMetadata (payload );
2422- int largestBatchIndex ;
2423- try {
2424- largestBatchIndex = calculateTheLastBatchIndexInBatch (metadata , payload );
2425- } catch (IOException ioEx ) {
2426- writeAndFlush (Commands .newError (requestId , ServerError .MetadataError ,
2427- "Failed to deserialize batched message from the last entry of the compacted Ledger: "
2428- + ioEx .getMessage ()));
2429- return ;
2430- }
2431- writeAndFlush (Commands .newGetLastMessageIdResponse (requestId ,
2432- entry .getLedgerId (), entry .getEntryId (), partitionIndex , largestBatchIndex ,
2433- markDeletePosition != null ? markDeletePosition .getLedgerId () : -1 ,
2434- markDeletePosition != null ? markDeletePosition .getEntryId () : -1 ));
2435- } finally {
2436- entry .release ();
2437- }
2438- } else {
2439- // in this case, the ledgers been removed except the current ledger
2440- // and current ledger without any data
2441- writeAndFlush (Commands .newGetLastMessageIdResponse (requestId ,
2442- -1 , -1 , partitionIndex , -1 ,
2443- markDeletePosition != null ? markDeletePosition .getLedgerId () : -1 ,
2444- markDeletePosition != null ? markDeletePosition .getEntryId () : -1 ));
2445- }
2446- }).exceptionally (ex -> {
2408+ persistentTopic .getTopicCompactionService ().getLastMessagePosition ().thenAccept (position ->
2409+ writeAndFlush (Commands .newGetLastMessageIdResponse (requestId , position .ledgerId (), position .entryId (),
2410+ partitionIndex , position .batchIndex (), markDeletePosition .getLedgerId (),
2411+ markDeletePosition .getEntryId ()))
2412+ ).exceptionally (ex -> {
24472413 writeAndFlush (Commands .newError (
24482414 requestId , ServerError .MetadataError ,
24492415 "Failed to read last entry of the compacted Ledger "
@@ -2452,33 +2418,6 @@ private void handleLastMessageIdFromCompactionService(PersistentTopic persistent
24522418 });
24532419 }
24542420
2455- private int calculateTheLastBatchIndexInBatch (MessageMetadata metadata , ByteBuf payload ) throws IOException {
2456- int batchSize = metadata .getNumMessagesInBatch ();
2457- if (batchSize <= 1 ){
2458- return -1 ;
2459- }
2460- if (metadata .hasCompression ()) {
2461- var tmp = payload ;
2462- CompressionType compressionType = metadata .getCompression ();
2463- CompressionCodec codec = CompressionCodecProvider .getCompressionCodec (compressionType );
2464- int uncompressedSize = metadata .getUncompressedSize ();
2465- payload = codec .decode (payload , uncompressedSize );
2466- tmp .release ();
2467- }
2468- SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata ();
2469- int lastBatchIndexInBatch = -1 ;
2470- for (int i = 0 ; i < batchSize ; i ++){
2471- ByteBuf singleMessagePayload =
2472- Commands .deSerializeSingleMessageInBatch (payload , singleMessageMetadata , i , batchSize );
2473- singleMessagePayload .release ();
2474- if (singleMessageMetadata .isCompactedOut ()){
2475- continue ;
2476- }
2477- lastBatchIndexInBatch = i ;
2478- }
2479- return lastBatchIndexInBatch ;
2480- }
2481-
24822421 private CompletableFuture <Boolean > isNamespaceOperationAllowed (NamespaceName namespaceName ,
24832422 NamespaceOperation operation ) {
24842423 if (!service .isAuthorizationEnabled ()) {
0 commit comments