3434import org .junit .jupiter .api .BeforeEach ;
3535import org .junit .jupiter .api .Test ;
3636import org .junit .jupiter .api .TestInfo ;
37+ import org .junit .jupiter .params .ParameterizedTest ;
38+ import org .junit .jupiter .params .provider .ValueSource ;
3739
3840@ AmqpTestInfrastructure
3941public class ConsumerOutcomeTest {
@@ -108,8 +110,9 @@ void requeuedMessageShouldBeRequeued() {
108110 waitAtMost (() -> management .queueInfo (q ).messageCount () == 0 );
109111 }
110112
111- @ Test
112- void requeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery () {
113+ @ ParameterizedTest
114+ @ ValueSource (booleans = {true , false })
115+ void requeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery (boolean batch ) {
113116 this .management .queue ().name (q ).type (QUORUM ).declare ();
114117
115118 Publisher publisher = this .connection .publisherBuilder ().queue (q ).build ();
@@ -120,13 +123,18 @@ void requeuedMessageWithAnnotationShouldContainAnnotationsOnRedelivery() {
120123 .consumerBuilder ()
121124 .queue (q )
122125 .messageHandler (
123- (context , message ) -> {
126+ (ctx , message ) -> {
124127 deliveryCount .incrementAndGet ();
125128 messages .offer (message );
126129 if (deliveryCount .get () == 1 ) {
127- context .requeue (ANNOTATIONS );
130+ if (batch ) {
131+ Consumer .BatchContext bc = ctx .batch ();
132+ bc .add (ctx );
133+ ctx = bc ;
134+ }
135+ ctx .requeue (ANNOTATIONS );
128136 } else {
129- context .accept ();
137+ ctx .accept ();
130138 redeliveredSync .down ();
131139 }
132140 })
@@ -175,15 +183,24 @@ void discardedMessageShouldBeDeadLeadLetteredWhenConfigured() {
175183 waitAtMost (() -> management .queueInfo (dlq ).messageCount () == 0 );
176184 }
177185
178- @ Test
179- void
180- discardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndContainAnnotationsWhenConfigured () {
186+ @ ParameterizedTest
187+ @ ValueSource (booleans = {true , false })
188+ void discardedMessageWithAnnotationsShouldBeDeadLeadLetteredAndContainAnnotationsWhenConfigured (
189+ boolean batch ) {
181190 declareDeadLetterTopology ();
182191 Publisher publisher = this .connection .publisherBuilder ().queue (q ).build ();
183192 this .connection
184193 .consumerBuilder ()
185194 .queue (q )
186- .messageHandler ((ctx , msg ) -> ctx .discard (ANNOTATIONS ))
195+ .messageHandler (
196+ (ctx , msg ) -> {
197+ if (batch ) {
198+ Consumer .BatchContext bc = ctx .batch ();
199+ bc .add (ctx );
200+ ctx = bc ;
201+ }
202+ ctx .discard (ANNOTATIONS );
203+ })
187204 .build ();
188205
189206 TestUtils .Sync deadLetteredSync = TestUtils .sync ();
0 commit comments