Skip to content

Commit cf782af

Browse files
authored
Deprecate (Reactive)PulsarListenerEndpointAdapter (#481)
This commit deprecates PulsarListenerEndpointAdapter and ReactivePulsarListenerEndpointAdapter in favor of default methods on ListenerEndpoint and its sub-interfaces, which makes it a bit easier to provide custom ListenerEndpoint implementations.
1 parent aebb711 commit cf782af

File tree

7 files changed

+51
-16
lines changed

7 files changed

+51
-16
lines changed

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ private void configureEndpoint(AbstractReactivePulsarListenerEndpoint<T> aplEndp
156156

157157
@Override
158158
public DefaultReactivePulsarMessageListenerContainer<T> createContainer(String... topics) {
159-
ReactivePulsarListenerEndpoint<T> endpoint = new ReactivePulsarListenerEndpointAdapter<>() {
159+
ReactivePulsarListenerEndpoint<T> endpoint = new ReactivePulsarListenerEndpoint<>() {
160160

161161
@Override
162162
public List<String> getTopics() {

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/ReactivePulsarListenerEndpoint.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,17 @@
2929
* @param <T> Message payload type.
3030
* @author Christophe Bornet
3131
* @author Chris Bono
32+
* @author Vedran Pavic
3233
*/
3334
public interface ReactivePulsarListenerEndpoint<T> extends ListenerEndpoint<ReactivePulsarMessageListenerContainer<T>> {
3435

35-
boolean isFluxListener();
36+
default boolean isFluxListener() {
37+
return false;
38+
}
3639

3740
@Nullable
38-
Boolean getUseKeyOrderedProcessing();
41+
default Boolean getUseKeyOrderedProcessing() {
42+
return null;
43+
}
3944

4045
}

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/ReactivePulsarListenerEndpointAdapter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
*
3232
* @param <T> Message payload type.
3333
* @author Christophe Bornet
34+
* @deprecated for removal in favor of {@link ReactivePulsarListenerEndpoint}
3435
*/
36+
@Deprecated(forRemoval = true)
3537
public class ReactivePulsarListenerEndpointAdapter<T> implements ReactivePulsarListenerEndpoint<T> {
3638

3739
@Override

spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void setConcurrency(Integer concurrency) {
5454

5555
@Override
5656
public ConcurrentPulsarMessageListenerContainer<T> createContainer(String... topics) {
57-
PulsarListenerEndpoint endpoint = new PulsarListenerEndpointAdapter() {
57+
PulsarListenerEndpoint endpoint = new PulsarListenerEndpoint() {
5858

5959
@Override
6060
public Collection<String> getTopics() {

spring-pulsar/src/main/java/org/springframework/pulsar/config/ListenerEndpoint.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.pulsar.config;
1818

1919
import java.util.Collection;
20+
import java.util.Collections;
2021

2122
import org.apache.pulsar.client.api.SubscriptionType;
2223
import org.apache.pulsar.common.schema.SchemaType;
@@ -32,6 +33,7 @@
3233
*
3334
* @param <C> Message listener container type.
3435
* @author Christophe Bornet
36+
* @author Vedran Pavic
3537
*/
3638
public interface ListenerEndpoint<C extends MessageListenerContainer> {
3739

@@ -42,53 +44,69 @@ public interface ListenerEndpoint<C extends MessageListenerContainer> {
4244
* @see ListenerContainerFactory#createListenerContainer
4345
*/
4446
@Nullable
45-
String getId();
47+
default String getId() {
48+
return null;
49+
}
4650

4751
/**
4852
* Return the subscription name for this endpoint's container.
4953
* @return the subscription name.
5054
*/
5155
@Nullable
52-
String getSubscriptionName();
56+
default String getSubscriptionName() {
57+
return null;
58+
}
5359

5460
/**
5561
* Return the subscription type for this endpoint's container.
5662
* @return the subscription type.
5763
*/
5864
@Nullable
59-
SubscriptionType getSubscriptionType();
65+
default SubscriptionType getSubscriptionType() {
66+
return SubscriptionType.Exclusive;
67+
}
6068

6169
/**
6270
* Return the topics for this endpoint's container.
6371
* @return the topics.
6472
*/
65-
Collection<String> getTopics();
73+
default Collection<String> getTopics() {
74+
return Collections.emptyList();
75+
}
6676

6777
/**
6878
* Return the topic pattern for this endpoint's container.
6979
* @return the topic pattern.
7080
*/
71-
String getTopicPattern();
81+
default String getTopicPattern() {
82+
return null;
83+
}
7284

7385
/**
7486
* Return the autoStartup for this endpoint's container.
7587
* @return the autoStartup.
7688
*/
7789
@Nullable
78-
Boolean getAutoStartup();
90+
default Boolean getAutoStartup() {
91+
return null;
92+
}
7993

8094
/**
8195
* Return the schema type for this endpoint's container.
8296
* @return the schema type.
8397
*/
84-
SchemaType getSchemaType();
98+
default SchemaType getSchemaType() {
99+
return null;
100+
}
85101

86102
/**
87103
* Return the concurrency for this endpoint's container.
88104
* @return the concurrency.
89105
*/
90106
@Nullable
91-
Integer getConcurrency();
107+
default Integer getConcurrency() {
108+
return null;
109+
}
92110

93111
/**
94112
* Setup the specified message listener container with the model defined by this
@@ -101,6 +119,7 @@ public interface ListenerEndpoint<C extends MessageListenerContainer> {
101119
* @param listenerContainer the listener container to configure
102120
* @param messageConverter the message converter - can be null
103121
*/
104-
void setupListenerContainer(C listenerContainer, @Nullable MessageConverter messageConverter);
122+
default void setupListenerContainer(C listenerContainer, @Nullable MessageConverter messageConverter) {
123+
}
105124

106125
}

spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarListenerEndpoint.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,20 @@
2828
*
2929
* @author Soby Chacko
3030
* @author Alexander Preuß
31+
* @author Vedran Pavic
3132
*/
3233
public interface PulsarListenerEndpoint extends ListenerEndpoint<PulsarMessageListenerContainer> {
3334

34-
boolean isBatchListener();
35+
default boolean isBatchListener() {
36+
return false;
37+
}
3538

36-
Properties getConsumerProperties();
39+
default Properties getConsumerProperties() {
40+
return null;
41+
}
3742

38-
AckMode getAckMode();
43+
default AckMode getAckMode() {
44+
return null;
45+
}
3946

4047
}

spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarListenerEndpointAdapter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
*
3434
* @author Soby Chacko
3535
* @author Alexander Preuß
36+
* @deprecated for removal in favor of {@link PulsarListenerEndpoint}
3637
*/
38+
@Deprecated(forRemoval = true)
3739
public class PulsarListenerEndpointAdapter implements PulsarListenerEndpoint {
3840

3941
@Override

0 commit comments

Comments
 (0)