7777import org .springframework .util .ClassUtils ;
7878import org .springframework .util .ObjectUtils ;
7979import org .springframework .util .StringUtils ;
80+ import org .springframework .util .TypeUtils ;
8081
8182/**
8283 * An abstract {@link org.springframework.kafka.listener.MessageListener} adapter
@@ -320,6 +321,20 @@ public void setBeanResolver(BeanResolver beanResolver) {
320321 this .evaluationContext .addPropertyAccessor (new MapAccessor ());
321322 }
322323
324+ /**
325+ * Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}.
326+ * {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)}
327+ * will invoke {@link MessagingMessageListenerAdapter#asyncRetryCallback} when
328+ * {@link CompletableFuture} or {@link Mono} fails to complete.
329+ * @param asyncRetryCallback the callback for async retry.
330+ * @since 3.3
331+ */
332+ public void setCallbackForAsyncFailure (
333+ @ Nullable BiConsumer <ConsumerRecord <K , V >, RuntimeException > asyncRetryCallback ) {
334+
335+ this .asyncRetryCallback = asyncRetryCallback ;
336+ }
337+
323338 protected boolean isMessageList () {
324339 return this .isMessageList ;
325340 }
@@ -392,6 +407,7 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC
392407
393408 protected Message <?> toMessagingMessage (ConsumerRecord <K , V > cRecord , @ Nullable Acknowledgment acknowledgment ,
394409 Consumer <?, ?> consumer ) {
410+
395411 return getMessageConverter ().toMessage (cRecord , acknowledgment , consumer , getType ());
396412 }
397413
@@ -875,70 +891,47 @@ else if (isAck || isKotlinContinuation || isConsumer || annotationHeaderIsGroupI
875891 private Type extractGenericParameterTypFromMethodParameter (MethodParameter methodParameter ) {
876892 Type genericParameterType = methodParameter .getGenericParameterType ();
877893 if (genericParameterType instanceof ParameterizedType parameterizedType ) {
878- if (parameterizedType .getRawType ().equals (Message .class )) {
894+ Type rawType = parameterizedType .getRawType ();
895+ if (rawType .equals (Message .class )) {
879896 genericParameterType = parameterizedType .getActualTypeArguments ()[0 ];
880897 }
881- else if (parameterizedType .getRawType ().equals (List .class )
882- && parameterizedType .getActualTypeArguments ().length == 1 ) {
883-
884- Type paramType = getTypeFromWildCardWithUpperBound (parameterizedType .getActualTypeArguments ()[0 ]);
885- this .isConsumerRecordList = parameterIsType (paramType , ConsumerRecord .class );
886- boolean messageWithGeneric = rawByParameterIsType (paramType , Message .class );
887- this .isMessageList = Message .class .equals (paramType ) || messageWithGeneric ;
888- if (messageWithGeneric ) {
898+ else if (rawType .equals (List .class ) && parameterizedType .getActualTypeArguments ().length == 1 ) {
899+ Type paramType = parameterizedType .getActualTypeArguments ()[0 ];
900+ boolean messageHasGeneric = paramType instanceof ParameterizedType pType
901+ && pType .getRawType ().equals (Message .class );
902+ this .isMessageList = TypeUtils .isAssignable (paramType , Message .class ) || messageHasGeneric ;
903+ this .isConsumerRecordList = TypeUtils .isAssignable (paramType , ConsumerRecord .class );
904+ if (messageHasGeneric ) {
889905 genericParameterType = ((ParameterizedType ) paramType ).getActualTypeArguments ()[0 ];
890906 }
891907 }
892908 else {
893- this .isConsumerRecords = parameterizedType . getRawType () .equals (ConsumerRecords .class );
909+ this .isConsumerRecords = rawType .equals (ConsumerRecords .class );
894910 }
895911 }
896912 return genericParameterType ;
897913 }
898914
899- private boolean annotationHeaderIsGroupId (MethodParameter methodParameter ) {
915+ private static boolean annotationHeaderIsGroupId (MethodParameter methodParameter ) {
900916 Header header = methodParameter .getParameterAnnotation (Header .class );
901917 return header != null && KafkaHeaders .GROUP_ID .equals (header .value ());
902918 }
903919
904- private Type getTypeFromWildCardWithUpperBound (Type paramType ) {
905- if (paramType instanceof WildcardType wcType
906- && wcType .getUpperBounds () != null
907- && wcType .getUpperBounds ().length > 0 ) {
908- paramType = wcType .getUpperBounds ()[0 ];
909- }
910- return paramType ;
911- }
912-
913- private boolean isMessageWithNoTypeInfo (Type parameterType ) {
920+ private static boolean isMessageWithNoTypeInfo (Type parameterType ) {
914921 if (parameterType instanceof ParameterizedType pType && pType .getRawType ().equals (Message .class )) {
915922 return pType .getActualTypeArguments ()[0 ] instanceof WildcardType ;
916923 }
917924 return Message .class .equals (parameterType ); // could be Message without a generic type
918925 }
919926
920- private boolean parameterIsType (Type parameterType , Type type ) {
927+ private static boolean parameterIsType (Type parameterType , Type type ) {
921928 return parameterType .equals (type ) || rawByParameterIsType (parameterType , type );
922929 }
923930
924- private boolean rawByParameterIsType (Type parameterType , Type type ) {
931+ private static boolean rawByParameterIsType (Type parameterType , Type type ) {
925932 return parameterType instanceof ParameterizedType pType && pType .getRawType ().equals (type );
926933 }
927934
928- /**
929- * Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}.
930- * {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)}
931- * will invoke {@link MessagingMessageListenerAdapter#asyncRetryCallback} when
932- * {@link CompletableFuture} or {@link Mono} fails to complete.
933- * @param asyncRetryCallback the callback for async retry.
934- * @since 3.3
935- */
936- public void setCallbackForAsyncFailure (
937- @ Nullable BiConsumer <ConsumerRecord <K , V >, RuntimeException > asyncRetryCallback ) {
938-
939- this .asyncRetryCallback = asyncRetryCallback ;
940- }
941-
942935 /**
943936 * Root object for reply expression evaluation.
944937 * @param request the request.
@@ -947,6 +940,7 @@ public void setCallbackForAsyncFailure(
947940 * @since 2.0
948941 */
949942 public record ReplyExpressionRoot (Object request , Object source , Object result ) {
943+
950944 }
951945
952946 static class NoOpAck implements Acknowledgment {
0 commit comments