Skip to content

Commit 9a88328

Browse files
authored
[enhance](paimon)Doris Paimon Scan Metrics Integration (#59281)
### What problem does this PR solve?

#### Overview

This change pipes Apache Paimon scan metrics directly into Doris query profiles so operators can inspect per-scan statistics (manifests, file counts, scan durations, cache hit/miss) from the FE profile UI. The integration consists of three pieces:

- Summary Profile Slot - SummaryProfile now includes a Paimon Scan Metrics entry in the execution summary list so the FE profile table reserves space for the Paimon telemetry.
- Metric Registry + Reporter - a new PaimonMetricRegistry collects Paimon MetricGroups, and PaimonScanMetricsReporter formats ScanMetrics values and appends them to the summary entry whenever a Paimon scan runs.
- Scan Integration - PaimonScanNode attaches a registry to the TableScan (when InnerTableScan supports withMetricsRegistry), plans splits, and reports metrics after planning.

All metrics remain scoped to Paimon scans; other table formats are untouched and still populate their own runtime-profile sections as before.

#### Implementation Details

1. SummaryProfile `fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java`
   - Added the constant `PAIMON_SCAN_METRICS = "Paimon Scan Metrics"`.
   - Inserted the key into `EXECUTION_SUMMARY_KEYS` (with indentation level 3) so the runtime profile tree displays it under the scheduling block when present.
   - No default text is shown unless metrics are actually reported; the entry stays N/A otherwise.
2. PaimonMetricRegistry `fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonMetricRegistry.java`
   - Extends `MetricRegistry` and caches `MetricGroup` instances keyed by `name:table`.
   - Uses the `table` tag to disambiguate groups (`MetricRegistry.KEY_TABLE` is private in Paimon).
   - Exposes `getGroup/removeGroup/clear` helpers to manage registry lifecycle.
3. PaimonScanMetricsReporter `fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonScanMetricsReporter.java`
   - Reads `ScanMetrics.GROUP_NAME` from the registry and appends a per-scan entry under "Paimon Scan Metrics" in the SummaryProfile.
   - Metrics covered: `last_scan_duration`, `scan_duration` (count/mean/p95/max), `last_scanned_manifests`, `last_scan_skipped_table_files`, `last_scan_resulted_table_files`, `manifest_hit_cache`, `manifest_missed_cache`.
   - If the direct lookup fails, falls back to the single ScanMetrics group in the registry (and skips reporting if multiple groups exist).
   - Cleans up the registry group after reporting.
4. PaimonScanNode `fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java`
   - Creates a `TableScan` via `newReadBuilder().withFilter().withProjection().newScan()`.
   - When scan is an `InnerTableScan`, attaches `PaimonMetricRegistry` with `withMetricsRegistry`.
   - Plans splits, reports metrics, and clears the registry to avoid leaks.
5. Regression test `regression-test/suites/external_table_p0/paimon/test_paimon_scan_metrics_profile.groovy`
   - Adds a Paimon scan metrics profile case that asserts "Paimon Scan Metrics" appears in the query profile.

#### Example Profile Output

After running a query against `test_paimon_scan_metrics.db1.all_table`, the FE profile shows:

```
Paimon Scan Metrics:
  Table Scan (test_paimon_scan_metrics.db1.all_table):
    - last_scan_duration: 7ms
    - scan_duration: count=1, mean=7ms, p95=7ms, max=7ms
    - last_scanned_manifests: 1
    - last_scan_skipped_table_files: 0
    - last_scan_resulted_table_files: 3
    - manifest_hit_cache: 0
    - manifest_missed_cache: 1
```
1 parent b1303c3 commit 9a88328

File tree

4 files changed

+243
-3
lines changed

4 files changed

+243
-3
lines changed

fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public class SummaryProfile {
133133
public static final String LATENCY_FROM_BE_TO_FE = "RPC Latency From BE To FE";
134134
public static final String SPLITS_ASSIGNMENT_WEIGHT = "Splits Assignment Weight";
135135
public static final String ICEBERG_SCAN_METRICS = "Iceberg Scan Metrics";
136+
public static final String PAIMON_SCAN_METRICS = "Paimon Scan Metrics";
136137

137138
// These info will display on FE's web ui table, every one will be displayed as
138139
// a column, so that should not
@@ -166,6 +167,7 @@ public class SummaryProfile {
166167
SINK_SET_PARTITION_VALUES_TIME,
167168
CREATE_SCAN_RANGE_TIME,
168169
ICEBERG_SCAN_METRICS,
170+
PAIMON_SCAN_METRICS,
169171
NEREIDS_DISTRIBUTE_TIME,
170172
GET_META_VERSION_TIME,
171173
GET_PARTITION_VERSION_TIME,
@@ -218,6 +220,7 @@ public class SummaryProfile {
218220
.put(SINK_SET_PARTITION_VALUES_TIME, 3)
219221
.put(CREATE_SCAN_RANGE_TIME, 2)
220222
.put(ICEBERG_SCAN_METRICS, 3)
223+
.put(PAIMON_SCAN_METRICS, 3)
221224
.put(GET_PARTITION_VERSION_TIME, 1)
222225
.put(GET_PARTITION_VERSION_COUNT, 1)
223226
.put(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT, 1)
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.paimon.profile;
19+
20+
import org.apache.paimon.metrics.MetricGroup;
21+
import org.apache.paimon.metrics.MetricGroupImpl;
22+
import org.apache.paimon.metrics.MetricRegistry;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.util.Collection;
27+
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
30+
public class PaimonMetricRegistry extends MetricRegistry {
31+
private static final Logger LOG = LoggerFactory.getLogger(PaimonMetricRegistry.class);
32+
private static final String TABLE_TAG_KEY = "table";
33+
private final ConcurrentHashMap<String, MetricGroup> groups = new ConcurrentHashMap<>();
34+
35+
@Override
36+
protected MetricGroup createMetricGroup(String name, Map<String, String> tags) {
37+
MetricGroup group = new MetricGroupImpl(name, tags);
38+
String table = tags == null ? "" : tags.getOrDefault(TABLE_TAG_KEY, "");
39+
groups.put(buildKey(name, table), group);
40+
LOG.debug("Created metric group: {}:{}", name, table);
41+
return group;
42+
}
43+
44+
public MetricGroup getGroup(String name, String table) {
45+
String key = buildKey(name, table);
46+
MetricGroup group = groups.get(key);
47+
if (group == null) {
48+
LOG.warn("MetricGroup not found: {}", key);
49+
}
50+
return group;
51+
}
52+
53+
public void removeGroup(String name, String table) {
54+
groups.remove(buildKey(name, table));
55+
}
56+
57+
public Collection<MetricGroup> getAllGroups() {
58+
return groups.values();
59+
}
60+
61+
public Map<String, MetricGroup> getAllGroupsAsMap() {
62+
return new ConcurrentHashMap<>(groups);
63+
}
64+
65+
public void clear() {
66+
groups.clear();
67+
}
68+
69+
private static String buildKey(String name, String table) {
70+
return name + ":" + table;
71+
}
72+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.paimon.profile;
19+
20+
import org.apache.doris.catalog.TableIf;
21+
import org.apache.doris.common.profile.RuntimeProfile;
22+
import org.apache.doris.common.profile.SummaryProfile;
23+
import org.apache.doris.common.util.DebugUtil;
24+
import org.apache.doris.qe.ConnectContext;
25+
26+
import org.apache.paimon.metrics.Counter;
27+
import org.apache.paimon.metrics.Gauge;
28+
import org.apache.paimon.metrics.Histogram;
29+
import org.apache.paimon.metrics.HistogramStatistics;
30+
import org.apache.paimon.metrics.Metric;
31+
import org.apache.paimon.metrics.MetricGroup;
32+
import org.apache.paimon.operation.metrics.ScanMetrics;
33+
34+
import java.util.Map;
35+
import java.util.concurrent.TimeUnit;
36+
37+
public class PaimonScanMetricsReporter {
38+
private static final double P95 = 0.95d;
39+
40+
public static void report(TableIf table, String paimonTableName, PaimonMetricRegistry registry) {
41+
if (registry == null || paimonTableName == null) {
42+
return;
43+
}
44+
String resolvedTableName = paimonTableName;
45+
MetricGroup group = registry.getGroup(ScanMetrics.GROUP_NAME, paimonTableName);
46+
if (group == null) {
47+
String prefix = ScanMetrics.GROUP_NAME + ":";
48+
for (Map.Entry<String, MetricGroup> entry : registry.getAllGroupsAsMap().entrySet()) {
49+
String key = entry.getKey();
50+
if (!key.startsWith(prefix)) {
51+
continue;
52+
}
53+
if (group != null) {
54+
group = null;
55+
break;
56+
}
57+
group = entry.getValue();
58+
resolvedTableName = key.substring(prefix.length());
59+
}
60+
}
61+
if (group == null) {
62+
return;
63+
}
64+
Map<String, Metric> metrics = group.getMetrics();
65+
if (metrics == null || metrics.isEmpty()) {
66+
return;
67+
}
68+
69+
SummaryProfile summaryProfile = SummaryProfile.getSummaryProfile(ConnectContext.get());
70+
if (summaryProfile == null) {
71+
return;
72+
}
73+
RuntimeProfile executionSummary = summaryProfile.getExecutionSummary();
74+
if (executionSummary == null) {
75+
return;
76+
}
77+
78+
RuntimeProfile paimonGroup = executionSummary.getChildMap().get(SummaryProfile.PAIMON_SCAN_METRICS);
79+
if (paimonGroup == null) {
80+
paimonGroup = new RuntimeProfile(SummaryProfile.PAIMON_SCAN_METRICS);
81+
executionSummary.addChild(paimonGroup, true);
82+
}
83+
84+
String displayName = table == null ? paimonTableName : table.getNameWithFullQualifiers();
85+
RuntimeProfile scanProfile = new RuntimeProfile("Table Scan (" + displayName + ")");
86+
appendDuration(scanProfile, metrics, ScanMetrics.LAST_SCAN_DURATION, "last_scan_duration");
87+
appendHistogram(scanProfile, metrics, ScanMetrics.SCAN_DURATION, "scan_duration");
88+
appendCounter(scanProfile, metrics, ScanMetrics.LAST_SCANNED_MANIFESTS, "last_scanned_manifests");
89+
appendCounter(scanProfile, metrics, ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES,
90+
"last_scan_skipped_table_files");
91+
appendCounter(scanProfile, metrics, ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES,
92+
"last_scan_resulted_table_files");
93+
appendCounter(scanProfile, metrics, ScanMetrics.MANIFEST_HIT_CACHE, "manifest_hit_cache");
94+
appendCounter(scanProfile, metrics, ScanMetrics.MANIFEST_MISSED_CACHE, "manifest_missed_cache");
95+
paimonGroup.addChild(scanProfile, true);
96+
registry.removeGroup(ScanMetrics.GROUP_NAME, resolvedTableName);
97+
}
98+
99+
private static void appendDuration(RuntimeProfile profile, Map<String, Metric> metrics, String metricKey,
100+
String profileKey) {
101+
Long value = getLongValue(metrics.get(metricKey));
102+
if (value == null) {
103+
return;
104+
}
105+
profile.addInfoString(profileKey, formatDuration(value));
106+
}
107+
108+
private static void appendCounter(RuntimeProfile profile, Map<String, Metric> metrics, String metricKey,
109+
String profileKey) {
110+
Long value = getLongValue(metrics.get(metricKey));
111+
if (value == null) {
112+
return;
113+
}
114+
profile.addInfoString(profileKey, Long.toString(value));
115+
}
116+
117+
private static void appendHistogram(RuntimeProfile profile, Map<String, Metric> metrics, String metricKey,
118+
String profileKey) {
119+
Metric metric = metrics.get(metricKey);
120+
if (!(metric instanceof Histogram)) {
121+
return;
122+
}
123+
Histogram histogram = (Histogram) metric;
124+
HistogramStatistics stats = histogram.getStatistics();
125+
if (stats == null) {
126+
return;
127+
}
128+
String formatted = "count=" + histogram.getCount()
129+
+ ", mean=" + formatDuration(stats.getMean())
130+
+ ", p95=" + formatDuration(stats.getQuantile(P95))
131+
+ ", max=" + formatDuration(stats.getMax());
132+
profile.addInfoString(profileKey, formatted);
133+
}
134+
135+
private static Long getLongValue(Metric metric) {
136+
if (metric instanceof Counter) {
137+
return ((Counter) metric).getCount();
138+
}
139+
if (metric instanceof Gauge) {
140+
Object value = ((Gauge) metric).getValue();
141+
if (value instanceof Number) {
142+
return ((Number) value).longValue();
143+
}
144+
}
145+
return null;
146+
}
147+
148+
private static String formatDuration(double nanos) {
149+
long ms = TimeUnit.NANOSECONDS.toMillis(Math.round(nanos));
150+
return DebugUtil.getPrettyStringMs(ms);
151+
}
152+
}

fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
3535
import org.apache.doris.datasource.paimon.PaimonExternalTable;
3636
import org.apache.doris.datasource.paimon.PaimonUtil;
37+
import org.apache.doris.datasource.paimon.profile.PaimonMetricRegistry;
38+
import org.apache.doris.datasource.paimon.profile.PaimonScanMetricsReporter;
3739
import org.apache.doris.datasource.property.storage.StorageProperties;
3840
import org.apache.doris.planner.PlanNodeId;
3941
import org.apache.doris.qe.SessionVariable;
@@ -55,8 +57,10 @@
5557
import org.apache.paimon.table.Table;
5658
import org.apache.paimon.table.source.DataSplit;
5759
import org.apache.paimon.table.source.DeletionFile;
60+
import org.apache.paimon.table.source.InnerTableScan;
5861
import org.apache.paimon.table.source.RawFile;
5962
import org.apache.paimon.table.source.ReadBuilder;
63+
import org.apache.paimon.table.source.TableScan;
6064

6165
import java.io.IOException;
6266
import java.util.ArrayList;
@@ -413,9 +417,19 @@ public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() throws
413417
.filter(i -> i >= 0)
414418
.toArray();
415419
ReadBuilder readBuilder = paimonTable.newReadBuilder();
416-
return readBuilder.withFilter(predicates)
420+
TableScan scan = readBuilder.withFilter(predicates)
417421
.withProjection(projected)
418-
.newScan().plan().splits();
422+
.newScan();
423+
PaimonMetricRegistry registry = new PaimonMetricRegistry();
424+
if (scan instanceof InnerTableScan) {
425+
scan = ((InnerTableScan) scan).withMetricsRegistry(registry);
426+
}
427+
List<org.apache.paimon.table.source.Split> splits = scan.plan().splits();
428+
PaimonScanMetricsReporter.report(source.getTargetTable(), paimonTable.name(), registry);
429+
if (!registry.getAllGroups().isEmpty()) {
430+
registry.clear();
431+
}
432+
return splits;
419433
}
420434

421435
private String getFileFormat(String path) {
@@ -699,4 +713,3 @@ private Table getProcessedTable() throws UserException {
699713
return baseTable;
700714
}
701715
}
702-

0 commit comments

Comments
 (0)