Skip to content

Commit f9d7ad6

Browse files
committed
[improve][client] PIP-229: Add a common interface to get fields of the MessageIdData
Master issue: apache#18950 ### Motivation We need a common interface to get fields of the MessageIdData. After that, we won't need to assert a MessageId implementation is an instance of a specific class. And we can pass our customized MessageId implementation to APIs like `acknowledge` and `seek`. ### Modifications - Add `MessageIdAdv` to get fields of `MessageIdData`, make all MessageId implementations inherit it (except `MultiMessageIdImpl`). - Deprecate `BatchMessageAcker` by adding the ACK bit set field and the `PreviousMessageAcknowledger` interface to `BatchMessageIdImpl`. - Deprecate `TopicMessageIdImpl#getInnerMessageId` by passing the `TopicMessageIdImpl` directly. - Remove `instanceof BatchMessageIdImpl` checks in `pulsar-client` module by casting to `MessageIdAdv`.
1 parent fcecca4 commit f9d7ad6

File tree

29 files changed

+525
-571
lines changed

29 files changed

+525
-571
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,11 @@
4040
import org.apache.pulsar.client.api.ConsumerEventListener;
4141
import org.apache.pulsar.client.api.Message;
4242
import org.apache.pulsar.client.api.MessageId;
43+
import org.apache.pulsar.client.api.MessageIdAdv;
4344
import org.apache.pulsar.client.api.MessageRoutingMode;
4445
import org.apache.pulsar.client.api.Producer;
4546
import org.apache.pulsar.client.api.PulsarClientException;
4647
import org.apache.pulsar.client.api.SubscriptionType;
47-
import org.apache.pulsar.client.impl.MessageIdImpl;
48-
import org.apache.pulsar.client.impl.TopicMessageImpl;
4948
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
5049
import org.apache.pulsar.common.naming.TopicName;
5150
import org.apache.pulsar.common.util.FutureUtil;
@@ -337,7 +336,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
337336
}
338337
totalMessages++;
339338
consumer1.acknowledge(msg);
340-
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
339+
MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
341340
receivedPtns.add(msgId.getPartitionIndex());
342341
}
343342

@@ -354,7 +353,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
354353
}
355354
totalMessages++;
356355
consumer2.acknowledge(msg);
357-
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
356+
MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
358357
receivedPtns.add(msgId.getPartitionIndex());
359358
}
360359
assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.apache.pulsar.client.api.SubscriptionType;
5151
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
5252
import org.apache.pulsar.client.impl.MessageIdImpl;
53-
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
5453
import org.apache.pulsar.common.naming.TopicName;
5554
import org.apache.pulsar.common.util.RelativeTimeUtil;
5655
import org.awaitility.Awaitility;
@@ -679,8 +678,7 @@ public void testSeekByFunction() throws Exception {
679678
if (message == null) {
680679
break;
681680
}
682-
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId();
683-
received.add(topicMessageId.getInnerMessageId());
681+
received.add(message.getMessageId());
684682
}
685683
int msgNumFromPartition1 = list.size() / 2;
686684
int msgNumFromPartition2 = 1;

pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.pulsar.client.impl.MessageIdImpl;
4040
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
4141
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
42-
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
4342
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
4443
import org.apache.pulsar.common.naming.TopicName;
4544
import org.awaitility.Awaitility;
@@ -768,7 +767,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception {
768767

769768
for (int i = 0; i < totalMessages; i ++) {
770769
msg = consumer1.receive(5, TimeUnit.SECONDS);
771-
Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2);
770+
Assert.assertEquals(((MessageIdAdv) msg.getMessageId()).getPartitionIndex(), 2);
772771
consumer1.acknowledge(msg);
773772
}
774773

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,6 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException,
118118
Message<byte[]> message = consumer.receive();
119119
assertEquals(new String(message.getData()), messagePrefix + i);
120120
MessageId messageId = message.getMessageId();
121-
if (topicType == TopicType.PARTITIONED) {
122-
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
123-
}
124121
assertTrue(messageIds.remove(messageId), "Failed to receive message");
125122
}
126123
log.info("Remaining message IDs = {}", messageIds);
@@ -166,9 +163,6 @@ public void producerSend(TopicType topicType) throws PulsarClientException, Puls
166163

167164
for (int i = 0; i < numberOfMessages; i++) {
168165
MessageId messageId = consumer.receive().getMessageId();
169-
if (topicType == TopicType.PARTITIONED) {
170-
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
171-
}
172166
assertTrue(messageIds.remove(messageId), "Failed to receive Message");
173167
}
174168
log.info("Remaining message IDs = {}", messageIds);
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
import java.util.BitSet;
22+
23+
/**
24+
* The {@link MessageId} interface provided for advanced users.
25+
* <p>
26+
* All built-in MessageId implementations should be able to be cast to MessageIdAdv.
27+
* </p>
28+
*/
29+
public interface MessageIdAdv extends MessageId {
30+
31+
/**
32+
* Get the ledger ID.
33+
*
34+
* @return the ledger ID
35+
*/
36+
long getLedgerId();
37+
38+
/**
39+
* Get the entry ID.
40+
*
41+
* @return the entry ID
42+
*/
43+
long getEntryId();
44+
45+
/**
46+
* Get the partition index.
47+
*
48+
* @return -1 if the message is from a non-partitioned topic, otherwise the non-negative partition index
49+
*/
50+
default int getPartitionIndex() {
51+
return -1;
52+
}
53+
54+
/**
55+
* Get the batch index.
56+
*
57+
* @return -1 if the message is not in a batch
58+
*/
59+
default int getBatchIndex() {
60+
return -1;
61+
}
62+
63+
/**
64+
* Get the batch size.
65+
*
66+
* @return 0 if the message is not in a batch
67+
*/
68+
default int getBatchSize() {
69+
return 0;
70+
}
71+
72+
/**
73+
* Get the BitSet that indicates which messages in the batch.
74+
*
75+
* @implNote The message IDs of a batch should share a BitSet. For example, given 3 messages in the same batch whose
76+
* size is 3, all message IDs of them should return "111" (i.e. a BitSet whose size is 3 and all bits are 1). If the
77+
* 1st message has been acknowledged, the returned BitSet should become "011" (i.e. the 1st bit become 0).
78+
*
79+
* @return null if the message is a non-batched message
80+
*/
81+
default BitSet getAckSet() {
82+
return null;
83+
}
84+
85+
/**
86+
* Get the message ID of the first chunk if the current message ID represents the position of a chunked message.
87+
*
88+
* @implNote A chunked message is distributed across different BookKeeper entries. The message ID of a chunked
89+
* message is composed of two message IDs that represent positions of the first and the last chunk. The message ID
90+
* itself represents the position of the last chunk.
91+
*
92+
* @return null if the message is not a chunked message
93+
*/
94+
default MessageIdAdv getFirstChunkMessageId() {
95+
return null;
96+
}
97+
98+
/**
99+
* The default implementation of {@link Comparable#compareTo(Object)}.
100+
*/
101+
default int compareTo(MessageId o) {
102+
if (!(o instanceof MessageIdAdv)) {
103+
throw new UnsupportedOperationException("Unknown MessageId type: "
104+
+ ((o != null) ? o.getClass().getName() : "null"));
105+
}
106+
final MessageIdAdv other = (MessageIdAdv) o;
107+
int result = Long.compare(this.getLedgerId(), other.getLedgerId());
108+
if (result != 0) {
109+
return result;
110+
}
111+
result = Long.compare(this.getEntryId(), other.getEntryId());
112+
if (result != 0) {
113+
return result;
114+
}
115+
// TODO: Correct the following compare logics, see https://github.com/apache/pulsar/pull/18981
116+
result = Integer.compare(this.getPartitionIndex(), other.getPartitionIndex());
117+
if (result != 0) {
118+
return result;
119+
}
120+
return Integer.compare(this.getBatchIndex(), other.getBatchIndex());
121+
}
122+
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.concurrent.CompletableFuture;
2424
import org.apache.pulsar.client.api.MessageId;
25+
import org.apache.pulsar.client.api.MessageIdAdv;
2526
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
2627

2728
/**
@@ -31,7 +32,7 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {
3132

3233
boolean isDuplicate(MessageId messageId);
3334

34-
CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);
35+
CompletableFuture<Void> addAcknowledgment(MessageIdAdv msgId, AckType ackType, Map<String, Long> properties);
3536

3637
CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, AckType ackType,
3738
Map<String, Long> properties);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.BitSet;
2222

23+
@Deprecated
2324
public class BatchMessageAcker {
2425

2526
private BatchMessageAcker() {

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java

Lines changed: 0 additions & 50 deletions
This file was deleted.

0 commit comments

Comments
 (0)