
Commit d44f574

[FLINK-37644] Remove guava from prod code

With Java 11, most of the usages can be easily replaced. TypeToken can be replaced by Flink's TypeExtractor. The only hard case was a use of Maps.difference, but it's only needed for logging and we can solve this in a different way.

1 parent 16c8559 commit d44f574
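To illustrate the TypeToken replacement mentioned above (this sketch is not part of the diff; the OrderEvent POJO is hypothetical), Flink's TypeExtractor can derive TypeInformation directly from a class:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class TypeInfoMigrationSketch {

    // Hypothetical POJO, only here to make the sketch self-contained.
    public static class OrderEvent {
        public long id;
        public String product;
    }

    public static void main(String[] args) {
        // Before (Guava): TypeToken<OrderEvent> token = TypeToken.of(OrderEvent.class);
        // After (Flink): TypeExtractor derives TypeInformation straight from the class.
        TypeInformation<OrderEvent> info = TypeExtractor.getForClass(OrderEvent.class);
        System.out.println(info);
    }
}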

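The three views that Guava's Maps.difference provided to the old logging (entriesInCommon, entriesOnlyOnLeft, entriesOnlyOnRight) can also be recomputed with plain JDK collections, should diff-style logging ever be wanted again. A minimal sketch with illustrative maps:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class MapDiffSketch {
    public static void main(String[] args) {
        Map<String, Set<String>> previous = Map.of("clusterA", Set.of("t1"), "clusterB", Set.of("t2"));
        Map<String, Set<String>> current = Map.of("clusterA", Set.of("t1"), "clusterC", Set.of("t3"));

        // Entries present in both maps with equal values (Maps.difference#entriesInCommon).
        Map<String, Set<String>> common = new HashMap<>(previous);
        common.entrySet().retainAll(current.entrySet());

        // Keys only in the old map (entriesOnlyOnLeft) and only in the new one (entriesOnlyOnRight).
        Map<String, Set<String>> removed = new HashMap<>(previous);
        removed.keySet().removeAll(current.keySet());
        Map<String, Set<String>> added = new HashMap<>(current);
        added.keySet().removeAll(previous.keySet());

        System.out.println("common=" + common + ", removed=" + removed + ", added=" + added);
    }
}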
13 files changed: +72 −76 lines

flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java

Lines changed: 1 addition & 3 deletions
@@ -65,8 +65,6 @@ public class SQLClientSchemaRegistryITCase {
             ResourceTestUtils.getResource(".*avro-confluent.jar");
     private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.jar");
 
-    private final Path guavaJar = ResourceTestUtils.getResource(".*guava.jar");
-
     @ClassRule public static final Network NETWORK = Network.newNetwork();
 
     @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
@@ -252,7 +250,7 @@ private List<Integer> getAllVersions(String behaviourSubject) throws Exception {
     private void executeSqlStatements(List<String> sqlLines) throws Exception {
         flink.submitSQLJob(
                 new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
-                        .addJars(sqlAvroJar, sqlAvroRegistryJar, sqlConnectorKafkaJar, guavaJar)
+                        .addJars(sqlAvroJar, sqlAvroRegistryJar, sqlConnectorKafkaJar)
                         .build());
     }
 }

flink-connector-kafka/pom.xml

Lines changed: 6 additions & 5 deletions
@@ -82,13 +82,14 @@ under the License.
             <version>${kafka.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-
         <!-- Tests -->
 
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java

Lines changed: 1 addition & 6 deletions
@@ -20,8 +20,6 @@
 
 import org.apache.flink.annotation.Experimental;
 
-import com.google.common.base.MoreObjects;
-
 import java.io.Serializable;
 import java.util.Objects;
 import java.util.Properties;
@@ -67,10 +65,7 @@ public Properties getProperties() {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("topics", topics)
-                .add("properties", properties)
-                .toString();
+        return "ClusterMetadata{" + "topics=" + topics + ", properties=" + properties + '}';
     }
 
     @Override

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/KafkaStream.java

Lines changed: 7 additions & 6 deletions
@@ -20,8 +20,6 @@
 
 import org.apache.flink.annotation.Experimental;
 
-import com.google.common.base.MoreObjects;
-
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
@@ -68,10 +66,13 @@ public Map<String, ClusterMetadata> getClusterMetadataMap() {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("streamId", streamId)
-                .add("clusterMetadataMap", clusterMetadataMap)
-                .toString();
+        return "KafkaStream{"
+                + "streamId='"
+                + streamId
+                + '\''
+                + ", clusterMetadataMap="
+                + clusterMetadataMap
+                + '}';
     }
 
     @Override

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/MetadataUpdateEvent.java

Lines changed: 1 addition & 3 deletions
@@ -23,8 +23,6 @@
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
 import org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader;
 
-import com.google.common.base.MoreObjects;
-
 import java.util.Objects;
 import java.util.Set;
 
@@ -55,7 +53,7 @@ public Set<KafkaStream> getKafkaStreams() {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this).add("kafkaStreams", kafkaStreams).toString();
+        return "MetadataUpdateEvent{" + "kafkaStreams=" + kafkaStreams + '}';
     }
 
     @Override

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java

Lines changed: 9 additions & 16 deletions
@@ -41,9 +41,6 @@
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.util.Preconditions;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -61,6 +58,7 @@
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
@@ -260,17 +258,11 @@ private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams
         }
 
         if (logger.isInfoEnabled()) {
-            MapDifference<String, Set<String>> metadataDifference =
-                    Maps.difference(latestClusterTopicsMap, newClustersTopicsMap);
+            // log the maps in a sorted fashion so it's easy to see the changes
             logger.info(
-                    "Common cluster topics after metadata refresh: {}",
-                    metadataDifference.entriesInCommon());
-            logger.info(
-                    "Removed cluster topics after metadata refresh: {}",
-                    metadataDifference.entriesOnlyOnLeft());
-            logger.info(
-                    "Additional cluster topics after metadata refresh: {}",
-                    metadataDifference.entriesOnlyOnRight());
+                    "Detected changed cluster topics after metadata refresh:\nPrevious: {}\nNew: {}",
+                    new TreeMap<>(latestClusterTopicsMap),
+                    new TreeMap<>(newClustersTopicsMap));
         }
 
         DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState;
@@ -456,10 +448,11 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
     public void addSplitsBack(List<DynamicKafkaSourceSplit> splits, int subtaskId) {
         logger.debug("Adding splits back for {}", subtaskId);
         // separate splits by cluster
-        ArrayListMultimap<String, KafkaPartitionSplit> kafkaPartitionSplits =
-                ArrayListMultimap.create();
+        Map<String, List<KafkaPartitionSplit>> kafkaPartitionSplits = new HashMap<>();
         for (DynamicKafkaSourceSplit split : splits) {
-            kafkaPartitionSplits.put(split.getKafkaClusterId(), split.getKafkaPartitionSplit());
+            kafkaPartitionSplits
+                    .computeIfAbsent(split.getKafkaClusterId(), unused -> new ArrayList<>())
+                    .add(split.getKafkaPartitionSplit());
         }
 
         // add splits back and assign pending splits for all enumerators

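The computeIfAbsent loop above is the direct JDK translation of ArrayListMultimap (the same pattern recurs in DynamicKafkaSourceReader below); a stream with Collectors.groupingBy would be an equivalent alternative. A self-contained sketch with placeholder data standing in for the splits:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingSketch {
    public static void main(String[] args) {
        // Placeholder (clusterId, split) pairs standing in for DynamicKafkaSourceSplit.
        List<Map.Entry<String, String>> splits =
                List.of(
                        Map.entry("clusterA", "split1"),
                        Map.entry("clusterA", "split2"),
                        Map.entry("clusterB", "split3"));

        // Equivalent to the computeIfAbsent loop: group values into lists keyed by cluster.
        Map<String, List<String>> byCluster =
                splits.stream()
                        .collect(
                                Collectors.groupingBy(
                                        Map.Entry::getKey,
                                        Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

        System.out.println(byCluster); // e.g. {clusterA=[split1, split2], clusterB=[split3]}
    }
}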
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/subscriber/StreamPatternSubscriber.java

Lines changed: 5 additions & 5 deletions
@@ -22,8 +22,8 @@
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
 import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
 
-import com.google.common.collect.ImmutableSet;
-
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -40,14 +40,14 @@ public StreamPatternSubscriber(Pattern streamPattern) {
     @Override
     public Set<KafkaStream> getSubscribedStreams(KafkaMetadataService kafkaMetadataService) {
         Set<KafkaStream> allStreams = kafkaMetadataService.getAllStreams();
-        ImmutableSet.Builder<KafkaStream> builder = ImmutableSet.builder();
+        List<KafkaStream> matches = new ArrayList<>();
         for (KafkaStream kafkaStream : allStreams) {
             String streamId = kafkaStream.getStreamId();
             if (streamPattern.matcher(streamId).find()) {
-                builder.add(kafkaStream);
+                matches.add(kafkaStream);
             }
         }
 
-        return builder.build();
+        return Set.copyOf(matches);
     }
 }

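One behavioral note on the Set.copyOf substitution above: like ImmutableSet, the returned set is unmodifiable and rejects null elements, so callers keep the same guarantees. A quick illustration:

import java.util.List;
import java.util.Set;

public class CopyOfSketch {
    public static void main(String[] args) {
        Set<String> streams = Set.copyOf(List.of("stream-a", "stream-b"));
        // Like ImmutableSet, the copy is unmodifiable: this throws UnsupportedOperationException.
        streams.add("stream-c");
    }
}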
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java

Lines changed: 4 additions & 4 deletions
@@ -49,7 +49,6 @@
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.UserCodeClassLoader;
 
-import com.google.common.collect.ArrayListMultimap;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -185,10 +184,11 @@ public void addSplits(List<DynamicKafkaSourceSplit> splits) {
             return;
         }
 
-        ArrayListMultimap<String, KafkaPartitionSplit> clusterSplitsMap =
-                ArrayListMultimap.create();
+        Map<String, List<KafkaPartitionSplit>> clusterSplitsMap = new HashMap<>();
         for (DynamicKafkaSourceSplit split : splits) {
-            clusterSplitsMap.put(split.getKafkaClusterId(), split);
+            clusterSplitsMap
+                    .computeIfAbsent(split.getKafkaClusterId(), unused -> new ArrayList<>())
+                    .add(split);
         }
 
         Set<String> kafkaClusterIds = clusterSplitsMap.keySet();

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplit.java

Lines changed: 7 additions & 6 deletions
@@ -21,8 +21,6 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
-import com.google.common.base.MoreObjects;
-
 import java.util.Objects;
 
 /** Split that wraps {@link KafkaPartitionSplit} with Kafka cluster information. */
@@ -56,10 +54,13 @@ public KafkaPartitionSplit getKafkaPartitionSplit() {
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("kafkaClusterId", kafkaClusterId)
-                .add("kafkaPartitionSplit", kafkaPartitionSplit)
-                .toString();
+        return "DynamicKafkaSourceSplit{"
+                + "kafkaClusterId='"
+                + kafkaClusterId
+                + '\''
+                + ", kafkaPartitionSplit="
+                + kafkaPartitionSplit
+                + '}';
     }
 
     @Override

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/split/DynamicKafkaSourceSplitSerializer.java

Lines changed: 1 addition & 3 deletions
@@ -23,8 +23,6 @@
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
-import com.google.common.io.ByteStreams;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -69,7 +67,7 @@ public DynamicKafkaSourceSplit deserialize(int version, byte[] serialized) throw
         int kafkaPartitionSplitSerializerVersion = in.readInt();
         KafkaPartitionSplit kafkaPartitionSplit =
                 kafkaPartitionSplitSerializer.deserialize(
-                        kafkaPartitionSplitSerializerVersion, ByteStreams.toByteArray(in));
+                        kafkaPartitionSplitSerializerVersion, in.readAllBytes());
         return new DynamicKafkaSourceSplit(kafkaClusterId, kafkaPartitionSplit);
     }
 }

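On the serializer change above: InputStream#readAllBytes (available since Java 9) drains the remaining bytes of a stream just as Guava's ByteStreams.toByteArray did. A standalone sketch with made-up bytes:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class ReadAllBytesSketch {
    public static void main(String[] args) throws IOException {
        byte[] data = {0, 0, 0, 42, 7, 9};
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int version = in.readInt(); // 42, mirroring the serializer-version int read in deserialize()
        byte[] payload = in.readAllBytes(); // remaining 2 bytes, replacing ByteStreams.toByteArray(in)
        System.out.println(version + ", payload length " + payload.length); // 42, payload length 2
    }
}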