2525import  java .util .concurrent .TimeUnit ;
2626import  java .util .stream .Stream ;
2727
28+ import  org .apache .commons .logging .Log ;
29+ import  org .apache .commons .logging .LogFactory ;
2830import  org .apache .kafka .clients .consumer .ConsumerConfig ;
2931import  org .apache .kafka .clients .consumer .ConsumerRebalanceListener ;
3032import  org .apache .kafka .clients .producer .ProducerConfig ;
98100@ DirtiesContext 
99101public  class  KafkaDslTests  {
100102
103+ 	private  static  final  Log  log  = LogFactory .getLog (KafkaDslTests .class );
104+ 
101105	static  final  String  TEST_TOPIC1  = "test-topic1" ;
102106
103107	static  final  String  TEST_TOPIC2  = "test-topic2" ;
@@ -179,6 +183,7 @@ void testKafkaAdapters() throws Exception {
179183		for  (int  i  = 0 ; i  < 100 ; i ++) {
180184			Message <?> receive  = this .listeningFromKafkaResults1 .receive (20000 );
181185			assertThat (receive ).isNotNull ();
186+ 			log .warn ("Received '%s' for index '%d'" .formatted (receive , i ));
182187			assertThat (receive .getPayload ()).isEqualTo ("FOO" );
183188			MessageHeaders  headers  = receive .getHeaders ();
184189			assertThat (headers .containsKey (KafkaHeaders .ACKNOWLEDGMENT )).isTrue ();
@@ -196,6 +201,7 @@ void testKafkaAdapters() throws Exception {
196201		for  (int  i  = 0 ; i  < 100 ; i ++) {
197202			Message <?> receive  = this .listeningFromKafkaResults2 .receive (20000 );
198203			assertThat (receive ).isNotNull ();
204+ 			log .warn ("Received '%s' for index '%d'" .formatted (receive , i ));
199205			assertThat (receive .getPayload ()).isEqualTo ("FOO" );
200206			MessageHeaders  headers  = receive .getHeaders ();
201207			assertThat (headers .containsKey (KafkaHeaders .ACKNOWLEDGMENT )).isTrue ();
0 commit comments