Skip to content

Commit d1aa370

Browse files
authored
KAFKA-15599: Move SegmentPosition/TimingWheelExpirationService to raft module (apache#18094)
Reviewers: Divij Vaidya <[email protected]>, Jason Taylor <[email protected]>
1 parent 0c435e3 commit d1aa370

File tree

5 files changed

+91
-72
lines changed

5 files changed

+91
-72
lines changed

core/src/main/scala/kafka/raft/KafkaMetadataLog.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.kafka.common.errors.InvalidConfigurationException
2828
import org.apache.kafka.common.record.{MemoryRecords, Records}
2929
import org.apache.kafka.common.utils.{Time, Utils}
3030
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
31-
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
31+
import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
3232
import org.apache.kafka.server.common.RequestLocal
3333
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
3434
import org.apache.kafka.server.storage.log.FetchIsolation
@@ -81,7 +81,7 @@ final class KafkaMetadataLog private (
8181

8282
new LogOffsetMetadata(
8383
fetchInfo.fetchOffsetMetadata.messageOffset,
84-
Optional.of(SegmentPosition(
84+
Optional.of(new SegmentPosition(
8585
fetchInfo.fetchOffsetMetadata.segmentBaseOffset,
8686
fetchInfo.fetchOffsetMetadata.relativePositionInSegment))
8787
)
@@ -155,7 +155,7 @@ final class KafkaMetadataLog private (
155155
val endOffsetMetadata = log.logEndOffsetMetadata
156156
new LogOffsetMetadata(
157157
endOffsetMetadata.messageOffset,
158-
Optional.of(SegmentPosition(
158+
Optional.of(new SegmentPosition(
159159
endOffsetMetadata.segmentBaseOffset,
160160
endOffsetMetadata.relativePositionInSegment)
161161
)
@@ -226,7 +226,7 @@ final class KafkaMetadataLog private (
226226
override def highWatermark: LogOffsetMetadata = {
227227
val hwm = log.fetchOffsetSnapshot.highWatermark
228228
val segmentPosition: Optional[OffsetMetadata] = if (!hwm.messageOffsetOnly) {
229-
Optional.of(SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment))
229+
Optional.of(new SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment))
230230
} else {
231231
Optional.empty()
232232
}

core/src/main/scala/kafka/raft/RaftManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.RequestHeader
4242
import org.apache.kafka.common.security.JaasContext
4343
import org.apache.kafka.common.security.auth.SecurityProtocol
4444
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
45-
import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog}
45+
import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog, TimingWheelExpirationService}
4646
import org.apache.kafka.server.ProcessRole
4747
import org.apache.kafka.server.common.Feature
4848
import org.apache.kafka.server.common.serialization.RecordSerde

core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala

Lines changed: 0 additions & 63 deletions
This file was deleted.

core/src/main/scala/kafka/raft/SegmentPosition.scala renamed to raft/src/main/java/org/apache/kafka/raft/SegmentPosition.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package kafka.raft
17+
package org.apache.kafka.raft;
1818

19-
import org.apache.kafka.raft.OffsetMetadata
19+
public record SegmentPosition(long baseOffset, int relativePosition) implements OffsetMetadata {
2020

21-
case class SegmentPosition(baseOffset: Long, relativePosition: Int) extends OffsetMetadata {
22-
override def toString: String = s"(segmentBaseOffset=$baseOffset,relativePositionInSegment=$relativePosition)"
21+
@Override
22+
public String toString() {
23+
return "(segmentBaseOffset=" + baseOffset + ",relativePositionInSegment=" + relativePosition + ")";
24+
}
2325
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.raft;
18+
19+
import org.apache.kafka.common.errors.TimeoutException;
20+
import org.apache.kafka.server.util.ShutdownableThread;
21+
import org.apache.kafka.server.util.timer.Timer;
22+
import org.apache.kafka.server.util.timer.TimerTask;
23+
24+
import java.util.concurrent.CompletableFuture;
25+
26+
public class TimingWheelExpirationService implements ExpirationService {
27+
28+
private static final long WORK_TIMEOUT_MS = 200L;
29+
30+
private final ExpiredOperationReaper expirationReaper;
31+
private final Timer timer;
32+
33+
public TimingWheelExpirationService(Timer timer) {
34+
this.timer = timer;
35+
this.expirationReaper = new ExpiredOperationReaper();
36+
expirationReaper.start();
37+
}
38+
39+
@Override
40+
public <T> CompletableFuture<T> failAfter(long timeoutMs) {
41+
TimerTaskCompletableFuture<T> task = new TimerTaskCompletableFuture<>(timeoutMs);
42+
task.future.whenComplete((t, throwable) -> task.cancel());
43+
timer.add(task);
44+
return task.future;
45+
}
46+
47+
public void shutdown() throws InterruptedException {
48+
expirationReaper.shutdown();
49+
}
50+
51+
private static class TimerTaskCompletableFuture<T> extends TimerTask {
52+
53+
private final CompletableFuture<T> future = new CompletableFuture<>();
54+
55+
TimerTaskCompletableFuture(long delayMs) {
56+
super(delayMs);
57+
}
58+
59+
@Override
60+
public void run() {
61+
future.completeExceptionally(new TimeoutException("Future failed to be completed before timeout of " + delayMs + " ms was reached"));
62+
}
63+
}
64+
65+
private class ExpiredOperationReaper extends ShutdownableThread {
66+
67+
ExpiredOperationReaper() {
68+
super("raft-expiration-reaper", false);
69+
}
70+
71+
@Override
72+
public void doWork() {
73+
try {
74+
timer.advanceClock(WORK_TIMEOUT_MS);
75+
} catch (InterruptedException e) {
76+
throw new RuntimeException(e);
77+
}
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)