Skip to content

Commit 5234a72

Browse files
committed
update LogFetchCollector to use ScanRecords type
1 parent 71361ee commit 5234a72

File tree

2 files changed

+20
-24
lines changed

2 files changed

+20
-24
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.fluss.client.metadata.MetadataUpdater;
2121
import org.apache.fluss.client.metadata.TestingMetadataUpdater;
22-
import org.apache.fluss.client.table.scanner.ScanRecord;
2322
import org.apache.fluss.config.Configuration;
2423
import org.apache.fluss.metadata.TableBucket;
2524
import org.apache.fluss.record.LogRecordReadContext;
@@ -31,7 +30,6 @@
3130

3231
import java.util.Collections;
3332
import java.util.HashMap;
34-
import java.util.List;
3533
import java.util.Map;
3634

3735
import static org.apache.fluss.record.TestData.DATA1;
@@ -97,10 +95,9 @@ void testNormal() throws Exception {
9795
assertThat(completedFetch.isInitialized()).isFalse();
9896

9997
// Fetch the data and validate that we get all the records we want back.
100-
Map<TableBucket, List<ScanRecord>> bucketAndRecords =
101-
logFetchCollector.collectFetch(logFetchBuffer);
102-
assertThat(bucketAndRecords.size()).isEqualTo(1);
103-
assertThat(bucketAndRecords.get(tb)).size().isEqualTo(10);
98+
ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
99+
assertThat(bucketAndRecords.buckets().size()).isEqualTo(1);
100+
assertThat(bucketAndRecords.records(tb).size()).isEqualTo(10);
104101

105102
// When we collected the data from the buffer, this will cause the completed fetch to get
106103
// initialized.
@@ -122,7 +119,7 @@ void testNormal() throws Exception {
122119

123120
// Now attempt to collect more records from the fetch buffer.
124121
bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
125-
assertThat(bucketAndRecords.size()).isEqualTo(0);
122+
assertThat(bucketAndRecords.buckets().size()).isEqualTo(0);
126123
}
127124

128125
@Test
@@ -147,14 +144,13 @@ void testCollectAfterUnassign() throws Exception {
147144
// unassign bucket 2
148145
logScannerStatus.unassignScanBuckets(Collections.singletonList(tb2));
149146

150-
Map<TableBucket, List<ScanRecord>> bucketAndRecords =
151-
logFetchCollector.collectFetch(logFetchBuffer);
147+
ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
152148
// should only contain records for bucket 1
153-
assertThat(bucketAndRecords.keySet()).containsExactly(tb1);
149+
assertThat(bucketAndRecords.buckets()).containsExactly(tb1);
154150

155151
// collect again, should be empty
156152
bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
157-
assertThat(bucketAndRecords.size()).isEqualTo(0);
153+
assertThat(bucketAndRecords.buckets().size()).isEqualTo(0);
158154
}
159155

160156
private DefaultCompletedFetch makeCompletedFetch(

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,9 @@ void testFetchWithSchemaChange() throws Exception {
147147
assertThat(logFetcher.hasAvailableFetches()).isTrue();
148148
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
149149
});
150-
Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
151-
assertThat(records.size()).isEqualTo(1);
152-
List<ScanRecord> scanRecords = records.get(tb0);
150+
ScanRecords records = logFetcher.collectFetch();
151+
assertThat(records.buckets().size()).isEqualTo(1);
152+
List<ScanRecord> scanRecords = records.records(tb0);
153153
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
154154
.isEqualTo(expectedRows);
155155

@@ -193,9 +193,9 @@ void testFetchWithSchemaChange() throws Exception {
193193
assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2);
194194
});
195195
records = newSchemaLogFetcher.collectFetch();
196-
assertThat(records.size()).isEqualTo(1);
197-
assertThat(records.get(tb0)).hasSize(20);
198-
scanRecords = records.get(tb0);
196+
assertThat(records.buckets().size()).isEqualTo(1);
197+
assertThat(records.records(tb0)).hasSize(20);
198+
scanRecords = records.records(tb0);
199199
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
200200
.isEqualTo(expectedRows);
201201
newSchemaLogFetcher.close();
@@ -226,10 +226,10 @@ void testFetch() throws Exception {
226226
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
227227
});
228228

229-
Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
230-
assertThat(records.size()).isEqualTo(2);
231-
assertThat(records.get(tb0).size()).isEqualTo(10);
232-
assertThat(records.get(tb1).size()).isEqualTo(10);
229+
ScanRecords records = logFetcher.collectFetch();
230+
assertThat(records.buckets().size()).isEqualTo(2);
231+
assertThat(records.records(tb0).size()).isEqualTo(10);
232+
assertThat(records.records(tb1).size()).isEqualTo(10);
233233

234234
// after collect fetch, the fetcher is empty.
235235
assertThat(logFetcher.hasAvailableFetches()).isFalse();
@@ -297,9 +297,9 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
297297
assertThat(logFetcher.hasAvailableFetches()).isTrue();
298298
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(1);
299299
});
300-
Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
301-
assertThat(records.size()).isEqualTo(1);
302-
assertThat(records.get(tb0).size()).isEqualTo(10);
300+
ScanRecords records = logFetcher.collectFetch();
301+
assertThat(records.buckets().size()).isEqualTo(1);
302+
assertThat(records.records(tb0).size()).isEqualTo(10);
303303
}
304304

305305
@Test

0 commit comments

Comments
 (0)