Skip to content

Commit 5c3992c

Browse files
committed
[lake/tiering] add bytes read tracking for log fetch operations
1 parent b03cec4 commit 5c3992c

File tree

16 files changed

+316
-51
lines changed

16 files changed

+316
-51
lines changed

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ boolean isConsumed() {
109109
return isConsumed;
110110
}
111111

112+
public int getSizeInBytes() {
113+
return sizeInBytes;
114+
}
115+
112116
boolean isInitialized() {
113117
return initialized;
114118
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,10 @@ public LogFetchCollector(
8383
* @throws LogOffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
8484
* the defaultResetPolicy is NONE
8585
*/
86-
public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logFetchBuffer) {
86+
public ScanRecords collectFetch(final LogFetchBuffer logFetchBuffer) {
8787
Map<TableBucket, List<ScanRecord>> fetched = new HashMap<>();
8888
int recordsRemaining = maxPollRecords;
89+
long totalBytesRead = 0;
8990

9091
try {
9192
while (recordsRemaining > 0) {
@@ -135,6 +136,11 @@ public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logF
135136

136137
recordsRemaining -= records.size();
137138
}
139+
140+
// Only count bytes when the fetch is fully consumed
141+
if (nextInLineFetch.isConsumed()) {
142+
totalBytesRead += nextInLineFetch.getSizeInBytes();
143+
}
138144
}
139145
}
140146
} catch (FetchException e) {
@@ -143,7 +149,7 @@ public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logF
143149
}
144150
}
145151

146-
return fetched;
152+
return new ScanRecords(fetched, totalBytesRead);
147153
}
148154

149155
private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.fluss.client.metadata.MetadataUpdater;
2323
import org.apache.fluss.client.metrics.ScannerMetricGroup;
2424
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
25-
import org.apache.fluss.client.table.scanner.ScanRecord;
2625
import org.apache.fluss.cluster.BucketLocation;
2726
import org.apache.fluss.config.ConfigOptions;
2827
import org.apache.fluss.config.Configuration;
@@ -161,7 +160,7 @@ public boolean hasAvailableFetches() {
161160
return !logFetchBuffer.isEmpty();
162161
}
163162

164-
public Map<TableBucket, List<ScanRecord>> collectFetch() {
163+
public ScanRecords collectFetch() {
165164
return logFetchCollector.collectFetch(logFetchBuffer);
166165
}
167166

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.fluss.client.metadata.MetadataUpdater;
2222
import org.apache.fluss.client.metrics.ScannerMetricGroup;
2323
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
24-
import org.apache.fluss.client.table.scanner.ScanRecord;
2524
import org.apache.fluss.config.Configuration;
2625
import org.apache.fluss.exception.WakeupException;
2726
import org.apache.fluss.metadata.SchemaGetter;
@@ -41,8 +40,6 @@
4140
import java.time.Duration;
4241
import java.util.Collections;
4342
import java.util.ConcurrentModificationException;
44-
import java.util.List;
45-
import java.util.Map;
4643
import java.util.concurrent.atomic.AtomicInteger;
4744
import java.util.concurrent.atomic.AtomicLong;
4845

@@ -141,25 +138,25 @@ public ScanRecords poll(Duration timeout) {
141138
long timeoutNanos = timeout.toNanos();
142139
long startNanos = System.nanoTime();
143140
do {
144-
Map<TableBucket, List<ScanRecord>> fetchRecords = pollForFetches();
145-
if (fetchRecords.isEmpty()) {
141+
ScanRecords scanRecords = pollForFetches();
142+
if (scanRecords.isEmpty()) {
146143
try {
147144
if (!logFetcher.awaitNotEmpty(startNanos + timeoutNanos)) {
148145
// logFetcher waits for the timeout and no data in buffer,
149146
// so we return empty
150-
return new ScanRecords(fetchRecords);
147+
return scanRecords;
151148
}
152149
} catch (WakeupException e) {
153150
// wakeup() is called, we need to return empty
154-
return new ScanRecords(fetchRecords);
151+
return scanRecords;
155152
}
156153
} else {
157154
// before returning the fetched records, we can send off the next round of
158155
// fetches and avoid block waiting for their responses to enable pipelining
159156
// while the user is handling the fetched records.
160157
logFetcher.sendFetches();
161158

162-
return new ScanRecords(fetchRecords);
159+
return scanRecords;
163160
}
164161
} while (System.nanoTime() - startNanos < timeoutNanos);
165162

@@ -247,10 +244,10 @@ public void wakeup() {
247244
logFetcher.wakeup();
248245
}
249246

250-
private Map<TableBucket, List<ScanRecord>> pollForFetches() {
251-
Map<TableBucket, List<ScanRecord>> fetchedRecords = logFetcher.collectFetch();
252-
if (!fetchedRecords.isEmpty()) {
253-
return fetchedRecords;
247+
private ScanRecords pollForFetches() {
248+
ScanRecords scanRecords = logFetcher.collectFetch();
249+
if (!scanRecords.isEmpty()) {
250+
return scanRecords;
254251
}
255252

256253
// send any new fetches (won't resend pending fetches).

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,27 @@
3737
*/
3838
@PublicEvolving
3939
public class ScanRecords implements Iterable<ScanRecord> {
40-
public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap());
40+
public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap(), 0);
4141

4242
private final Map<TableBucket, List<ScanRecord>> records;
43+
private final long totalBytesRead;
4344

4445
public ScanRecords(Map<TableBucket, List<ScanRecord>> records) {
46+
this(records, 0);
47+
}
48+
49+
public ScanRecords(Map<TableBucket, List<ScanRecord>> records, long totalBytesRead) {
4550
this.records = records;
51+
this.totalBytesRead = totalBytesRead;
52+
}
53+
54+
/**
55+
* Get the total bytes read from the Fluss log in this batch.
56+
*
57+
* @return the total bytes read
58+
*/
59+
public long getTotalBytesRead() {
60+
return totalBytesRead;
4661
}
4762

4863
/**

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.fluss.client.metadata.MetadataUpdater;
2121
import org.apache.fluss.client.metadata.TestingMetadataUpdater;
22-
import org.apache.fluss.client.table.scanner.ScanRecord;
2322
import org.apache.fluss.config.Configuration;
2423
import org.apache.fluss.metadata.TableBucket;
2524
import org.apache.fluss.record.LogRecordReadContext;
@@ -31,7 +30,6 @@
3130

3231
import java.util.Collections;
3332
import java.util.HashMap;
34-
import java.util.List;
3533
import java.util.Map;
3634

3735
import static org.apache.fluss.record.TestData.DATA1;
@@ -97,10 +95,9 @@ void testNormal() throws Exception {
9795
assertThat(completedFetch.isInitialized()).isFalse();
9896

9997
// Fetch the data and validate that we get all the records we want back.
100-
Map<TableBucket, List<ScanRecord>> bucketAndRecords =
101-
logFetchCollector.collectFetch(logFetchBuffer);
102-
assertThat(bucketAndRecords.size()).isEqualTo(1);
103-
assertThat(bucketAndRecords.get(tb)).size().isEqualTo(10);
98+
ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
99+
assertThat(bucketAndRecords.buckets().size()).isEqualTo(1);
100+
assertThat(bucketAndRecords.records(tb).size()).isEqualTo(10);
104101

105102
// When we collected the data from the buffer, this will cause the completed fetch to get
106103
// initialized.
@@ -122,7 +119,7 @@ void testNormal() throws Exception {
122119

123120
// Now attempt to collect more records from the fetch buffer.
124121
bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
125-
assertThat(bucketAndRecords.size()).isEqualTo(0);
122+
assertThat(bucketAndRecords.buckets().size()).isEqualTo(0);
126123
}
127124

128125
@Test
@@ -147,14 +144,48 @@ void testCollectAfterUnassign() throws Exception {
147144
// unassign bucket 2
148145
logScannerStatus.unassignScanBuckets(Collections.singletonList(tb2));
149146

150-
Map<TableBucket, List<ScanRecord>> bucketAndRecords =
151-
logFetchCollector.collectFetch(logFetchBuffer);
147+
ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
152148
// should only contain records for bucket 1
153-
assertThat(bucketAndRecords.keySet()).containsExactly(tb1);
149+
assertThat(bucketAndRecords.buckets()).containsExactly(tb1);
154150

155151
// collect again, should be empty
156152
bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
157-
assertThat(bucketAndRecords.size()).isEqualTo(0);
153+
assertThat(bucketAndRecords.buckets().size()).isEqualTo(0);
154+
}
155+
156+
@Test
157+
void testTotalBytesRead() throws Exception {
158+
TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 1L, 1);
159+
TableBucket tb2 = new TableBucket(DATA1_TABLE_ID, 1L, 2);
160+
Map<TableBucket, Long> scanBuckets = new HashMap<>();
161+
scanBuckets.put(tb1, 0L);
162+
scanBuckets.put(tb2, 0L);
163+
logScannerStatus.assignScanBuckets(scanBuckets);
164+
165+
CompletedFetch completedFetch1 =
166+
makeCompletedFetch(
167+
tb1,
168+
new FetchLogResultForBucket(tb1, genMemoryLogRecordsByObject(DATA1), 10L),
169+
0L);
170+
CompletedFetch completedFetch2 =
171+
makeCompletedFetch(
172+
tb2,
173+
new FetchLogResultForBucket(tb2, genMemoryLogRecordsByObject(DATA1), 10L),
174+
0L);
175+
176+
logFetchBuffer.add(completedFetch1);
177+
logFetchBuffer.add(completedFetch2);
178+
179+
ScanRecords scanRecords = logFetchCollector.collectFetch(logFetchBuffer);
180+
181+
// Both fetches should be fully consumed
182+
assertThat(completedFetch1.isConsumed()).isTrue();
183+
assertThat(completedFetch2.isConsumed()).isTrue();
184+
185+
// totalBytesRead should be the sum of both completed fetches
186+
long expectedBytes = completedFetch1.getSizeInBytes() + completedFetch2.getSizeInBytes();
187+
assertThat(scanRecords.getTotalBytesRead()).isEqualTo(expectedBytes);
188+
assertThat(scanRecords.getTotalBytesRead()).isGreaterThan(0);
158189
}
159190

160191
private DefaultCompletedFetch makeCompletedFetch(

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,9 @@ void testFetchWithSchemaChange() throws Exception {
147147
assertThat(logFetcher.hasAvailableFetches()).isTrue();
148148
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
149149
});
150-
Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
151-
assertThat(records.size()).isEqualTo(1);
152-
List<ScanRecord> scanRecords = records.get(tb0);
150+
ScanRecords records = logFetcher.collectFetch();
151+
assertThat(records.buckets().size()).isEqualTo(1);
152+
List<ScanRecord> scanRecords = records.records(tb0);
153153
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
154154
.isEqualTo(expectedRows);
155155

@@ -193,9 +193,9 @@ void testFetchWithSchemaChange() throws Exception {
193193
assertThat(newSchemaLogFetcher.getCompletedFetchesSize()).isEqualTo(2);
194194
});
195195
records = newSchemaLogFetcher.collectFetch();
196-
assertThat(records.size()).isEqualTo(1);
197-
assertThat(records.get(tb0)).hasSize(20);
198-
scanRecords = records.get(tb0);
196+
assertThat(records.buckets().size()).isEqualTo(1);
197+
assertThat(records.records(tb0)).hasSize(20);
198+
scanRecords = records.records(tb0);
199199
assertThat(scanRecords.stream().map(ScanRecord::getRow).collect(Collectors.toList()))
200200
.isEqualTo(expectedRows);
201201
newSchemaLogFetcher.close();
@@ -226,10 +226,10 @@ void testFetch() throws Exception {
226226
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(2);
227227
});
228228

229-
Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
230-
assertThat(records.size()).isEqualTo(2);
231-
assertThat(records.get(tb0).size()).isEqualTo(10);
232-
assertThat(records.get(tb1).size()).isEqualTo(10);
229+
ScanRecords records = logFetcher.collectFetch();
230+
assertThat(records.buckets().size()).isEqualTo(2);
231+
assertThat(records.records(tb0).size()).isEqualTo(10);
232+
assertThat(records.records(tb1).size()).isEqualTo(10);
233233

234234
// after collect fetch, the fetcher is empty.
235235
assertThat(logFetcher.hasAvailableFetches()).isFalse();
@@ -297,9 +297,9 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
297297
assertThat(logFetcher.hasAvailableFetches()).isTrue();
298298
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(1);
299299
});
300-
Map<TableBucket, List<ScanRecord>> records = logFetcher.collectFetch();
301-
assertThat(records.size()).isEqualTo(1);
302-
assertThat(records.get(tb0).size()).isEqualTo(10);
300+
ScanRecords records = logFetcher.collectFetch();
301+
assertThat(records.buckets().size()).isEqualTo(1);
302+
assertThat(records.records(tb0).size()).isEqualTo(10);
303303
}
304304

305305
@Test

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/ScanRecordsTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,18 @@ void iterator() {
5757
}
5858
assertThat(c).isEqualTo(4);
5959
}
60+
61+
@Test
62+
void testTotalBytesRead() {
63+
Map<TableBucket, List<ScanRecord>> records = new LinkedHashMap<>();
64+
long tableId = 0;
65+
ScanRecord record1 = new ScanRecord(0L, 1000L, ChangeType.INSERT, row(1, "a"));
66+
records.put(new TableBucket(tableId, 0), Arrays.asList(record1));
67+
68+
// New constructor carries totalBytesRead
69+
assertThat(new ScanRecords(records, 1024L).getTotalBytesRead()).isEqualTo(1024L);
70+
71+
// Old constructor defaults to 0 for backward compatibility
72+
assertThat(new ScanRecords(records).getTotalBytesRead()).isEqualTo(0L);
73+
}
6074
}

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,4 +280,12 @@ public class MetricNames {
280280
public static final String NETTY_NUM_ALLOCATIONS_PER_SECONDS = "numAllocationsPerSecond";
281281
public static final String NETTY_NUM_HUGE_ALLOCATIONS_PER_SECONDS =
282282
"numHugeAllocationsPerSecond";
283+
284+
// --------------------------------------------------------------------------------------------
285+
// metrics for tiering service
286+
// --------------------------------------------------------------------------------------------
287+
288+
// for lake tiering metrics - operator level
289+
public static final String TIERING_SERVICE_READ_BYTES = "readBytes";
290+
public static final String TIERING_SERVICE_READ_BYTES_RATE = "readBytesPerSecond";
283291
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.client.Connection;
2323
import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
2424
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
25+
import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics;
2526
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
2627
import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
2728
import org.apache.fluss.lake.writer.LakeTieringFactory;
@@ -73,17 +74,31 @@ public TieringSourceReader(
7374
Duration pollTimeout) {
7475
super(
7576
elementsQueue,
76-
new TieringSourceFetcherManager<>(
77-
elementsQueue,
78-
() -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout),
79-
context.getConfiguration(),
80-
(ignore) -> {}),
77+
createFetcherManager(
78+
elementsQueue, context, connection, lakeTieringFactory, pollTimeout),
8179
new TableBucketWriteResultEmitter<>(),
8280
context.getConfiguration(),
8381
context);
8482
this.connection = connection;
8583
}
8684

85+
private static <WriteResult> TieringSourceFetcherManager<WriteResult> createFetcherManager(
86+
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
87+
elementsQueue,
88+
SourceReaderContext context,
89+
Connection connection,
90+
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
91+
Duration pollTimeout) {
92+
TieringMetrics tieringMetrics = new TieringMetrics(context.metricGroup());
93+
return new TieringSourceFetcherManager<>(
94+
elementsQueue,
95+
() ->
96+
new TieringSplitReader<>(
97+
connection, lakeTieringFactory, pollTimeout, tieringMetrics),
98+
context.getConfiguration(),
99+
(ignore) -> {});
100+
}
101+
87102
@Override
88103
public void start() {
89104
// we request a split only if we did not get splits during the checkpoint restore

0 commit comments

Comments (0)