|
74 | 74 | import org.springframework.pulsar.core.TopicResolver; |
75 | 75 | import org.springframework.pulsar.listener.PulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig; |
76 | 76 | import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig; |
77 | | -import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig; |
| 77 | +import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.WithSubscriptionTypes.WithSubscriptionTypesConfig; |
78 | 78 | import org.springframework.pulsar.support.PulsarHeaders; |
79 | 79 | import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper; |
80 | 80 | import org.springframework.pulsar.test.model.UserPojo; |
@@ -1089,61 +1089,69 @@ void listenWithoutTypeSetAnywhere(String ignored, Consumer<String> consumer) { |
1089 | 1089 |
|
1090 | 1090 | } |
1091 | 1091 |
|
| 1092 | + /** |
| 1093 | + * Tests the following order of setting the subscription type: |
| 1094 | + * <pre> |
| 1095 | + * - 1) ConsumerFactory defaultConfigCustomizer |
| 1096 | + * - 2) ContainerFactory props.subType (default is Exclusive) |
| 1097 | + * - 3) PulsarListener subType attribute |
| 1098 | + * - 4) PulsarListener customizer attribute |
| 1099 | + * </pre> |
| 1100 | + */ |
1092 | 1101 | @Nested |
1093 | | - @ContextConfiguration(classes = WithSpecificTypesConfig.class) |
1094 | | - class WithSpecificTypes { |
| 1102 | + @ContextConfiguration(classes = WithSubscriptionTypesConfig.class) |
| 1103 | + class WithSubscriptionTypes { |
1095 | 1104 |
|
1096 | | - static final CountDownLatch latchTypeSetConsumerFactory = new CountDownLatch(1); |
| 1105 | + static final CountDownLatch latchTypeSetOnContainerFactory = new CountDownLatch(1); |
1097 | 1106 |
|
1098 | | - static final CountDownLatch latchTypeSetAnnotation = new CountDownLatch(1); |
| 1107 | + static final CountDownLatch latchTypeSetOnAnnotation = new CountDownLatch(1); |
1099 | 1108 |
|
1100 | | - static final CountDownLatch latchWithCustomizer = new CountDownLatch(1); |
| 1109 | + static final CountDownLatch latchTypeSetOnCustomizer = new CountDownLatch(1); |
1101 | 1110 |
|
1102 | 1111 | @Test |
1103 | | - void whenTypeSetOnlyInConsumerFactoryThenConsumerFactoryTypeIsUsed() throws Exception { |
1104 | | - pulsarTemplate.send("typeSetConsumerFactory-topic", "hello-typeSetConsumerFactory"); |
1105 | | - assertThat(latchTypeSetConsumerFactory.await(5, TimeUnit.SECONDS)).isTrue(); |
| 1112 | + void typeSetOnContainerFactoryUsedWhenNotSetElsewhere() throws Exception { |
| 1113 | + pulsarTemplate.send("typeSetOnContainerFactory-topic", "hello-typeSetOnContainerFactory"); |
| 1114 | + assertThat(latchTypeSetOnContainerFactory.await(5, TimeUnit.SECONDS)).isTrue(); |
1106 | 1115 | } |
1107 | 1116 |
|
1108 | 1117 | @Test |
1109 | | - void whenTypeSetOnAnnotationThenAnnotationTypeIsUsed() throws Exception { |
1110 | | - pulsarTemplate.send("typeSetAnnotation-topic", "hello-typeSetAnnotation"); |
1111 | | - assertThat(latchTypeSetAnnotation.await(5, TimeUnit.SECONDS)).isTrue(); |
| 1118 | + void typeSetOnAnnotationOverridesTypeSetOnContainerFactory() throws Exception { |
| 1119 | + pulsarTemplate.send("typeSetOnAnnotation-topic", "hello-typeSetOnAnnotation"); |
| 1120 | + assertThat(latchTypeSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue(); |
1112 | 1121 | } |
1113 | 1122 |
|
1114 | 1123 | @Test |
1115 | | - void whenTypeSetWithCustomizerThenCustomizerTypeIsUsed() throws Exception { |
1116 | | - pulsarTemplate.send("typeSetCustomizer-topic", "hello-typeSetCustomizer"); |
1117 | | - assertThat(latchWithCustomizer.await(5, TimeUnit.SECONDS)).isTrue(); |
| 1124 | + void typeSetOnCustomizerOverridesTypeSetOnAnnotation() throws Exception { |
| 1125 | + pulsarTemplate.send("typeSetOnCustomizer-topic", "hello-typeSetOnCustomizer"); |
| 1126 | + assertThat(latchTypeSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue(); |
1118 | 1127 | } |
1119 | 1128 |
|
1120 | 1129 | @Configuration(proxyBeanMethods = false) |
1121 | | - static class WithSpecificTypesConfig { |
| 1130 | + static class WithSubscriptionTypesConfig { |
1122 | 1131 |
|
1123 | 1132 | @Bean |
1124 | | - ConsumerBuilderCustomizer<String> consumerFactoryDefaultSubTypeCustomizer() { |
| 1133 | + ConsumerBuilderCustomizer<String> consumerFactoryCustomizerSubTypeIsIgnored() { |
1125 | 1134 | return (b) -> b.subscriptionType(SubscriptionType.Shared); |
1126 | 1135 | } |
1127 | 1136 |
|
1128 | | - @PulsarListener(topics = "typeSetConsumerFactory-topic", |
1129 | | - subscriptionName = "typeSetConsumerFactory-sub", subscriptionType = {}) |
1130 | | - void listenWithTypeSetOnlyOnConsumerFactory(String ignored, Consumer<String> consumer) { |
1131 | | - assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Shared); |
1132 | | - latchTypeSetConsumerFactory.countDown(); |
| 1137 | + @PulsarListener(topics = "typeSetOnContainerFactory-topic", subscriptionName = "typeSetOnContainerFactory-sub") |
| 1138 | + void listenWithTypeSetOnlyOnContainerFactory(String ignored, Consumer<String> consumer) { |
| 1139 | + assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Exclusive); |
| 1140 | + latchTypeSetOnContainerFactory.countDown(); |
1133 | 1141 | } |
1134 | 1142 |
|
1135 | | - @PulsarListener(topics = "typeSetAnnotation-topic", subscriptionName = "typeSetAnnotation-sub", |
| 1143 | + @PulsarListener(topics = "typeSetOnAnnotation-topic", subscriptionName = "typeSetOnAnnotation-sub", |
1136 | 1144 | subscriptionType = SubscriptionType.Key_Shared) |
1137 | 1145 | void listenWithTypeSetOnAnnotation(String ignored, Consumer<String> consumer) { |
1138 | 1146 | assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Key_Shared); |
1139 | | - latchTypeSetAnnotation.countDown(); |
| 1147 | + latchTypeSetOnAnnotation.countDown(); |
1140 | 1148 | } |
1141 | 1149 |
|
1142 | | - @PulsarListener(topics = "typeSetCustomizer-topic", subscriptionName = "typeSetCustomizer-sub", |
| 1150 | + @PulsarListener(topics = "typeSetOnCustomizer-topic", subscriptionName = "typeSetOnCustomizer-sub", |
1143 | 1151 | subscriptionType = SubscriptionType.Key_Shared, consumerCustomizer = "myCustomizer") |
1144 | | - void listenWithTypeSetInCustomizer(String ignored, Consumer<String> consumer) { |
| 1152 | + void listenWithTypeSetOnCustomizer(String ignored, Consumer<String> consumer) { |
1145 | 1153 | assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Failover); |
1146 | | - latchWithCustomizer.countDown(); |
| 1154 | + latchTypeSetOnCustomizer.countDown(); |
1147 | 1155 | } |
1148 | 1156 |
|
1149 | 1157 | @Bean |
|
0 commit comments