Skip to content

Commit 97f2349

Browse files
AnuragReddy2000anurag.reddy
authored andcommitted
[fix][client] Make DeadLetterPolicy & KeySharedPolicy serializable (apache#23718)
Co-authored-by: anurag.reddy <anurag.reddy@flipkart.com> (cherry picked from commit 14129e3) (cherry picked from commit b9ce087)
1 parent 62eadc7 commit 97f2349

File tree

7 files changed

+66
-8
lines changed

7 files changed

+66
-8
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -990,8 +990,8 @@ public void testBatchMessageDispatchingAccordingToPermits() throws Exception {
990990

991991
int numMsgs = 1000;
992992
int batchMessages = 10;
993-
final String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdSpecify-" + UUID.randomUUID();
994-
final String subscriptionName = "sub-1";
993+
final String topicName = "persistent://prop/ns-abc/testBatchMessageDispatchingAccordingToPermits-" + UUID.randomUUID();
994+
final String subscriptionName = "bmdap-sub-1";
995995

996996
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
997997
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared)
@@ -1016,6 +1016,7 @@ public void testBatchMessageDispatchingAccordingToPermits() throws Exception {
10161016

10171017
producer.close();
10181018
consumer1.close();
1019+
consumer2.close();
10191020
}
10201021

10211022
@Test(dataProvider="testSubTypeAndEnableBatch")

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21+
import java.io.Serializable;
2122
import lombok.AllArgsConstructor;
2223
import lombok.Builder;
2324
import lombok.Data;
@@ -36,7 +37,8 @@
3637
@AllArgsConstructor
3738
@InterfaceAudience.Public
3839
@InterfaceStability.Stable
39-
public class DeadLetterPolicy {
40+
public class DeadLetterPolicy implements Serializable {
41+
private static final long serialVersionUID = 1L;
4042

4143
/**
4244
* Maximum number of times that a message will be redelivered before being sent to the dead letter queue.

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21+
import java.io.Serializable;
2122
import java.util.ArrayList;
2223
import java.util.Arrays;
2324
import java.util.List;
@@ -29,7 +30,7 @@
2930
*/
3031
@InterfaceAudience.Public
3132
@InterfaceStability.Stable
32-
public abstract class KeySharedPolicy {
33+
public abstract class KeySharedPolicy implements Serializable {
3334

3435
protected KeySharedMode keySharedMode;
3536

@@ -82,6 +83,7 @@ public int getHashRangeTotal() {
8283
* for message, the cursor will rewind.
8384
*/
8485
public static class KeySharedPolicySticky extends KeySharedPolicy {
86+
private static final long serialVersionUID = 1L;
8587

8688
protected final List<Range> ranges;
8789

@@ -129,6 +131,7 @@ public List<Range> getRanges() {
129131
* Auto split hash range key shared policy.
130132
*/
131133
public static class KeySharedPolicyAutoSplit extends KeySharedPolicy {
134+
private static final long serialVersionUID = 1L;
132135

133136
KeySharedPolicyAutoSplit() {
134137
this.keySharedMode = KeySharedMode.AUTO_SPLIT;

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21+
import java.io.Serializable;
2122
import java.util.Objects;
2223
import org.apache.pulsar.common.classification.InterfaceAudience;
2324
import org.apache.pulsar.common.classification.InterfaceStability;
@@ -27,7 +28,8 @@
2728
*/
2829
@InterfaceAudience.Public
2930
@InterfaceStability.Stable
30-
public class Range implements Comparable<Range> {
31+
public class Range implements Comparable<Range>, Serializable {
32+
private static final long serialVersionUID = 1L;
3133

3234
private final int start;
3335
private final int end;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ public int getMaxPendingChuckedMessage() {
359359
+ "When specifying the dead letter policy while not specifying `ackTimeoutMillis`, you can set the"
360360
+ " ack timeout to 30000 millisecond."
361361
)
362-
private transient DeadLetterPolicy deadLetterPolicy;
362+
private DeadLetterPolicy deadLetterPolicy;
363363

364364
private boolean retryEnable = false;
365365

@@ -388,7 +388,7 @@ public int getMaxPendingChuckedMessage() {
388388
private boolean resetIncludeHead = false;
389389

390390
@JsonIgnore
391-
private transient KeySharedPolicy keySharedPolicy;
391+
private KeySharedPolicy keySharedPolicy;
392392

393393
private boolean batchIndexAckEnabled = false;
394394

pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
144144
)
145145
private boolean resetIncludeHead = false;
146146

147-
private transient List<Range> keyHashRanges;
147+
private List<Range> keyHashRanges;
148148

149149
private boolean poolMessages = false;
150150

pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,18 @@
1919
package org.apache.pulsar.client.impl.conf;
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import java.io.ByteArrayInputStream;
24+
import java.io.ByteArrayOutputStream;
25+
import java.io.ObjectInputStream;
26+
import java.io.ObjectOutputStream;
27+
import java.util.Collections;
2228
import java.util.regex.Pattern;
29+
30+
import lombok.Cleanup;
31+
import org.apache.pulsar.client.api.DeadLetterPolicy;
32+
import org.apache.pulsar.client.api.SubscriptionType;
33+
import org.testng.Assert;
2334
import org.testng.annotations.DataProvider;
2435
import org.testng.annotations.Test;
2536

@@ -45,4 +56,43 @@ public void testTopicConsumerConfigurationData(String topicName, int expectedPri
4556

4657
assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(expectedPriority);
4758
}
59+
60+
@Test
61+
public void testSerializable() throws Exception {
62+
ConsumerConfigurationData<String> consumerConfigurationData = new ConsumerConfigurationData<>();
63+
consumerConfigurationData.setPriorityLevel(1);
64+
consumerConfigurationData.setSubscriptionName("my-sub");
65+
consumerConfigurationData.setSubscriptionType(SubscriptionType.Shared);
66+
consumerConfigurationData.setReceiverQueueSize(100);
67+
consumerConfigurationData.setAckTimeoutMillis(1000);
68+
consumerConfigurationData.setTopicNames(Collections.singleton("my-topic"));
69+
70+
DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
71+
.maxRedeliverCount(10)
72+
.retryLetterTopic("retry-topic")
73+
.deadLetterTopic("dead-topic")
74+
.build();
75+
consumerConfigurationData.setDeadLetterPolicy(deadLetterPolicy);
76+
77+
@Cleanup
78+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
79+
@Cleanup
80+
ObjectOutputStream oos = new ObjectOutputStream(bos);
81+
oos.writeObject(consumerConfigurationData);
82+
byte[] serialized = bos.toByteArray();
83+
84+
// Deserialize
85+
@Cleanup
86+
ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
87+
@Cleanup
88+
ObjectInputStream ois = new ObjectInputStream(bis);
89+
Object object = ois.readObject();
90+
91+
Assert.assertEquals(object.getClass(), ConsumerConfigurationData.class);
92+
Assert.assertEquals(object, consumerConfigurationData);
93+
94+
DeadLetterPolicy deserialisedDeadLetterPolicy = ((ConsumerConfigurationData<?>) object).getDeadLetterPolicy();
95+
Assert.assertNotNull(deserialisedDeadLetterPolicy);
96+
Assert.assertEquals(deserialisedDeadLetterPolicy, deadLetterPolicy);
97+
}
4898
}

0 commit comments

Comments
 (0)