Skip to content

Commit 461a3e3

Browse files
committed
[FLINK-34996][Connectors/Kafka] Simplify code and add more tests on serialization wrappers.
1 parent 8986f77 commit 461a3e3

File tree

4 files changed

+20
-18
lines changed

4 files changed

+20
-18
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class KafkaSerializerWrapper<IN> implements SerializationSchema<IN> {
6363

6464
@Override
6565
public void open(InitializationContext context) throws Exception {
66-
final ClassLoader userCodeClassLoader = selectClassLoader(context);
66+
final ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
6767
try (TemporaryClassLoaderContext ignored =
6868
TemporaryClassLoaderContext.of(userCodeClassLoader)) {
6969
initializeSerializer(userCodeClassLoader);
@@ -84,14 +84,6 @@ public byte[] serialize(IN element) {
8484
return serializer.serialize(topicSelector.apply(element), element);
8585
}
8686

87-
/**
88-
* Selects the class loader to be used when instantiating the serializer. Using a class loader
89-
* with user code allows users to customize the serializer.
90-
*/
91-
protected ClassLoader selectClassLoader(InitializationContext context) {
92-
return context.getUserCodeClassLoader().asClassLoader();
93-
}
94-
9587
@SuppressWarnings("unchecked")
9688
protected void initializeSerializer(ClassLoader classLoader) throws Exception {
9789
serializer =

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class KafkaValueOnlyDeserializerWrapper<T> implements KafkaRecordDeserialization
5656

5757
@Override
5858
public void open(DeserializationSchema.InitializationContext context) throws Exception {
59-
ClassLoader userCodeClassLoader = selectClassLoader(context);
59+
ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
6060
try (TemporaryClassLoaderContext ignored =
6161
TemporaryClassLoaderContext.of(userCodeClassLoader)) {
6262
initializeDeserializer(userCodeClassLoader);
@@ -98,14 +98,6 @@ public TypeInformation<T> getProducedType() {
9898
return TypeExtractor.createTypeInfo(Deserializer.class, deserializerClass, 0, null, null);
9999
}
100100

101-
/**
102-
* Selects the class loader to be used when instantiating the deserializer. Using a class loader
103-
* with user code allows users to customize the deserializer.
104-
*/
105-
protected ClassLoader selectClassLoader(DeserializationSchema.InitializationContext context) {
106-
return context.getUserCodeClassLoader().asClassLoader();
107-
}
108-
109101
@SuppressWarnings("unchecked")
110102
protected void initializeDeserializer(ClassLoader classLoader) throws Exception {
111103
deserializer =

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapperTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.apache.flink.connector.kafka.sink;
22

33
import org.apache.flink.api.common.serialization.SerializationSchema;
4+
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
45
import org.apache.flink.metrics.MetricGroup;
56
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
67
import org.apache.flink.util.FlinkUserCodeClassLoaders;
@@ -43,6 +44,14 @@ public UserCodeClassLoader getUserCodeClassLoader() {
4344
assertEquals(classLoader, wrapper.getClassLoaderUsed());
4445
}
4546

47+
@Test
48+
public void testDefaultClassLoaderIsUsed() throws Exception {
49+
final KafkaSerializerWrapperCaptureForTest wrapper = new KafkaSerializerWrapperCaptureForTest();
50+
wrapper.open(new DummyInitializationContext());
51+
52+
assertEquals(DummyInitializationContext.class.getClassLoader(), wrapper.getClassLoaderUsed());
53+
}
54+
4655
static class KafkaSerializerWrapperCaptureForTest extends KafkaSerializerWrapper<String> {
4756
private ClassLoader classLoaderUsed;
4857

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapperTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.apache.flink.connector.kafka.source.reader.deserializer;
22

33
import org.apache.flink.api.common.serialization.DeserializationSchema;
4+
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
45
import org.apache.flink.metrics.MetricGroup;
56
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
67
import org.apache.flink.util.FlinkUserCodeClassLoaders;
@@ -44,6 +45,14 @@ public UserCodeClassLoader getUserCodeClassLoader() {
4445
assertEquals(classLoader, wrapper.getClassLoaderUsed());
4546
}
4647

48+
@Test
49+
public void testDefaultClassLoaderIsUsed() throws Exception {
50+
final KafkaValueOnlyDeserializerWrapperCaptureForTest wrapper = new KafkaValueOnlyDeserializerWrapperCaptureForTest();
51+
wrapper.open(new DummyInitializationContext());
52+
53+
assertEquals(DummyInitializationContext.class.getClassLoader(), wrapper.getClassLoaderUsed());
54+
}
55+
4756
static class KafkaValueOnlyDeserializerWrapperCaptureForTest
4857
extends KafkaValueOnlyDeserializerWrapper<String> {
4958
private ClassLoader classLoaderUsed;

0 commit comments

Comments
 (0)