Skip to content

Commit 58c267e

Browse files
suxiaogang223 morningman
authored and committed
Manifest cache for tpch1000 (#59178)
1 parent 20dcce0 commit 58c267e

File tree

14 files changed

+1681
-1
lines changed

14 files changed

+1681
-1
lines changed

be/src/clucene

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit bb22247973e55dcac9a3eaafedc57cc6c36d2fc3

fe/check/checkstyle/suppressions.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ under the License.
6969
<!-- ignore hudi disk map copied from hudi/common/util/collection/DiskMap.java -->
7070
<suppress files="org[\\/]apache[\\/]hudi[\\/]common[\\/]util[\\/]collection[\\/]DiskMap\.java" checks="[a-zA-Z0-9]*"/>
7171

72+
<!-- ignore iceberg delete file index copied from iceberg/DeleteFileIndex.java -->
73+
<suppress files="org[\\/]apache[\\/]iceberg[\\/]DeleteFileIndex\.java" checks="[a-zA-Z0-9]*"/>
74+
7275
<!-- ignore gensrc/thrift/ExternalTableSchema.thrift -->
7376
<suppress files=".*thrift/schema/external/.*" checks=".*"/>
7477
</suppressions>

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2298,6 +2298,18 @@ public class Config extends ConfigBase {
22982298
})
22992299
public static long external_cache_refresh_time_minutes = 10; // 10 mins
23002300

2301+
// Switch for the Iceberg manifest DataFile/DeleteFile cache; the default value is true.
@ConfField(description = {"是否启用 Iceberg Manifest DataFile/DeleteFile 缓存。",
2302+
"Whether to enable Iceberg manifest DataFile/DeleteFile cache."})
2303+
public static boolean iceberg_manifest_cache_enable = true;
2304+
2305+
// Capacity limit of the Iceberg manifest cache, expressed in MB; the default value is 1024.
@ConfField(description = {"Iceberg Manifest 缓存的容量上限,单位 MB。",
2306+
"Iceberg manifest cache capacity in MB."})
2307+
public static long iceberg_manifest_cache_capacity_mb = 1024;
2308+
2309+
// Expire-after-access time in seconds; per the description, 0 or a negative value disables expiration.
// The default 48 * 60 * 60 is 48 hours written out in seconds.
@ConfField(description = {"Iceberg Manifest 缓存的访问过期时间(秒),0 或负数表示不过期。",
2310+
"Iceberg manifest cache expire after access in seconds. 0 or negative disables expiration."})
2311+
public static long iceberg_manifest_cache_ttl_sec = 48 * 60 * 60;
2312+
23012313
/**
23022314
* Github workflow test type, for setting some session variables
23032315
* only for certain test types. E.g. only setting batch_size to small

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
3030
import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
3131
import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor;
32+
import org.apache.doris.datasource.iceberg.IcebergManifestCacheMgr;
3233
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
3334
import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
3435
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
@@ -97,6 +98,7 @@ public class ExternalMetaCacheMgr {
9798
private FileSystemCache fsCache;
9899
// all external table row count cache.
99100
private ExternalRowCountCache rowCountCache;
101+
private final IcebergManifestCacheMgr icebergManifestCacheMgr;
100102
private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
101103
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
102104
private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
@@ -128,6 +130,7 @@ public ExternalMetaCacheMgr(boolean isCheckpointCatalog) {
128130
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);
129131

130132
hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
133+
icebergManifestCacheMgr = new IcebergManifestCacheMgr();
131134
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor);
132135
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
133136
paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor);
@@ -199,6 +202,10 @@ public HudiMetadataCacheMgr getHudiMetadataCacheMgr() {
199202
return hudiMetadataCacheMgr;
200203
}
201204

205+
public IcebergManifestCacheMgr getIcebergManifestCacheMgr() {
206+
return icebergManifestCacheMgr;
207+
}
208+
202209
public IcebergMetadataCache getIcebergMetadataCache() {
203210
return icebergMetadataCacheMgr.getIcebergMetadataCache();
204211
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.iceberg;
19+
20+
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
21+
22+
/**
23+
* Wrapper manager for Iceberg manifest cache.
24+
*/
25+
public class IcebergManifestCacheMgr {
26+
private final IcebergManifestCache manifestCache;
27+
28+
public IcebergManifestCacheMgr() {
29+
this.manifestCache = new IcebergManifestCache();
30+
}
31+
32+
public IcebergManifestCache getManifestCache() {
33+
return manifestCache;
34+
}
35+
}

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.doris.datasource.ExternalSchemaCache;
5757
import org.apache.doris.datasource.ExternalTable;
5858
import org.apache.doris.datasource.SchemaCacheValue;
59+
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
5960
import org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo;
6061
import org.apache.doris.datasource.mvcc.MvccSnapshot;
6162
import org.apache.doris.datasource.mvcc.MvccUtil;
@@ -1452,4 +1453,11 @@ public static String showCreateView(IcebergExternalTable icebergExternalTable) {
14521453
icebergExternalTable.getViewText();
14531454
}
14541455

1456+
public static IcebergManifestCache getManifestCache() {
1457+
return Env.getCurrentEnv()
1458+
.getExtMetaCacheMgr()
1459+
.getIcebergManifestCacheMgr()
1460+
.getManifestCache();
1461+
}
1462+
14551463
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.iceberg.cache;
19+
20+
import org.apache.iceberg.ContentFile;
21+
import org.apache.iceberg.DeleteFile;
22+
import org.apache.iceberg.StructLike;
23+
24+
import java.nio.ByteBuffer;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
/**
29+
* Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects.
30+
*/
31+
public final class ContentFileEstimater {
32+
private static final long LIST_BASE_WEIGHT = 48L;
33+
private static final long OBJECT_REFERENCE_WEIGHT = 8L;
34+
private static final long CONTENT_FILE_BASE_WEIGHT = 256L;
35+
private static final long STRING_BASE_WEIGHT = 40L;
36+
private static final long CHAR_BYTES = 2L;
37+
private static final long BYTE_BUFFER_BASE_WEIGHT = 16L;
38+
private static final long MAP_BASE_WEIGHT = 48L;
39+
private static final long MAP_ENTRY_OVERHEAD = 24L;
40+
private static final long LONG_OBJECT_WEIGHT = 24L;
41+
private static final long INT_OBJECT_WEIGHT = 16L;
42+
private static final long PARTITION_BASE_WEIGHT = 48L;
43+
private static final long PARTITION_VALUE_BASE_WEIGHT = 8L;
44+
45+
private ContentFileEstimater() {
46+
}
47+
48+
public static long estimate(List<? extends ContentFile<?>> files) {
49+
return listReferenceWeight(files) + estimateContentFilesWeight(files);
50+
}
51+
52+
private static long listReferenceWeight(List<?> files) {
53+
if (files == null || files.isEmpty()) {
54+
return 0L;
55+
}
56+
return LIST_BASE_WEIGHT + (long) files.size() * OBJECT_REFERENCE_WEIGHT;
57+
}
58+
59+
private static long estimateContentFilesWeight(List<? extends ContentFile<?>> files) {
60+
long total = 0L;
61+
if (files == null) {
62+
return 0L;
63+
}
64+
for (ContentFile<?> file : files) {
65+
total += estimateContentFileWeight(file);
66+
}
67+
return total;
68+
}
69+
70+
private static long estimateContentFileWeight(ContentFile<?> file) {
71+
if (file == null) {
72+
return 0L;
73+
}
74+
75+
long weight = CONTENT_FILE_BASE_WEIGHT;
76+
weight += charSequenceWeight(file.path());
77+
weight += stringWeight(file.manifestLocation());
78+
weight += byteBufferWeight(file.keyMetadata());
79+
weight += partitionWeight(file.partition());
80+
81+
weight += numericMapWeight(file.columnSizes());
82+
weight += numericMapWeight(file.valueCounts());
83+
weight += numericMapWeight(file.nullValueCounts());
84+
weight += numericMapWeight(file.nanValueCounts());
85+
weight += byteBufferMapWeight(file.lowerBounds());
86+
weight += byteBufferMapWeight(file.upperBounds());
87+
88+
weight += listWeight(file.splitOffsets(), LONG_OBJECT_WEIGHT);
89+
weight += listWeight(file.equalityFieldIds(), INT_OBJECT_WEIGHT);
90+
91+
weight += optionalLongWeight(file.pos());
92+
weight += optionalLongWeight(file.dataSequenceNumber());
93+
weight += optionalLongWeight(file.fileSequenceNumber());
94+
weight += optionalLongWeight(file.firstRowId());
95+
weight += optionalIntWeight(file.sortOrderId());
96+
97+
if (file instanceof DeleteFile) {
98+
DeleteFile deleteFile = (DeleteFile) file;
99+
weight += stringWeight(deleteFile.referencedDataFile());
100+
weight += optionalLongWeight(deleteFile.contentOffset());
101+
weight += optionalLongWeight(deleteFile.contentSizeInBytes());
102+
}
103+
104+
return weight;
105+
}
106+
107+
private static long listWeight(List<? extends Number> list, long elementWeight) {
108+
if (list == null || list.isEmpty()) {
109+
return 0L;
110+
}
111+
return LIST_BASE_WEIGHT + (long) list.size() * (OBJECT_REFERENCE_WEIGHT + elementWeight);
112+
}
113+
114+
private static long numericMapWeight(Map<Integer, Long> map) {
115+
if (map == null || map.isEmpty()) {
116+
return 0L;
117+
}
118+
return MAP_BASE_WEIGHT + (long) map.size() * (MAP_ENTRY_OVERHEAD + LONG_OBJECT_WEIGHT);
119+
}
120+
121+
private static long byteBufferMapWeight(Map<Integer, ByteBuffer> map) {
122+
if (map == null || map.isEmpty()) {
123+
return 0L;
124+
}
125+
long weight = MAP_BASE_WEIGHT + (long) map.size() * MAP_ENTRY_OVERHEAD;
126+
for (ByteBuffer buffer : map.values()) {
127+
weight += byteBufferWeight(buffer);
128+
}
129+
return weight;
130+
}
131+
132+
private static long partitionWeight(StructLike partition) {
133+
if (partition == null) {
134+
return 0L;
135+
}
136+
long weight = PARTITION_BASE_WEIGHT + (long) partition.size() * PARTITION_VALUE_BASE_WEIGHT;
137+
for (int i = 0; i < partition.size(); i++) {
138+
Object value = partition.get(i, Object.class);
139+
weight += estimateValueWeight(value);
140+
}
141+
return weight;
142+
}
143+
144+
private static long estimateValueWeight(Object value) {
145+
if (value == null) {
146+
return 0L;
147+
}
148+
if (value instanceof CharSequence) {
149+
return charSequenceWeight((CharSequence) value);
150+
} else if (value instanceof byte[]) {
151+
return BYTE_BUFFER_BASE_WEIGHT + ((byte[]) value).length;
152+
} else if (value instanceof ByteBuffer) {
153+
return byteBufferWeight((ByteBuffer) value);
154+
} else if (value instanceof Long || value instanceof Double) {
155+
return LONG_OBJECT_WEIGHT;
156+
} else if (value instanceof Integer || value instanceof Float) {
157+
return INT_OBJECT_WEIGHT;
158+
} else if (value instanceof Short || value instanceof Character) {
159+
return 4L;
160+
} else if (value instanceof Boolean) {
161+
return 1L;
162+
}
163+
return OBJECT_REFERENCE_WEIGHT;
164+
}
165+
166+
private static long charSequenceWeight(CharSequence value) {
167+
if (value == null) {
168+
return 0L;
169+
}
170+
return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
171+
}
172+
173+
private static long stringWeight(String value) {
174+
if (value == null) {
175+
return 0L;
176+
}
177+
return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
178+
}
179+
180+
private static long byteBufferWeight(ByteBuffer buffer) {
181+
if (buffer == null) {
182+
return 0L;
183+
}
184+
return BYTE_BUFFER_BASE_WEIGHT + buffer.remaining();
185+
}
186+
187+
private static long optionalLongWeight(Long value) {
188+
return value == null ? 0L : LONG_OBJECT_WEIGHT;
189+
}
190+
191+
private static long optionalIntWeight(Integer value) {
192+
return value == null ? 0L : INT_OBJECT_WEIGHT;
193+
}
194+
}

0 commit comments

Comments
 (0)