From aaedb72f0c9aceb9d9d4580ff61bdf8d062751e0 Mon Sep 17 00:00:00 2001 From: kaveti Date: Fri, 27 Feb 2026 15:15:21 +0530 Subject: [PATCH 1/2] Support GCS vended credentials from Iceberg REST catalog --- .../trino/filesystem/gcs/GcsFileSystem.java | 25 +- .../filesystem/gcs/GcsFileSystemFactory.java | 20 +- .../filesystem/gcs/GcsStorageFactory.java | 49 ++- .../filesystem/gcs/TestGcsStorageFactory.java | 197 +++++++++ .../gcs/GcsFileSystemConstants.java | 28 ++ .../IcebergRestCatalogFileSystemFactory.java | 62 +++ .../TestIcebergGcsVendingRestCatalog.java | 377 ++++++++++++++++++ ...stIcebergRestCatalogFileSystemFactory.java | 262 ++++++++++++ .../GcsCredentialVendingCatalogAdapter.java | 61 +++ 9 files changed, 1072 insertions(+), 9 deletions(-) create mode 100644 lib/trino-filesystem/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConstants.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalog.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogFileSystemFactory.java create mode 100644 plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/GcsCredentialVendingCatalogAdapter.java diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java index 03940719aa1c..d67205cd1386 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java @@ -76,8 +76,23 @@ public class GcsFileSystem private final long writeBlockSizeBytes; private final int pageSize; private final int batchSize; + private final Optional defaultEncryptionKey; + private final Optional defaultDecryptionKey; public GcsFileSystem(ListeningExecutorService executorService, Storage storage, int readBlockSizeBytes, long writeBlockSizeBytes, int pageSize, int batchSize) + { + this(executorService, storage, readBlockSizeBytes, writeBlockSizeBytes, pageSize, batchSize, Optional.empty(), Optional.empty()); + } + + public GcsFileSystem( + ListeningExecutorService executorService, + Storage storage, + int readBlockSizeBytes, + long writeBlockSizeBytes, + int pageSize, + int batchSize, + Optional defaultEncryptionKey, + Optional defaultDecryptionKey) { this.executorService = requireNonNull(executorService, "executorService is null"); this.storage = requireNonNull(storage, "storage is null"); @@ -85,6 +100,8 @@ public GcsFileSystem(ListeningExecutorService executorService, Storage storage, this.writeBlockSizeBytes = writeBlockSizeBytes; this.pageSize = pageSize; this.batchSize = batchSize; + this.defaultEncryptionKey = requireNonNull(defaultEncryptionKey, "defaultEncryptionKey is null"); + this.defaultDecryptionKey = requireNonNull(defaultDecryptionKey, "defaultDecryptionKey is null"); } @Override @@ -92,7 +109,7 @@ public TrinoInputFile newInputFile(Location location) { GcsLocation gcsLocation = new GcsLocation(location); checkIsValidFile(gcsLocation); - return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.empty(), Optional.empty(), Optional.empty()); + return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.empty(), Optional.empty(), defaultDecryptionKey); } @Override @@ -108,7 +125,7 @@ public TrinoInputFile newInputFile(Location location, long length) { GcsLocation gcsLocation = new GcsLocation(location); checkIsValidFile(gcsLocation); - return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.empty(), Optional.empty()); + return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.empty(), defaultDecryptionKey); } @Override @@ -124,7 +141,7 @@ public TrinoInputFile newInputFile(Location location, long length, Instant lastM { GcsLocation gcsLocation = new GcsLocation(location); checkIsValidFile(gcsLocation); - return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.of(lastModified), Optional.empty()); + return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.of(lastModified), defaultDecryptionKey); } @Override @@ -140,7 +157,7 @@ public TrinoOutputFile newOutputFile(Location location) { GcsLocation gcsLocation = new GcsLocation(location); checkIsValidFile(gcsLocation); - return new GcsOutputFile(gcsLocation, storage, writeBlockSizeBytes, Optional.empty()); + return new GcsOutputFile(gcsLocation, storage, writeBlockSizeBytes, defaultEncryptionKey); } @Override diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java index 4901dc5f50fc..3d069d328041 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java @@ -17,11 +17,17 @@ import com.google.inject.Inject; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.encryption.EncryptionKey; import io.trino.spi.security.ConnectorIdentity; import jakarta.annotation.PreDestroy; +import java.util.Base64; +import java.util.Optional; + import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newCachedThreadPool; @@ -56,6 +62,18 @@ public void stop() @Override public TrinoFileSystem create(ConnectorIdentity identity) { - return new GcsFileSystem(executorService, storageFactory.create(identity), readBlockSizeBytes, writeBlockSizeBytes, pageSize, batchSize); + Optional defaultEncryptionKey = extractEncryptionKey(identity, EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY); + Optional defaultDecryptionKey = extractEncryptionKey(identity, EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY); + return new GcsFileSystem(executorService, storageFactory.create(identity), readBlockSizeBytes, writeBlockSizeBytes, pageSize, batchSize, defaultEncryptionKey, defaultDecryptionKey); + } + + private static Optional extractEncryptionKey(ConnectorIdentity identity, String extraCredentialKey) + { + String base64Key = identity.getExtraCredentials().get(extraCredentialKey); + if (base64Key == null) { + return Optional.empty(); + } + byte[] keyBytes = Base64.getDecoder().decode(base64Key); + return Optional.of(new EncryptionKey(keyBytes, "AES256")); } } diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java index aff17b743dd2..eef133a2ac32 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsStorageFactory.java @@ -14,6 +14,9 @@ package io.trino.filesystem.gcs; import com.google.api.gax.retrying.RetrySettings; +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.NoCredentials; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import com.google.inject.Inject; @@ -22,11 +25,18 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.time.Duration; +import java.util.Date; import java.util.Map; import java.util.Optional; import static com.google.cloud.storage.StorageRetryStrategy.getUniformStorageRetryStrategy; import static com.google.common.net.HttpHeaders.USER_AGENT; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY; import static java.util.Objects.requireNonNull; public class GcsStorageFactory @@ -59,14 +69,45 @@ public GcsStorageFactory(GcsFileSystemConfig config, GcsAuth gcsAuth) public Storage create(ConnectorIdentity identity) { try { + Map extraCredentials = identity.getExtraCredentials(); + boolean noAuth = Boolean.parseBoolean(extraCredentials.getOrDefault(EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY, "false")); + String vendedOAuthToken = extraCredentials.get(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY); + StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder(); - if (projectId != null) { - storageOptionsBuilder.setProjectId(projectId); + + String effectiveProjectId = extraCredentials.getOrDefault(EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY, projectId); + if (effectiveProjectId != null) { + storageOptionsBuilder.setProjectId(effectiveProjectId); } - gcsAuth.setAuth(storageOptionsBuilder, identity); + if (noAuth) { + storageOptionsBuilder.setCredentials(NoCredentials.getInstance()); + } + else if (vendedOAuthToken != null) { + Date expirationTime = null; + String expiresAt = extraCredentials.get(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY); + if (expiresAt != null) { + expirationTime = new Date(Long.parseLong(expiresAt)); + } + AccessToken accessToken = new AccessToken(vendedOAuthToken, expirationTime); + storageOptionsBuilder.setCredentials(GoogleCredentials.create(accessToken)); + } + else { + gcsAuth.setAuth(storageOptionsBuilder, identity); + } - endpoint.ifPresent(storageOptionsBuilder::setHost); + String vendedServiceHost = extraCredentials.get(EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY); + if (vendedServiceHost != null) { + storageOptionsBuilder.setHost(vendedServiceHost); + } + else { + endpoint.ifPresent(storageOptionsBuilder::setHost); + } + + String vendedUserProject = extraCredentials.get(EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY); + if (vendedUserProject != null) { + storageOptionsBuilder.setQuotaProjectId(vendedUserProject); + } // Note: without uniform strategy we cannot retry idempotent operations. // The trino-filesystem api does not violate the conditions for idempotency, see https://cloud.google.com/storage/docs/retry-strategy#java for details. diff --git a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsStorageFactory.java b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsStorageFactory.java index 5649bdf7a4d3..7a64f5925f9b 100644 --- a/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsStorageFactory.java +++ b/lib/trino-filesystem-gcs/src/test/java/io/trino/filesystem/gcs/TestGcsStorageFactory.java @@ -14,12 +14,21 @@ package io.trino.filesystem.gcs; import com.google.auth.Credentials; +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.NoCredentials; import com.google.cloud.storage.Storage; +import com.google.common.collect.ImmutableMap; import io.trino.spi.security.ConnectorIdentity; import org.junit.jupiter.api.Test; import static io.trino.filesystem.gcs.GcsFileSystemConfig.AuthType; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY; import static org.assertj.core.api.Assertions.assertThat; final class TestGcsStorageFactory @@ -38,4 +47,192 @@ void testApplicationDefaultCredentials() assertThat(actualCredentials).isEqualTo(NoCredentials.getInstance()); } + + @Test + void testVendedOAuthToken() + throws Exception + { + GcsFileSystemConfig config = new GcsFileSystemConfig().setAuthType(AuthType.APPLICATION_DEFAULT); + GcsStorageFactory storageFactory = new GcsStorageFactory(config, new ApplicationDefaultAuth()); + + ConnectorIdentity identity = ConnectorIdentity.forUser("test") + .withExtraCredentials(ImmutableMap.of( + EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY, "ya29.test-token")) + .build(); + + try (Storage storage = storageFactory.create(identity)) { + Credentials credentials = storage.getOptions().getCredentials(); + assertThat(credentials).isInstanceOf(GoogleCredentials.class); + GoogleCredentials googleCredentials = (GoogleCredentials) credentials; + AccessToken accessToken = googleCredentials.getAccessToken(); + assertThat(accessToken).isNotNull(); + assertThat(accessToken.getTokenValue()).isEqualTo("ya29.test-token"); + } + } + + @Test + void testVendedOAuthTokenWithExpiration() + throws Exception + { + GcsFileSystemConfig config = new GcsFileSystemConfig().setAuthType(AuthType.APPLICATION_DEFAULT); + GcsStorageFactory storageFactory = new GcsStorageFactory(config, new ApplicationDefaultAuth()); + + ConnectorIdentity identity = ConnectorIdentity.forUser("test") + .withExtraCredentials(ImmutableMap.of( + EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY, "ya29.test-token", + EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY, "1700000000000")) + .build(); + + try (Storage storage = storageFactory.create(identity)) { + Credentials credentials = storage.getOptions().getCredentials(); + assertThat(credentials).isInstanceOf(GoogleCredentials.class); + GoogleCredentials googleCredentials = (GoogleCredentials) credentials; + AccessToken accessToken = googleCredentials.getAccessToken(); + assertThat(accessToken).isNotNull(); + assertThat(accessToken.getTokenValue()).isEqualTo("ya29.test-token"); + assertThat(accessToken.getExpirationTime()).isNotNull(); + assertThat(accessToken.getExpirationTime().getTime()).isEqualTo(1700000000000L); + } + } + + @Test + void testVendedProjectId() + throws Exception + { + GcsFileSystemConfig config = new GcsFileSystemConfig() + .setAuthType(AuthType.APPLICATION_DEFAULT) + .setProjectId("static-project"); + GcsStorageFactory storageFactory = new GcsStorageFactory(config, new ApplicationDefaultAuth()); + + ConnectorIdentity identity = ConnectorIdentity.forUser("test") + .withExtraCredentials(ImmutableMap.of( + EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY, "ya29.test-token", + EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY, "vended-project")) + .build(); + + try (Storage storage = storageFactory.create(identity)) { + assertThat(storage.getOptions().getProjectId()).isEqualTo("vended-project"); + } + } + + @Test + void testVendedServiceHost() + throws Exception + { + GcsFileSystemConfig config = new GcsFileSystemConfig() + .setAuthType(AuthType.APPLICATION_DEFAULT); + GcsStorageFactory storageFactory = new GcsStorageFactory(config, new ApplicationDefaultAuth()); + + ConnectorIdentity identity = ConnectorIdentity.forUser("test") + .withExtraCredentials(ImmutableMap.of( + EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY, "ya29.test-token", + EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY, "https://custom-storage.googleapis.com")) + .build(); + + try (Storage storage = storageFactory.create(identity)) { + assertThat(storage.getOptions().getHost()).isEqualTo("https://custom-storage.googleapis.com"); + } + } + + @Test + void testVendedNoAuth() + throws Exception + { + GcsFileSystemConfig config = new GcsFileSystemConfig().setAuthType(AuthType.APPLICATION_DEFAULT); + GcsStorageFactory storageFactory = new GcsStorageFactory(config, new ApplicationDefaultAuth()); + + ConnectorIdentity identity = ConnectorIdentity.forUser("test") + .withExtraCredentials(ImmutableMap.of( + EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY, "true")) + .build(); + + try (Storage storage = storageFactory.create(identity)) { + assertThat(storage.getOptions().getCredentials()).isEqualTo(NoCredentials.getInstance()); + } + } + + @Test + void testNoAuthTakesPriorityOverOAuthToken() + throws Exception + { + GcsFileSystemConfig config = new GcsFileSystemConfig().setAuthType(AuthType.APPLICATION_DEFAULT); + GcsStorageFactory storageFactory = new GcsStorageFactory(config, new ApplicationDefaultAuth()); + + ConnectorIdentity identity = ConnectorIdentity.forUser("test") + .withExtraCredentials(ImmutableMap.of( + EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY, "true", + EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY, "ya29.test-token")) + .build(); + + try (Storage storage = storageFactory.create(identity)) { + assertThat(storage.getOptions().getCredentials()).isEqualTo(NoCredentials.getInstance()); + } + } + + @Test + void testVendedUserProject() + throws Exception + { + GcsFileSystemConfig config = new GcsFileSystemConfig() + .setAuthType(AuthType.APPLICATION_DEFAULT); + GcsStorageFactory storageFactory = new GcsStorageFactory(config, new ApplicationDefaultAuth()); + + ConnectorIdentity identity = ConnectorIdentity.forUser("test") + .withExtraCredentials(ImmutableMap.of( + EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY, "ya29.test-token", + EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY, "billing-project")) + .build(); + + try (Storage storage = storageFactory.create(identity)) { + assertThat(storage.getOptions().getQuotaProjectId()).isEqualTo("billing-project"); + } + } + + @Test + void testNoAuthFalseDoesNotSkipAuth() + throws Exception + { + GcsFileSystemConfig config = new GcsFileSystemConfig().setAuthType(AuthType.APPLICATION_DEFAULT); + GcsStorageFactory storageFactory = new GcsStorageFactory(config, new ApplicationDefaultAuth()); + + ConnectorIdentity identity = ConnectorIdentity.forUser("test") + .withExtraCredentials(ImmutableMap.of( + EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY, "false", + EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY, "ya29.test-token")) + .build(); + + try (Storage storage = storageFactory.create(identity)) { + Credentials credentials = storage.getOptions().getCredentials(); + assertThat(credentials).isInstanceOf(GoogleCredentials.class); + assertThat(((GoogleCredentials) credentials).getAccessToken().getTokenValue()).isEqualTo("ya29.test-token"); + } + } + + @Test + void testUserProjectNotSetWithoutVendedCredentials() + throws Exception + { + GcsFileSystemConfig config = new GcsFileSystemConfig() + .setAuthType(AuthType.APPLICATION_DEFAULT); + GcsStorageFactory storageFactory = new GcsStorageFactory(config, new ApplicationDefaultAuth()); + + try (Storage storage = storageFactory.create(ConnectorIdentity.ofUser("test"))) { + assertThat(storage.getOptions().getQuotaProjectId()).isNull(); + } + } + + @Test + void testStaticConfigUsedWithoutVendedCredentials() + throws Exception + { + GcsFileSystemConfig config = new GcsFileSystemConfig() + .setAuthType(AuthType.APPLICATION_DEFAULT) + .setProjectId("static-project"); + GcsStorageFactory storageFactory = new GcsStorageFactory(config, new ApplicationDefaultAuth()); + + try (Storage storage = storageFactory.create(ConnectorIdentity.ofUser("test"))) { + assertThat(storage.getOptions().getProjectId()).isEqualTo("static-project"); + assertThat(storage.getOptions().getCredentials()).isEqualTo(NoCredentials.getInstance()); + } + } } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConstants.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConstants.java new file mode 100644 index 000000000000..23c765c474e9 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConstants.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.gcs; + +public final class GcsFileSystemConstants +{ + public static final String EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY = "internal$gcs_oauth2_token"; + public static final String EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY = "internal$gcs_oauth2_token_expires_at"; + public static final String EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY = "internal$gcs_project_id"; + public static final String EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY = "internal$gcs_service_host"; + public static final String EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY = "internal$gcs_no_auth"; + public static final String EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY = "internal$gcs_user_project"; + public static final String EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY = "internal$gcs_encryption_key"; + public static final String EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY = "internal$gcs_decryption_key"; + + private GcsFileSystemConstants() {} +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java index b5e777c7ee15..f65cc74500d9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java @@ -22,6 +22,14 @@ import java.util.Map; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY; import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY; import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY; import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY; @@ -34,6 +42,15 @@ public class IcebergRestCatalogFileSystemFactory private static final String VENDED_S3_SECRET_KEY = "s3.secret-access-key"; private static final String VENDED_S3_SESSION_TOKEN = "s3.session-token"; + private static final String VENDED_GCS_OAUTH_TOKEN = "gcs.oauth2.token"; + private static final String VENDED_GCS_OAUTH_TOKEN_EXPIRES_AT = "gcs.oauth2.token-expires-at"; + private static final String VENDED_GCS_PROJECT_ID = "gcs.project-id"; + private static final String VENDED_GCS_SERVICE_HOST = "gcs.service.host"; + private static final String VENDED_GCS_NO_AUTH = "gcs.no-auth"; + private static final String VENDED_GCS_USER_PROJECT = "gcs.user-project"; + private static final String VENDED_GCS_ENCRYPTION_KEY = "gcs.encryption-key"; + private static final String VENDED_GCS_DECRYPTION_KEY = "gcs.decryption-key"; + private final TrinoFileSystemFactory fileSystemFactory; private final boolean vendedCredentialsEnabled; @@ -66,6 +83,51 @@ public TrinoFileSystem create(ConnectorIdentity identity, Map fi return fileSystemFactory.create(identityWithExtraCredentials); } + if (vendedCredentialsEnabled && hasAnyGcsVendedProperty(fileIoProperties)) { + ImmutableMap.Builder extraCredentialsBuilder = ImmutableMap.builder(); + + addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_OAUTH_TOKEN, EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY); + addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_OAUTH_TOKEN_EXPIRES_AT, EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY); + addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_PROJECT_ID, EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY); + addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_SERVICE_HOST, EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY); + addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_NO_AUTH, EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY); + addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_USER_PROJECT, EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY); + addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_ENCRYPTION_KEY, EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY); + addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_DECRYPTION_KEY, EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY); + + ConnectorIdentity identityWithExtraCredentials = ConnectorIdentity.forUser(identity.getUser()) + .withGroups(identity.getGroups()) + .withPrincipal(identity.getPrincipal()) + .withEnabledSystemRoles(identity.getEnabledSystemRoles()) + .withConnectorRole(identity.getConnectorRole()) + .withExtraCredentials(extraCredentialsBuilder.buildOrThrow()) + .build(); + return fileSystemFactory.create(identityWithExtraCredentials); + } + return fileSystemFactory.create(identity); } + + private static boolean hasAnyGcsVendedProperty(Map fileIoProperties) + { + return fileIoProperties.containsKey(VENDED_GCS_OAUTH_TOKEN) || + fileIoProperties.containsKey(VENDED_GCS_NO_AUTH) || + fileIoProperties.containsKey(VENDED_GCS_PROJECT_ID) || + fileIoProperties.containsKey(VENDED_GCS_SERVICE_HOST) || + fileIoProperties.containsKey(VENDED_GCS_USER_PROJECT) || + fileIoProperties.containsKey(VENDED_GCS_ENCRYPTION_KEY) || + fileIoProperties.containsKey(VENDED_GCS_DECRYPTION_KEY); + } + + private static void addOptionalProperty( + ImmutableMap.Builder builder, + Map fileIoProperties, + String vendedKey, + String extraCredentialKey) + { + String value = fileIoProperties.get(vendedKey); + if (value != null) { + builder.put(extraCredentialKey, value); + } + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalog.java new file mode 100644 index 000000000000..d2b2d4a1c07c --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalog.java @@ -0,0 +1,377 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import io.airlift.http.server.HttpServerConfig; +import io.airlift.http.server.HttpServerInfo; +import io.airlift.http.server.ServerFeature; +import io.airlift.http.server.testing.TestingHttpServer; +import io.airlift.node.NodeInfo; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.jdbc.JdbcCatalog; +import org.apache.iceberg.rest.GcsCredentialVendingCatalogAdapter; +import org.apache.iceberg.rest.RestCatalogServlet; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; + +import static io.trino.testing.TestingNames.randomNameSuffix; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +class TestIcebergGcsVendingRestCatalog + extends AbstractTestQueryFramework +{ + private static final String BUCKET_NAME = "test-iceberg-gcs-vending-" + randomNameSuffix(); + private static final int FAKE_GCS_PORT = 4443; + + private GenericContainer fakeGcsServer; + private JdbcCatalog backendCatalog; + private TestingHttpServer restServer; + + @SuppressWarnings("resource") + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + fakeGcsServer = new GenericContainer<>("fsouza/fake-gcs-server:1.49.3") + .withExposedPorts(FAKE_GCS_PORT) + .withCommand("-scheme", "http", "-port", String.valueOf(FAKE_GCS_PORT)) + .waitingFor(new HttpWaitStrategy() + .forPort(FAKE_GCS_PORT) + .forPath("/storage/v1/b") + .forStatusCode(200)); + fakeGcsServer.start(); + + String fakeGcsUrl = "http://%s:%d".formatted(fakeGcsServer.getHost(), fakeGcsServer.getMappedPort(FAKE_GCS_PORT)); + + updateExternalUrl(fakeGcsUrl); + createBucket(fakeGcsUrl, BUCKET_NAME); + + backendCatalog = createBackendCatalog(fakeGcsUrl); + + // GCS properties that the REST catalog will vend to Trino + Map vendedGcsProperties = ImmutableMap.of( + "gcs.no-auth", "true", + "gcs.project-id", "test-project", + "gcs.service.host", fakeGcsUrl); + + GcsCredentialVendingCatalogAdapter adapter = new GcsCredentialVendingCatalogAdapter(backendCatalog, vendedGcsProperties); + RestCatalogServlet servlet = new RestCatalogServlet(adapter); + + NodeInfo nodeInfo = new NodeInfo("test"); + HttpServerConfig httpConfig = new HttpServerConfig() + .setHttpPort(0) + .setHttpEnabled(true); + HttpServerInfo httpServerInfo = new HttpServerInfo(httpConfig, nodeInfo); + restServer = new TestingHttpServer("gcs-vending-rest-catalog", httpServerInfo, nodeInfo, httpConfig, servlet, ServerFeature.builder() + .withLegacyUriCompliance(true) + .build()); + restServer.start(); + + return IcebergQueryRunner.builder() + .setIcebergProperties(ImmutableMap.builder() + .put("iceberg.catalog.type", "rest") + .put("iceberg.rest-catalog.uri", restServer.getBaseUrl().toString()) + .put("iceberg.rest-catalog.vended-credentials-enabled", "true") + .put("fs.native-gcs.enabled", "true") + .put("gcs.project-id", "test-project") + .put("gcs.endpoint", fakeGcsUrl) + .put("gcs.auth-type", "APPLICATION_DEFAULT") + .buildOrThrow()) + .build(); + } + + @AfterAll + public void tearDown() + { + if (restServer != null) { + try { + restServer.stop(); + } + catch (Exception e) { + // ignore + } + } + if (backendCatalog != null) { + try { + backendCatalog.close(); + } + catch (Exception e) { + // ignore + } + } + if (fakeGcsServer != null) { + fakeGcsServer.close(); + } + } + + @Test + void testCreateTableInsertAndSelect() + { + String tableName = "test_gcs_vending_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, name VARCHAR)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'alice'), (2, 'bob')", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'alice'), (2, 'bob')"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + void testCreateTableAsSelect() + { + String tableName = "test_gcs_ctas_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS id, VARCHAR 'hello' AS greeting", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'hello')"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + void testTableLocationIsOnGcs() + { + String tableName = "test_gcs_location_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS id", 1); + + String filePath = (String) computeScalar("SELECT file_path FROM \"" + tableName + "$files\" LIMIT 1"); + assertThat(filePath).startsWith("gs://" + BUCKET_NAME + "/"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + void testSchemaOperations() + { + String schemaName = "test_gcs_schema_" + randomNameSuffix(); + assertUpdate("CREATE SCHEMA " + schemaName); + assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).contains(schemaName); + + String tableName = schemaName + ".test_table_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INTEGER)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1)", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES 1"); + + assertUpdate("DROP TABLE " + tableName); + assertUpdate("DROP SCHEMA " + schemaName); + } + + @Test + void testUpdateAndDelete() + { + String tableName = "test_gcs_update_delete_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, value VARCHAR)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'original'), (2, 'keep'), (3, 'remove')", 3); + + assertUpdate("UPDATE " + tableName + " SET value = 'updated' WHERE id = 1", 1); + assertQuery("SELECT value FROM " + tableName + " WHERE id = 1", "VALUES 'updated'"); + + assertUpdate("DELETE FROM " + tableName + " WHERE id = 3", 1); + assertQuery("SELECT * FROM " + tableName + " ORDER BY id", "VALUES (1, 'updated'), (2, 'keep')"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + void testPartitionedTable() + { + String tableName = "test_gcs_partitioned_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, region VARCHAR) WITH (partitioning = ARRAY['region'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'us'), (2, 'eu'), (3, 'us')", 3); + + assertQuery("SELECT id FROM " + tableName + " WHERE region = 'us' ORDER BY id", "VALUES 1, 3"); + assertQuery("SELECT id FROM " + tableName + " WHERE region = 'eu'", "VALUES 2"); + + long partitionCount = (long) computeScalar("SELECT count(*) FROM \"" + tableName + "$partitions\""); + assertThat(partitionCount).isEqualTo(2L); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + void testSnapshotsAndTimeTravel() + { + String tableName = "test_gcs_snapshots_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INTEGER)"); + + assertUpdate("INSERT INTO " + tableName + " VALUES (1)", 1); + long snapshotAfterFirstInsert = (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC LIMIT 1"); + + assertUpdate("INSERT INTO " + tableName + " VALUES (2)", 1); + assertQuery("SELECT * FROM " + tableName + " ORDER BY id", "VALUES 1, 2"); + + // Time travel to first snapshot + assertQuery("SELECT * FROM " + tableName + " FOR VERSION AS OF " + snapshotAfterFirstInsert, "VALUES 1"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + void testMetadataTables() + { + String tableName = "test_gcs_metadata_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, part VARCHAR) WITH (partitioning = ARRAY['part'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'b')", 1); + + long snapshotCount = (long) computeScalar("SELECT count(*) FROM \"" + tableName + "$snapshots\""); + assertThat(snapshotCount).isGreaterThanOrEqualTo(2L); + + long fileCount = (long) computeScalar("SELECT count(*) FROM \"" + tableName + "$files\""); + assertThat(fileCount).isEqualTo(2L); + + long historyCount = (long) computeScalar("SELECT count(*) FROM \"" + tableName + "$history\""); + assertThat(historyCount).isGreaterThanOrEqualTo(2L); + + long partitionCount = (long) computeScalar("SELECT count(*) FROM \"" + tableName + "$partitions\""); + assertThat(partitionCount).isEqualTo(2L); + + // Verify file paths are on GCS + String filePath = (String) computeScalar("SELECT file_path FROM \"" + tableName + "$files\" LIMIT 1"); + assertThat(filePath).startsWith("gs://" + BUCKET_NAME + "/"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + void testMultipleDataTypes() + { + String tableName = "test_gcs_types_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (" + + "c_integer INTEGER, " + + "c_bigint BIGINT, " + + "c_real REAL, " + + "c_double DOUBLE, " + + "c_decimal DECIMAL(10,2), " + + "c_varchar VARCHAR, " + + "c_boolean BOOLEAN, " + + "c_date DATE, " + + "c_timestamp TIMESTAMP(6))"); + + assertUpdate("INSERT INTO " + tableName + " VALUES (" + + "42, " + + "BIGINT '9999999999', " + + "REAL '3.14', " + + "DOUBLE '2.718281828', " + + "DECIMAL '12345.67', " + + "VARCHAR 'hello world', " + + "true, " + + "DATE '2024-01-15', " + + "TIMESTAMP '2024-01-15 10:30:00.123456')", 1); + + assertQuery("SELECT c_integer, c_bigint, c_varchar, c_boolean FROM " + tableName, + "VALUES (42, 9999999999, 'hello world', true)"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + void testCreateOrReplaceTable() + { + String tableName = "test_gcs_create_or_replace_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS id, VARCHAR 'v1' AS version", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'v1')"); + + long v1SnapshotId = (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC LIMIT 1"); + + assertUpdate("CREATE OR REPLACE TABLE " + tableName + " AS SELECT 2 AS id, VARCHAR 'v2' AS version", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (2, 'v2')"); + + // Time travel to original version + assertQuery("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId, "VALUES (1, 'v1')"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + void testAlterTable() + { + String tableName = "test_gcs_alter_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INTEGER)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1)", 1); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN name VARCHAR"); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'alice')", 1); + + assertQuery("SELECT * FROM " + tableName + " ORDER BY id", "VALUES (1, NULL), (2, 'alice')"); + + assertUpdate("ALTER TABLE " + tableName + " RENAME COLUMN name TO full_name"); + assertQuery("SELECT id, full_name FROM " + tableName + " WHERE id = 2", "VALUES (2, 'alice')"); + + assertUpdate("DROP TABLE " + tableName); + } + + private static void updateExternalUrl(String fakeGcsUrl) + throws IOException, InterruptedException + { + HttpClient client = HttpClient.newHttpClient(); + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(fakeGcsUrl + "/_internal/config")) + .header("Content-Type", "application/json") + .PUT(HttpRequest.BodyPublishers.ofString("{\"externalUrl\": \"" + fakeGcsUrl + "\"}")) + .build(); + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + assertThat(response.statusCode()).isEqualTo(200); + } + + private static void createBucket(String fakeGcsUrl, String bucketName) + throws IOException, InterruptedException + { + HttpClient client = HttpClient.newHttpClient(); + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(fakeGcsUrl + "/storage/v1/b?project=test-project")) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString("{\"name\": \"" + bucketName + "\"}")) + .build(); + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + assertThat(response.statusCode()).isEqualTo(200); + } + + private static JdbcCatalog createBackendCatalog(String fakeGcsUrl) + throws IOException + { + Path tempFile = Files.createTempFile("iceberg-gcs-test-jdbc", null); + tempFile.toFile().deleteOnExit(); + + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.put(CatalogProperties.URI, "jdbc:h2:file:" + tempFile.toAbsolutePath()); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "schema-version", "V1"); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, "gs://" + BUCKET_NAME + "/warehouse"); + properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.gcp.gcs.GCSFileIO"); + properties.put("gcs.project-id", "test-project"); + properties.put("gcs.service.host", fakeGcsUrl); + properties.put("gcs.no-auth", "true"); + + JdbcCatalog catalog = new JdbcCatalog(); + catalog.setConf(new Configuration()); + catalog.initialize("gcs_backend", properties.buildOrThrow()); + + return catalog; + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogFileSystemFactory.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogFileSystemFactory.java new file mode 100644 index 000000000000..56e40a7375e5 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogFileSystemFactory.java @@ -0,0 +1,262 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.spi.security.ConnectorIdentity; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY; +import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY; +import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY; +import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY; +import static io.trino.filesystem.s3.S3FileSystemConstants.EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY; +import static org.assertj.core.api.Assertions.assertThat; + +final class TestIcebergRestCatalogFileSystemFactory +{ + @Test + void testS3VendedCredentials() + { + AtomicReference capturedIdentity = new AtomicReference<>(); + TrinoFileSystemFactory delegate = identity -> { + capturedIdentity.set(identity); + return null; + }; + + IcebergRestCatalogConfig config = new IcebergRestCatalogConfig() + .setBaseUri("http://localhost") + .setVendedCredentialsEnabled(true); + IcebergRestCatalogFileSystemFactory factory = new IcebergRestCatalogFileSystemFactory(delegate, config); + + Map fileIoProperties = ImmutableMap.of( + "s3.access-key-id", "test-access-key", + "s3.secret-access-key", "test-secret-key", + "s3.session-token", "test-session-token"); + + factory.create(ConnectorIdentity.ofUser("test"), fileIoProperties); + + ConnectorIdentity identity = capturedIdentity.get(); + assertThat(identity).isNotNull(); + assertThat(identity.getExtraCredentials()) + .containsEntry(EXTRA_CREDENTIALS_ACCESS_KEY_PROPERTY, "test-access-key") + .containsEntry(EXTRA_CREDENTIALS_SECRET_KEY_PROPERTY, "test-secret-key") + .containsEntry(EXTRA_CREDENTIALS_SESSION_TOKEN_PROPERTY, "test-session-token"); + } + + @Test + void testGcsVendedCredentialsWithOAuthTokenOnly() + { + AtomicReference capturedIdentity = new AtomicReference<>(); + TrinoFileSystemFactory delegate = identity -> { + capturedIdentity.set(identity); + return null; + }; + + IcebergRestCatalogConfig config = new IcebergRestCatalogConfig() + .setBaseUri("http://localhost") + .setVendedCredentialsEnabled(true); + IcebergRestCatalogFileSystemFactory factory = new IcebergRestCatalogFileSystemFactory(delegate, config); + + Map fileIoProperties = ImmutableMap.of( + "gcs.oauth2.token", "ya29.test-token"); + + factory.create(ConnectorIdentity.ofUser("test"), fileIoProperties); + + ConnectorIdentity identity = capturedIdentity.get(); + assertThat(identity).isNotNull(); + assertThat(identity.getExtraCredentials()) + .containsEntry(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY, "ya29.test-token") + .doesNotContainKey(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY) + .doesNotContainKey(EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY) + .doesNotContainKey(EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY); + } + + @Test + void testGcsVendedCredentialsWithAllProperties() + { + AtomicReference capturedIdentity = new AtomicReference<>(); + TrinoFileSystemFactory delegate = identity -> { + capturedIdentity.set(identity); + return null; + }; + + IcebergRestCatalogConfig config = new IcebergRestCatalogConfig() + .setBaseUri("http://localhost") + .setVendedCredentialsEnabled(true); + IcebergRestCatalogFileSystemFactory factory = new IcebergRestCatalogFileSystemFactory(delegate, config); + + Map fileIoProperties = ImmutableMap.builder() + .put("gcs.oauth2.token", "ya29.test-token") + .put("gcs.oauth2.token-expires-at", "1700000000000") + .put("gcs.project-id", "my-gcp-project") + .put("gcs.service.host", "https://custom-storage.googleapis.com") + .put("gcs.user-project", "billing-project") + .put("gcs.encryption-key", "dGVzdC1lbmNyeXB0aW9uLWtleS0xMjM0NTY3OA==") + .put("gcs.decryption-key", "dGVzdC1kZWNyeXB0aW9uLWtleS0xMjM0NTY3OA==") + .buildOrThrow(); + + factory.create(ConnectorIdentity.ofUser("test"), fileIoProperties); + + ConnectorIdentity identity = capturedIdentity.get(); + assertThat(identity).isNotNull(); + assertThat(identity.getExtraCredentials()) + .containsEntry(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY, "ya29.test-token") + .containsEntry(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY, "1700000000000") + .containsEntry(EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY, "my-gcp-project") + .containsEntry(EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY, "https://custom-storage.googleapis.com") + .containsEntry(EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY, "billing-project") + .containsEntry(EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY, "dGVzdC1lbmNyeXB0aW9uLWtleS0xMjM0NTY3OA==") + .containsEntry(EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY, "dGVzdC1kZWNyeXB0aW9uLWtleS0xMjM0NTY3OA=="); + } + + @Test + void testGcsVendedNoAuth() + { + AtomicReference capturedIdentity = new AtomicReference<>(); + TrinoFileSystemFactory delegate = identity -> { + capturedIdentity.set(identity); + return null; + }; + + IcebergRestCatalogConfig config = new IcebergRestCatalogConfig() + .setBaseUri("http://localhost") + .setVendedCredentialsEnabled(true); + IcebergRestCatalogFileSystemFactory factory = new IcebergRestCatalogFileSystemFactory(delegate, config); + + Map fileIoProperties = ImmutableMap.of( + "gcs.no-auth", "true", + "gcs.project-id", "public-project"); + + factory.create(ConnectorIdentity.ofUser("test"), fileIoProperties); + + ConnectorIdentity identity = capturedIdentity.get(); + assertThat(identity).isNotNull(); + assertThat(identity.getExtraCredentials()) + .containsEntry(EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY, "true") + .containsEntry(EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY, "public-project") + .doesNotContainKey(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY); + } + + @Test + void testGcsVendedEncryptionKeys() + { + AtomicReference capturedIdentity = new AtomicReference<>(); + TrinoFileSystemFactory delegate = identity -> { + capturedIdentity.set(identity); + return null; + }; + + IcebergRestCatalogConfig config = new IcebergRestCatalogConfig() + .setBaseUri("http://localhost") + .setVendedCredentialsEnabled(true); + IcebergRestCatalogFileSystemFactory factory = new IcebergRestCatalogFileSystemFactory(delegate, config); + + Map fileIoProperties = ImmutableMap.of( + "gcs.encryption-key", "dGVzdC1lbmNyeXB0aW9uLWtleS0xMjM0NTY3OA==", + "gcs.decryption-key", "dGVzdC1kZWNyeXB0aW9uLWtleS0xMjM0NTY3OA=="); + + factory.create(ConnectorIdentity.ofUser("test"), fileIoProperties); + + ConnectorIdentity identity = capturedIdentity.get(); + assertThat(identity).isNotNull(); + assertThat(identity.getExtraCredentials()) + .containsEntry(EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY, "dGVzdC1lbmNyeXB0aW9uLWtleS0xMjM0NTY3OA==") + .containsEntry(EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY, "dGVzdC1kZWNyeXB0aW9uLWtleS0xMjM0NTY3OA=="); + } + + @Test + void testGcsVendedCredentialsDisabled() + { + AtomicReference capturedIdentity = new AtomicReference<>(); + TrinoFileSystemFactory delegate = identity -> { + capturedIdentity.set(identity); + return null; + }; + + IcebergRestCatalogConfig config = new IcebergRestCatalogConfig() + .setBaseUri("http://localhost") + .setVendedCredentialsEnabled(false); + IcebergRestCatalogFileSystemFactory factory = new IcebergRestCatalogFileSystemFactory(delegate, config); + + Map fileIoProperties = ImmutableMap.of( + "gcs.oauth2.token", "ya29.test-token", + "gcs.project-id", "my-gcp-project"); + + ConnectorIdentity originalIdentity = ConnectorIdentity.ofUser("test"); + factory.create(originalIdentity, fileIoProperties); + + ConnectorIdentity identity = capturedIdentity.get(); + assertThat(identity).isNotNull(); + assertThat(identity.getExtraCredentials()).isEmpty(); + } + + @Test + void testGcsVendedUserProjectOnly() + { + AtomicReference capturedIdentity = new AtomicReference<>(); + TrinoFileSystemFactory delegate = identity -> { + capturedIdentity.set(identity); + return null; + }; + + IcebergRestCatalogConfig config = new IcebergRestCatalogConfig() + .setBaseUri("http://localhost") + .setVendedCredentialsEnabled(true); + IcebergRestCatalogFileSystemFactory factory = new IcebergRestCatalogFileSystemFactory(delegate, config); + + Map fileIoProperties = ImmutableMap.of( + "gcs.user-project", "billing-project"); + + factory.create(ConnectorIdentity.ofUser("test"), fileIoProperties); + + ConnectorIdentity identity = capturedIdentity.get(); + assertThat(identity).isNotNull(); + assertThat(identity.getExtraCredentials()) + .containsEntry(EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY, "billing-project") + .doesNotContainKey(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY) + .doesNotContainKey(EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY); + } + + @Test + void testNoVendedCredentialsInProperties() + { + AtomicReference capturedIdentity = new AtomicReference<>(); + TrinoFileSystemFactory delegate = identity -> { + capturedIdentity.set(identity); + return null; + }; + + IcebergRestCatalogConfig config = new IcebergRestCatalogConfig() + .setBaseUri("http://localhost") + .setVendedCredentialsEnabled(true); + IcebergRestCatalogFileSystemFactory factory = new IcebergRestCatalogFileSystemFactory(delegate, config); + + ConnectorIdentity originalIdentity = ConnectorIdentity.ofUser("test"); + factory.create(originalIdentity, ImmutableMap.of()); + + ConnectorIdentity identity = capturedIdentity.get(); + assertThat(identity).isSameAs(originalIdentity); + } +} diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/GcsCredentialVendingCatalogAdapter.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/GcsCredentialVendingCatalogAdapter.java new file mode 100644 index 000000000000..c609fa49ed58 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/GcsCredentialVendingCatalogAdapter.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.rest; + +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +import java.util.Map; +import java.util.function.Consumer; + +import static java.util.Objects.requireNonNull; + +/** + * A {@link RESTCatalogAdapter} that injects GCS credential properties into + * {@link LoadTableResponse} config, simulating a REST catalog server that + * vends GCS credentials. + */ +public class GcsCredentialVendingCatalogAdapter + extends RESTCatalogAdapter +{ + private final Map gcsCredentialConfig; + + public GcsCredentialVendingCatalogAdapter(Catalog catalog, Map gcsCredentialConfig) + { + super(catalog); + this.gcsCredentialConfig = ImmutableMap.copyOf(requireNonNull(gcsCredentialConfig, "gcsCredentialConfig is null")); + } + + @Override + protected T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> configConsumer) + { + T response = super.execute(request, responseType, errorHandler, configConsumer); + if (response instanceof LoadTableResponse loadTableResponse) { + @SuppressWarnings("unchecked") + T modified = (T) LoadTableResponse.builder() + .withTableMetadata(loadTableResponse.tableMetadata()) + .addAllConfig(loadTableResponse.config()) + .addAllConfig(gcsCredentialConfig) + .build(); + return modified; + } + return response; + } +} From d382c1f8a2d5c49efc046db3f39a3ec7ceb1e424 Mon Sep 17 00:00:00 2001 From: kaveti Date: Thu, 5 Mar 2026 14:04:50 +0530 Subject: [PATCH 2/2] Refactor Iceberg GCS vending tests and remove encryption key mechanism - Rewrite TestIcebergGcsVendingRestCatalog as TestIcebergGcsVendingRestCatalogConnectorSmokeTest extending BaseIcebergConnectorSmokeTest with real GCS credentials - Create IcebergGcsRestCatalogBackendContainer for GCS-specific REST catalog backend - Rename IcebergRestCatalogBackendContainer to IcebergS3RestCatalogBackendContainer - Remove old test class and GcsCredentialVendingCatalogAdapter - Remove default encryption/decryption key mechanism from GcsFileSystem and GcsFileSystemFactory to align with S3FileSystem and AzureFileSystem (only per-call newEncrypted* methods) - Remove encryption key constants from GcsFileSystemConstants - Remove encryption key mapping from IcebergRestCatalogFileSystemFactory - Add new GCS vending test to cloud-tests profile in pom.xml --- .../trino/filesystem/gcs/GcsFileSystem.java | 25 +- .../filesystem/gcs/GcsFileSystemFactory.java | 20 +- .../gcs/GcsFileSystemConstants.java | 2 - plugin/trino-iceberg/pom.xml | 2 + .../IcebergRestCatalogFileSystemFactory.java | 11 +- .../TestIcebergGcsVendingRestCatalog.java | 377 ------------------ ...sVendingRestCatalogConnectorSmokeTest.java | 330 +++++++++++++++ ...stIcebergRestCatalogFileSystemFactory.java | 35 +- ...gVendingRestCatalogConnectorSmokeTest.java | 6 +- .../GcsCredentialVendingCatalogAdapter.java | 61 --- ...IcebergGcsRestCatalogBackendContainer.java | 50 +++ ...IcebergS3RestCatalogBackendContainer.java} | 4 +- 12 files changed, 394 insertions(+), 529 deletions(-) delete mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalog.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalogConnectorSmokeTest.java delete mode 100644 plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/GcsCredentialVendingCatalogAdapter.java create mode 100644 testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergGcsRestCatalogBackendContainer.java rename testing/trino-testing-containers/src/main/java/io/trino/testing/containers/{IcebergRestCatalogBackendContainer.java => IcebergS3RestCatalogBackendContainer.java} (95%) diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java index d67205cd1386..03940719aa1c 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystem.java @@ -76,23 +76,8 @@ public class GcsFileSystem private final long writeBlockSizeBytes; private final int pageSize; private final int batchSize; - private final Optional defaultEncryptionKey; - private final Optional defaultDecryptionKey; public GcsFileSystem(ListeningExecutorService executorService, Storage storage, int readBlockSizeBytes, long writeBlockSizeBytes, int pageSize, int batchSize) - { - this(executorService, storage, readBlockSizeBytes, writeBlockSizeBytes, pageSize, batchSize, Optional.empty(), Optional.empty()); - } - - public GcsFileSystem( - ListeningExecutorService executorService, - Storage storage, - int readBlockSizeBytes, - long writeBlockSizeBytes, - int pageSize, - int batchSize, - Optional defaultEncryptionKey, - Optional defaultDecryptionKey) { this.executorService = requireNonNull(executorService, "executorService is null"); this.storage = requireNonNull(storage, "storage is null"); @@ -100,8 +85,6 @@ public GcsFileSystem( this.writeBlockSizeBytes = writeBlockSizeBytes; this.pageSize = pageSize; this.batchSize = batchSize; - this.defaultEncryptionKey = requireNonNull(defaultEncryptionKey, "defaultEncryptionKey is null"); - this.defaultDecryptionKey = requireNonNull(defaultDecryptionKey, "defaultDecryptionKey is null"); } @Override @@ -109,7 +92,7 @@ public TrinoInputFile newInputFile(Location location) { GcsLocation gcsLocation = new GcsLocation(location); checkIsValidFile(gcsLocation); - return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.empty(), Optional.empty(), defaultDecryptionKey); + return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.empty(), Optional.empty(), Optional.empty()); } @Override @@ -125,7 +108,7 @@ public TrinoInputFile newInputFile(Location location, long length) { GcsLocation gcsLocation = new GcsLocation(location); checkIsValidFile(gcsLocation); - return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.empty(), defaultDecryptionKey); + return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.empty(), Optional.empty()); } @Override @@ -141,7 +124,7 @@ public TrinoInputFile newInputFile(Location location, long length, Instant lastM { GcsLocation gcsLocation = new GcsLocation(location); checkIsValidFile(gcsLocation); - return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.of(lastModified), defaultDecryptionKey); + return new GcsInputFile(gcsLocation, storage, readBlockSizeBytes, OptionalLong.of(length), Optional.of(lastModified), Optional.empty()); } @Override @@ -157,7 +140,7 @@ public TrinoOutputFile newOutputFile(Location location) { GcsLocation gcsLocation = new GcsLocation(location); checkIsValidFile(gcsLocation); - return new GcsOutputFile(gcsLocation, storage, writeBlockSizeBytes, defaultEncryptionKey); + return new GcsOutputFile(gcsLocation, storage, writeBlockSizeBytes, Optional.empty()); } @Override diff --git a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java index 3d069d328041..4901dc5f50fc 100644 --- a/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java +++ b/lib/trino-filesystem-gcs/src/main/java/io/trino/filesystem/gcs/GcsFileSystemFactory.java @@ -17,17 +17,11 @@ import com.google.inject.Inject; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.filesystem.encryption.EncryptionKey; import io.trino.spi.security.ConnectorIdentity; import jakarta.annotation.PreDestroy; -import java.util.Base64; -import java.util.Optional; - import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY; -import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newCachedThreadPool; @@ -62,18 +56,6 @@ public void stop() @Override public TrinoFileSystem create(ConnectorIdentity identity) { - Optional defaultEncryptionKey = extractEncryptionKey(identity, EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY); - Optional defaultDecryptionKey = extractEncryptionKey(identity, EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY); - return new GcsFileSystem(executorService, storageFactory.create(identity), readBlockSizeBytes, writeBlockSizeBytes, pageSize, batchSize, defaultEncryptionKey, defaultDecryptionKey); - } - - private static Optional extractEncryptionKey(ConnectorIdentity identity, String extraCredentialKey) - { - String base64Key = identity.getExtraCredentials().get(extraCredentialKey); - if (base64Key == null) { - return Optional.empty(); - } - byte[] keyBytes = Base64.getDecoder().decode(base64Key); - return Optional.of(new EncryptionKey(keyBytes, "AES256")); + return new GcsFileSystem(executorService, storageFactory.create(identity), readBlockSizeBytes, writeBlockSizeBytes, pageSize, batchSize); } } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConstants.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConstants.java index 23c765c474e9..8f8e4906d608 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConstants.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/gcs/GcsFileSystemConstants.java @@ -21,8 +21,6 @@ public final class GcsFileSystemConstants public static final String EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY = "internal$gcs_service_host"; public static final String EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY = "internal$gcs_no_auth"; public static final String EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY = "internal$gcs_user_project"; - public static final String EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY = "internal$gcs_encryption_key"; - public static final String EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY = "internal$gcs_decryption_key"; private GcsFileSystemConstants() {} } diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 6246d51ffb8d..4d938a8a3729 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -778,6 +778,7 @@ **/TestIcebergS3TablesConnectorSmokeTest.java **/TestIcebergBigLakeMetastoreConnectorSmokeTest.java **/TestIcebergGcsConnectorSmokeTest.java + **/TestIcebergGcsVendingRestCatalogConnectorSmokeTest.java **/TestIcebergAbfsConnectorSmokeTest.java **/Test*FailureRecoveryTest.java **/TestIcebergSnowflakeCatalogConnectorSmokeTest.java @@ -845,6 +846,7 @@ **/TestIcebergS3TablesConnectorSmokeTest.java **/TestIcebergBigLakeMetastoreConnectorSmokeTest.java **/TestIcebergGcsConnectorSmokeTest.java + **/TestIcebergGcsVendingRestCatalogConnectorSmokeTest.java **/TestIcebergAbfsConnectorSmokeTest.java **/TestIcebergSnowflakeCatalogConnectorSmokeTest.java **/TestTrinoSnowflakeCatalog.java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java index f65cc74500d9..34d8ed2227e4 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogFileSystemFactory.java @@ -22,8 +22,6 @@ import java.util.Map; -import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY; -import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY; import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY; import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY; import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY; @@ -48,8 +46,6 @@ public class IcebergRestCatalogFileSystemFactory private static final String VENDED_GCS_SERVICE_HOST = "gcs.service.host"; private static final String VENDED_GCS_NO_AUTH = "gcs.no-auth"; private static final String VENDED_GCS_USER_PROJECT = "gcs.user-project"; - private static final String VENDED_GCS_ENCRYPTION_KEY = "gcs.encryption-key"; - private static final String VENDED_GCS_DECRYPTION_KEY = "gcs.decryption-key"; private final TrinoFileSystemFactory fileSystemFactory; private final boolean vendedCredentialsEnabled; @@ -92,9 +88,6 @@ public TrinoFileSystem create(ConnectorIdentity identity, Map fi addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_SERVICE_HOST, EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY); addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_NO_AUTH, EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY); addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_USER_PROJECT, EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY); - addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_ENCRYPTION_KEY, EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY); - addOptionalProperty(extraCredentialsBuilder, fileIoProperties, VENDED_GCS_DECRYPTION_KEY, EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY); - ConnectorIdentity identityWithExtraCredentials = ConnectorIdentity.forUser(identity.getUser()) .withGroups(identity.getGroups()) .withPrincipal(identity.getPrincipal()) @@ -114,9 +107,7 @@ private static boolean hasAnyGcsVendedProperty(Map fileIoPropert fileIoProperties.containsKey(VENDED_GCS_NO_AUTH) || fileIoProperties.containsKey(VENDED_GCS_PROJECT_ID) || fileIoProperties.containsKey(VENDED_GCS_SERVICE_HOST) || - fileIoProperties.containsKey(VENDED_GCS_USER_PROJECT) || - fileIoProperties.containsKey(VENDED_GCS_ENCRYPTION_KEY) || - fileIoProperties.containsKey(VENDED_GCS_DECRYPTION_KEY); + fileIoProperties.containsKey(VENDED_GCS_USER_PROJECT); } private static void addOptionalProperty( diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalog.java deleted file mode 100644 index d2b2d4a1c07c..000000000000 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalog.java +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.iceberg.catalog.rest; - -import com.google.common.collect.ImmutableMap; -import io.airlift.http.server.HttpServerConfig; -import io.airlift.http.server.HttpServerInfo; -import io.airlift.http.server.ServerFeature; -import io.airlift.http.server.testing.TestingHttpServer; -import io.airlift.node.NodeInfo; -import io.trino.plugin.iceberg.IcebergQueryRunner; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.QueryRunner; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.jdbc.JdbcCatalog; -import org.apache.iceberg.rest.GcsCredentialVendingCatalogAdapter; -import org.apache.iceberg.rest.RestCatalogServlet; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; - -import java.io.IOException; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Map; - -import static io.trino.testing.TestingNames.randomNameSuffix; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; - -@TestInstance(PER_CLASS) -class TestIcebergGcsVendingRestCatalog - extends AbstractTestQueryFramework -{ - private static final String BUCKET_NAME = "test-iceberg-gcs-vending-" + randomNameSuffix(); - private static final int FAKE_GCS_PORT = 4443; - - private GenericContainer fakeGcsServer; - private JdbcCatalog backendCatalog; - private TestingHttpServer restServer; - - @SuppressWarnings("resource") - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - fakeGcsServer = new GenericContainer<>("fsouza/fake-gcs-server:1.49.3") - .withExposedPorts(FAKE_GCS_PORT) - .withCommand("-scheme", "http", "-port", String.valueOf(FAKE_GCS_PORT)) - .waitingFor(new HttpWaitStrategy() - .forPort(FAKE_GCS_PORT) - .forPath("/storage/v1/b") - .forStatusCode(200)); - fakeGcsServer.start(); - - String fakeGcsUrl = "http://%s:%d".formatted(fakeGcsServer.getHost(), fakeGcsServer.getMappedPort(FAKE_GCS_PORT)); - - updateExternalUrl(fakeGcsUrl); - createBucket(fakeGcsUrl, BUCKET_NAME); - - backendCatalog = createBackendCatalog(fakeGcsUrl); - - // GCS properties that the REST catalog will vend to Trino - Map vendedGcsProperties = ImmutableMap.of( - "gcs.no-auth", "true", - "gcs.project-id", "test-project", - "gcs.service.host", fakeGcsUrl); - - GcsCredentialVendingCatalogAdapter adapter = new GcsCredentialVendingCatalogAdapter(backendCatalog, vendedGcsProperties); - RestCatalogServlet servlet = new RestCatalogServlet(adapter); - - NodeInfo nodeInfo = new NodeInfo("test"); - HttpServerConfig httpConfig = new HttpServerConfig() - .setHttpPort(0) - .setHttpEnabled(true); - HttpServerInfo httpServerInfo = new HttpServerInfo(httpConfig, nodeInfo); - restServer = new TestingHttpServer("gcs-vending-rest-catalog", httpServerInfo, nodeInfo, httpConfig, servlet, ServerFeature.builder() - .withLegacyUriCompliance(true) - .build()); - restServer.start(); - - return IcebergQueryRunner.builder() - .setIcebergProperties(ImmutableMap.builder() - .put("iceberg.catalog.type", "rest") - .put("iceberg.rest-catalog.uri", restServer.getBaseUrl().toString()) - .put("iceberg.rest-catalog.vended-credentials-enabled", "true") - .put("fs.native-gcs.enabled", "true") - .put("gcs.project-id", "test-project") - .put("gcs.endpoint", fakeGcsUrl) - .put("gcs.auth-type", "APPLICATION_DEFAULT") - .buildOrThrow()) - .build(); - } - - @AfterAll - public void tearDown() - { - if (restServer != null) { - try { - restServer.stop(); - } - catch (Exception e) { - // ignore - } - } - if (backendCatalog != null) { - try { - backendCatalog.close(); - } - catch (Exception e) { - // ignore - } - } - if (fakeGcsServer != null) { - fakeGcsServer.close(); - } - } - - @Test - void testCreateTableInsertAndSelect() - { - String tableName = "test_gcs_vending_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, name VARCHAR)"); - assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'alice'), (2, 'bob')", 2); - assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'alice'), (2, 'bob')"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - void testCreateTableAsSelect() - { - String tableName = "test_gcs_ctas_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS id, VARCHAR 'hello' AS greeting", 1); - assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'hello')"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - void testTableLocationIsOnGcs() - { - String tableName = "test_gcs_location_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS id", 1); - - String filePath = (String) computeScalar("SELECT file_path FROM \"" + tableName + "$files\" LIMIT 1"); - assertThat(filePath).startsWith("gs://" + BUCKET_NAME + "/"); - assertUpdate("DROP TABLE " + tableName); - } - - @Test - void testSchemaOperations() - { - String schemaName = "test_gcs_schema_" + randomNameSuffix(); - assertUpdate("CREATE SCHEMA " + schemaName); - assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).contains(schemaName); - - String tableName = schemaName + ".test_table_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " (id INTEGER)"); - assertUpdate("INSERT INTO " + tableName + " VALUES (1)", 1); - assertQuery("SELECT * FROM " + tableName, "VALUES 1"); - - assertUpdate("DROP TABLE " + tableName); - assertUpdate("DROP SCHEMA " + schemaName); - } - - @Test - void testUpdateAndDelete() - { - String tableName = "test_gcs_update_delete_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, value VARCHAR)"); - assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'original'), (2, 'keep'), (3, 'remove')", 3); - - assertUpdate("UPDATE " + tableName + " SET value = 'updated' WHERE id = 1", 1); - assertQuery("SELECT value FROM " + tableName + " WHERE id = 1", "VALUES 'updated'"); - - assertUpdate("DELETE FROM " + tableName + " WHERE id = 3", 1); - assertQuery("SELECT * FROM " + tableName + " ORDER BY id", "VALUES (1, 'updated'), (2, 'keep')"); - - assertUpdate("DROP TABLE " + tableName); - } - - @Test - void testPartitionedTable() - { - String tableName = "test_gcs_partitioned_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, region VARCHAR) WITH (partitioning = ARRAY['region'])"); - assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'us'), (2, 'eu'), (3, 'us')", 3); - - assertQuery("SELECT id FROM " + tableName + " WHERE region = 'us' ORDER BY id", "VALUES 1, 3"); - assertQuery("SELECT id FROM " + tableName + " WHERE region = 'eu'", "VALUES 2"); - - long partitionCount = (long) computeScalar("SELECT count(*) FROM \"" + tableName + "$partitions\""); - assertThat(partitionCount).isEqualTo(2L); - - assertUpdate("DROP TABLE " + tableName); - } - - @Test - void testSnapshotsAndTimeTravel() - { - String tableName = "test_gcs_snapshots_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " (id INTEGER)"); - - assertUpdate("INSERT INTO " + tableName + " VALUES (1)", 1); - long snapshotAfterFirstInsert = (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC LIMIT 1"); - - assertUpdate("INSERT INTO " + tableName + " VALUES (2)", 1); - assertQuery("SELECT * FROM " + tableName + " ORDER BY id", "VALUES 1, 2"); - - // Time travel to first snapshot - assertQuery("SELECT * FROM " + tableName + " FOR VERSION AS OF " + snapshotAfterFirstInsert, "VALUES 1"); - - assertUpdate("DROP TABLE " + tableName); - } - - @Test - void testMetadataTables() - { - String tableName = "test_gcs_metadata_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, part VARCHAR) WITH (partitioning = ARRAY['part'])"); - assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1); - assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'b')", 1); - - long snapshotCount = (long) computeScalar("SELECT count(*) FROM \"" + tableName + "$snapshots\""); - assertThat(snapshotCount).isGreaterThanOrEqualTo(2L); - - long fileCount = (long) computeScalar("SELECT count(*) FROM \"" + tableName + "$files\""); - assertThat(fileCount).isEqualTo(2L); - - long historyCount = (long) computeScalar("SELECT count(*) FROM \"" + tableName + "$history\""); - assertThat(historyCount).isGreaterThanOrEqualTo(2L); - - long partitionCount = (long) computeScalar("SELECT count(*) FROM \"" + tableName + "$partitions\""); - assertThat(partitionCount).isEqualTo(2L); - - // Verify file paths are on GCS - String filePath = (String) computeScalar("SELECT file_path FROM \"" + tableName + "$files\" LIMIT 1"); - assertThat(filePath).startsWith("gs://" + BUCKET_NAME + "/"); - - assertUpdate("DROP TABLE " + tableName); - } - - @Test - void testMultipleDataTypes() - { - String tableName = "test_gcs_types_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " (" + - "c_integer INTEGER, " + - "c_bigint BIGINT, " + - "c_real REAL, " + - "c_double DOUBLE, " + - "c_decimal DECIMAL(10,2), " + - "c_varchar VARCHAR, " + - "c_boolean BOOLEAN, " + - "c_date DATE, " + - "c_timestamp TIMESTAMP(6))"); - - assertUpdate("INSERT INTO " + tableName + " VALUES (" + - "42, " + - "BIGINT '9999999999', " + - "REAL '3.14', " + - "DOUBLE '2.718281828', " + - "DECIMAL '12345.67', " + - "VARCHAR 'hello world', " + - "true, " + - "DATE '2024-01-15', " + - "TIMESTAMP '2024-01-15 10:30:00.123456')", 1); - - assertQuery("SELECT c_integer, c_bigint, c_varchar, c_boolean FROM " + tableName, - "VALUES (42, 9999999999, 'hello world', true)"); - - assertUpdate("DROP TABLE " + tableName); - } - - @Test - void testCreateOrReplaceTable() - { - String tableName = "test_gcs_create_or_replace_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 AS id, VARCHAR 'v1' AS version", 1); - assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'v1')"); - - long v1SnapshotId = (long) computeScalar("SELECT snapshot_id FROM \"" + tableName + "$snapshots\" ORDER BY committed_at DESC LIMIT 1"); - - assertUpdate("CREATE OR REPLACE TABLE " + tableName + " AS SELECT 2 AS id, VARCHAR 'v2' AS version", 1); - assertQuery("SELECT * FROM " + tableName, "VALUES (2, 'v2')"); - - // Time travel to original version - assertQuery("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId, "VALUES (1, 'v1')"); - - assertUpdate("DROP TABLE " + tableName); - } - - @Test - void testAlterTable() - { - String tableName = "test_gcs_alter_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " (id INTEGER)"); - assertUpdate("INSERT INTO " + tableName + " VALUES (1)", 1); - - assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN name VARCHAR"); - assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'alice')", 1); - - assertQuery("SELECT * FROM " + tableName + " ORDER BY id", "VALUES (1, NULL), (2, 'alice')"); - - assertUpdate("ALTER TABLE " + tableName + " RENAME COLUMN name TO full_name"); - assertQuery("SELECT id, full_name FROM " + tableName + " WHERE id = 2", "VALUES (2, 'alice')"); - - assertUpdate("DROP TABLE " + tableName); - } - - private static void updateExternalUrl(String fakeGcsUrl) - throws IOException, InterruptedException - { - HttpClient client = HttpClient.newHttpClient(); - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(fakeGcsUrl + "/_internal/config")) - .header("Content-Type", "application/json") - .PUT(HttpRequest.BodyPublishers.ofString("{\"externalUrl\": \"" + fakeGcsUrl + "\"}")) - .build(); - HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); - assertThat(response.statusCode()).isEqualTo(200); - } - - private static void createBucket(String fakeGcsUrl, String bucketName) - throws IOException, InterruptedException - { - HttpClient client = HttpClient.newHttpClient(); - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(fakeGcsUrl + "/storage/v1/b?project=test-project")) - .header("Content-Type", "application/json") - .POST(HttpRequest.BodyPublishers.ofString("{\"name\": \"" + bucketName + "\"}")) - .build(); - HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); - assertThat(response.statusCode()).isEqualTo(200); - } - - private static JdbcCatalog createBackendCatalog(String fakeGcsUrl) - throws IOException - { - Path tempFile = Files.createTempFile("iceberg-gcs-test-jdbc", null); - tempFile.toFile().deleteOnExit(); - - ImmutableMap.Builder properties = ImmutableMap.builder(); - properties.put(CatalogProperties.URI, "jdbc:h2:file:" + tempFile.toAbsolutePath()); - properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); - properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); - properties.put(JdbcCatalog.PROPERTY_PREFIX + "schema-version", "V1"); - properties.put(CatalogProperties.WAREHOUSE_LOCATION, "gs://" + BUCKET_NAME + "/warehouse"); - properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.gcp.gcs.GCSFileIO"); - properties.put("gcs.project-id", "test-project"); - properties.put("gcs.service.host", fakeGcsUrl); - properties.put("gcs.no-auth", "true"); - - JdbcCatalog catalog = new JdbcCatalog(); - catalog.setConf(new Configuration()); - catalog.initialize("gcs_backend", properties.buildOrThrow()); - - return catalog; - } -} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalogConnectorSmokeTest.java new file mode 100644 index 000000000000..41bf71e89f89 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergGcsVendingRestCatalogConnectorSmokeTest.java @@ -0,0 +1,330 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.filesystem.Location; +import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.QueryFailedException; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; +import io.trino.testing.containers.IcebergGcsRestCatalogBackendContainer; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.RESTSessionCatalog; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Base64; +import java.util.Optional; + +import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting; +import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting; +import static io.trino.testing.TestingConnectorSession.SESSION; +import static io.trino.testing.TestingProperties.requiredNonEmptySystemProperty; +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.iceberg.FileFormat.PARQUET; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestIcebergGcsVendingRestCatalogConnectorSmokeTest + extends BaseIcebergConnectorSmokeTest +{ + private static final Logger LOG = Logger.get(TestIcebergGcsVendingRestCatalogConnectorSmokeTest.class); + + private final String gcpStorageBucket; + private final String gcpCredentialKey; + private String warehouseLocation; + private IcebergGcsRestCatalogBackendContainer restCatalogBackendContainer; + + public TestIcebergGcsVendingRestCatalogConnectorSmokeTest() + { + super(new IcebergConfig().getFileFormat().toIceberg()); + this.gcpStorageBucket = requiredNonEmptySystemProperty("testing.gcp-storage-bucket"); + this.gcpCredentialKey = requiredNonEmptySystemProperty("testing.gcp-credentials-key"); + } + + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + return switch (connectorBehavior) { + case SUPPORTS_CREATE_MATERIALIZED_VIEW, + SUPPORTS_RENAME_MATERIALIZED_VIEW, + SUPPORTS_RENAME_SCHEMA -> false; + default -> super.hasBehavior(connectorBehavior); + }; + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + byte[] jsonKeyBytes = Base64.getDecoder().decode(gcpCredentialKey); + Path gcpCredentialsFile = Files.createTempFile("gcp-credentials", ".json"); + gcpCredentialsFile.toFile().deleteOnExit(); + Files.write(gcpCredentialsFile, jsonKeyBytes); + String gcpCredentials = new String(jsonKeyBytes, UTF_8); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonKey = mapper.readTree(gcpCredentials); + String gcpProjectId = jsonKey.get("project_id").asText(); + + this.warehouseLocation = "gs://%s/gcs-vending-rest-test/".formatted(gcpStorageBucket); + + restCatalogBackendContainer = closeAfterClass(new IcebergGcsRestCatalogBackendContainer( + Optional.empty(), + warehouseLocation, + gcpCredentialsFile.toAbsolutePath().toString(), + gcpProjectId)); + restCatalogBackendContainer.start(); + + return IcebergQueryRunner.builder() + .setIcebergProperties( + ImmutableMap.builder() + .put("iceberg.file-format", format.name()) + .put("iceberg.catalog.type", "rest") + .put("iceberg.rest-catalog.uri", "http://" + restCatalogBackendContainer.getRestCatalogEndpoint()) + .put("iceberg.rest-catalog.vended-credentials-enabled", "true") + .put("iceberg.writer-sort-buffer-size", "1MB") + .put("fs.native-gcs.enabled", "true") + .put("gcs.project-id", gcpProjectId) + .put("gcs.json-key", gcpCredentials) + .buildOrThrow()) + .setInitialTables(REQUIRED_TPCH_TABLES) + .build(); + } + + @AfterAll + public void removeTestData() + { + if (fileSystem == null) { + return; + } + try { + fileSystem.deleteDirectory(Location.of(warehouseLocation)); + } + catch (IOException e) { + // The GCS bucket should be configured to expire objects automatically. Clean up issues do not need to fail the test. + LOG.warn(e, "Failed to clean up GCS test directory: %s", warehouseLocation); + } + } + + @Test + @Override + public void testMaterializedView() + { + assertThatThrownBy(super::testMaterializedView) + .hasMessageContaining("createMaterializedView is not supported for Iceberg REST catalog"); + } + + @Test + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasMessageContaining("renameNamespace is not supported for Iceberg REST catalog"); + } + + @Override + protected void dropTableFromCatalog(String tableName) + { + // TODO: Get register table tests working + } + + @Override + protected String getMetadataLocation(String tableName) + { + try (RESTSessionCatalog catalog = new RESTSessionCatalog()) { + catalog.initialize("rest-catalog", ImmutableMap.of(CatalogProperties.URI, "http://" + restCatalogBackendContainer.getRestCatalogEndpoint())); + SessionCatalog.SessionContext context = new SessionCatalog.SessionContext( + "user-default", + "user", + ImmutableMap.of(), + ImmutableMap.of(), + SESSION.getIdentity()); + return ((BaseTable) catalog.loadTable(context, toIdentifier(tableName))).operations().current().metadataFileLocation(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected String schemaPath() + { + return format("%s%s", warehouseLocation, getSession().getSchema().orElseThrow()); + } + + @Override + protected boolean locationExists(String location) + { + try { + return fileSystem.newInputFile(Location.of(location)).exists(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Test + @Override + public void testRegisterTableWithTableLocation() + { + assertThatThrownBy(super::testRegisterTableWithTableLocation) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithComments() + { + assertThatThrownBy(super::testRegisterTableWithComments) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithShowCreateTable() + { + assertThatThrownBy(super::testRegisterTableWithShowCreateTable) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithReInsert() + { + assertThatThrownBy(super::testRegisterTableWithReInsert) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithDroppedTable() + { + assertThatThrownBy(super::testRegisterTableWithDroppedTable) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithDifferentTableName() + { + assertThatThrownBy(super::testRegisterTableWithDifferentTableName) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithMetadataFile() + { + assertThatThrownBy(super::testRegisterTableWithMetadataFile) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRegisterTableWithTrailingSpaceInLocation() + { + assertThatThrownBy(super::testRegisterTableWithTrailingSpaceInLocation) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testUnregisterTable() + { + assertThatThrownBy(super::testUnregisterTable) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testRepeatUnregisterTable() + { + assertThatThrownBy(super::testRepeatUnregisterTable) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Test + @Override + public void testDropTableWithMissingMetadataFile() + { + assertThatThrownBy(super::testDropTableWithMissingMetadataFile) + .hasMessageMatching("Failed to load table: (.*)"); + } + + @Test + @Override + public void testDropTableWithMissingSnapshotFile() + { + assertThatThrownBy(super::testDropTableWithMissingSnapshotFile) + .isInstanceOf(QueryFailedException.class) + .cause() + .hasMessageContaining("Failed to drop table") + .hasNoCause(); + } + + @Test + @Override + public void testDropTableWithMissingManifestListFile() + { + assertThatThrownBy(super::testDropTableWithMissingManifestListFile) + .hasMessageContaining("Table location should not exist"); + } + + @Test + @Override + public void testDropTableWithNonExistentTableLocation() + { + assertThatThrownBy(super::testDropTableWithNonExistentTableLocation) + .hasMessageMatching("Failed to load table: (.*)"); + } + + @Override + protected boolean isFileSorted(Location path, String sortColumnName) + { + if (format == PARQUET) { + return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); + } + return checkOrcFileSorting(fileSystem, path, sortColumnName); + } + + @Override + protected void deleteDirectory(String location) + { + try { + fileSystem.deleteDirectory(Location.of(location)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private TableIdentifier toIdentifier(String tableName) + { + return TableIdentifier.of(getSession().getSchema().orElseThrow(), tableName); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogFileSystemFactory.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogFileSystemFactory.java index 56e40a7375e5..bbb656c91098 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogFileSystemFactory.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogFileSystemFactory.java @@ -21,8 +21,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; -import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY; -import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY; import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_NO_AUTH_PROPERTY; import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY; import static io.trino.filesystem.gcs.GcsFileSystemConstants.EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY; @@ -113,8 +111,6 @@ void testGcsVendedCredentialsWithAllProperties() .put("gcs.project-id", "my-gcp-project") .put("gcs.service.host", "https://custom-storage.googleapis.com") .put("gcs.user-project", "billing-project") - .put("gcs.encryption-key", "dGVzdC1lbmNyeXB0aW9uLWtleS0xMjM0NTY3OA==") - .put("gcs.decryption-key", "dGVzdC1kZWNyeXB0aW9uLWtleS0xMjM0NTY3OA==") .buildOrThrow(); factory.create(ConnectorIdentity.ofUser("test"), fileIoProperties); @@ -126,9 +122,7 @@ void testGcsVendedCredentialsWithAllProperties() .containsEntry(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_EXPIRES_AT_PROPERTY, "1700000000000") .containsEntry(EXTRA_CREDENTIALS_GCS_PROJECT_ID_PROPERTY, "my-gcp-project") .containsEntry(EXTRA_CREDENTIALS_GCS_SERVICE_HOST_PROPERTY, "https://custom-storage.googleapis.com") - .containsEntry(EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY, "billing-project") - .containsEntry(EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY, "dGVzdC1lbmNyeXB0aW9uLWtleS0xMjM0NTY3OA==") - .containsEntry(EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY, "dGVzdC1kZWNyeXB0aW9uLWtleS0xMjM0NTY3OA=="); + .containsEntry(EXTRA_CREDENTIALS_GCS_USER_PROJECT_PROPERTY, "billing-project"); } @Test @@ -159,33 +153,6 @@ void testGcsVendedNoAuth() .doesNotContainKey(EXTRA_CREDENTIALS_GCS_OAUTH_TOKEN_PROPERTY); } - @Test - void testGcsVendedEncryptionKeys() - { - AtomicReference capturedIdentity = new AtomicReference<>(); - TrinoFileSystemFactory delegate = identity -> { - capturedIdentity.set(identity); - return null; - }; - - IcebergRestCatalogConfig config = new IcebergRestCatalogConfig() - .setBaseUri("http://localhost") - .setVendedCredentialsEnabled(true); - IcebergRestCatalogFileSystemFactory factory = new IcebergRestCatalogFileSystemFactory(delegate, config); - - Map fileIoProperties = ImmutableMap.of( - "gcs.encryption-key", "dGVzdC1lbmNyeXB0aW9uLWtleS0xMjM0NTY3OA==", - "gcs.decryption-key", "dGVzdC1kZWNyeXB0aW9uLWtleS0xMjM0NTY3OA=="); - - factory.create(ConnectorIdentity.ofUser("test"), fileIoProperties); - - ConnectorIdentity identity = capturedIdentity.get(); - assertThat(identity).isNotNull(); - assertThat(identity.getExtraCredentials()) - .containsEntry(EXTRA_CREDENTIALS_GCS_ENCRYPTION_KEY_PROPERTY, "dGVzdC1lbmNyeXB0aW9uLWtleS0xMjM0NTY3OA==") - .containsEntry(EXTRA_CREDENTIALS_GCS_DECRYPTION_KEY_PROPERTY, "dGVzdC1kZWNyeXB0aW9uLWtleS0xMjM0NTY3OA=="); - } - @Test void testGcsVendedCredentialsDisabled() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java index c1c78a5959c8..d76a1a38a8a2 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java @@ -25,7 +25,7 @@ import io.trino.testing.QueryFailedException; import io.trino.testing.QueryRunner; import io.trino.testing.TestingConnectorBehavior; -import io.trino.testing.containers.IcebergRestCatalogBackendContainer; +import io.trino.testing.containers.IcebergS3RestCatalogBackendContainer; import io.trino.testing.containers.Minio; import io.trino.testing.minio.MinioClient; import org.apache.iceberg.BaseTable; @@ -67,7 +67,7 @@ public class TestIcebergVendingRestCatalogConnectorSmokeTest { private final String bucketName; private String warehouseLocation; - private IcebergRestCatalogBackendContainer restCatalogBackendContainer; + private IcebergS3RestCatalogBackendContainer restCatalogBackendContainer; private Minio minio; public TestIcebergVendingRestCatalogConnectorSmokeTest() @@ -106,7 +106,7 @@ protected QueryRunner createQueryRunner() .build(); AssumeRoleResponse assumeRoleResponse = stsClient.assumeRole(AssumeRoleRequest.builder().build()); - restCatalogBackendContainer = closeAfterClass(new IcebergRestCatalogBackendContainer( + restCatalogBackendContainer = closeAfterClass(new IcebergS3RestCatalogBackendContainer( Optional.of(network), warehouseLocation, assumeRoleResponse.credentials().accessKeyId(), diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/GcsCredentialVendingCatalogAdapter.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/GcsCredentialVendingCatalogAdapter.java deleted file mode 100644 index c609fa49ed58..000000000000 --- a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/GcsCredentialVendingCatalogAdapter.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.iceberg.rest; - -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.rest.responses.ErrorResponse; -import org.apache.iceberg.rest.responses.LoadTableResponse; - -import java.util.Map; -import java.util.function.Consumer; - -import static java.util.Objects.requireNonNull; - -/** - * A {@link RESTCatalogAdapter} that injects GCS credential properties into - * {@link LoadTableResponse} config, simulating a REST catalog server that - * vends GCS credentials. - */ -public class GcsCredentialVendingCatalogAdapter - extends RESTCatalogAdapter -{ - private final Map gcsCredentialConfig; - - public GcsCredentialVendingCatalogAdapter(Catalog catalog, Map gcsCredentialConfig) - { - super(catalog); - this.gcsCredentialConfig = ImmutableMap.copyOf(requireNonNull(gcsCredentialConfig, "gcsCredentialConfig is null")); - } - - @Override - protected T execute( - HTTPRequest request, - Class responseType, - Consumer errorHandler, - Consumer> configConsumer) - { - T response = super.execute(request, responseType, errorHandler, configConsumer); - if (response instanceof LoadTableResponse loadTableResponse) { - @SuppressWarnings("unchecked") - T modified = (T) LoadTableResponse.builder() - .withTableMetadata(loadTableResponse.tableMetadata()) - .addAllConfig(loadTableResponse.config()) - .addAllConfig(gcsCredentialConfig) - .build(); - return modified; - } - return response; - } -} diff --git a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergGcsRestCatalogBackendContainer.java b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergGcsRestCatalogBackendContainer.java new file mode 100644 index 000000000000..93197cf40894 --- /dev/null +++ b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergGcsRestCatalogBackendContainer.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.testing.containers; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.testcontainers.containers.Network; + +import java.util.Optional; + +public class IcebergGcsRestCatalogBackendContainer + extends BaseTestContainer +{ + public IcebergGcsRestCatalogBackendContainer( + Optional network, + String warehouseLocation, + String gcpCredentialsFilePath, + String gcpProjectId) + { + super( + "apache/iceberg-rest-fixture:1.10.1", + "iceberg-rest", + ImmutableSet.of(8181), + ImmutableMap.of("/gcs-credentials.json", gcpCredentialsFilePath), + ImmutableMap.of( + "CATALOG_INCLUDE__CREDENTIALS", "true", + "CATALOG_WAREHOUSE", warehouseLocation, + "CATALOG_IO__IMPL", "org.apache.iceberg.gcp.gcs.GCSFileIO", + "CATALOG_GCS_PROJECT__ID", gcpProjectId, + "GOOGLE_APPLICATION_CREDENTIALS", "/gcs-credentials.json"), + network, + 5); + } + + public String getRestCatalogEndpoint() + { + return getMappedHostAndPortForExposedPort(8181).toString(); + } +} diff --git a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergRestCatalogBackendContainer.java b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergS3RestCatalogBackendContainer.java similarity index 95% rename from testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergRestCatalogBackendContainer.java rename to testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergS3RestCatalogBackendContainer.java index d4fb6064dca0..0fa48b239d8e 100644 --- a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergRestCatalogBackendContainer.java +++ b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/IcebergS3RestCatalogBackendContainer.java @@ -21,10 +21,10 @@ import static io.trino.testing.containers.Minio.MINIO_REGION; -public class IcebergRestCatalogBackendContainer +public class IcebergS3RestCatalogBackendContainer extends BaseTestContainer { - public IcebergRestCatalogBackendContainer( + public IcebergS3RestCatalogBackendContainer( Optional network, String warehouseLocation, String minioAccessKey,