| 
73 | 73 | import org.springframework.pulsar.core.SchemaResolver;  | 
74 | 74 | import org.springframework.pulsar.core.TopicResolver;  | 
75 | 75 | import org.springframework.pulsar.listener.PulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig;  | 
76 |  | -import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig;  | 
77 |  | -import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.WithSubscriptionTypes.WithSubscriptionTypesConfig;  | 
 | 76 | +import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.SubscriptionTypeTestsConfig;  | 
78 | 77 | import org.springframework.pulsar.support.PulsarHeaders;  | 
79 | 78 | import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper;  | 
80 | 79 | import org.springframework.pulsar.test.model.UserPojo;  | 
@@ -1055,110 +1054,71 @@ void listen(String msg) {  | 
1055 | 1054 | 	}  | 
1056 | 1055 | 
 
  | 
1057 | 1056 | 	@Nested  | 
 | 1057 | +	@ContextConfiguration(classes = SubscriptionTypeTestsConfig.class)  | 
1058 | 1058 | 	class SubscriptionTypeTests {  | 
1059 | 1059 | 
 
  | 
1060 |  | -		@SuppressWarnings("rawtypes")  | 
1061 |  | -		private static AbstractObjectAssert<?, ?> assertSubscriptionType(Consumer<?> consumer) {  | 
1062 |  | -			return assertThat(consumer)  | 
1063 |  | -				.extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class))  | 
1064 |  | -				.extracting(ConsumerConfigurationData::getSubscriptionType);  | 
1065 |  | -		}  | 
1066 |  | - | 
1067 |  | -		@Nested  | 
1068 |  | -		@ContextConfiguration(classes = WithDefaultTypeConfig.class)  | 
1069 |  | -		class WithDefaultType {  | 
 | 1060 | +		static final CountDownLatch latchTypeNotSet = new CountDownLatch(1);  | 
1070 | 1061 | 
 
  | 
1071 |  | -			static final CountDownLatch latchTypeNotSet = new CountDownLatch(1);  | 
1072 |  | - | 
1073 |  | -			@Test  | 
1074 |  | -			void whenTypeNotSetAnywhereThenFallbackTypeIsUsed() throws Exception {  | 
1075 |  | -				pulsarTemplate.send("typeNotSetAnywhere-topic", "hello-typeNotSetAnywhere");  | 
1076 |  | -				assertThat(latchTypeNotSet.await(5, TimeUnit.SECONDS)).isTrue();  | 
1077 |  | -			}  | 
 | 1062 | +		static final CountDownLatch latchTypeSetOnAnnotation = new CountDownLatch(1);  | 
1078 | 1063 | 
 
  | 
1079 |  | -			@Configuration(proxyBeanMethods = false)  | 
1080 |  | -			static class WithDefaultTypeConfig {  | 
 | 1064 | +		static final CountDownLatch latchTypeSetOnCustomizer = new CountDownLatch(1);  | 
1081 | 1065 | 
 
  | 
1082 |  | -				@PulsarListener(topics = "typeNotSetAnywhere-topic", subscriptionName = "typeNotSetAnywhere-sub")  | 
1083 |  | -				void listenWithoutTypeSetAnywhere(String ignored, Consumer<String> consumer) {  | 
1084 |  | -					assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Exclusive);  | 
1085 |  | -					latchTypeNotSet.countDown();  | 
1086 |  | -				}  | 
 | 1066 | +		@Test  | 
 | 1067 | +		void defaultTypeFromContainerFactoryUsedWhenTypeNotSetAnywhere() throws Exception {  | 
 | 1068 | +			pulsarTemplate.send("latchTypeNotSet-topic", "hello-latchTypeNotSet");  | 
 | 1069 | +			assertThat(latchTypeNotSet.await(5, TimeUnit.SECONDS)).isTrue();  | 
 | 1070 | +		}  | 
1087 | 1071 | 
 
  | 
1088 |  | -			}  | 
 | 1072 | +		@Test  | 
 | 1073 | +		void typeSetOnAnnotationOverridesDefaultTypeFromContainerFactory() throws Exception {  | 
 | 1074 | +			pulsarTemplate.send("typeSetOnAnnotation-topic", "hello-typeSetOnAnnotation");  | 
 | 1075 | +			assertThat(latchTypeSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue();  | 
 | 1076 | +		}  | 
1089 | 1077 | 
 
  | 
 | 1078 | +		@Test  | 
 | 1079 | +		void typeSetOnCustomizerOverridesTypeSetOnAnnotation() throws Exception {  | 
 | 1080 | +			pulsarTemplate.send("typeSetOnCustomizer-topic", "hello-typeSetOnCustomizer");  | 
 | 1081 | +			assertThat(latchTypeSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue();  | 
1090 | 1082 | 		}  | 
1091 | 1083 | 
 
  | 
1092 |  | -		/**  | 
1093 |  | -		 * Tests the following order of setting the subscription type:  | 
1094 |  | -		 * <pre>  | 
1095 |  | -		 * - 1) ConsumerFactory defaultConfigCustomizer  | 
1096 |  | -		 * - 2) ContainerFactory props.subType (default is Exclusive)  | 
1097 |  | -		 * - 3) PulsarListener subType attribute  | 
1098 |  | -		 * - 4) PulsarListener customizer attribute  | 
1099 |  | -		 * </pre>  | 
1100 |  | -		 */  | 
1101 |  | -		@Nested  | 
1102 |  | -		@ContextConfiguration(classes = WithSubscriptionTypesConfig.class)  | 
1103 |  | -		class WithSubscriptionTypes {  | 
1104 |  | - | 
1105 |  | -			static final CountDownLatch latchTypeSetOnContainerFactory = new CountDownLatch(1);  | 
1106 |  | - | 
1107 |  | -			static final CountDownLatch latchTypeSetOnAnnotation = new CountDownLatch(1);  | 
1108 |  | - | 
1109 |  | -			static final CountDownLatch latchTypeSetOnCustomizer = new CountDownLatch(1);  | 
1110 |  | - | 
1111 |  | -			@Test  | 
1112 |  | -			void typeSetOnContainerFactoryUsedWhenNotSetElsewhere() throws Exception {  | 
1113 |  | -				pulsarTemplate.send("typeSetOnContainerFactory-topic", "hello-typeSetOnContainerFactory");  | 
1114 |  | -				assertThat(latchTypeSetOnContainerFactory.await(5, TimeUnit.SECONDS)).isTrue();  | 
1115 |  | -			}  | 
 | 1084 | +		@Configuration(proxyBeanMethods = false)  | 
 | 1085 | +		static class SubscriptionTypeTestsConfig {  | 
1116 | 1086 | 
 
  | 
1117 |  | -			@Test  | 
1118 |  | -			void typeSetOnAnnotationOverridesTypeSetOnContainerFactory() throws Exception {  | 
1119 |  | -				pulsarTemplate.send("typeSetOnAnnotation-topic", "hello-typeSetOnAnnotation");  | 
1120 |  | -				assertThat(latchTypeSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue();  | 
 | 1087 | +			@Bean  | 
 | 1088 | +			ConsumerBuilderCustomizer<String> consumerFactoryCustomizerSubTypeIsIgnored() {  | 
 | 1089 | +				return (b) -> b.subscriptionType(SubscriptionType.Shared);  | 
1121 | 1090 | 			}  | 
1122 | 1091 | 
 
  | 
1123 |  | -			@Test  | 
1124 |  | -			void typeSetOnCustomizerOverridesTypeSetOnAnnotation() throws Exception {  | 
1125 |  | -				pulsarTemplate.send("typeSetOnCustomizer-topic", "hello-typeSetOnCustomizer");  | 
1126 |  | -				assertThat(latchTypeSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue();  | 
 | 1092 | +			@PulsarListener(topics = "latchTypeNotSet-topic", subscriptionName = "latchTypeNotSet-sub")  | 
 | 1093 | +			void listenWithTypeNotSet(String ignored, Consumer<String> consumer) {  | 
 | 1094 | +				assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Exclusive);  | 
 | 1095 | +				latchTypeNotSet.countDown();  | 
1127 | 1096 | 			}  | 
1128 | 1097 | 
 
  | 
1129 |  | -			@Configuration(proxyBeanMethods = false)  | 
1130 |  | -			static class WithSubscriptionTypesConfig {  | 
1131 |  | - | 
1132 |  | -				@Bean  | 
1133 |  | -				ConsumerBuilderCustomizer<String> consumerFactoryCustomizerSubTypeIsIgnored() {  | 
1134 |  | -					return (b) -> b.subscriptionType(SubscriptionType.Shared);  | 
1135 |  | -				}  | 
1136 |  | - | 
1137 |  | -				@PulsarListener(topics = "typeSetOnContainerFactory-topic", subscriptionName = "typeSetOnContainerFactory-sub")  | 
1138 |  | -				void listenWithTypeSetOnlyOnContainerFactory(String ignored, Consumer<String> consumer) {  | 
1139 |  | -					assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Exclusive);  | 
1140 |  | -					latchTypeSetOnContainerFactory.countDown();  | 
1141 |  | -				}  | 
1142 |  | - | 
1143 |  | -				@PulsarListener(topics = "typeSetOnAnnotation-topic", subscriptionName = "typeSetOnAnnotation-sub",  | 
1144 |  | -						subscriptionType = SubscriptionType.Key_Shared)  | 
1145 |  | -				void listenWithTypeSetOnAnnotation(String ignored, Consumer<String> consumer) {  | 
1146 |  | -					assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Key_Shared);  | 
1147 |  | -					latchTypeSetOnAnnotation.countDown();  | 
1148 |  | -				}  | 
 | 1098 | +			@PulsarListener(topics = "typeSetOnAnnotation-topic", subscriptionName = "typeSetOnAnnotation-sub",  | 
 | 1099 | +					subscriptionType = SubscriptionType.Key_Shared)  | 
 | 1100 | +			void listenWithTypeSetOnAnnotation(String ignored, Consumer<String> consumer) {  | 
 | 1101 | +				assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Key_Shared);  | 
 | 1102 | +				latchTypeSetOnAnnotation.countDown();  | 
 | 1103 | +			}  | 
1149 | 1104 | 
 
  | 
1150 |  | -				@PulsarListener(topics = "typeSetOnCustomizer-topic", subscriptionName = "typeSetOnCustomizer-sub",  | 
1151 |  | -						subscriptionType = SubscriptionType.Key_Shared, consumerCustomizer = "myCustomizer")  | 
1152 |  | -				void listenWithTypeSetOnCustomizer(String ignored, Consumer<String> consumer) {  | 
1153 |  | -					assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Failover);  | 
1154 |  | -					latchTypeSetOnCustomizer.countDown();  | 
1155 |  | -				}  | 
 | 1105 | +			@PulsarListener(topics = "typeSetOnCustomizer-topic", subscriptionName = "typeSetOnCustomizer-sub",  | 
 | 1106 | +					subscriptionType = SubscriptionType.Key_Shared, consumerCustomizer = "myCustomizer")  | 
 | 1107 | +			void listenWithTypeSetOnCustomizer(String ignored, Consumer<String> consumer) {  | 
 | 1108 | +				assertSubscriptionType(consumer).isEqualTo(SubscriptionType.Failover);  | 
 | 1109 | +				latchTypeSetOnCustomizer.countDown();  | 
 | 1110 | +			}  | 
1156 | 1111 | 
 
  | 
1157 |  | -				@Bean  | 
1158 |  | -				public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {  | 
1159 |  | -					return cb -> cb.subscriptionType(SubscriptionType.Failover);  | 
1160 |  | -				}  | 
 | 1112 | +			@Bean  | 
 | 1113 | +			public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {  | 
 | 1114 | +				return cb -> cb.subscriptionType(SubscriptionType.Failover);  | 
 | 1115 | +			}  | 
1161 | 1116 | 
 
  | 
 | 1117 | +			@SuppressWarnings("rawtypes")  | 
 | 1118 | +			private static AbstractObjectAssert<?, ?> assertSubscriptionType(Consumer<?> consumer) {  | 
 | 1119 | +				return assertThat(consumer)  | 
 | 1120 | +					.extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class))  | 
 | 1121 | +					.extracting(ConsumerConfigurationData::getSubscriptionType);  | 
1162 | 1122 | 			}  | 
1163 | 1123 | 
 
  | 
1164 | 1124 | 		}  | 
 | 
0 commit comments