2424import java .util .List ;
2525import java .util .Map ;
2626import java .util .Set ;
27+ import java .util .concurrent .atomic .AtomicBoolean ;
2728
2829import static org .awaitility .Awaitility .await ;
2930
@@ -44,6 +45,9 @@ class PulsarJavaSpringBootStarterApplicationTests {
4445 @ Autowired
4546 private PulsarTemplate <MyMsg > producer ;
4647
48+ @ Autowired
49+ private PulsarTemplate <String > producerForError ;
50+
4751 @ Container
4852 static PulsarContainer pulsarContainer = new PulsarContainer ();
4953
@@ -66,7 +70,7 @@ void testProducerSendMethod() throws PulsarClientException {
6670 void testConsumerRegistration1 () throws Exception {
6771 final List <Consumer <?>> consumers = consumerBuilder .getConsumers ();
6872
69- Assertions .assertEquals (1 , consumers .size ());
73+ Assertions .assertEquals (2 , consumers .size ());
7074
7175 final Consumer <?> consumer = consumers .stream ().findFirst ().orElseThrow (Exception ::new );
7276
@@ -91,11 +95,29 @@ void testProducerRegistration() {
9195
9296 final Map <String , ImmutablePair <Class <?>, Serialization >> topics = producerFactory .getTopics ();
9397
94- Assertions .assertEquals (2 , topics .size ());
98+ Assertions .assertEquals (3 , topics .size ());
9599
96100 final Set <String > topicNames = new HashSet <>(topics .keySet ());
97101
98102 Assertions .assertTrue (topicNames .contains ("topic-one" ));
99103 Assertions .assertTrue (topicNames .contains ("topic-two" ));
100104 }
105+
106+ @ Test
107+ void testMessageErrorHandling () throws PulsarClientException {
108+ final AtomicBoolean receivedError = new AtomicBoolean (false );
109+ final String messageToSend = "This message will never arrive." ;
110+
111+ producerForError .send ("topic-for-error" , messageToSend );
112+
113+ consumerBuilder .onError (($ ) -> {
114+ Assertions .assertEquals ($ .getConsumer ().getTopic (), "topic-for-error" );
115+ Assertions .assertEquals ($ .getMessage ().getValue (), messageToSend );
116+ Assertions .assertNotNull ($ .getException ());
117+
118+ receivedError .set (true );
119+ });
120+
121+ await ().untilTrue (receivedError );
122+ }
101123}
0 commit comments