diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index f37fb6f3369179..b9f087030f2e31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -51,7 +51,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
     public static final String ICEBERG_S3_TABLES = "s3tables";
     public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
     public static final String ICEBERG_TABLE_META_CACHE_TTL_SECOND = "iceberg.table.meta.cache.ttl-second";
-    public static final String ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND = "iceberg.snapshot.meta.cache.ttl-second";
     public static final String ICEBERG_MANIFEST_CACHE_ENABLE = "iceberg.manifest.cache.enable";
     public static final String ICEBERG_MANIFEST_CACHE_CAPACITY_MB = "iceberg.manifest.cache.capacity-mb";
     public static final String ICEBERG_MANIFEST_CACHE_TTL_SECOND = "iceberg.manifest.cache.ttl-second";
@@ -94,15 +93,6 @@ public void checkProperties() throws DdlException {
                             + tableMetaCacheTtlSecond);
         }
 
-        // check iceberg.snapshot.meta.cache.ttl-second parameter
-        String partitionCacheTtlSecond = catalogProperty.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
-        if (Objects.nonNull(partitionCacheTtlSecond) && NumberUtils.toInt(partitionCacheTtlSecond, CACHE_NO_TTL)
-                < CACHE_TTL_DISABLE_CACHE) {
-            throw new DdlException(
-                    "The parameter " + ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND + " is wrong, value is "
-                            + partitionCacheTtlSecond);
-        }
-
         String manifestCacheEnable = catalogProperty.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
         if (Objects.nonNull(manifestCacheEnable) && !(manifestCacheEnable.equalsIgnoreCase("true")
                 || manifestCacheEnable.equalsIgnoreCase("false"))) {
@@ -132,8 +122,7 @@ public void checkProperties() throws DdlException {
     public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
         super.notifyPropertiesUpdated(updatedProps);
         String tableMetaCacheTtl = updatedProps.getOrDefault(ICEBERG_TABLE_META_CACHE_TTL_SECOND, null);
-        String snapshotMetaCacheTtl = updatedProps.getOrDefault(ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND, null);
-        if (Objects.nonNull(tableMetaCacheTtl) || Objects.nonNull(snapshotMetaCacheTtl)) {
+        if (Objects.nonNull(tableMetaCacheTtl)) {
             Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache(this).init();
         }
         String manifestCacheEnable = updatedProps.getOrDefault(ICEBERG_MANIFEST_CACHE_ENABLE, null);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index 7acc22152c698a..c30cf040945da8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -18,7 +18,6 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CacheFactory;
 import org.apache.doris.common.Config;
@@ -55,9 +54,7 @@ public class IcebergMetadataCache {
     private static final Logger LOG = LogManager.getLogger(IcebergMetadataCache.class);
     private final ExecutorService executor;
     private final IcebergExternalCatalog catalog;
-    private LoadingCache<IcebergMetadataCacheKey, List<Snapshot>> snapshotListCache;
-    private LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
-    private LoadingCache<IcebergMetadataCacheKey, IcebergSnapshotCacheValue> snapshotCache;
+    private LoadingCache<IcebergMetadataCacheKey, IcebergTableCacheValue> tableCache;
     private LoadingCache<IcebergMetadataCacheKey, View> viewCache;
     private IcebergManifestCache manifestCache;
 
@@ -72,19 +69,6 @@ public void init() {
                 catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_TABLE_META_CACHE_TTL_SECOND),
                 ExternalCatalog.CACHE_NO_TTL);
 
-        long snapshotMetaCacheTtlSecond = NumberUtils.toLong(
-                catalog.getProperties().get(IcebergExternalCatalog.ICEBERG_SNAPSHOT_META_CACHE_TTL_SECOND),
-                ExternalCatalog.CACHE_NO_TTL);
-
-        CacheFactory snapshotListCacheFactory = new CacheFactory(
-                OptionalLong.of(snapshotMetaCacheTtlSecond >= ExternalCatalog.CACHE_TTL_DISABLE_CACHE
-                        ? snapshotMetaCacheTtlSecond : Config.external_cache_expire_time_seconds_after_access),
-                OptionalLong.of(Config.external_cache_refresh_time_minutes * 60),
-                Config.max_external_table_cache_num,
-                true,
-                null);
-        this.snapshotListCache = snapshotListCacheFactory.buildCache(this::loadSnapshots, executor);
-
         CacheFactory tableCacheFactory = new CacheFactory(
                 OptionalLong.of(tableMetaCacheTtlSecond >= ExternalCatalog.CACHE_TTL_DISABLE_CACHE
                         ? tableMetaCacheTtlSecond : Config.external_cache_expire_time_seconds_after_access),
@@ -92,16 +76,7 @@ public void init() {
                 Config.max_external_table_cache_num,
                 true,
                 null);
-        this.tableCache = tableCacheFactory.buildCache(this::loadTable, executor);
-
-        CacheFactory snapshotCacheFactory = new CacheFactory(
-                OptionalLong.of(snapshotMetaCacheTtlSecond >= ExternalCatalog.CACHE_TTL_DISABLE_CACHE
-                        ? snapshotMetaCacheTtlSecond : Config.external_cache_expire_time_seconds_after_access),
-                OptionalLong.of(Config.external_cache_refresh_time_minutes * 60),
-                Config.max_external_table_cache_num,
-                true,
-                null);
-        this.snapshotCache = snapshotCacheFactory.buildCache(this::loadSnapshot, executor);
+        this.tableCache = tableCacheFactory.buildCache(this::loadTableCacheValue, executor);
         this.viewCache = tableCacheFactory.buildCache(this::loadView, executor);
 
         long manifestCacheCapacityMb = NumberUtils.toLong(
@@ -116,32 +91,31 @@ public Table getIcebergTable(ExternalTable dorisTable) {
         IcebergMetadataCacheKey key = new IcebergMetadataCacheKey(dorisTable.getOrBuildNameMapping());
-        return tableCache.get(key);
+        return tableCache.get(key).getIcebergTable();
     }
 
     public Table getIcebergTable(IcebergMetadataCacheKey key) {
-        return tableCache.get(key);
+        return tableCache.get(key).getIcebergTable();
     }
 
     public IcebergSnapshotCacheValue getSnapshotCache(ExternalTable dorisTable) {
         IcebergMetadataCacheKey key = new IcebergMetadataCacheKey(dorisTable.getOrBuildNameMapping());
-        return snapshotCache.get(key);
-    }
-
-    public IcebergManifestCache getManifestCache() {
-        return manifestCache;
+        return tableCache.get(key).getSnapshotCacheValue(() -> loadSnapshot(dorisTable));
     }
 
-    @NotNull
-    private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
-        Table icebergTable = getIcebergTable(key);
+    public List<Snapshot> getSnapshotList(ExternalTable dorisTable) {
+        Table icebergTable = getIcebergTable(dorisTable);
         List<Snapshot> snaps = Lists.newArrayList();
         Iterables.addAll(snaps, icebergTable.snapshots());
         return snaps;
     }
 
+    public IcebergManifestCache getManifestCache() {
+        return manifestCache;
+    }
+
     @NotNull
-    private Table loadTable(IcebergMetadataCacheKey key) {
+    private IcebergTableCacheValue loadTableCacheValue(IcebergMetadataCacheKey key) {
         NameMapping nameMapping = key.nameMapping;
         CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(nameMapping.getCtlId());
if (catalog == null) { @@ -160,8 +134,10 @@ private Table loadTable(IcebergMetadataCacheKey key) { if (LOG.isDebugEnabled()) { LOG.debug("load iceberg table {}", nameMapping, new Exception()); } - return ((ExternalCatalog) catalog).getExecutionAuthenticator().execute(() - -> ops.loadTable(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName())); + Table table = ((ExternalCatalog) catalog).getExecutionAuthenticator() + .execute(() + -> ops.loadTable(nameMapping.getRemoteDbName(), nameMapping.getRemoteTblName())); + return new IcebergTableCacheValue(table); } catch (Exception e) { throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e); } @@ -169,38 +145,33 @@ private Table loadTable(IcebergMetadataCacheKey key) { } @NotNull - private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey key) throws AnalysisException { - NameMapping nameMapping = key.nameMapping; - TableIf dorisTable = Env.getCurrentEnv().getCatalogMgr().getCatalogOrAnalysisException(nameMapping.getCtlId()) - .getDbOrAnalysisException(nameMapping.getLocalDbName()) - .getTableOrAnalysisException(nameMapping.getLocalTblName()); - + private IcebergSnapshotCacheValue loadSnapshot(ExternalTable dorisTable) { if (!(dorisTable instanceof MTMVRelatedTableIf)) { - throw new AnalysisException(String.format("Table %s.%s is not a valid MTMV related table.", - nameMapping.getLocalDbName(), nameMapping.getLocalTblName())); + throw new RuntimeException(String.format("Table %s.%s is not a valid MTMV related table.", + dorisTable.getDbName(), dorisTable.getName())); } - MTMVRelatedTableIf table = (MTMVRelatedTableIf) dorisTable; - IcebergSnapshot lastedIcebergSnapshot = IcebergUtils.getLastedIcebergSnapshot((ExternalTable) table); - IcebergPartitionInfo icebergPartitionInfo; - if (!table.isValidRelatedTable()) { - icebergPartitionInfo = IcebergPartitionInfo.empty(); - } else { - icebergPartitionInfo = IcebergUtils.loadPartitionInfo((ExternalTable) table, - lastedIcebergSnapshot.getSnapshotId()); + try { + MTMVRelatedTableIf table = (MTMVRelatedTableIf) dorisTable; + IcebergSnapshot lastedIcebergSnapshot = IcebergUtils.getLastedIcebergSnapshot(dorisTable); + IcebergPartitionInfo icebergPartitionInfo; + if (!table.isValidRelatedTable()) { + icebergPartitionInfo = IcebergPartitionInfo.empty(); + } else { + icebergPartitionInfo = IcebergUtils.loadPartitionInfo(dorisTable, + lastedIcebergSnapshot.getSnapshotId()); + } + return new IcebergSnapshotCacheValue(icebergPartitionInfo, lastedIcebergSnapshot); + } catch (AnalysisException e) { + throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e); } - return new IcebergSnapshotCacheValue(icebergPartitionInfo, lastedIcebergSnapshot); } public void invalidateCatalogCache(long catalogId) { - snapshotListCache.asMap().keySet().stream() - .filter(key -> key.nameMapping.getCtlId() == catalogId) - .forEach(snapshotListCache::invalidate); - tableCache.asMap().entrySet().stream() .filter(entry -> entry.getKey().nameMapping.getCtlId() == catalogId) .forEach(entry -> { - ManifestFiles.dropCache(entry.getValue().io()); + ManifestFiles.dropCache(entry.getValue().getIcebergTable().io()); if (LOG.isDebugEnabled()) { LOG.info("invalidate iceberg table cache {} when invalidating catalog cache", entry.getKey().nameMapping, new Exception()); @@ -208,10 +179,6 @@ public void invalidateCatalogCache(long catalogId) { tableCache.invalidate(entry.getKey()); }); - snapshotCache.asMap().keySet().stream() - .filter(key -> key.nameMapping.getCtlId() == catalogId) - 
.forEach(snapshotCache::invalidate); - viewCache.asMap().entrySet().stream() .filter(entry -> entry.getKey().nameMapping.getCtlId() == catalogId) .forEach(entry -> viewCache.invalidate(entry.getKey())); @@ -222,12 +189,6 @@ public void invalidateTableCache(ExternalTable dorisTable) { long catalogId = dorisTable.getCatalog().getId(); String dbName = dorisTable.getDbName(); String tblName = dorisTable.getName(); - snapshotListCache.asMap().keySet().stream() - .filter(key -> key.nameMapping.getCtlId() == catalogId - && key.nameMapping.getLocalDbName().equals(dbName) - && key.nameMapping.getLocalTblName().equals(tblName)) - .forEach(snapshotListCache::invalidate); - tableCache.asMap().entrySet().stream() .filter(entry -> { IcebergMetadataCacheKey key = entry.getKey(); @@ -236,19 +197,13 @@ public void invalidateTableCache(ExternalTable dorisTable) { && key.nameMapping.getLocalTblName().equals(tblName); }) .forEach(entry -> { - ManifestFiles.dropCache(entry.getValue().io()); + ManifestFiles.dropCache(entry.getValue().getIcebergTable().io()); if (LOG.isDebugEnabled()) { LOG.info("invalidate iceberg table cache {}", entry.getKey().nameMapping, new Exception()); } tableCache.invalidate(entry.getKey()); }); - - snapshotCache.asMap().keySet().stream() - .filter(key -> key.nameMapping.getCtlId() == catalogId - && key.nameMapping.getLocalDbName().equals(dbName) - && key.nameMapping.getLocalTblName().equals(tblName)) - .forEach(snapshotCache::invalidate); viewCache.asMap().entrySet().stream() .filter(entry -> { IcebergMetadataCacheKey key = entry.getKey(); @@ -260,11 +215,6 @@ public void invalidateTableCache(ExternalTable dorisTable) { } public void invalidateDbCache(long catalogId, String dbName) { - snapshotListCache.asMap().keySet().stream() - .filter(key -> key.nameMapping.getCtlId() == catalogId - && key.nameMapping.getLocalDbName().equals(dbName)) - .forEach(snapshotListCache::invalidate); - tableCache.asMap().entrySet().stream() .filter(entry -> { IcebergMetadataCacheKey key = entry.getKey(); @@ -272,18 +222,13 @@ public void invalidateDbCache(long catalogId, String dbName) { && key.nameMapping.getLocalDbName().equals(dbName); }) .forEach(entry -> { - ManifestFiles.dropCache(entry.getValue().io()); + ManifestFiles.dropCache(entry.getValue().getIcebergTable().io()); if (LOG.isDebugEnabled()) { LOG.info("invalidate iceberg table cache {} when invalidating db cache", entry.getKey().nameMapping, new Exception()); } tableCache.invalidate(entry.getKey()); }); - - snapshotCache.asMap().keySet().stream() - .filter(key -> key.nameMapping.getCtlId() == catalogId - && key.nameMapping.getLocalDbName().equals(dbName)) - .forEach(snapshotCache::invalidate); viewCache.asMap().entrySet().stream() .filter(entry -> { IcebergMetadataCacheKey key = entry.getKey(); @@ -334,12 +279,8 @@ public int hashCode() { public Map> getCacheStats() { Map> res = Maps.newHashMap(); - res.put("iceberg_snapshot_list_cache", ExternalMetaCacheMgr.getCacheStats(snapshotListCache.stats(), - snapshotListCache.estimatedSize())); res.put("iceberg_table_cache", ExternalMetaCacheMgr.getCacheStats(tableCache.stats(), tableCache.estimatedSize())); - res.put("iceberg_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(), - snapshotCache.estimatedSize())); return res; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTableCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTableCacheValue.java new file mode 100644 index 00000000000000..b133a9125223ec --- 
/dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTableCacheValue.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.iceberg.Table;
+
+import java.util.function.Supplier;
+
+public class IcebergTableCacheValue {
+    private final Table icebergTable;
+
+    private volatile boolean snapshotCacheLoaded;
+    private volatile IcebergSnapshotCacheValue snapshotCacheValue;
+
+    public IcebergTableCacheValue(Table icebergTable) {
+        this.icebergTable = icebergTable;
+    }
+
+    public Table getIcebergTable() {
+        return icebergTable;
+    }
+
+    public IcebergSnapshotCacheValue getSnapshotCacheValue(Supplier<IcebergSnapshotCacheValue> loader) {
+        if (!snapshotCacheLoaded) {
+            synchronized (this) {
+                if (!snapshotCacheLoaded) {
+                    snapshotCacheValue = loader.get();
+                    snapshotCacheLoaded = true;
+                }
+            }
+        }
+        return snapshotCacheValue;
+    }
+}
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index da6ee9fe4426a1..dfb0b239bea104
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1598,6 +1598,96 @@ class Suite implements GroovyInterceptable {
         return result
     }
 
+    /**
+     * Get the spark-iceberg container name by querying docker.
+     * Uses 'docker ps --filter name=spark-iceberg' to find the container.
+     */
+    private String getSparkIcebergContainerName() {
+        try {
+            // Use docker ps with filter to find containers with 'spark-iceberg' in the name
+            String command = "docker ps --filter name=spark-iceberg --format {{.Names}}"
+            def process = command.execute()
+            process.waitFor()
+            String output = process.in.text.trim()
+
+            if (output) {
+                // Get the first matching container
+                String containerName = output.split('\n')[0].trim()
+                if (containerName) {
+                    logger.info("Found spark-iceberg container: ${containerName}".toString())
+                    return containerName
+                }
+            }
+
+            logger.warn("No spark-iceberg container found via docker ps")
+            return null
+        } catch (Exception e) {
+            logger.warn("Failed to get spark-iceberg container via docker ps: ${e.message}".toString())
+            return null
+        }
+    }
+
+    /**
+     * Execute Spark SQL on the spark-iceberg container via docker exec.
+ * + * Usage in test suite: + * spark_iceberg "CREATE TABLE demo.test_db.t1 (id INT) USING iceberg" + * spark_iceberg "INSERT INTO demo.test_db.t1 VALUES (1)" + * def result = spark_iceberg "SELECT * FROM demo.test_db.t1" + * + * The container name is found by querying 'docker ps --filter name=spark-iceberg' + */ + String spark_iceberg(String sqlStr, int timeoutSeconds = 120) { + String containerName = getSparkIcebergContainerName() + if (containerName == null) { + throw new RuntimeException("spark-iceberg container not found. Please ensure the container is running.") + } + String masterUrl = "spark://${containerName}:7077" + + // Escape double quotes in SQL string for shell command + String escapedSql = sqlStr.replaceAll('"', '\\\\"') + + // Build docker exec command + String command = """docker exec ${containerName} spark-sql --master ${masterUrl} --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -e "${escapedSql}" """ + + logger.info("Executing Spark Iceberg SQL: ${sqlStr}".toString()) + logger.info("Container: ${containerName}".toString()) + + try { + String result = cmd(command, timeoutSeconds) + logger.info("Spark Iceberg SQL result: ${result}".toString()) + return result + } catch (Exception e) { + logger.error("Spark Iceberg SQL failed: ${e.message}".toString()) + throw e + } + } + + /** + * Execute multiple Spark SQL statements on the spark-iceberg container. + * Statements are separated by semicolons. + * + * Usage: + * spark_iceberg_multi ''' + * CREATE DATABASE IF NOT EXISTS demo.test_db; + * CREATE TABLE demo.test_db.t1 (id INT) USING iceberg; + * INSERT INTO demo.test_db.t1 VALUES (1); + * ''' + */ + List spark_iceberg_multi(String sqlStatements, int timeoutSeconds = 300) { + // Split by semicolon and execute each statement + def statements = sqlStatements.split(';').collect { it.trim() }.findAll { it } + def results = [] + + for (stmt in statements) { + if (stmt) { + results << spark_iceberg(stmt, timeoutSeconds) + } + } + + return results + } + List> db2_docker(String sqlStr, boolean isOrder = false) { String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "") def (result, meta) = JdbcUtils.executeToList(context.getDB2DockerConnection(), cleanedSqlStr) diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy new file mode 100644 index 00000000000000..7cc9f6af0b7b95 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_table_cache.groovy @@ -0,0 +1,443 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
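All external (non-Doris) changes in the suite below go through the spark_iceberg helper added to Suite.groovy above, which resolves the container via docker ps and runs spark-sql through docker exec. A hedged usage sketch of the two helpers from inside a suite body (demo.sketch_db.t1 is a placeholder identifier, not a table this patch creates):

```groovy
// Hypothetical usage sketch; demo.sketch_db.t1 is a placeholder identifier.
// spark_iceberg_multi splits the script on ';' and issues one spark_iceberg call per statement.
spark_iceberg_multi '''
    CREATE DATABASE IF NOT EXISTS demo.sketch_db;
    CREATE TABLE IF NOT EXISTS demo.sketch_db.t1 (id INT) USING iceberg;
    INSERT INTO demo.sketch_db.t1 VALUES (1);
'''
// A single call returns the raw stdout of spark-sql from the docker exec invocation.
def out = spark_iceberg "SELECT * FROM demo.sketch_db.t1"
logger.info("spark-sql output: ${out}".toString())
```

Each spark_iceberg call is a separate docker exec and therefore a separate spark-sql session, which is why the suite issues one statement per call.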
+ +suite("test_iceberg_table_cache", "p0,external,doris,external_docker,external_docker_doris") { + + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String restPort = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minioPort = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + String catalogWithCache = "test_iceberg_table_cache_with_cache" + String catalogNoCache = "test_iceberg_table_cache_no_cache" + String testDb = "cache_test_db" + + // Create catalogs + sql """drop catalog if exists ${catalogWithCache}""" + sql """drop catalog if exists ${catalogNoCache}""" + + // Catalog with cache enabled (default) + sql """ + CREATE CATALOG ${catalogWithCache} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${restPort}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minioPort}", + "s3.region" = "us-east-1" + ); + """ + + // Catalog with cache disabled (TTL=0) + sql """ + CREATE CATALOG ${catalogNoCache} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${restPort}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minioPort}", + "s3.region" = "us-east-1", + "iceberg.table.meta.cache.ttl-second" = "0" + ); + """ + + try { + // Create test database via Spark + spark_iceberg "CREATE DATABASE IF NOT EXISTS demo.${testDb}" + + // ==================== Test 1: DML Operations ==================== + logger.info("========== Test 1: DML Operations ==========") + + // Test 1.1: INSERT + logger.info("--- Test 1.1: External INSERT ---") + spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_insert" + spark_iceberg "CREATE TABLE demo.${testDb}.test_insert (id INT, name STRING) USING iceberg" + spark_iceberg "INSERT INTO demo.${testDb}.test_insert VALUES (1, 'initial')" + + // Query from Doris to cache the data + sql """switch ${catalogWithCache}""" + def result1 = sql """select * from ${testDb}.test_insert order by id""" + logger.info("Initial data (with cache): ${result1}") + assertEquals(1, result1.size()) + + sql """switch ${catalogNoCache}""" + def result1_no_cache = sql """select * from ${testDb}.test_insert order by id""" + logger.info("Initial data (no cache): ${result1_no_cache}") + assertEquals(1, result1_no_cache.size()) + + // External INSERT via Spark + spark_iceberg "INSERT INTO demo.${testDb}.test_insert VALUES (2, 'external_insert')" + + // Query without refresh - cached catalog should see old data + sql """switch ${catalogWithCache}""" + def result2 = sql """select * from ${testDb}.test_insert order by id""" + logger.info("After external INSERT (with cache, no refresh): ${result2}") + assertEquals(1, result2.size()) // Should still see 1 row due to cache + + // Query without refresh - no-cache catalog should see new data + sql """switch ${catalogNoCache}""" + def result2_no_cache = sql """select * from ${testDb}.test_insert order by id""" + logger.info("After external INSERT (no cache): ${result2_no_cache}") + assertEquals(2, result2_no_cache.size()) // Should see 2 rows + + // Refresh and verify + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_insert""" + def result3 = sql """select * from ${testDb}.test_insert 
order by id""" + logger.info("After REFRESH TABLE (with cache): ${result3}") + assertEquals(2, result3.size()) // Should now see 2 rows + + // Test 1.2: DELETE + logger.info("--- Test 1.2: External DELETE ---") + spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_delete" + spark_iceberg "CREATE TABLE demo.${testDb}.test_delete (id INT, name STRING) USING iceberg TBLPROPERTIES ('format-version'='2')" + spark_iceberg "INSERT INTO demo.${testDb}.test_delete VALUES (1, 'row1'), (2, 'row2'), (3, 'row3')" + + // Cache the data + sql """switch ${catalogWithCache}""" + def del_result1 = sql """select * from ${testDb}.test_delete order by id""" + assertEquals(3, del_result1.size()) + + sql """switch ${catalogNoCache}""" + def del_result1_nc = sql """select * from ${testDb}.test_delete order by id""" + assertEquals(3, del_result1_nc.size()) + + // External DELETE via Spark + spark_iceberg "DELETE FROM demo.${testDb}.test_delete WHERE id = 2" + + // Verify cache behavior + sql """switch ${catalogWithCache}""" + def del_result2 = sql """select * from ${testDb}.test_delete order by id""" + logger.info("After external DELETE (with cache, no refresh): ${del_result2}") + assertEquals(3, del_result2.size()) // Should still see 3 rows + + sql """switch ${catalogNoCache}""" + def del_result2_nc = sql """select * from ${testDb}.test_delete order by id""" + logger.info("After external DELETE (no cache): ${del_result2_nc}") + assertEquals(2, del_result2_nc.size()) // Should see 2 rows + + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_delete""" + def del_result3 = sql """select * from ${testDb}.test_delete order by id""" + assertEquals(2, del_result3.size()) + + // Test 1.3: UPDATE + logger.info("--- Test 1.3: External UPDATE ---") + spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_update" + spark_iceberg "CREATE TABLE demo.${testDb}.test_update (id INT, value INT) USING iceberg TBLPROPERTIES ('format-version'='2')" + spark_iceberg "INSERT INTO demo.${testDb}.test_update VALUES (1, 100), (2, 200)" + + // Cache the data + sql """switch ${catalogWithCache}""" + def upd_result1 = sql """select * from ${testDb}.test_update order by id""" + assertEquals(2, upd_result1.size()) + assertEquals(100, upd_result1[0][1]) + + // External UPDATE via Spark + spark_iceberg "UPDATE demo.${testDb}.test_update SET value = 999 WHERE id = 1" + + // Verify cache behavior + sql """switch ${catalogWithCache}""" + def upd_result2 = sql """select * from ${testDb}.test_update order by id""" + logger.info("After external UPDATE (with cache, no refresh): ${upd_result2}") + assertEquals(100, upd_result2[0][1]) // Should still see old value + + sql """switch ${catalogNoCache}""" + def upd_result2_nc = sql """select * from ${testDb}.test_update order by id""" + logger.info("After external UPDATE (no cache): ${upd_result2_nc}") + assertEquals(999, upd_result2_nc[0][1]) // Should see new value + + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_update""" + def upd_result3 = sql """select * from ${testDb}.test_update order by id""" + assertEquals(999, upd_result3[0][1]) + + // Test 1.4: INSERT OVERWRITE + logger.info("--- Test 1.4: External INSERT OVERWRITE ---") + spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_overwrite" + spark_iceberg "CREATE TABLE demo.${testDb}.test_overwrite (id INT, name STRING) USING iceberg" + spark_iceberg "INSERT INTO demo.${testDb}.test_overwrite VALUES (1, 'old1'), (2, 'old2')" + + // Cache the data + sql """switch ${catalogWithCache}""" + 
def ow_result1 = sql """select * from ${testDb}.test_overwrite order by id""" + assertEquals(2, ow_result1.size()) + + // External INSERT OVERWRITE via Spark + spark_iceberg "INSERT OVERWRITE demo.${testDb}.test_overwrite SELECT 10, 'new'" + + // Verify cache behavior + sql """switch ${catalogWithCache}""" + def ow_result2 = sql """select * from ${testDb}.test_overwrite order by id""" + logger.info("After external INSERT OVERWRITE (with cache, no refresh): ${ow_result2}") + assertEquals(2, ow_result2.size()) // Should still see old data + + sql """switch ${catalogNoCache}""" + def ow_result2_nc = sql """select * from ${testDb}.test_overwrite order by id""" + logger.info("After external INSERT OVERWRITE (no cache): ${ow_result2_nc}") + assertEquals(1, ow_result2_nc.size()) // Should see new data + + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_overwrite""" + def ow_result3 = sql """select * from ${testDb}.test_overwrite order by id""" + assertEquals(1, ow_result3.size()) + + // ==================== Test 2: Schema Change Operations ==================== + logger.info("========== Test 2: Schema Change Operations ==========") + + // Test 2.1: ADD COLUMN + logger.info("--- Test 2.1: External ADD COLUMN ---") + spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_add_column" + spark_iceberg "CREATE TABLE demo.${testDb}.test_add_column (id INT, name STRING) USING iceberg" + spark_iceberg "INSERT INTO demo.${testDb}.test_add_column VALUES (1, 'test')" + + // Cache the schema + sql """switch ${catalogWithCache}""" + def add_col_desc1 = sql """desc ${testDb}.test_add_column""" + logger.info("Initial schema (with cache): ${add_col_desc1}") + assertEquals(2, add_col_desc1.size()) + + sql """switch ${catalogNoCache}""" + def add_col_desc1_nc = sql """desc ${testDb}.test_add_column""" + assertEquals(2, add_col_desc1_nc.size()) + + // External ADD COLUMN via Spark + spark_iceberg "ALTER TABLE demo.${testDb}.test_add_column ADD COLUMN new_col INT" + + // Verify cache behavior + sql """switch ${catalogWithCache}""" + def add_col_desc2 = sql """desc ${testDb}.test_add_column""" + logger.info("After external ADD COLUMN (with cache, no refresh): ${add_col_desc2}") + assertEquals(2, add_col_desc2.size()) // Should still see 2 columns + + sql """switch ${catalogNoCache}""" + def add_col_desc2_nc = sql """desc ${testDb}.test_add_column""" + logger.info("After external ADD COLUMN (no cache): ${add_col_desc2_nc}") + assertEquals(3, add_col_desc2_nc.size()) // Should see 3 columns + + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_add_column""" + def add_col_desc3 = sql """desc ${testDb}.test_add_column""" + assertEquals(3, add_col_desc3.size()) + + // Test 2.2: DROP COLUMN + logger.info("--- Test 2.2: External DROP COLUMN ---") + spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_drop_column" + spark_iceberg "CREATE TABLE demo.${testDb}.test_drop_column (id INT, name STRING, to_drop INT) USING iceberg" + spark_iceberg "INSERT INTO demo.${testDb}.test_drop_column VALUES (1, 'test', 100)" + + // Cache the schema + sql """switch ${catalogWithCache}""" + def drop_col_desc1 = sql """desc ${testDb}.test_drop_column""" + assertEquals(3, drop_col_desc1.size()) + + // External DROP COLUMN via Spark + spark_iceberg "ALTER TABLE demo.${testDb}.test_drop_column DROP COLUMN to_drop" + + // Verify cache behavior + sql """switch ${catalogWithCache}""" + def drop_col_desc2 = sql """desc ${testDb}.test_drop_column""" + logger.info("After external DROP COLUMN (with 
cache, no refresh): ${drop_col_desc2}") + assertEquals(3, drop_col_desc2.size()) // Should still see 3 columns + + sql """switch ${catalogNoCache}""" + def drop_col_desc2_nc = sql """desc ${testDb}.test_drop_column""" + logger.info("After external DROP COLUMN (no cache): ${drop_col_desc2_nc}") + assertEquals(2, drop_col_desc2_nc.size()) // Should see 2 columns + + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_drop_column""" + def drop_col_desc3 = sql """desc ${testDb}.test_drop_column""" + assertEquals(2, drop_col_desc3.size()) + + // Test 2.3: RENAME COLUMN + logger.info("--- Test 2.3: External RENAME COLUMN ---") + spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_rename_column" + spark_iceberg "CREATE TABLE demo.${testDb}.test_rename_column (id INT, old_name STRING) USING iceberg" + spark_iceberg "INSERT INTO demo.${testDb}.test_rename_column VALUES (1, 'test')" + + // Cache the schema + sql """switch ${catalogWithCache}""" + def rename_col_desc1 = sql """desc ${testDb}.test_rename_column""" + logger.info("Initial schema: ${rename_col_desc1}") + assertTrue(rename_col_desc1.toString().contains("old_name")) + + // External RENAME COLUMN via Spark + spark_iceberg "ALTER TABLE demo.${testDb}.test_rename_column RENAME COLUMN old_name TO new_name" + + // Verify cache behavior + sql """switch ${catalogWithCache}""" + def rename_col_desc2 = sql """desc ${testDb}.test_rename_column""" + logger.info("After external RENAME COLUMN (with cache, no refresh): ${rename_col_desc2}") + assertTrue(rename_col_desc2.toString().contains("old_name")) // Should still see old name + + sql """switch ${catalogNoCache}""" + def rename_col_desc2_nc = sql """desc ${testDb}.test_rename_column""" + logger.info("After external RENAME COLUMN (no cache): ${rename_col_desc2_nc}") + assertTrue(rename_col_desc2_nc.toString().contains("new_name")) // Should see new name + + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_rename_column""" + def rename_col_desc3 = sql """desc ${testDb}.test_rename_column""" + assertTrue(rename_col_desc3.toString().contains("new_name")) + + // Test 2.4: ALTER COLUMN TYPE + logger.info("--- Test 2.4: External ALTER COLUMN TYPE ---") + spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_alter_type" + spark_iceberg "CREATE TABLE demo.${testDb}.test_alter_type (id INT, value INT) USING iceberg" + spark_iceberg "INSERT INTO demo.${testDb}.test_alter_type VALUES (1, 100)" + + // Cache the schema + sql """switch ${catalogWithCache}""" + def alter_type_desc1 = sql """desc ${testDb}.test_alter_type""" + logger.info("Initial schema: ${alter_type_desc1}") + // value column should be INT + assertTrue(alter_type_desc1[1][1].toString().toLowerCase().contains("int")) + + // External ALTER COLUMN TYPE via Spark (INT -> BIGINT) + spark_iceberg "ALTER TABLE demo.${testDb}.test_alter_type ALTER COLUMN value TYPE BIGINT" + + // Verify cache behavior + sql """switch ${catalogWithCache}""" + def alter_type_desc2 = sql """desc ${testDb}.test_alter_type""" + logger.info("After external ALTER TYPE (with cache, no refresh): ${alter_type_desc2}") + assertTrue(alter_type_desc2[1][1].toString().toLowerCase().contains("int")) // Should still see INT + + sql """switch ${catalogNoCache}""" + def alter_type_desc2_nc = sql """desc ${testDb}.test_alter_type""" + logger.info("After external ALTER TYPE (no cache): ${alter_type_desc2_nc}") + assertTrue(alter_type_desc2_nc[1][1].toString().toLowerCase().contains("bigint")) // Should see BIGINT + + sql """switch 
${catalogWithCache}""" + sql """refresh table ${testDb}.test_alter_type""" + def alter_type_desc3 = sql """desc ${testDb}.test_alter_type""" + assertTrue(alter_type_desc3[1][1].toString().toLowerCase().contains("bigint")) + + // ==================== Test 3: Partition Evolution ==================== + logger.info("========== Test 3: Partition Evolution ==========") + + // Test 3.1: ADD PARTITION FIELD + logger.info("--- Test 3.1: External ADD PARTITION FIELD ---") + spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_add_partition" + spark_iceberg "CREATE TABLE demo.${testDb}.test_add_partition (id INT, dt DATE, value INT) USING iceberg" + spark_iceberg "INSERT INTO demo.${testDb}.test_add_partition VALUES (1, DATE'2024-01-15', 100)" + + // Cache the partition spec by querying data (show partitions is not supported for Iceberg tables) + sql """switch ${catalogWithCache}""" + def add_part_result_initial = sql """select count(*) from ${testDb}.test_add_partition""" + logger.info("Initial data count (with cache): ${add_part_result_initial}") + + // External ADD PARTITION FIELD via Spark + spark_iceberg "ALTER TABLE demo.${testDb}.test_add_partition ADD PARTITION FIELD month(dt)" + + // Insert data after partition evolution + spark_iceberg "INSERT INTO demo.${testDb}.test_add_partition VALUES (2, DATE'2024-02-20', 200)" + + // Verify cache behavior - check data count as partition spec is harder to verify directly + sql """switch ${catalogWithCache}""" + def add_part_result1 = sql """select count(*) from ${testDb}.test_add_partition""" + logger.info("After external ADD PARTITION FIELD (with cache, no refresh): ${add_part_result1}") + assertEquals(1, add_part_result1[0][0]) // Should still see 1 row + + sql """switch ${catalogNoCache}""" + def add_part_result1_nc = sql """select count(*) from ${testDb}.test_add_partition""" + logger.info("After external ADD PARTITION FIELD (no cache): ${add_part_result1_nc}") + assertEquals(2, add_part_result1_nc[0][0]) // Should see 2 rows + + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_add_partition""" + def add_part_result2 = sql """select count(*) from ${testDb}.test_add_partition""" + assertEquals(2, add_part_result2[0][0]) + + // Test 3.2: DROP PARTITION FIELD + logger.info("--- Test 3.2: External DROP PARTITION FIELD ---") + spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_drop_partition" + spark_iceberg "CREATE TABLE demo.${testDb}.test_drop_partition (id INT, category STRING, value INT) USING iceberg PARTITIONED BY (category)" + spark_iceberg "INSERT INTO demo.${testDb}.test_drop_partition VALUES (1, 'A', 100), (2, 'B', 200)" + + // Cache the partition spec + sql """switch ${catalogWithCache}""" + def drop_part_result1 = sql """select * from ${testDb}.test_drop_partition order by id""" + assertEquals(2, drop_part_result1.size()) + + // External DROP PARTITION FIELD via Spark + spark_iceberg "ALTER TABLE demo.${testDb}.test_drop_partition DROP PARTITION FIELD category" + + // Insert data after partition evolution + spark_iceberg "INSERT INTO demo.${testDb}.test_drop_partition VALUES (3, 'C', 300)" + + // Verify cache behavior + sql """switch ${catalogWithCache}""" + def drop_part_result2 = sql """select count(*) from ${testDb}.test_drop_partition""" + logger.info("After external DROP PARTITION FIELD (with cache, no refresh): ${drop_part_result2}") + assertEquals(2, drop_part_result2[0][0]) // Should still see 2 rows + + sql """switch ${catalogNoCache}""" + def drop_part_result2_nc = sql """select count(*) from 
${testDb}.test_drop_partition""" + logger.info("After external DROP PARTITION FIELD (no cache): ${drop_part_result2_nc}") + assertEquals(3, drop_part_result2_nc[0][0]) // Should see 3 rows + + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_drop_partition""" + def drop_part_result3 = sql """select count(*) from ${testDb}.test_drop_partition""" + assertEquals(3, drop_part_result3[0][0]) + + // Test 3.3: REPLACE PARTITION FIELD + logger.info("--- Test 3.3: External REPLACE PARTITION FIELD ---") + spark_iceberg "DROP TABLE IF EXISTS demo.${testDb}.test_replace_partition" + spark_iceberg "CREATE TABLE demo.${testDb}.test_replace_partition (id INT, ts TIMESTAMP, value INT) USING iceberg PARTITIONED BY (days(ts))" + spark_iceberg "INSERT INTO demo.${testDb}.test_replace_partition VALUES (1, TIMESTAMP'2024-01-15 10:00:00', 100)" + + // Cache the partition spec + sql """switch ${catalogWithCache}""" + def replace_part_result1 = sql """select * from ${testDb}.test_replace_partition order by id""" + assertEquals(1, replace_part_result1.size()) + + // External REPLACE PARTITION FIELD via Spark (days -> months) + spark_iceberg "ALTER TABLE demo.${testDb}.test_replace_partition REPLACE PARTITION FIELD days(ts) WITH months(ts)" + + // Insert data after partition evolution + spark_iceberg "INSERT INTO demo.${testDb}.test_replace_partition VALUES (2, TIMESTAMP'2024-02-20 15:00:00', 200)" + + // Verify cache behavior + sql """switch ${catalogWithCache}""" + def replace_part_result2 = sql """select count(*) from ${testDb}.test_replace_partition""" + logger.info("After external REPLACE PARTITION FIELD (with cache, no refresh): ${replace_part_result2}") + assertEquals(1, replace_part_result2[0][0]) // Should still see 1 row + + sql """switch ${catalogNoCache}""" + def replace_part_result2_nc = sql """select count(*) from ${testDb}.test_replace_partition""" + logger.info("After external REPLACE PARTITION FIELD (no cache): ${replace_part_result2_nc}") + assertEquals(2, replace_part_result2_nc[0][0]) // Should see 2 rows + + sql """switch ${catalogWithCache}""" + sql """refresh table ${testDb}.test_replace_partition""" + def replace_part_result3 = sql """select count(*) from ${testDb}.test_replace_partition""" + assertEquals(2, replace_part_result3[0][0]) + + logger.info("All tests passed!") + + } finally { + } +}
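Every scenario in the suite above repeats the same flow: query once through catalogWithCache to warm the table cache, apply an external change through Spark, assert that catalogWithCache still returns the cached (stale) result while catalogNoCache (TTL=0) sees the change immediately, then run REFRESH TABLE and assert that catalogWithCache catches up. A hedged Groovy sketch of that flow as a reusable closure (the closure and its parameter names are illustrative only and are not part of this patch):

```groovy
// Illustrative only: condenses the warm -> external change -> stale check -> refresh flow used above.
// catalogWithCache and catalogNoCache are the two catalogs created at the top of the suite;
// fetch runs a query against whichever catalog is currently switched to.
def assertCacheStaleness = { String qualifiedTable, Closure fetch, Closure externalChange,
                             Object oldValue, Object newValue ->
    sql """switch ${catalogWithCache}"""
    assertEquals(oldValue, fetch())        // warms the table cache with the current snapshot
    externalChange()                       // e.g. spark_iceberg "INSERT INTO ..." or "ALTER TABLE ..."
    assertEquals(oldValue, fetch())        // caching catalog still serves the cached entry
    sql """switch ${catalogNoCache}"""
    assertEquals(newValue, fetch())        // TTL=0 catalog reloads metadata and sees the change
    sql """switch ${catalogWithCache}"""
    sql """refresh table ${qualifiedTable}"""
    assertEquals(newValue, fetch())        // refresh invalidates the cached IcebergTableCacheValue
}

// Example invocation for the INSERT scenario (Test 1.1):
assertCacheStaleness("${testDb}.test_insert",
        { sql("select * from ${testDb}.test_insert order by id").size() },
        { spark_iceberg "INSERT INTO demo.${testDb}.test_insert VALUES (2, 'external_insert')" },
        1, 2)
```

With the snapshot cache now folded into the table cache entry (IcebergTableCacheValue), a single REFRESH TABLE or TTL expiry picks up new snapshots, partition specs, and schemas together, which is exactly what the assertions above exercise.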