@@ -657,6 +657,16 @@ public void onCompletion(RecordMetadata shaded, Exception exception) {
657657 unMeta = constructUnshadedRecordMetadata (unRecMetaCls , unTp , shaded );
658658 }
659659
660+ // Guardrail: if we cannot construct metadata for a successful send, skip the
661+ // routed callback rather than violating the usual contract (success + null metadata).
662+ // The application's own producer callback (if any) will still see the real metadata.
663+ if (unMeta == null && exception == null ) {
664+ try {
665+ logger .debug ("[ROUTED-CALLBACK] onCompletion: skipping routed callback because RecordMetadata could not be constructed and no exception is present" );
666+ } catch (Throwable ignoredLog ) { }
667+ return ;
668+ }
669+
660670 // Strategy 1: Prefer interface signature in the app classloader (most reliable)
661671 if (ifaceMethod != null ) {
662672 try {
@@ -681,16 +691,35 @@ public void onCompletion(RecordMetadata shaded, Exception exception) {
681691 }
682692 }
683693
684- // Strategy 3: Fallback - find any compatible method by name with second param assignable from Throwable/Exception
694+ // Strategy 3: Fallback - find a compatible method by name and parameter types.
695+ // We require:
696+ // - name == "onCompletion"
697+ // - exactly two parameters
698+ // - second parameter assignable from Throwable/Exception
699+ // - (when available) first parameter type compatible with the app's RecordMetadata class
685700 Method chosen = null ;
686701 for (Method m : unshadedCallback .getClass ().getMethods ()) {
687702 if (!m .getName ().equals ("onCompletion" )) continue ;
688703 if (m .getParameterCount () != 2 ) continue ;
689704 Class <?>[] pt = m .getParameterTypes ();
690705 if (!Throwable .class .isAssignableFrom (pt [1 ]) && !Exception .class .isAssignableFrom (pt [1 ])) continue ;
706+ if (unRecMetaCls != null && !pt [0 ].isAssignableFrom (unRecMetaCls )) continue ;
691707 chosen = m ;
692708 break ;
693709 }
710+ if (chosen != null ) {
711+ Class <?>[] pt = chosen .getParameterTypes ();
712+ Object arg0 = unMeta ;
713+ Object arg1 = exception ;
714+
715+ // Ensure our arguments are compatible with the chosen method's parameters.
716+ if (arg0 != null && !pt [0 ].isInstance (arg0 )) {
717+ chosen = null ;
718+ } else if (arg1 != null && !pt [1 ].isInstance (arg1 )) {
719+ chosen = null ;
720+ }
721+ }
722+
694723 if (chosen != null ) {
695724 try {
696725 chosen .setAccessible (true );
@@ -722,41 +751,111 @@ public void onCompletion(RecordMetadata shaded, Exception exception) {
722751 * @throws Exception if constructor invocation fails
723752 */
724753 private static Object constructUnshadedRecordMetadata (Class <?> unRecMetaCls , Object unTopicPartition , RecordMetadata shaded ) throws Exception {
725- // Try known constructor patterns for Kafka 3.3.x and later versions
726754 Constructor <?>[] ctors = unRecMetaCls .getConstructors ();
755+
756+ long offset = shaded .offset ();
757+ long timestamp = shaded .timestamp ();
758+ int serializedKeySize = shaded .serializedKeySize ();
759+ int serializedValueSize = shaded .serializedValueSize ();
760+
727761 for (Constructor <?> ctor : ctors ) {
728- Class <?>[] p = ctor .getParameterTypes ();
729- try {
730- boolean isTopicPartition = p .length == 6
731- && p [0 ].getName ().equals ("org.apache.kafka.common.TopicPartition" )
732- && p [1 ] == long .class
733- && p [2 ] == long .class
734- && (p [3 ] == Integer .class || p [3 ] == int .class || p [3 ] == Long .class )
735- && p [4 ] == int .class
736- && p [5 ] == int .class ;
737- if (isTopicPartition ) {
738- int sk = shaded .serializedKeySize ();
739- int sv = shaded .serializedValueSize ();
740- // Constructor: RecordMetadata(TopicPartition, long offset, long timestamp, Integer leaderEpoch, int serializedKeySize, int serializedValueSize)
741- // Pass null for leaderEpoch to accept any epoch type (Integer, int, or Long)
742- return ctor .newInstance (unTopicPartition , shaded .offset (), shaded .timestamp (), null , sk , sv );
762+ Class <?>[] params = ctor .getParameterTypes ();
763+ if (params .length == 0 ) {
764+ continue ;
765+ }
766+
767+ boolean firstIsTopicPartition = "org.apache.kafka.common.TopicPartition" .equals (params [0 ].getName ());
768+ if (!firstIsTopicPartition ) {
769+ continue ;
770+ }
771+
772+ Object [] args = new Object [params .length ];
773+ args [0 ] = unTopicPartition ;
774+
775+ int longCount = 0 ;
776+
777+ // Collect indices of all int/Integer parameters (positions >= 1)
778+ List <Integer > intPositions = new ArrayList <>();
779+ for (int i = 1 ; i < params .length ; i ++) {
780+ Class <?> p = params [i ];
781+ if (p == int .class || p == Integer .class ) {
782+ intPositions .add (i );
743783 }
744- } catch (Throwable ignored ) { }
745- }
746- // Fallback: try constructors by parameter count (for older Kafka versions or alternative signatures)
747- for (Constructor <?> ctor : ctors ) {
748- if (ctor .getParameterCount () == 6 ) {
749- try {
750- return ctor .newInstance (unTopicPartition , shaded .offset (), shaded .timestamp (), null , shaded .serializedKeySize (), shaded .serializedValueSize ());
751- } catch (Throwable ignored ) { }
752784 }
753- if (ctor .getParameterCount () == 5 ) {
754- try {
755- // Try 5-parameter constructor: (TopicPartition, long offset, long timestamp, int keySize, int valueSize)
756- return ctor .newInstance (unTopicPartition , shaded .offset (), shaded .timestamp (), shaded .serializedKeySize (), shaded .serializedValueSize ());
757- } catch (Throwable ignored ) { }
785+
786+ // Decide which indices get key/value sizes: last two int positions, if present
787+ int keySizeIndex = -1 ;
788+ int valueSizeIndex = -1 ;
789+ if (!intPositions .isEmpty ()) {
790+ valueSizeIndex = intPositions .get (intPositions .size () - 1 );
791+ if (intPositions .size () >= 2 ) {
792+ keySizeIndex = intPositions .get (intPositions .size () - 2 );
793+ }
794+ }
795+
796+ for (int i = 1 ; i < params .length ; i ++) {
797+ Class <?> p = params [i ];
798+
799+ if (p == long .class || p == Long .class ) {
800+ long value ;
801+ if (longCount == 0 ) {
802+ value = offset ;
803+ } else if (longCount == 1 ) {
804+ value = timestamp ;
805+ } else {
806+ value = 0L ;
807+ }
808+ args [i ] = (p == long .class ) ? value : Long .valueOf (value );
809+ longCount ++;
810+ continue ;
811+ }
812+
813+ if (p == int .class || p == Integer .class ) {
814+ int value ;
815+ if (i == keySizeIndex ) {
816+ value = serializedKeySize ;
817+ } else if (i == valueSizeIndex ) {
818+ value = serializedValueSize ;
819+ } else {
820+ // Other int fields (e.g., leaderEpoch, future extensions) get a neutral default.
821+ value = 0 ;
822+ }
823+ args [i ] = (p == int .class ) ? value : Integer .valueOf (value );
824+ continue ;
825+ }
826+
827+ // For other reference types (e.g., leaderEpoch, checksum), pass null.
828+ if (!p .isPrimitive ()) {
829+ args [i ] = null ;
830+ continue ;
831+ }
832+
833+ // For any other primitive, fall back to the default zero value.
834+ if (p == boolean .class ) {
835+ args [i ] = false ;
836+ } else if (p == byte .class ) {
837+ args [i ] = (byte ) 0 ;
838+ } else if (p == short .class ) {
839+ args [i ] = (short ) 0 ;
840+ } else if (p == char .class ) {
841+ args [i ] = (char ) 0 ;
842+ } else if (p == float .class ) {
843+ args [i ] = 0.0f ;
844+ } else if (p == double .class ) {
845+ args [i ] = 0.0d ;
846+ } else {
847+ // Should not reach here, but keep it safe.
848+ args [i ] = null ;
849+ }
850+ }
851+
852+ try {
853+ return ctor .newInstance (args );
854+ } catch (Throwable ignored ) {
855+ // Try the next constructor
758856 }
759857 }
858+
760859 // No compatible constructor found - return null (application callback will receive null metadata)
761860 return null ;
762861 }
0 commit comments