diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md index cfe105f6ac00..8be5304d547e 100644 --- a/docs/content/spark/sql-ddl.md +++ b/docs/content/spark/sql-ddl.md @@ -116,6 +116,50 @@ spark-sql ... \ ``` +```sql +USE paimon.default; +``` +#### Creating REST Catalog + +By using the Paimon REST catalog, changes to the catalog will be directly stored in remote server. + +##### bear token +```bash +spark-sql ... \ + --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ + --conf spark.sql.catalog.paimon.metastore=rest \ + --conf spark.sql.catalog.paimon.uri= \ + --conf spark.sql.catalog.paimon.token.provider=bear \ + --conf spark.sql.catalog.paimon.token= + +``` + +##### dlf ak +```bash +spark-sql ... \ + --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ + --conf spark.sql.catalog.paimon.metastore=rest \ + --conf spark.sql.catalog.paimon.uri= \ + --conf spark.sql.catalog.paimon.token.provider=dlf \ + --conf spark.sql.catalog.paimon.dlf.accessKeyId= \ + --conf spark.sql.catalog.paimon.dlf.accessKeySecret= + +``` + +##### dlf sts token +```bash +spark-sql ... \ + --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ + --conf spark.sql.catalog.paimon.metastore=rest \ + --conf spark.sql.catalog.paimon.uri= \ + --conf spark.sql.catalog.paimon.token.provider=dlf \ + --conf spark.sql.catalog.paimon.dlf.accessKeyId= \ + --conf spark.sql.catalog.paimon.dlf.accessKeySecret= \ + --conf spark.sql.catalog.paimon.dlf.securityToken= + + +``` + ```sql USE paimon.default; ``` diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java new file mode 100644 index 000000000000..081ada0fba33 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; + +import java.util.Map; + +/** Refresh data token in test mode. */ +public class DataTokenProvider { + + private Map token; + private long expiresAtMillis; + + public DataTokenProvider(Map token, long expiresAtMillis) { + this.token = token; + this.expiresAtMillis = expiresAtMillis; + } + + public void setExpiresAtMillis(long expiresAtMillis) { + this.expiresAtMillis = expiresAtMillis; + } + + public Map getToken() { + return token; + } + + public long getExpiresAtMillis() { + return expiresAtMillis; + } + + public void setToken(Map token) { + this.token = token; + } + + public void refresh() { + this.token = + ImmutableMap.of( + "ak", + "ak-" + System.currentTimeMillis(), + "sk", + "sk-" + System.currentTimeMillis()); + this.expiresAtMillis = System.currentTimeMillis(); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java b/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java deleted file mode 100644 index bd52d7bf104c..000000000000 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java +++ /dev/null @@ -1,443 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.rest; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.Snapshot; -import org.apache.paimon.TableType; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.Database; -import org.apache.paimon.catalog.FileSystemCatalog; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.catalog.PropertyChange; -import org.apache.paimon.catalog.SupportsSnapshots; -import org.apache.paimon.catalog.TableMetadata; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; -import org.apache.paimon.options.Options; -import org.apache.paimon.partition.Partition; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.schema.SchemaChange; -import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.utils.Pair; -import org.apache.paimon.view.View; - -import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - -import static org.apache.paimon.CoreOptions.PATH; - -/** A catalog for testing RESTCatalog. */ -public class MetadataInMemoryFileSystemCatalog extends FileSystemCatalog - implements SupportsSnapshots { - - public final Map databaseStore; - public final Map tableMetadataStore; - public final Map> tablePartitionsStore; - public final Map viewStore; - public final Map tableSnapshotStore; - public final Map dataTokenStore; - public FileSystemCatalog fileSystemCatalog; - - public MetadataInMemoryFileSystemCatalog( - FileIO fileIO, - Path warehouse, - Options options, - Map databaseStore, - Map tableMetadataStore, - Map tableSnapshotStore, - Map> tablePartitionsStore, - Map viewStore, - Map dataTokenStore) { - super(fileIO, warehouse, options); - this.fileSystemCatalog = new FileSystemCatalog(fileIO, warehouse, options); - this.databaseStore = databaseStore; - this.tableMetadataStore = tableMetadataStore; - this.tablePartitionsStore = tablePartitionsStore; - this.tableSnapshotStore = tableSnapshotStore; - this.viewStore = viewStore; - this.dataTokenStore = dataTokenStore; - } - - public static MetadataInMemoryFileSystemCatalog create( - CatalogContext context, - Map databaseStore, - Map tableMetadataStore, - Map tableSnapshotStore, - Map> tablePartitionsStore, - Map viewStore, - Map dataTokenStore) { - String warehouse = CatalogFactory.warehouse(context).toUri().toString(); - - Path warehousePath = new Path(warehouse); - FileIO fileIO; - - try { - fileIO = FileIO.get(warehousePath, context); - fileIO.checkOrMkdirs(warehousePath); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - - return new MetadataInMemoryFileSystemCatalog( - fileIO, - warehousePath, - context.options(), - databaseStore, - tableMetadataStore, - tableSnapshotStore, - tablePartitionsStore, - viewStore, - dataTokenStore); - } - - @Override - public List listDatabases() { - return new ArrayList<>(databaseStore.keySet()); - } - - @Override - protected void createDatabaseImpl(String name, Map properties) { - super.createDatabaseImpl(name, properties); - databaseStore.put(name, Database.of(name, properties, null)); - } - - @Override - public Database getDatabaseImpl(String name) throws DatabaseNotExistException { - if (databaseStore.containsKey(name)) { - return databaseStore.get(name); - } - throw new DatabaseNotExistException(name); - } - - @Override - protected void dropDatabaseImpl(String name) { - super.dropDatabaseImpl(name); - databaseStore.remove(name); - } - - protected void alterDatabaseImpl(String name, List changes) - throws DatabaseNotExistException { - if (databaseStore.containsKey(name)) { - Pair, Set> setPropertiesToRemoveKeys = - PropertyChange.getSetPropertiesToRemoveKeys(changes); - Map setProperties = setPropertiesToRemoveKeys.getLeft(); - Set removeKeys = setPropertiesToRemoveKeys.getRight(); - Database database = databaseStore.get(name); - Map parameter = new HashMap<>(database.options()); - if (!setProperties.isEmpty()) { - parameter.putAll(setProperties); - } - if (!removeKeys.isEmpty()) { - parameter.keySet().removeAll(removeKeys); - } - Database alterDatabase = Database.of(name, parameter, null); - databaseStore.put(name, alterDatabase); - } else { - throw new DatabaseNotExistException(name); - } - } - - @Override - protected List listTablesImpl(String databaseName) { - List tables = new ArrayList<>(); - for (Map.Entry entry : tableMetadataStore.entrySet()) { - Identifier identifier = Identifier.fromString(entry.getKey()); - if (databaseName.equals(identifier.getDatabaseName())) { - tables.add(identifier.getTableName()); - } - } - return tables; - } - - @Override - public void createTableImpl(Identifier identifier, Schema schema) { - super.createTableImpl(identifier, schema); - try { - TableMetadata tableMetadata = - createTableMetadata( - identifier, 1L, schema, UUID.randomUUID().toString(), false); - tableMetadataStore.put(identifier.getFullName(), tableMetadata); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private TableMetadata createTableMetadata( - Identifier identifier, long schemaId, Schema schema, String uuid, boolean isExternal) { - Map options = new HashMap<>(schema.options()); - Path path = getTableLocation(identifier); - options.put(PATH.key(), path.toString()); - TableSchema tableSchema = - new TableSchema( - schemaId, - schema.fields(), - schema.fields().size() - 1, - schema.partitionKeys(), - schema.primaryKeys(), - options, - schema.comment()); - TableMetadata tableMetadata = new TableMetadata(tableSchema, isExternal, uuid); - return tableMetadata; - } - - @Override - protected void dropTableImpl(Identifier identifier, List externalPaths) { - if (tableMetadataStore.containsKey(identifier.getFullName())) { - tableMetadataStore.remove(identifier.getFullName()); - super.dropTableImpl(identifier, externalPaths); - } - } - - @Override - public void renameTableImpl(Identifier fromTable, Identifier toTable) { - if (tableMetadataStore.containsKey(fromTable.getFullName())) { - super.renameTableImpl(fromTable, toTable); - TableMetadata tableMetadata = tableMetadataStore.get(fromTable.getFullName()); - tableMetadataStore.remove(fromTable.getFullName()); - tableMetadataStore.put(toTable.getFullName(), tableMetadata); - } - } - - @Override - protected void alterTableImpl(Identifier identifier, List changes) - throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - if (tableMetadataStore.containsKey(identifier.getFullName())) { - TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName()); - TableSchema schema = tableMetadata.schema(); - Options options = Options.fromMap(schema.options()); - if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) { - throw new UnsupportedOperationException("Only data table support alter table."); - } - SchemaManager schemaManager = schemaManager(identifier); - try { - TableSchema newSchema = - runWithLock(identifier, () -> schemaManager.commitChanges(changes)); - TableMetadata newTableMetadata = - createTableMetadata( - identifier, - newSchema.id(), - newSchema.toSchema(), - tableMetadata.uuid(), - tableMetadata.isExternal()); - tableMetadataStore.put(identifier.getFullName(), newTableMetadata); - } catch (TableNotExistException - | ColumnAlreadyExistException - | ColumnNotExistException - | RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - private SchemaManager schemaManager(Identifier identifier) { - Path path = getTableLocation(identifier); - return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault()); - } - - @Override - public void createFormatTable(Identifier identifier, Schema schema) { - TableMetadata tableMetadata = - createTableMetadata(identifier, 1L, schema, UUID.randomUUID().toString(), true); - tableMetadataStore.put(identifier.getFullName(), tableMetadata); - } - - @Override - public TableSchema loadTableSchema(Identifier identifier) throws TableNotExistException { - if (tableMetadataStore.containsKey(identifier.getFullName())) { - return tableMetadataStore.get(identifier.getFullName()).schema(); - } - throw new TableNotExistException(identifier); - } - - @Override - protected TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException { - if (tableMetadataStore.containsKey(identifier.getFullName())) { - return tableMetadataStore.get(identifier.getFullName()); - } - throw new TableNotExistException(identifier); - } - - @Override - public void createPartitions(Identifier identifier, List> partitions) - throws TableNotExistException { - getTable(identifier); - tablePartitionsStore.put( - identifier.getFullName(), - partitions.stream() - .map(partition -> spec2Partition(partition)) - .collect(Collectors.toList())); - } - - @Override - public void dropPartitions(Identifier identifier, List> partitions) - throws TableNotExistException { - getTable(identifier); - List existPartitions = tablePartitionsStore.get(identifier.getFullName()); - partitions.forEach( - partition -> { - for (Map.Entry entry : partition.entrySet()) { - existPartitions.stream() - .filter( - p -> - p.spec().containsKey(entry.getKey()) - && p.spec() - .get(entry.getKey()) - .equals(entry.getValue())) - .findFirst() - .ifPresent( - existPartition -> existPartitions.remove(existPartition)); - } - }); - } - - @Override - public void alterPartitions(Identifier identifier, List partitions) - throws TableNotExistException { - getTable(identifier); - List existPartitions = tablePartitionsStore.get(identifier.getFullName()); - partitions.forEach( - partition -> { - for (Map.Entry entry : partition.spec().entrySet()) { - existPartitions.stream() - .filter( - p -> - p.spec().containsKey(entry.getKey()) - && p.spec() - .get(entry.getKey()) - .equals(entry.getValue())) - .findFirst() - .ifPresent( - existPartition -> existPartitions.remove(existPartition)); - } - }); - existPartitions.addAll(partitions); - tablePartitionsStore.put(identifier.getFullName(), existPartitions); - } - - @Override - public List listPartitions(Identifier identifier) throws TableNotExistException { - getTable(identifier); - return tablePartitionsStore.get(identifier.getFullName()); - } - - @Override - public View getView(Identifier identifier) throws ViewNotExistException { - if (viewStore.containsKey(identifier.getFullName())) { - return viewStore.get(identifier.getFullName()); - } - throw new ViewNotExistException(identifier); - } - - @Override - public void dropView(Identifier identifier, boolean ignoreIfNotExists) - throws ViewNotExistException { - if (viewStore.containsKey(identifier.getFullName())) { - viewStore.remove(identifier.getFullName()); - } - if (!ignoreIfNotExists) { - throw new ViewNotExistException(identifier); - } - } - - @Override - public void createView(Identifier identifier, View view, boolean ignoreIfExists) - throws ViewAlreadyExistException, DatabaseNotExistException { - getDatabase(identifier.getDatabaseName()); - if (viewStore.containsKey(identifier.getFullName()) && !ignoreIfExists) { - throw new ViewAlreadyExistException(identifier); - } - viewStore.put(identifier.getFullName(), view); - } - - @Override - public List listViews(String databaseName) throws DatabaseNotExistException { - getDatabase(databaseName); - return viewStore.keySet().stream() - .map(Identifier::fromString) - .filter(identifier -> identifier.getDatabaseName().equals(databaseName)) - .map(Identifier::getTableName) - .collect(Collectors.toList()); - } - - @Override - public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists) - throws ViewNotExistException, ViewAlreadyExistException { - if (!viewStore.containsKey(fromView.getFullName()) && !ignoreIfNotExists) { - throw new ViewNotExistException(fromView); - } - if (viewStore.containsKey(toView.getFullName())) { - throw new ViewAlreadyExistException(toView); - } - if (viewStore.containsKey(fromView.getFullName())) { - View view = viewStore.get(fromView.getFullName()); - viewStore.remove(fromView.getFullName()); - viewStore.put(toView.getFullName(), view); - } - } - - @Override - public boolean commitSnapshot( - Identifier identifier, Snapshot snapshot, List statistics) - throws TableNotExistException { - tableSnapshotStore.put(identifier.getFullName(), snapshot); - return false; - } - - @Override - public Optional loadSnapshot(Identifier identifier) throws TableNotExistException { - return Optional.ofNullable(tableSnapshotStore.get(identifier.getFullName())); - } - - public RESTToken getToken(Identifier identifier) { - if (dataTokenStore.containsKey(identifier.getFullName())) { - return dataTokenStore.get(identifier.getFullName()); - } - long currentTimeMillis = System.currentTimeMillis(); - RESTToken token = - new RESTToken( - ImmutableMap.of( - "akId", - "akId" + currentTimeMillis, - "akSecret", - "akSecret" + currentTimeMillis), - currentTimeMillis); - dataTokenStore.put(identifier.getFullName(), token); - return dataTokenStore.get(identifier.getFullName()); - } - - private Partition spec2Partition(Map spec) { - // todo: need update - return new Partition(spec, 123, 456, 789, 123); - } -} diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 15de6189c15f..30d7b0a598db 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -23,10 +23,13 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Database; +import org.apache.paimon.catalog.FileSystemCatalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; import org.apache.paimon.catalog.RenamingSnapshotCommit; import org.apache.paimon.catalog.TableMetadata; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; @@ -61,11 +64,11 @@ import org.apache.paimon.rest.responses.ListTablesResponse; import org.apache.paimon.rest.responses.ListViewsResponse; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.FormatTable; -import org.apache.paimon.table.Table; -import org.apache.paimon.types.DataField; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.Pair; import org.apache.paimon.view.View; import org.apache.paimon.view.ViewImpl; import org.apache.paimon.view.ViewSchema; @@ -76,17 +79,23 @@ import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import static org.apache.paimon.CoreOptions.PATH; +import static org.apache.paimon.CoreOptions.TYPE; +import static org.apache.paimon.TableType.FORMAT_TABLE; import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER; /** Mock REST server for testing. */ @@ -95,7 +104,7 @@ public class RESTCatalogServer { private static final String PREFIX = "paimon"; private static final String DATABASE_URI = String.format("/v1/%s/databases", PREFIX); - private final MetadataInMemoryFileSystemCatalog catalog; + private final FileSystemCatalog catalog; private final Dispatcher dispatcher; private final MockWebServer server; private final String authToken; @@ -111,16 +120,17 @@ public RESTCatalogServer(String warehouse, String initToken) { authToken = initToken; Options conf = new Options(); conf.setString("warehouse", warehouse); - this.catalog = - MetadataInMemoryFileSystemCatalog.create( - CatalogContext.create(conf), - databaseStore, - tableMetadataStore, - tableSnapshotStore, - tablePartitionsStore, - viewStore, - dataTokenStore); - this.dispatcher = initDispatcher(catalog, warehouse, authToken); + CatalogContext context = CatalogContext.create(conf); + Path warehousePath = new Path(warehouse); + FileIO fileIO; + try { + fileIO = FileIO.get(warehousePath, context); + fileIO.checkOrMkdirs(warehousePath); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + this.catalog = new FileSystemCatalog(fileIO, warehousePath, context.options()); + this.dispatcher = initDispatcher(warehouse, authToken); MockWebServer mockWebServer = new MockWebServer(); mockWebServer.setDispatcher(dispatcher); server = mockWebServer; @@ -146,8 +156,7 @@ public void setDataToken(Identifier identifier, RESTToken token) { dataTokenStore.put(identifier.getFullName(), token); } - public static Dispatcher initDispatcher( - MetadataInMemoryFileSystemCatalog catalog, String warehouse, String authToken) { + public Dispatcher initDispatcher(String warehouse, String authToken) { return new Dispatcher() { @Override public MockResponse dispatch(RecordedRequest request) { @@ -159,17 +168,27 @@ public MockResponse dispatch(RecordedRequest request) { return new MockResponse().setResponseCode(401); } if (request.getPath().startsWith("/v1/config")) { - return new MockResponse() - .setResponseCode(200) - .setBody(getConfigBody(warehouse)); + String body = + String.format( + "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\", \"%s\": \"%s\"}}", + RESTCatalogInternalOptions.PREFIX.key(), + PREFIX, + CatalogOptions.WAREHOUSE.key(), + warehouse, + "header.test-header", + "test-value"); + return new MockResponse().setResponseCode(200).setBody(body); } else if (DATABASE_URI.equals(request.getPath())) { - return databasesApiHandler(catalog, request); + return databasesApiHandler(request); } else if (request.getPath().startsWith(DATABASE_URI)) { String[] resources = request.getPath() .substring((DATABASE_URI + "/").length()) .split("/"); String databaseName = resources[0]; + if (!databaseStore.containsKey(databaseName)) { + throw new Catalog.DatabaseNotExistException(databaseName); + } boolean isViews = resources.length == 2 && "views".equals(resources[1]); boolean isTables = resources.length == 2 && "tables".equals(resources[1]); boolean isTableRename = @@ -226,44 +245,28 @@ public MockResponse dispatch(RecordedRequest request) { resources.length >= 4 && "tables".equals(resources[1]) && "branches".equals(resources[3]); - if (isDropPartitions) { + Identifier identifier = + resources.length >= 3 + ? Identifier.create(databaseName, resources[2]) + : null; + // validate partition + if (isPartitions + || isDropPartitions + || isAlterPartitions + || isMarkDonePartitions) { String tableName = resources[2]; - Identifier identifier = Identifier.create(databaseName, tableName); Optional error = - checkTablePartitioned(catalog, identifier); + checkTablePartitioned( + Identifier.create(databaseName, tableName)); if (error.isPresent()) { return error.get(); } - DropPartitionsRequest dropPartitionsRequest = - OBJECT_MAPPER.readValue( - request.getBody().readUtf8(), - DropPartitionsRequest.class); - catalog.dropPartitions( - identifier, dropPartitionsRequest.getPartitionSpecs()); - return new MockResponse().setResponseCode(200); + } + if (isDropPartitions) { + return dropPartitionsHandle(identifier, request); } else if (isAlterPartitions) { - String tableName = resources[2]; - Identifier identifier = Identifier.create(databaseName, tableName); - Optional error = - checkTablePartitioned(catalog, identifier); - if (error.isPresent()) { - return error.get(); - } - AlterPartitionsRequest alterPartitionsRequest = - OBJECT_MAPPER.readValue( - request.getBody().readUtf8(), - AlterPartitionsRequest.class); - catalog.alterPartitions( - identifier, alterPartitionsRequest.getPartitions()); - return new MockResponse().setResponseCode(200); + return alterPartitionsHandle(identifier, request); } else if (isMarkDonePartitions) { - String tableName = resources[2]; - Identifier identifier = Identifier.create(databaseName, tableName); - Optional error = - checkTablePartitioned(catalog, identifier); - if (error.isPresent()) { - return error.get(); - } MarkDonePartitionsRequest markDonePartitionsRequest = OBJECT_MAPPER.readValue( request.getBody().readUtf8(), @@ -272,17 +275,8 @@ public MockResponse dispatch(RecordedRequest request) { identifier, markDonePartitionsRequest.getPartitionSpecs()); return new MockResponse().setResponseCode(200); } else if (isPartitions) { - String tableName = resources[2]; - Optional error = - checkTablePartitioned( - catalog, Identifier.create(databaseName, tableName)); - if (error.isPresent()) { - return error.get(); - } - return partitionsApiHandler(catalog, request, databaseName, tableName); + return partitionsApiHandler(request, identifier); } else if (isBranches) { - String tableName = resources[2]; - Identifier identifier = Identifier.create(databaseName, tableName); FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); BranchManager branchManager = table.branchManager(); switch (request.getMethod()) { @@ -318,56 +312,25 @@ public MockResponse dispatch(RecordedRequest request) { return new MockResponse().setResponseCode(404); } } else if (isTableToken) { - RESTToken dataToken = - catalog.getToken(Identifier.create(databaseName, resources[2])); - GetTableTokenResponse getTableTokenResponse = - new GetTableTokenResponse( - dataToken.token(), dataToken.expireAtMillis()); - return new MockResponse() - .setResponseCode(200) - .setBody( - OBJECT_MAPPER.writeValueAsString( - getTableTokenResponse)); + return handleDataToken(identifier); } else if (isTableSnapshot) { - String tableName = resources[2]; - Optional snapshotOptional = - catalog.loadSnapshot( - Identifier.create(databaseName, tableName)); - if (!snapshotOptional.isPresent()) { - response = - new ErrorResponse( - ErrorResponseResourceType.SNAPSHOT, - databaseName, - "No Snapshot", - 404); - return mockResponse(response, 404); - } - GetTableSnapshotResponse getTableSnapshotResponse = - new GetTableSnapshotResponse(snapshotOptional.get()); - return new MockResponse() - .setResponseCode(200) - .setBody( - OBJECT_MAPPER.writeValueAsString( - getTableSnapshotResponse)); + return handleSnapshot(identifier); } else if (isTableRename) { - return renameTableApiHandler(catalog, request); + return renameTableApiHandler(request); } else if (isTableCommit) { - return commitTableApiHandler( - catalog, request, databaseName, resources[2]); + return commitTableApiHandler(request); } else if (isTable) { - String tableName = resources[2]; - return tableApiHandler(catalog, request, databaseName, tableName); + return tableApiHandler(request, identifier); } else if (isTables) { - return tablesApiHandler(catalog, request, databaseName); + return tablesApiHandler(request, databaseName); } else if (isViews) { - return viewsApiHandler(catalog, request, databaseName); + return viewsApiHandler(request, databaseName); } else if (isViewRename) { - return renameViewApiHandler(catalog, request); + return renameViewApiHandler(request); } else if (isView) { - String viewName = resources[2]; - return viewApiHandler(catalog, request, databaseName, viewName); + return viewApiHandler(request, identifier); } else { - return databaseApiHandler(catalog, request, databaseName); + return databaseApiHandler(request, databaseName); } } return new MockResponse().setResponseCode(404); @@ -458,31 +421,69 @@ public MockResponse dispatch(RecordedRequest request) { }; } - private static Optional checkTablePartitioned( - Catalog catalog, Identifier identifier) { - Table table; - try { - table = catalog.getTable(identifier); - } catch (Catalog.TableNotExistException e) { - return Optional.of( - mockResponse( - new ErrorResponse(ErrorResponseResourceType.TABLE, null, "", 404), - 404)); + private MockResponse handleDataToken(Identifier tableIdentifier) throws Exception { + RESTToken dataToken; + if (dataTokenStore.containsKey(tableIdentifier.getFullName())) { + dataToken = dataTokenStore.get(tableIdentifier.getFullName()); + } else { + long currentTimeMillis = System.currentTimeMillis(); + dataToken = + new RESTToken( + ImmutableMap.of( + "akId", + "akId" + currentTimeMillis, + "akSecret", + "akSecret" + currentTimeMillis), + currentTimeMillis); + dataTokenStore.put(tableIdentifier.getFullName(), dataToken); } - boolean partitioned = CoreOptions.fromMap(table.options()).partitionedTableInMetastore(); - if (!partitioned) { - return Optional.of(mockResponse(new ErrorResponse(null, null, "", 501), 501)); + GetTableTokenResponse getTableTokenResponse = + new GetTableTokenResponse(dataToken.token(), dataToken.expireAtMillis()); + return new MockResponse() + .setResponseCode(200) + .setBody(OBJECT_MAPPER.writeValueAsString(getTableTokenResponse)); + } + + private MockResponse handleSnapshot(Identifier identifier) throws Exception { + RESTResponse response; + Optional snapshotOptional = + Optional.ofNullable(tableSnapshotStore.get(identifier.getFullName())); + if (!snapshotOptional.isPresent()) { + response = + new ErrorResponse( + ErrorResponseResourceType.SNAPSHOT, + identifier.getDatabaseName(), + "No Snapshot", + 404); + return mockResponse(response, 404); } - return Optional.empty(); + GetTableSnapshotResponse getTableSnapshotResponse = + new GetTableSnapshotResponse(snapshotOptional.get()); + return new MockResponse() + .setResponseCode(200) + .setBody(OBJECT_MAPPER.writeValueAsString(getTableSnapshotResponse)); } - private static MockResponse commitTableApiHandler( - Catalog catalog, RecordedRequest request, String databaseName, String tableName) - throws Exception { + private Optional checkTablePartitioned(Identifier identifier) { + if (tableMetadataStore.containsKey(identifier.getFullName())) { + TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName()); + boolean partitioned = + CoreOptions.fromMap(tableMetadata.schema().options()) + .partitionedTableInMetastore(); + if (!partitioned) { + return Optional.of(mockResponse(new ErrorResponse(null, null, "", 501), 501)); + } + return Optional.empty(); + } + return Optional.of( + mockResponse( + new ErrorResponse(ErrorResponseResourceType.TABLE, null, "", 404), 404)); + } + + private MockResponse commitTableApiHandler(RecordedRequest request) throws Exception { CommitTableRequest requestBody = OBJECT_MAPPER.readValue(request.getBody().readUtf8(), CommitTableRequest.class); - FileStoreTable table = - (FileStoreTable) catalog.getTable(Identifier.create(databaseName, tableName)); + FileStoreTable table = (FileStoreTable) catalog.getTable(requestBody.getIdentifier()); RenamingSnapshotCommit commit = new RenamingSnapshotCommit(table.snapshotManager(), Lock.empty()); String branchName = requestBody.getIdentifier().getBranchName(); @@ -491,16 +492,16 @@ private static MockResponse commitTableApiHandler( } boolean success = commit.commit(requestBody.getSnapshot(), branchName, Collections.emptyList()); + commitSnapshot(requestBody.getIdentifier(), requestBody.getSnapshot(), null); CommitTableResponse response = new CommitTableResponse(success); return mockResponse(response, 200); } - private static MockResponse databasesApiHandler(Catalog catalog, RecordedRequest request) - throws Exception { + private MockResponse databasesApiHandler(RecordedRequest request) throws Exception { RESTResponse response; switch (request.getMethod()) { case "GET": - List databaseNameList = catalog.listDatabases(); + List databaseNameList = new ArrayList<>(databaseStore.keySet()); response = new ListDatabasesResponse(databaseNameList); return mockResponse(response, 200); case "POST": @@ -509,6 +510,8 @@ private static MockResponse databasesApiHandler(Catalog catalog, RecordedRequest request.getBody().readUtf8(), CreateDatabaseRequest.class); String databaseName = requestBody.getName(); catalog.createDatabase(databaseName, false); + databaseStore.put( + databaseName, Database.of(databaseName, requestBody.getOptions(), null)); response = new CreateDatabaseResponse(databaseName, requestBody.getOptions()); return mockResponse(response, 200); default: @@ -516,124 +519,215 @@ private static MockResponse databasesApiHandler(Catalog catalog, RecordedRequest } } - private static MockResponse databaseApiHandler( - Catalog catalog, RecordedRequest request, String databaseName) throws Exception { + private MockResponse databaseApiHandler(RecordedRequest request, String databaseName) + throws Exception { RESTResponse response; - switch (request.getMethod()) { - case "GET": - Database database = catalog.getDatabase(databaseName); - response = - new GetDatabaseResponse( - UUID.randomUUID().toString(), database.name(), database.options()); - return mockResponse(response, 200); - case "DELETE": - catalog.dropDatabase(databaseName, false, true); - return new MockResponse().setResponseCode(200); - case "POST": - AlterDatabaseRequest requestBody = - OBJECT_MAPPER.readValue( - request.getBody().readUtf8(), AlterDatabaseRequest.class); - List changes = new ArrayList<>(); - for (String property : requestBody.getRemovals()) { - changes.add(PropertyChange.removeProperty(property)); - } - for (Map.Entry entry : requestBody.getUpdates().entrySet()) { - changes.add(PropertyChange.setProperty(entry.getKey(), entry.getValue())); - } - catalog.alterDatabase(databaseName, changes, false); - AlterDatabaseResponse alterDatabaseResponse = - new AlterDatabaseResponse( - requestBody.getRemovals(), - requestBody.getUpdates().keySet().stream() - .collect(Collectors.toList()), - Collections.emptyList()); - return mockResponse(alterDatabaseResponse, 200); - default: - return new MockResponse().setResponseCode(404); + Database database; + if (databaseStore.containsKey(databaseName)) { + switch (request.getMethod()) { + case "GET": + database = databaseStore.get(databaseName); + response = + new GetDatabaseResponse( + UUID.randomUUID().toString(), + database.name(), + database.options()); + return mockResponse(response, 200); + case "DELETE": + catalog.dropDatabase(databaseName, false, true); + databaseStore.remove(databaseName); + return new MockResponse().setResponseCode(200); + case "POST": + AlterDatabaseRequest requestBody = + OBJECT_MAPPER.readValue( + request.getBody().readUtf8(), AlterDatabaseRequest.class); + List changes = new ArrayList<>(); + for (String property : requestBody.getRemovals()) { + changes.add(PropertyChange.removeProperty(property)); + } + for (Map.Entry entry : requestBody.getUpdates().entrySet()) { + changes.add(PropertyChange.setProperty(entry.getKey(), entry.getValue())); + } + if (databaseStore.containsKey(databaseName)) { + Pair, Set> setPropertiesToRemoveKeys = + PropertyChange.getSetPropertiesToRemoveKeys(changes); + Map setProperties = setPropertiesToRemoveKeys.getLeft(); + Set removeKeys = setPropertiesToRemoveKeys.getRight(); + database = databaseStore.get(databaseName); + Map parameter = new HashMap<>(database.options()); + if (!setProperties.isEmpty()) { + parameter.putAll(setProperties); + } + if (!removeKeys.isEmpty()) { + parameter.keySet().removeAll(removeKeys); + } + Database alterDatabase = Database.of(databaseName, parameter, null); + databaseStore.put(databaseName, alterDatabase); + } else { + throw new Catalog.DatabaseNotExistException(databaseName); + } + AlterDatabaseResponse alterDatabaseResponse = + new AlterDatabaseResponse( + requestBody.getRemovals(), + requestBody.getUpdates().keySet().stream() + .collect(Collectors.toList()), + Collections.emptyList()); + return mockResponse(alterDatabaseResponse, 200); + default: + return new MockResponse().setResponseCode(404); + } } + return new MockResponse().setResponseCode(404); } - private static MockResponse tablesApiHandler( - Catalog catalog, RecordedRequest request, String databaseName) throws Exception { + private MockResponse tablesApiHandler(RecordedRequest request, String databaseName) + throws Exception { RESTResponse response; - switch (request.getMethod()) { - case "GET": - response = new ListTablesResponse(catalog.listTables(databaseName)); - return mockResponse(response, 200); - case "POST": - CreateTableRequest requestBody = - OBJECT_MAPPER.readValue( - request.getBody().readUtf8(), CreateTableRequest.class); - catalog.createTable(requestBody.getIdentifier(), requestBody.getSchema(), false); - return new MockResponse().setResponseCode(200); - default: - return new MockResponse().setResponseCode(404); + if (databaseStore.containsKey(databaseName)) { + switch (request.getMethod()) { + case "GET": + List tables = new ArrayList<>(); + for (Map.Entry entry : tableMetadataStore.entrySet()) { + Identifier identifier = Identifier.fromString(entry.getKey()); + if (databaseName.equals(identifier.getDatabaseName())) { + tables.add(identifier.getTableName()); + } + } + response = new ListTablesResponse(tables); + return mockResponse(response, 200); + case "POST": + CreateTableRequest requestBody = + OBJECT_MAPPER.readValue( + request.getBody().readUtf8(), CreateTableRequest.class); + Identifier identifier = requestBody.getIdentifier(); + Schema schema = requestBody.getSchema(); + TableMetadata tableMetadata; + if (isFormatTable(schema)) { + tableMetadata = createFormatTable(identifier, schema); + } else { + catalog.createTable(identifier, schema, false); + tableMetadata = + createTableMetadata( + requestBody.getIdentifier(), + 1L, + requestBody.getSchema(), + UUID.randomUUID().toString(), + false); + } + tableMetadataStore.put( + requestBody.getIdentifier().getFullName(), tableMetadata); + return new MockResponse().setResponseCode(200); + default: + return new MockResponse().setResponseCode(404); + } } + return mockResponse( + new ErrorResponse(ErrorResponseResourceType.DATABASE, null, "", 404), 404); + } + + private boolean isFormatTable(Schema schema) { + return Options.fromMap(schema.options()).get(TYPE) == FORMAT_TABLE; } - private static MockResponse tableApiHandler( - Catalog catalog, RecordedRequest request, String databaseName, String tableName) + private MockResponse tableApiHandler(RecordedRequest request, Identifier identifier) throws Exception { RESTResponse response; - Identifier identifier = Identifier.create(databaseName, tableName); - switch (request.getMethod()) { - case "GET": - response = getTable(catalog, databaseName, tableName); - return mockResponse(response, 200); - case "POST": - AlterTableRequest requestBody = - OBJECT_MAPPER.readValue( - request.getBody().readUtf8(), AlterTableRequest.class); - catalog.alterTable(identifier, requestBody.getChanges(), false); - return new MockResponse().setResponseCode(200); - case "DELETE": - catalog.dropTable(identifier, false); - return new MockResponse().setResponseCode(200); - default: - return new MockResponse().setResponseCode(404); + if (tableMetadataStore.containsKey(identifier.getFullName())) { + switch (request.getMethod()) { + case "GET": + TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName()); + response = + new GetTableResponse( + tableMetadata.uuid(), + identifier.getTableName(), + tableMetadata.isExternal(), + tableMetadata.schema().id(), + tableMetadata.schema().toSchema()); + return mockResponse(response, 200); + case "POST": + AlterTableRequest requestBody = + OBJECT_MAPPER.readValue( + request.getBody().readUtf8(), AlterTableRequest.class); + alterTableImpl(identifier, requestBody.getChanges()); + return new MockResponse().setResponseCode(200); + case "DELETE": + try { + catalog.dropTable(identifier, false); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + tableMetadataStore.remove(identifier.getFullName()); + return new MockResponse().setResponseCode(200); + default: + return new MockResponse().setResponseCode(404); + } + } else { + throw new Catalog.TableNotExistException(identifier); } } - private static MockResponse renameTableApiHandler(Catalog catalog, RecordedRequest request) - throws Exception { + private MockResponse renameTableApiHandler(RecordedRequest request) throws Exception { RenameTableRequest requestBody = OBJECT_MAPPER.readValue(request.getBody().readUtf8(), RenameTableRequest.class); - catalog.renameTable(requestBody.getSource(), requestBody.getDestination(), false); + Identifier fromTable = requestBody.getSource(); + Identifier toTable = requestBody.getDestination(); + if (tableMetadataStore.containsKey(fromTable.getFullName())) { + TableMetadata tableMetadata = tableMetadataStore.get(fromTable.getFullName()); + if (!isFormatTable(tableMetadata.schema().toSchema())) { + catalog.renameTable(requestBody.getSource(), requestBody.getDestination(), false); + } + tableMetadataStore.remove(fromTable.getFullName()); + tableMetadataStore.put(toTable.getFullName(), tableMetadata); + } else { + throw new Catalog.TableNotExistException(fromTable); + } return new MockResponse().setResponseCode(200); } - private static MockResponse partitionsApiHandler( - Catalog catalog, RecordedRequest request, String databaseName, String tableName) + private MockResponse partitionsApiHandler(RecordedRequest request, Identifier tableIdentifier) throws Exception { RESTResponse response; - Identifier identifier = Identifier.create(databaseName, tableName); switch (request.getMethod()) { case "GET": - List partitions = catalog.listPartitions(identifier); + List partitions = + tablePartitionsStore.get(tableIdentifier.getFullName()); response = new ListPartitionsResponse(partitions); return mockResponse(response, 200); case "POST": CreatePartitionsRequest requestBody = OBJECT_MAPPER.readValue( request.getBody().readUtf8(), CreatePartitionsRequest.class); - catalog.createPartitions(identifier, requestBody.getPartitionSpecs()); + tablePartitionsStore.put( + tableIdentifier.getFullName(), + requestBody.getPartitionSpecs().stream() + .map(partition -> spec2Partition(partition)) + .collect(Collectors.toList())); return new MockResponse().setResponseCode(200); default: return new MockResponse().setResponseCode(404); } } - private static MockResponse viewsApiHandler( - Catalog catalog, RecordedRequest request, String databaseName) throws Exception { + private MockResponse viewsApiHandler(RecordedRequest request, String databaseName) + throws Exception { RESTResponse response; switch (request.getMethod()) { case "GET": - response = new ListViewsResponse(catalog.listViews(databaseName)); + List views = + viewStore.keySet().stream() + .map(Identifier::fromString) + .filter( + identifier -> + identifier.getDatabaseName().equals(databaseName)) + .map(Identifier::getTableName) + .collect(Collectors.toList()); + response = new ListViewsResponse(views); return mockResponse(response, 200); case "POST": CreateViewRequest requestBody = OBJECT_MAPPER.readValue( request.getBody().readUtf8(), CreateViewRequest.class); + Identifier identifier = requestBody.getIdentifier(); ViewSchema schema = requestBody.getSchema(); ViewImpl view = new ViewImpl( @@ -643,71 +737,178 @@ private static MockResponse viewsApiHandler( schema.dialects(), schema.comment(), schema.options()); - catalog.createView(requestBody.getIdentifier(), view, false); + if (viewStore.containsKey(identifier.getFullName())) { + throw new Catalog.ViewAlreadyExistException(identifier); + } + viewStore.put(identifier.getFullName(), view); return new MockResponse().setResponseCode(200); default: return new MockResponse().setResponseCode(404); } } - private static MockResponse viewApiHandler( - Catalog catalog, RecordedRequest request, String databaseName, String viewName) + private MockResponse viewApiHandler(RecordedRequest request, Identifier identifier) throws Exception { RESTResponse response; - Identifier identifier = Identifier.create(databaseName, viewName); - switch (request.getMethod()) { - case "GET": - View view = catalog.getView(identifier); - ViewSchema schema = - new ViewSchema( - view.rowType().getFields(), - view.query(), - view.dialects(), - view.comment().orElse(null), - view.options()); - response = new GetViewResponse("id", identifier.getTableName(), schema); - return mockResponse(response, 200); - case "DELETE": - catalog.dropView(identifier, false); - return new MockResponse().setResponseCode(200); - default: - return new MockResponse().setResponseCode(404); + if (viewStore.containsKey(identifier.getFullName())) { + switch (request.getMethod()) { + case "GET": + if (viewStore.containsKey(identifier.getFullName())) { + View view = viewStore.get(identifier.getFullName()); + ViewSchema schema = + new ViewSchema( + view.rowType().getFields(), + view.query(), + view.dialects(), + view.comment().orElse(null), + view.options()); + response = new GetViewResponse("id", identifier.getTableName(), schema); + return mockResponse(response, 200); + } + throw new Catalog.ViewNotExistException(identifier); + case "DELETE": + viewStore.remove(identifier.getFullName()); + return new MockResponse().setResponseCode(200); + default: + return new MockResponse().setResponseCode(404); + } } + throw new Catalog.ViewNotExistException(identifier); } - private static MockResponse renameViewApiHandler(Catalog catalog, RecordedRequest request) - throws Exception { + private MockResponse renameViewApiHandler(RecordedRequest request) throws Exception { RenameTableRequest requestBody = OBJECT_MAPPER.readValue(request.getBody().readUtf8(), RenameTableRequest.class); - catalog.renameView(requestBody.getSource(), requestBody.getDestination(), false); + Identifier fromView = requestBody.getSource(); + Identifier toView = requestBody.getDestination(); + if (!viewStore.containsKey(fromView.getFullName())) { + throw new Catalog.ViewNotExistException(fromView); + } + if (viewStore.containsKey(toView.getFullName())) { + throw new Catalog.ViewAlreadyExistException(toView); + } + if (viewStore.containsKey(fromView.getFullName())) { + View view = viewStore.get(fromView.getFullName()); + viewStore.remove(fromView.getFullName()); + viewStore.put(toView.getFullName(), view); + } return new MockResponse().setResponseCode(200); } - private static GetTableResponse getTable(Catalog catalog, String databaseName, String tableName) - throws Exception { - Identifier identifier = Identifier.create(databaseName, tableName); - Table table = catalog.getTable(identifier); - Schema schema; - Long schemaId = 1L; - if (table instanceof FileStoreTable) { - FileStoreTable fileStoreTable = (FileStoreTable) table; - schema = fileStoreTable.schema().toSchema(); - schemaId = fileStoreTable.schema().id(); + protected void alterTableImpl(Identifier identifier, List changes) + throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, + Catalog.ColumnNotExistException { + if (tableMetadataStore.containsKey(identifier.getFullName())) { + TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName()); + TableSchema schema = tableMetadata.schema(); + if (isFormatTable(schema.toSchema())) { + throw new UnsupportedOperationException("Only data table support alter table."); + } + try { + catalog.alterTable(identifier, changes, false); + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + TableSchema newSchema = table.schema(); + TableMetadata newTableMetadata = + createTableMetadata( + identifier, + newSchema.id(), + newSchema.toSchema(), + tableMetadata.uuid(), + tableMetadata.isExternal()); + tableMetadataStore.put(identifier.getFullName(), newTableMetadata); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException + | RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private boolean commitSnapshot( + Identifier identifier, Snapshot snapshot, List statistics) + throws Catalog.TableNotExistException { + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + RenamingSnapshotCommit commit = + new RenamingSnapshotCommit(table.snapshotManager(), Lock.empty()); + String branchName = identifier.getBranchName(); + if (branchName == null) { + branchName = "main"; + } + try { + boolean success = commit.commit(snapshot, branchName, Collections.emptyList()); + tableSnapshotStore.put(identifier.getFullName(), snapshot); + return success; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private MockResponse dropPartitionsHandle(Identifier identifier, RecordedRequest request) + throws Catalog.TableNotExistException, JsonProcessingException { + DropPartitionsRequest dropPartitionsRequest = + OBJECT_MAPPER.readValue(request.getBody().readUtf8(), DropPartitionsRequest.class); + List> partitionSpecs = dropPartitionsRequest.getPartitionSpecs(); + if (tableMetadataStore.containsKey(identifier.getFullName())) { + List existPartitions = tablePartitionsStore.get(identifier.getFullName()); + partitionSpecs.forEach( + partition -> { + for (Map.Entry entry : partition.entrySet()) { + existPartitions.stream() + .filter( + p -> + p.spec().containsKey(entry.getKey()) + && p.spec() + .get(entry.getKey()) + .equals(entry.getValue())) + .findFirst() + .ifPresent( + existPartition -> + existPartitions.remove(existPartition)); + } + }); + return new MockResponse().setResponseCode(200); + } else { - FormatTable formatTable = (FormatTable) table; - List fields = formatTable.rowType().getFields(); - schema = - new Schema( - fields, - table.partitionKeys(), - table.primaryKeys(), - table.options(), - table.comment().orElse(null)); + throw new Catalog.TableNotExistException(identifier); } - return new GetTableResponse(table.uuid(), table.name(), false, schemaId, schema); } - private static MockResponse mockResponse(RESTResponse response, int httpCode) { + private MockResponse alterPartitionsHandle(Identifier identifier, RecordedRequest request) + throws Catalog.TableNotExistException, JsonProcessingException { + if (tableMetadataStore.containsKey(identifier.getFullName())) { + AlterPartitionsRequest alterPartitionsRequest = + OBJECT_MAPPER.readValue( + request.getBody().readUtf8(), AlterPartitionsRequest.class); + List partitions = alterPartitionsRequest.getPartitions(); + List existPartitions = tablePartitionsStore.get(identifier.getFullName()); + partitions.forEach( + partition -> { + for (Map.Entry entry : partition.spec().entrySet()) { + existPartitions.stream() + .filter( + p -> + p.spec().containsKey(entry.getKey()) + && p.spec() + .get(entry.getKey()) + .equals(entry.getValue())) + .findFirst() + .ifPresent( + existPartition -> + existPartitions.remove(existPartition)); + } + }); + existPartitions.addAll(partitions); + tablePartitionsStore.put(identifier.getFullName(), existPartitions); + return new MockResponse().setResponseCode(200); + } else { + throw new Catalog.TableNotExistException(identifier); + } + } + + private MockResponse mockResponse(RESTResponse response, int httpCode) { try { return new MockResponse() .setResponseCode(httpCode) @@ -718,14 +919,30 @@ private static MockResponse mockResponse(RESTResponse response, int httpCode) { } } - private static String getConfigBody(String warehouseStr) { - return String.format( - "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\", \"%s\": \"%s\"}}", - RESTCatalogInternalOptions.PREFIX.key(), - PREFIX, - CatalogOptions.WAREHOUSE.key(), - warehouseStr, - "header.test-header", - "test-value"); + private TableMetadata createTableMetadata( + Identifier identifier, long schemaId, Schema schema, String uuid, boolean isExternal) { + Map options = new HashMap<>(schema.options()); + Path path = catalog.getTableLocation(identifier); + options.put(PATH.key(), path.toString()); + TableSchema tableSchema = + new TableSchema( + schemaId, + schema.fields(), + schema.fields().size() - 1, + schema.partitionKeys(), + schema.primaryKeys(), + options, + schema.comment()); + TableMetadata tableMetadata = new TableMetadata(tableSchema, isExternal, uuid); + return tableMetadata; + } + + private TableMetadata createFormatTable(Identifier identifier, Schema schema) { + return createTableMetadata(identifier, 1L, schema, UUID.randomUUID().toString(), true); + } + + private Partition spec2Partition(Map spec) { + // todo: need update + return new Partition(spec, 123, 456, 789, 123); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index b9a550950c31..92c6db658da8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -23,15 +23,25 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogTestBase; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.rest.auth.AuthProviderEnum; import org.apache.paimon.rest.auth.BearTokenAuthProvider; import org.apache.paimon.rest.auth.RESTAuthParameter; import org.apache.paimon.rest.exceptions.NotAuthorizedException; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -43,11 +53,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE; import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis; @@ -125,12 +137,7 @@ void testListPartitionsFromFile() throws Exception { @Test void testRefreshFileIO() throws Exception { - Options options = new Options(); - options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl()); - options.set(RESTCatalogOptions.TOKEN, initToken); - options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true); - options.set(RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.BEAR.identifier()); - this.catalog = new RESTCatalog(CatalogContext.create(options)); + this.catalog = initDataTokenCatalog(); List identifiers = Lists.newArrayList( Identifier.create("test_db_a", "test_table_a"), @@ -140,29 +147,101 @@ void testRefreshFileIO() throws Exception { createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1")); FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); assertEquals(true, fileStoreTable.fileIO().exists(fileStoreTable.location())); + + RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); + RESTToken fileDataToken = fileIO.validToken(); + RESTToken serverDataToken = + restCatalogServer.dataTokenStore.get(identifier.getFullName()); + assertEquals(serverDataToken, fileDataToken); } } @Test - void testSnapshotFromREST() throws Catalog.TableNotExistException { + void testRefreshFileIOWhenExpired() throws Exception { + this.catalog = initDataTokenCatalog(); + Identifier identifier = + Identifier.create("test_data_token", "table_for_testing_date_token"); + RESTToken expiredDataToken = + new RESTToken( + ImmutableMap.of("akId", "akId", "akSecret", UUID.randomUUID().toString()), + System.currentTimeMillis()); + restCatalogServer.setDataToken(identifier, expiredDataToken); + createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1")); + FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); + RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); + RESTToken fileDataToken = fileIO.validToken(); + assertEquals(expiredDataToken, fileDataToken); + RESTToken newDataToken = + new RESTToken( + ImmutableMap.of("akId", "akId", "akSecret", UUID.randomUUID().toString()), + System.currentTimeMillis() + 100_000); + restCatalogServer.setDataToken(identifier, newDataToken); + RESTToken nextFileDataToken = fileIO.validToken(); + assertEquals(newDataToken, nextFileDataToken); + assertEquals(true, nextFileDataToken.expireAtMillis() - fileDataToken.expireAtMillis() > 0); + } + + @Test + void testSnapshotFromREST() throws Exception { Options options = new Options(); options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl()); options.set(RESTCatalogOptions.TOKEN, initToken); options.set(RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.BEAR.identifier()); RESTCatalog catalog = new RESTCatalog(CatalogContext.create(options)); - Identifier hasSnapshotTable = Identifier.create("test_db_a", "my_snapshot_table"); + Identifier hasSnapshotTableIdentifier = Identifier.create("test_db_a", "my_snapshot_table"); + createTable(hasSnapshotTableIdentifier, Maps.newHashMap(), Lists.newArrayList("col1")); long id = 10086; long millis = System.currentTimeMillis(); - restCatalogServer.setTableSnapshot(hasSnapshotTable, createSnapshotWithMillis(id, millis)); - Optional snapshot = catalog.loadSnapshot(hasSnapshotTable); + restCatalogServer.setTableSnapshot( + hasSnapshotTableIdentifier, createSnapshotWithMillis(id, millis)); + Optional snapshot = catalog.loadSnapshot(hasSnapshotTableIdentifier); assertThat(snapshot).isPresent(); assertThat(snapshot.get().id()).isEqualTo(id); assertThat(snapshot.get().timeMillis()).isEqualTo(millis); - - snapshot = catalog.loadSnapshot(Identifier.create("test_db_a", "unknown")); + Identifier noSnapshotTableIdentifier = Identifier.create("test_db_a_1", "unknown"); + createTable(noSnapshotTableIdentifier, Maps.newHashMap(), Lists.newArrayList("col1")); + snapshot = catalog.loadSnapshot(noSnapshotTableIdentifier); assertThat(snapshot).isEmpty(); } + @Test + public void testBatchRecordsWrite() throws Exception { + + Identifier tableIdentifier = Identifier.create("my_db", "my_table"); + createTable(tableIdentifier, Maps.newHashMap(), Lists.newArrayList("col1")); + FileStoreTable tableTestWrite = (FileStoreTable) catalog.getTable(tableIdentifier); + + // write + BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder(); + BatchTableWrite write = writeBuilder.newWrite(); + GenericRow record1 = GenericRow.of(12); + GenericRow record2 = GenericRow.of(5); + GenericRow record3 = GenericRow.of(18); + write.write(record1); + write.write(record2); + write.write(record3); + List messages = write.prepareCommit(); + BatchTableCommit commit = writeBuilder.newCommit(); + commit.commit(messages); + write.close(); + commit.close(); + + // read + ReadBuilder readBuilder = tableTestWrite.newReadBuilder(); + List splits = readBuilder.newScan().plan().splits(); + TableRead read = readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + List actual = new ArrayList<>(); + reader.forEachRemaining( + row -> { + String rowStr = + String.format("%s[%d]", row.getRowKind().shortString(), row.getInt(0)); + actual.add(rowStr); + }); + + assertThat(actual).containsExactlyInAnyOrder("+I[5]", "+I[12]", "+I[18]"); + } + @Test void testBranches() throws Exception { String databaseName = "testBranchTable"; @@ -199,6 +278,11 @@ protected boolean supportsAlterDatabase() { return true; } + // TODO implement this + @Override + @Test + public void testTableUUID() {} + private void createTable( Identifier identifier, Map options, List partitionKeys) throws Exception { @@ -214,8 +298,12 @@ private void createTable( true); } - // TODO implement this - @Override - @Test - public void testTableUUID() {} + private Catalog initDataTokenCatalog() { + Options options = new Options(); + options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl()); + options.set(RESTCatalogOptions.TOKEN, initToken); + options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true); + options.set(RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.BEAR.identifier()); + return new RESTCatalog(CatalogContext.create(options)); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java index dc202ec872eb..cf66ee425b31 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java @@ -105,6 +105,16 @@ protected Map catalogOptions() { return options; } + @Test + public void testWriteAndRead() { + batchSql( + String.format( + "INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)", + DATABASE_NAME, TABLE_NAME)); + assertThat(batchSql(String.format("SELECT * FROM %s.%s", DATABASE_NAME, TABLE_NAME))) + .containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 22.0D)); + } + @Override protected String getTempDirPath() { return this.warehouse; diff --git a/paimon-spark/paimon-spark-ut/pom.xml b/paimon-spark/paimon-spark-ut/pom.xml index 552eb8474fb0..78127bd46fe7 100644 --- a/paimon-spark/paimon-spark-ut/pom.xml +++ b/paimon-spark/paimon-spark-ut/pom.xml @@ -157,6 +157,13 @@ under the License. protobuf-java ${protobuf-java.version} + + + com.squareup.okhttp3 + mockwebserver + ${okhttp.version} + test + diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java new file mode 100644 index 000000000000..cd2acdbe86dd --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark; + +import org.apache.paimon.rest.RESTCatalogServer; +import org.apache.paimon.rest.auth.AuthProviderEnum; + +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for spark read from Rest catalog. */ +public class SparkCatalogWithRestTest { + + private RESTCatalogServer restCatalogServer; + private String serverUrl; + private String warehouse; + private String initToken = "init_token"; + @TempDir java.nio.file.Path tempFile; + + @BeforeEach + public void before() throws IOException { + warehouse = tempFile.toUri().toString(); + restCatalogServer = new RESTCatalogServer(warehouse, initToken); + restCatalogServer.start(); + serverUrl = restCatalogServer.getUrl(); + } + + @AfterEach() + public void after() throws Exception { + restCatalogServer.shutdown(); + } + + @Test + public void testTable() { + SparkSession spark = + SparkSession.builder() + .config("spark.sql.catalog.paimon", SparkCatalog.class.getName()) + .config("spark.sql.catalog.paimon.metastore", "rest") + .config("spark.sql.catalog.paimon.uri", serverUrl) + .config("spark.sql.catalog.paimon.token", initToken) + .config( + "spark.sql.catalog.paimon.token.provider", + AuthProviderEnum.BEAR.identifier()) + .master("local[2]") + .getOrCreate(); + + spark.sql("CREATE DATABASE paimon.db2"); + spark.sql("USE paimon.db2"); + spark.sql( + "CREATE TABLE t1 (a INT, b INT, c STRING) TBLPROPERTIES" + + " ('primary-key'='a', 'bucket'='4', 'file.format'='avro')"); + assertThat( + spark.sql("SHOW TABLES").collectAsList().stream() + .map(s -> s.get(1)) + .map(Object::toString)) + .containsExactlyInAnyOrder("t1"); + spark.sql("DROP TABLE t1"); + assertThat(spark.sql("SHOW TABLES").collectAsList().size() == 0); + spark.close(); + } +}