Skip to content

Commit 71361ee

Browse files
committed
[lake/tiering] add total bytes read tracking and metrics for tiering jobs
1 parent 05b6f66 commit 71361ee

File tree

12 files changed

+183
-22
lines changed

12 files changed

+183
-22
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanner.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,18 @@ public interface BatchScanner extends Closeable {
4747
@Nullable
4848
CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException;
4949

50+
/**
51+
* Returns the total bytes read from the underlying storage.
52+
*
53+
* <p>For snapshot scanners, this is the sum of value lengths read from RocksDB. For other
54+
* scanners, the default implementation throws {@link UnsupportedOperationException}.
55+
*
56+
* @return total bytes read from the underlying storage
57+
*/
58+
default long getTotalBytesRead() {
59+
throw new UnsupportedOperationException("Not supported.");
60+
}
61+
5062
/** Closes the scanner and should release all resources. */
5163
@Override
5264
void close() throws IOException;

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScanner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ public void close() {
180180
}
181181
}
182182

183+
@Override
184+
public long getTotalBytesRead() {
185+
return snapshotFilesReader != null ? snapshotFilesReader.getTotalBytesRead() : 0;
186+
}
187+
183188
private void initReaderAsynchronously() {
184189
CompletableFuture.runAsync(
185190
() ->

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ class SnapshotFilesReader implements CloseableIterator<InternalRow> {
7474

7575
private final CloseableRegistry closeableRegistry;
7676

77+
/** Total bytes read from RocksDB (sum of value lengths). */
78+
private long totalBytesRead = 0;
79+
7780
SnapshotFilesReader(
7881
KvFormat kvFormat,
7982
Path rocksDbPath,
@@ -160,9 +163,16 @@ public boolean hasNext() {
160163
return !isClose && rocksIteratorWrapper.isValid();
161164
}
162165

166+
/** Returns the total bytes read from RocksDB (sum of value lengths). */
167+
public long getTotalBytesRead() {
168+
return totalBytesRead;
169+
}
170+
163171
@Override
164172
public InternalRow next() {
165173
byte[] value = rocksIteratorWrapper.value();
174+
// accumulate bytes read (value lengths only)
175+
totalBytesRead += value.length;
166176
rocksIteratorWrapper.next();
167177

168178
BinaryValue originValue = valueDecoder.decodeValue(value);

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ public LogFetchCollector(
8383
* @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
8484
* the defaultResetPolicy is NONE
8585
*/
86-
public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logFetchBuffer) {
86+
public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
8787
Map<TableBucket, List<ScanRecord>> fetched = new HashMap<>();
88+
long totalSizeInBytes = 0;
8889
int recordsRemaining = maxPollRecords;
8990

9091
try {
@@ -109,6 +110,8 @@ public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logF
109110
}
110111
throw e;
111112
}
113+
// count bytes the first time we see this CompletedFetch
114+
totalSizeInBytes += completedFetch.sizeInBytes;
112115
} else {
113116
logFetchBuffer.setNextInLineFetch(completedFetch);
114117
}
@@ -143,7 +146,7 @@ public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logF
143146
}
144147
}
145148

146-
return fetched;
149+
return new ScanRecords(fetched, totalSizeInBytes);
147150
}
148151

149152
private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.fluss.client.metadata.MetadataUpdater;
2323
import org.apache.fluss.client.metrics.ScannerMetricGroup;
2424
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
25-
import org.apache.fluss.client.table.scanner.ScanRecord;
2625
import org.apache.fluss.cluster.BucketLocation;
2726
import org.apache.fluss.config.ConfigOptions;
2827
import org.apache.fluss.config.Configuration;
@@ -161,7 +160,7 @@ public boolean hasAvailableFetches() {
161160
return !logFetchBuffer.isEmpty();
162161
}
163162

164-
public Map<TableBucket, List<ScanRecord>> collectFetch() {
163+
public ScanRecords collectFetch() {
165164
return logFetchCollector.collectFetch(logFetchBuffer);
166165
}
167166

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.fluss.client.metadata.MetadataUpdater;
2222
import org.apache.fluss.client.metrics.ScannerMetricGroup;
2323
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
24-
import org.apache.fluss.client.table.scanner.ScanRecord;
2524
import org.apache.fluss.config.Configuration;
2625
import org.apache.fluss.exception.WakeupException;
2726
import org.apache.fluss.metadata.SchemaGetter;
@@ -41,8 +40,6 @@
4140
import java.time.Duration;
4241
import java.util.Collections;
4342
import java.util.ConcurrentModificationException;
44-
import java.util.List;
45-
import java.util.Map;
4643
import java.util.concurrent.atomic.AtomicInteger;
4744
import java.util.concurrent.atomic.AtomicLong;
4845

@@ -141,25 +138,25 @@ public ScanRecords poll(Duration timeout) {
141138
long timeoutNanos = timeout.toNanos();
142139
long startNanos = System.nanoTime();
143140
do {
144-
Map<TableBucket, List<ScanRecord>> fetchRecords = pollForFetches();
145-
if (fetchRecords.isEmpty()) {
141+
ScanRecords scanRecords = pollForFetches();
142+
if (scanRecords.isEmpty()) {
146143
try {
147144
if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) {
148145
// logFetcher waits for the timeout and no data in buffer,
149146
// so we return empty
150-
return new ScanRecords(fetchRecords);
147+
return scanRecords;
151148
}
152149
} catch (WakeupException e) {
153150
// wakeup() is called, we need to return empty
154-
return new ScanRecords(fetchRecords);
151+
return scanRecords;
155152
}
156153
} else {
157154
// before returning the fetched records, we can send off the next round of
158155
// fetches and avoid block waiting for their responses to enable pipelining
159156
// while the user is handling the fetched records.
160157
logFetcher.sendFetches();
161158

162-
return new ScanRecords(fetchRecords);
159+
return scanRecords;
163160
}
164161
} while (System.nanoTime() - startNanos < timeoutNanos);
165162

@@ -247,10 +244,10 @@ public void wakeup() {
247244
logFetcher.wakeup();
248245
}
249246

250-
private Map<TableBucket, List<ScanRecord>> pollForFetches() {
251-
Map<TableBucket, List<ScanRecord>> fetchedRecords = logFetcher.collectFetch();
252-
if (!fetchedRecords.isEmpty()) {
253-
return fetchedRecords;
247+
private ScanRecords pollForFetches() {
248+
ScanRecords scanRecords = logFetcher.collectFetch();
249+
if (!scanRecords.isEmpty()) {
250+
return scanRecords;
254251
}
255252

256253
// send any new fetches (won't resend pending fetches).

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,24 @@ public class ScanRecords implements Iterable<ScanRecord> {
4040
public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap());
4141

4242
private final Map<TableBucket, List<ScanRecord>> records;
43+
private final long sizeInBytes;
4344

4445
public ScanRecords(Map<TableBucket, List<ScanRecord>> records) {
46+
this(records, 0L);
47+
}
48+
49+
public ScanRecords(Map<TableBucket, List<ScanRecord>> records, long sizeInBytes) {
4550
this.records = records;
51+
this.sizeInBytes = sizeInBytes;
52+
}
53+
54+
/**
55+
* Total size in bytes of the fetch response that produced this record set. For log records,
56+
* this reflects the actual bytes received from the server (including batch headers). Returns 0
57+
* if size information is unavailable (e.g. for empty or synthetic records).
58+
*/
59+
public long sizeInBytes() {
60+
return sizeInBytes;
4661
}
4762

4863
/**

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,4 +280,12 @@ public class MetricNames {
280280
public static final String NETTY_NUM_ALLOCATIONS_PER_SECONDS = "numAllocationsPerSecond";
281281
public static final String NETTY_NUM_HUGE_ALLOCATIONS_PER_SECONDS =
282282
"numHugeAllocationsPerSecond";
283+
284+
// --------------------------------------------------------------------------------------------
285+
// metrics for tiering service
286+
// --------------------------------------------------------------------------------------------
287+
288+
// for lake tiering metrics - operator level
289+
public static final String TIERING_SERVICE_READ_BYTES = "readBytes";
290+
public static final String TIERING_SERVICE_READ_BYTES_RATE = "readBytesPerSecond";
283291
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ public void close() throws Exception {
134134
splitScanner.close();
135135
}
136136

137+
/** Returns the total bytes read from the underlying scanner. */
138+
public long getTotalBytesRead() {
139+
return splitScanner.getTotalBytesRead();
140+
}
141+
137142
private static class ScanRecordBatch implements CloseableIterator<ScanRecord> {
138143
private final CloseableIterator<InternalRow> rowIterator;
139144
private int currentSplitIndex;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.client.Connection;
2323
import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
2424
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
25+
import org.apache.fluss.flink.tiering.source.metrics.TieringReaderMetrics;
2526
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
2627
import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
2728
import org.apache.fluss.lake.writer.LakeTieringFactory;
@@ -73,17 +74,31 @@ public TieringSourceReader(
7374
Duration pollTimeout) {
7475
super(
7576
elementsQueue,
76-
new TieringSourceFetcherManager<>(
77-
elementsQueue,
78-
() -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout),
79-
context.getConfiguration(),
80-
(ignore) -> {}),
77+
createFetcherManager(
78+
elementsQueue, context, connection, lakeTieringFactory, pollTimeout),
8179
new TableBucketWriteResultEmitter<>(),
8280
context.getConfiguration(),
8381
context);
8482
this.connection = connection;
8583
}
8684

85+
private static <WriteResult> TieringSourceFetcherManager<WriteResult> createFetcherManager(
86+
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
87+
elementsQueue,
88+
SourceReaderContext context,
89+
Connection connection,
90+
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
91+
Duration pollTimeout) {
92+
TieringReaderMetrics tieringReaderMetrics = new TieringReaderMetrics(context.metricGroup());
93+
return new TieringSourceFetcherManager<>(
94+
elementsQueue,
95+
() ->
96+
new TieringSplitReader<>(
97+
connection, lakeTieringFactory, pollTimeout, tieringReaderMetrics),
98+
context.getConfiguration(),
99+
(ignore) -> {});
100+
}
101+
87102
@Override
88103
public void start() {
89104
// we request a split only if we did not get splits during the checkpoint restore

0 commit comments

Comments
 (0)