Skip to content

Commit e308757

Browse files
committed
[FLINK-34996][Connectors/Kafka] Allow custom Serializer/Deserializer initialization and remove mockito.
1 parent 68da758 commit e308757

File tree

6 files changed

+137
-108
lines changed

6 files changed

+137
-108
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,10 @@ class KafkaSerializerWrapper<IN> implements SerializationSchema<IN> {
6464
@SuppressWarnings("unchecked")
6565
@Override
6666
public void open(InitializationContext context) throws Exception {
67-
final ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
67+
final ClassLoader userCodeClassLoader = selectClassLoader(context);
6868
try (TemporaryClassLoaderContext ignored =
6969
TemporaryClassLoaderContext.of(userCodeClassLoader)) {
70-
serializer =
71-
InstantiationUtil.instantiate(
72-
serializerClass.getName(),
73-
Serializer.class,
74-
userCodeClassLoader);
70+
initializeSerializer(userCodeClassLoader);
7571

7672
if (serializer instanceof Configurable) {
7773
((Configurable) serializer).configure(config);
@@ -88,4 +84,18 @@ public byte[] serialize(IN element) {
8884
checkState(serializer != null, "Call open() once before trying to serialize elements.");
8985
return serializer.serialize(topicSelector.apply(element), element);
9086
}
87+
88+
/**
89+
* Selects the class loader to be used when instantiating the serializer. Using a class loader
90+
* with user code allows users to customize the serializer.
91+
*/
92+
protected ClassLoader selectClassLoader(InitializationContext context) {
93+
return context.getUserCodeClassLoader().asClassLoader();
94+
}
95+
96+
protected void initializeSerializer(ClassLoader classLoader) throws Exception {
97+
serializer =
98+
InstantiationUtil.instantiate(
99+
serializerClass.getName(), Serializer.class, classLoader);
100+
}
91101
}

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,10 @@ class KafkaValueOnlyDeserializerWrapper<T> implements KafkaRecordDeserialization
5757
@Override
5858
@SuppressWarnings("unchecked")
5959
public void open(DeserializationSchema.InitializationContext context) throws Exception {
60-
ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader();
60+
ClassLoader userCodeClassLoader = selectClassLoader(context);
6161
try (TemporaryClassLoaderContext ignored =
6262
TemporaryClassLoaderContext.of(userCodeClassLoader)) {
63-
deserializer =
64-
(Deserializer<T>)
65-
InstantiationUtil.instantiate(
66-
deserializerClass.getName(),
67-
Deserializer.class,
68-
userCodeClassLoader);
63+
initializeDeserializer(userCodeClassLoader);
6964

7065
if (deserializer instanceof Configurable) {
7166
((Configurable) deserializer).configure(config);
@@ -103,4 +98,19 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> coll
10398
public TypeInformation<T> getProducedType() {
10499
return TypeExtractor.createTypeInfo(Deserializer.class, deserializerClass, 0, null, null);
105100
}
101+
102+
/**
103+
* Selects the class loader to be used when instantiating the deserializer. Using a class loader
104+
* with user code allows users to customize the deserializer.
105+
*/
106+
protected ClassLoader selectClassLoader(DeserializationSchema.InitializationContext context) {
107+
return context.getUserCodeClassLoader().asClassLoader();
108+
}
109+
110+
protected void initializeDeserializer(ClassLoader classLoader) throws Exception {
111+
deserializer =
112+
(Deserializer<T>)
113+
InstantiationUtil.instantiate(
114+
deserializerClass.getName(), Deserializer.class, classLoader);
115+
}
106116
}
Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,63 @@
11
package org.apache.flink.connector.kafka.sink;
22

3-
import org.apache.flink.streaming.connectors.kafka.testutils.SerializationTestBase;
3+
import org.apache.flink.api.common.serialization.SerializationSchema;
4+
import org.apache.flink.metrics.MetricGroup;
5+
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
6+
import org.apache.flink.util.FlinkUserCodeClassLoaders;
7+
import org.apache.flink.util.SimpleUserCodeClassLoader;
8+
import org.apache.flink.util.UserCodeClassLoader;
9+
410
import org.apache.kafka.common.serialization.StringSerializer;
511
import org.junit.Test;
6-
import org.junit.runner.RunWith;
7-
import org.mockito.junit.MockitoJUnitRunner;
812

9-
import static org.mockito.Mockito.when;
13+
import java.net.URL;
1014

11-
@RunWith(MockitoJUnitRunner.class)
12-
public class KafkaSerializerWrapperTest extends SerializationTestBase {
13-
@Override
14-
protected void setupContext() {
15-
when(serializationContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
16-
}
15+
import static org.junit.Assert.assertEquals;
1716

17+
/** Tests for {@link KafkaSerializerWrapper}. */
18+
public class KafkaSerializerWrapperTest {
1819
@Test
1920
public void testUserCodeClassLoaderIsUsed() throws Exception {
20-
final KafkaSerializerWrapper<String> wrapper =
21-
new KafkaSerializerWrapper<>(StringSerializer.class, true, (value) -> "topic");
21+
final KafkaSerializerWrapperCaptureForTest wrapper =
22+
new KafkaSerializerWrapperCaptureForTest();
23+
final ClassLoader classLoader =
24+
FlinkUserCodeClassLoaders.childFirst(
25+
new URL[0],
26+
getClass().getClassLoader(),
27+
new String[0],
28+
throwable -> {},
29+
true);
30+
wrapper.open(
31+
new SerializationSchema.InitializationContext() {
32+
@Override
33+
public MetricGroup getMetricGroup() {
34+
return new UnregisteredMetricsGroup();
35+
}
36+
37+
@Override
38+
public UserCodeClassLoader getUserCodeClassLoader() {
39+
return SimpleUserCodeClassLoader.create(classLoader);
40+
}
41+
});
42+
43+
assertEquals(classLoader, wrapper.getClassLoaderUsed());
44+
}
45+
46+
static class KafkaSerializerWrapperCaptureForTest extends KafkaSerializerWrapper<String> {
47+
private ClassLoader classLoaderUsed;
48+
49+
KafkaSerializerWrapperCaptureForTest() {
50+
super(StringSerializer.class, true, (value) -> "topic");
51+
}
52+
53+
public ClassLoader getClassLoaderUsed() {
54+
return classLoaderUsed;
55+
}
2256

23-
testUserClassLoaderIsUsedWhen(() -> {
24-
wrapper.open(serializationContext);
25-
return null;
26-
}, new StringSerializer());
57+
@Override
58+
protected void initializeSerializer(ClassLoader classLoader) throws Exception {
59+
classLoaderUsed = classLoader;
60+
super.initializeSerializer(classLoader);
61+
}
2762
}
2863
}
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,65 @@
11
package org.apache.flink.connector.kafka.source.reader.deserializer;
22

3-
import org.apache.flink.streaming.connectors.kafka.testutils.SerializationTestBase;
3+
import org.apache.flink.api.common.serialization.DeserializationSchema;
4+
import org.apache.flink.metrics.MetricGroup;
5+
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
6+
import org.apache.flink.util.FlinkUserCodeClassLoaders;
7+
import org.apache.flink.util.SimpleUserCodeClassLoader;
8+
import org.apache.flink.util.UserCodeClassLoader;
9+
410
import org.apache.kafka.common.serialization.StringDeserializer;
511
import org.junit.Test;
6-
import org.junit.runner.RunWith;
7-
import org.mockito.junit.MockitoJUnitRunner;
812

13+
import java.net.URL;
914
import java.util.HashMap;
10-
import java.util.Map;
11-
12-
import static org.mockito.Mockito.when;
1315

14-
@RunWith(MockitoJUnitRunner.class)
15-
public class KafkaValueOnlyDeserializerWrapperTest extends SerializationTestBase {
16-
@Override
17-
protected void setupContext() {
18-
when(deserializationContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
19-
}
16+
import static org.junit.Assert.assertEquals;
2017

18+
/** Tests for {@link KafkaValueOnlyDeserializerWrapper}. */
19+
public class KafkaValueOnlyDeserializerWrapperTest {
2120
@Test
2221
public void testUserCodeClassLoaderIsUsed() throws Exception {
23-
final Map<String, String> config = new HashMap<>();
24-
final KafkaValueOnlyDeserializerWrapper<String> wrapper =
25-
new KafkaValueOnlyDeserializerWrapper<>(StringDeserializer.class, config);
26-
27-
testUserClassLoaderIsUsedWhen(() -> {
28-
wrapper.open(deserializationContext);
29-
return null;
30-
}, new StringDeserializer());
22+
final KafkaValueOnlyDeserializerWrapperCaptureForTest wrapper =
23+
new KafkaValueOnlyDeserializerWrapperCaptureForTest();
24+
final ClassLoader classLoader =
25+
FlinkUserCodeClassLoaders.childFirst(
26+
new URL[0],
27+
getClass().getClassLoader(),
28+
new String[0],
29+
throwable -> {},
30+
true);
31+
wrapper.open(
32+
new DeserializationSchema.InitializationContext() {
33+
@Override
34+
public MetricGroup getMetricGroup() {
35+
return new UnregisteredMetricsGroup();
36+
}
37+
38+
@Override
39+
public UserCodeClassLoader getUserCodeClassLoader() {
40+
return SimpleUserCodeClassLoader.create(classLoader);
41+
}
42+
});
43+
44+
assertEquals(classLoader, wrapper.getClassLoaderUsed());
45+
}
46+
47+
static class KafkaValueOnlyDeserializerWrapperCaptureForTest
48+
extends KafkaValueOnlyDeserializerWrapper<String> {
49+
private ClassLoader classLoaderUsed;
50+
51+
KafkaValueOnlyDeserializerWrapperCaptureForTest() {
52+
super(StringDeserializer.class, new HashMap<>());
53+
}
54+
55+
public ClassLoader getClassLoaderUsed() {
56+
return classLoaderUsed;
57+
}
58+
59+
@Override
60+
protected void initializeDeserializer(ClassLoader classLoader) throws Exception {
61+
classLoaderUsed = classLoader;
62+
super.initializeDeserializer(classLoader);
63+
}
3164
}
3265
}

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SerializationTestBase.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

flink-connector-kafka/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker

Lines changed: 0 additions & 1 deletion
This file was deleted.

0 commit comments

Comments
 (0)