From b496437b790793d4cf471b2dfdd05dee7a4c474c Mon Sep 17 00:00:00 2001 From: Hugo Gu Date: Wed, 3 Apr 2024 18:00:21 +0800 Subject: [PATCH 1/4] [FLINK-34996][Connectors/Kafka] Use UserCodeClassLoader to instantiate Deserializer. --- .../reader/deserializer/KafkaValueOnlyDeserializerWrapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java index 8c8095b6b..d86c0cbac 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java @@ -65,7 +65,7 @@ public void open(DeserializationSchema.InitializationContext context) throws Exc InstantiationUtil.instantiate( deserializerClass.getName(), Deserializer.class, - getClass().getClassLoader()); + userCodeClassLoader); if (deserializer instanceof Configurable) { ((Configurable) deserializer).configure(config); From b4e047823aa0af665ac2fa27bac5f9974bb395c3 Mon Sep 17 00:00:00 2001 From: Hugo Gu Date: Wed, 3 Apr 2024 21:44:05 +0800 Subject: [PATCH 2/4] [FLINK-34996][Connectors/Kafka] Test proper class loader is used and also correct serializer. --- .../kafka/sink/KafkaSerializerWrapper.java | 2 +- .../sink/KafkaSerializerWrapperTest.java | 28 +++++++++ ...KafkaValueOnlyDeserializerWrapperTest.java | 32 ++++++++++ .../testutils/SerializationTestBase.java | 58 +++++++++++++++++++ .../org.mockito.plugins.MockMaker | 1 + 5 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java create mode 100644 flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java create mode 100644 flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SerializationTestBase.java create mode 100644 flink-connector-kafka/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java index f2120568b..bd3d924f2 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java @@ -71,7 +71,7 @@ public void open(InitializationContext context) throws Exception { InstantiationUtil.instantiate( serializerClass.getName(), Serializer.class, - getClass().getClassLoader()); + userCodeClassLoader); if (serializer instanceof Configurable) { ((Configurable) serializer).configure(config); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java new file mode 100644 index 000000000..8017531af --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java @@ -0,0 +1,28 @@ +package org.apache.flink.connector.kafka.sink; + +import org.apache.flink.streaming.connectors.kafka.testutils.SerializationTestBase; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaSerializerWrapperTest extends SerializationTestBase { + @Override + protected void setupContext() { + when(serializationContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader); + } + + @Test + public void testUserCodeClassLoaderIsUsed() throws Exception { + final KafkaSerializerWrapper wrapper = + new KafkaSerializerWrapper<>(StringSerializer.class, true, (value) -> "topic"); + + testUserClassLoaderIsUsedWhen(() -> { + wrapper.open(serializationContext); + return null; + }, new StringSerializer()); + } +} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java new file mode 100644 index 000000000..46c207977 --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java @@ -0,0 +1,32 @@ +package org.apache.flink.connector.kafka.source.reader.deserializer; + +import org.apache.flink.streaming.connectors.kafka.testutils.SerializationTestBase; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaValueOnlyDeserializerWrapperTest extends SerializationTestBase { + @Override + protected void setupContext() { + when(deserializationContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader); + } + + @Test + public void testUserCodeClassLoaderIsUsed() throws Exception { + final Map config = new HashMap<>(); + final KafkaValueOnlyDeserializerWrapper wrapper = + new KafkaValueOnlyDeserializerWrapper<>(StringDeserializer.class, config); + + testUserClassLoaderIsUsedWhen(() -> { + wrapper.open(deserializationContext); + return null; + }, new StringDeserializer()); + } +} diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SerializationTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SerializationTestBase.java new file mode 100644 index 000000000..6dbaae029 --- /dev/null +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SerializationTestBase.java @@ -0,0 +1,58 @@ +package org.apache.flink.streaming.connectors.kafka.testutils; + + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.UserCodeClassLoader; +import org.junit.Before; +import org.mockito.*; + +import java.util.concurrent.Callable; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.when; + +public class SerializationTestBase { + @Mock + protected DeserializationSchema.InitializationContext deserializationContext; + @Mock + protected SerializationSchema.InitializationContext serializationContext; + @Mock + protected UserCodeClassLoader userCodeClassLoader; + @Mock + protected ClassLoader classLoader; + @Captor + private ArgumentCaptor classLoaderCaptor; + + @Before + public void setUp() { + when(userCodeClassLoader.asClassLoader()).thenReturn(classLoader); + setupContext(); + } + + protected void setupContext() { + } + + protected void testUserClassLoaderIsUsedWhen(Callable callable, Object instance) throws Exception { + try (MockedStatic mocked = Mockito.mockStatic(InstantiationUtil.class)) { + + mocked.when(() -> InstantiationUtil.instantiate( + anyString(), + notNull(), + any(ClassLoader.class) + )).thenReturn(instance); + + callable.call(); + + mocked.verify(() -> InstantiationUtil.instantiate( + anyString(), + notNull(), + classLoaderCaptor.capture() + )); + + assertEquals(classLoader, classLoaderCaptor.getValue()); + } + } +} diff --git a/flink-connector-kafka/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/flink-connector-kafka/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000..1f0955d45 --- /dev/null +++ b/flink-connector-kafka/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline From 6a5b91951b3e0869307a67ca54612b32e16eb7db Mon Sep 17 00:00:00 2001 From: Hugo Gu Date: Thu, 4 Apr 2024 12:40:13 +0800 Subject: [PATCH 3/4] [FLINK-34996][Connectors/Kafka] Allow custom Serializer/Deserializer initialization and remove mockito. --- .../kafka/sink/KafkaSerializerWrapper.java | 24 ++++-- .../KafkaValueOnlyDeserializerWrapper.java | 25 +++++-- .../sink/KafkaSerializerWrapperTest.java | 67 +++++++++++++---- ...KafkaValueOnlyDeserializerWrapperTest.java | 73 ++++++++++++++----- .../testutils/SerializationTestBase.java | 58 --------------- .../org.mockito.plugins.MockMaker | 1 - 6 files changed, 138 insertions(+), 110 deletions(-) delete mode 100644 flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SerializationTestBase.java delete mode 100644 flink-connector-kafka/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java index bd3d924f2..b43743e85 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java @@ -61,17 +61,12 @@ class KafkaSerializerWrapper implements SerializationSchema { this(serializerClass, isKey, Collections.emptyMap(), topicSelector); } - @SuppressWarnings("unchecked") @Override public void open(InitializationContext context) throws Exception { - final ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader(); + final ClassLoader userCodeClassLoader = selectClassLoader(context); try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader)) { - serializer = - InstantiationUtil.instantiate( - serializerClass.getName(), - Serializer.class, - userCodeClassLoader); + initializeSerializer(userCodeClassLoader); if (serializer instanceof Configurable) { ((Configurable) serializer).configure(config); @@ -88,4 +83,19 @@ public byte[] serialize(IN element) { checkState(serializer != null, "Call open() once before trying to serialize elements."); return serializer.serialize(topicSelector.apply(element), element); } + + /** + * Selects the class loader to be used when instantiating the serializer. Using a class loader + * with user code allows users to customize the serializer. + */ + protected ClassLoader selectClassLoader(InitializationContext context) { + return context.getUserCodeClassLoader().asClassLoader(); + } + + @SuppressWarnings("unchecked") + protected void initializeSerializer(ClassLoader classLoader) throws Exception { + serializer = + InstantiationUtil.instantiate( + serializerClass.getName(), Serializer.class, classLoader); + } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java index d86c0cbac..6da612b82 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java @@ -55,17 +55,11 @@ class KafkaValueOnlyDeserializerWrapper implements KafkaRecordDeserialization } @Override - @SuppressWarnings("unchecked") public void open(DeserializationSchema.InitializationContext context) throws Exception { - ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader(); + ClassLoader userCodeClassLoader = selectClassLoader(context); try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader)) { - deserializer = - (Deserializer) - InstantiationUtil.instantiate( - deserializerClass.getName(), - Deserializer.class, - userCodeClassLoader); + initializeDeserializer(userCodeClassLoader); if (deserializer instanceof Configurable) { ((Configurable) deserializer).configure(config); @@ -103,4 +97,19 @@ public void deserialize(ConsumerRecord record, Collector coll public TypeInformation getProducedType() { return TypeExtractor.createTypeInfo(Deserializer.class, deserializerClass, 0, null, null); } + + /** + * Selects the class loader to be used when instantiating the deserializer. Using a class loader + * with user code allows users to customize the deserializer. + */ + protected ClassLoader selectClassLoader(DeserializationSchema.InitializationContext context) { + return context.getUserCodeClassLoader().asClassLoader(); + } + + @SuppressWarnings("unchecked") + protected void initializeDeserializer(ClassLoader classLoader) throws Exception { + deserializer = + InstantiationUtil.instantiate( + deserializerClass.getName(), Deserializer.class, classLoader); + } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java index 8017531af..a8c0a4a45 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java @@ -1,28 +1,63 @@ package org.apache.flink.connector.kafka.sink; -import org.apache.flink.streaming.connectors.kafka.testutils.SerializationTestBase; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.FlinkUserCodeClassLoaders; +import org.apache.flink.util.SimpleUserCodeClassLoader; +import org.apache.flink.util.UserCodeClassLoader; + import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; -import static org.mockito.Mockito.when; +import java.net.URL; -@RunWith(MockitoJUnitRunner.class) -public class KafkaSerializerWrapperTest extends SerializationTestBase { - @Override - protected void setupContext() { - when(serializationContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader); - } +import static org.junit.Assert.assertEquals; +/** Tests for {@link KafkaSerializerWrapper}. */ +public class KafkaSerializerWrapperTest { @Test public void testUserCodeClassLoaderIsUsed() throws Exception { - final KafkaSerializerWrapper wrapper = - new KafkaSerializerWrapper<>(StringSerializer.class, true, (value) -> "topic"); + final KafkaSerializerWrapperCaptureForTest wrapper = + new KafkaSerializerWrapperCaptureForTest(); + final ClassLoader classLoader = + FlinkUserCodeClassLoaders.childFirst( + new URL[0], + getClass().getClassLoader(), + new String[0], + throwable -> {}, + true); + wrapper.open( + new SerializationSchema.InitializationContext() { + @Override + public MetricGroup getMetricGroup() { + return new UnregisteredMetricsGroup(); + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + return SimpleUserCodeClassLoader.create(classLoader); + } + }); + + assertEquals(classLoader, wrapper.getClassLoaderUsed()); + } + + static class KafkaSerializerWrapperCaptureForTest extends KafkaSerializerWrapper { + private ClassLoader classLoaderUsed; + + KafkaSerializerWrapperCaptureForTest() { + super(StringSerializer.class, true, (value) -> "topic"); + } + + public ClassLoader getClassLoaderUsed() { + return classLoaderUsed; + } - testUserClassLoaderIsUsedWhen(() -> { - wrapper.open(serializationContext); - return null; - }, new StringSerializer()); + @Override + protected void initializeSerializer(ClassLoader classLoader) throws Exception { + classLoaderUsed = classLoader; + super.initializeSerializer(classLoader); + } } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java index 46c207977..df07322c0 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java @@ -1,32 +1,65 @@ package org.apache.flink.connector.kafka.source.reader.deserializer; -import org.apache.flink.streaming.connectors.kafka.testutils.SerializationTestBase; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.FlinkUserCodeClassLoaders; +import org.apache.flink.util.SimpleUserCodeClassLoader; +import org.apache.flink.util.UserCodeClassLoader; + import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; +import java.net.URL; import java.util.HashMap; -import java.util.Map; - -import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) -public class KafkaValueOnlyDeserializerWrapperTest extends SerializationTestBase { - @Override - protected void setupContext() { - when(deserializationContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader); - } +import static org.junit.Assert.assertEquals; +/** Tests for {@link KafkaValueOnlyDeserializerWrapper}. */ +public class KafkaValueOnlyDeserializerWrapperTest { @Test public void testUserCodeClassLoaderIsUsed() throws Exception { - final Map config = new HashMap<>(); - final KafkaValueOnlyDeserializerWrapper wrapper = - new KafkaValueOnlyDeserializerWrapper<>(StringDeserializer.class, config); - - testUserClassLoaderIsUsedWhen(() -> { - wrapper.open(deserializationContext); - return null; - }, new StringDeserializer()); + final KafkaValueOnlyDeserializerWrapperCaptureForTest wrapper = + new KafkaValueOnlyDeserializerWrapperCaptureForTest(); + final ClassLoader classLoader = + FlinkUserCodeClassLoaders.childFirst( + new URL[0], + getClass().getClassLoader(), + new String[0], + throwable -> {}, + true); + wrapper.open( + new DeserializationSchema.InitializationContext() { + @Override + public MetricGroup getMetricGroup() { + return new UnregisteredMetricsGroup(); + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + return SimpleUserCodeClassLoader.create(classLoader); + } + }); + + assertEquals(classLoader, wrapper.getClassLoaderUsed()); + } + + static class KafkaValueOnlyDeserializerWrapperCaptureForTest + extends KafkaValueOnlyDeserializerWrapper { + private ClassLoader classLoaderUsed; + + KafkaValueOnlyDeserializerWrapperCaptureForTest() { + super(StringDeserializer.class, new HashMap<>()); + } + + public ClassLoader getClassLoaderUsed() { + return classLoaderUsed; + } + + @Override + protected void initializeDeserializer(ClassLoader classLoader) throws Exception { + classLoaderUsed = classLoader; + super.initializeDeserializer(classLoader); + } } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SerializationTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SerializationTestBase.java deleted file mode 100644 index 6dbaae029..000000000 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SerializationTestBase.java +++ /dev/null @@ -1,58 +0,0 @@ -package org.apache.flink.streaming.connectors.kafka.testutils; - - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.UserCodeClassLoader; -import org.junit.Before; -import org.mockito.*; - -import java.util.concurrent.Callable; - -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.when; - -public class SerializationTestBase { - @Mock - protected DeserializationSchema.InitializationContext deserializationContext; - @Mock - protected SerializationSchema.InitializationContext serializationContext; - @Mock - protected UserCodeClassLoader userCodeClassLoader; - @Mock - protected ClassLoader classLoader; - @Captor - private ArgumentCaptor classLoaderCaptor; - - @Before - public void setUp() { - when(userCodeClassLoader.asClassLoader()).thenReturn(classLoader); - setupContext(); - } - - protected void setupContext() { - } - - protected void testUserClassLoaderIsUsedWhen(Callable callable, Object instance) throws Exception { - try (MockedStatic mocked = Mockito.mockStatic(InstantiationUtil.class)) { - - mocked.when(() -> InstantiationUtil.instantiate( - anyString(), - notNull(), - any(ClassLoader.class) - )).thenReturn(instance); - - callable.call(); - - mocked.verify(() -> InstantiationUtil.instantiate( - anyString(), - notNull(), - classLoaderCaptor.capture() - )); - - assertEquals(classLoader, classLoaderCaptor.getValue()); - } - } -} diff --git a/flink-connector-kafka/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/flink-connector-kafka/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker deleted file mode 100644 index 1f0955d45..000000000 --- a/flink-connector-kafka/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker +++ /dev/null @@ -1 +0,0 @@ -mock-maker-inline From b288080c38fd34e49cc1d592131abe0216529d90 Mon Sep 17 00:00:00 2001 From: Hugo Gu Date: Mon, 8 Apr 2024 15:59:18 +0800 Subject: [PATCH 4/4] [FLINK-34996][Connectors/Kafka] Simplify code and add more tests on serialization wrappers. --- .../connector/kafka/sink/KafkaSerializerWrapper.java | 10 +--------- .../KafkaValueOnlyDeserializerWrapper.java | 10 +--------- .../kafka/sink/KafkaSerializerWrapperTest.java | 11 +++++++++++ .../KafkaValueOnlyDeserializerWrapperTest.java | 11 +++++++++++ 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java index b43743e85..6bc811a46 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java @@ -63,7 +63,7 @@ class KafkaSerializerWrapper implements SerializationSchema { @Override public void open(InitializationContext context) throws Exception { - final ClassLoader userCodeClassLoader = selectClassLoader(context); + final ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader(); try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader)) { initializeSerializer(userCodeClassLoader); @@ -84,14 +84,6 @@ public byte[] serialize(IN element) { return serializer.serialize(topicSelector.apply(element), element); } - /** - * Selects the class loader to be used when instantiating the serializer. Using a class loader - * with user code allows users to customize the serializer. - */ - protected ClassLoader selectClassLoader(InitializationContext context) { - return context.getUserCodeClassLoader().asClassLoader(); - } - @SuppressWarnings("unchecked") protected void initializeSerializer(ClassLoader classLoader) throws Exception { serializer = diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java index 6da612b82..4d320fcc8 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java @@ -56,7 +56,7 @@ class KafkaValueOnlyDeserializerWrapper implements KafkaRecordDeserialization @Override public void open(DeserializationSchema.InitializationContext context) throws Exception { - ClassLoader userCodeClassLoader = selectClassLoader(context); + ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader(); try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader)) { initializeDeserializer(userCodeClassLoader); @@ -98,14 +98,6 @@ public TypeInformation getProducedType() { return TypeExtractor.createTypeInfo(Deserializer.class, deserializerClass, 0, null, null); } - /** - * Selects the class loader to be used when instantiating the deserializer. Using a class loader - * with user code allows users to customize the deserializer. - */ - protected ClassLoader selectClassLoader(DeserializationSchema.InitializationContext context) { - return context.getUserCodeClassLoader().asClassLoader(); - } - @SuppressWarnings("unchecked") protected void initializeDeserializer(ClassLoader classLoader) throws Exception { deserializer = diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java index a8c0a4a45..2f8d872a8 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java @@ -1,6 +1,7 @@ package org.apache.flink.connector.kafka.sink; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.testutils.formats.DummyInitializationContext; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.FlinkUserCodeClassLoaders; @@ -43,6 +44,16 @@ public UserCodeClassLoader getUserCodeClassLoader() { assertEquals(classLoader, wrapper.getClassLoaderUsed()); } + @Test + public void testDefaultClassLoaderIsUsed() throws Exception { + final KafkaSerializerWrapperCaptureForTest wrapper = + new KafkaSerializerWrapperCaptureForTest(); + wrapper.open(new DummyInitializationContext()); + + assertEquals( + DummyInitializationContext.class.getClassLoader(), wrapper.getClassLoaderUsed()); + } + static class KafkaSerializerWrapperCaptureForTest extends KafkaSerializerWrapper { private ClassLoader classLoaderUsed; diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java index df07322c0..312bfbcdf 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java @@ -1,6 +1,7 @@ package org.apache.flink.connector.kafka.source.reader.deserializer; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.connector.testutils.formats.DummyInitializationContext; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.util.FlinkUserCodeClassLoaders; @@ -44,6 +45,16 @@ public UserCodeClassLoader getUserCodeClassLoader() { assertEquals(classLoader, wrapper.getClassLoaderUsed()); } + @Test + public void testDefaultClassLoaderIsUsed() throws Exception { + final KafkaValueOnlyDeserializerWrapperCaptureForTest wrapper = + new KafkaValueOnlyDeserializerWrapperCaptureForTest(); + wrapper.open(new DummyInitializationContext()); + + assertEquals( + DummyInitializationContext.class.getClassLoader(), wrapper.getClassLoaderUsed()); + } + static class KafkaValueOnlyDeserializerWrapperCaptureForTest extends KafkaValueOnlyDeserializerWrapper { private ClassLoader classLoaderUsed;