Skip to content

Commit 7f66f0b

Browse files
committed
[lake/tiering] Add per-table monitoring metrics for Lake Tiering
1 parent bafba42 commit 7f66f0b

File tree

15 files changed

+736
-59
lines changed

15 files changed

+736
-59
lines changed

fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,23 +61,59 @@ public class LakeCommitResult {
6161
// 2: committedIsReadable is true, committed snapshot is just also readable
6262
@Nullable private final ReadableSnapshot readableSnapshot;
6363

64+
// Total file size (bytes) of all live data files in the lake table after this commit.
65+
// -1 means the value is not reported by the lake implementation.
66+
private final long totalLakeFileSize;
67+
68+
// Total physical row count of all live data files in the lake table after this commit.
69+
// For append-only tables this equals the logical record count; for primary-key tables it is
70+
// the physical row count which may include un-compacted delete rows and duplicates.
71+
// -1 means the value is not reported by the lake implementation.
72+
private final long totalLakeRecordCount;
73+
6474
private LakeCommitResult(
6575
long committedSnapshotId,
6676
boolean committedIsReadable,
6777
@Nullable ReadableSnapshot readableSnapshot,
68-
@Nullable Long earliestSnapshotIDToKeep) {
78+
@Nullable Long earliestSnapshotIDToKeep,
79+
long totalLakeFileSize,
80+
long totalLakeRecordCount) {
6981
this.committedSnapshotId = committedSnapshotId;
7082
this.committedIsReadable = committedIsReadable;
7183
this.readableSnapshot = readableSnapshot;
7284
this.earliestSnapshotIDToKeep = earliestSnapshotIDToKeep;
85+
this.totalLakeFileSize = totalLakeFileSize;
86+
this.totalLakeRecordCount = totalLakeRecordCount;
7387
}
7488

7589
public static LakeCommitResult committedIsReadable(long committedSnapshotId) {
76-
return new LakeCommitResult(committedSnapshotId, true, null, KEEP_LATEST);
90+
return committedIsReadable(committedSnapshotId, -1L, -1L);
91+
}
92+
93+
public static LakeCommitResult committedIsReadable(
94+
long committedSnapshotId, long totalLakeFileSize, long totalLakeRecordCount) {
95+
return new LakeCommitResult(
96+
committedSnapshotId,
97+
true,
98+
null,
99+
KEEP_LATEST,
100+
totalLakeFileSize,
101+
totalLakeRecordCount);
77102
}
78103

79104
public static LakeCommitResult unknownReadableSnapshot(long committedSnapshotId) {
80-
return new LakeCommitResult(committedSnapshotId, false, null, KEEP_ALL_PREVIOUS);
105+
return unknownReadableSnapshot(committedSnapshotId, -1L, -1L);
106+
}
107+
108+
public static LakeCommitResult unknownReadableSnapshot(
109+
long committedSnapshotId, long totalLakeFileSize, long totalLakeRecordCount) {
110+
return new LakeCommitResult(
111+
committedSnapshotId,
112+
false,
113+
null,
114+
KEEP_ALL_PREVIOUS,
115+
totalLakeFileSize,
116+
totalLakeRecordCount);
81117
}
82118

83119
public static LakeCommitResult withReadableSnapshot(
@@ -89,12 +125,32 @@ public static LakeCommitResult withReadableSnapshot(
89125
// the readable log end offset for readable snapshot
90126
Map<TableBucket, Long> readableLogEndOffsets,
91127
@Nullable Long earliestSnapshotIDToKeep) {
128+
return withReadableSnapshot(
129+
committedSnapshotId,
130+
readableSnapshotId,
131+
tieredLogEndOffsets,
132+
readableLogEndOffsets,
133+
earliestSnapshotIDToKeep,
134+
-1L,
135+
-1L);
136+
}
137+
138+
public static LakeCommitResult withReadableSnapshot(
139+
long committedSnapshotId,
140+
long readableSnapshotId,
141+
Map<TableBucket, Long> tieredLogEndOffsets,
142+
Map<TableBucket, Long> readableLogEndOffsets,
143+
@Nullable Long earliestSnapshotIDToKeep,
144+
long totalLakeFileSize,
145+
long totalLakeRecordCount) {
92146
return new LakeCommitResult(
93147
committedSnapshotId,
94148
false,
95149
new ReadableSnapshot(
96150
readableSnapshotId, tieredLogEndOffsets, readableLogEndOffsets),
97-
earliestSnapshotIDToKeep);
151+
earliestSnapshotIDToKeep,
152+
totalLakeFileSize,
153+
totalLakeRecordCount);
98154
}
99155

100156
public long getCommittedSnapshotId() {
@@ -110,6 +166,27 @@ public ReadableSnapshot getReadableSnapshot() {
110166
return readableSnapshot;
111167
}
112168

169+
/**
170+
* Returns the total file size (bytes) of all live data files in the lake table after this
171+
* commit.
172+
*
173+
* @return total file size in bytes, or -1 if not reported by the lake implementation
174+
*/
175+
public long getTotalLakeFileSize() {
176+
return totalLakeFileSize;
177+
}
178+
179+
/**
180+
* Returns the total physical row count of all live data files in the lake table after this
181+
* commit. For append-only tables this equals the logical record count; for primary-key tables
182+
* this is the physical row count which may include un-compacted delete rows and duplicates.
183+
*
184+
* @return total physical row count, or -1 if not reported by the lake implementation
185+
*/
186+
public long getTotalLakeRecordCount() {
187+
return totalLakeRecordCount;
188+
}
189+
113190
/**
114191
* Gets the earliest snapshot ID to keep.
115192
*

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@ public class MetricNames {
6363
public static final String LAKE_TIERING_PENDING_TABLES_COUNT = "pendingTablesCount";
6464
public static final String LAKE_TIERING_RUNNING_TABLES_COUNT = "runningTablesCount";
6565

66+
// for lake tiering table-level metrics
67+
public static final String LAKE_TIERING_TABLE_TIER_LAG = "tierLag";
68+
public static final String LAKE_TIERING_TABLE_TIER_DURATION = "tierDuration";
69+
public static final String LAKE_TIERING_TABLE_FAILURES_TOTAL = "failuresTotal";
70+
public static final String LAKE_TIERING_TABLE_FILE_SIZE = "fileSize";
71+
public static final String LAKE_TIERING_TABLE_RECORD_COUNT = "recordCount";
72+
6673
// --------------------------------------------------------------------------------------------
6774
// metrics for tablet server
6875
// --------------------------------------------------------------------------------------------

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
2626
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
2727
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
28+
import org.apache.fluss.flink.tiering.event.TieringStats;
2829
import org.apache.fluss.flink.tiering.source.TableBucketWriteResult;
2930
import org.apache.fluss.flink.tiering.source.TieringSource;
3031
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
@@ -94,6 +95,22 @@ public class TieringCommitOperator<WriteResult, Committable>
9495
private final Map<Long, List<TableBucketWriteResult<WriteResult>>>
9596
collectedTableBucketWriteResults;
9697

98+
/**
99+
* The result of one table's commit round, holding the lake committable (nullable for empty
100+
* commits where no data was written) and the associated tiering statistics.
101+
*/
102+
private final class CommitResult {
103+
/** The lake committable, or {@code null} if nothing was written in this round. */
104+
@Nullable final Committable committable;
105+
/** Per-table tiering statistics collected during this round. */
106+
final TieringStats stats;
107+
108+
CommitResult(@Nullable Committable committable, TieringStats stats) {
109+
this.committable = committable;
110+
this.stats = stats;
111+
}
112+
}
113+
97114
public TieringCommitOperator(
98115
StreamOperatorParameters<CommittableMessage<Committable>> parameters,
99116
Configuration flussConf,
@@ -135,18 +152,21 @@ public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str
135152

136153
if (committableWriteResults != null) {
137154
try {
138-
Committable committable =
155+
CommitResult commitResult =
139156
commitWriteResults(
140157
tableId,
141158
tableBucketWriteResult.tablePath(),
142159
committableWriteResults);
143-
// only emit when committable is not-null
144-
if (committable != null) {
145-
output.collect(new StreamRecord<>(new CommittableMessage<>(committable)));
160+
// only emit downstream when actual data was written
161+
if (commitResult.committable != null) {
162+
output.collect(
163+
new StreamRecord<>(new CommittableMessage<>(commitResult.committable)));
146164
}
147-
// notify that the table id has been finished tier
165+
// always notify the coordinator that this table's tiering round is done,
166+
// even for empty commits — otherwise the coordinator will keep waiting
148167
operatorEventGateway.sendEventToCoordinator(
149-
new SourceEventWrapper(new FinishedTieringEvent(tableId)));
168+
new SourceEventWrapper(
169+
new FinishedTieringEvent(tableId, commitResult.stats)));
150170
} catch (Exception e) {
151171
// if any exception happens, send to source coordinator to mark it as failed
152172
operatorEventGateway.sendEventToCoordinator(
@@ -162,28 +182,31 @@ public void processElement(StreamRecord<TableBucketWriteResult<WriteResult>> str
162182
}
163183
}
164184

165-
@Nullable
166-
private Committable commitWriteResults(
185+
/**
186+
* Commits the collected write results for one table to the lake and Fluss.
187+
*
188+
* <p>Always returns a non-null {@link CommitResult}. When all buckets produced no data (empty
189+
* commit), {@link CommitResult#committable} is {@code null} and stats are {@link
190+
* TieringStats#UNKNOWN}.
191+
*/
192+
private CommitResult commitWriteResults(
167193
long tableId,
168194
TablePath tablePath,
169195
List<TableBucketWriteResult<WriteResult>> committableWriteResults)
170196
throws Exception {
171-
// filter out non-null write result
172-
committableWriteResults =
197+
// filter down to buckets that actually produced data
198+
List<TableBucketWriteResult<WriteResult>> nonEmptyResults =
173199
committableWriteResults.stream()
174-
.filter(
175-
writeResultTableBucketWriteResult ->
176-
writeResultTableBucketWriteResult.writeResult() != null)
200+
.filter(r -> r.writeResult() != null)
177201
.collect(Collectors.toList());
178202

179-
// empty, means all write result is null, which is a empty commit,
180-
// return null to skip the empty commit
181-
if (committableWriteResults.isEmpty()) {
203+
// all buckets were empty — nothing to commit to the lake
204+
if (nonEmptyResults.isEmpty()) {
182205
LOG.info(
183206
"Commit tiering write results is empty for table {}, table path {}",
184207
tableId,
185208
tablePath);
186-
return null;
209+
return new CommitResult(null, TieringStats.UNKNOWN);
187210
}
188211

189212
// Check if the table was dropped and recreated during tiering.
@@ -202,18 +225,15 @@ private Committable commitWriteResults(
202225
try (LakeCommitter<WriteResult, Committable> lakeCommitter =
203226
lakeTieringFactory.createLakeCommitter(
204227
new TieringCommitterInitContext(
205-
tablePath,
206-
admin.getTableInfo(tablePath).get(),
207-
lakeTieringConfig,
208-
flussConfig))) {
228+
tablePath, currentTableInfo, lakeTieringConfig, flussConfig))) {
209229
List<WriteResult> writeResults =
210-
committableWriteResults.stream()
230+
nonEmptyResults.stream()
211231
.map(TableBucketWriteResult::writeResult)
212232
.collect(Collectors.toList());
213233

214234
Map<TableBucket, Long> logEndOffsets = new HashMap<>();
215235
Map<TableBucket, Long> logMaxTieredTimestamps = new HashMap<>();
216-
for (TableBucketWriteResult<WriteResult> writeResult : committableWriteResults) {
236+
for (TableBucketWriteResult<WriteResult> writeResult : nonEmptyResults) {
217237
TableBucket tableBucket = writeResult.tableBucket();
218238
logEndOffsets.put(tableBucket, writeResult.logEndOffset());
219239
logMaxTieredTimestamps.put(tableBucket, writeResult.maxTimestamp());
@@ -251,7 +271,11 @@ private Committable commitWriteResults(
251271
lakeBucketTieredOffsetsFile,
252272
logEndOffsets,
253273
logMaxTieredTimestamps);
254-
return committable;
274+
return new CommitResult(
275+
committable,
276+
new TieringStats(
277+
lakeCommitResult.getTotalLakeFileSize(),
278+
lakeCommitResult.getTotalLakeRecordCount()));
255279
}
256280
}
257281

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,23 @@ public class FinishedTieringEvent implements SourceEvent {
2626

2727
private final long tableId;
2828

29-
public FinishedTieringEvent(long tableId) {
29+
/** Statistics collected during this tiering round. */
30+
private final TieringStats stats;
31+
32+
public FinishedTieringEvent(long tableId, TieringStats stats) {
3033
this.tableId = tableId;
34+
this.stats = stats != null ? stats : TieringStats.UNKNOWN;
35+
}
36+
37+
public FinishedTieringEvent(long tableId) {
38+
this(tableId, TieringStats.UNKNOWN);
3139
}
3240

3341
public long getTableId() {
3442
return tableId;
3543
}
44+
45+
public TieringStats getStats() {
46+
return stats;
47+
}
3648
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.flink.tiering.event;

import java.io.Serializable;

/**
 * Immutable per-round tiering statistics for a lake table.
 *
 * <p>A value of {@code -1} in any field is a sentinel meaning the statistic is unknown or not
 * reported by the lake implementation.
 */
public final class TieringStats implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Shared instance with every statistic set to {@code -1} (unknown/unsupported). Use this as
     * the default whenever no statistics are available.
     */
    public static final TieringStats UNKNOWN = new TieringStats(-1L, -1L);

    // -----------------------------------------------------------------------------------------
    // Lake data stats (reported by the lake committer)
    // -----------------------------------------------------------------------------------------

    /** Cumulative total size in bytes of the lake table's data files after this tiering round. */
    private final long fileSize;

    /** Cumulative total record count of the lake table after this tiering round. */
    private final long recordCount;

    public TieringStats(long fileSize, long recordCount) {
        this.fileSize = fileSize;
        this.recordCount = recordCount;
    }

    /** Returns the cumulative lake file size in bytes, or {@code -1} if unknown. */
    public long getFileSize() {
        return fileSize;
    }

    /** Returns the cumulative lake record count, or {@code -1} if unknown. */
    public long getRecordCount() {
        return recordCount;
    }

    @Override
    public String toString() {
        return String.format("TieringStats{fileSize=%d, recordCount=%d}", fileSize, recordCount);
    }
}

0 commit comments

Comments
 (0)