|
113 | 113 | * @author Artem Bilan |
114 | 114 | * @author Loic Talhouarne |
115 | 115 | * @author Lukasz Kaminski |
| 116 | + * @author Ray Chuan Tay |
116 | 117 | */ |
117 | 118 | @EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2, |
118 | 119 | KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4, |
@@ -176,6 +177,8 @@ public class KafkaMessageListenerContainerTests { |
176 | 177 |
|
177 | 178 | public static final String topic23 = "testTopic23"; |
178 | 179 |
|
| 180 | + public static final String topic24 = "testTopic24"; |
| 181 | + |
179 | 182 | private static EmbeddedKafkaBroker embeddedKafka; |
180 | 183 |
|
181 | 184 | @BeforeAll |
@@ -1893,6 +1896,47 @@ public void testJsonSerDeConfiguredType() throws Exception { |
1893 | 1896 | this.logger.info("Stop JSON1"); |
1894 | 1897 | } |
1895 | 1898 |
|
| 1899 | + @Test |
| 1900 | + public void testJsonSerDeWithInstanceDoesNotUseConfiguration() throws Exception { |
| 1901 | + this.logger.info("Start JSON1a"); |
| 1902 | + Class<Foo1> consumerConfigValueDefaultType = Foo1.class; |
| 1903 | + Map<String, Object> props = KafkaTestUtils.consumerProps("testJson", "false", embeddedKafka); |
| 1904 | + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); |
| 1905 | + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, consumerConfigValueDefaultType); |
| 1906 | + DefaultKafkaConsumerFactory<Integer, Foo> cf = new DefaultKafkaConsumerFactory<>(props, null, new JsonDeserializer<>(Foo.class)); |
| 1907 | + ContainerProperties containerProps = new ContainerProperties(topic24); |
| 1908 | + |
| 1909 | + final CountDownLatch latch = new CountDownLatch(1); |
| 1910 | + final AtomicReference<ConsumerRecord<?, ?>> received = new AtomicReference<>(); |
| 1911 | + containerProps.setMessageListener((MessageListener<Integer, Foo>) record -> { |
| 1912 | + KafkaMessageListenerContainerTests.this.logger.info("json: " + record); |
| 1913 | + received.set(record); |
| 1914 | + latch.countDown(); |
| 1915 | + }); |
| 1916 | + |
| 1917 | + KafkaMessageListenerContainer<Integer, Foo> container = |
| 1918 | + new KafkaMessageListenerContainer<>(cf, containerProps); |
| 1919 | + container.setBeanName("testJson1a"); |
| 1920 | + container.start(); |
| 1921 | + |
| 1922 | + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); |
| 1923 | + |
| 1924 | + Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka); |
| 1925 | + senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); |
| 1926 | + DefaultKafkaProducerFactory<Integer, Foo> pf = new DefaultKafkaProducerFactory<>(senderProps); |
| 1927 | + KafkaTemplate<Integer, Foo> template = new KafkaTemplate<>(pf); |
| 1928 | + template.setDefaultTopic(topic24); |
| 1929 | + template.sendDefault(0, new Foo("bar")); |
| 1930 | + template.flush(); |
| 1931 | + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); |
| 1932 | + assertThat(received.get().value()) |
| 1933 | + .isInstanceOf(Foo.class) |
| 1934 | + .isNotInstanceOf(consumerConfigValueDefaultType); |
| 1935 | + container.stop(); |
| 1936 | + pf.destroy(); |
| 1937 | + this.logger.info("Stop JSON1a"); |
| 1938 | + } |
| 1939 | + |
1896 | 1940 | @Test |
1897 | 1941 | public void testJsonSerDeHeaderSimpleType() throws Exception { |
1898 | 1942 | this.logger.info("Start JSON2"); |
|
0 commit comments