Skip to content

Commit 7d54f7b

Browse files
authored
MINOR: Cleanup DelayedOperationKey (#20947)
Refactor the subclasses of DelayedOperationKey using `record`. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent cf7b4a9 commit 7d54f7b

File tree

5 files changed

+5
-108
lines changed

5 files changed

+5
-108
lines changed

server-common/src/main/java/org/apache/kafka/server/purgatory/TopicPartitionOperationKey.java

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,10 @@
1919
import org.apache.kafka.common.TopicIdPartition;
2020
import org.apache.kafka.common.TopicPartition;
2121

22-
import java.util.Objects;
23-
2422
/**
2523
* Used by delayed-produce and delayed-fetch operations
2624
*/
27-
public class TopicPartitionOperationKey implements DelayedOperationKey {
28-
29-
public final String topic;
30-
public final int partition;
31-
32-
public TopicPartitionOperationKey(String topic, int partition) {
33-
this.topic = topic;
34-
this.partition = partition;
35-
}
25+
public record TopicPartitionOperationKey(String topic, int partition) implements DelayedOperationKey {
3626

3727
public TopicPartitionOperationKey(TopicPartition tp) {
3828
this(tp.topic(), tp.partition());
@@ -46,17 +36,4 @@ public TopicPartitionOperationKey(TopicIdPartition tp) {
4636
public String keyLabel() {
4737
return topic + "-" + partition;
4838
}
49-
50-
@Override
51-
public boolean equals(Object o) {
52-
if (this == o) return true;
53-
if (o == null || getClass() != o.getClass()) return false;
54-
TopicPartitionOperationKey that = (TopicPartitionOperationKey) o;
55-
return partition == that.partition && Objects.equals(topic, that.topic);
56-
}
57-
58-
@Override
59-
public int hashCode() {
60-
return Objects.hash(topic, partition);
61-
}
6239
}

server-common/src/test/java/org/apache/kafka/server/purgatory/DelayedOperationTest.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424

2525
import java.util.ArrayList;
2626
import java.util.List;
27-
import java.util.Objects;
2827
import java.util.Optional;
2928
import java.util.Random;
3029
import java.util.concurrent.ExecutionException;
@@ -63,26 +62,7 @@ public void tearDown() throws Exception {
6362
executorService.shutdown();
6463
}
6564

66-
private static class MockKey implements DelayedOperationKey {
67-
@Override
68-
public boolean equals(Object o) {
69-
if (this == o) return true;
70-
if (o == null || getClass() != o.getClass()) return false;
71-
MockKey mockKey = (MockKey) o;
72-
return Objects.equals(key, mockKey.key);
73-
}
74-
75-
@Override
76-
public int hashCode() {
77-
return key != null ? key.hashCode() : 0;
78-
}
79-
80-
final String key;
81-
82-
MockKey(String key) {
83-
this.key = key;
84-
}
85-
65+
private record MockKey(String key) implements DelayedOperationKey {
8666
@Override
8767
public String keyLabel() {
8868
return key;

server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchGroupKey.java

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,47 +19,15 @@
1919
import org.apache.kafka.common.TopicIdPartition;
2020
import org.apache.kafka.common.Uuid;
2121

22-
import java.util.Objects;
23-
2422
/**
2523
* A key for delayed share fetch purgatory that refers to the share partition.
2624
*/
27-
public class DelayedShareFetchGroupKey implements DelayedShareFetchKey {
28-
private final String groupId;
29-
private final Uuid topicId;
30-
private final int partition;
25+
public record DelayedShareFetchGroupKey(String groupId, Uuid topicId, int partition) implements DelayedShareFetchKey {
3126

3227
public DelayedShareFetchGroupKey(String groupId, TopicIdPartition topicIdPartition) {
3328
this(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
3429
}
3530

36-
public DelayedShareFetchGroupKey(String groupId, Uuid topicId, int partition) {
37-
this.groupId = groupId;
38-
this.topicId = topicId;
39-
this.partition = partition;
40-
}
41-
42-
@Override
43-
public boolean equals(Object o) {
44-
if (this == o) return true;
45-
if (o == null || getClass() != o.getClass()) return false;
46-
DelayedShareFetchGroupKey that = (DelayedShareFetchGroupKey) o;
47-
return topicId.equals(that.topicId) && partition == that.partition && groupId.equals(that.groupId);
48-
}
49-
50-
@Override
51-
public int hashCode() {
52-
return Objects.hash(topicId, partition, groupId);
53-
}
54-
55-
@Override
56-
public String toString() {
57-
return "DelayedShareFetchGroupKey(groupId=" + groupId +
58-
", topicId=" + topicId +
59-
", partition=" + partition +
60-
")";
61-
}
62-
6331
@Override
6432
public String keyLabel() {
6533
return String.format("groupId=%s, topicId=%s, partition=%s", groupId, topicId, partition);

server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,43 +19,15 @@
1919
import org.apache.kafka.common.TopicIdPartition;
2020
import org.apache.kafka.common.Uuid;
2121

22-
import java.util.Objects;
23-
2422
/**
2523
* A key for delayed share fetch purgatory that refers to the topic partition.
2624
*/
27-
public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey {
28-
private final Uuid topicId;
29-
private final int partition;
25+
public record DelayedShareFetchPartitionKey(Uuid topicId, int partition) implements DelayedShareFetchKey {
3026

3127
public DelayedShareFetchPartitionKey(TopicIdPartition topicIdPartition) {
3228
this(topicIdPartition.topicId(), topicIdPartition.partition());
3329
}
3430

35-
public DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
36-
this.topicId = topicId;
37-
this.partition = partition;
38-
}
39-
40-
@Override
41-
public boolean equals(Object o) {
42-
if (this == o) return true;
43-
if (o == null || getClass() != o.getClass()) return false;
44-
DelayedShareFetchPartitionKey that = (DelayedShareFetchPartitionKey) o;
45-
return topicId.equals(that.topicId) && partition == that.partition;
46-
}
47-
48-
@Override
49-
public int hashCode() {
50-
return Objects.hash(topicId, partition);
51-
}
52-
53-
@Override
54-
public String toString() {
55-
return "DelayedShareFetchPartitionKey(topicId=" + topicId +
56-
", partition=" + partition + ")";
57-
}
58-
5931
@Override
6032
public String keyLabel() {
6133
return String.format("topicId=%s, partition=%s", topicId, partition);

storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void testResponseOnRequestExpiration() throws InterruptedException {
100100
assertEquals(numResponse.get(), listOffsetsRequestKeys.size());
101101
assertEquals(listOffsetsRequestKeys.size(), DelayedRemoteListOffsets.AGGREGATE_EXPIRATION_METER.count());
102102
listOffsetsRequestKeys.forEach(key -> {
103-
TopicPartition tp = new TopicPartition(key.topic, key.partition);
103+
TopicPartition tp = new TopicPartition(key.topic(), key.partition());
104104
assertEquals(1, DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.get(tp).count());
105105
});
106106
}

0 commit comments

Comments
 (0)