Skip to content

Commit 12d611b

Browse files
authored
[#2353] Improvement: Fix the warning: unchecked method invocation: method sendCachedBuffer in class RMRecordsReader.RecordsCombiner is applied to given types (#2358)
### What changes were proposed in this pull request?

Fix the warning: unchecked method invocation: method `sendCachedBuffer` in class `RMRecordsReader.RecordsCombiner` is applied to given types

### Why are the changes needed?

Fix: #2353

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Current UT.

<img width="1114" alt="image" src="https://github.com/user-attachments/assets/1ac86c0d-d902-47e0-8ff9-1de3fbc3299c" />
1 parent e2523ec commit 12d611b

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

client/src/main/java/org/apache/uniffle/client/record/reader/RMRecordsReader.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ class RecordsFetcher extends Thread {
448448
RecordsFetcher(int partitionId) {
449449
this.partitionId = partitionId;
450450
this.sleepTime = initFetchSleepTime;
451-
this.recordBuffer = new RecordBuffer(partitionId);
451+
this.recordBuffer = new RecordBuffer<>(partitionId);
452452
this.nextQueue =
453453
combiner == null ? mergeBuffers.get(partitionId) : combineBuffers.get(partitionId);
454454
this.serverInfos = shuffleServerInfoMap.get(partitionId);
@@ -512,7 +512,7 @@ public void run() {
512512
// split into two different threads, then will be asynchronous processes. Although it
513513
// seems to save time, it actually consumes more memory.
514514
reader =
515-
new RecordsReader(
515+
new RecordsReader<>(
516516
rssConf,
517517
SerInputStream.newInputStream(byteBuf),
518518
keyClass,
@@ -526,7 +526,7 @@ public void run() {
526526
}
527527
if (recordBuffer.size() >= maxRecordsNumPerBuffer) {
528528
nextQueue.put(recordBuffer);
529-
recordBuffer = new RecordBuffer(partitionId);
529+
recordBuffer = new RecordBuffer<>(partitionId);
530530
}
531531
recordBuffer.addRecord(reader.getCurrentKey(), reader.getCurrentValue());
532532
}
@@ -561,12 +561,12 @@ class RecordsCombiner extends Thread {
561561
// The RecordBuffer has a capacity limit, records for the same key may be
562562
// distributed in different RecordBuffers. So we need a cachedBuffer used
563563
// to record the buffer of the last combine.
564-
private RecordBuffer cached;
564+
private RecordBuffer<K, C> cached;
565565
private Queue<RecordBuffer> nextQueue;
566566

567567
RecordsCombiner(int partitionId) {
568568
this.partitionId = partitionId;
569-
this.cached = new RecordBuffer(partitionId);
569+
this.cached = new RecordBuffer<>(partitionId);
570570
this.nextQueue = mergeBuffers.get(partitionId);
571571
setName("RecordsCombiner-" + partitionId);
572572
}
@@ -589,13 +589,13 @@ public void run() {
589589
// we can send the cached to downstream directly.
590590
if (cached.size() > 0 && !isSameKey(cached.getLastKey(), current.getFirstKey())) {
591591
sendCachedBuffer(cached);
592-
cached = new RecordBuffer(partitionId);
592+
cached = new RecordBuffer<>(partitionId);
593593
}
594594

595595
// 3 combine the current, then cache it. By this way, we can handle the special case
596596
// that next record
597597
// buffer has same key in current.
598-
RecordBlob recordBlob = new RecordBlob(partitionId);
598+
RecordBlob recordBlob = new RecordBlob<>(partitionId);
599599
recordBlob.addRecords(current);
600600
recordBlob.combine(combiner, isMapCombine);
601601
for (Object record : recordBlob.getResult()) {
@@ -616,7 +616,7 @@ public void run() {
616616
private void sendCachedBuffer(RecordBuffer<K, C> cachedBuffer) throws InterruptedException {
617617
// Multiple records with the same key may span different recordbuffers. we were only combined
618618
// within the same recordbuffer. So before send to downstream, we should combine the cached.
619-
RecordBlob recordBlob = new RecordBlob(partitionId);
619+
RecordBlob recordBlob = new RecordBlob<K, C, Object>(partitionId);
620620
recordBlob.addRecords(cachedBuffer);
621621
recordBlob.combine(combiner, true);
622622
RecordBuffer recordBuffer = new RecordBuffer<>(partitionId);

0 commit comments

Comments
 (0)