Skip to content

Commit b48685a

Browse files
committed
Add compaction metrics system table. apache#6880
1 parent e2591d1 commit b48685a

31 files changed

+1591
-27
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,20 @@ public InlineElement getDescription() {
804804
"Ratio of the deleted rows in a data file to be forced compacted for "
805805
+ "append-only table.");
806806

807+
public static final ConfigOption<Boolean> COMPACTION_METRICS_ENABLED =
808+
key("compaction.metrics.enabled")
809+
.booleanType()
810+
.defaultValue(false)
811+
.withDescription(
812+
"If enabled, compaction metric data will be recorded for each instance.");
813+
814+
public static final ConfigOption<Integer> COMPACTION_METRIC_RETAINED_NUM =
815+
key("compaction.metrics.retained_num")
816+
.intType()
817+
.defaultValue(10)
818+
.withDescription(
819+
"Set the maximum number of times a record metric can be retained.");
820+
807821
public static final ConfigOption<ChangelogProducer> CHANGELOG_PRODUCER =
808822
key("changelog-producer")
809823
.enumType(ChangelogProducer.class)
@@ -2675,6 +2689,14 @@ public double compactionDeleteRatioThreshold() {
26752689
return options.get(COMPACTION_DELETE_RATIO_THRESHOLD);
26762690
}
26772691

2692+
public boolean compactMetricsEnabled() {
2693+
return options.get(COMPACTION_METRICS_ENABLED);
2694+
}
2695+
2696+
public int compactMetricsRetainedNum() {
2697+
return options.get(COMPACTION_METRIC_RETAINED_NUM);
2698+
}
2699+
26782700
public long dynamicBucketTargetRowNum() {
26792701
return options.get(DYNAMIC_BUCKET_TARGET_ROW_NUM);
26802702
}
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.metrics;
20+
21+
import org.apache.paimon.annotation.Public;
22+
import org.apache.paimon.utils.JsonSerdeUtil;
23+
24+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
25+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
26+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
27+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
28+
29+
import javax.annotation.Nullable;
30+
31+
import java.io.Serializable;
32+
import java.util.Objects;
33+
34+
/** @since 1.4.0 */
35+
@Public
36+
@JsonIgnoreProperties(ignoreUnknown = true)
37+
public class CompactMetric implements Serializable {
38+
39+
private static final long serialVersionUID = 1L;
40+
41+
public static final long FIRST_SNAPSHOT_ID = 1;
42+
43+
public static final int TABLE_STORE_02_VERSION = 1;
44+
protected static final int CURRENT_VERSION = 3;
45+
46+
protected static final String FIELD_VERSION = "version";
47+
protected static final String FIELD_SNAPSHOT_ID = "snapshotId";
48+
protected static final String FIELD_COMMIT_TIME = "commitTime";
49+
protected static final String FIELD_COMPACT_AVG_DURATION = "compactAvgDuration";
50+
protected static final String FIELD_COMPACTION_MAX_DURATION = "compactMaxDuration";
51+
protected static final String FIELD_COMPACTION_MIN_DURATION = "compactMinDuration";
52+
protected static final String FIELD_BUCKETS = "compactBuckets";
53+
protected static final String FIELD_COMPACTION_TYPE = "compactType";
54+
protected static final String FIELD_IDENTIFIER = "identifier";
55+
protected static final String FIELD_COMMIT_USER = "commitUser";
56+
57+
@JsonProperty(FIELD_VERSION)
58+
@Nullable
59+
protected final Integer version;
60+
61+
@JsonProperty(FIELD_SNAPSHOT_ID)
62+
protected final long snapshotId;
63+
64+
@JsonProperty(FIELD_COMMIT_TIME)
65+
protected final long commitTime;
66+
67+
@JsonProperty(FIELD_COMPACT_AVG_DURATION)
68+
protected final long compactDuration;
69+
70+
// a manifest list recording all new changes occurred in this snapshot
71+
// for faster expire and streaming reads
72+
@JsonProperty(FIELD_COMPACTION_MAX_DURATION)
73+
protected final long compactMaxDuration;
74+
75+
@JsonProperty(FIELD_COMPACTION_MIN_DURATION)
76+
protected final long compactMinDuration;
77+
78+
// a manifest list recording all changelog produced in this snapshot
79+
// null if no changelog is produced, or for paimon <= 0.2
80+
@JsonProperty(FIELD_BUCKETS)
81+
protected final String buckets;
82+
83+
@JsonProperty(FIELD_IDENTIFIER)
84+
protected final long identifier;
85+
86+
@JsonProperty(FIELD_COMMIT_USER)
87+
protected final String commitUser;
88+
89+
@JsonProperty(FIELD_COMPACTION_TYPE)
90+
protected final String compactType;
91+
92+
public CompactMetric(
93+
long snapshotId,
94+
long commitTime,
95+
long compactDuration,
96+
long compactMaxDuration,
97+
long compactMinDuration,
98+
String buckets,
99+
String compactType,
100+
long identifier,
101+
String commitUser) {
102+
this(
103+
CURRENT_VERSION,
104+
snapshotId,
105+
commitTime,
106+
compactDuration,
107+
compactMaxDuration,
108+
compactMinDuration,
109+
buckets,
110+
compactType,
111+
identifier,
112+
commitUser);
113+
}
114+
115+
@JsonCreator
116+
public CompactMetric(
117+
@JsonProperty(FIELD_VERSION) @Nullable Integer version,
118+
@JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId,
119+
@JsonProperty(FIELD_COMMIT_TIME) long commitTime,
120+
@JsonProperty(FIELD_COMPACT_AVG_DURATION) long compactDuration,
121+
@JsonProperty(FIELD_COMPACTION_MAX_DURATION) long compactMaxDuration,
122+
@JsonProperty(FIELD_COMPACTION_MIN_DURATION) long compactMinDuration,
123+
@JsonProperty(FIELD_BUCKETS) @Nullable String buckets,
124+
@JsonProperty(FIELD_COMPACTION_TYPE) @Nullable String compactType,
125+
@JsonProperty(FIELD_IDENTIFIER) long identifier,
126+
@JsonProperty(FIELD_COMMIT_USER) String commitUser) {
127+
this.version = version;
128+
this.snapshotId = snapshotId;
129+
this.commitTime = commitTime;
130+
this.compactDuration = compactDuration;
131+
this.compactMaxDuration = compactMaxDuration;
132+
this.compactMinDuration = compactMinDuration;
133+
this.buckets = buckets;
134+
this.compactType = compactType;
135+
this.identifier = identifier;
136+
this.commitUser = commitUser;
137+
}
138+
139+
@JsonGetter(FIELD_VERSION)
140+
public int version() {
141+
// there is no version field for paimon <= 0.2
142+
return version == null ? TABLE_STORE_02_VERSION : version;
143+
}
144+
145+
@JsonGetter(FIELD_SNAPSHOT_ID)
146+
public long snapshotId() {
147+
return snapshotId;
148+
}
149+
150+
@JsonGetter(FIELD_COMMIT_TIME)
151+
public long commitTime() {
152+
return commitTime;
153+
}
154+
155+
@JsonGetter(FIELD_COMPACT_AVG_DURATION)
156+
public long compactDuration() {
157+
return compactDuration;
158+
}
159+
160+
@JsonGetter(FIELD_COMPACTION_MAX_DURATION)
161+
public long compactMaxDuration() {
162+
return compactMaxDuration;
163+
}
164+
165+
@JsonGetter(FIELD_COMPACTION_MIN_DURATION)
166+
public long compactMinDuration() {
167+
return compactMinDuration;
168+
}
169+
170+
@JsonGetter(FIELD_BUCKETS)
171+
public String buckets() {
172+
return buckets == null ? "{}" : buckets;
173+
}
174+
175+
@JsonGetter(FIELD_COMPACTION_TYPE)
176+
public String compactType() {
177+
return compactType;
178+
}
179+
180+
@JsonGetter(FIELD_IDENTIFIER)
181+
public long identifier() {
182+
return identifier;
183+
}
184+
185+
@JsonGetter(FIELD_COMMIT_USER)
186+
public String commitUser() {
187+
return commitUser;
188+
}
189+
190+
public String toJson() {
191+
return JsonSerdeUtil.toJson(this);
192+
}
193+
194+
@Override
195+
public int hashCode() {
196+
return Objects.hash(
197+
version,
198+
snapshotId,
199+
commitTime,
200+
compactDuration,
201+
compactMaxDuration,
202+
compactMinDuration,
203+
buckets,
204+
compactType,
205+
identifier,
206+
commitUser);
207+
}
208+
209+
@Override
210+
public boolean equals(Object o) {
211+
if (this == o) {
212+
return true;
213+
}
214+
if (o == null || getClass() != o.getClass()) {
215+
return false;
216+
}
217+
CompactMetric that = (CompactMetric) o;
218+
return Objects.equals(version, that.version)
219+
&& snapshotId == that.snapshotId
220+
&& commitTime == that.commitTime
221+
&& Objects.equals(compactDuration, that.compactDuration)
222+
&& Objects.equals(compactMaxDuration, that.compactMaxDuration)
223+
&& Objects.equals(compactMinDuration, that.compactMinDuration)
224+
&& Objects.equals(buckets, that.buckets)
225+
&& Objects.equals(compactType, that.compactType)
226+
&& Objects.equals(identifier, that.identifier)
227+
&& Objects.equals(commitUser, that.commitUser);
228+
}
229+
230+
public static CompactMetric fromJson(String json) {
231+
return JsonSerdeUtil.fromJson(json, CompactMetric.class);
232+
}
233+
}

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.paimon.tag.TagPreview;
6363
import org.apache.paimon.types.RowType;
6464
import org.apache.paimon.utils.ChangelogManager;
65+
import org.apache.paimon.utils.CompactMetricsManager;
6566
import org.apache.paimon.utils.FileStorePathFactory;
6667
import org.apache.paimon.utils.IndexFilePathFactories;
6768
import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -298,6 +299,7 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
298299
options.partitionDefaultName(),
299300
pathFactory(),
300301
snapshotManager,
302+
compactMetricsManager(),
301303
manifestFileFactory(),
302304
manifestListFactory(),
303305
indexManifestFileFactory(),
@@ -513,4 +515,9 @@ public GlobalIndexScanBuilder newGlobalIndexScanBuilder() {
513515
snapshotManager(),
514516
newIndexFileHandler());
515517
}
518+
519+
@Override
520+
public CompactMetricsManager compactMetricsManager() {
521+
return new CompactMetricsManager(fileIO, options.path(), options.branch());
522+
}
516523
}

paimon-core/src/main/java/org/apache/paimon/FileStore.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.paimon.tag.TagAutoManager;
4242
import org.apache.paimon.types.RowType;
4343
import org.apache.paimon.utils.ChangelogManager;
44+
import org.apache.paimon.utils.CompactMetricsManager;
4445
import org.apache.paimon.utils.FileStorePathFactory;
4546
import org.apache.paimon.utils.InternalRowPartitionComputer;
4647
import org.apache.paimon.utils.SegmentsCache;
@@ -127,4 +128,6 @@ PartitionExpire newPartitionExpire(
127128
void setSnapshotCache(Cache<Path, Snapshot> cache);
128129

129130
GlobalIndexScanBuilder newGlobalIndexScanBuilder();
131+
132+
CompactMetricsManager compactMetricsManager();
130133
}

paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
package org.apache.paimon.append;
2020

21+
import org.apache.paimon.compact.CompactMetricMeta;
2122
import org.apache.paimon.data.BinaryRow;
2223
import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
2324
import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer;
2425
import org.apache.paimon.index.IndexFileMeta;
2526
import org.apache.paimon.io.CompactIncrement;
27+
import org.apache.paimon.io.CompactMetricIncrement;
2628
import org.apache.paimon.io.DataFileMeta;
2729
import org.apache.paimon.io.DataIncrement;
2830
import org.apache.paimon.manifest.FileKind;
@@ -68,6 +70,7 @@ public List<DataFileMeta> compactAfter() {
6870

6971
public CommitMessage doCompact(FileStoreTable table, BaseAppendFileStoreWrite write)
7072
throws Exception {
73+
long startMillis = System.currentTimeMillis();
7174
boolean dvEnabled = table.coreOptions().deletionVectorsEnabled();
7275
Preconditions.checkArgument(
7376
dvEnabled || compactBefore.size() > 1,
@@ -111,14 +114,18 @@ public CommitMessage doCompact(FileStoreTable table, BaseAppendFileStoreWrite wr
111114
Collections.emptyList(),
112115
newIndexFiles,
113116
deletedIndexFiles);
117+
CompactMetricIncrement compactMetricIncrement =
118+
new CompactMetricIncrement(
119+
new CompactMetricMeta("full", System.currentTimeMillis() - startMillis));
114120
return new CommitMessageImpl(
115121
partition,
116122
// bucket 0 is bucket for unaware-bucket table
117123
// for compatibility with the old design
118124
0,
119125
table.coreOptions().bucket(),
120126
DataIncrement.emptyIncrement(),
121-
compactIncrement);
127+
compactIncrement,
128+
compactMetricIncrement);
122129
}
123130

124131
public int hashCode() {

0 commit comments

Comments
 (0)