diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 3df3f0d56223..67d6b23187dd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -216,7 +216,7 @@ public void alterDatabase(String name, List changes, boolean ign if (changes == null || changes.isEmpty()) { return; } - alterDatabaseImpl(name, changes); + this.alterDatabaseImpl(name, changes); } catch (DatabaseNotExistException e) { if (ignoreIfNotExists) { return; diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index c1cc5bc230e1..366051fad5b8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -94,7 +94,8 @@ protected void dropDatabaseImpl(String name) { } @Override - protected void alterDatabaseImpl(String name, List changes) { + protected void alterDatabaseImpl(String name, List changes) + throws DatabaseNotExistException { throw new UnsupportedOperationException("Alter database is not supported."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 9f373bd10ae3..141b22b45dc6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -241,9 +241,9 @@ public void alterDatabase(String name, List changes, boolean ign request, AlterDatabaseResponse.class, restAuthFunction); - if (response.getUpdated().isEmpty()) { - throw new IllegalStateException("Failed to update properties"); - } + // if (response.getUpdated().isEmpty()) { + // throw new IllegalStateException("Failed to update properties"); + // } } catch (NoSuchResourceException e) { if (!ignoreIfNotExists) { throw new DatabaseNotExistException(name); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java b/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java new file mode 100644 index 000000000000..e0c5ab9446ec --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.TableType; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Database; +import org.apache.paimon.catalog.FileSystemCatalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.catalog.PropertyChange; +import org.apache.paimon.catalog.SupportsSnapshots; +import org.apache.paimon.catalog.TableMetadata; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.view.View; + +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.paimon.CoreOptions.PATH; + +/** A catalog for testing RESTCatalog. */ +public class MetadataInMemoryFileSystemCatalog extends FileSystemCatalog + implements SupportsSnapshots { + + public final Map databaseStore; + public final Map tableMetadataStore; + public final Map> tablePartitionsStore; + public final Map viewStore; + public final Map tableSnapshotStore; + public final Map dataTokenStore; + public FileSystemCatalog fileSystemCatalog; + + public MetadataInMemoryFileSystemCatalog( + FileIO fileIO, + Path warehouse, + Options options, + Map databaseStore, + Map tableMetadataStore, + Map tableSnapshotStore, + Map> tablePartitionsStore, + Map viewStore, + Map dataTokenStore) { + super(fileIO, warehouse, options); + this.fileSystemCatalog = new FileSystemCatalog(fileIO, warehouse, options); + this.databaseStore = databaseStore; + this.tableMetadataStore = tableMetadataStore; + this.tablePartitionsStore = tablePartitionsStore; + this.tableSnapshotStore = tableSnapshotStore; + this.viewStore = viewStore; + this.dataTokenStore = dataTokenStore; + } + + public static MetadataInMemoryFileSystemCatalog create( + CatalogContext context, + Map databaseStore, + Map tableMetadataStore, + Map tableSnapshotStore, + Map> tablePartitionsStore, + Map viewStore, + Map dataTokenStore) { + String warehouse = CatalogFactory.warehouse(context).toUri().toString(); + + Path warehousePath = new Path(warehouse); + FileIO fileIO; + + try { + fileIO = FileIO.get(warehousePath, context); + fileIO.checkOrMkdirs(warehousePath); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return new MetadataInMemoryFileSystemCatalog( + fileIO, + warehousePath, + context.options(), + databaseStore, + tableMetadataStore, + tableSnapshotStore, + tablePartitionsStore, + viewStore, + dataTokenStore); + } + + @Override + public List listDatabases() { + return new ArrayList<>(databaseStore.keySet()); + } + + @Override + protected void createDatabaseImpl(String name, Map properties) { + super.createDatabaseImpl(name, properties); + databaseStore.put(name, Database.of(name, properties, null)); + } + + @Override + public Database getDatabaseImpl(String name) throws DatabaseNotExistException { + if (databaseStore.containsKey(name)) { + return databaseStore.get(name); + } + throw new DatabaseNotExistException(name); + } + + @Override + protected void dropDatabaseImpl(String name) { + super.dropDatabaseImpl(name); + databaseStore.remove(name); + } + + protected void alterDatabaseImpl(String name, List changes) + throws DatabaseNotExistException { + if (databaseStore.containsKey(name)) { + Pair, Set> setPropertiesToRemoveKeys = + PropertyChange.getSetPropertiesToRemoveKeys(changes); + Map setProperties = setPropertiesToRemoveKeys.getLeft(); + Set removeKeys = setPropertiesToRemoveKeys.getRight(); + Database database = databaseStore.get(name); + Map parameter = new HashMap<>(database.options()); + if (!setProperties.isEmpty()) { + parameter.putAll(setProperties); + } + if (!removeKeys.isEmpty()) { + parameter.keySet().removeAll(removeKeys); + } + Database alterDatabase = Database.of(name, parameter, null); + databaseStore.put(name, alterDatabase); + } else { + throw new DatabaseNotExistException(name); + } + } + + @Override + protected List listTablesImpl(String databaseName) { + List tables = new ArrayList<>(); + for (Map.Entry entry : tableMetadataStore.entrySet()) { + Identifier identifier = Identifier.fromString(entry.getKey()); + if (databaseName.equals(identifier.getDatabaseName())) { + tables.add(identifier.getTableName()); + } + } + return tables; + } + + @Override + public void createTableImpl(Identifier identifier, Schema schema) { + super.createTableImpl(identifier, schema); + try { + TableMetadata tableMetadata = + createTableMetadata( + identifier, 1L, schema, UUID.randomUUID().toString(), false); + tableMetadataStore.put(identifier.getFullName(), tableMetadata); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private TableMetadata createTableMetadata( + Identifier identifier, long schemaId, Schema schema, String uuid, boolean isExternal) { + Map options = new HashMap<>(schema.options()); + Path path = getTableLocation(identifier); + options.put(PATH.key(), path.toString()); + TableSchema tableSchema = + new TableSchema( + schemaId, + schema.fields(), + schema.fields().size() - 1, + schema.partitionKeys(), + schema.primaryKeys(), + options, + schema.comment()); + TableMetadata tableMetadata = new TableMetadata(tableSchema, isExternal, uuid); + return tableMetadata; + } + + @Override + protected void dropTableImpl(Identifier identifier) { + if (tableMetadataStore.containsKey(identifier.getFullName())) { + tableMetadataStore.remove(identifier.getFullName()); + super.dropTableImpl(identifier); + } + } + + @Override + public void renameTableImpl(Identifier fromTable, Identifier toTable) { + if (tableMetadataStore.containsKey(fromTable.getFullName())) { + super.renameTableImpl(fromTable, toTable); + TableMetadata tableMetadata = tableMetadataStore.get(fromTable.getFullName()); + tableMetadataStore.remove(fromTable.getFullName()); + tableMetadataStore.put(toTable.getFullName(), tableMetadata); + } + } + + @Override + protected void alterTableImpl(Identifier identifier, List changes) + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + if (tableMetadataStore.containsKey(identifier.getFullName())) { + TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName()); + TableSchema schema = tableMetadata.schema(); + Options options = Options.fromMap(schema.options()); + if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) { + throw new UnsupportedOperationException("Only data table support alter table."); + } + SchemaManager schemaManager = schemaManager(identifier); + try { + TableSchema newSchema = + runWithLock(identifier, () -> schemaManager.commitChanges(changes)); + TableMetadata newTableMetadata = + createTableMetadata( + identifier, + newSchema.id(), + newSchema.toSchema(), + tableMetadata.uuid(), + tableMetadata.isExternal()); + tableMetadataStore.put(identifier.getFullName(), newTableMetadata); + } catch (TableNotExistException + | ColumnAlreadyExistException + | ColumnNotExistException + | RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private SchemaManager schemaManager(Identifier identifier) { + Path path = getTableLocation(identifier); + return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault()); + } + + @Override + public void createFormatTable(Identifier identifier, Schema schema) { + TableMetadata tableMetadata = + createTableMetadata(identifier, 1L, schema, UUID.randomUUID().toString(), true); + tableMetadataStore.put(identifier.getFullName(), tableMetadata); + } + + @Override + public TableSchema loadTableSchema(Identifier identifier) throws TableNotExistException { + if (tableMetadataStore.containsKey(identifier.getFullName())) { + return tableMetadataStore.get(identifier.getFullName()).schema(); + } + throw new TableNotExistException(identifier); + } + + @Override + protected TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException { + if (tableMetadataStore.containsKey(identifier.getFullName())) { + return tableMetadataStore.get(identifier.getFullName()); + } + throw new TableNotExistException(identifier); + } + + @Override + public void createPartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + getTable(identifier); + tablePartitionsStore.put( + identifier.getFullName(), + partitions.stream() + .map(partition -> spec2Partition(partition)) + .collect(Collectors.toList())); + } + + @Override + public void dropPartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + getTable(identifier); + List existPartitions = tablePartitionsStore.get(identifier.getFullName()); + partitions.forEach( + partition -> { + for (Map.Entry entry : partition.entrySet()) { + existPartitions.stream() + .filter( + p -> + p.spec().containsKey(entry.getKey()) + && p.spec() + .get(entry.getKey()) + .equals(entry.getValue())) + .findFirst() + .ifPresent( + existPartition -> existPartitions.remove(existPartition)); + } + }); + } + + @Override + public void alterPartitions(Identifier identifier, List partitions) + throws TableNotExistException { + getTable(identifier); + List existPartitions = tablePartitionsStore.get(identifier.getFullName()); + partitions.forEach( + partition -> { + for (Map.Entry entry : partition.spec().entrySet()) { + existPartitions.stream() + .filter( + p -> + p.spec().containsKey(entry.getKey()) + && p.spec() + .get(entry.getKey()) + .equals(entry.getValue())) + .findFirst() + .ifPresent( + existPartition -> existPartitions.remove(existPartition)); + } + }); + existPartitions.addAll(partitions); + tablePartitionsStore.put(identifier.getFullName(), existPartitions); + } + + @Override + public List listPartitions(Identifier identifier) throws TableNotExistException { + getTable(identifier); + return tablePartitionsStore.get(identifier.getFullName()); + } + + @Override + public View getView(Identifier identifier) throws ViewNotExistException { + if (viewStore.containsKey(identifier.getFullName())) { + return viewStore.get(identifier.getFullName()); + } + throw new ViewNotExistException(identifier); + } + + @Override + public void dropView(Identifier identifier, boolean ignoreIfNotExists) + throws ViewNotExistException { + if (viewStore.containsKey(identifier.getFullName())) { + viewStore.remove(identifier.getFullName()); + } + if (!ignoreIfNotExists) { + throw new ViewNotExistException(identifier); + } + } + + @Override + public void createView(Identifier identifier, View view, boolean ignoreIfExists) + throws ViewAlreadyExistException, DatabaseNotExistException { + getDatabase(identifier.getDatabaseName()); + if (viewStore.containsKey(identifier.getFullName()) && !ignoreIfExists) { + throw new ViewAlreadyExistException(identifier); + } + viewStore.put(identifier.getFullName(), view); + } + + @Override + public List listViews(String databaseName) throws DatabaseNotExistException { + getDatabase(databaseName); + return viewStore.keySet().stream() + .map(Identifier::fromString) + .filter(identifier -> identifier.getDatabaseName().equals(databaseName)) + .map(Identifier::getTableName) + .collect(Collectors.toList()); + } + + @Override + public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists) + throws ViewNotExistException, ViewAlreadyExistException { + if (!viewStore.containsKey(fromView.getFullName()) && !ignoreIfNotExists) { + throw new ViewNotExistException(fromView); + } + if (viewStore.containsKey(toView.getFullName())) { + throw new ViewAlreadyExistException(toView); + } + if (viewStore.containsKey(fromView.getFullName())) { + View view = viewStore.get(fromView.getFullName()); + viewStore.remove(fromView.getFullName()); + viewStore.put(toView.getFullName(), view); + } + } + + @Override + public boolean commitSnapshot( + Identifier identifier, Snapshot snapshot, List statistics) + throws TableNotExistException { + tableSnapshotStore.put(identifier.getFullName(), snapshot); + return false; + } + + @Override + public Optional loadSnapshot(Identifier identifier) throws TableNotExistException { + return Optional.ofNullable(tableSnapshotStore.get(identifier.getFullName())); + } + + public RESTToken getToken(Identifier identifier) { + if (dataTokenStore.containsKey(identifier.getFullName())) { + return dataTokenStore.get(identifier.getFullName()); + } + long currentTimeMillis = System.currentTimeMillis(); + RESTToken token = + new RESTToken( + ImmutableMap.of( + "akId", + "akId" + currentTimeMillis, + "akSecret", + "akSecret" + currentTimeMillis), + currentTimeMillis); + dataTokenStore.put(identifier.getFullName(), token); + return dataTokenStore.get(identifier.getFullName()); + } + + private Partition spec2Partition(Map spec) { + // todo: need update + return new Partition(spec, 123, 456, 789, 123); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 564e1e5d8a9a..fd84bf79245f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -19,16 +19,20 @@ package org.apache.paimon.rest; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.catalog.PropertyChange; import org.apache.paimon.catalog.RenamingSnapshotCommit; +import org.apache.paimon.catalog.TableMetadata; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.auth.BearTokenAuthProvider; +import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterPartitionsRequest; import org.apache.paimon.rest.requests.AlterTableRequest; import org.apache.paimon.rest.requests.CommitTableRequest; @@ -39,6 +43,7 @@ import org.apache.paimon.rest.requests.DropPartitionsRequest; import org.apache.paimon.rest.requests.MarkDonePartitionsRequest; import org.apache.paimon.rest.requests.RenameTableRequest; +import org.apache.paimon.rest.responses.AlterDatabaseResponse; import org.apache.paimon.rest.responses.CommitTableResponse; import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.ErrorResponse; @@ -67,16 +72,18 @@ import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; -import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER; -import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis; /** Mock REST server for testing. */ public class RESTCatalogServer { @@ -84,16 +91,31 @@ public class RESTCatalogServer { private static final String PREFIX = "paimon"; private static final String DATABASE_URI = String.format("/v1/%s/databases", PREFIX); - private final Catalog catalog; + private final MetadataInMemoryFileSystemCatalog catalog; private final Dispatcher dispatcher; private final MockWebServer server; private final String authToken; + public final Map databaseStore = new HashMap<>(); + public final Map tableMetadataStore = new HashMap<>(); + public final Map> tablePartitionsStore = new HashMap<>(); + public final Map viewStore = new HashMap<>(); + public final Map tableSnapshotStore = new HashMap<>(); + Map dataTokenStore = new HashMap<>(); + public RESTCatalogServer(String warehouse, String initToken) { authToken = initToken; Options conf = new Options(); conf.setString("warehouse", warehouse); - this.catalog = TestRESTCatalog.create(CatalogContext.create(conf)); + this.catalog = + MetadataInMemoryFileSystemCatalog.create( + CatalogContext.create(conf), + databaseStore, + tableMetadataStore, + tableSnapshotStore, + tablePartitionsStore, + viewStore, + dataTokenStore); this.dispatcher = initDispatcher(catalog, warehouse, authToken); MockWebServer mockWebServer = new MockWebServer(); mockWebServer.setDispatcher(dispatcher); @@ -112,7 +134,16 @@ public void shutdown() throws IOException { server.shutdown(); } - public static Dispatcher initDispatcher(Catalog catalog, String warehouse, String authToken) { + public void setTableSnapshot(Identifier identifier, Snapshot snapshot) { + tableSnapshotStore.put(identifier.getFullName(), snapshot); + } + + public void setDataToken(Identifier identifier, RESTToken token) { + dataTokenStore.put(identifier.getFullName(), token); + } + + public static Dispatcher initDispatcher( + MetadataInMemoryFileSystemCatalog catalog, String warehouse, String authToken) { return new Dispatcher() { @Override public MockResponse dispatch(RecordedRequest request) { @@ -241,17 +272,22 @@ public MockResponse dispatch(RecordedRequest request) { } return partitionsApiHandler(catalog, request, databaseName, tableName); } else if (isTableToken) { + RESTToken dataToken = + catalog.getToken(Identifier.create(databaseName, resources[2])); GetTableTokenResponse getTableTokenResponse = new GetTableTokenResponse( - ImmutableMap.of("key", "value"), - System.currentTimeMillis()); + dataToken.token(), dataToken.expireAtMillis()); return new MockResponse() .setResponseCode(200) .setBody( OBJECT_MAPPER.writeValueAsString( getTableTokenResponse)); } else if (isTableSnapshot) { - if (!"my_snapshot_table".equals(resources[2])) { + String tableName = resources[2]; + Optional snapshotOptional = + catalog.loadSnapshot( + Identifier.create(databaseName, tableName)); + if (!snapshotOptional.isPresent()) { response = new ErrorResponse( ErrorResponseResourceType.SNAPSHOT, @@ -261,8 +297,7 @@ public MockResponse dispatch(RecordedRequest request) { return mockResponse(response, 404); } GetTableSnapshotResponse getTableSnapshotResponse = - new GetTableSnapshotResponse( - createSnapshotWithMillis(10086, 100)); + new GetTableSnapshotResponse(snapshotOptional.get()); return new MockResponse() .setResponseCode(200) .setBody( @@ -447,6 +482,25 @@ private static MockResponse databaseApiHandler( case "DELETE": catalog.dropDatabase(databaseName, false, true); return new MockResponse().setResponseCode(200); + case "POST": + AlterDatabaseRequest requestBody = + OBJECT_MAPPER.readValue( + request.getBody().readUtf8(), AlterDatabaseRequest.class); + List changes = new ArrayList<>(); + for (String property : requestBody.getRemovals()) { + changes.add(PropertyChange.removeProperty(property)); + } + for (Map.Entry entry : requestBody.getUpdates().entrySet()) { + changes.add(PropertyChange.setProperty(entry.getKey(), entry.getValue())); + } + catalog.alterDatabase(databaseName, changes, false); + AlterDatabaseResponse alterDatabaseResponse = + new AlterDatabaseResponse( + requestBody.getRemovals(), + requestBody.getUpdates().keySet().stream() + .collect(Collectors.toList()), + Collections.emptyList()); + return mockResponse(alterDatabaseResponse, 200); default: return new MockResponse().setResponseCode(404); } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index b0cf23202af0..3ba1943d1054 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -50,6 +50,7 @@ import java.util.Optional; import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE; +import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -149,12 +150,14 @@ void testSnapshotFromREST() throws Catalog.TableNotExistException { options.set(RESTCatalogOptions.TOKEN, initToken); options.set(RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.BEAR.identifier()); RESTCatalog catalog = new RESTCatalog(CatalogContext.create(options)); - - Optional snapshot = - catalog.loadSnapshot(Identifier.create("test_db_a", "my_snapshot_table")); + Identifier hasSnapshotTable = Identifier.create("test_db_a", "my_snapshot_table"); + long id = 10086; + long millis = System.currentTimeMillis(); + restCatalogServer.setTableSnapshot(hasSnapshotTable, createSnapshotWithMillis(id, millis)); + Optional snapshot = catalog.loadSnapshot(hasSnapshotTable); assertThat(snapshot).isPresent(); - assertThat(snapshot.get().id()).isEqualTo(10086); - assertThat(snapshot.get().timeMillis()).isEqualTo(100); + assertThat(snapshot.get().id()).isEqualTo(id); + assertThat(snapshot.get().timeMillis()).isEqualTo(millis); snapshot = catalog.loadSnapshot(Identifier.create("test_db_a", "unknown")); assertThat(snapshot).isEmpty(); @@ -175,6 +178,11 @@ protected boolean supportsView() { return true; } + @Override + protected boolean supportsAlterDatabase() { + return true; + } + private void createTable( Identifier identifier, Map options, List partitionKeys) throws Exception { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java b/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java deleted file mode 100644 index a812965310bb..000000000000 --- a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.rest; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.TableType; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.FileSystemCatalog; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.catalog.TableMetadata; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; -import org.apache.paimon.options.Options; -import org.apache.paimon.partition.Partition; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.schema.SchemaChange; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.view.View; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** A catalog for testing RESTCatalog. */ -public class TestRESTCatalog extends FileSystemCatalog { - - public Map tableFullName2Schema = new HashMap(); - public Map> tableFullName2Partitions = - new HashMap>(); - public final Map viewFullName2View = new HashMap(); - - public TestRESTCatalog(FileIO fileIO, Path warehouse, Options options) { - super(fileIO, warehouse, options); - } - - public static TestRESTCatalog create(CatalogContext context) { - String warehouse = CatalogFactory.warehouse(context).toUri().toString(); - - Path warehousePath = new Path(warehouse); - FileIO fileIO; - - try { - fileIO = FileIO.get(warehousePath, context); - fileIO.checkOrMkdirs(warehousePath); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - - return new TestRESTCatalog(fileIO, warehousePath, context.options()); - } - - @Override - public void createPartitions(Identifier identifier, List> partitions) - throws TableNotExistException { - getTable(identifier); - tableFullName2Partitions.put( - identifier.getFullName(), - partitions.stream() - .map(partition -> spec2Partition(partition)) - .collect(Collectors.toList())); - } - - @Override - public void dropPartitions(Identifier identifier, List> partitions) - throws TableNotExistException { - getTable(identifier); - List existPartitions = tableFullName2Partitions.get(identifier.getFullName()); - partitions.forEach( - partition -> { - for (Map.Entry entry : partition.entrySet()) { - existPartitions.stream() - .filter( - p -> - p.spec().containsKey(entry.getKey()) - && p.spec() - .get(entry.getKey()) - .equals(entry.getValue())) - .findFirst() - .ifPresent( - existPartition -> existPartitions.remove(existPartition)); - } - }); - } - - @Override - public void alterPartitions(Identifier identifier, List partitions) - throws TableNotExistException { - getTable(identifier); - List existPartitions = tableFullName2Partitions.get(identifier.getFullName()); - partitions.forEach( - partition -> { - for (Map.Entry entry : partition.spec().entrySet()) { - existPartitions.stream() - .filter( - p -> - p.spec().containsKey(entry.getKey()) - && p.spec() - .get(entry.getKey()) - .equals(entry.getValue())) - .findFirst() - .ifPresent( - existPartition -> existPartitions.remove(existPartition)); - } - }); - existPartitions.addAll(partitions); - tableFullName2Partitions.put(identifier.getFullName(), existPartitions); - } - - @Override - public List listPartitions(Identifier identifier) throws TableNotExistException { - getTable(identifier); - return tableFullName2Partitions.get(identifier.getFullName()); - } - - @Override - public View getView(Identifier identifier) throws ViewNotExistException { - if (viewFullName2View.containsKey(identifier.getFullName())) { - return viewFullName2View.get(identifier.getFullName()); - } - throw new ViewNotExistException(identifier); - } - - @Override - public void dropView(Identifier identifier, boolean ignoreIfNotExists) - throws ViewNotExistException { - if (viewFullName2View.containsKey(identifier.getFullName())) { - viewFullName2View.remove(identifier.getFullName()); - } - if (!ignoreIfNotExists) { - throw new ViewNotExistException(identifier); - } - } - - @Override - public void createView(Identifier identifier, View view, boolean ignoreIfExists) - throws ViewAlreadyExistException, DatabaseNotExistException { - getDatabase(identifier.getDatabaseName()); - if (viewFullName2View.containsKey(identifier.getFullName()) && !ignoreIfExists) { - throw new ViewAlreadyExistException(identifier); - } - viewFullName2View.put(identifier.getFullName(), view); - } - - @Override - public List listViews(String databaseName) throws DatabaseNotExistException { - getDatabase(databaseName); - return viewFullName2View.keySet().stream() - .map(v -> Identifier.fromString(v)) - .filter(identifier -> identifier.getDatabaseName().equals(databaseName)) - .map(identifier -> identifier.getTableName()) - .collect(Collectors.toList()); - } - - @Override - public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists) - throws ViewNotExistException, ViewAlreadyExistException { - if (!viewFullName2View.containsKey(fromView.getFullName()) && !ignoreIfNotExists) { - throw new ViewNotExistException(fromView); - } - if (viewFullName2View.containsKey(toView.getFullName())) { - throw new ViewAlreadyExistException(toView); - } - if (viewFullName2View.containsKey(fromView.getFullName())) { - View view = viewFullName2View.get(fromView.getFullName()); - viewFullName2View.remove(fromView.getFullName()); - viewFullName2View.put(toView.getFullName(), view); - } - } - - @Override - protected List listTablesImpl(String databaseName) { - List tables = super.listTablesImpl(databaseName); - for (Map.Entry entry : tableFullName2Schema.entrySet()) { - Identifier identifier = Identifier.fromString(entry.getKey()); - if (databaseName.equals(identifier.getDatabaseName())) { - tables.add(identifier.getTableName()); - } - } - return tables; - } - - @Override - protected void dropTableImpl(Identifier identifier) { - if (tableFullName2Schema.containsKey(identifier.getFullName())) { - tableFullName2Schema.remove(identifier.getFullName()); - } else { - super.dropTableImpl(identifier); - } - } - - @Override - public void renameTableImpl(Identifier fromTable, Identifier toTable) { - if (tableFullName2Schema.containsKey(fromTable.getFullName())) { - TableSchema tableSchema = tableFullName2Schema.get(fromTable.getFullName()); - tableFullName2Schema.remove(fromTable.getFullName()); - tableFullName2Schema.put(toTable.getFullName(), tableSchema); - } else { - super.renameTableImpl(fromTable, toTable); - } - } - - @Override - protected void alterTableImpl(Identifier identifier, List changes) - throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - if (tableFullName2Schema.containsKey(identifier.getFullName())) { - TableSchema schema = tableFullName2Schema.get(identifier.getFullName()); - Options options = Options.fromMap(schema.options()); - if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) { - throw new UnsupportedOperationException("Only data table support alter table."); - } - } else { - super.alterTableImpl(identifier, changes); - } - } - - @Override - public void createFormatTable(Identifier identifier, Schema schema) { - Map options = new HashMap<>(schema.options()); - options.put("path", "/tmp/format_table"); - TableSchema tableSchema = - new TableSchema( - 1L, - schema.fields(), - 1, - schema.partitionKeys(), - schema.primaryKeys(), - options, - schema.comment()); - tableFullName2Schema.put(identifier.getFullName(), tableSchema); - } - - @Override - protected TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException { - if (tableFullName2Schema.containsKey(identifier.getFullName())) { - TableSchema tableSchema = tableFullName2Schema.get(identifier.getFullName()); - return new TableMetadata(tableSchema, false, "uuid"); - } - return super.loadTableMetadata(identifier); - } - - private Partition spec2Partition(Map spec) { - return new Partition(spec, 123, 456, 789, 123); - } -}