Commit 8c77953

MINOR: Revert "migrate LogFetchInfo, Assignment and RequestAndCompletionHandler to java record" (apache#19177)
Revert some of the Java record migrations from apache#19062 and apache#18783. We assume Java records are purely immutable data carriers; as discussed in apache#19062 (comment), if a class has fields that may be mutable, we should not migrate it to a Java record, because the hashCode/equals behavior changes.

* LogFetchInfo (Records)
* Assignment (successCallback)
  * Also remove the `equals` method from Assignment, since `Assignment` is not, and should not be, used as a Map/Set key.
* RequestAndCompletionHandler (handler)

Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 3f0e14a commit 8c77953
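For context on the hashCode/equals concern above, here is a minimal, self-contained sketch (not taken from this commit; MutableHolder and its List<Integer> field are hypothetical stand-ins for a class like LogFetchInfo and its mutable Records field) showing how a Java record with a mutable component misbehaves as a hash-based collection key:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RecordEqualityDemo {

    // Hypothetical record with a mutable component.
    record MutableHolder(List<Integer> values) { }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>(List.of(1, 2, 3));
        MutableHolder holder = new MutableHolder(values);

        Set<MutableHolder> set = new HashSet<>();
        set.add(holder);   // hashCode is derived from the current list contents

        values.add(4);     // mutate the component after insertion

        // Almost always prints false: the record's hashCode changed along with the list,
        // so the lookup probes the wrong bucket and the entry is effectively lost.
        System.out.println(set.contains(holder));

        // A plain class that keeps Object's identity-based equals/hashCode, as this
        // revert restores, would still find the entry here.
    }
}

Before the migration these classes inherited identity-based equals/hashCode from Object; converting them to records silently switched them to value-based semantics over possibly mutable fields, which is the behavior change the revert undoes.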

File tree

11 files changed, +139 −67 lines

raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

Lines changed: 3 additions & 3 deletions
@@ -429,7 +429,7 @@ private void updateListenersProgress(long highWatermark) {
        listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
            if (nextExpectedOffset < highWatermark) {
                LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
-               listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records());
+               listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
            }
        });
    }
@@ -1622,11 +1622,11 @@ private FetchResponseData tryCompleteFetchRequest(
        if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
            LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);

-           if (state.updateReplicaState(replicaKey, currentTimeMs, info.startOffsetMetadata())) {
+           if (state.updateReplicaState(replicaKey, currentTimeMs, info.startOffsetMetadata)) {
                onUpdateLeaderHighWatermark(state, currentTimeMs);
            }

-           records = info.records();
+           records = info.records;
        } else {
            records = MemoryRecords.EMPTY;
        }

raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java

Lines changed: 11 additions & 2 deletions
@@ -19,6 +19,15 @@
import org.apache.kafka.common.record.Records;

/**
- * Metadata for the records fetched from log, including the records itself
+ * The class is not converted to a Java record since records are typically intended to be immutable, but this one contains a mutable field records
 */
-public record LogFetchInfo(Records records, LogOffsetMetadata startOffsetMetadata) { }
+public class LogFetchInfo {
+
+    public final Records records;
+    public final LogOffsetMetadata startOffsetMetadata;
+
+    public LogFetchInfo(Records records, LogOffsetMetadata startOffsetMetadata) {
+        this.records = records;
+        this.startOffsetMetadata = startOffsetMetadata;
+    }
+}

raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java

Lines changed: 1 addition & 1 deletion
@@ -233,7 +233,7 @@ private void maybeLoadLog() {
        while (log.endOffset().offset() > nextOffset) {
            LogFetchInfo info = log.read(nextOffset, Isolation.UNCOMMITTED);
            try (RecordsIterator<?> iterator = new RecordsIterator<>(
-               info.records(),
+               info.records,
                serde,
                bufferSupplier,
                maxBatchSizeBytes,

raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java

Lines changed: 2 additions & 2 deletions
@@ -128,7 +128,7 @@ public void testLeaderWritesBootstrapRecords() throws Exception {
        context.unattachedToLeader();

        // check if leader writes 3 bootstrap records to the log
-       Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+       Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
        RecordBatch batch = records.batches().iterator().next();
        assertTrue(batch.isControlBatch());
        Iterator<Record> recordIterator = batch.iterator();
@@ -191,7 +191,7 @@ public void testLeaderDoesNotBootstrapRecordsWithKraftVersion0() throws Exceptio
        // check leader does not write bootstrap records to log
        context.unattachedToLeader();

-       Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+       Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
        RecordBatch batch = records.batches().iterator().next();
        assertTrue(batch.isControlBatch());
        Iterator<Record> recordIterator = batch.iterator();

raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java

Lines changed: 2 additions & 2 deletions
@@ -913,7 +913,7 @@ public void testInitializeAsUnattachedAndBecomeLeader(boolean withKip853Rpc) thr
        context.client.poll();
        context.assertSentBeginQuorumEpochRequest(1, Set.of(otherNodeId));

-       Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+       Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
        RecordBatch batch = records.batches().iterator().next();
        assertTrue(batch.isControlBatch());

@@ -962,7 +962,7 @@ public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree(boolean withKi
        context.client.poll();
        context.assertSentBeginQuorumEpochRequest(2, Set.of(firstNodeId, secondNodeId));

-       Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
+       Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
        RecordBatch batch = records.batches().iterator().next();
        assertTrue(batch.isControlBatch());

raft/src/test/java/org/apache/kafka/raft/MockLog.java

Lines changed: 1 addition & 1 deletion
@@ -513,7 +513,7 @@ public Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId)
            );
        }

-       long baseOffset = read(snapshotId.offset(), Isolation.COMMITTED).startOffsetMetadata().offset();
+       long baseOffset = read(snapshotId.offset(), Isolation.COMMITTED).startOffsetMetadata.offset();
        if (snapshotId.offset() != baseOffset) {
            throw new IllegalArgumentException(
                String.format(

raft/src/test/java/org/apache/kafka/raft/MockLogTest.java

Lines changed: 14 additions & 14 deletions
@@ -214,7 +214,7 @@ public void testAppendControlRecord() {
        assertEquals(1, log.endOffset().offset());
        assertEquals(currentEpoch, log.lastFetchedEpoch());

-       Records records = log.read(0, Isolation.UNCOMMITTED).records();
+       Records records = log.read(0, Isolation.UNCOMMITTED).records;
        for (RecordBatch batch : records.batches()) {
            assertTrue(batch.isControlBatch());
        }
@@ -249,7 +249,7 @@ public void testAppendAsFollower() {
        assertEquals(initialOffset + 1, log.endOffset().offset());
        assertEquals(3, log.lastFetchedEpoch());

-       Records records = log.read(5L, Isolation.UNCOMMITTED).records();
+       Records records = log.read(5L, Isolation.UNCOMMITTED).records;
        List<ByteBuffer> extractRecords = new ArrayList<>();
        for (Record record : records.records()) {
            extractRecords.add(record.value());
@@ -275,7 +275,7 @@ public void testReadRecords() {

        appendAsLeader(List.of(recordOne, recordTwo), epoch);

-       Records records = log.read(0, Isolation.UNCOMMITTED).records();
+       Records records = log.read(0, Isolation.UNCOMMITTED).records;

        List<ByteBuffer> extractRecords = new ArrayList<>();
        for (Record record : records.records()) {
@@ -346,12 +346,12 @@ public void testMetadataValidation() {
        appendBatch(5, 1);

        LogFetchInfo readInfo = log.read(5, Isolation.UNCOMMITTED);
-       assertEquals(5L, readInfo.startOffsetMetadata().offset());
-       assertTrue(readInfo.startOffsetMetadata().metadata().isPresent());
+       assertEquals(5L, readInfo.startOffsetMetadata.offset());
+       assertTrue(readInfo.startOffsetMetadata.metadata().isPresent());

        // Update to a high watermark with valid offset metadata
-       log.updateHighWatermark(readInfo.startOffsetMetadata());
-       assertEquals(readInfo.startOffsetMetadata().offset(), log.highWatermark().offset());
+       log.updateHighWatermark(readInfo.startOffsetMetadata);
+       assertEquals(readInfo.startOffsetMetadata.offset(), log.highWatermark().offset());

        // Now update to a high watermark with invalid metadata
        assertThrows(IllegalArgumentException.class, () ->
@@ -360,17 +360,17 @@ public void testMetadataValidation() {

        // Ensure we can update the high watermark to the end offset
        LogFetchInfo readFromEndInfo = log.read(15L, Isolation.UNCOMMITTED);
-       assertEquals(15, readFromEndInfo.startOffsetMetadata().offset());
-       assertTrue(readFromEndInfo.startOffsetMetadata().metadata().isPresent());
-       log.updateHighWatermark(readFromEndInfo.startOffsetMetadata());
+       assertEquals(15, readFromEndInfo.startOffsetMetadata.offset());
+       assertTrue(readFromEndInfo.startOffsetMetadata.metadata().isPresent());
+       log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);

        // Ensure that the end offset metadata is valid after new entries are appended
        appendBatch(5, 1);
-       log.updateHighWatermark(readFromEndInfo.startOffsetMetadata());
+       log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);

        // Check handling of a fetch from the middle of a batch
        LogFetchInfo readFromMiddleInfo = log.read(16L, Isolation.UNCOMMITTED);
-       assertEquals(readFromEndInfo.startOffsetMetadata(), readFromMiddleInfo.startOffsetMetadata());
+       assertEquals(readFromEndInfo.startOffsetMetadata, readFromMiddleInfo.startOffsetMetadata);
    }

    @Test
@@ -1002,7 +1002,7 @@ private Optional<OffsetRange> readOffsets(long startOffset, Isolation isolation)
        while (foundRecord) {
            foundRecord = false;

-           Records records = log.read(currentStart, isolation).records();
+           Records records = log.read(currentStart, isolation).records;
            for (Record record : records.records()) {
                foundRecord = true;

@@ -1081,7 +1081,7 @@ private static void validateReadRecords(List<SimpleRecord> expectedRecords, Mock

        int currentOffset = 0;
        while (currentOffset < log.endOffset().offset()) {
-           Records records = log.read(currentOffset, Isolation.UNCOMMITTED).records();
+           Records records = log.read(currentOffset, Isolation.UNCOMMITTED).records;
            List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());

            assertFalse(batches.isEmpty());

server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java

Lines changed: 5 additions & 5 deletions
@@ -89,14 +89,14 @@ public void shutdown() throws InterruptedException {
    private void drainGeneratedRequests() {
        generateRequests().forEach(request ->
            unsentRequests.put(
-               request.destination(),
+               request.destination,
                networkClient.newClientRequest(
-                   request.destination().idString(),
-                   request.request(),
-                   request.creationTimeMs(),
+                   request.destination.idString(),
+                   request.request,
+                   request.creationTimeMs,
                    true,
                    requestTimeoutMs,
-                   request.handler()
+                   request.handler
                )
            )
        );

server-common/src/main/java/org/apache/kafka/server/util/RequestAndCompletionHandler.java

Lines changed: 29 additions & 6 deletions
@@ -20,9 +20,32 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.AbstractRequest;

-public record RequestAndCompletionHandler(
-    long creationTimeMs,
-    Node destination,
-    AbstractRequest.Builder<? extends AbstractRequest> request,
-    RequestCompletionHandler handler
-) { }
+public final class RequestAndCompletionHandler {
+
+    public final long creationTimeMs;
+    public final Node destination;
+    public final AbstractRequest.Builder<? extends AbstractRequest> request;
+    public final RequestCompletionHandler handler;
+
+    public RequestAndCompletionHandler(
+        long creationTimeMs,
+        Node destination,
+        AbstractRequest.Builder<? extends AbstractRequest> request,
+        RequestCompletionHandler handler
+    ) {
+        this.creationTimeMs = creationTimeMs;
+        this.destination = destination;
+        this.request = request;
+        this.handler = handler;
+    }
+
+    @Override
+    public String toString() {
+        return "RequestAndCompletionHandler(" +
+            "creationTimeMs=" + creationTimeMs +
+            ", destination=" + destination +
+            ", request=" + request +
+            ", handler=" + handler +
+            ')';
+    }
+}
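A related illustrative sketch (not from this commit; Req and java.util.function.Consumer are hypothetical stand-ins for RequestAndCompletionHandler and its RequestCompletionHandler callback) of why record-style value equality adds little when a component is a callback:

import java.util.function.Consumer;

public class HandlerEqualityDemo {

    // Hypothetical record whose only component is a callback.
    record Req(Consumer<String> handler) { }

    public static void main(String[] args) {
        Consumer<String> h1 = s -> { };
        Consumer<String> h2 = s -> { };

        // A record's equals() delegates to the component's equals(); lambdas compare by
        // identity, so two structurally identical requests still come out "not equal".
        System.out.println(new Req(h1).equals(new Req(h2))); // false
        System.out.println(new Req(h1).equals(new Req(h1))); // true (same handler instance)
    }
}

Reverting to a plain class with an explicit toString, as in the diff above, keeps the useful debug output without implying value semantics.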

server-common/src/test/java/org/apache/kafka/server/util/InterBrokerSendThreadTest.java

Lines changed: 17 additions & 17 deletions
@@ -166,15 +166,15 @@ public void testShouldCreateClientRequestAndSendWhenNodeIsReady() {
        final TestInterBrokerSendThread sendThread = new TestInterBrokerSendThread();

        final ClientRequest clientRequest =
-           new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler());
+           new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler);

        when(networkClient.newClientRequest(
            ArgumentMatchers.eq("1"),
-           same(handler.request()),
+           same(handler.request),
            anyLong(),
            ArgumentMatchers.eq(true),
            ArgumentMatchers.eq(requestTimeoutMs),
-           same(handler.handler())
+           same(handler.handler)
        )).thenReturn(clientRequest);

        when(networkClient.ready(node, time.milliseconds())).thenReturn(true);
@@ -187,11 +187,11 @@ public void testShouldCreateClientRequestAndSendWhenNodeIsReady() {
        verify(networkClient)
            .newClientRequest(
                ArgumentMatchers.eq("1"),
-               same(handler.request()),
+               same(handler.request),
                anyLong(),
                ArgumentMatchers.eq(true),
                ArgumentMatchers.eq(requestTimeoutMs),
-               same(handler.handler()));
+               same(handler.handler));
        verify(networkClient).ready(any(), anyLong());
        verify(networkClient).send(same(clientRequest), anyLong());
        verify(networkClient).poll(anyLong(), anyLong());
@@ -209,15 +209,15 @@ public void testShouldCallCompletionHandlerWithDisconnectedResponseWhenNodeNotRe
        final TestInterBrokerSendThread sendThread = new TestInterBrokerSendThread();

        final ClientRequest clientRequest =
-           new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler());
+           new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler);

        when(networkClient.newClientRequest(
            ArgumentMatchers.eq("1"),
-           same(handler.request()),
+           same(handler.request),
            anyLong(),
            ArgumentMatchers.eq(true),
            ArgumentMatchers.eq(requestTimeoutMs),
-           same(handler.handler())
+           same(handler.handler)
        )).thenReturn(clientRequest);

        when(networkClient.ready(node, time.milliseconds())).thenReturn(false);
@@ -236,11 +236,11 @@ public void testShouldCallCompletionHandlerWithDisconnectedResponseWhenNodeNotRe
        verify(networkClient)
            .newClientRequest(
                ArgumentMatchers.eq("1"),
-               same(handler.request()),
+               same(handler.request),
                anyLong(),
                ArgumentMatchers.eq(true),
                ArgumentMatchers.eq(requestTimeoutMs),
-               same(handler.handler()));
+               same(handler.handler));
        verify(networkClient).ready(any(), anyLong());
        verify(networkClient).connectionDelay(any(), anyLong());
        verify(networkClient).poll(anyLong(), anyLong());
@@ -261,16 +261,16 @@ public void testFailingExpiredRequests() {

        final ClientRequest clientRequest =
            new ClientRequest(
-               "dest", request, 0, "1", time.milliseconds(), true, requestTimeoutMs, handler.handler());
+               "dest", request, 0, "1", time.milliseconds(), true, requestTimeoutMs, handler.handler);
        time.sleep(1500L);

        when(networkClient.newClientRequest(
            ArgumentMatchers.eq("1"),
-           same(handler.request()),
-           ArgumentMatchers.eq(handler.creationTimeMs()),
+           same(handler.request),
+           ArgumentMatchers.eq(handler.creationTimeMs),
            ArgumentMatchers.eq(true),
            ArgumentMatchers.eq(requestTimeoutMs),
-           same(handler.handler())
+           same(handler.handler)
        )).thenReturn(clientRequest);

        // make the node unready so the request is not cleared
@@ -289,11 +289,11 @@ public void testFailingExpiredRequests() {
        verify(networkClient)
            .newClientRequest(
                ArgumentMatchers.eq("1"),
-               same(handler.request()),
-               ArgumentMatchers.eq(handler.creationTimeMs()),
+               same(handler.request),
+               ArgumentMatchers.eq(handler.creationTimeMs),
                ArgumentMatchers.eq(true),
                ArgumentMatchers.eq(requestTimeoutMs),
-               same(handler.handler()));
+               same(handler.handler));
        verify(networkClient).ready(any(), anyLong());
        verify(networkClient).connectionDelay(any(), anyLong());
        verify(networkClient).poll(anyLong(), anyLong());
