diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java index 1abf4e88d1ef..27b57938a1db 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java @@ -293,23 +293,64 @@ protected ProtoCoder(Class protoMessageClass, Set> extensionHostClas this.extensionHostClasses = extensionHostClasses; } - /** Get the memoized {@link Parser}, possibly initializing it lazily. */ + /** + * Get the memoized {@link Parser}, possibly initializing it lazily. Attempts to use {@code + * getDefaultInstance()} first, falling back to instantiation if necessary, with clear error + * handling for unsupported cases. + */ protected synchronized Parser getParser() { if (memoizedParser == null) { try { if (DynamicMessage.class.equals(protoMessageClass)) { throw new IllegalArgumentException( "DynamicMessage is not supported by the ProtoCoder, use the DynamicProtoCoder."); + } else if (Message.class.equals(protoMessageClass)) { + throw new IllegalArgumentException( + "ProtoCoder does not support the raw Message interface. Use a concrete Protobuf-generated class."); } else { - @SuppressWarnings("unchecked") - T protoMessageInstance = - (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null); - @SuppressWarnings("unchecked") - Parser tParser = (Parser) protoMessageInstance.getParserForType(); - memoizedParser = tParser; + // Try using getDefaultInstance() first (preferred for generated Protobuf classes) + try { + @SuppressWarnings("unchecked") + T protoMessageInstance = + (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null); + @SuppressWarnings("unchecked") + Parser tParser = (Parser) protoMessageInstance.getParserForType(); + memoizedParser = tParser; + } catch (NoSuchMethodException e) { + // Fallback: instantiate directly if getDefaultInstance() isn’t available + try { + @SuppressWarnings("unchecked") + T protoMessageInstance = protoMessageClass.getDeclaredConstructor().newInstance(); + @SuppressWarnings("unchecked") + Parser tParser = (Parser) protoMessageInstance.getParserForType(); + memoizedParser = tParser; + } catch (NoSuchMethodException e2) { + throw new IllegalArgumentException( + "Class " + + protoMessageClass.getName() + + " lacks both getDefaultInstance() and a no-arg constructor. " + + "Ensure it is a concrete Protobuf-generated class.", + e2); + } catch (InstantiationException e2) { + throw new IllegalArgumentException( + "Class " + + protoMessageClass.getName() + + " is abstract or cannot be instantiated. " + + "Ensure it is a concrete Protobuf-generated class.", + e2); + } catch (IllegalAccessException | InvocationTargetException e2) { + throw new IllegalArgumentException( + "Failed to instantiate " + + protoMessageClass.getName() + + " due to access or runtime issues. " + + "Ensure it is a concrete Protobuf-generated class with an accessible constructor.", + e2); + } + } } - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new IllegalArgumentException(e); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new IllegalArgumentException( + "Failed to access getDefaultInstance() for " + protoMessageClass.getName(), e); } } return memoizedParser; diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java index 255b5aa4fdce..c20e9c42d51c 100644 --- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java +++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoderTest.java @@ -19,7 +19,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import com.google.protobuf.Message; import java.io.ObjectStreamClass; import java.util.Collections; import org.apache.beam.sdk.coders.CannotProvideCoderException; @@ -180,4 +183,47 @@ public void testSerialVersionID() { long serialVersionID = ObjectStreamClass.lookup(ProtoCoder.class).getSerialVersionUID(); assertEquals(-5043999806040629525L, serialVersionID); } + + @Test + public void testProtoCoderFailsWithRawMessageInterface() { + // Simulate the user’s pipeline where T resolves to Message + ProtoCoder coder = ProtoCoder.of(Message.class); + try { + coder.getParser(); // Triggers the new check in updated code + fail("Expected IllegalArgumentException for raw Message interface"); + } catch (IllegalArgumentException e) { + assertEquals( + "ProtoCoder does not support the raw Message interface. Use a concrete Protobuf-generated class.", + e.getMessage()); + } + } + + @Test + public void testProtoCoderWithGeneratedClass() throws Exception { + ProtoCoder coder = ProtoCoder.of(MessageA.class); + MessageA message = MessageA.newBuilder().setField1("Test").build(); + byte[] encoded = CoderUtils.encodeToByteArray(coder, message); + MessageA decoded = CoderUtils.decodeFromByteArray(coder, encoded); + assertEquals(message, decoded); + assertEquals("Test", decoded.getField1()); + } + + @Test + public void testProtoCoderWithAbstractClassThrowsException() { + try { + abstract class InvalidMessage implements Message { + // No implementation needed for test + } + ProtoCoder coder = ProtoCoder.of(InvalidMessage.class); + coder.getParser(); + fail("Expected IllegalArgumentException for abstract class"); + } catch (IllegalArgumentException e) { + assertTrue( + "Expected message about abstract class, but was: " + e.getMessage(), + e.getMessage().contains("is abstract or cannot be instantiated")); + assertTrue( + "Expected cause to be InstantiationException", + e.getCause() instanceof InstantiationException); + } + } }