Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fe/check/checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ under the License.
<!-- ignore hudi disk map copied from hudi/common/util/collection/DiskMap.java -->
<suppress files="org[\\/]apache[\\/]hudi[\\/]common[\\/]util[\\/]collection[\\/]DiskMap\.java" checks="[a-zA-Z0-9]*"/>

<!-- ignore iceberg delete file index copied from iceberg/DeleteFileIndex.java -->
<suppress files="org[\\/]apache[\\/]iceberg[\\/]DeleteFileIndex\.java" checks="[a-zA-Z0-9]*"/>

<!-- ignore gensrc/thrift/ExternalTableSchema.thrift -->
<suppress files=".*thrift/schema/external/.*" checks=".*"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND = "iceberg.table.meta.cache.ttl-second";
public static final String ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND = "iceberg.snapshot.meta.cache.ttl-second";
public static final String ICEBERG_MANIFEST_CACHE_ENABLE = "iceberg.manifest.cache.enable";
public static final String ICEBERG_MANIFEST_CACHE_CAPACITY_MB = "iceberg.manifest.cache.capacity-mb";
public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND = "iceberg.manifest.cache.ttl-second";
public static final boolean DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE = true;
public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB = 1024;
public static final long DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND = 48 * 60 * 60;
protected String icebergCatalogType;
protected Catalog catalog;

Expand Down Expand Up @@ -95,6 +101,29 @@ public void checkProperties() throws DdlException {
"The parameter " + ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND + " is wrong, value is "
+ partitionCacheTtlSecond);
}

String manifestCacheEnable = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
if (Objects.nonNull(manifestCacheEnable)
&& !(manifestCacheEnable.equalsIgnoreCase("true") || manifestCacheEnable.equalsIgnoreCase("false"))) {
throw new DdlException(
"The parameter " + ICEBERG_MANIFEST_CACHE_ENABLE + " is wrong, value is "
+ manifestCacheEnable);
}

String manifestCacheCapacity = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
if (Objects.nonNull(manifestCacheCapacity) && NumberUtils.toLong(manifestCacheCapacity, -1) <= 0) {
throw new DdlException(
"The parameter " + ICEBERG_MANIFEST_CACHE_CAPACITY_MB + " is wrong, value is "
+ manifestCacheCapacity);
}

String manifestCacheTtlSecond = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
if (Objects.nonNull(manifestCacheTtlSecond)
&& NumberUtils.toLong(manifestCacheTtlSecond, CACHE_NO_TTL) < CACHE_TTL_DISABLE_CACHE) {
throw new DdlException(
"The parameter " + ICEBERG_MANIFEST_CACHE_TTL_SECOND + " is wrong, value is "
+ manifestCacheTtlSecond);
}
catalogProperty.checkMetaStoreAndStorageProperties(AbstractIcebergProperties.class);
}

Expand All @@ -106,6 +135,13 @@ public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
if (Objects.nonNull(tableMetaCacheTtl) || Objects.nonNull(snapshotMetaCacheTtl)) {
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
}
String manifestCacheEnable = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
String manifestCacheCapacity = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_CAPACITY_MB, null);
String manifestCacheTtl = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_TTL_SECOND, null);
if (Objects.nonNull(manifestCacheEnable) || Objects.nonNull(manifestCacheCapacity)
|| Objects.nonNull(manifestCacheTtl)) {
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
import org.apache.doris.mtmv.MTMVRelatedTableIf;

import com.github.benmanes.caffeine.cache.LoadingCache;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class IcebergMetadataCache {
private LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
private LoadingCache<IcebergMetadataCacheKey, IcebergSnapshotCacheValue> snapshotCache;
private LoadingCache<IcebergMetadataCacheKey, View> viewCache;
private IcebergManifestCache manifestCache;

public IcebergMetadataCache(IcebergExternalCatalog catalog, ExecutorService executor) {
this.executor = executor;
Expand Down Expand Up @@ -101,6 +103,15 @@ public void init() {
null);
this.snapshotCache = snapshotCacheFactory.buildCache(this::loadSnapshot, executor);
this.viewCache = tableCacheFactory.buildCache(this::loadView, executor);

long manifestCacheCapacityMb = NumberUtils.toLong(
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_CAPACITY_MB),
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_CAPACITY_MB);
manifestCacheCapacityMb = Math.max(manifestCacheCapacityMb, 0L);
long manifestCacheTtlSec = NumberUtils.toLong(
catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_TTL_SECOND),
IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_TTL_SECOND);
this.manifestCache = new IcebergManifestCache(manifestCacheCapacityMb, manifestCacheTtlSec);
}

public Table getIcebergTable(ExternalTable dorisTable) {
Expand All @@ -117,6 +128,10 @@ public IcebergSnapshotCacheValue getSnapshotCache(ExternalTable dorisTable) {
return snapshotCache.get(key);
}

public IcebergManifestCache getManifestCache() {
return manifestCache;
}

@NotNull
private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
Table icebergTable = getIcebergTable(key);
Expand Down Expand Up @@ -200,6 +215,7 @@ public void invalidateCatalogCache(long catalogId) {
viewCache.asMap().entrySet().stream()
.filter(entry -> entry.getKey().nameMapping.getCtlId() == catalogId)
.forEach(entry -> viewCache.invalidate(entry.getKey()));
manifestCache.invalidateAll();
}

public void invalidateTableCache(ExternalTable dorisTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
import org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccUtil;
Expand Down Expand Up @@ -1553,4 +1554,19 @@ public static String showCreateView(IcebergExternalTable icebergExternalTable) {
icebergExternalTable.getViewText();
}

public static IcebergManifestCache getManifestCache(ExternalCatalog catalog) {
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache((IcebergExternalCatalog) catalog)
.getManifestCache();
}

public static boolean isManifestCacheEnabled(ExternalCatalog catalog) {
String enabled = catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_MANIFEST_CACHE_ENABLE);
if (enabled == null) {
return IcebergExternalCatalog.DEFAULT_ICEBERG_MANIFEST_CACHE_ENABLE;
}
return Boolean.parseBoolean(enabled);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.cache;

import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.StructLike;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

/**
* Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects.
*/
public final class ContentFileEstimator {
private static final long LIST_BASE_WEIGHT = 48L;
private static final long OBJECT_REFERENCE_WEIGHT = 8L;
private static final long CONTENT_FILE_BASE_WEIGHT = 256L;
private static final long STRING_BASE_WEIGHT = 40L;
private static final long CHAR_BYTES = 2L;
private static final long BYTE_BUFFER_BASE_WEIGHT = 16L;
private static final long MAP_BASE_WEIGHT = 48L;
private static final long MAP_ENTRY_OVERHEAD = 24L;
private static final long LONG_OBJECT_WEIGHT = 24L;
private static final long INT_OBJECT_WEIGHT = 16L;
private static final long PARTITION_BASE_WEIGHT = 48L;
private static final long PARTITION_VALUE_BASE_WEIGHT = 8L;

private ContentFileEstimator() {
}

public static long estimate(List<? extends ContentFile<?>> files) {
return listReferenceWeight(files) + estimateContentFilesWeight(files);
}

private static long listReferenceWeight(List<?> files) {
if (files == null || files.isEmpty()) {
return 0L;
}
return LIST_BASE_WEIGHT + (long) files.size() * OBJECT_REFERENCE_WEIGHT;
}

private static long estimateContentFilesWeight(List<? extends ContentFile<?>> files) {
long total = 0L;
if (files == null) {
return 0L;
}
for (ContentFile<?> file : files) {
total += estimateContentFileWeight(file);
}
return total;
}

private static long estimateContentFileWeight(ContentFile<?> file) {
if (file == null) {
return 0L;
}

long weight = CONTENT_FILE_BASE_WEIGHT;
weight += charSequenceWeight(file.path());
weight += stringWeight(file.manifestLocation());
weight += byteBufferWeight(file.keyMetadata());
weight += partitionWeight(file.partition());

weight += numericMapWeight(file.columnSizes());
weight += numericMapWeight(file.valueCounts());
weight += numericMapWeight(file.nullValueCounts());
weight += numericMapWeight(file.nanValueCounts());
weight += byteBufferMapWeight(file.lowerBounds());
weight += byteBufferMapWeight(file.upperBounds());

weight += listWeight(file.splitOffsets(), LONG_OBJECT_WEIGHT);
weight += listWeight(file.equalityFieldIds(), INT_OBJECT_WEIGHT);

weight += optionalLongWeight(file.pos());
weight += optionalLongWeight(file.dataSequenceNumber());
weight += optionalLongWeight(file.fileSequenceNumber());
weight += optionalLongWeight(file.firstRowId());
weight += optionalIntWeight(file.sortOrderId());

if (file instanceof DeleteFile) {
DeleteFile deleteFile = (DeleteFile) file;
weight += stringWeight(deleteFile.referencedDataFile());
weight += optionalLongWeight(deleteFile.contentOffset());
weight += optionalLongWeight(deleteFile.contentSizeInBytes());
}

return weight;
}

private static long listWeight(List<? extends Number> list, long elementWeight) {
if (list == null || list.isEmpty()) {
return 0L;
}
return LIST_BASE_WEIGHT + (long) list.size() * (OBJECT_REFERENCE_WEIGHT + elementWeight);
}

private static long numericMapWeight(Map<Integer, Long> map) {
if (map == null || map.isEmpty()) {
return 0L;
}
return MAP_BASE_WEIGHT + (long) map.size() * (MAP_ENTRY_OVERHEAD + LONG_OBJECT_WEIGHT);
}

private static long byteBufferMapWeight(Map<Integer, ByteBuffer> map) {
if (map == null || map.isEmpty()) {
return 0L;
}
long weight = MAP_BASE_WEIGHT + (long) map.size() * MAP_ENTRY_OVERHEAD;
for (ByteBuffer buffer : map.values()) {
weight += byteBufferWeight(buffer);
}
return weight;
}

private static long partitionWeight(StructLike partition) {
if (partition == null) {
return 0L;
}
long weight = PARTITION_BASE_WEIGHT + (long) partition.size() * PARTITION_VALUE_BASE_WEIGHT;
for (int i = 0; i < partition.size(); i++) {
Object value = partition.get(i, Object.class);
weight += estimateValueWeight(value);
}
return weight;
}

private static long estimateValueWeight(Object value) {
if (value == null) {
return 0L;
}
if (value instanceof CharSequence) {
return charSequenceWeight((CharSequence) value);
} else if (value instanceof byte[]) {
return BYTE_BUFFER_BASE_WEIGHT + ((byte[]) value).length;
} else if (value instanceof ByteBuffer) {
return byteBufferWeight((ByteBuffer) value);
} else if (value instanceof Long || value instanceof Double) {
return LONG_OBJECT_WEIGHT;
} else if (value instanceof Integer || value instanceof Float) {
return INT_OBJECT_WEIGHT;
} else if (value instanceof Short || value instanceof Character) {
return 4L;
} else if (value instanceof Boolean) {
return 1L;
}
return OBJECT_REFERENCE_WEIGHT;
}

private static long charSequenceWeight(CharSequence value) {
if (value == null) {
return 0L;
}
return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
}

private static long stringWeight(String value) {
if (value == null) {
return 0L;
}
return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
}

private static long byteBufferWeight(ByteBuffer buffer) {
if (buffer == null) {
return 0L;
}
return BYTE_BUFFER_BASE_WEIGHT + buffer.remaining();
}

private static long optionalLongWeight(Long value) {
return value == null ? 0L : LONG_OBJECT_WEIGHT;
}

private static long optionalIntWeight(Integer value) {
return value == null ? 0L : INT_OBJECT_WEIGHT;
}
}
Loading
Loading