Skip to content

Commit 8945e67

Browse files
Support rocksdb metrics
1 parent 34b7688 commit 8945e67

File tree

11 files changed

+990
-7
lines changed

11 files changed

+990
-7
lines changed

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,54 @@ public class MetricNames {
131131
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
132132
"preWriteBufferTruncateAsErrorPerSecond";
133133

134+
// --------------------------------------------------------------------------------------------
135+
// RocksDB metrics
136+
// --------------------------------------------------------------------------------------------
137+
// Table-level RocksDB metrics (aggregated from all buckets of a table, Max aggregation)
138+
/** Maximum write stall duration across all buckets of this table (Max aggregation). */
139+
public static final String ROCKSDB_WRITE_STALL_MICROS_MAX = "rocksdbWriteStallMicrosMax";
140+
141+
/** Maximum get latency across all buckets of this table (Max aggregation). */
142+
public static final String ROCKSDB_GET_LATENCY_MICROS_MAX = "rocksdbGetLatencyMicrosMax";
143+
144+
/** Maximum write latency across all buckets of this table (Max aggregation). */
145+
public static final String ROCKSDB_WRITE_LATENCY_MICROS_MAX = "rocksdbWriteLatencyMicrosMax";
146+
147+
/** Maximum number of L0 files across all buckets of this table (Max aggregation). */
148+
public static final String ROCKSDB_NUM_FILES_AT_LEVEL0_MAX = "rocksdbNumFilesAtLevel0Max";
149+
150+
/** Maximum flush pending indicator across all buckets of this table (Max aggregation). */
151+
public static final String ROCKSDB_FLUSH_PENDING_MAX = "rocksdbFlushPendingMax";
152+
153+
/** Maximum compaction pending indicator across all buckets of this table (Max aggregation). */
154+
public static final String ROCKSDB_COMPACTION_PENDING_MAX = "rocksdbCompactionPendingMax";
155+
156+
/** Maximum compaction time across all buckets of this table (Max aggregation). */
157+
public static final String ROCKSDB_COMPACTION_TIME_MICROS_MAX =
158+
"rocksdbCompactionTimeMicrosMax";
159+
160+
// Table-level RocksDB metrics (aggregated from all buckets of a table, Sum aggregation)
161+
/** Total bytes read across all buckets of this table (Sum aggregation). */
162+
public static final String ROCKSDB_BYTES_READ_TOTAL = "rocksdbBytesReadTotal";
163+
164+
/** Total bytes written across all buckets of this table (Sum aggregation). */
165+
public static final String ROCKSDB_BYTES_WRITTEN_TOTAL = "rocksdbBytesWrittenTotal";
166+
167+
/** Total flush bytes written across all buckets of this table (Sum aggregation). */
168+
public static final String ROCKSDB_FLUSH_BYTES_WRITTEN_TOTAL = "rocksdbFlushBytesWrittenTotal";
169+
170+
/** Total compaction bytes read across all buckets of this table (Sum aggregation). */
171+
public static final String ROCKSDB_COMPACTION_BYTES_READ_TOTAL =
172+
"rocksdbCompactionBytesReadTotal";
173+
174+
/** Total compaction bytes written across all buckets of this table (Sum aggregation). */
175+
public static final String ROCKSDB_COMPACTION_BYTES_WRITTEN_TOTAL =
176+
"rocksdbCompactionBytesWrittenTotal";
177+
178+
// Server-level RocksDB metrics (aggregated from all tables, Sum aggregation)
179+
/** Total memory usage across all RocksDB instances in this server (Sum aggregation). */
180+
public static final String ROCKSDB_MEMORY_USAGE_TOTAL = "rocksdbMemoryUsageTotal";
181+
134182
// --------------------------------------------------------------------------------------------
135183
// metrics for table bucket
136184
// --------------------------------------------------------------------------------------------

fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,14 @@ public KvTablet getOrCreateKv(
266266
sharedRocksDBRateLimiter);
267267
currentKvs.put(tableBucket, tablet);
268268

269+
// Register RocksDB metrics to TableMetricGroup for aggregation
270+
// Note: BucketMetricGroup is already created by Replica when LogTablet is
271+
// created
272+
if (tablet.getRocksDBMetrics() != null) {
273+
serverMetricGroup.registerRocksDBMetrics(
274+
tablePath.getTablePath(), tableBucket, tablet.getRocksDBMetrics());
275+
}
276+
269277
LOG.info(
270278
"Created kv tablet for bucket {} in dir {}.",
271279
tableBucket,
@@ -304,6 +312,9 @@ public void dropKv(TableBucket tableBucket) {
304312
if (dropKvTablet != null) {
305313
TablePath tablePath = dropKvTablet.getTablePath();
306314
try {
315+
// Unregister RocksDB metrics from TableMetricGroup
316+
serverMetricGroup.removeTableBucketMetricGroup(tablePath, tableBucket);
317+
307318
dropKvTablet.drop();
308319
if (dropKvTablet.getPartitionName() == null) {
309320
LOG.info(
@@ -386,6 +397,14 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti
386397
currentKvs.get(tableBucket).getKvTabletDir().getAbsolutePath()));
387398
}
388399
this.currentKvs.put(tableBucket, kvTablet);
400+
401+
// Register RocksDB metrics to TableMetricGroup for aggregation
402+
// Note: BucketMetricGroup is already created by Replica when LogTablet is created
403+
if (kvTablet.getRocksDBMetrics() != null) {
404+
serverMetricGroup.registerRocksDBMetrics(
405+
physicalTablePath.getTablePath(), tableBucket, kvTablet.getRocksDBMetrics());
406+
}
407+
389408
return kvTablet;
390409
}
391410

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
5050
import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
5151
import org.apache.fluss.server.kv.rocksdb.RocksDBKvBuilder;
52+
import org.apache.fluss.server.kv.rocksdb.RocksDBMetrics;
5253
import org.apache.fluss.server.kv.rocksdb.RocksDBResourceContainer;
5354
import org.apache.fluss.server.kv.rowmerger.DefaultRowMerger;
5455
import org.apache.fluss.server.kv.rowmerger.RowMerger;
@@ -118,6 +119,9 @@ public final class KvTablet {
118119
// the changelog image mode for this tablet
119120
private final ChangelogImage changelogImage;
120121

122+
// RocksDB metrics for this tablet
123+
@Nullable private final RocksDBMetrics rocksDBMetrics;
124+
121125
/**
122126
* The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been
123127
* flushed into kv.
@@ -142,7 +146,8 @@ private KvTablet(
142146
RowMerger rowMerger,
143147
ArrowCompressionInfo arrowCompressionInfo,
144148
SchemaGetter schemaGetter,
145-
ChangelogImage changelogImage) {
149+
ChangelogImage changelogImage,
150+
@Nullable RocksDBMetrics rocksDBMetrics) {
146151
this.physicalPath = physicalPath;
147152
this.tableBucket = tableBucket;
148153
this.logTablet = logTablet;
@@ -158,6 +163,7 @@ private KvTablet(
158163
this.arrowCompressionInfo = arrowCompressionInfo;
159164
this.schemaGetter = schemaGetter;
160165
this.changelogImage = changelogImage;
166+
this.rocksDBMetrics = rocksDBMetrics;
161167
}
162168

163169
public static KvTablet create(
@@ -177,6 +183,17 @@ public static KvTablet create(
177183
RateLimiter sharedRateLimiter)
178184
throws IOException {
179185
RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, sharedRateLimiter);
186+
187+
// Create RocksDB metrics accessor (will be registered to TableMetricGroup by KvManager)
188+
// Pass ResourceGuard to ensure thread-safe access during concurrent close operations
189+
// Pass ColumnFamilyHandle for column family specific properties like num-files-at-level0
190+
RocksDBMetrics rocksDBMetrics =
191+
new RocksDBMetrics(
192+
kv.getDb(),
193+
kv.getStatistics(),
194+
kv.getResourceGuard(),
195+
kv.getDefaultColumnFamilyHandle());
196+
180197
return new KvTablet(
181198
tablePath,
182199
tableBucket,
@@ -192,14 +209,16 @@ public static KvTablet create(
192209
rowMerger,
193210
arrowCompressionInfo,
194211
schemaGetter,
195-
changelogImage);
212+
changelogImage,
213+
rocksDBMetrics);
196214
}
197215

198216
private static RocksDBKv buildRocksDBKv(
199217
Configuration configuration, File kvDir, RateLimiter sharedRateLimiter)
200218
throws IOException {
219+
// Enable statistics to support RocksDB metrics
201220
RocksDBResourceContainer rocksDBResourceContainer =
202-
new RocksDBResourceContainer(configuration, kvDir, false, sharedRateLimiter);
221+
new RocksDBResourceContainer(configuration, kvDir, true, sharedRateLimiter);
203222
RocksDBKvBuilder rocksDBKvBuilder =
204223
new RocksDBKvBuilder(
205224
kvDir,
@@ -225,6 +244,16 @@ public File getKvTabletDir() {
225244
return kvTabletDir;
226245
}
227246

247+
/**
248+
* Get RocksDB metrics accessor for this tablet.
249+
*
250+
* @return the RocksDB metrics accessor, or null if not available
251+
*/
252+
@Nullable
253+
public RocksDBMetrics getRocksDBMetrics() {
254+
return rocksDBMetrics;
255+
}
256+
228257
void setFlushedLogOffset(long flushedLogOffset) {
229258
this.flushedLogOffset = flushedLogOffset;
230259
}
@@ -621,6 +650,8 @@ public void close() throws Exception {
621650
if (isClosed) {
622651
return;
623652
}
653+
// Note: RocksDB metrics lifecycle is managed by TableMetricGroup
654+
// No need to close it here
624655
if (rocksDBKv != null) {
625656
rocksDBKv.close();
626657
}

fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.rocksdb.RocksDB;
3030
import org.rocksdb.RocksDBException;
3131
import org.rocksdb.RocksIterator;
32+
import org.rocksdb.Statistics;
3233
import org.rocksdb.WriteOptions;
3334

3435
import javax.annotation.Nullable;
@@ -63,19 +64,24 @@ public class RocksDBKv implements AutoCloseable {
6364
/** Our RocksDB database. Currently, one kv tablet, one RocksDB instance. */
6465
protected final RocksDB db;
6566

67+
/** RocksDB Statistics for metrics collection. */
68+
private final @Nullable Statistics statistics;
69+
6670
// mark whether this kv is already closed and prevent duplicate closing
6771
private volatile boolean closed = false;
6872

6973
public RocksDBKv(
7074
RocksDBResourceContainer optionsContainer,
7175
RocksDB db,
7276
ResourceGuard rocksDBResourceGuard,
73-
ColumnFamilyHandle defaultColumnFamilyHandle) {
77+
ColumnFamilyHandle defaultColumnFamilyHandle,
78+
@Nullable Statistics statistics) {
7479
this.optionsContainer = optionsContainer;
7580
this.db = db;
7681
this.rocksDBResourceGuard = rocksDBResourceGuard;
7782
this.writeOptions = optionsContainer.getWriteOptions();
7883
this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
84+
this.statistics = statistics;
7985
}
8086

8187
public ResourceGuard getResourceGuard() {
@@ -206,4 +212,13 @@ public void close() throws Exception {
206212
public RocksDB getDb() {
207213
return db;
208214
}
215+
216+
@Nullable
217+
public Statistics getStatistics() {
218+
return optionsContainer.getStatistics();
219+
}
220+
221+
public ColumnFamilyHandle getDefaultColumnFamilyHandle() {
222+
return defaultColumnFamilyHandle;
223+
}
209224
}

fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKvBuilder.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,12 @@ public RocksDBKv build() throws KvBuildingException {
107107
throw new KvBuildingException(errMsg, t);
108108
}
109109
LOG.info("Finished building RocksDB kv at {}.", instanceBasePath);
110-
return new RocksDBKv(optionsContainer, db, rocksDBResourceGuard, defaultColumnFamilyHandle);
110+
return new RocksDBKv(
111+
optionsContainer,
112+
db,
113+
rocksDBResourceGuard,
114+
defaultColumnFamilyHandle,
115+
optionsContainer.getStatistics());
111116
}
112117

113118
void prepareDirectories() throws IOException {

0 commit comments

Comments
 (0)