Skip to content

Commit e3e0590

Browse files
[Feature](iceberg) Add manifest-level cache for Iceberg tables to reduce I/O and parsing overhead (#59056)
### What problem does this PR solve?

## Motivation

During Iceberg query planning, FE needs to read and parse the metadata chain: ManifestList → Manifest → DataFile/DeleteFile. When frequently querying hot partitions or executing small batch queries, the same Manifest files are repeatedly read and parsed, causing significant I/O and CPU overhead.

## Solution

This PR introduces a manifest-level cache (`IcebergManifestCache`) in FE to cache the parsed DataFile/DeleteFile lists per manifest file. The cache is implemented using Caffeine with weight-based LRU eviction and TTL support.

### Key Components

- **IcebergManifestCache**: Core cache implementation using Caffeine
  - Weight-based LRU eviction controlled by `iceberg.manifest.cache.capacity-mb`
  - TTL expiration via `iceberg.manifest.cache.ttl-second`
  - Single-flight loading to prevent duplicate parsing of the same manifest
- **ManifestCacheKey**: Cache key consisting of:
  - Manifest file path
- **ManifestCacheValue**: Cached payload containing:
  - List of `DataFile` or `DeleteFile`
  - Estimated memory weight for eviction
- **IcebergManifestCacheLoader**: Helper class to load and populate the cache using `ManifestFiles.read()`

### Cache Invalidation Strategy

- Key changes automatically invalidate stale entries (length/lastModified/sequenceNumber changes)
- TTL prevents stale data when underlying storage doesn't support precise mtime/etag
- Different snapshots use different manifest paths/keys, ensuring snapshot-level isolation

### Iceberg Catalog Properties

| Config | Default | Description |
|--------|---------|-------------|
| `iceberg.manifest.cache.enable` | `true` | Enable/disable manifest cache |
| `iceberg.manifest.cache.capacity-mb` | `1024` | Maximum cache capacity in MB |
| `iceberg.manifest.cache.ttl-second` | `48 * 60 * 60` | Cache entry expiration after access |

### Integration Point

The cache is integrated in `IcebergScanNode.planFileScanTaskWithManifestCache()`, which:

1. Loads delete manifests via cache and builds `DeleteFileIndex`
2. Loads data manifests via cache and creates `FileScanTask` for each data file
3. Falls back to original scan if cache loading fails
1 parent d1e9f7e commit e3e0590

File tree

14 files changed

+1893
-8
lines changed

14 files changed

+1893
-8
lines changed

fe/check/checkstyle/suppressions.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ under the License.
6969
<!-- ignore hudi disk map copied from hudi/common/util/collection/DiskMap.java -->
7070
<suppress files="org[\\/]apache[\\/]hudi[\\/]common[\\/]util[\\/]collection[\\/]DiskMap\.java" checks="[a-zA-Z0-9]*"/>
7171

72+
<!-- ignore iceberg delete file index copied from iceberg/DeleteFileIndex.java -->
73+
<suppress files="org[\\/]apache[\\/]iceberg[\\/]DeleteFileIndex\.java" checks="[a-zA-Z0-9]*"/>
74+
7275
<!-- ignore gensrc/thrift/ExternalTableSchema.thrift -->
7376
<suppress files=".*thrift/schema/external/.*" checks=".*"/>
7477
</suppressions>

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
5151
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
5252
public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND = "iceberg.table.meta.cache.ttl-second";
5353
public static final String ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND = "iceberg.snapshot.meta.cache.ttl-second";
54+
// Catalog property key that switches the manifest cache on or off ("true"/"false").
public static final String ICEBERG_MANIFEST_CACHE_ENABLE = "iceberg.manifest.cache.enable";
55+
// Catalog property key for the manifest cache capacity, expressed in megabytes.
public static final String ICEBERG_MANIFEST_CACHE_CAPACITY_MB = "iceberg.manifest.cache.capacity-mb";
56+
// Catalog property key for the manifest cache entry time-to-live, expressed in seconds.
public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND = "iceberg.manifest.cache.ttl-second";
57+
// Value used when ICEBERG_MANIFEST_CACHE_ENABLE is not set: the cache is enabled.
public static final boolean DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE = true;
58+
// Value used when ICEBERG_MANIFEST_CACHE_CAPACITY_MB is not set: 1024 MB.
public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB = 1024;
59+
// Value used when ICEBERG_MANIFEST_CACHE_TTL_SECOND is not set: 48 hours, in seconds.
public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND = 48 * 60 * 60;
5460
protected String icebergCatalogType;
5561
protected Catalog catalog;
5662

@@ -95,6 +101,29 @@ public void checkProperties() throws DdlException {
95101
"The parameter " + ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND + " is wrong, value is "
96102
+ partitionCacheTtlSecond);
97103
}
104+
105+
String manifestCacheEnable = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
106+
if (Objects.nonNull(manifestCacheEnable)
107+
&& !(manifestCacheEnable.equalsIgnoreCase("true") || manifestCacheEnable.equalsIgnoreCase("false"))) {
108+
throw new DdlException(
109+
"The parameter " + ICEBERG_MANIFEST_CACHE_ENABLE + " is wrong, value is "
110+
+ manifestCacheEnable);
111+
}
112+
113+
String manifestCacheCapacity = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
114+
if (Objects.nonNull(manifestCacheCapacity) && NumberUtils.toLong(manifestCacheCapacity, -1) <= 0) {
115+
throw new DdlException(
116+
"The parameter " + ICEBERG_MANIFEST_CACHE_CAPACITY_MB + " is wrong, value is "
117+
+ manifestCacheCapacity);
118+
}
119+
120+
String manifestCacheTtlSecond = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
121+
if (Objects.nonNull(manifestCacheTtlSecond)
122+
&& NumberUtils.toLong(manifestCacheTtlSecond, CACHE_NO_TTL) < CACHE_TTL_DISABLE_CACHE) {
123+
throw new DdlException(
124+
"The parameter " + ICEBERG_MANIFEST_CACHE_TTL_SECOND + " is wrong, value is "
125+
+ manifestCacheTtlSecond);
126+
}
98127
catalogProperty.checkMetaStoreAndStorageProperties(AbstractIcebergProperties.class);
99128
}
100129

@@ -106,6 +135,13 @@ public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
106135
if (Objects.nonNull(tableMetaCacheTtl) || Objects.nonNull(snapshotMetaCacheTtl)) {
107136
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
108137
}
138+
String manifestCacheEnable = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
139+
String manifestCacheCapacity = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
140+
String manifestCacheTtl = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
141+
if (Objects.nonNull(manifestCacheEnable) || Objects.nonNull(manifestCacheCapacity)
142+
|| Objects.nonNull(manifestCacheTtl)) {
143+
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
144+
}
109145
}
110146

111147
@Override

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.doris.datasource.ExternalTable;
2929
import org.apache.doris.datasource.NameMapping;
3030
import org.apache.doris.datasource.hive.HMSExternalCatalog;
31+
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
3132
import org.apache.doris.mtmv.MTMVRelatedTableIf;
3233

3334
import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -58,6 +59,7 @@ public class IcebergMetadataCache {
5859
private LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
5960
private LoadingCache<IcebergMetadataCacheKey, IcebergSnapshotCacheValue> snapshotCache;
6061
private LoadingCache<IcebergMetadataCacheKey, View> viewCache;
62+
private IcebergManifestCache manifestCache;
6163

6264
public IcebergMetadataCache(IcebergExternalCatalog catalog, ExecutorService executor) {
6365
this.executor = executor;
@@ -101,6 +103,15 @@ public void init() {
101103
null);
102104
this.snapshotCache = snapshotCacheFactory.buildCache(this::loadSnapshot, executor);
103105
this.viewCache = tableCacheFactory.buildCache(this::loadView, executor);
106+
107+
long manifestCacheCapacityMb = NumberUtils.toLong(
108+
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY_MB),
109+
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB);
110+
manifestCacheCapacityMb = Math.max(manifestCacheCapacityMb, 0L);
111+
long manifestCacheTtlSec = NumberUtils.toLong(
112+
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND),
113+
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND);
114+
this.manifestCache = new IcebergManifestCache(manifestCacheCapacityMb, manifestCacheTtlSec);
104115
}
105116

106117
public Table getIcebergTable(ExternalTable dorisTable) {
@@ -117,6 +128,10 @@ public IcebergSnapshotCacheValue getSnapshotCache(ExternalTable dorisTable) {
117128
return snapshotCache.get(key);
118129
}
119130

131+
public IcebergManifestCache getManifestCache() {
132+
return manifestCache;
133+
}
134+
120135
@NotNull
121136
private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
122137
Table icebergTable = getIcebergTable(key);
@@ -200,6 +215,7 @@ public void invalidateCatalogCache(long catalogId) {
200215
viewCache.asMap().entrySet().stream()
201216
.filter(entry -> entry.getKey().nameMapping.getCtlId() == catalogId)
202217
.forEach(entry -> viewCache.invalidate(entry.getKey()));
218+
manifestCache.invalidateAll();
203219
}
204220

205221
public void invalidateTableCache(ExternalTable dorisTable) {

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.doris.datasource.ExternalSchemaCache;
5757
import org.apache.doris.datasource.ExternalTable;
5858
import org.apache.doris.datasource.SchemaCacheValue;
59+
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
5960
import org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo;
6061
import org.apache.doris.datasource.mvcc.MvccSnapshot;
6162
import org.apache.doris.datasource.mvcc.MvccUtil;
@@ -1553,4 +1554,19 @@ public static String showCreateView(IcebergExternalTable icebergExternalTable) {
15531554
icebergExternalTable.getViewText();
15541555
}
15551556

1557+
public static IcebergManifestCache getManifestCache(ExternalCatalog catalog) {
1558+
return Env.getCurrentEnv()
1559+
.getExtMetaCacheMgr()
1560+
.getIcebergMetadataCache((IcebergExternalCatalog) catalog)
1561+
.getManifestCache();
1562+
}
1563+
1564+
public static boolean isManifestCacheEnabled(ExternalCatalog catalog) {
1565+
String enabled = catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE);
1566+
if (enabled == null) {
1567+
return IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE;
1568+
}
1569+
return Boolean.parseBoolean(enabled);
1570+
}
1571+
15561572
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.iceberg.cache;
19+
20+
import org.apache.iceberg.ContentFile;
21+
import org.apache.iceberg.DeleteFile;
22+
import org.apache.iceberg.StructLike;
23+
24+
import java.nio.ByteBuffer;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
/**
29+
* Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects.
30+
*/
31+
public final class ContentFileEstimator {
32+
private static final long LIST_BASE_WEIGHT = 48L;
33+
private static final long OBJECT_REFERENCE_WEIGHT = 8L;
34+
private static final long CONTENT_FILE_BASE_WEIGHT = 256L;
35+
private static final long STRING_BASE_WEIGHT = 40L;
36+
private static final long CHAR_BYTES = 2L;
37+
private static final long BYTE_BUFFER_BASE_WEIGHT = 16L;
38+
private static final long MAP_BASE_WEIGHT = 48L;
39+
private static final long MAP_ENTRY_OVERHEAD = 24L;
40+
private static final long LONG_OBJECT_WEIGHT = 24L;
41+
private static final long INT_OBJECT_WEIGHT = 16L;
42+
private static final long PARTITION_BASE_WEIGHT = 48L;
43+
private static final long PARTITION_VALUE_BASE_WEIGHT = 8L;
44+
45+
private ContentFileEstimator() {
46+
}
47+
48+
public static long estimate(List<? extends ContentFile<?>> files) {
49+
return listReferenceWeight(files) + estimateContentFilesWeight(files);
50+
}
51+
52+
private static long listReferenceWeight(List<?> files) {
53+
if (files == null || files.isEmpty()) {
54+
return 0L;
55+
}
56+
return LIST_BASE_WEIGHT + (long) files.size() * OBJECT_REFERENCE_WEIGHT;
57+
}
58+
59+
private static long estimateContentFilesWeight(List<? extends ContentFile<?>> files) {
60+
long total = 0L;
61+
if (files == null) {
62+
return 0L;
63+
}
64+
for (ContentFile<?> file : files) {
65+
total += estimateContentFileWeight(file);
66+
}
67+
return total;
68+
}
69+
70+
private static long estimateContentFileWeight(ContentFile<?> file) {
71+
if (file == null) {
72+
return 0L;
73+
}
74+
75+
long weight = CONTENT_FILE_BASE_WEIGHT;
76+
weight += charSequenceWeight(file.path());
77+
weight += stringWeight(file.manifestLocation());
78+
weight += byteBufferWeight(file.keyMetadata());
79+
weight += partitionWeight(file.partition());
80+
81+
weight += numericMapWeight(file.columnSizes());
82+
weight += numericMapWeight(file.valueCounts());
83+
weight += numericMapWeight(file.nullValueCounts());
84+
weight += numericMapWeight(file.nanValueCounts());
85+
weight += byteBufferMapWeight(file.lowerBounds());
86+
weight += byteBufferMapWeight(file.upperBounds());
87+
88+
weight += listWeight(file.splitOffsets(), LONG_OBJECT_WEIGHT);
89+
weight += listWeight(file.equalityFieldIds(), INT_OBJECT_WEIGHT);
90+
91+
weight += optionalLongWeight(file.pos());
92+
weight += optionalLongWeight(file.dataSequenceNumber());
93+
weight += optionalLongWeight(file.fileSequenceNumber());
94+
weight += optionalLongWeight(file.firstRowId());
95+
weight += optionalIntWeight(file.sortOrderId());
96+
97+
if (file instanceof DeleteFile) {
98+
DeleteFile deleteFile = (DeleteFile) file;
99+
weight += stringWeight(deleteFile.referencedDataFile());
100+
weight += optionalLongWeight(deleteFile.contentOffset());
101+
weight += optionalLongWeight(deleteFile.contentSizeInBytes());
102+
}
103+
104+
return weight;
105+
}
106+
107+
private static long listWeight(List<? extends Number> list, long elementWeight) {
108+
if (list == null || list.isEmpty()) {
109+
return 0L;
110+
}
111+
return LIST_BASE_WEIGHT + (long) list.size() * (OBJECT_REFERENCE_WEIGHT + elementWeight);
112+
}
113+
114+
private static long numericMapWeight(Map<Integer, Long> map) {
115+
if (map == null || map.isEmpty()) {
116+
return 0L;
117+
}
118+
return MAP_BASE_WEIGHT + (long) map.size() * (MAP_ENTRY_OVERHEAD + LONG_OBJECT_WEIGHT);
119+
}
120+
121+
private static long byteBufferMapWeight(Map<Integer, ByteBuffer> map) {
122+
if (map == null || map.isEmpty()) {
123+
return 0L;
124+
}
125+
long weight = MAP_BASE_WEIGHT + (long) map.size() * MAP_ENTRY_OVERHEAD;
126+
for (ByteBuffer buffer : map.values()) {
127+
weight += byteBufferWeight(buffer);
128+
}
129+
return weight;
130+
}
131+
132+
private static long partitionWeight(StructLike partition) {
133+
if (partition == null) {
134+
return 0L;
135+
}
136+
long weight = PARTITION_BASE_WEIGHT + (long) partition.size() * PARTITION_VALUE_BASE_WEIGHT;
137+
for (int i = 0; i < partition.size(); i++) {
138+
Object value = partition.get(i, Object.class);
139+
weight += estimateValueWeight(value);
140+
}
141+
return weight;
142+
}
143+
144+
private static long estimateValueWeight(Object value) {
145+
if (value == null) {
146+
return 0L;
147+
}
148+
if (value instanceof CharSequence) {
149+
return charSequenceWeight((CharSequence) value);
150+
} else if (value instanceof byte[]) {
151+
return BYTE_BUFFER_BASE_WEIGHT + ((byte[]) value).length;
152+
} else if (value instanceof ByteBuffer) {
153+
return byteBufferWeight((ByteBuffer) value);
154+
} else if (value instanceof Long || value instanceof Double) {
155+
return LONG_OBJECT_WEIGHT;
156+
} else if (value instanceof Integer || value instanceof Float) {
157+
return INT_OBJECT_WEIGHT;
158+
} else if (value instanceof Short || value instanceof Character) {
159+
return 4L;
160+
} else if (value instanceof Boolean) {
161+
return 1L;
162+
}
163+
return OBJECT_REFERENCE_WEIGHT;
164+
}
165+
166+
private static long charSequenceWeight(CharSequence value) {
167+
if (value == null) {
168+
return 0L;
169+
}
170+
return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
171+
}
172+
173+
private static long stringWeight(String value) {
174+
if (value == null) {
175+
return 0L;
176+
}
177+
return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
178+
}
179+
180+
private static long byteBufferWeight(ByteBuffer buffer) {
181+
if (buffer == null) {
182+
return 0L;
183+
}
184+
return BYTE_BUFFER_BASE_WEIGHT + buffer.remaining();
185+
}
186+
187+
private static long optionalLongWeight(Long value) {
188+
return value == null ? 0L : LONG_OBJECT_WEIGHT;
189+
}
190+
191+
private static long optionalIntWeight(Integer value) {
192+
return value == null ? 0L : INT_OBJECT_WEIGHT;
193+
}
194+
}

0 commit comments

Comments
 (0)