Skip to content

Commit f234e83

Browse files
committed
add bytes read tracking for log fetch operations
1 parent 179241f commit f234e83

File tree

8 files changed

+145
-9
lines changed

8 files changed

+145
-9
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ boolean isConsumed() {
109109
return isConsumed;
110110
}
111111

112+
public int getSizeInBytes() {
113+
return sizeInBytes;
114+
}
115+
112116
boolean isInitialized() {
113117
return initialized;
114118
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public LogFetchCollector(
8686
public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
8787
Map<TableBucket, List<ScanRecord>> fetched = new HashMap<>();
8888
int recordsRemaining = maxPollRecords;
89+
long totalBytesRead = 0;
8990

9091
try {
9192
while (recordsRemaining > 0) {
@@ -135,6 +136,11 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
135136

136137
recordsRemaining -= records.size();
137138
}
139+
140+
// Only count bytes when the fetch is fully consumed
141+
if (nextInLineFetch.isConsumed()) {
142+
totalBytesRead += nextInLineFetch.getSizeInBytes();
143+
}
138144
}
139145
}
140146
} catch (FetchException e) {
@@ -143,7 +149,7 @@ public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
143149
}
144150
}
145151

146-
return new ScanRecords(fetched);
152+
return new ScanRecords(fetched, totalBytesRead);
147153
}
148154

149155
private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,27 @@
3737
*/
3838
@PublicEvolving
3939
public class ScanRecords implements Iterable<ScanRecord> {
40-
public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap());
40+
public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap(), 0);
4141

4242
private final Map<TableBucket, List<ScanRecord>> records;
43+
private final long totalBytesRead;
4344

4445
public ScanRecords(Map<TableBucket, List<ScanRecord>> records) {
46+
this(records, 0);
47+
}
48+
49+
public ScanRecords(Map<TableBucket, List<ScanRecord>> records, long totalBytesRead) {
4550
this.records = records;
51+
this.totalBytesRead = totalBytesRead;
52+
}
53+
54+
/**
55+
* Get the total bytes read from the Fluss log in this batch.
56+
*
57+
* @return the total bytes read
58+
*/
59+
public long getTotalBytesRead() {
60+
return totalBytesRead;
4661
}
4762

4863
/**

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,41 @@ void testCollectAfterUnassign() throws Exception {
153153
assertThat(bucketAndRecords.buckets().size()).isEqualTo(0);
154154
}
155155

156+
@Test
157+
void testTotalBytesRead() throws Exception {
158+
TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 1L, 1);
159+
TableBucket tb2 = new TableBucket(DATA1_TABLE_ID, 1L, 2);
160+
Map<TableBucket, Long> scanBuckets = new HashMap<>();
161+
scanBuckets.put(tb1, 0L);
162+
scanBuckets.put(tb2, 0L);
163+
logScannerStatus.assignScanBuckets(scanBuckets);
164+
165+
CompletedFetch completedFetch1 =
166+
makeCompletedFetch(
167+
tb1,
168+
new FetchLogResultForBucket(tb1, genMemoryLogRecordsByObject(DATA1), 10L),
169+
0L);
170+
CompletedFetch completedFetch2 =
171+
makeCompletedFetch(
172+
tb2,
173+
new FetchLogResultForBucket(tb2, genMemoryLogRecordsByObject(DATA1), 10L),
174+
0L);
175+
176+
logFetchBuffer.add(completedFetch1);
177+
logFetchBuffer.add(completedFetch2);
178+
179+
ScanRecords scanRecords = logFetchCollector.collectFetch(logFetchBuffer);
180+
181+
// Both fetches should be fully consumed
182+
assertThat(completedFetch1.isConsumed()).isTrue();
183+
assertThat(completedFetch2.isConsumed()).isTrue();
184+
185+
// totalBytesRead should be the sum of both completed fetches
186+
long expectedBytes = completedFetch1.getSizeInBytes() + completedFetch2.getSizeInBytes();
187+
assertThat(scanRecords.getTotalBytesRead()).isEqualTo(expectedBytes);
188+
assertThat(scanRecords.getTotalBytesRead()).isGreaterThan(0);
189+
}
190+
156191
private DefaultCompletedFetch makeCompletedFetch(
157192
TableBucket tableBucket, FetchLogResultForBucket resultForBucket, long offset) {
158193
return new DefaultCompletedFetch(

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,18 @@ void iterator() {
5757
}
5858
assertThat(c).isEqualTo(4);
5959
}
60+
61+
@Test
62+
void testTotalBytesRead() {
63+
Map<TableBucket, List<ScanRecord>> records = new LinkedHashMap<>();
64+
long tableId = 0;
65+
ScanRecord record1 = new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a"));
66+
records.put(new TableBucket(tableId, 0), Arrays.asList(record1));
67+
68+
// New constructor carries totalBytesRead
69+
assertThat(new ScanRecords(records, 1024L).getTotalBytesRead()).isEqualTo(1024L);
70+
71+
// Old constructor defaults to 0 for backward compatibility
72+
assertThat(new ScanRecords(records).getTotalBytesRead()).isEqualTo(0L);
73+
}
6074
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.fluss.metadata.TableBucket;
3535
import org.apache.fluss.metadata.TableInfo;
3636
import org.apache.fluss.metadata.TablePath;
37+
import org.apache.fluss.row.BinaryRow;
38+
import org.apache.fluss.row.InternalRow;
3739
import org.apache.fluss.utils.CloseableIterator;
3840

3941
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -355,6 +357,9 @@ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> forLogRecords(
355357
Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
356358
Map<TableBucket, String> finishedSplitIds = new HashMap<>();
357359
LOG.info("for log records to tier table {}.", currentTableId);
360+
361+
// Report bytes read from Fluss log records
362+
tieringMetrics.recordBytesRead(scanRecords.getTotalBytesRead());
358363
for (TableBucket bucket : scanRecords.buckets()) {
359364
LOG.info("tiering table bucket {}.", bucket);
360365
List<ScanRecord> bucketScanRecords = scanRecords.records(bucket);
@@ -443,7 +448,6 @@ private TableBucketWriteResult<WriteResult> completeLakeWriter(
443448
WriteResult writeResult = null;
444449
if (lakeWriter != null) {
445450
writeResult = lakeWriter.complete();
446-
tieringMetrics.recordBytesWritten(lakeWriter.getBytesWritten());
447451
lakeWriter.close();
448452
}
449453
return toTableBucketWriteResult(
@@ -510,10 +514,15 @@ private TableBucketWriteResultWithSplitIds forSnapshotSplitRecords(
510514
LakeWriter<WriteResult> lakeWriter =
511515
getOrCreateLakeWriter(
512516
bucket, checkNotNull(currentSnapshotSplit).getPartitionName());
517+
long bytesRead = 0;
513518
while (recordIterator.hasNext()) {
514519
ScanRecord scanRecord = recordIterator.next().record();
515520
lakeWriter.write(scanRecord);
521+
InternalRow row = scanRecord.getRow();
522+
// Snapshot path always produces BinaryRow (CompactedRow/IndexedRow).
523+
bytesRead += ((BinaryRow) row).getSizeInBytes();
516524
}
525+
tieringMetrics.recordBytesRead(bytesRead);
517526
recordIterator.close();
518527
return emptyTableBucketWriteResultWithSplitIds();
519528
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/metrics/TieringMetrics.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@
3434
* <p>The following metrics are available:
3535
*
3636
* <ul>
37-
* <li>{@code fluss.tieringService.writtenBytes} - Counter: cumulative bytes written to the lake
38-
* since the job started (physical file sizes).
39-
* <li>{@code fluss.tieringService.writtenBytesPerSecond} - Meter: write bytes-per-second rate
40-
* derived from the counter using a 60-second sliding window.
37+
* <li>{@code fluss.tieringService.writtenBytes} - Counter: cumulative bytes read from Fluss
38+
* records since the job started (actual Fluss records size).
39+
* <li>{@code fluss.tieringService.writtenBytesPerSecond} - Meter: bytes-per-second rate derived
40+
* from the counter using a 60-second sliding window.
4141
* </ul>
4242
*/
4343
@Internal
@@ -61,8 +61,8 @@ public TieringMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
6161
MetricNames.TIERING_SERVICE_WRITTEN_BYTES_RATE, new MeterView(writtenBytesCounter));
6262
}
6363

64-
/** Records bytes written to the lake. Called once per bucket completion. */
65-
public void recordBytesWritten(long bytes) {
64+
/** Records bytes read from Fluss records. Called per batch or record processing. */
65+
public void recordBytesRead(long bytes) {
6666
writtenBytesCounter.inc(bytes);
6767
}
6868
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.tiering.source;
19+
20+
import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics;
21+
import org.apache.fluss.metrics.MetricNames;
22+
23+
import org.apache.flink.metrics.Counter;
24+
import org.apache.flink.metrics.testutils.MetricListener;
25+
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
26+
import org.junit.jupiter.api.Test;
27+
28+
import java.util.Optional;
29+
30+
import static org.assertj.core.api.Assertions.assertThat;
31+
32+
/** Test for {@link TieringMetrics}. */
33+
class TieringMetricsTest {
34+
35+
@Test
36+
void testRecordBytesRead() {
37+
MetricListener metricListener = new MetricListener();
38+
TieringMetrics tieringMetrics =
39+
new TieringMetrics(
40+
InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
41+
42+
tieringMetrics.recordBytesRead(100);
43+
tieringMetrics.recordBytesRead(200);
44+
45+
Optional<Counter> counter =
46+
metricListener.getCounter(
47+
TieringMetrics.FLUSS_METRIC_GROUP,
48+
TieringMetrics.TIERING_SERVICE_GROUP,
49+
MetricNames.TIERING_SERVICE_WRITTEN_BYTES);
50+
assertThat(counter).isPresent();
51+
assertThat(counter.get().getCount()).isEqualTo(300);
52+
}
53+
}

0 commit comments

Comments (0)